"""Message scheduling for the ``csst-msc-l1-mbi`` DAG."""

import json

from ._base_dag import BaseDAG

from csst_dfs_client import plan, level0

__all__ = ["CsstMscL1Mbi"]

# Detector (chip) id strings grouped under the name "MBI".
# NOTE(review): presumably the multi-band-imaging detectors of MSC -- confirm.
# Not referenced by the code in this module; kept for external importers.
MSC_MBI_CHIPID = [
    "06",
    "07",
    "08",
    "09",
    "11",
    "12",
    "13",
    "14",
    "15",
    "16",
    "17",
    "18",
    "19",
    "20",
    "22",
    "23",
    "24",
    "25",
]

# Detector (chip) id strings grouped under the name "SLS".
# NOTE(review): presumably the slitless-spectroscopy detectors -- confirm.
# Not referenced by the code in this module; kept for external importers.
MSC_SLS_CHIPID = [
    "01",
    "02",
    "03",
    "04",
    "05",
    "10",
    "21",
    "26",
    "27",
    "28",
    "29",
    "30",
]


class CsstMscL1Mbi(BaseDAG):
    """DAG wrapper registered under the name ``"csst-msc-l1-mbi"``.

    Builds one scheduling message per MSC level-0 record found for a
    dataset / observation type / project combination.
    """

    def __init__(self):
        """Initialise the base DAG with the id ``"csst-msc-l1-mbi"``."""
        super().__init__("csst-msc-l1-mbi")

    def schedule(
        self,
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        project_id: str = "none",
        batch_id: str | None = "default",
        **kwargs,
    ) -> list[dict]:
        """Build the messages that trigger one DAG run per level-0 record.

        Level-0 records are queried directly (no plan lookup is needed) with
        ``instrument="MSC"`` and the given ``dataset``, ``obs_type`` and
        ``project_id``.

        Args:
            dataset: Dataset name forwarded to the level-0 query and copied
                into every message.
            obs_type: Observation type forwarded to the level-0 query and
                copied into every message.
            project_id: Project id forwarded to the level-0 query and copied
                into every message.
            batch_id: Batch id copied into every message (not used in the
                query).
            **kwargs: Extra message fields. They override values from
                ``self.msg_template`` but are themselves overridden by the
                fields this method sets explicitly.

        Returns:
            A list with one message per record in the query result. Each
            message is a copy of ``self.msg_template`` (assumed dict-like --
            it must support ``copy()`` and ``update()``) extended with
            ``dataset``, ``obs_type``, ``project_id``, ``batch_id``,
            ``obs_id``, ``chipid`` and a freshly generated ``dag_run_id``.

        Raises:
            AssertionError: If the level-0 query reports failure; the
                exception carries the message returned by the query.
        """
        recs = level0.find(
            instrument="MSC",
            dataset=dataset,
            obs_type=obs_type,
            project_id=project_id,
        )
        # Explicit raise instead of ``assert`` so the check is not stripped
        # when Python runs with -O; the exception type is kept for callers.
        if not recs.success:
            raise AssertionError(recs.message)

        # NOTE(review): records are not filtered by MSC_MBI_CHIPID, so a
        # message is emitted for every detector returned -- confirm intended.
        msgs = []
        for rec in recs.data:
            msg = self.msg_template.copy()
            # Caller-supplied extras first, so the explicit fields below win.
            msg.update(kwargs)
            msg.update(
                {
                    "dataset": dataset,
                    "obs_type": obs_type,
                    "project_id": project_id,
                    "batch_id": batch_id,
                    "obs_id": rec["obs_id"],
                    "chipid": rec["detector_no"],
                    # One unique run id per record.
                    "dag_run_id": self.gen_dag_run_id(),
                }
            )
            msgs.append(msg)
        return msgs