run.py 6.01 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
Aim
---
Schedule CSST L1 pipeline DAG runs for a dataset, given a set of parameters.

Example
-------
python -m csst_dag.cli.run -h

python -m csst_dag.cli.run \
    --dags csst-msc-l1-mbi \
    --dataset=csst-msc-c9-25sqdeg-v3 \
    --instrument=MSC \
    --obs-type=WIDE \
    --obs-group=W2 \
    --obs-id=10100232366 \
    --detector=09 \
    --batch-id=test-b1 \
    --priority=1 \
    --pmapname=csst_000070.pmap \
    --ref-cat=trilegal_093

# 25 square-degree wide-field survey
python -m csst_dag.cli.run \
    --dags csst-msc-l1-mbi \
    --dataset=csst-msc-c9-25sqdeg-v3 \
    --instrument=MSC \
    --obs-type=WIDE \
    --prc-status=-2 \
    --batch-id=25sqdeg-test-b2 \
    --priority=1 \
    --pmapname=csst_000070.pmap \
    --ref-cat=trilegal_093 \
    --submit

"""

import argparse
import os
import json

import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs

# Command-line interface of the scheduler script.
# ArgumentDefaultsHelpFormatter appends "(default: ...)" to every help string,
# so the defaults below are visible in `-h` output.
parser = argparse.ArgumentParser(
    description="Scheduler for CSST L1 pipeline.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

# data parameters: filters used to select the data to process
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
    "--prc-status", type=int, help="Initial processing status", default=None
)
parser.add_argument("--qc-status", type=int, help="Initial QC status", default=None)

# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
# NOTE: priority is deliberately kept as a string, exactly as before.
parser.add_argument("--priority", type=str, help="Task priority", default="1")

# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
parser.add_argument(
    "--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
)

# submit
parser.add_argument(
    "--submit", action="store_true", help="Push results", default=False
)

# post-processing parameters
parser.add_argument(
    "--final-prc-status", type=int, help="Final processing status", default=-2
)

# additional options
parser.add_argument(
    "--force", action="store_true", help="Force success", default=False
)
# The help text used to be a copy of the one for --force.
parser.add_argument(
    "--verbose",
    action="store_true",
    help="Print the DAG group run and every DAG run",
    default=False,
)

# submit top N
parser.add_argument("--top-n", type=int, help="Submit top N tasks", default=-1)

# select DAGs
parser.add_argument(
    "--dags", nargs="+", type=str, help="DAGs to select", default=None
)


# Parse the command line once; everything below reads its options from `args`.
args = parser.parse_args()


# Debugging aid: replace `args` with a hand-built namespace instead of parsing
# the command line.
#
# from csst_dag import DotDict
#
# args = DotDict(
#     dataset="csst-msc-c9-25sqdeg-v3",
#     instrument="MSC",
#     obs_type="WIDE",
#     obs_group="W2",
#     obs_id="10100232366",
#     detector=None,
#     prc_status=None,
#     batch_id="test-batch",
#     priority=1,
#     pmapname="csst_000070.pmap",
#     ref_cat="trilegal_093",
#     submit=False,
# )

print("CLI parameters: ", args)

# Forward the data-selection options to the dispatcher, which returns the
# plan basis and the data basis used for scheduling below.
basis_query = dict(
    dataset=args.dataset,
    instrument=args.instrument,
    obs_type=args.obs_type,
    obs_group=args.obs_group,
    obs_id=args.obs_id,
    detector=args.detector,
    prc_status=args.prc_status,
    qc_status=args.qc_status,
)
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(**basis_query)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")

# Create the DAG group run shared by every DAG run scheduled below;
# batch ID and priority are passed through exactly as given on the CLI.
dag_group_run = BaseDAG.gen_dag_group_run(
    dag_group="csst-l1-pipeline", batch_id=args.batch_id, priority=args.priority
)

# generate DAG run list
print("\n")
print(">>> Matching DAGs ...")
dag_run_list = []  # dag_runs collected over all selected DAGs
data_id_list = []  # relevant data IDs, parallel to dag_run_list
n_dag_run_all = 0  # number of tasks returned by the schedulers
n_dag_run_success = 0  # number of tasks that produced a dag_run

# Known DAG names. A tuple (not a set) so that iteration follows this
# declaration order on every run instead of depending on string hashing.
DEFAULT_DAGS = (
    "csst-msc-l1-mbi",
    "csst-msc-l1-ast",
    "csst-msc-l1-sls",
    "csst-msc-l1-qc0",
    "csst-msc-l1-ooc",
    "csst-mci-l1-qc0",
    "csst-mci-l1",
    "csst-ifs-l1",
    "csst-cpic-l1-qc0",
    "csst-cpic-l1",
    "csst-hstdm-l1",
)

if args.dags is None:
    # no explicit selection: use every known DAG
    SELECTED_DAGS = list(DEFAULT_DAGS)
else:
    requested_dags = set(args.dags)
    unknown_dags = sorted(requested_dags.difference(DEFAULT_DAGS))
    if unknown_dags:
        # Explicit usage error instead of `assert`, which is removed by
        # `python -O`; parser.error prints the message and exits with status 2.
        parser.error(
            f"unknown DAG(s): {', '.join(unknown_dags)}; "
            f"choose from: {', '.join(DEFAULT_DAGS)}"
        )
    # de-duplicated, in declaration order
    SELECTED_DAGS = [dag for dag in DEFAULT_DAGS if dag in requested_dags]
print("Selected DAGs: ", SELECTED_DAGS)

# Schedule every selected DAG against the same plan/data basis and accumulate
# the results. Each scheduled task is indexed by "dag_run" and
# "relevant_data_id_list"; tasks whose "dag_run" is None are counted in the
# totals but not collected.
for dag in SELECTED_DAGS:
    scheduled_tasks = CSST_DAGS[dag].schedule(
        dag_group_run=dag_group_run,
        plan_basis=plan_basis,
        data_basis=data_basis,
        pmapname=args.pmapname,
        ref_cat=args.ref_cat,
        force_success=args.force,
    )
    matched_tasks = [task for task in scheduled_tasks if task["dag_run"] is not None]
    matched_dag_runs = [task["dag_run"] for task in matched_tasks]
    matched_data_ids = [task["relevant_data_id_list"] for task in matched_tasks]

    dag_run_list.extend(matched_dag_runs)
    data_id_list.extend(matched_data_ids)

    n_scheduled = len(scheduled_tasks)
    n_matched = len(matched_dag_runs)
    n_dag_run_all += n_scheduled
    n_dag_run_success += n_matched
    print(f"- `{dag}` : [ {n_matched} / {n_scheduled} ] dag_runs")

# print dag_group_run and dag_run_list
# Only with --verbose: dump the DAG group run and each collected DAG run as
# compact JSON, one per line.
if args.verbose:
    print("\n")
    print(">>> `dag_group_run` :")
    print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}")
    print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
    for dag_run in dag_run_list:
        try:
            dag_run_text = json.dumps(dag_run, separators=(",", ":"))
        except (TypeError, ValueError):
            # json.dumps raises TypeError for non-serializable objects and
            # ValueError for circular references; fall back to the plain text
            # form. Catching only these keeps Ctrl-C and SystemExit working,
            # which the former bare `except:` swallowed.
            dag_run_text = f"{dag_run}"
        print(f"\t- {dag_run_text}")

# dump dag_group_run
# Keep a local record of this run (inputs and generated DAG runs) under
# ~/csst_dag/<dag_group_run id>.joblib, whether or not it is submitted.
# expanduser("~") is used instead of os.getenv("HOME"), which returns None when
# HOME is unset and would make os.path.join raise TypeError.
dump_dir = os.path.join(os.path.expanduser("~"), "csst_dag")
# The directory may not exist on a fresh machine; create it rather than fail.
os.makedirs(dump_dir, exist_ok=True)
joblib.dump(
    dict(
        plan_basis=plan_basis,
        data_basis=data_basis,
        dag_group_run=dag_group_run,
        dag_run_list=dag_run_list,
    ),
    os.path.join(dump_dir, f"{dag_group_run['dag_group_run']}.joblib"),
)
# submit DAG group run
# Without --submit the script is a dry run: nothing is sent to the DFS service.
# NOTE(review): `args.top_n` and `args.final_prc_status` are parsed but not read
# anywhere in this script - confirm whether they should affect the submission.
if args.submit:
    submit_response = dfs.dag.new_dag_group_run(
        dag_group_run=dag_group_run, dag_run_list=dag_run_list
    )
    print(submit_response)