import json

import numpy as np
from csst_dfs_client import plan, level0

from ._base_dag import BaseDAG
from .._csst import csst

# DAG_DETECTOR_NAMES = {
#     "csst-msc-l1-mbi": {"detector": csst["msc"]["mbi"].effective_detector_names},
#     "csst-msc-l1-ast": {"detector": csst["msc"]["mbi"].effective_detector_names},
#     "csst-msc-l1-sls": {"detector": csst["msc"]["sls"].effective_detector_names},
#     "csst-msc-l1-qc0": {"detector": csst["msc"].effective_detector_names},
#     "csst-msc-l1-ooc": {"detector": csst["msc"].effective_detector_names},
#     "csst-mci-l1": {"detector": csst["mci"].effective_detector_names},
#     "csst-ifs-l1-rss": {"detector": csst["ifs"].effective_detector_names},
#     "csst-cpic-l1": {"detector": csst["cpic"].effective_detector_names},
#     "csst-cpic-l1-qc0": {"detector": csst["cpic"].effective_detector_names},
#     "csst-hstdm-l1": {"detector": csst["cpic"].effective_detector_names},
# }


def get_detector_names_via_dag(dag: str) -> list[str]:
    """Map a DAG name to the effective detector names of its instrument/module."""
    if "msc" in dag:
        if "mbi" in dag or "ast" in dag:
            return csst["MSC"]["MBI"].effective_detector_names
        elif "sls" in dag:
            return csst["MSC"]["SLS"].effective_detector_names
        else:
            return csst["MSC"].effective_detector_names
    elif "mci" in dag:
        return csst["MCI"].effective_detector_names
    elif "ifs" in dag:
        return csst["IFS"].effective_detector_names
    elif "cpic" in dag:
        return csst["CPIC"].effective_detector_names
    elif "hstdm" in dag:
        return csst["HSTDM"].effective_detector_names
    # avoid silently returning None for an unrecognized DAG name
    raise ValueError(f"Unrecognized DAG name: {dag}")


class GeneralDAGViaObsid(BaseDAG):
    def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
        super().__init__(dag_group=dag_group, dag=dag, use_detector=use_detector)
        # set effective detector names
        self.effective_detector_names = get_detector_names_via_dag(dag)

    def schedule(
        self,
        batch_id: str | None = "-",
        priority: int = 1,
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        obs_group: str = "W1",
        pmapname: str = "csst_000001.map",
        initial_prc_status: int = -1024,  # level0 prc_status
        final_prc_status: int = -2,  # level1 prc_status
        demo: bool = True,
    ):
        # no need to query plan
        # plan.write_file(local_path="plan.json")
        # plan.find(
        #     instrument="MSC",
        #     dataset=dataset,
        #     obs_type=obs_type,
        #     project_id=project_id,
        # )

        # generate a dag_group_run
        dag_group_run = self.gen_dag_group_run(
            dag_group=self.dag_group,
            batch_id=batch_id,
            priority=priority,
        )
        dag_run_list = []

        # find level0 data records
        recs = level0.find(
            instrument=self.instrument,
            dataset=dataset,
            obs_type=obs_type,
            obs_group=obs_group,
            prc_status=initial_prc_status,
        )
        assert recs.success, recs
        print(f"{len(recs.data)} records -> ", end="")

        if self.use_detector:
            # generate DAG messages via obs_id-detector
            for this_rec in recs.data:
                # filter level0 data records: detector in expected list
                if this_rec["detector"] in self.effective_detector_names:
                    # generate a DAG message if selected
                    this_dag_run = self.gen_dag_run(
                        dag_group_run=dag_group_run,
                        dag_run=self.generate_sha1(),
                        batch_id=batch_id,
                        pmapname=pmapname,
                        dataset=dataset,
                        obs_type=obs_type,
                        obs_group=obs_group,
                        obs_id=this_rec["obs_id"],
                        detector=this_rec["detector"],
                    )
                    dag_run_list.append(this_dag_run)
                    # update level0 prc_status
                    if not demo:
                        this_update = level0.update_prc_status(
                            level0_id=this_rec["level0_id"],
                            dag_run=this_dag_run["dag_run"],
                            prc_status=final_prc_status,
                            dataset=dataset,
                        )
                        assert this_update.success, this_update.message
        else:
            # generate DAG messages via obs_id
            u_obsid, c_obsid = np.unique(
                [this_rec["obs_id"] for this_rec in recs.data],
                return_counts=True,
            )
            # select those obs_ids with `counts == effective detector number`
            u_obsid_selected = u_obsid[
                c_obsid == len(self.effective_detector_names)
            ]
            # `u_obsid_selected` already holds the selected obs_id values,
            # so iterate over it directly (not `u_obsid[u_obsid_selected]`)
            for this_obsid in u_obsid_selected:
                # generate a DAG message if selected
                this_dag_run = self.gen_dag_run(
                    dag_group_run=dag_group_run,
                    dag_run=self.generate_sha1(),
                    batch_id=batch_id,
                    pmapname=pmapname,
                    dataset=dataset,
                    obs_type=obs_type,
                    obs_group=obs_group,
                    obs_id=this_obsid,
                )
                dag_run_list.append(this_dag_run)

        if not demo:
            # push and update
            res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
            print(
                f"{len(dag_run_list)} DAG runs -> "
                f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
                f"{res_push}"
            )
            assert res_push.success, res_push.message
        else:
            # no push
            print(
                f"{len(dag_run_list)} DAG runs -> "
                f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))}"
            )

        # TODO: `dag_group_run` and `dag_run_list` should be dumped to a text file in the future
        return dict(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )


class GeneralDAGViaObsgroup(BaseDAG):
    def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
        super().__init__(dag_group=dag_group, dag=dag, use_detector=use_detector)
        # set effective detector names
        self.effective_detector_names = get_detector_names_via_dag(dag)
        self.n_effective_detector = len(self.effective_detector_names)

    def schedule(
        self,
        batch_id: str | None = "-",
        priority: int = 1,
        dataset: str = "csst-msc-c9-25sqdeg-v3",
        obs_type: str = "WIDE",
        obs_group: str = "W1",
        pmapname: str = "csst_000001.map",
        initial_prc_status: int = -1024,  # level0 prc_status
        final_prc_status: int = -2,  # level1 prc_status
        demo: bool = True,
    ):
        # generate a dag_group_run
        dag_group_run = self.gen_dag_group_run(
            dag_group=self.dag_group,
            batch_id=batch_id,
            priority=priority,
        )
        dag_run_list = []

        # count plan and level0 records (compact mode); a single call with
        # the method arguments replaces the earlier duplicated hardcoded call
        res_plan_level0 = plan.count_plan_level0(
            instrument=self.instrument,
            obs_type=obs_type,
            obs_group=obs_group,
            dataset=dataset,
            prc_status=initial_prc_status,
        )
        assert res_plan_level0.success, res_plan_level0
        n_plan = res_plan_level0.data["plan_count"]
        n_level0 = res_plan_level0.data["level0_count"]

        # check plan count against level0 record count
        if n_plan == 0:
            print(f"No plan found for {obs_type} {obs_group} {dataset}")
        if n_level0 < n_plan * self.n_effective_detector:
            print(
                f"Plan {obs_type} {obs_group} {dataset} has {n_plan} plans, "
                f"but {n_level0} level0 records found"
            )
        if n_level0 == n_plan * self.n_effective_detector:
            print(
                f"Plan {obs_type} {obs_group} {dataset} has {n_plan} plans, "
                f"and {n_level0} level0 records found"
            )

        # generate DAG run list
        if not self.use_detector:
            # generate a DAG run via obs_group
            this_dag_run = self.gen_dag_run(
                dag_group_run=dag_group_run,
                batch_id=batch_id,
                dag_run=self.generate_sha1(),
                dataset=dataset,
                obs_type=obs_type,
                obs_group=obs_group,
                pmapname=pmapname,
            )
            dag_run_list.append(this_dag_run)
        else:
            # generate DAG runs via obs_group-detectors
            for this_detector in self.effective_detector_names:
                this_dag_run = self.gen_dag_run(
                    dag_group_run=dag_group_run,
                    batch_id=batch_id,
                    dag_run=self.generate_sha1(),
                    dataset=dataset,
                    obs_type=obs_type,
                    obs_group=obs_group,
                    detector=this_detector,
                    pmapname=pmapname,
                )
                dag_run_list.append(this_dag_run)

        if not demo:
            # push and update
            res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
            print(
                f"{len(dag_run_list)} DAG runs -> "
                f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
                f"{res_push}"
            )
            assert res_push.success, res_push.message

        return dict(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )
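

# ----------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the pipeline): how these
# schedulers might be driven from a script. It assumes a configured
# csst_dfs_client environment; the dag_group/dag names below are
# hypothetical placeholders, and `demo=True` keeps both calls as dry
# runs (DAG messages are generated but nothing is pushed or updated).
if __name__ == "__main__":
    # one DAG run per (obs_id, detector) level0 record
    obsid_scheduler = GeneralDAGViaObsid(
        dag_group="csst-msc-l1",  # hypothetical dag_group name
        dag="csst-msc-l1-mbi",  # hypothetical dag name
        use_detector=True,
    )
    res_obsid = obsid_scheduler.schedule(demo=True)

    # one DAG run per obs_group (set use_detector=True for one per detector)
    obsgroup_scheduler = GeneralDAGViaObsgroup(
        dag_group="csst-msc-l1",  # hypothetical dag_group name
        dag="csst-msc-l1-ast",  # hypothetical dag name
        use_detector=False,
    )
    res_obsgroup = obsgroup_scheduler.schedule(demo=True)

    print(
        f"{len(res_obsid['dag_run_list'])} + "
        f"{len(res_obsgroup['dag_run_list'])} DAG runs generated (dry run)"
    )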