"""Level-1 DAG scheduling: build (and optionally push) DAG messages from level-0 records."""

import json

from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0

# CHIPID_MAP = {
#     "csst-msc-l1-mbi": MSC_MBI_CHIPID,
#     "csst-msc-l1-sls": MSC_SLS_CHIPID,
#     "csst-msc-l1-qc0": MSC_CHIPID,
# }

# Detector identifiers accepted by each DAG (values of the level-0 "detector" field).
_MSC_MBI_DETECTORS = (
    "06", "07", "08", "09", "11", "12", "13", "14", "15",
    "16", "17", "18", "19", "20", "22", "23", "24", "25",
)
_MSC_SLS_DETECTORS = (
    "01", "02", "03", "04", "05", "10",
    "21", "26", "27", "28", "29", "30",
)
# "01" .. "30": every detector listed for the MBI and SLS DAGs together.
_MSC_ALL_DETECTORS = tuple(f"{i:02d}" for i in range(1, 31))
_CPIC_DETECTORS = ("VIS",)


def _detector_params(instrument: str, detectors) -> dict:
    """Build one ``DAG_PARAMS`` entry that filters level-0 records by detector."""
    return {
        "instrument": instrument,
        "additional_keys": {
            "detector": {
                # field name in the level-0 record returned by DFS
                "key_in_dfs": "detector",
                # keyword name used when generating the DAG message
                "key_in_dag": "detector",
                # fresh list per entry so entries never share a mutable object
                "enum": list(detectors),
            },
        },
    }


# Per-DAG configuration, keyed by DAG id.
DAG_PARAMS = {
    "csst-msc-l1-mbi": _detector_params("MSC", _MSC_MBI_DETECTORS),
    "csst-msc-l1-sls": _detector_params("MSC", _MSC_SLS_DETECTORS),
    "csst-msc-l1-qc0": _detector_params("MSC", _MSC_ALL_DETECTORS),
    "csst-cpic-l1": _detector_params("CPIC", _CPIC_DETECTORS),
    "csst-cpic-l1-qc0": _detector_params("CPIC", _CPIC_DETECTORS),
}

# The only extra keyword arguments ``CsstL1.schedule`` accepts.
SCHEDULE_KWARGS = {"priority", "queue", "execution_date"}


class CsstL1(BaseDAG):
    """DAG that generates one message per selected level-0 record."""

    def __init__(self, dag_id: str):
        """Initialise the DAG and load its entry from ``DAG_PARAMS``.

        Raises:
            KeyError: if ``dag_id`` has no entry in ``DAG_PARAMS``.
        """
        super().__init__(dag_id)
        self.params = DAG_PARAMS[dag_id]

    # MSC/MCI/IFS/CPIC/HSTDM
    def schedule(
        self,
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        obs_group: str = "none",
        batch_id: str | None = "default",
        initial_prc_status: int = -1024,  # level0 prc_status level1
        final_prc_status: int = -2,
        demo: bool = True,
        **kwargs,
    ) -> list:
        """Generate DAG messages for matching level-0 records.

        Level-0 records are queried with ``level0.find`` and kept only when
        every field configured under ``self.params["additional_keys"]`` has a
        value listed in that field's ``enum``.

        Args:
            dataset: Dataset name used for the query and the messages.
            obs_type: Observation type used for the query and the messages.
            obs_group: Observation group used for the query and the messages.
            batch_id: Batch identifier forwarded to ``gen_msg``.
            initial_prc_status: ``prc_status`` passed to ``level0.find``.
            final_prc_status: ``prc_status`` written back through
                ``level0.update_prc_status`` after a message is pushed.
            demo: When true, messages are generated and returned but nothing
                is pushed and no ``prc_status`` is updated.
            **kwargs: Extra fields forwarded to ``gen_msg``; only the names in
                ``SCHEDULE_KWARGS`` are accepted.

        Returns:
            The list of generated messages, in record order.

        Raises:
            AssertionError: on unknown ``kwargs``, a failed level-0 query, or
                a failed ``prc_status`` update.
        """
        # The checks below are raised explicitly instead of using ``assert``
        # so they are not stripped under ``python -O``; the exception type is
        # kept as AssertionError for backward compatibility.
        if not kwargs.keys() <= SCHEDULE_KWARGS:
            raise AssertionError(f"Unknown kwargs: {kwargs.keys()}")

        # no need to query plan
        # # plan.write_file(local_path="plan.json")
        # plan.find(
        #     instrument="MSC",
        #     dataset=dataset,
        #     obs_type=obs_type,
        #     project_id=project_id,
        # )

        # find level0 data records
        recs = level0.find(
            instrument=self.params["instrument"],
            dataset=dataset,
            obs_type=obs_type,
            obs_group=obs_group,
            prc_status=initial_prc_status,
        )
        if not recs.success:
            raise AssertionError(recs.message)

        # generate DAG messages
        msgs = []
        for this_rec in recs.data:
            # filter level0 data records
            is_selected = True
            additional_keys = {}
            for spec in self.params["additional_keys"].values():
                value = this_rec[spec["key_in_dfs"]]
                is_selected = is_selected and value in spec["enum"]
                additional_keys[spec["key_in_dag"]] = value
            if not is_selected:
                continue

            # generate a DAG message for the selected record
            this_msg = self.gen_msg(
                dataset=dataset,
                obs_type=obs_type,
                obs_group=obs_group,
                batch_id=batch_id,
                obs_id=this_rec["obs_id"],
                # chip_id=this_rec["detector_no"],
                dag_run_id=self.gen_dag_run_id(),
                **additional_keys,
                **kwargs,
            )
            # print(json.dumps(this_msg, indent=4))

            if not demo:
                # push and update
                self.push(this_msg)
                this_update = level0.update_prc_status(
                    level0_id=this_rec["level0_id"],
                    dag_run_id=this_msg["dag_run_id"],
                    prc_status=final_prc_status,
                    dataset=dataset,
                )
                if not this_update.success:
                    raise AssertionError(this_update.message)
            msgs.append(this_msg)
        return msgs