import json from ._base_dag import BaseDAG from csst_dfs_client import plan, level0 __all__ = ["CsstMscL1Mbi"] MSC_MBI_CHIPID = [ "06", "07", "08", "09", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "22", "23", "24", "25", ] MSC_SLS_CHIPID = [ "01", "02", "03", "04", "05", "10", "21", "26", "27", "28", "29", "30", ] class CsstMscL1Mbi(BaseDAG): def __init__(self): super().__init__("csst-msc-l1-mbi") def schedule( self, dataset: str = "csst-msc-c9-25sqdeg-v3", obs_type: str = "WIDE", project_id="none", batch_id: str | None = "default", prc_status: int = -1024, demo=True, **kwargs, ): # dataset: str = "csst-msc-c9-25sqdeg-v3" # obs_type: str = "WIDE" # project_id = "none" # batch_id: str | None = "default" # no need to query plan # plan.find( # instrument="MSC", # dataset=dataset, # obs_type=obs_type, # project_id=project_id, # ) recs = level0.find( instrument="MSC", dataset=dataset, obs_type=obs_type, project_id=project_id, prc_status=prc_status, ) assert recs.success, recs.message msgs = [] for this_rec in recs.data: if this_rec["detector_no"] in MSC_MBI_CHIPID: this_msg = self.gen_msg( dataset=dataset, obs_type=obs_type, project_id=project_id, batch_id=batch_id, obs_id=this_rec["obs_id"], chip_id=this_rec["detector_no"], dag_run_id=self.gen_dag_run_id(), **kwargs, ) if not demo: # push and update self.push(this_msg) this_update = level0.update_prc_status( level0_id=this_rec["level0_id"], dag_run_id=this_msg["dag_run_id"], prc_status=-2, dataset=dataset, ) assert this_update.success, this_update.message msgs.append(this_msg) return msgs