Commit 737aa104 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add csst-msc-l1-ooc to csst_dag.cli.csst_msc_l1

parent dfa31851
......@@ -133,9 +133,12 @@ DEFAULT_DAGS = {
"csst-msc-l1-ast",
"csst-msc-l1-sls",
"csst-msc-l1-qc0",
"csst-msc-l1-ooc"
}
SELECTED_DAGS = DEFAULT_DAGS if args.dags is None else {dag for dag in args.dags if dag in DEFAULT_DAGS}
assert args.dags is None or set(args.dags).issubset(DEFAULT_DAGS), f"Selected DAGs: {args.dags}"
SELECTED_DAGS = DEFAULT_DAGS.intersection(args.dags) if args.dags is not None else DEFAULT_DAGS
print("Selected DAGs: ", SELECTED_DAGS)
for dag in SELECTED_DAGS:
this_task_list = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run,
......
"""
Aim
---
Process an MSC dataset, given a set of parameters.
Example
-------
python -m csst_dag.cli.msc_l1 -h
python -m csst_dag.cli.msc_l1 \
--batch-id=csci-test-20250507 \
--priority=1 \
--dataset=csst-msc-c9-25sqdeg-v3 \
--obs-type=WIDE \
--obs-group=W1 \
--initial-prc-status=-1024 \
--final-prc-status=-2 \
--demo
python -m csst_dag.cli.msc_l1 \
--batch-id=25sqdeg-test-b1 \
--priority=1 \
--dataset=csst-msc-c9-25sqdeg-v3 \
--obs-type=WIDE \
--obs-group=W1 \
--initial-prc-status=-2 \
--final-prc-status=-2
"""
from csst_dag.dag import CsstDAGs
import argparse
parser = argparse.ArgumentParser(
description="Scheduler for MSC L1 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# task related parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="default_batch")
parser.add_argument("--priority", type=str, help="Task priority", default=1)
# data related parameters
parser.add_argument("--dataset", type=str, help="Dataset name")
# parser.add_argument("--instrument", type=str, help="Instrument name", default="MSC")
parser.add_argument("--obs-group", type=str, help="Observation group", default="none")
parser.add_argument("--obs-type", type=str, help="Observation type", default="")
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
# status related parameters
parser.add_argument(
"--initial-prc-status", type=int, help="Initial processing status", default=-1024
)
parser.add_argument(
"--final-prc-status", type=int, help="Final processing status", default=-2
)
parser.add_argument(
"--demo", action="store_true", help="Enable demo mode", default=False
)
args = parser.parse_args()
print("CLI parameters: ", args)
# define DAP LIST in this group
DAG_MAP = {
"WIDE": ["csst-msc-l1-mbi", "csst-msc-l1-sls", "csst-msc-l1-ast"],
"DEEP": ["csst-msc-l1-mbi", "csst-msc-l1-sls", "csst-msc-l1-ast"],
"BIAS": ["csst-msc-l1-qc0"],
"DARK": ["csst-msc-l1-qc0"],
"FLAT": ["csst-msc-l1-qc0"],
}
# if obs_type is set
if args.obs_type:
assert args.obs_type in DAG_MAP.keys(), f"Unknown obs_type: {args.obs_type}"
DAG_MAP = {args.obs_type: DAG_MAP[args.obs_type]}
for obs_type, dags in DAG_MAP.items():
print(f"- Processing `{obs_type}`")
for this_dag in dags:
print(f" * `{this_dag}` : ", end="")
dag = CsstDAGs().get(this_dag)
dag_group_run, dag_run_list = dag.schedule(
batch_id=args.batch_id,
priority=args.priority,
dataset=args.dataset,
obs_type=obs_type,
obs_group=args.obs_group,
pmapname=args.pmapname,
initial_prc_status=args.initial_prc_status,
final_prc_status=args.final_prc_status,
demo=args.demo,
)
# print(f"{len(dag_run_list)} tasks.")
......@@ -50,7 +50,7 @@ print(task_list_via_obsgroup[0]["relevant_data"].colnames)
# 16 task/s @n_jobs=1, 130*10 tasks/s @n_jobs=10 (max) 🔼
task_list_via_obsgroup_detector = Dispatcher.dispatch_obsgroup_detector(
plan_basis, data_basis, n_jobs=1
plan_basis, data_basis
)
t = Table(task_list_via_obsgroup_detector)
np.unique(t["n_relevant_plan"], return_counts=True)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment