# _run.py
import argparse
import os
import json
from typing import Optional
import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs


# DAGs that `run_l1_pipeline` can schedule, listed in the order they are scheduled.
# A tuple (not a set) keeps the scheduling/submission order deterministic.
_DEFAULT_DAGS = (
    "csst-msc-l1-mbi",
    "csst-msc-l1-ast",
    "csst-msc-l1-sls",
    "csst-msc-l1-qc0",
    "csst-msc-l1-ooc",
    "csst-mci-l1-qc0",
    "csst-mci-l1",
    "csst-ifs-l1",
    "csst-cpic-l1-qc0",
    "csst-cpic-l1",
    "csst-hstdm-l1",
)


def run_l1_pipeline(
    # data parameters
    dataset: str,
    instrument: Optional[str] = None,
    obs_type: Optional[str] = None,
    obs_group: Optional[str] = None,
    obs_id: Optional[str] = None,
    detector: Optional[str] = None,
    prc_status: Optional[int] = None,
    qc_status: Optional[int] = None,
    # task parameters
    batch_id: Optional[str] = "test-batch",
    priority: Optional[str] = "1",
    # DAG parameters
    pmapname: Optional[str] = "",
    ref_cat: Optional[str] = "trilegal_093",
    # submit
    verbose: Optional[bool] = True,
    submit: Optional[bool] = False,
    final_prc_status: Optional[int] = -2,
    force: Optional[bool] = False,
    top_n: Optional[int] = -1,
    # select DAGs
    dags: Optional[list[str]] = None,
    dag_group: Optional[str] = "csst-l1-pipeline",
) -> None:
    """Schedule the selected L1 DAGs for a dataset and optionally submit them.

    The function looks up the plan / level-0 data basis matching the data
    parameters, generates one DAG group run, asks every selected DAG to
    schedule its DAG runs against that basis, prints a summary, and, when
    ``submit`` is true, submits the DAG group run together with its DAG runs.

    Parameters
    ----------
    dataset : str
        Dataset name
    instrument : Optional[str], optional
        Instrument name, by default None
    obs_type : Optional[str], optional
        Observation type, by default None
    obs_group : Optional[str], optional
        Observation group, by default None
    obs_id : Optional[str], optional
        Observation ID, by default None
    detector : Optional[str], optional
        Detector name, by default None
    prc_status : Optional[int], optional
        Initial processing status, by default None
    qc_status : Optional[int], optional
        Initial QC status, by default None
    batch_id : Optional[str], optional
        Batch ID, by default "test-batch"
    priority : Optional[str], optional
        Task priority, by default "1"
    pmapname : Optional[str], optional
        CCDS pmapname, by default ""
    ref_cat : Optional[str], optional
        Reference catalog, by default "trilegal_093"
    verbose : Optional[bool], optional
        Print the DAG group run and every scheduled DAG run, by default True
    submit : Optional[bool], optional
        Submit the DAG group run and its DAG runs, by default False
    final_prc_status : Optional[int], optional
        Final processing status, by default -2.
        Accepted for interface compatibility; not read by this function.
    force : Optional[bool], optional
        Force success (passed to each DAG as ``force_success``), by default False
    top_n : Optional[int], optional
        Submit top N tasks, by default -1.
        Accepted for interface compatibility; not read by this function.
    dags : Optional[list[str]], optional
        DAGs to select, by default None (all default DAGs)
    dag_group : Optional[str], optional
        DAG group, by default "csst-l1-pipeline"

    Raises
    ------
    ValueError
        If ``dags`` contains a name that is not one of the default DAGs.
    """
    # Validate the DAG selection up front so that a typo fails before any
    # query is issued. An explicit raise is used instead of `assert`, which
    # is stripped when Python runs with -O.
    if dags is None:
        selected_dags = list(_DEFAULT_DAGS)
    else:
        requested = list(dags)
        unknown = [dag for dag in requested if dag not in _DEFAULT_DAGS]
        if unknown:
            raise ValueError(
                f"Unknown DAGs selected: {unknown}; "
                f"valid DAGs: {list(_DEFAULT_DAGS)}"
            )
        # Keep the declared order and drop duplicates.
        selected_dags = [dag for dag in _DEFAULT_DAGS if dag in requested]

    plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
        dataset=dataset,
        instrument=instrument,
        obs_type=obs_type,
        obs_group=obs_group,
        obs_id=obs_id,
        detector=detector,
        prc_status=prc_status,
        qc_status=qc_status,
    )
    print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")

    # generate DAG group run
    dag_group_run = BaseDAG.generate_dag_group_run(
        dag_group=dag_group,
        batch_id=batch_id,
        priority=priority,
    )

    # generate DAG run list
    print("\n")
    print(">>> Matching DAGs ...")
    dag_run_list = []
    # Collected alongside the DAG runs; not used further in this function.
    data_id_list = []
    n_dag_run_all = 0
    n_dag_run_success = 0

    print("Selected DAGs: ", selected_dags)

    for dag in selected_dags:
        this_task_list = CSST_DAGS[dag].schedule(
            dag_group_run=dag_group_run,
            plan_basis=plan_basis,
            data_basis=data_basis,
            pmapname=pmapname,
            ref_cat=ref_cat,
            force_success=force,
        )
        # A task whose `dag_run` is None produced no DAG run; it counts
        # towards the total but not towards the successful ones.
        scheduled_tasks = [
            this_task
            for this_task in this_task_list
            if this_task["dag_run"] is not None
        ]
        dag_run_list.extend(this_task["dag_run"] for this_task in scheduled_tasks)
        data_id_list.extend(
            this_task["relevant_data_id_list"] for this_task in scheduled_tasks
        )

        this_n_dag_run_all = len(this_task_list)
        this_n_dag_run_success = len(scheduled_tasks)
        n_dag_run_all += this_n_dag_run_all
        n_dag_run_success += this_n_dag_run_success
        print(
            f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs"
        )

    # print dag_group_run and dag_run_list
    if verbose:
        print("\n")
        print(">>> `dag_group_run` :")
        print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}")
        print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
        for dag_run in dag_run_list:
            try:
                print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}")
            except (TypeError, ValueError):
                # Not JSON-serializable: fall back to the plain representation.
                print(f"\t- {dag_run}")

    # dump dag_group_run (currently disabled)
    # joblib.dump(
    #     dict(
    #         plan_basis=plan_basis,
    #         data_basis=data_basis,
    #         dag_group_run=dag_group_run,
    #         dag_run_list=dag_run_list,
    #     ),
    #     os.path.join(
    #         os.getenv("HOME"),
    #         "csst_dag",
    #         f"{dag_group_run['dag_group_run']}.joblib",
    #     ),
    # )
    # submit DAG group run
    if submit:
        res = dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )
        print(res)