cpic.py 2.23 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
"""
Aim
---
Process an CPIC dataset, given a set of parameters.

Example
-------
python -m csst_dag.cli.cpic \
    --dataset=csst-cpic-c11-hip71681-v1 \
    --project-id=none \
    --batch-id=csci-test-20250507 \
    --priority=1 \
    --initial-prc-status=-1024 \
    --final-prc-status=-2
"""

from csst_dag.dag import CsstDAG
import argparse

parser = argparse.ArgumentParser(
    description="Scheduler for CPIC L1 pipeline.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument("--dataset", type=str, help="Dataset name")
# parser.add_argument("--instrument", type=str, help="Instrument name", default="CPIC")
parser.add_argument("--project-id", type=str, help="Project ID", default="none")
parser.add_argument("--obs-type", type=str, help="Observation type", default="")
parser.add_argument("--batch-id", type=str, help="Batch ID", default="default_batch")
parser.add_argument("--priority", type=str, help="Task priority", default=1)
parser.add_argument(
    "--initial-prc-status", type=int, help="Initial processing status", default=-1024
)
parser.add_argument(
    "--final-prc-status", type=int, help="Final processing status", default=-2
)

args = parser.parse_args()
print("CLI parameters: ", args)

DAG_LOOP_MAP = {
    "SCI": ["csst-cpic-l1"],
BO ZHANG's avatar
BO ZHANG committed
43
44
    "DSF": ["csst-cpic-l1"],
    "CALS": ["csst-cpic-l1"],
BO ZHANG's avatar
BO ZHANG committed
45
46
47
    "BIAS": ["csst-cpic-l1-qc0"],
    "DARK": ["csst-cpic-l1-qc0"],
    "FLAT": ["csst-cpic-l1-qc0"],
BO ZHANG's avatar
BO ZHANG committed
48
49
    "BKG": ["csst-cpic-l1-qc0"],
    "LASER": ["csst-cpic-l1-qc0"],
BO ZHANG's avatar
BO ZHANG committed
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
}
if args.obs_type:
    assert args.obs_type in DAG_LOOP_MAP.keys(), f"Unknown obs_type: {args.obs_type}"
    DAG_LOOP_MAP = {args.obs_type: DAG_LOOP_MAP[args.obs_type]}

for obs_type, dag_ids in DAG_LOOP_MAP.items():
    print(f"* Processing {obs_type}")
    for dag_id in dag_ids:
        print(f"   - Scheduling `{dag_id}` -> ", end="")
        dag = CsstDAG.get_dag(dag_id=dag_id)
        msgs = dag.schedule(
            dataset=args.dataset,
            obs_type=obs_type,
            project_id=args.project_id,
            batch_id=args.batch_id,
            initial_prc_status=args.initial_prc_status,
            final_prc_status=args.final_prc_status,
            demo=True,
            priority=args.priority,
        )
        print(f"{len(msgs)} tasks.")