""" Aim --- Process an MSC dataset, given a set of parameters. Example ------- python -m csst_dag.cli.csst_msc_l1 -h python -m csst_dag.cli.csst_msc_l1 \ --dataset=csst-msc-c9-25sqdeg-v3 \ --instrument=MSC \ --obs-type=WIDE \ --obs-group=W2 \ --obs-id=10100232366 \ --detector=09 \ --prc-status=-1024 \ --batch-id=test-b1 \ --priority=1 \ --pmapname=csst_000070.pmap \ --ref-cat=trilegal_093 python -m csst_dag.cli.csst_msc_l1 \ --dataset=csst-msc-c9-25sqdeg-v3 \ --instrument=MSC \ --obs-type=WIDE \ --obs-group=W2 \ --obs-id=10100232366 \ --detector=09 \ --batch-id=test-b1 \ --priority=1 \ --pmapname=csst_000070.pmap \ --ref-cat=trilegal_093 \ --submit # 1000 平方度宽场测试天区 50个指向 python -m csst_dag.cli.csst_msc_l1 \ --dataset=csst-msc-c11-1000sqdeg-wide-test-v1 \ --instrument=MSC \ --batch-id=1000sqdeg-test-b1 \ --ref-cat=trilegal_1000_w1 \ --submit """ import os from csst_dag.dag import CSST_DAGS, Dispatcher, BaseDAG from csst_dag import dfs import argparse import joblib parser = argparse.ArgumentParser( description="Scheduler for CSST MSC L1 pipeline.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) # data parameters parser.add_argument("--dataset", type=str, help="Dataset name") parser.add_argument("--instrument", type=str, help="Instrument name", default=None) parser.add_argument("--obs-type", type=str, help="Observation type", default=None) parser.add_argument("--obs-group", type=str, help="Observation group", default=None) parser.add_argument("--obs-id", type=str, help="Observation ID", default=None) parser.add_argument("--detector", type=str, help="Detector name", default=None) parser.add_argument( "--prc-status", type=int, help="Initial processing status", default=None ) # task parameters parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch") parser.add_argument("--priority", type=str, help="Task priority", default=1) # DAG parameters parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="") parser.add_argument( "--ref-cat", type=str, help="Reference catalog", default="trilegal_093" ) # submit parser.add_argument("--submit", action="store_true", help="Push results", default=False) # post-processing parameters parser.add_argument( "--final-prc-status", type=int, help="Final processing status", default=-2 ) args = parser.parse_args() # from csst_dag import DotDict # # args = DotDict( # dataset="csst-msc-c9-25sqdeg-v3", # instrument="MSC", # obs_type="WIDE", # obs_group="W2", # obs_id="10100232366", # detector=None, # prc_status=None, # batch_id="test-batch", # priority=1, # pmapname="csst_000070.pmap", # ref_cat="trilegal_093", # submit=False, # ) print("CLI parameters: ", args) plan_basis, data_basis = Dispatcher.find_plan_level0_basis( dataset=args.dataset, instrument=args.instrument, obs_type=args.obs_type, obs_group=args.obs_group, obs_id=args.obs_id, detector=args.detector, prc_status=args.prc_status, ) print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found") # generate DAG group run dag_group_run = BaseDAG.gen_dag_group_run( dag_group="csst-msc-l1", batch_id=args.batch_id, priority=args.priority, ) # generate DAG run list dag_run_list = CSST_DAGS["csst-msc-l1-mbi"].schedule( dag_group_run=dag_group_run, plan_basis=plan_basis, data_basis=data_basis, pmapname=args.pmapname, ref_cat=args.ref_cat, ) # print dag_group_run and dag_run_list print("`dag_group_run`:") for k, v in dag_group_run.items(): print(f"\t- {k}: {v}") print(f"`dag_run_list`[{len(dag_run_list)}]:") if len(dag_run_list) > 0: for k, v in dag_run_list[0].items(): print(f"\t- {k}: {v}") # dump dag_group_run joblib.dump( dict( dag_group_run=dag_group_run, dag_run_list=dag_run_list, ), os.path.join( os.getenv("HOME"), "csst_dag", f"{dag_group_run['dag_group_run']}.joblib", ), ) # submit DAG group run if args.submit: res = dfs.dag.new_dag_group_run( dag_group_run=dag_group_run, dag_run_list=dag_run_list, ) print(res)