Commit 33f289fe authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add msc cli

parent beea95ad
# Given dataset and batchid, trigger all DAGs for MSC # Process a dataset, given
# - project_id = "none"
from csst_dag.constants import MSC_MBI_CHIPID # - obs_type = "*"
from csst_dfs_client import plan, level0 from csst_dag.dag import dags
import argparse import argparse
# python -m csst_dag.trigger.csst-msc-l1-mbi -h # python -m csst_dag.trigger.csst-msc-l1-mbi -h
# python -m csst_dag.trigger.csst-msc-l1-mbi --dataset csst-msc-c9-25sqdeg-v3 --batch-id # python -m csst_dag.trigger.csst-msc-l1-mbi --dataset csst-msc-c9-25sqdeg-v3 --batch-id
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Trigger for DAG csst-msc-l1-mbi", description="Scheduler for MSC L1 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
parser.add_argument("--dataset", type=str, help="Dataset name") parser.add_argument("--dataset", type=str, help="Dataset name")
# parser.add_argument("--instrument", type=str, help="Instrument name", default="MSC") # parser.add_argument("--instrument", type=str, help="Instrument name", default="MSC")
parser.add_argument("--project-id", type=str, help="Project ID", default="None") parser.add_argument("--project-id", type=str, help="Project ID", default="None")
# parser.add_argument( parser.add_argument("--obs-type", type=str, help="Observation type", default="WIDE")
# "--file-type", type=str, help="File type (SCI/CAL/REF)", default="SCI"
# )
# parser.add_argument("--obs-type", type=str, help="Observation type", default="WIDE")
parser.add_argument("--batch-id", type=str, help="Batch ID", default="default") parser.add_argument("--batch-id", type=str, help="Batch ID", default="default")
args = parser.parse_args() args = parser.parse_args()
print(args) print(args)
OBSTYPE_DAG_MAP = { DAG_LOOP_MAP = {
"WIDE": "csst-msc-l1-mbi", "WIDE": ["csst-msc-l1-mbi", "csst-msc-l1-sls"],
"DEEP": "csst-msc-l1-mbi", "DEEP": ["csst-msc-l1-mbi", "csst-msc-l1-sls"],
"BIAS": "csst-msc-l1-mbi", "BIAS": ["csst-msc-l1-qc0"],
"DARK": "csst-msc-l1-mbi", "DARK": ["csst-msc-l1-qc0"],
"FLAT": "csst-msc-l1-mbi", "FLAT": ["csst-msc-l1-qc0"],
} }
for obs_type in ["WIDE", "DEEP"]:
pass for obs_type, dag_ids in DAG_LOOP_MAP.items():
print(f"Processing {obs_type}")
for dag_id in dag_ids:
print(f" - {dag_id}")
dag = dags.get_dag(dag_id=dag_id)
dag.schedule(
dataset=args.dataset,
obs_type=args.obs_type,
project_id=args.project_id,
batch_id=args.batch_id,
initial_prc_status=-2,
final_prc_status=-2,
demo=True,
)
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from ._dag_list import DAG_LIST from ._dag_list import DAG_LIST
from ..dfs import DFS from ..dfs import dfs
import yaml import yaml
import os import os
import glob import glob
...@@ -39,7 +39,7 @@ class BaseDAG: ...@@ -39,7 +39,7 @@ class BaseDAG:
with open(json_path, "r") as f: with open(json_path, "r") as f:
self.msg_template = json.load(f) self.msg_template = json.load(f)
self.msg_keys = set(self.msg_template.keys()) self.msg_keys = set(self.msg_template.keys())
self.dfs = DFS(location=os.getenv("DFS_LOCATION")) self.dfs = dfs
def gen_msg(self, **kwargs): def gen_msg(self, **kwargs):
"""Load message template and generate message dictionary.""" """Load message template and generate message dictionary."""
......
...@@ -106,6 +106,8 @@ class Redis(redis.Redis): ...@@ -106,6 +106,8 @@ class Redis(redis.Redis):
return self.lrange(self.qname, 0, -1) return self.lrange(self.qname, 0, -1)
dfs = DFS(location=None)
# msgs = r.lrange(name, 0, -1) # msgs = r.lrange(name, 0, -1)
# for chipid in range(6, 26): # for chipid in range(6, 26):
# this_msg = gen_msg( # this_msg = gen_msg(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment