l1.py 4.34 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
3
4
import json
from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0

BO ZHANG's avatar
BO ZHANG committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# All MSC detector identifiers: two-digit, zero-padded strings "01" through
# "30", in ascending order.
MSC_DETECTORS = [f"{index:02d}" for index in range(1, 31)]
# Detector identifiers used by the "csst-msc-l1-mbi" and "csst-msc-l1-ast"
# entries of DAG_PARAMS: 06-09, 11-20 and 22-25 (18 IDs, ascending).
MSC_MBI_DETECTORS = [
    f"{index:02d}"
    for index in (*range(6, 10), *range(11, 21), *range(22, 26))
]
# Detector identifiers used by the "csst-msc-l1-sls" entry of DAG_PARAMS:
# 01-05, 10, 21 and 26-30 (12 IDs, ascending).
MSC_SLS_DETECTORS = [
    f"{index:02d}"
    for index in (*range(1, 6), 10, 21, *range(26, 31))
]
BO ZHANG's avatar
BO ZHANG committed
71
72
73

# Per-DAG record filters, keyed by DAG name.  Each value maps a level-0
# record key to the list of values accepted for that key; GeneralL1DAG
# looks its own entry up by DAG name and falls back to an empty filter.
DAG_PARAMS = {
    dag_name: {"detector": detectors}
    for dag_name, detectors in (
        ("csst-msc-l1-mbi", MSC_MBI_DETECTORS),
        ("csst-msc-l1-ast", MSC_MBI_DETECTORS),
        ("csst-msc-l1-sls", MSC_SLS_DETECTORS),
        ("csst-msc-l1-qc0", MSC_DETECTORS),
        ("csst-cpic-l1", ["VIS"]),
        ("csst-cpic-l1-qc0", ["VIS"]),
    )
}


BO ZHANG's avatar
BO ZHANG committed
94
class GeneralL1DAG(BaseDAG):
    """Build DAG-run messages from level-0 data records.

    The per-DAG record filter is taken from ``DAG_PARAMS``; everything else
    (message generation, hashing, pushing) is delegated to ``BaseDAG``.
    """

    def __init__(self, dag_group: str, dag: str):
        """Initialise the base DAG and load this DAG's record filter.

        Parameters
        ----------
        dag_group : str
            Forwarded to ``BaseDAG.__init__``.
        dag : str
            Forwarded to ``BaseDAG.__init__`` and used as the lookup key in
            ``DAG_PARAMS``.
        """
        super().__init__(dag_group=dag_group, dag=dag)
        # A DAG name without an entry in DAG_PARAMS gets an empty filter,
        # i.e. every level-0 record is selected.
        self.params = DAG_PARAMS.get(dag, {})

    def schedule(
        self,
        batch_id: str | None = "-",
        priority: int = 1,
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        obs_group: str = "W1",
        initial_prc_status: int = -1024,  # level0 prc_status level1
        final_prc_status: int = -2,
        demo: bool = True,
    ):
        """Generate one DAG run per selected level-0 record.

        Steps: create a DAG-group run, query level-0 records, keep those
        matching ``self.params``, generate a DAG-run message for each, set
        each record's ``prc_status`` to ``final_prc_status``, and finally
        push everything unless ``demo`` is true.

        Parameters
        ----------
        batch_id : str | None
            Passed to ``gen_dag_group_run`` and ``gen_dag_run``.
        priority : int
            Passed to ``gen_dag_group_run``.
        dataset, obs_type, obs_group : str
            Query constraints for ``level0.find``; also copied into every
            DAG-run message.
        initial_prc_status : int
            ``prc_status`` value the level-0 query filters on.
        final_prc_status : int
            ``prc_status`` value written back to each selected record.
        demo : bool
            If true, print the generated messages and skip the push.
            If false, push the DAG-group run and its DAG runs.

        Returns
        -------
        tuple
            ``(dag_group_run, dag_run_list)``.

        Raises
        ------
        AssertionError
            If the level-0 query, a ``prc_status`` update, or the push
            reports ``success`` as false; the message is the response's
            ``message``.
        """
        # The observation plan is not queried; level-0 records are searched
        # directly below.

        # generate a dag_group_run
        dag_group_run = self.gen_dag_group_run(
            dag_group=self.dag_group,
            batch_id=batch_id,
            priority=priority,
        )
        if demo:
            # single-line, compact JSON for the group-level message
            print(
                "    - " + json.dumps(dag_group_run, indent=None, separators=(",", ":"))
            )

        # find level0 data records
        recs = level0.find(
            instrument=self.instrument,
            dataset=dataset,
            obs_type=obs_type,
            obs_group=obs_group,
            prc_status=initial_prc_status,
        )
        # Explicit raise instead of `assert`: assert statements are removed
        # under `python -O`, which would let a failed query go unnoticed.
        if not recs.success:
            raise AssertionError(recs.message)

        # generate DAG messages
        dag_run_list = []
        for this_rec in recs.data:
            # Values of the filtered keys; these are also forwarded to the
            # DAG-run message.  Every key in self.params is read from the
            # record unconditionally (no guard for a missing key).
            additional_keys = {key: this_rec[key] for key in self.params}

            # A record is selected only if every filtered key holds an
            # accepted value.
            if not all(
                additional_keys[key] in accepted
                for key, accepted in self.params.items()
            ):
                continue

            this_dag_run = self.gen_dag_run(
                dag_group_run=dag_group_run,
                batch_id=batch_id,
                dag_run_id=self.generate_sha1(),
                dataset=dataset,
                obs_type=obs_type,
                obs_group=obs_group,
                obs_id=this_rec["obs_id"],
                **additional_keys,
            )
            if demo:
                print(json.dumps(this_dag_run, indent=4), end="")

            # update level0 prc_status
            # NOTE(review): this update runs even when demo=True -- confirm
            # that a demo run is meant to modify the level-0 records.
            this_update = level0.update_prc_status(
                level0_id=this_rec["level0_id"],
                dag_run_id=this_dag_run["dag_run_id"],
                prc_status=final_prc_status,
                dataset=dataset,
            )
            if not this_update.success:
                raise AssertionError(this_update.message)
            dag_run_list.append(this_dag_run)

        if demo:
            # no push
            print(f"{len(dag_run_list)} DAG runs.")
        else:
            # push and update
            res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
            print(f"{len(dag_run_list)} DAG runs -> {res_push}")
            if not res_push.success:
                raise AssertionError(res_push.message)
        return dag_group_run, dag_run_list