csst_mci_l1.py 4.15 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"""
Aim
---
Process an MCI dataset.

Example
-------
python -m csst_dag.cli.csst_mci_l1 -h

python -m csst_dag.cli.csst_mci_l1 \
    --dataset=csst-cpic-c11-hip71681-v1 \
    --instrument=CPIC \
    --obs-type=SCI \
    --obs-group=hip71681 \
    --obs-id=40100000001 \
    --detector=VIS \
    --prc-status=-1024 \
    --batch-id=test-b1 \
    --priority=1
"""

import argparse
import os

import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs

# Command-line interface of the scheduler script.
# NOTE(review): the module docstring invokes this script as `csst_mci_l1`,
# yet the description below and the DAG names scheduled later in the file
# refer to CPIC -- confirm which instrument this script is meant for.
parser = argparse.ArgumentParser(
    description="Scheduler for CSST CPIC L1 pipeline.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

# data parameters: selection filters; every one of them defaults to None
# when it is not given on the command line
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
    "--prc-status", type=int, help="Initial processing status", default=None
)
# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
# The priority is parsed as an int so that a value given on the command line
# has the same type as the integer default (argparse only applies `type` to
# string defaults, so `type=str` used to yield int by default but str when
# the flag was passed).
parser.add_argument("--priority", type=int, help="Task priority", default=1)
# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
parser.add_argument(
    "--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
)
# submit
parser.add_argument("--submit", action="store_true", help="Push results", default=False)
# post-processing parameters
# NOTE(review): this option is parsed but not read anywhere in the visible
# part of the script -- confirm whether it is still needed.
parser.add_argument(
    "--final-prc-status", type=int, help="Final processing status", default=-2
)
# force option
parser.add_argument("--force", action="store_true", help="Force success", default=False)

# Parse the command line once and echo the resulting namespace.
args = parser.parse_args()
print("CLI parameters: ", args)


# Collect the data-selection options and hand them to the dispatcher, which
# returns the plan basis and the data basis.
_basis_filters = {
    "dataset": args.dataset,
    "instrument": args.instrument,
    "obs_type": args.obs_type,
    "obs_group": args.obs_group,
    "obs_id": args.obs_id,
    "detector": args.detector,
    "prc_status": args.prc_status,
}
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(**_basis_filters)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")

# generate DAG group run (one per invocation, tagged with batch and priority)
dag_group_run = BaseDAG.gen_dag_group_run(
    priority=args.priority,
    batch_id=args.batch_id,
    dag_group="csst-cpic-l1",
)

# generate DAG run list
print("\n\n")
print(">>> Matching DAGs ...")
dag_run_list = []  # dag_run of every task that produced one
data_id_list = []  # matching `relevant_data_id_list` entries, same order
n_dag_run_all = 0  # number of tasks returned by all schedule() calls
n_dag_run_success = 0  # number of those tasks whose dag_run is not None
for dag in ("csst-cpic-l1", "csst-cpic-l1-qc0"):
    this_task_list = CSST_DAGS[dag].schedule(
        dag_group_run=dag_group_run,
        plan_basis=plan_basis,
        data_basis=data_basis,
        pmapname=args.pmapname,
        ref_cat=args.ref_cat,
        force_success=args.force,
    )
    # Keep only the tasks for which a dag_run was actually generated.
    scheduled = [task for task in this_task_list if task["dag_run"] is not None]
    dag_run_list += [task["dag_run"] for task in scheduled]
    data_id_list += [task["relevant_data_id_list"] for task in scheduled]

    n_total, n_ok = len(this_task_list), len(scheduled)
    n_dag_run_all += n_total
    n_dag_run_success += n_ok
    print(f"- `{dag}` : [ {n_ok} / {n_total} ] dag_runs")

# print dag_group_run and dag_run_list
# Assemble the report line by line and emit it with a single print call:
# a blank separator, the group run, the success counter, then one line per
# generated dag_run (none when the list is empty).
_summary_lines = [
    "\n",
    ">>> `dag_group_run` :",
    f"\t- {dag_group_run}",
    f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]",
]
_summary_lines.extend(f"\t- {dag_run}" for dag_run in dag_run_list)
print("\n".join(_summary_lines))

# dump dag_group_run
# The home directory is resolved with expanduser() instead of
# os.getenv("HOME"): getenv returns None when HOME is unset, which would make
# os.path.join raise a TypeError.
dump_dir = os.path.join(os.path.expanduser("~"), "csst_dag")
# joblib.dump does not create missing parent directories, so make sure the
# target directory exists before writing.
os.makedirs(dump_dir, exist_ok=True)
joblib.dump(
    dict(
        dag_group_run=dag_group_run,
        dag_run_list=dag_run_list,
    ),
    os.path.join(
        dump_dir,
        f"{dag_group_run['dag_group_run']}.joblib",
    ),
)
# submit DAG group run (only when --submit was given; otherwise this is a
# dry run that just leaves the dump file behind)
if args.submit:
    res = dfs.dag.new_dag_group_run(
        dag_group_run=dag_group_run,
        dag_run_list=dag_run_list,
    )
    print(res)