import argparse
import os
import json
from typing import Optional
import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs


def run_l1_pipeline(
        # data parameters
        dataset: str,
        instrument: Optional[str] = None,
        obs_type: Optional[str] = None,
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        detector: Optional[str] = None,
        prc_status: Optional[int] = None,
        qc_status: Optional[int] = None,
        # task parameters
        batch_id: Optional[str] = "test-batch",
        priority: Optional[str] = "1",
        # DAG parameters
        pmapname: Optional[str] = "",
        ref_cat: Optional[str] = "trilegal_093",
        # submit
        verbose: Optional[bool] = True,
        submit: Optional[bool] = False,
        final_prc_status: Optional[int] = -2,
        force: Optional[bool] = False,
        top_n: Optional[int] = -1,
        # select DAGs
        dags: Optional[list[str]] = None,
        dag_group: Optional[str] = "csst-l1-pipeline",
    ):
    """Run a DAG.

    Parameters
    ----------
    dataset : str
        Dataset name
    instrument : Optional[str], optional
        Instrument name, by default None
    obs_type : Optional[str], optional
        Observation type, by default None
    obs_group : Optional[str], optional
        Observation group, by default None
    obs_id : Optional[str], optional
        Observation ID, by default None
    detector : Optional[str], optional
        Detector name, by default None
    prc_status : Optional[int], optional
        Initial processing status, by default None
    qc_status : Optional[int], optional
        Initial QC status, by default None
    batch_id : Optional[str], optional
        Batch ID, by default "test-batch"
    priority : Optional[str], optional
        Task priority, by default "1"
    pmapname : Optional[str], optional
        CCDS pmapname, by default ""
    ref_cat : Optional[str], optional
        Reference catalog, by default "trilegal_093"
    submit : Optional[bool], optional
        Whether to submit the generated DAG group run, by default False
    final_prc_status : Optional[int], optional
        Final processing status, by default -2
    force : Optional[bool], optional
        Force success, by default False
    verbose : Optional[bool], optional
        Print verbose output, by default True
    top_n : Optional[int], optional
        Submit top N tasks, by default -1
    dags : Optional[list[str]], optional
        DAGs to select, by default None
    dag_group : Optional[str], optional
        DAG group name, by default "csst-l1-pipeline"
    """

    plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
        dataset=dataset,
        instrument=instrument,
        obs_type=obs_type,
        obs_group=obs_group,
        obs_id=obs_id,
        detector=detector,
        prc_status=prc_status,
        qc_status=qc_status,
    )
    print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")

    # generate DAG group run
    dag_group_run = BaseDAG.gen_dag_group_run(
        dag_group=dag_group,
        batch_id=batch_id,
        priority=priority,
    )

    # generate DAG run list
    print("\n")
    print(">>> Matching DAGs ...")
    dag_run_list = []
    data_id_list = []
    n_dag_run_all = 0
    n_dag_run_success = 0

    # Full set of known L1 DAG names; user-selected `dags` must be a subset of these.
    DEFAULT_DAGS = {
        "csst-msc-l1-mbi",
        "csst-msc-l1-ast",
        "csst-msc-l1-sls",
        "csst-msc-l1-qc0",
        "csst-msc-l1-ooc",
        "csst-mci-l1-qc0",
        "csst-mci-l1",
        "csst-ifs-l1",
        "csst-cpic-l1-qc0",
        "csst-cpic-l1",
        "csst-hstdm-l1",
    }
    assert dags is None or set(dags).issubset(DEFAULT_DAGS), \
        f"Unknown DAGs selected: {set(dags) - DEFAULT_DAGS}"
    SELECTED_DAGS = DEFAULT_DAGS.intersection(dags) if dags is not None else DEFAULT_DAGS
    print("Selected DAGs: ", SELECTED_DAGS)

    for dag in SELECTED_DAGS:
        this_task_list = CSST_DAGS[dag].schedule(
            dag_group_run=dag_group_run,
            plan_basis=plan_basis,
            data_basis=data_basis,
            pmapname=pmapname,
            ref_cat=ref_cat,
            force_success=force,
        )
        this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
        this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if
                             this_task["dag_run"] is not None]

        dag_run_list.extend(this_dag_run_list)
        data_id_list.extend(this_data_id_list)
        this_n_dag_run_all = len(this_task_list)
        this_n_dag_run_success = len(this_dag_run_list)
        n_dag_run_all += this_n_dag_run_all
        n_dag_run_success += this_n_dag_run_success
        print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")

    # print dag_group_run and dag_run_list
    if verbose:
        print("\n")
        print(">>> `dag_group_run` :")
        print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}")
        print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
        if len(dag_run_list) > 0:
            for dag_run in dag_run_list:
                try:
                    print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}")
                except (TypeError, ValueError):  # dag_run is not JSON-serializable
                    print(f"\t- {dag_run}")

    # dump dag_group_run
    # joblib.dump(
    #     dict(
    #         plan_basis=plan_basis,
    #         data_basis=data_basis,
    #         dag_group_run=dag_group_run,
    #         dag_run_list=dag_run_list,
    #     ),
    #     os.path.join(
    #         os.getenv("HOME"),
    #         "csst_dag",
    #         f"{dag_group_run['dag_group_run']}.joblib",
    #     ),
    # )
    # submit DAG group run
    if submit:
        res = dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )
        print(res)
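
# Example entry point: a minimal, illustrative dry-run sketch. The dataset,
# instrument, and obs_type values below are placeholders rather than values
# taken from a real CSST configuration; with submit=False the call only
# matches DAGs and prints the resulting dag_group_run / dag_run_list.
if __name__ == "__main__":
    run_l1_pipeline(
        dataset="example-dataset",                    # placeholder dataset name
        instrument="MSC",                             # placeholder instrument
        obs_type="WIDE",                              # placeholder observation type
        batch_id="test-batch",
        dags=["csst-msc-l1-qc0", "csst-msc-l1-mbi"],  # subset of DEFAULT_DAGS
        verbose=True,
        submit=False,                                 # dry run: nothing is submitted
    )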