import argparse
import os
import json
from typing import Optional

import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs


def run_l1_pipeline(
    # data parameters
    dataset: str,
    instrument: Optional[str] = None,
    obs_type: Optional[str] = None,
    obs_group: Optional[str] = None,
    obs_id: Optional[str] = None,
    detector: Optional[str] = None,
    prc_status: Optional[int] = None,
    qc_status: Optional[int] = None,
    # task parameters
    batch_id: Optional[str] = "test-batch",
    priority: Optional[str] = "1",
    # DAG parameters
    pmapname: Optional[str] = "",
    ref_cat: Optional[str] = "trilegal_093",
    # submit
    verbose: Optional[bool] = True,
    submit: Optional[bool] = False,
    final_prc_status: Optional[int] = -2,
    force: Optional[bool] = False,
    top_n: Optional[int] = -1,
    # select DAGs
    dags: Optional[list[str]] = None,
    dag_group: Optional[str] = "csst-l1-pipeline",
):
    """Run a DAG.

    Parameters
    ----------
    dataset : str
        Dataset name
    instrument : Optional[str], optional
        Instrument name, by default None
    obs_type : Optional[str], optional
        Observation type, by default None
    obs_group : Optional[str], optional
        Observation group, by default None
    obs_id : Optional[str], optional
        Observation ID, by default None
    detector : Optional[str], optional
        Detector name, by default None
    prc_status : Optional[int], optional
        Initial processing status, by default None
    qc_status : Optional[int], optional
        Initial QC status, by default None
    batch_id : Optional[str], optional
        Batch ID, by default "test-batch"
    priority : Optional[str], optional
        Task priority, by default "1"
    pmapname : Optional[str], optional
        CCDS pmapname, by default ""
    ref_cat : Optional[str], optional
        Reference catalog, by default "trilegal_093"
    verbose : Optional[bool], optional
        Print the generated DAG group run and DAG run list, by default True
    submit : Optional[bool], optional
        Push results, by default False
    final_prc_status : Optional[int], optional
        Final processing status, by default -2
    force : Optional[bool], optional
        Force success, by default False
    top_n : Optional[int], optional
        Submit top N tasks, by default -1
    dags : Optional[list[str]], optional
        DAGs to select, by default None
    dag_group : Optional[str], optional
        DAG group, by default "csst-l1-pipeline"
""" plan_basis, data_basis = Dispatcher.find_plan_level0_basis( dataset=dataset, instrument=instrument, obs_type=obs_type, obs_group=obs_group, obs_id=obs_id, detector=detector, prc_status=prc_status, qc_status=qc_status, ) print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found") # generate DAG group run dag_group_run = BaseDAG.gen_dag_group_run( dag_group=dag_group, batch_id=batch_id, priority=priority, ) # generate DAG run list print("\n") print(">>> Matching DAGs ...") dag_run_list = [] data_id_list = [] n_dag_run_all = 0 n_dag_run_success = 0 DEFAULT_DAGS = { "csst-msc-l1-mbi", "csst-msc-l1-ast", "csst-msc-l1-sls", "csst-msc-l1-qc0", "csst-msc-l1-ooc", "csst-mci-l1-qc0", "csst-mci-l1", "csst-ifs-l1", "csst-cpic-l1-qc0", "csst-cpic-l1", "csst-hstdm-l1", } assert dags is None or set(dags).issubset(DEFAULT_DAGS), f"Selected DAGs: {dags}" SELECTED_DAGS = DEFAULT_DAGS.intersection(dags) if dags is not None else DEFAULT_DAGS print("Selected DAGs: ", SELECTED_DAGS) for dag in SELECTED_DAGS: this_task_list = CSST_DAGS[dag].schedule( dag_group_run=dag_group_run, plan_basis=plan_basis, data_basis=data_basis, pmapname=pmapname, ref_cat=ref_cat, force_success=force, ) this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None] this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if this_task["dag_run"] is not None] dag_run_list.extend(this_dag_run_list) data_id_list.extend(this_data_id_list) this_n_dag_run_all = len(this_task_list) this_n_dag_run_success = len(this_dag_run_list) n_dag_run_all += this_n_dag_run_all n_dag_run_success += this_n_dag_run_success print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs") # print dag_group_run and dag_run_list if verbose: print("\n") print(">>> `dag_group_run` :") print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}") print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]") if len(dag_run_list) > 0: for dag_run in dag_run_list: try: print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}") except: print(f"\t- {dag_run}") # dump dag_group_run # joblib.dump( # dict( # plan_basis=plan_basis, # data_basis=data_basis, # dag_group_run=dag_group_run, # dag_run_list=dag_run_list, # ), # os.path.join( # os.getenv("HOME"), # "csst_dag", # f"{dag_group_run['dag_group_run']}.joblib", # ), # ) # submit DAG group run if submit: res = dfs.dag.new_dag_group_run( dag_group_run=dag_group_run, dag_run_list=dag_run_list, ) print(res)