l1.py 4.23 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
3
4
import json
from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0

BO ZHANG's avatar
BO ZHANG committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# All MSC detector identifiers: the two-digit, zero-padded strings "01" .. "30".
MSC_DETECTORS = [f"{number:02d}" for number in range(1, 31)]
# Detector identifiers used by the MBI-type DAGs: "06"-"09", "11"-"20" and
# "22"-"25" (every MSC detector that is not listed in MSC_SLS_DETECTORS).
MSC_MBI_DETECTORS = [
    f"{number:02d}"
    for number in (*range(6, 10), *range(11, 21), *range(22, 26))
]
# Detector identifiers used by the SLS-type DAG: "01"-"05", "10", "21" and
# "26"-"30".
MSC_SLS_DETECTORS = [
    f"{number:02d}"
    for number in (*range(1, 6), 10, 21, *range(26, 31))
]
BO ZHANG's avatar
BO ZHANG committed
71
72
73

# Per-DAG record filter, keyed by DAG name.
# Each inner mapping is {level0 record field: accepted values}; in
# GeneralL1DAG.schedule a record is kept only when, for every field listed
# here, the record's value is one of the accepted values.
DAG_PARAMS = {
    # MSC DAGs
    "csst-msc-l1-mbi": dict(detector=MSC_MBI_DETECTORS),
    "csst-msc-l1-ast": dict(detector=MSC_MBI_DETECTORS),
    "csst-msc-l1-sls": dict(detector=MSC_SLS_DETECTORS),
    "csst-msc-l1-qc0": dict(detector=MSC_DETECTORS),
    # CPIC DAGs
    "csst-cpic-l1": dict(detector=["VIS"]),
    "csst-cpic-l1-qc0": dict(detector=["VIS"]),
}

BO ZHANG's avatar
BO ZHANG committed
93
94
95
96
97
# Names of the extra keyword arguments that GeneralL1DAG.schedule accepts.
# "queue" and "execution_date" are currently disabled.
SCHEDULE_KWARGS = {"priority"}
BO ZHANG's avatar
BO ZHANG committed
98
99


BO ZHANG's avatar
BO ZHANG committed
100
class GeneralL1DAG(BaseDAG):
    """Level-1 DAG scheduler driven by level0 data records.

    The record filter for the chosen DAG is looked up in ``DAG_PARAMS`` at
    construction time and applied to every level0 record returned by
    ``level0.find`` when ``schedule`` is called.
    """

    def __init__(self, dag_group: str, dag: str):
        """Initialise the base DAG and load the record filter for ``dag``.

        Parameters
        ----------
        dag_group : str
            Forwarded to ``BaseDAG.__init__``.
        dag : str
            Forwarded to ``BaseDAG.__init__``; must be a key of ``DAG_PARAMS``.

        Raises
        ------
        KeyError
            If ``dag`` is not a key of ``DAG_PARAMS``.
        """
        super().__init__(dag_group=dag_group, dag=dag)
        # {record field: accepted values} used to select level0 records
        self.params = DAG_PARAMS[dag]

    def schedule(
        self,
        batch_id: str | None = "-",
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        obs_group="W1",
        initial_prc_status: int = -1024,
        final_prc_status: int = -2,
        demo=True,
        **kwargs,
    ):
        """Build one DAG group run plus one DAG run per selected level0 record.

        Parameters
        ----------
        batch_id : str or None
            Passed to ``gen_dag_group_run`` and to every ``gen_dag_run`` call.
        dataset, obs_type, obs_group
            Query terms for ``level0.find``; also copied into each DAG run.
        initial_prc_status : int
            ``prc_status`` value used to query level0 records.
        final_prc_status : int
            ``prc_status`` written back to each selected level0 record.
        demo : bool
            If true, the generated messages are printed as JSON and nothing
            is pushed. If false, the group run and its DAG runs are pushed
            with ``push_dag_group_run``.
        **kwargs
            Only the names in ``SCHEDULE_KWARGS`` are accepted. They are
            forwarded to ``gen_dag_run``; ``priority`` (default 1) is also
            given to ``gen_dag_group_run``.

        Returns
        -------
        tuple
            ``(dag_group_run, dag_run_list)``.

        Raises
        ------
        AssertionError
            On unknown keyword arguments, or when the level0 query, a
            ``prc_status`` update, or the push reports failure.
        """
        assert set(kwargs) <= SCHEDULE_KWARGS, f"Unknown kwargs: {kwargs.keys()}"

        # The plan is not queried here: scheduling relies on level0 records only.

        # One group run message shared by all DAG runs created below.
        group_run = self.gen_dag_group_run(
            dag_group=self.dag_group,
            batch_id=batch_id,
            priority=kwargs.get("priority", 1),
        )
        if demo:
            print(json.dumps(group_run, indent=4))

        # Fetch the candidate level0 records.
        found = level0.find(
            instrument=self.instrument,
            dataset=dataset,
            obs_type=obs_type,
            obs_group=obs_group,
            prc_status=initial_prc_status,
        )
        assert found.success, found.message

        dag_runs = []
        for record in found.data:
            # Values of the filtered fields; these are also forwarded to the
            # DAG run message as extra keys.
            selectors = {field: record[field] for field in self.params}
            accepted = all(
                selectors[field] in allowed
                for field, allowed in self.params.items()
            )
            if not accepted:
                continue

            dag_run = self.gen_dag_run(
                dag_group_run=group_run,
                batch_id=batch_id,
                dag_run_id=self.generate_sha1(),
                dataset=dataset,
                obs_type=obs_type,
                obs_group=obs_group,
                obs_id=record["obs_id"],
                **selectors,
                **kwargs,
            )
            if demo:
                print(json.dumps(dag_run, indent=4))

            # Mark the level0 record with the new status and the DAG run id.
            # NOTE(review): this update also runs when demo is true, although
            # nothing is pushed in that case -- confirm this is intended.
            status_update = level0.update_prc_status(
                level0_id=record["level0_id"],
                dag_run_id=dag_run["dag_run_id"],
                prc_status=final_prc_status,
                dataset=dataset,
            )
            assert status_update.success, status_update.message
            dag_runs.append(dag_run)

        if not demo:
            # Submit the group run together with all of its DAG runs.
            push_result = self.push_dag_group_run(group_run, dag_runs)
            print(push_result)
            assert push_result.success, push_result.message
        return group_run, dag_runs