Commit 20045361 authored by BO ZHANG

add cli._run module

parent 2901886a
import argparse
import os
import json
from typing import Optional

import joblib

from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs


def run_l1_pipeline(
    # data parameters
    dataset: str,
    instrument: Optional[str] = None,
    obs_type: Optional[str] = None,
    obs_group: Optional[str] = None,
    obs_id: Optional[str] = None,
    detector: Optional[str] = None,
    prc_status: Optional[int] = None,
    qc_status: Optional[int] = None,
    # task parameters
    batch_id: Optional[str] = "test-batch",
    priority: Optional[str] = "1",
    # DAG parameters
    pmapname: Optional[str] = "",
    ref_cat: Optional[str] = "trilegal_093",
    # submit
    verbose: Optional[bool] = True,
    submit: Optional[bool] = False,
    final_prc_status: Optional[int] = -2,
    force: Optional[bool] = False,
    top_n: Optional[int] = -1,
    # select DAGs
    dags: Optional[list[str]] = None,
    dag_group: Optional[str] = "csst-l1-pipeline",
):
"""Run a DAG.
Parameters
----------
dataset : str
Dataset name
instrument : Optional[str], optional
Instrument name, by default None
obs_type : Optional[str], optional
Observation type, by default None
obs_group : Optional[str], optional
Observation group, by default None
obs_id : Optional[str], optional
Observation ID, by default None
detector : Optional[str], optional
Detector name, by default None
prc_status : Optional[int], optional
Initial processing status, by default None
qc_status : Optional[int], optional
Initial QC status, by default None
batch_id : Optional[str], optional
Batch ID, by default "test-batch"
priority : Optional[str], optional
Task priority, by default "1"
pmapname : Optional[str], optional
CCDS pmapname, by default ""
ref_cat : Optional[str], optional
Reference catalog, by default "trilegal_093"
submit : Optional[bool], optional
Push results, by default False
final_prc_status : Optional[int], optional
Final processing status, by default -2
force : Optional[bool], optional
Force success, by default False
verbose : Optional[bool], optional
Force success, by default False
top_n : Optional[int], optional
Submit top N tasks, by default -1
dags : Optional[list[str]], optional
DAGs to select, by default None
dag_group: Optional[str]="csst-l1-pipeline",
DAG group.
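
    Examples
    --------
    A minimal dry-run sketch; the dataset name below is a placeholder and the
    selected DAG is taken from the default DAG set:

    >>> run_l1_pipeline(
    ...     dataset="my-dataset",      # placeholder dataset name
    ...     dags=["csst-msc-l1-mbi"],  # restrict matching to a single DAG
    ...     submit=False,              # dry run: match and print, do not submit
    ... )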
"""
    plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
        dataset=dataset,
        instrument=instrument,
        obs_type=obs_type,
        obs_group=obs_group,
        obs_id=obs_id,
        detector=detector,
        prc_status=prc_status,
        qc_status=qc_status,
    )
    print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
    # generate DAG group run
    dag_group_run = BaseDAG.gen_dag_group_run(
        dag_group=dag_group,
        batch_id=batch_id,
        priority=priority,
    )
    # generate DAG run list
    print("\n")
    print(">>> Matching DAGs ...")
    dag_run_list = []
    data_id_list = []
    n_dag_run_all = 0
    n_dag_run_success = 0
    DEFAULT_DAGS = {
        "csst-msc-l1-mbi",
        "csst-msc-l1-ast",
        "csst-msc-l1-sls",
        "csst-msc-l1-qc0",
        "csst-msc-l1-ooc",
        "csst-mci-l1-qc0",
        "csst-mci-l1",
        "csst-ifs-l1",
        "csst-cpic-l1-qc0",
        "csst-cpic-l1",
        "csst-hstdm-l1",
    }
    assert dags is None or set(dags).issubset(DEFAULT_DAGS), (
        f"Selected DAGs {dags} must be a subset of {sorted(DEFAULT_DAGS)}"
    )
    SELECTED_DAGS = DEFAULT_DAGS.intersection(dags) if dags is not None else DEFAULT_DAGS
print("Selected DAGs: ", SELECTED_DAGS)
for dag in SELECTED_DAGS:
this_task_list = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run,
plan_basis=plan_basis,
data_basis=data_basis,
pmapname=pmapname,
ref_cat=ref_cat,
force_success=force,
)
this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if
this_task["dag_run"] is not None]
dag_run_list.extend(this_dag_run_list)
data_id_list.extend(this_data_id_list)
this_n_dag_run_all = len(this_task_list)
this_n_dag_run_success = len(this_dag_run_list)
n_dag_run_all += this_n_dag_run_all
n_dag_run_success += this_n_dag_run_success
print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")
    # print dag_group_run and dag_run_list
    if verbose:
        print("\n")
        print(">>> `dag_group_run` :")
        print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}")
        print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
        if len(dag_run_list) > 0:
            for dag_run in dag_run_list:
                try:
                    print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}")
                except (TypeError, ValueError):
                    # fall back to plain printing if dag_run is not JSON-serializable
                    print(f"\t- {dag_run}")
    # dump dag_group_run
    # joblib.dump(
    #     dict(
    #         plan_basis=plan_basis,
    #         data_basis=data_basis,
    #         dag_group_run=dag_group_run,
    #         dag_run_list=dag_run_list,
    #     ),
    #     os.path.join(
    #         os.getenv("HOME"),
    #         "csst_dag",
    #         f"{dag_group_run['dag_group_run']}.joblib",
    #     ),
    # )
    # submit DAG group run
    if submit:
        res = dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )
        print(res)
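

# --- Illustrative sketch, not part of the committed hunk ----------------------
# The module imports ``argparse`` but no argument-parsing code appears in this
# diff. A CLI wrapper along the following lines would be one plausible way to
# expose ``run_l1_pipeline``; every flag name below is an assumption made for
# illustration only.
def _example_main():
    parser = argparse.ArgumentParser(
        description="Run the CSST L1 pipeline DAGs for a dataset."
    )
    parser.add_argument("--dataset", required=True, help="Dataset name")
    parser.add_argument("--obs-id", default=None, help="Observation ID")
    parser.add_argument("--batch-id", default="test-batch", help="Batch ID")
    parser.add_argument("--dag", action="append", dest="dags", default=None,
                        help="Restrict to a DAG (repeatable)")
    parser.add_argument("--submit", action="store_true",
                        help="Submit the DAG group run instead of a dry run")
    args = parser.parse_args()
    run_l1_pipeline(
        dataset=args.dataset,
        obs_id=args.obs_id,
        batch_id=args.batch_id,
        dags=args.dags,
        submit=args.submit,
    )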