"""Scheduling of DAG runs built from level-0 data records."""

import json

from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0

# Two-digit detector IDs "01" .. "30".
MSC_DETECTORS = [f"{number:02d}" for number in range(1, 31)]

# Detector IDs accepted by the "csst-msc-l1-mbi" and "csst-msc-l1-ast" DAGs.
MSC_MBI_DETECTORS = [
    "06", "07", "08", "09",
    "11", "12", "13", "14", "15",
    "16", "17", "18", "19", "20",
    "22", "23", "24", "25",
]

# Detector IDs accepted by the "csst-msc-l1-sls" DAG.
MSC_SLS_DETECTORS = [
    "01", "02", "03", "04", "05",
    "10",
    "21",
    "26", "27", "28", "29", "30",
]

# Per-DAG record filters: maps a DAG name to {record key: allowed values}.
DAG_PARAMS = {
    "csst-msc-l1-mbi": {"detector": MSC_MBI_DETECTORS},
    "csst-msc-l1-ast": {"detector": MSC_MBI_DETECTORS},
    "csst-msc-l1-sls": {"detector": MSC_SLS_DETECTORS},
    "csst-msc-l1-qc0": {"detector": MSC_DETECTORS},
    "csst-cpic-l1": {"detector": ["VIS"]},
    "csst-cpic-l1-qc0": {"detector": ["VIS"]},
}


def _require_success(response):
    """Raise ``AssertionError(response.message)`` unless ``response.success``.

    An explicit ``raise`` is used instead of an ``assert`` statement so the
    check is still performed when Python runs with ``-O`` (which strips
    ``assert``). The exception type and message match what the former
    ``assert response.success, response.message`` produced.
    """
    if not response.success:
        raise AssertionError(response.message)


class GeneralL1DAG(BaseDAG):
    """DAG that emits one DAG run per level-0 record passing ``DAG_PARAMS``."""

    def __init__(self, dag_group: str, dag: str):
        """Initialise the base DAG and look up the record filter for ``dag``.

        Parameters
        ----------
        dag_group
            Forwarded to ``BaseDAG.__init__``.
        dag
            Forwarded to ``BaseDAG.__init__`` and used as the key into
            ``DAG_PARAMS``. A name missing from ``DAG_PARAMS`` yields an
            empty filter, i.e. every record is selected and no extra keys
            are attached to the DAG runs.
        """
        super().__init__(dag_group=dag_group, dag=dag)
        # set parameters
        self.params = DAG_PARAMS.get(dag, {})

    def schedule(
        self,
        batch_id: str | None = "-",
        priority: int = 1,
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        obs_group: str = "W1",
        initial_prc_status: int = -1024,  # level0 prc_status level1
        final_prc_status: int = -2,
        demo: bool = True,
    ):
        """Build DAG runs for matching level-0 records and optionally push them.

        Parameters
        ----------
        batch_id
            Passed to ``gen_dag_group_run`` and to every ``gen_dag_run``.
        priority
            Passed to ``gen_dag_group_run``.
        dataset, obs_type, obs_group
            Query arguments for ``level0.find``; also copied into each DAG
            run (``dataset`` is additionally passed to
            ``level0.update_prc_status``).
        initial_prc_status
            ``prc_status`` value used to query level-0 records.
        final_prc_status
            ``prc_status`` written to every selected level-0 record.
        demo
            When true, each DAG run is printed and nothing is pushed.
            When false, the DAG group run and its DAG runs are pushed via
            ``push_dag_group_run``.

        Returns
        -------
        tuple
            ``(dag_group_run, dag_run_list)``.

        Raises
        ------
        AssertionError
            If the level-0 query, a status update, or the push reports
            ``success`` as false; the message is the response's ``message``.
        KeyError
            If a record lacks a key named in ``self.params``, ``"obs_id"``
            or ``"level0_id"``.
        """
        # Querying the plan is not needed here; the `plan` import is kept
        # at file level only.

        # generate a dag_group_run
        dag_group_run = self.gen_dag_group_run(
            dag_group=self.dag_group,
            batch_id=batch_id,
            priority=priority,
        )

        # find level0 data records
        recs = level0.find(
            instrument=self.instrument,
            dataset=dataset,
            obs_type=obs_type,
            obs_group=obs_group,
            prc_status=initial_prc_status,
        )
        _require_success(recs)

        # generate DAG messages
        dag_run_list = []
        for record in recs.data:
            # Values of the filtered keys; they are forwarded to the DAG run.
            additional_keys = {key: record[key] for key in self.params}
            is_selected = all(
                additional_keys[key] in allowed
                for key, allowed in self.params.items()
            )
            if not is_selected:
                continue

            dag_run = self.gen_dag_run(
                dag_group_run=dag_group_run,
                batch_id=batch_id,
                dag_run_id=self.generate_sha1(),
                dataset=dataset,
                obs_type=obs_type,
                obs_group=obs_group,
                obs_id=record["obs_id"],
                **additional_keys,
            )
            if demo:
                print(json.dumps(dag_run, indent=4), end="")

            # update level0 prc_status
            # NOTE(review): this update also runs when demo=True and happens
            # before the push below -- confirm both are intended.
            status_update = level0.update_prc_status(
                level0_id=record["level0_id"],
                dag_run_id=dag_run["dag_run_id"],
                prc_status=final_prc_status,
                dataset=dataset,
            )
            _require_success(status_update)
            dag_run_list.append(dag_run)

        # push (non-demo only), then report
        push_result = (
            None if demo else self.push_dag_group_run(dag_group_run, dag_run_list)
        )
        compact_group = json.dumps(dag_group_run, indent=None, separators=(",", ":"))
        summary = f"{len(dag_run_list)} DAG runs -> {compact_group}"
        if demo:
            # no push
            print(summary)
        else:
            # The summary is printed before the check so a failed push is
            # still visible in the output.
            print(f"{summary} -> {push_result}")
            _require_success(push_result)

        # TODO: `dag_group_run` and `dag_run_list` should be dumped to a text file in the future
        return dag_group_run, dag_run_list