"""
Aim
---
Process an MSC dataset, given a set of parameters.

Example
-------
python -m csst_dag.cli.run -h

python -m csst_dag.cli.run \
    --dags csst-msc-l1-mbi \
    --dataset=csst-msc-c9-25sqdeg-v3 \
    --instrument=MSC \
    --obs-type=WIDE \
    --obs-group=W2 \
    --obs-id=10100232366 \
    --detector=09 \
    --batch-id=test-b1 \
    --priority=1 \
    --pmapname=csst_000070.pmap \
    --ref-cat=trilegal_093

# 25 square-degree wide field
python -m csst_dag.cli.run \
    --dags csst-msc-l1-mbi \
    --dataset=csst-msc-c9-25sqdeg-v3 \
    --instrument=MSC \
    --obs-type=WIDE \
    --prc-status=-2 \
    --batch-id=25sqdeg-test-b2 \
    --priority=1 \
    --pmapname=csst_000070.pmap \
    --ref-cat=trilegal_093 \
    --submit
"""

import argparse
import json
import os

import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs

# Names of every DAG this CLI is allowed to schedule; `--dags` must be a
# subset of this set, and the full set is used when `--dags` is omitted.
DEFAULT_DAGS = {
    "csst-msc-l1-mbi",
    "csst-msc-l1-ast",
    "csst-msc-l1-sls",
    "csst-msc-l1-qc0",
    "csst-msc-l1-ooc",
    "csst-mci-l1-qc0",
    "csst-mci-l1",
    "csst-ifs-l1-qc0",
    "csst-ifs-l1-rss",
    # "csst-ifs-l1-cube",
    "csst-cpic-l1-qc0",
    "csst-cpic-l1",
    "csst-hstdm-l1",
}

parser = argparse.ArgumentParser(
    description="Scheduler for CSST L1 pipeline.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

# data parameters
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
    "--prc-status", type=int, help="Initial processing status", default=None
)
parser.add_argument(
    "--qc-status", type=int, help="Initial QC status", default=None
)
# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
parser.add_argument("--priority", type=str, help="Task priority", default="1")
# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
parser.add_argument(
    "--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
)
# submit
parser.add_argument("--submit", action="store_true", help="Push results", default=False)
# post-processing parameters
# NOTE(review): `--final-prc-status` is parsed but not consumed anywhere in
# this script as visible here -- confirm whether it is still needed.
parser.add_argument(
    "--final-prc-status", type=int, help="Final processing status", default=-2
)
# additional options
parser.add_argument("--force", action="store_true", help="Force success", default=False)
parser.add_argument(
    "--verbose",
    action="store_true",
    help="Print dag_group_run and dag_run_list",
    default=False,
)
# submit top N
# NOTE(review): `--top-n` is parsed but not consumed anywhere in this script
# as visible here -- confirm whether truncation is still to be implemented.
parser.add_argument(
    "--top-n", type=int, help="Submit top N tasks", default=-1
)
# select DAGs
parser.add_argument('--dags', nargs='+', type=str, help="DAGs to select", default=None)
args = parser.parse_args()

# Validate `--dags` before doing any work. This is an explicit check rather
# than an `assert` because asserts are stripped under `python -O`, in which
# case unknown names would be silently dropped by the intersection below.
if args.dags is not None:
    unknown_dags = sorted(set(args.dags) - DEFAULT_DAGS)
    if unknown_dags:
        parser.error(
            f"Unknown DAGs: {unknown_dags}; choose from {sorted(DEFAULT_DAGS)}"
        )

# Debugging aid: uncomment to bypass the command line with fixed arguments.
# from csst_dag import DotDict
#
# args = DotDict(
#     dataset="csst-msc-c9-25sqdeg-v3",
#     instrument="MSC",
#     obs_type="WIDE",
#     obs_group="W2",
#     obs_id="10100232366",
#     detector=None,
#     prc_status=None,
#     batch_id="test-batch",
#     priority=1,
#     pmapname="csst_000070.pmap",
#     ref_cat="trilegal_093",
#     submit=False,
# )
print("CLI parameters: ", args)

# query the plan basis and level-0 data basis matching the data parameters
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
    dataset=args.dataset,
    instrument=args.instrument,
    obs_type=args.obs_type,
    obs_group=args.obs_group,
    obs_id=args.obs_id,
    detector=args.detector,
    prc_status=args.prc_status,
    qc_status=args.qc_status,
)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")

# generate DAG group run
dag_group_run = BaseDAG.gen_dag_group_run(
    dag_group="csst-l1-pipeline",
    batch_id=args.batch_id,
    priority=args.priority,
)

# generate DAG run list
print("\n")
print(">>> Matching DAGs ...")
dag_run_list = []
# NOTE(review): `data_id_list` is collected but not read again in this script
# as visible here.
data_id_list = []
n_dag_run_all = 0
n_dag_run_success = 0

SELECTED_DAGS = (
    DEFAULT_DAGS.intersection(args.dags) if args.dags is not None else DEFAULT_DAGS
)
print("Selected DAGs: ", SELECTED_DAGS)

# Iterate in sorted order: set iteration order varies between interpreter
# runs, which would make the order of `dag_run_list` non-reproducible.
for dag in sorted(SELECTED_DAGS):
    this_task_list = CSST_DAGS[dag].schedule(
        dag_group_run=dag_group_run,
        plan_basis=plan_basis,
        data_basis=data_basis,
        pmapname=args.pmapname,
        ref_cat=args.ref_cat,
        force_success=args.force,
    )
    # keep only the tasks for which a dag_run was actually produced
    scheduled_tasks = [
        this_task for this_task in this_task_list if this_task["dag_run"] is not None
    ]
    dag_run_list.extend(this_task["dag_run"] for this_task in scheduled_tasks)
    data_id_list.extend(
        this_task["relevant_data_id_list"] for this_task in scheduled_tasks
    )
    this_n_dag_run_all = len(this_task_list)
    this_n_dag_run_success = len(scheduled_tasks)
    n_dag_run_all += this_n_dag_run_all
    n_dag_run_success += this_n_dag_run_success
    print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")

# print dag_group_run and dag_run_list
if args.verbose:
    print("\n")
    print(">>> `dag_group_run` :")
    print(f"\t- {dag_group_run}")
    print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
    for dag_run in dag_run_list:
        print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}")

# dump dag_group_run
# `expanduser` is used instead of reading $HOME directly so that an unset
# HOME does not hand `None` to `os.path.join`; the directory is created on
# first use so the dump does not fail on a fresh account.
dump_dir = os.path.join(os.path.expanduser("~"), "csst_dag")
os.makedirs(dump_dir, exist_ok=True)
joblib.dump(
    dict(
        dag_group_run=dag_group_run,
        dag_run_list=dag_run_list,
    ),
    os.path.join(
        dump_dir,
        f"{dag_group_run['dag_group_run']}.joblib",
    ),
)

# submit DAG group run
if args.submit:
    res = dfs.dag.new_dag_group_run(
        dag_group_run=dag_group_run,
        dag_run_list=dag_run_list,
    )
    print(res)