Commit 5e418b7c authored by BO ZHANG

rewrite csst-dag as a dependency package of csst-pipeline-cli

parent 1400266d
 all:
-	git pull
-	rm -rf ./build
-	pip install . --no-deps --force-reinstall
+	#git pull
+	rm -rf build csst_dag.egg-info
+	pip install -e .
 install:
-	rm -rf ./build
+	rm -rf build csst_dag.egg-info
 	pip install . --no-deps --force-reinstall
 pull:
 	git pull
 compile:
 	uv pip compile pyproject.toml -o requirements.txt
+rsync_zjlab:
+	rsync -avrP . zjlab-harbor:/mnt/csst-dag
 # `csst-dag`
-
-# Change log
-- [2025-09-15] Installation of the legacy csst-dag command-line tool:
-  - `pip install git+https://csst-tb.bao.ac.cn/code/csst-cicd/csst-dag.git@7a0108f3 --force-reinstall`
-- [2025-09-09] csst-dag will be packaged as a Docker image; the previous command-line invocation via `python -m csst_dag.cli` will be dropped
-
-# Usages
-
-Before running, choose a local `.csst` folder.
-
-Add the following to `.bashrc`:
-
-[//]: # (--pull=always)
-```shell
-# set the local .csst folder path
-export DOT_CSST=~/.csst
-# define a custom command
-alias csst='docker run --rm -v ${DOT_CSST}:/pipeline/app/.csst csu-harbor.csst.nao:10443/csst/csst-dag csst'
-```
-
-Make `DOT_CSST` and the `csst` command take effect:
-```shell
-source ~/.bashrc # or source ~/.zshrc
-```
-
-Create the `.csst` folder:
-```shell
-mkdir -p ${DOT_CSST}
-```
-
-Test that the `csst` command runs:
-```shell
-csst --help
-csst --version
-```
-
-Update:
-```shell
-docker pull csu-harbor.csst.nao:10443/csst/csst-dag:latest
-```
-
-The main command groups are:
-- `csst env`: preset environment variables
-- `csst plan`: plan data queries
-  - `csst plan --obs-group=111`
-  - `csst plan --dataset=111 --stats=obs_group --to-json`
-- `csst dag`: DAG task operations
-  - `csst dag start --dataset=xxx --batch-id=xxx`
-  - `csst dag cancel --dataset=xxx --batch-id=xxx` (requires Scalebox / DFS support)
-  - `csst dag status --dataset=xxx --batch-id=xxx` (requires Scalebox / DFS support)
-  - `csst dag logs --dataset=xxx --batch-id=xxx --status=0` (requires Scalebox / DFS support)
-- `csst file`: raw data queries
-  - `csst data --data-model=raw --obs-id=10100000001 --stats=obs_group`
-    - query raw data and aggregate by obs_group
-  - `csst data --data-model=csst-msc-l1-mbi --obs-id=10100000001 --stats=obs_group --output=json`
-    - query data products and aggregate by obs_group
-- `csst catalog`: DFS catalog queries
-  - `csst catalog list`: list available catalogs
-  - `csst catalog show --name=trilegal`: show details of the given catalog
-- `csst ccds`: CCDS
-  - `csst ccds pmap?`
-
-## 1. `csst env` environment variables
-```shell
-# 1. list all predefined environment variables
-csst env list
-# 2. show environment variables
-csst env show --name csu
-# 3. save environment variables to a file under a preset name
-csst env set --name csu
-```
-
-## 2. `csst plan`
-```shell
-# 1. inspect plan data
-csst plan \
-  --dataset=csst-msc-c9-25sqdeg-v3 \
-  --instrument=MSC \
-  --obs-type=WIDE \
-  --obs-group=W1 \
-  --obs-id=10100232366
-```
-
-## `csst dag`
-```shell
-# 1. launch tasks
-csst dag run **kwargs
-# 2. check task status (in batch)
-csst dag status **kwargs
-# 3. view task logs (in batch or individually)
-csst dag log **kwargs
-# 4. cancel tasks (in batch by filter, by task ID, or all)
-csst dag cancel **kwargs
-# 5. inspect level-0 data
-csst data --data-model=raw **kwargs
-# 6. inspect data products (may need to distinguish DFS1/2)
-csst data --data-model=csst-msc-l1-mbi **kwargs
-# TODO: add a --stats option to aggregate data products by given columns
-# TODO: add a --print-columns option to print the column names of data products
-```
+
+```python
+from csst_dag import CSST_DAGS
+
+# define parameters
+test_kwargs = dict(
+    dataset="csst-msc-c9-25sqdeg-v3",
+    instrument="MSC",
+    obs_type="WIDE",
+    obs_group="W1",
+    obs_id="10100100412",
+    proposal_id=None,
+    prc_status=-1024,
+)
+
+# get DAG
+dag = CSST_DAGS.get("csst-msc-l1-mbi")
+
+# run with parameters
+dag_group_run, dag_run_list = dag.run(
+    **test_kwargs,
+    extra_kwargs={"a": 1},
+)
+
+# `dag_group_run` is a dict containing information of the DAG group run
+print(dag_group_run)
+# {'dag_group': 'default-dag-group', 'dag_group_run': 'b4352fdbc465fa81919176febd5a663a014a7f7f', 'batch_id': 'default-batch', 'priority': 1, 'created_time': '2025-09-22T06:39:29.077'}
+
+# `dag_run_list` is a list[dict], whose elements are `dag_run`s
+for dag_run in dag_run_list:
+    print(dag_run)
+# {'dataset': 'csst-msc-c9-25sqdeg-v3', 'instrument': 'MSC', 'obs_type': 'WIDE', 'obs_group': 'W1', 'obs_id': '10100100412', 'detector': '06', 'filter': '', 'custom_id': '', 'batch_id': 'default-batch', 'pmapname': '', 'ref_cat': '', 'dag_group': 'default-dag-group', 'dag': 'csst-msc-l1-mbi', 'dag_group_run': 'b4352fdbc465fa81919176febd5a663a014a7f7f', 'dag_run': '7a81164c505efb9a13a7da166a6ad940df713238', 'priority': 1, 'data_list': [], 'extra_kwargs': {'a': 1}, 'created_time': '2025-09-22T06:39:29.077', 'rerun': -1, 'status_code': -1024, 'n_file_expected': 1, 'n_file_found': 1, 'object': '', 'proposal_id': ''}
+# ... (one such dict per detector: 06-09, 11-20, 22-25, each with its own dag_run hash)
+
+# currently defined DAGs
+for dag in CSST_DAGS.keys():
+    print(dag)
+# csst-msc-l1-qc0
+# csst-msc-l1-mbi
+# csst-msc-l1-ast
+# csst-msc-l1-sls
+# csst-msc-l1-ooc
+# csst-mci-l1
+# csst-mci-l1-qc0
+# csst-ifs-l1
+# csst-cpic-l1
+# csst-cpic-l1-qc0
+# csst-hstdm-l1
+```
@@ -8,7 +8,7 @@ BATCH_ID = "msc-ooc-v20241219-rdx-naoc-v1"
 DATASET = "msc-ooc-v20241219"
 LOCATION = "naoc"
-# query for data in DFS
+# query for dlist in DFS
 result = level0.find(
     dataset=DATASET,
     file_type="SCI",
@@ -2,7 +2,7 @@
 COSMOS
 """
-# cd /nfsdata/share/simulation-collection/csst-dag
+# cd /nfsdata/share/simulation-collection/csst-dag_cfg
 # http://159.226.170.52:3000/scalebox/TaskList?app=csst-msc-l1-mbi.apps2
 from csst_dag import get_redis, gen_level1_dag_message
@@ -46,15 +46,18 @@ python -m csst_dag.cli.csst_msc_l1 \
 - `25sqdeg-test-b1` : test version, ran only part of W1
 - `25sqdeg-test-b2` : second version, ran W1-5, skipping what the first batch had already processed
 - `25sqdeg-test-b3` : third version, ran all of W5
+- `25sqdeg-full-b1` : test version
+- `25sqdeg-full-b2` : fixed the detector-20 WCS problem and re-ingested the level-0 data
+- `25sqdeg-full-b3` : confirmed OK, ran W1-W5
 
-Imaging
+Imaging + spectroscopy
 ```bash
 python -m csst_dag.cli.run \
     --dags csst-msc-l1-mbi csst-msc-l1-sls \
     --dataset=csst-msc-c9-25sqdeg-v3 \
     --instrument=MSC \
     --obs-group=W5 \
-    --batch-id=25sqdeg-full-b2 \
+    --batch-id=25sqdeg-full-b3 \
     --pmapname=csst_000094.pmap \
     --ref-cat=trilegal_093 \
     --verbose \
@@ -65,7 +68,7 @@ python -m csst_dag.cli.inspect \
     --dataset=csst-msc-c9-25sqdeg-v3 \
     --instrument=MSC \
     --data-model=csst-msc-l1-mbi \
-    --batch-id=25sqdeg-full-b1
+    --batch-id=25sqdeg-full-b3
 ```
 
 Spectroscopy
@@ -94,26 +97,27 @@ python -m csst_dag.cli.inspect \
 ```
 ```bash
-# first version, trial run
 python -m csst_dag.cli.run \
     --dags csst-msc-l1-sls csst-msc-l1-mbi \
     --dataset=csst-msc-c11-sls-v1 \
     --instrument=MSC \
-    --batch-id=sls-cosmos-v1-b1 \
+    --batch-id=sls-cosmos-v1-b6 \
     --ref-cat=trilegal_093 \
+    --pmapname=csst_000118.pmap \
+    --obs-group=W2 \
     --obs-id=10100070335 \
     --verbose \
     --submit
+# second version: MBI fixed the WCS, obs_group updated to W2
 python -m csst_dag.cli.run \
-    --dags csst-msc-l1-mbi \
+    --dags csst-msc-l1-mbi csst-msc-l1-sls \
     --dataset=csst-msc-c11-sls-v1 \
     --instrument=MSC \
-    --batch-id=sls-cosmos-v1-b2 \
+    --batch-id=sls-cosmos-v1-b4 \
     --ref-cat=trilegal_093 \
     --verbose \
     --submit
 python -m csst_dag.cli.run \
-from ._dfs import DFS, dfs
+from .dfs import DFS
-from .dag import CSST_DAGS, Dispatcher, BaseDAG
+from .dag import CSST_DAGS, Dispatcher, BaseDAG, Level1DAG, Level2DAG
 from ._csst import csst, CsstPlanObsid, CsstPlanObsgroup, DotDict
@@ -173,5 +173,5 @@ class CsstLevel0(DotDict):
         ]
         return CsstPlanObsgroup(
-            **{_["obs_id"]: CsstPlanObsid.from_plan(_) for _ in plan_data}
+            **{_["obs_id"]: CsstPlanObsid.from_plan(_) for _ in d_list}
         )
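The dict comprehension above keys each plan entry by its `obs_id`. In isolation the pattern looks like this, with `d_list` as a hypothetical stand-in for the list of plan dicts returned by the DFS query:

```python
# hypothetical plan entries, standing in for the DFS query result
d_list = [
    {"obs_id": "10100100412", "obs_group": "W1"},
    {"obs_id": "10100232366", "obs_group": "W1"},
]

# key each entry by its obs_id, mirroring the CsstPlanObsgroup construction
by_obs_id = {d["obs_id"]: d for d in d_list}
print(sorted(by_obs_id))  # ['10100100412', '10100232366']
```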
import glob
import json
import os
import string

import numpy as np
from astropy import time

DAG_RUN_ID_DIGITS = 6
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_cfg")
DAG_YAML_LIST = glob.glob(DAG_MESSAGE_TEMPLATE_DIRECTORY + "/*.yml")
DAG_LIST = [os.path.splitext(os.path.basename(_))[0] for _ in DAG_YAML_LIST]
# print(DAG_LIST)
# [
#     "csst-msc-l1-ooc-bias",
#     "csst-msc-l1-ooc-flat",
#     "csst-msc-l1-ooc-dark",
#     "csst-msc-l1-ast",
#     "csst-msc-l1-qc0",
#     "csst-hstdm-l1",
#     "csst-msc-l1-sls",
#     "csst-msc-l1-mbi",
# ]


def gen_dag_run_id(digits=6):
    """
    Generate a unique run ID for a DAG run.
    """
    now = time.Time.now()
    dag_run_id = now.strftime("%Y%m%d-%H%M%S-")
    n = len(string.ascii_lowercase)
    for i in range(digits):
        dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
    return dag_run_id


def get_dag_message_template(dag_id):
    """
    Get the message template for a given DAG.
    """
    if dag_id not in DAG_LIST:
        raise ValueError(f"Unknown DAG: {dag_id}")
    with open(os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json"), "r") as f:
        template = json.load(f)
    return template


def gen_msg(dag_id, **kwargs):
    """
    Generate a message for a given DAG, overriding template keys with kwargs.
    """
    this_dag_run_id = gen_dag_run_id(DAG_RUN_ID_DIGITS)
    msg = get_dag_message_template(dag_id)
    for k, v in kwargs.items():
        assert msg.get(k, None) is not None, f"Unknown key: {k}"
        msg[k] = v
    return msg
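The run-ID scheme in `gen_dag_run_id` (a timestamp prefix plus a random lowercase suffix) can be sketched with only the standard library; `make_run_id` below is a hypothetical stand-in for the astropy/numpy-based original:

```python
import random
import re
import string
from datetime import datetime, timezone

def make_run_id(digits=6):
    """Timestamp prefix + random lowercase suffix, mirroring gen_dag_run_id."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-")
    suffix = "".join(random.choice(string.ascii_lowercase) for _ in range(digits))
    return stamp + suffix

rid = make_run_id()
# e.g. '20250922-063929-qkzfwa': 8-digit date, 6-digit time, 6 random letters
assert re.fullmatch(r"\d{8}-\d{6}-[a-z]{6}", rid)
```

The random suffix only makes collisions unlikely within the same second; it does not guarantee uniqueness.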
import os
import socket

import toml
from astropy.table import Table
from csst_dfs_client import plan, level0, level1, dag, catalog

CONFIG = toml.load(os.path.join(os.path.dirname(__file__), "config.toml"))


def check_port(ip, port, timeout=3):
    """
    Check whether `ip:port` is open.

    Example: check_port("192.168.1.1", 80) -> True/False
    """
    try:
        # create a TCP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)  # timeout in seconds
        # try to connect; connect_ex returns 0 on success
        result = sock.connect_ex((ip, port))
        return result == 0
    except Exception as e:
        print(f"Error: {e}")
        return False
    finally:
        sock.close()  # make sure the socket is closed


class DFS:
    plan = plan
    level0 = level0
    level1 = level1
    dag = dag
    catalog = catalog

    def __init__(self, location=None):
        # try each location
        print("Test all locations...", end="")
        status_list = []
        for loc in CONFIG.keys():
            dfs_ip = CONFIG[loc]["CSST_DFS_GATEWAY"].split(":")[0]
            dfs_port = int(CONFIG[loc]["CSST_DFS_GATEWAY"].split(":")[1])
            # redis_ip = CONFIG[loc]["redis"]["host"]
            # redis_port = CONFIG[loc]["redis"]["port"]
            this_dfs_status = check_port(dfs_ip, dfs_port, timeout=1)
            # this_redis_status = check_port(redis_ip, redis_port, timeout=1)
            this_status = dict(
                location=loc,
                status=this_dfs_status,  # and this_redis_status,
                dfs=this_dfs_status,
                # redis=this_redis_status,
            )
            status_list.append(this_status)
            # print(this_status)
        print("Done!\n")
        status_table = Table(status_list)
        print(status_table)
        print("\n")
        if status_table["status"].sum() == 0:
            print("No DFS location is available")
        elif status_table["status"].sum() > 1:
            print("Multiple DFS locations are available, please specify one")
        elif location is None:
            # choose the DFS location automatically
            if status_table["status"].sum() == 1:
                print("One DFS location is available, good")
                location = status_table["location"][status_table["status"]][0]
            elif status_table["status"].sum() == 0:
                print("No DFS location is available, using csu")
                location = "csu"
            else:
                raise ValueError("Multiple DFS locations are available")
        self.location = location
        self.config = CONFIG[self.location]
        for k, v in CONFIG[self.location].items():
            os.environ.setdefault(k, str(v))
        # print("Setting redis config:")
        # self.redis = Redis(location=self.location)


dfs = DFS(location=None)
[csu]
CSST_DFS_GATEWAY="192.168.25.89:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
[zjlab]
CSST_DFS_GATEWAY="10.200.60.246:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
[naoc]
CSST_DFS_GATEWAY="10.80.1.22:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
#[csu.redis]
#host="192.168.25.55"
#port=26379
#db=1
#password="csst__2025"
#qname="csst_data_list"
#[naoc.redis]
#host="10.80.1.22"
#port=26379
#db=0
#password="csst__2025"
#qname="csst_data_list"
# ZJLAB-ALIYUN
#[zjlab.dfs]
#CSST_DFS_GATEWAY="10.80.1.22:28000"
#CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
#
#[zjlab.redis]
#host="172.24.232.12"
#port=26379
#db=0
#password="csst__2025"
#qname="csst_data_list"
@@ -8,31 +8,31 @@ from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs
 def run_l1_pipeline(
     # data parameters
-    dataset:str,
-    instrument:Optional[str]=None,
-    obs_type:Optional[str]=None,
-    obs_group:Optional[str]=None,
-    obs_id:Optional[str]=None,
-    detector:Optional[str]=None,
-    prc_status:Optional[int]=None,
-    qc_status:Optional[int]=None,
+    dataset: str,
+    instrument: Optional[str] = None,
+    obs_type: Optional[str] = None,
+    obs_group: Optional[str] = None,
+    obs_id: Optional[str] = None,
+    detector: Optional[str] = None,
+    prc_status: Optional[int] = None,
+    qc_status: Optional[int] = None,
     # task parameters
-    batch_id:Optional[str]="test-batch",
-    priority:Optional[str]="1",
+    batch_id: Optional[str] = "test-batch",
+    priority: Optional[str] = "1",
     # DAG parameters
-    pmapname:Optional[str]="",
-    ref_cat:Optional[str]="trilegal_093",
+    pmapname: Optional[str] = "",
+    ref_cat: Optional[str] = "trilegal_093",
     # submit
-    verbose:Optional[bool]=True,
-    submit:Optional[bool]=False,
-    final_prc_status:Optional[int]=-2,
-    force:Optional[bool]=False,
-    top_n:Optional[int]=-1,
+    verbose: Optional[bool] = True,
+    submit: Optional[bool] = False,
+    final_prc_status: Optional[int] = -2,
+    force: Optional[bool] = False,
+    top_n: Optional[int] = -1,
     # select DAGs
-    dags:Optional[list[str]]=None,
-    dag_group: Optional[str]="csst-l1-pipeline",
+    dags: Optional[list[str]] = None,
+    dag_group: Optional[str] = "csst-l1-pipeline",
 ):
     """Run a DAG.
 
     Parameters
@@ -90,7 +90,7 @@ def run_l1_pipeline(
     print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
     # generate DAG group run
-    dag_group_run = BaseDAG.gen_dag_group_run(
+    dag_group_run = BaseDAG.generate_dag_group_run(
         dag_group=dag_group,
         batch_id=batch_id,
         priority=priority,
@@ -118,7 +118,9 @@ def run_l1_pipeline(
         "csst-hstdm-l1",
     }
     assert dags is None or set(dags).issubset(DEFAULT_DAGS), f"Selected DAGs: {dags}"
-    SELECTED_DAGS = DEFAULT_DAGS.intersection(dags) if dags is not None else DEFAULT_DAGS
+    SELECTED_DAGS = (
+        DEFAULT_DAGS.intersection(dags) if dags is not None else DEFAULT_DAGS
+    )
     print("Selected DAGs: ", SELECTED_DAGS)
     for dag in SELECTED_DAGS:
@@ -130,9 +132,16 @@ def run_l1_pipeline(
             ref_cat=ref_cat,
             force_success=force,
         )
-        this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
-        this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if this_task["dag_run"] is not None]
+        this_dag_run_list = [
+            this_task["dag_run"]
+            for this_task in this_task_list
+            if this_task["dag_run"] is not None
+        ]
+        this_data_id_list = [
+            this_task["relevant_data_id_list"]
+            for this_task in this_task_list
+            if this_task["dag_run"] is not None
+        ]
         dag_run_list.extend(this_dag_run_list)
         data_id_list.extend(this_data_id_list)
@@ -140,7 +149,9 @@ def run_l1_pipeline(
         this_n_dag_run_success = len(this_dag_run_list)
         n_dag_run_all += this_n_dag_run_all
         n_dag_run_success += this_n_dag_run_success
-        print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")
+        print(
+            f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs"
+        )
     # print dag_group_run and dag_run_list
     if verbose:
@@ -58,9 +58,7 @@ parser.add_argument("--detector", type=str, help="Detector name", default=None)
 parser.add_argument(
     "--prc-status", type=int, help="Initial processing status", default=None
 )
-parser.add_argument(
-    "--qc-status", type=int, help="Initial QC status", default=None
-)
+parser.add_argument("--qc-status", type=int, help="Initial QC status", default=None)
 # task parameters
 parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
 parser.add_argument("--priority", type=str, help="Task priority", default="1")
@@ -77,13 +75,13 @@ parser.add_argument(
 )
 # additional options
 parser.add_argument("--force", action="store_true", help="Force success", default=False)
-parser.add_argument("--verbose", action="store_true", help="Force success", default=False)
-# submit top N
-parser.add_argument(
-    "--top-n", type=int, help="Submit top N tasks", default=-1
-)
+parser.add_argument(
+    "--verbose", action="store_true", help="Force success", default=False
+)
+# submit top N
+parser.add_argument("--top-n", type=int, help="Submit top N tasks", default=-1)
 # select DAGs
-parser.add_argument('--dags', nargs='+', type=str, help="DAGs to select", default=None)
+parser.add_argument("--dags", nargs="+", type=str, help="DAGs to select", default=None)
 args = parser.parse_args()
@@ -121,7 +119,7 @@ plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
 print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
 # generate DAG group run
-dag_group_run = BaseDAG.gen_dag_group_run(
+dag_group_run = BaseDAG.generate_dag_group_run(
     dag_group="csst-l1-pipeline",
     batch_id=args.batch_id,
     priority=args.priority,
@@ -148,8 +146,12 @@ DEFAULT_DAGS = {
     "csst-cpic-l1",
     "csst-hstdm-l1",
 }
-assert args.dags is None or set(args.dags).issubset(DEFAULT_DAGS), f"Selected DAGs: {args.dags}"
-SELECTED_DAGS = DEFAULT_DAGS.intersection(args.dags) if args.dags is not None else DEFAULT_DAGS
+assert args.dags is None or set(args.dags).issubset(
+    DEFAULT_DAGS
+), f"Selected DAGs: {args.dags}"
+SELECTED_DAGS = (
+    DEFAULT_DAGS.intersection(args.dags) if args.dags is not None else DEFAULT_DAGS
+)
 print("Selected DAGs: ", SELECTED_DAGS)
 for dag in SELECTED_DAGS:
@@ -161,9 +163,16 @@ for dag in SELECTED_DAGS:
         ref_cat=args.ref_cat,
         force_success=args.force,
     )
-    this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
-    this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if this_task["dag_run"] is not None]
+    this_dag_run_list = [
+        this_task["dag_run"]
+        for this_task in this_task_list
+        if this_task["dag_run"] is not None
+    ]
+    this_data_id_list = [
+        this_task["relevant_data_id_list"]
+        for this_task in this_task_list
+        if this_task["dag_run"] is not None
+    ]
     dag_run_list.extend(this_dag_run_list)
    data_id_list.extend(this_data_id_list)
from csst_dag import DFS  # importing csst_dag also selects a DFS location and sets its env vars
from csst_dfs_client import plan
import sys
import os

plan_path = sys.argv[1]
assert os.path.exists(plan_path), f"Plan file {plan_path} does not exist"
res = plan.write_file(local_file=plan_path)
print(res)
MSC_MBI_CHIPID = [
"06",
"07",
"08",
"09",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"22",
"23",
"24",
"25",
]
MSC_SLS_CHIPID = [
"01",
"02",
"03",
"04",
"05",
"10",
"21",
"26",
"27",
"28",
"29",
"30",
]
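For reference, the two ID lists split the 30 MSC chips between the multi-band imaging (MBI) and slitless spectroscopy (SLS) detectors. A quick sanity check, with the lists duplicated here so the snippet is self-contained:

```python
# duplicated from the module above so this check runs standalone
MSC_MBI_CHIPID = [
    "06", "07", "08", "09", "11", "12", "13", "14", "15",
    "16", "17", "18", "19", "20", "22", "23", "24", "25",
]
MSC_SLS_CHIPID = [
    "01", "02", "03", "04", "05", "10", "21", "26", "27", "28", "29", "30",
]

all_chips = {f"{i:02d}" for i in range(1, 31)}
# the two sets are disjoint and together cover chips "01".."30"
assert set(MSC_MBI_CHIPID) & set(MSC_SLS_CHIPID) == set()
assert set(MSC_MBI_CHIPID) | set(MSC_SLS_CHIPID) == all_chips
```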
from ._base_dag import BaseDAG, Level1DAG, Level2DAG
from ._dispatcher import Dispatcher
from .._csst import csst
from ._dag_utils import generate_permutations
CSST_DAGS = {
    # MSC
    "csst-msc-l1-qc0": Level1DAG(
        dag="csst-msc-l1-qc0",
        pattern=generate_permutations(
            instrument=["MSC"],
@@ -46,7 +13,7 @@ CSST_DAGS = {
        ),
        dispatcher=Dispatcher.dispatch_file,
    ),
    "csst-msc-l1-mbi": Level1DAG(
        dag="csst-msc-l1-mbi",
        pattern=generate_permutations(
            instrument=["MSC"],
@@ -55,7 +22,7 @@ CSST_DAGS = {
        ),
        dispatcher=Dispatcher.dispatch_file,
    ),
    "csst-msc-l1-ast": Level1DAG(
        dag="csst-msc-l1-ast",
        pattern=generate_permutations(
            instrument=["MSC"],
@@ -64,7 +31,7 @@ CSST_DAGS = {
        ),
        dispatcher=Dispatcher.dispatch_file,
    ),
    "csst-msc-l1-sls": Level1DAG(
        dag="csst-msc-l1-sls",
        pattern=generate_permutations(
            instrument=["MSC"],
@@ -73,7 +40,7 @@ CSST_DAGS = {
        ),
        dispatcher=Dispatcher.dispatch_file,
    ),
    "csst-msc-l1-ooc": Level1DAG(
        dag="csst-msc-l1-ooc",
        pattern=generate_permutations(
            instrument=["MSC"],
@@ -83,7 +50,7 @@ CSST_DAGS = {
        dispatcher=Dispatcher.dispatch_obsgroup_detector,
    ),
    # MCI
    "csst-mci-l1": Level1DAG(
        dag="csst-mci-l1",
        pattern=generate_permutations(
            instrument=["MCI"],
@@ -92,7 +59,7 @@ CSST_DAGS = {
        ),
        dispatcher=Dispatcher.dispatch_file,
    ),
    "csst-mci-l1-qc0": Level1DAG(
        dag="csst-mci-l1",
        pattern=generate_permutations(
            instrument=["MCI"],
@@ -102,7 +69,7 @@ CSST_DAGS = {
        dispatcher=Dispatcher.dispatch_file,
    ),
    # IFS
    "csst-ifs-l1": Level1DAG(
        dag="csst-ifs-l1",
        pattern=generate_permutations(
            instrument=["IFS"],
@@ -112,7 +79,7 @@ CSST_DAGS = {
        dispatcher=Dispatcher.dispatch_obsid,
    ),
    # CPIC
    "csst-cpic-l1": Level1DAG(
        dag="csst-cpic-l1",
        pattern=generate_permutations(
            instrument=["CPIC"],
@@ -121,7 +88,7 @@ CSST_DAGS = {
        ),
        dispatcher=Dispatcher.dispatch_file,
    ),
    "csst-cpic-l1-qc0": Level1DAG(
        dag="csst-cpic-l1-qc0",
        pattern=generate_permutations(
            instrument=["CPIC"],
@@ -131,7 +98,7 @@ CSST_DAGS = {
        dispatcher=Dispatcher.dispatch_file,
    ),
    # HSTDM
    "csst-hstdm-l1": Level1DAG(
        dag="csst-hstdm-l1",
        pattern=generate_permutations(
            instrument=["HSTDM"],
...
import json
import os
from typing import Callable, Optional

import yaml
from astropy import table, time

from ._dag_utils import (
    force_string,
    override_common_keys,
    generate_sha1_from_time,
)
from ..dfs import DFS
from ._dispatcher import Dispatcher

DAG_CONFIG_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
@@ -18,6 +22,100 @@ DAG_CONFIG_DIR = os.path.join(
class BaseDAG:
    """Base class for all Directed Acyclic Graph (DAG) implementations.
This class provides core functionality for DAG configuration, message generation,
and task scheduling.
"""
@staticmethod
def generate_sha1():
"""Generate a unique SHA1 hash based on current timestamp.
Returns
-------
str
SHA1 hash string
"""
return generate_sha1_from_time(verbose=False)
@staticmethod
def generate_dag_group_run(
dag_group: str = "default-dag-group",
batch_id: str = "default-batch",
priority: int | str = 1,
):
"""Generate a DAG group run configuration.
Parameters
----------
dag_group : str, optional
    Group identifier (default: "default-dag-group")
batch_id : str, optional
    Batch identifier (default: "default-batch")
priority : int | str, optional
    Execution priority (default: 1)

Returns
-------
dict
    Dictionary containing:
    - dag_group: Original group name
    - dag_group_run: Generated SHA1 identifier
    - batch_id: Batch identifier
    - priority: Execution priority
    - created_time: Creation time (ISO format)
"""
return dict(
dag_group=dag_group,
dag_group_run=BaseDAG.generate_sha1(),
batch_id=batch_id,
priority=priority,
created_time=time.Time.now().isot,
)
@staticmethod
def force_string(d: dict):
return force_string(d)
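A minimal standalone sketch of the `dag_group_run` structure these helpers produce, using stdlib stand-ins (`hashlib`, `datetime`) in place of the package's `generate_sha1_from_time` and astropy time:

```python
import hashlib
import random
import string
from datetime import datetime, timezone

def sha1_from_time() -> str:
    # timestamp plus a random salt, hashed to a 40-char hex digest
    salt = "".join(random.choices(string.ascii_letters + string.digits, k=40))
    payload = f"{datetime.now(timezone.utc).isoformat()}_{salt}"
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()

def gen_dag_group_run(dag_group="default-dag-group", batch_id="default-batch", priority=1):
    # same five keys as BaseDAG.generate_dag_group_run
    return dict(
        dag_group=dag_group,
        dag_group_run=sha1_from_time(),
        batch_id=batch_id,
        priority=priority,
        created_time=datetime.now(timezone.utc).isoformat(),
    )

run = gen_dag_group_run(batch_id="inttest", priority=5)
assert len(run["dag_group_run"]) == 40  # SHA-1 hex digest length
```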
class Level2DAG(BaseDAG):
"""Level 2 DAG base class.
Base class for all Level 2 Directed Acyclic Graph (DAG) implementations.
This class provides core functionality for DAG configuration, message generation,
and task scheduling.
"""
def __init__(self):
pass
def schedule(self, plan_basis: table.Table, data_basis: table.Table):
"""Schedule the DAG for execution.
Parameters
----------
plan_basis : table.Table
Plan basis table
data_basis : table.Table
Data basis table
"""
pass
def generate_dag_run(self):
"""Generate a DAG run configuration.
Returns
-------
dict
Dictionary containing DAG run configuration
"""
pass
class Level1DAG(BaseDAG):
"""Level 1 DAG base class.
Base class for all Level 1 Directed Acyclic Graph (DAG) implementations.
    This class provides core functionality for DAG configuration, message generation,
    and execution management within the CSST data processing system.
@@ -29,8 +127,6 @@ class BaseDAG:
        Configuration loaded from YAML file
    dag_run_template : dict
        Message template structure loaded from JSON file

    Raises
    ------
@@ -58,7 +154,7 @@ class BaseDAG:
        # Load yaml and json config
        yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.yml")
        json_path = os.path.join(DAG_CONFIG_DIR, "default-dag-run.json")  # unified

        with open(yml_path, "r") as f:
            self.dag_cfg = yaml.safe_load(f)
@@ -68,10 +164,86 @@ class BaseDAG:
        with open(json_path, "r") as f:
            self.dag_run_template = json.load(f)

    def run(
        self,
# DAG group parameters
dag_group: str = "default-dag-group",
batch_id: str = "default-batch",
priority: int | str = 1,
# plan filter
dataset: str | None = None,
instrument: str | None = None,
obs_type: str | None = None,
obs_group: str | None = None,
obs_id: str | None = None,
proposal_id: str | None = None,
# data filter
detector: str | None = None,
filter: str | None = None,
prc_status: str | None = None,
qc_status: str | None = None,
# prc parameters
pmapname: str = "",
ref_cat: str = "",
extra_kwargs: Optional[dict] = None,
# additional parameters
force_success: bool = False,
return_details: bool = False,
return_data_list: bool = False,
# no custom_id
):
if self.dispatcher is Dispatcher.dispatch_obsgroup:
assert (
obs_group is not None
), "obs_group is required for obsgroup dispatcher"
assert obs_id is None, "obs_id is not allowed for obsgroup dispatcher"
assert detector is None, "detector is not allowed for obsgroup dispatcher"
assert filter is None, "filter is not allowed for obsgroup dispatcher"
if extra_kwargs is None:
extra_kwargs = {}
dag_group_run = self.generate_dag_group_run(
dag_group=dag_group,
batch_id=batch_id,
priority=priority,
)
plan_basis = DFS.dfs1_find_plan_basis(
dataset=dataset,
instrument=instrument,
obs_type=obs_type,
obs_group=obs_group,
obs_id=obs_id,
proposal_id=proposal_id,
)
data_basis = DFS.dfs1_find_level0_basis(
dataset=dataset,
instrument=instrument,
obs_type=obs_type,
obs_group=obs_group,
obs_id=obs_id,
detector=detector,
filter=filter,
prc_status=prc_status,
qc_status=qc_status,
)
plan_basis, data_basis = self.filter_basis(plan_basis, data_basis)
dag_run_list = self.schedule(
dag_group_run=dag_group_run,
data_basis=data_basis,
plan_basis=plan_basis,
force_success=force_success,
return_data_list=return_data_list,
# directly passed to dag_run's
pmapname=pmapname,
ref_cat=ref_cat,
extra_kwargs=extra_kwargs,
)
if return_details:
return dag_group_run, dag_run_list
else:
return dag_group_run, [_["dag_run"] for _ in dag_run_list]
    def filter_basis(self, plan_basis, data_basis):
        # filter data basis via pattern
        filtered_data_basis = table.join(
            self.pattern,
            data_basis,
@@ -83,23 +255,59 @@ class BaseDAG:
        if len(filtered_data_basis) == 0:
            return plan_basis[:0], filtered_data_basis
        u_data_basis = table.unique(filtered_data_basis["dataset", "obs_id"])
        # filter plan basis via data basis
        filtered_plan_basis = table.join(
            u_data_basis,
            plan_basis,
            keys=["dataset", "obs_id"],
            join_type="inner",
        )
        # sort via obs_id
        filtered_plan_basis.sort(keys=["dataset", "obs_id"])
        return filtered_plan_basis, filtered_data_basis
    def schedule(
        self,
        dag_group_run: dict,  # dag_group, dag_group_run
        data_basis: table.Table,
        plan_basis: table.Table,
        force_success: bool = False,
        return_data_list: bool = False,
        **kwargs,
    ) -> list[dict]:
"""Schedule tasks for DAG execution.
This method filters plan and data basis, dispatches tasks, and generates
DAG run messages for successful tasks.
Parameters
----------
dag_group_run : dict
DAG group run configuration containing:
- dag_group: Group identifier
- dag_group_run: SHA1 identifier for this run
- batch_id: Batch identifier
- priority: Execution priority
data_basis : table.Table
Table of data records to process
plan_basis : table.Table
Table of plan records to execute
force_success : bool, optional
If True, generate DAG run messages for all tasks, even if they failed
(default: False)
return_data_list : bool, optional
If True, fill the "data_list" field of each generated dag_run with the relevant data record ids
(default: False)
**kwargs
Additional keyword arguments passed to `dag_run`
Returns
-------
list[dict]
    Task dictionaries; each successful task has its "dag_run" message attached,
    and "dag_run" is None for failed or skipped tasks.
"""
        # filter plan and data basis
        filtered_plan_basis, filtered_data_basis = self.filter_basis(
            plan_basis, data_basis
@@ -109,67 +317,21 @@ class BaseDAG:
        for this_task in task_list:
            # only convert success tasks
            if force_success or this_task["success"]:
                dag_run = self.generate_dag_run(
                    **dag_group_run,
                    **this_task["task"],
                    **kwargs,
                )
                this_task["dag_run"] = dag_run
                if return_data_list:
                    this_task["dag_run"]["data_list"] = [
                        list(this_task["relevant_data"]["_id"])
                    ]
            else:
                this_task["dag_run"] = None
        return task_list
    def generate_dag_run(self, **kwargs) -> dict:
        """Generate a complete DAG run message.
"""Generate a complete DAG run message. """Generate a complete DAG run message.
Parameters Parameters
...@@ -194,33 +356,13 @@ class BaseDAG: ...@@ -194,33 +356,13 @@ class BaseDAG:
# set hash # set hash
dag_run = override_common_keys( dag_run = override_common_keys(
dag_run, dag_run,
{"dag": self.dag, "dag_run": self.generate_sha1()}, {
"dag": self.dag,
"dag_run": self.generate_sha1(),
},
) )
# It seems that the dag_run_template is already stringified,
# so we don't need to force_string here.
# force values to be string # force values to be string
dag_run = self.force_string(dag_run) dag_run = self.force_string(dag_run)
return dag_run return dag_run
@staticmethod
def push_dag_group_run(
dag_group_run: dict,
dag_run_list: list[dict],
):
"""Submit a DAG group run to the DFS system.
Parameters
----------
dag_group_run : dict
Group run configuration
dag_run_list : list[dict]
List of individual DAG run messages
Returns
-------
Any
Result from dfs.dag.new_dag_group_run()
"""
return dfs.dag.new_dag_group_run(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
import glob
import hashlib
import itertools
import json
import os
import random
import string
import uuid
from collections.abc import Iterable
import numpy as np
from astropy import table
from astropy import time
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_config")
DAG_YAML_LIST = glob.glob(DAG_MESSAGE_TEMPLATE_DIRECTORY + "/*.yml")
# DAG list
DAG_LIST = [os.path.splitext(os.path.basename(_))[0] for _ in DAG_YAML_LIST]
def generate_dag_run_id(digits=6):
"""
Generate a unique run_id for a DAG run.
"""
now = time.Time.now()
dag_run_id = now.strftime("%Y%m%d-%H%M%S-")
n = len(string.ascii_lowercase)
for i in range(digits):
dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
return dag_run_id
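The run-id format is a timestamp prefix plus random lowercase letters; a standalone sketch using the stdlib `random` module instead of numpy (`generate_run_id` is a hypothetical stand-in name):

```python
import random
import string
from datetime import datetime

def generate_run_id(digits: int = 6) -> str:
    # timestamp prefix "YYYYmmdd-HHMMSS-" plus a random lowercase suffix
    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-")
    suffix = "".join(random.choices(string.ascii_lowercase, k=digits))
    return prefix + suffix

rid = generate_run_id()
```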
def load_default_dag_run_message():
"""Load the default dag_cfg run message template."""
with open(
os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"default-dag-run.json"), "r"
) as f:
template = json.load(f)
return template
def get_dag_message_template(dag_id):
"""Get the dag_cfg message template for a given dag_cfg."""
if dag_id not in DAG_LIST:
raise ValueError(f"Unknown dag_cfg: {dag_id}")
with open(os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json"), "r") as f:
template = json.load(f)
return template
def generate_dag_run_message(dag_id, **kwargs):
    """Generate a DAG run message."""
    DAG_RUN_ID_DIGITS = 6
    msg = get_dag_message_template(dag_id)
    # attach a freshly generated run id to the message
    msg["dag_run"] = generate_dag_run_id(DAG_RUN_ID_DIGITS)
    for k, v in kwargs.items():
        assert msg.get(k, None) is not None, f"Unknown key: {k}"
        msg[k] = v
    return msg
def generate_uuid():
"""Generate a unique id."""
return str(uuid.uuid4())
def force_string(d):
    """Recursively convert values to strings (generic version)."""
    if isinstance(d, dict):
        return {k: force_string(v) for k, v in d.items()}
    elif isinstance(d, Iterable) and not isinstance(d, (str, bytes)):
        return [force_string(item) for item in d]
    else:
        return str(d)
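For illustration, a self-contained sketch of the intended recursive conversion, assuming scalar leaves (including numbers) are stringified, matching the "force values to be string" usage in `_base_dag` (`force_string_sketch` is a hypothetical stand-in name):

```python
from collections.abc import Iterable

def force_string_sketch(d):
    # dicts: convert values; non-string iterables: convert items into a list;
    # everything else: plain str()
    if isinstance(d, dict):
        return {k: force_string_sketch(v) for k, v in d.items()}
    if isinstance(d, Iterable) and not isinstance(d, (str, bytes)):
        return [force_string_sketch(item) for item in d]
    return str(d)

msg = force_string_sketch({"priority": 1, "data_list": [101, 102], "dag": "csst-ifs-l1"})
assert msg == {"priority": "1", "data_list": ["101", "102"], "dag": "csst-ifs-l1"}
```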
def generate_permutations(**kwargs) -> table.Table:
    """
    Generate all combinations of the keyword-argument values.

    Parameters
    ----------
    **kwargs
        Keyword arguments whose values should be iterables (e.g. lists);
        scalar values are wrapped into single-element lists.

    Returns
    -------
    table.Table
        One row per combination.
    """
    # wrap scalar values into lists
    for key, values in kwargs.items():
        if not isinstance(values, (list, tuple, set)):
            kwargs[key] = [values]
    # extract keys and the corresponding value lists
    keys = list(kwargs.keys())
    value_lists = [kwargs[key] for key in keys]
    # Cartesian product over all value lists
    permutations = []
    for combination in itertools.product(*value_lists):
        # map each combination back to {key: value}
        perm_dict = dict(zip(keys, combination))
        permutations.append(perm_dict)
    return table.Table(permutations)
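The same Cartesian-product expansion can be exercised without astropy; each dict below corresponds to one row of the returned table (`permute` is a hypothetical stand-in name):

```python
import itertools

def permute(**kwargs) -> list[dict]:
    # wrap scalars so every value is iterable, then take the Cartesian product
    norm = {k: v if isinstance(v, (list, tuple, set)) else [v] for k, v in kwargs.items()}
    keys = list(norm)
    return [dict(zip(keys, combo)) for combo in itertools.product(*(norm[k] for k in keys))]

rows = permute(instrument="MSC", obs_type=["WIDE", "DEEP"], detector=["06", "07"])
assert len(rows) == 4  # 1 x 2 x 2 combinations
```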
def override_common_keys(d1: dict, d2: dict) -> dict:
"""
    Construct a new dictionary by updating the values of keys that exist in the first dictionary
    with the values from the second dictionary.
Parameters
----------
d1 : dict
The first dictionary.
d2 : dict
The second dictionary.
Returns
-------
dict:
The updated dictionary.
"""
return {k: d2[k] if k in d2.keys() else d1[k] for k in d1.keys()}
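Duplicated here as a standalone snippet: the result keeps exactly the key set of the first dict, taking values from the second where present.

```python
def override_common_keys(d1: dict, d2: dict) -> dict:
    # keys come from d1; values come from d2 where available
    return {k: d2[k] if k in d2 else d1[k] for k in d1}

template = {"dag": "", "dag_run": "", "priority": 1}
merged = override_common_keys(template, {"dag": "csst-msc-l1-mbi", "extra": "ignored"})
assert merged == {"dag": "csst-msc-l1-mbi", "dag_run": "", "priority": 1}
```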
def generate_sha1_from_time(verbose=False):
    """
    Generate a SHA-1 hash from the current time, salted with a random string
    to ensure uniqueness.

    Returns
    -------
    str or tuple
        The SHA-1 hash, or the tuple (timestamp, random string, SHA-1 hash)
        if ``verbose`` is True.
    """
    # current timestamp in ISO format (millisecond precision)
    timestamp = time.Time.now().isot
    # 40 random letters and digits
    random_str = "".join(random.choices(string.ascii_letters + string.digits, k=40))
    # combine timestamp and random string
    combined_str = f"{timestamp}_{random_str}"
    # compute the SHA-1 hash
    sha1 = hashlib.sha1()
    sha1.update(combined_str.encode("utf-8"))
    sha_value = sha1.hexdigest()
    if verbose:
        return timestamp, random_str, sha_value
    else:
        return sha_value
import numpy as np
import joblib
from astropy import table
from csst_dfs_client import plan, level0, level1
from tqdm import trange

from .._csst import csst

TQDM_KWARGS = dict(unit="task", dynamic_ncols=False)
# THESE ARE GENERAL PARAMETERS!
PLAN_PARAMS = {
"dataset": None,
"instrument": None,
"obs_type": None,
"obs_group": None,
"obs_id": None,
"proposal_id": None,
}
LEVEL0_PARAMS = {
"dataset": None,
"instrument": None,
"obs_type": None,
"obs_group": None,
"obs_id": None,
"detector": None,
"prc_status": None,
"qc_status": None,
}
LEVEL1_PARAMS = {
"dataset": None,
"instrument": None,
"obs_type": None,
"obs_group": None,
"obs_id": None,
"detector": None,
"prc_status": None,
"qc_status": None,
# special keys for data products
"data_model": None,
"batch_id": "default_batch",
# "build": None,
# "pmapname": None,
}
# PROC_PARAMS = {
# "priority": 1,
# "batch_id": "default_batch",
# "pmapname": "pmapname",
# "final_prc_status": -2,
# "demo": False,
# # should be capable to extend
# }
# plan basis keys
PLAN_BASIS_KEYS = (
"dataset",
"instrument",
"obs_type",
"obs_group",
"obs_id",
"n_file",
"_id",
)
# data basis keys
LEVEL0_DATA_BASIS_KEYS = (
"instrument",
"dataset",
"obs_type",
"obs_group",
"obs_id",
"detector",
"file_name",
"_id",
"prc_status",
"qc_status",
)
LEVEL1_DATA_BASIS_KEYS = (
"instrument",
"dataset",
"obs_type",
"obs_group",
"obs_id",
"detector",
"file_name",
"_id",
"prc_status",
"qc_status",
"data_model",
"batch_id",
"build",
"pmapname",
)
# join_type for data x plan
PLAN_JOIN_TYPE = "inner"

"""
@@ -114,31 +24,6 @@ Typical types:
"""
def override_common_keys(d1: dict, d2: dict) -> dict:
"""
    Construct a new dictionary by updating the values of keys that exist in the first dictionary
    with the values from the second dictionary.
Parameters
----------
d1 : dict
The first dictionary.
d2 : dict
The second dictionary.
Returns
-------
dict:
The updated dictionary.
"""
return {k: d2[k] if k in d2.keys() else d1[k] for k in d1.keys()}
def extract_basis_table(dlist: list[dict], basis_keys: tuple) -> table.Table:
"""Extract basis key-value pairs from a list of dictionaries."""
return table.Table([{k: d.get(k, "") for k in basis_keys} for d in dlist])
def split_data_basis(data_basis: table.Table, n_split: int = 1) -> list[table.Table]:
    """Split data basis into n_split parts via obs_id"""
    assert (
@@ -173,110 +58,6 @@ class Dispatcher:
    A class to dispatch tasks based on the observation type.
    """
@staticmethod
def find_plan_basis(**kwargs) -> table.Table:
"""
Find plan records.
"""
# query
prompt = "plan"
qr_kwargs = override_common_keys(PLAN_PARAMS, kwargs)
qr = plan.find(**qr_kwargs)
assert qr.success, qr
print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
print(f">>> [{prompt}] {len(qr.data)} records found.")
# plan basis / obsid basis
try:
for _ in qr.data:
this_instrument = _["instrument"]
if this_instrument == "HSTDM":
if _["params"]["detector"] == "SIS12":
this_n_file = len(_["params"]["exposure_start"]) * 2
else:
this_n_file = len(_["params"]["exposure_start"])
else:
# count effective detectors of this instrument
this_n_file = len(csst[this_instrument].effective_detector_names)
_["n_file"] = this_n_file
except KeyError:
print(f"`n_epec_frame` is not found in {_}")
raise KeyError(f"`n_epec_frame` is not found in {_}")
plan_basis = extract_basis_table(
qr.data,
PLAN_BASIS_KEYS,
)
return plan_basis
@staticmethod
def find_level0_basis(**kwargs) -> table.Table:
"""
Find level0 records.
"""
# query
prompt = "level0"
qr_kwargs = override_common_keys(LEVEL0_PARAMS, kwargs)
qr = level0.find(**qr_kwargs)
assert qr.success, qr
print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
print(f">>> [{prompt}] {len(qr.data)} records found.")
# data basis
data_basis = extract_basis_table(
qr.data,
LEVEL0_DATA_BASIS_KEYS,
)
return data_basis
@staticmethod
def find_level1_basis(**kwargs) -> table.Table:
"""
Find level1 records.
"""
# query
prompt = "level1"
qr_kwargs = override_common_keys(LEVEL1_PARAMS, kwargs)
qr = level1.find(**qr_kwargs)
assert qr.success, qr
print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
print(f">>> [{prompt}] {len(qr.data)} records found.")
# data basis
data_basis = extract_basis_table(
qr.data,
LEVEL1_DATA_BASIS_KEYS,
)
return data_basis
@staticmethod
def find_plan_level0_basis(**kwargs) -> tuple[table.Table, table.Table]:
data_basis = Dispatcher.find_level0_basis(**kwargs)
plan_basis = Dispatcher.find_plan_basis(**kwargs)
assert len(data_basis) > 0, data_basis
assert len(plan_basis) > 0, plan_basis
u_data_basis = table.unique(data_basis["dataset", "obs_id"])
relevant_plan = table.join(
u_data_basis,
plan_basis,
keys=["dataset", "obs_id"],
join_type=PLAN_JOIN_TYPE,
)
assert len(relevant_plan) > 0, relevant_plan
return relevant_plan, data_basis
@staticmethod
def find_plan_level1_basis(**kwargs) -> tuple[table.Table, table.Table]:
data_basis = Dispatcher.find_level1_basis(**kwargs)
plan_basis = Dispatcher.find_plan_basis(**kwargs)
assert len(data_basis) > 0, data_basis
assert len(plan_basis) > 0, plan_basis
u_data_basis = table.unique(data_basis["dataset", "obs_id"])
relevant_plan = table.join(
u_data_basis,
plan_basis,
keys=["dataset", "obs_id"],
join_type=PLAN_JOIN_TYPE,
)
assert len(relevant_plan) > 0, relevant_plan
return relevant_plan, data_basis
    @staticmethod
    def dispatch_file(
        plan_basis: table.Table,
@@ -458,7 +239,7 @@ class Dispatcher:
                relevant_data_id_list=(
                    []
                    if len(this_data_detector_files) == 0
                    else list(this_data_detector_files["_id"])
                ),
                n_file_expected=this_data_detector_plan["n_file"].sum(),
                n_file_found=len(this_data_detector_files),
@@ -733,20 +514,6 @@ class Dispatcher:
def load_test_data() -> tuple:
    import joblib

    plan_basis = joblib.load("dagtest/csst-msc-c9-25sqdeg-v3.plan_basis.dump")
    data_basis = joblib.load("dagtest/csst-msc-c9-25sqdeg-v3.level0_basis.dump")
    return plan_basis, data_basis
{
    "dataset": "csst-cpic-c11-hip71681-v1",
    "instrument": "CPIC",
    "obs_type": "SCI",
    "obs_group": "default",
    "obs_id": "40100000001",
    "detector": "VIS",
    "filter": "",
    "custom_id": "",
    "batch_id": "default-batch",
    "pmapname": "",
    "ref_cat": "",
    "dag_group": "csst-cpic-l1",
    "dag": "csst-cpic-l1-qc0",
    "dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
    "dag_run": "c89d7e7a022e6f0cdf1daff921c29dbce0ac7c01",
    "priority": 1,
    "data_list": [],
    "extra_kwargs": {},
    "created_time": "1970-01-01T00:00:00",
    "rerun": 0,
    "status_code": -1024,
    "n_file_expected": -1,
    "n_file_found": -1
}
{
    "dataset": "csst-cpic-c11-hip71681-v1",
    "instrument": "CPIC",
    "obs_type": "SCI",
    "obs_group": "default",
    "obs_id": "40100000001",
    "detector": "VIS",
    "filter": "",
    "custom_id": "",
    "batch_id": "default-batch",
    "pmapname": "",
    "ref_cat": "",
    "dag_group": "csst-cpic-l1",
    "dag": "csst-cpic-l1",
    "dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
    "dag_run": "c89d7e7a022e6f0cdf1daff921c29dbce0ac7c01",
    "priority": 1,
    "data_list": [],
    "extra_kwargs": {},
    "created_time": "1970-01-01T00:00:00",
    "rerun": 0,
    "status_code": -1024,
    "n_file_expected": -1,
    "n_file_found": -1
}