Commit 3ca51c02 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

remove redundant cli interfaces

parent 4c92ef65
"""
Aim
---
Process a CPIC dataset, given a set of parameters.
Example
-------
python -m csst_dag.cli.csst_cpic_l1 -h
python -m csst_dag.cli.csst_cpic_l1 \
--dataset=csst-cpic-c11-hip71681-v1 \
--instrument=CPIC \
--obs-type=SCI \
--obs-group=hip71681 \
--obs-id=40100000001 \
--detector=VIS \
--prc-status=-1024 \
--batch-id=test-b1 \
--priority=1
"""
import argparse
import os
import joblib
from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs
parser = argparse.ArgumentParser(
description="Scheduler for CSST CPIC L1 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# data parameters
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
"--prc-status", type=int, help="Initial processing status", default=None
)
# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
parser.add_argument("--priority", type=str, help="Task priority", default=1)
# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
# parser.add_argument(
# "--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
# )
# submit
parser.add_argument("--submit", action="store_true", help="Push results", default=False)
# post-processing parameters
parser.add_argument(
"--final-prc-status", type=int, help="Final processing status", default=-2
)
args = parser.parse_args()
print("CLI parameters: ", args)
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
dataset=args.dataset,
instrument=args.instrument,
obs_type=args.obs_type,
obs_group=args.obs_group,
obs_id=args.obs_id,
detector=args.detector,
prc_status=args.prc_status,
)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
# generate DAG group run
dag_group_run = BaseDAG.gen_dag_group_run(
dag_group="csst-cpic-l1",
batch_id=args.batch_id,
priority=args.priority,
)
# generate DAG run list
print("\n"*2)
print(">>> Matching DAGs ...")
dag_run_list = []
data_id_set = set()
for dag in [
"csst-cpic-l1",
"csst-cpic-l1-qc0",
]:
this_dag_run_list, this_data_id_set = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run,
plan_basis=plan_basis,
data_basis=data_basis,
pmapname=args.pmapname,
)
print(f"- [{dag}] : {len(this_dag_run_list)} dag_runs")
dag_run_list.extend(this_dag_run_list)
data_id_set.union(this_data_id_set)
# print dag_group_run and dag_run_list
print("\n")
print(">>> dag_group_run:")
print(f"\t- {dag_group_run}")
print(f">>> dag_run_list[{len(dag_run_list)}]:")
if len(dag_run_list) > 0:
for dag_run in dag_run_list:
print(f"\t- {dag_run}")
# dump dag_group_run
joblib.dump(
dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
),
os.path.join(
os.getenv("HOME"),
"csst_dag",
f"{dag_group_run['dag_group_run']}.joblib",
),
)
# submit DAG group run
if args.submit:
res = dfs.dag.new_dag_group_run(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
print(res)
"""
Aim
---
Process an HSTDM dataset.
Example
-------
python -m csst_dag.cli.csst_hstdm_l1 -h
python -m csst_dag.cli.csst_hstdm_l1 \
--dataset=csst-hstdm-c11-stare-sis1-v1 \
--instrument=HSTDM \
--obs-type=STARE \
--obs-group=default \
--prc-status=-1024 \
--batch-id=test-b1 \
--priority=1 \
--submit
"""
import argparse
import os
import joblib
from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs
parser = argparse.ArgumentParser(
description="Scheduler for CSST CPIC L1 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# data parameters
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
"--prc-status", type=int, help="Initial processing status", default=None
)
# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
parser.add_argument("--priority", type=str, help="Task priority", default=1)
# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
# parser.add_argument(
# "--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
# )
# submit
parser.add_argument("--submit", action="store_true", help="Push results", default=False)
# post-processing parameters
parser.add_argument(
"--final-prc-status", type=int, help="Final processing status", default=-2
)
# force option
parser.add_argument("--force", action="store_true", help="Force success", default=False)
args = parser.parse_args()
print("CLI parameters: ", args)
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
dataset=args.dataset,
instrument=args.instrument,
obs_type=args.obs_type,
obs_group=args.obs_group,
obs_id=args.obs_id,
detector=args.detector,
prc_status=args.prc_status,
)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
# generate DAG group run
dag_group_run = BaseDAG.gen_dag_group_run(
dag_group="csst-hstdm-l1",
batch_id=args.batch_id,
priority=args.priority,
)
# generate DAG run list
print("\n"*2)
print(">>> Matching DAGs ...")
dag_run_list = []
data_id_list = []
n_dag_run_all = 0
n_dag_run_success = 0
for dag in [
"csst-hstdm-l1",
]:
this_task_list = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run,
plan_basis=plan_basis,
data_basis=data_basis,
pmapname=args.pmapname,
force_success=args.force,
)
this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if this_task["dag_run"] is not None]
dag_run_list.extend(this_dag_run_list)
data_id_list.extend(this_data_id_list)
this_n_dag_run_all = len(this_task_list)
this_n_dag_run_success = len(this_dag_run_list)
n_dag_run_all += this_n_dag_run_all
n_dag_run_success += this_n_dag_run_success
print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")
# print dag_group_run and dag_run_list
print("\n")
print(">>> `dag_group_run` :")
print(f"\t- {dag_group_run}")
print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
if len(dag_run_list) > 0:
for dag_run in dag_run_list:
print(f"\t- {dag_run}")
# dump dag_group_run
joblib.dump(
dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
),
os.path.join(
os.getenv("HOME"),
"csst_dag",
f"{dag_group_run['dag_group_run']}.joblib",
),
)
# submit DAG group run
if args.submit:
res = dfs.dag.new_dag_group_run(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
print(res)
"""
Aim
---
Process an MCI dataset.
Example
-------
python -m csst_dag.cli.csst_mci_l1 -h
python -m csst_dag.cli.csst_mci_l1 \
--dataset=csst-cpic-c11-hip71681-v1 \
--instrument=CPIC \
--obs-type=SCI \
--obs-group=hip71681 \
--obs-id=40100000001 \
--detector=VIS \
--prc-status=-1024 \
--batch-id=test-b1 \
--priority=1
"""
import argparse
import os
import joblib
from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs
parser = argparse.ArgumentParser(
description="Scheduler for CSST CPIC L1 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# data parameters
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
"--prc-status", type=int, help="Initial processing status", default=None
)
# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
parser.add_argument("--priority", type=str, help="Task priority", default=1)
# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
parser.add_argument(
"--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
)
# submit
parser.add_argument("--submit", action="store_true", help="Push results", default=False)
# post-processing parameters
parser.add_argument(
"--final-prc-status", type=int, help="Final processing status", default=-2
)
# force option
parser.add_argument("--force", action="store_true", help="Force success", default=False)
args = parser.parse_args()
print("CLI parameters: ", args)
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
dataset=args.dataset,
instrument=args.instrument,
obs_type=args.obs_type,
obs_group=args.obs_group,
obs_id=args.obs_id,
detector=args.detector,
prc_status=args.prc_status,
)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
# generate DAG group run
dag_group_run = BaseDAG.gen_dag_group_run(
dag_group="csst-cpic-l1",
batch_id=args.batch_id,
priority=args.priority,
)
# generate DAG run list
print("\n"*2)
print(">>> Matching DAGs ...")
dag_run_list = []
data_id_list = []
n_dag_run_all = 0
n_dag_run_success = 0
for dag in [
"csst-cpic-l1",
"csst-cpic-l1-qc0",
]:
this_task_list = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run,
plan_basis=plan_basis,
data_basis=data_basis,
pmapname=args.pmapname,
ref_cat=args.ref_cat,
force_success=args.force,
)
this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if
this_task["dag_run"] is not None]
dag_run_list.extend(this_dag_run_list)
data_id_list.extend(this_data_id_list)
this_n_dag_run_all = len(this_task_list)
this_n_dag_run_success = len(this_dag_run_list)
n_dag_run_all += this_n_dag_run_all
n_dag_run_success += this_n_dag_run_success
print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")
# print dag_group_run and dag_run_list
print("\n")
print(">>> `dag_group_run` :")
print(f"\t- {dag_group_run}")
print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
if len(dag_run_list) > 0:
for dag_run in dag_run_list:
print(f"\t- {dag_run}")
# dump dag_group_run
joblib.dump(
dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
),
os.path.join(
os.getenv("HOME"),
"csst_dag",
f"{dag_group_run['dag_group_run']}.joblib",
),
)
# submit DAG group run
if args.submit:
res = dfs.dag.new_dag_group_run(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
print(res)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.