Commit 4452fde2 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

tweaks

parent a58f85cd
......@@ -29,7 +29,14 @@ USER csst
COPY --chown=csst:csst . /pipeline
# install packages & requirements
RUN --mount=type=cache,id=pip,uid=9000,gid=9000,target=/home/csst/.cache \
pip install pkg/ccds
pip install pkg/ccds \
&& pip install pkg/csst-dfs-client \
&& pip install pkg/csst_common \
&& pip install pkg/csst_dadel \
&& pip install pkg/csst-dag \
&& pip install -r pkg/csst-dag/requirements.txt \
&& cp -r pkg/csst-dag/csst_dag/cli /pipeline/app \
&& rm -rf pkg
# change workdir
WORKDIR /pipeline/output
......
import argparse
import os
import json
from typing import Optional
import joblib
from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs
def run_l1_pipeline(
    # data parameters
    dataset: str,
    instrument: Optional[str] = None,
    obs_type: Optional[str] = None,
    obs_group: Optional[str] = None,
    obs_id: Optional[str] = None,
    detector: Optional[str] = None,
    prc_status: Optional[int] = None,
    qc_status: Optional[int] = None,
    # task parameters
    batch_id: Optional[str] = "test-batch",
    priority: Optional[str] = "1",
    # DAG parameters
    pmapname: Optional[str] = "",
    ref_cat: Optional[str] = "trilegal_093",
    # submit
    verbose: Optional[bool] = True,
    submit: Optional[bool] = False,
    final_prc_status: Optional[int] = -2,
    force: Optional[bool] = False,
    top_n: Optional[int] = -1,
    # select DAGs
    dags: Optional[list[str]] = None,
    dag_group: Optional[str] = "csst-l1-pipeline",
):
    """Schedule a DAG group run for the CSST L1 pipeline and optionally submit it.

    Parameters
    ----------
    dataset : str
        Dataset name
    instrument : Optional[str], optional
        Instrument name, by default None
    obs_type : Optional[str], optional
        Observation type, by default None
    obs_group : Optional[str], optional
        Observation group, by default None
    obs_id : Optional[str], optional
        Observation ID, by default None
    detector : Optional[str], optional
        Detector name, by default None
    prc_status : Optional[int], optional
        Initial processing status, by default None
    qc_status : Optional[int], optional
        Initial QC status, by default None
    batch_id : Optional[str], optional
        Batch ID, by default "test-batch"
    priority : Optional[str], optional
        Task priority, by default "1"
    pmapname : Optional[str], optional
        CCDS pmapname, by default ""
    ref_cat : Optional[str], optional
        Reference catalog, by default "trilegal_093"
    verbose : Optional[bool], optional
        Print the generated dag_group_run and dag_run_list, by default True
    submit : Optional[bool], optional
        Push results, by default False
    final_prc_status : Optional[int], optional
        Final processing status, by default -2.
        NOTE(review): currently unused inside this function.
    force : Optional[bool], optional
        Force success, by default False
    top_n : Optional[int], optional
        Submit top N tasks, by default -1.
        NOTE(review): currently unused inside this function.
    dags : Optional[list[str]], optional
        DAGs to select, by default None (select every known DAG)
    dag_group : Optional[str], optional
        DAG group name, by default "csst-l1-pipeline"

    Raises
    ------
    ValueError
        If `dags` contains a name that is not a known DAG.
    """
    # query the plan / level-0 data basis matching the selection parameters
    plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
        dataset=dataset,
        instrument=instrument,
        obs_type=obs_type,
        obs_group=obs_group,
        obs_id=obs_id,
        detector=detector,
        prc_status=prc_status,
        qc_status=qc_status,
    )
    print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
    # generate DAG group run
    dag_group_run = BaseDAG.gen_dag_group_run(
        dag_group=dag_group,
        batch_id=batch_id,
        priority=priority,
    )
    # generate DAG run list
    print("\n")
    print(">>> Matching DAGs ...")
    dag_run_list = []
    data_id_list = []
    n_dag_run_all = 0
    n_dag_run_success = 0
    DEFAULT_DAGS = {
        "csst-msc-l1-mbi",
        "csst-msc-l1-ast",
        "csst-msc-l1-sls",
        "csst-msc-l1-qc0",
        "csst-msc-l1-ooc",
        "csst-mci-l1-qc0",
        "csst-mci-l1",
        "csst-ifs-l1",
        "csst-cpic-l1-qc0",
        "csst-cpic-l1",
        "csst-hstdm-l1",
    }
    # validate the user-selected DAG names; raise (not assert) so the check
    # survives `python -O`
    if dags is not None and not set(dags).issubset(DEFAULT_DAGS):
        raise ValueError(f"Selected DAGs: {dags}")
    SELECTED_DAGS = DEFAULT_DAGS.intersection(dags) if dags is not None else DEFAULT_DAGS
    print("Selected DAGs: ", SELECTED_DAGS)
    for dag in SELECTED_DAGS:
        this_task_list = CSST_DAGS[dag].schedule(
            dag_group_run=dag_group_run,
            plan_basis=plan_basis,
            data_basis=data_basis,
            pmapname=pmapname,
            ref_cat=ref_cat,
            force_success=force,
        )
        # keep only tasks for which a dag_run was actually generated
        this_dag_run_list = [
            this_task["dag_run"]
            for this_task in this_task_list
            if this_task["dag_run"] is not None
        ]
        this_data_id_list = [
            this_task["relevant_data_id_list"]
            for this_task in this_task_list
            if this_task["dag_run"] is not None
        ]
        dag_run_list.extend(this_dag_run_list)
        data_id_list.extend(this_data_id_list)
        this_n_dag_run_all = len(this_task_list)
        this_n_dag_run_success = len(this_dag_run_list)
        n_dag_run_all += this_n_dag_run_all
        n_dag_run_success += this_n_dag_run_success
        print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")
    # print dag_group_run and dag_run_list
    if verbose:
        print("\n")
        print(">>> `dag_group_run` :")
        print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}")
        print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
        if len(dag_run_list) > 0:
            for dag_run in dag_run_list:
                try:
                    print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}")
                except (TypeError, ValueError):
                    # dag_run is not JSON-serializable; fall back to its repr
                    print(f"\t- {dag_run}")
    # submit DAG group run
    if submit:
        res = dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )
        print(res)
"""
Aim
---
Inspect a dataset.
Example
-------
python -m csst_dag.cli.inspect -h
python -m csst_dag.cli.inspect \
--dataset=csst-msc-c9-25sqdeg-v3 \
--instrument=MSC \
--obs-type=WIDE \
--obs-group=W2
25平方度宽场
python -m csst_dag.cli.inspect \
--level=0 \
--dataset=csst-msc-c9-25sqdeg-v3 \
--instrument=MSC \
--obs-type=WIDE
python -m csst_dag.cli.inspect \
--level=0 \
--dataset=csst-msc-c11-1000sqdeg-wide-test-v1 \
--instrument=MSC \
--obs-type=WIDE
python -m csst_dag.cli.inspect \
--level=1 \
--dataset=csst-msc-c11-1000sqdeg-wide-test-v1 \
--instrument=MSC \
--obs-type=WIDE \
--data-model=csst-msc-l1-mbi \
--batch-id=1000sqdeg-test-b1
CPIC:
python -m csst_dag.cli.inspect \
--level=0 \
--dataset=csst-cpic-c11-hip71681-v1 \
--instrument=CPIC \
--obs-type=SCI \
--obs-group=hip71681
"""
import argparse
import numpy as np
from astropy import table
from csst_dag import Dispatcher, dfs
parser = argparse.ArgumentParser(
description="Inspector for CSST datasets.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# data level
parser.add_argument("--level", type=int, help="Data level", default=0)
# level0 data parameters
parser.add_argument("--dataset", type=str, help="Dataset name", default="csst-msc-c9-25sqdeg-v3")
parser.add_argument("--instrument", type=str, help="Instrument name", default="MSC")
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument("--prc-status", type=int, help="Prc status", default=None)
# level1 data parameters
parser.add_argument("--data-model", type=str, help="Data model", default=None)
parser.add_argument("--batch-id", type=str, help="Batch ID", default=None)
parser.add_argument("--build", type=str, help="Build number", default=None)
parser.add_argument("--pmapname", type=str, help="CCDS pmap name", default=None)
# reset prc_status
parser.add_argument("--reset", type=int, help="Reset prc_status", default=None)
# additional options
parser.add_argument("--verbose", action="store_true", help="Verbose mode", default=False)
args = parser.parse_args()
# from csst_dag import DotDict
#
# args = DotDict(
# dataset="csst-msc-c9-25sqdeg-v3",
# instrument="MSC",
# obs_type="WIDE",
# obs_group="W2",
# obs_id=None,
# detector=None,
# prc_status=None,
# )
print("CLI parameters: ", args)
if args.level == 0:
print("Inspecting level0 data...")
data_basis = Dispatcher.find_level0_basis(
dataset=args.dataset,
instrument=args.instrument,
obs_type=args.obs_type,
obs_group=args.obs_group,
obs_id=args.obs_id,
detector=args.detector,
prc_status=args.prc_status,
)
else:
print("Inspecting level1 data...")
data_basis = Dispatcher.find_level1_basis(
dataset=args.dataset,
instrument=args.instrument,
obs_type=args.obs_type,
obs_group=args.obs_group,
obs_id=args.obs_id,
detector=args.detector,
prc_status=args.prc_status,
data_model=args.data_model, # level1 specific
batch_id=args.batch_id,
build=args.build,
pmapname=args.pmapname,
)
print(f">>> {len(data_basis)} data basis found")
# data_basis.remove_columns(["file_name", "_id"])
stats_config = dict(
prc_status=["dataset", "prc_status", "qc_status"],
obs_type=["dataset", "instrument", "obs_type", "prc_status", "qc_status"],
obs_group=["dataset", "instrument", "obs_type", "obs_group", "prc_status", "qc_status"],
detector=["dataset", "instrument", "obs_type", "obs_group", "detector", "prc_status", "qc_status"],
obs_id=["dataset", "instrument", "obs_type", "obs_group", "obs_id", "prc_status", "qc_status"],
)
if args.level == 1:
for k in stats_config.keys():
stats_config[k].extend(["data_model", "batch_id", "build"])
for stats_type, stats_keys in stats_config.items():
if args.verbose:
data_basis.pprint_all()
print("\n>>> STATS DIM: ", stats_type)
u_data, c_data = np.unique(data_basis[stats_keys], return_counts=True)
u_table = table.Table(u_data)
u_table.add_column(table.Column(c_data, name="count"))
u_table.pprint_all()
if args.reset is not None:
print("Resetting prc_status to ", args.reset)
dfs.update_prc_status_by_ids(
list(data_basis["_id"]),
args.reset,
)
import os
import sys

import joblib

# Load and print a previously dumped DAG group run, identified by its
# 40-character dag_group_run hash passed as the single CLI argument.
if len(sys.argv) != 2:
    raise SystemExit(f"usage: {os.path.basename(sys.argv[0])} <dag_group_run_hash>")
dag_group_run_hash = sys.argv[1]
# raise (not assert) so the check survives `python -O`
if len(dag_group_run_hash) != 40:
    raise SystemExit(f"Invalid dag_group_run hash (expected 40 characters): {dag_group_run_hash!r}")
# build the dump-file path under ~/csst_dag
# (expanduser is robust even when $HOME is unset, unlike os.getenv("HOME"))
file_path = os.path.join(
    os.path.expanduser("~"),
    "csst_dag",
    f"{dag_group_run_hash}.joblib",
)
# load DAG group run
data = joblib.load(file_path)
dag_group_run = data["dag_group_run"]
dag_run_list = data["dag_run_list"]
# print dag_group_run and dag_run_list
print("`dag_group_run`:")
print(f" - {dag_group_run}")
for dag_run in dag_run_list:
    print(f" - {dag_run}")
"""
Aim
---
Process an MSC dataset, given a set of parameters.
Example
-------
python -m csst_dag.cli.run -h
python -m csst_dag.cli.run \
--dags csst-msc-l1-mbi \
--dataset=csst-msc-c9-25sqdeg-v3 \
--instrument=MSC \
--obs-type=WIDE \
--obs-group=W2 \
--obs-id=10100232366 \
--detector=09 \
--batch-id=test-b1 \
--priority=1 \
--pmapname=csst_000070.pmap \
--ref-cat=trilegal_093
# 25平方度宽场
python -m csst_dag.cli.run \
--dags csst-msc-l1-mbi \
--dataset=csst-msc-c9-25sqdeg-v3 \
--instrument=MSC \
--obs-type=WIDE \
--prc-status=-2 \
--batch-id=25sqdeg-test-b2 \
--priority=1 \
--pmapname=csst_000070.pmap \
--ref-cat=trilegal_093 \
--submit
"""
import argparse
import os
import json
import joblib
from csst_dag import CSST_DAGS, Dispatcher, BaseDAG, dfs
parser = argparse.ArgumentParser(
description="Scheduler for CSST L1 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# data parameters
parser.add_argument("--dataset", type=str, help="Dataset name")
parser.add_argument("--instrument", type=str, help="Instrument name", default=None)
parser.add_argument("--obs-type", type=str, help="Observation type", default=None)
parser.add_argument("--obs-group", type=str, help="Observation group", default=None)
parser.add_argument("--obs-id", type=str, help="Observation ID", default=None)
parser.add_argument("--detector", type=str, help="Detector name", default=None)
parser.add_argument(
"--prc-status", type=int, help="Initial processing status", default=None
)
parser.add_argument(
"--qc-status", type=int, help="Initial QC status", default=None
)
# task parameters
parser.add_argument("--batch-id", type=str, help="Batch ID", default="test-batch")
parser.add_argument("--priority", type=str, help="Task priority", default="1")
# DAG parameters
parser.add_argument("--pmapname", type=str, help="CCDS pmapname", default="")
parser.add_argument(
"--ref-cat", type=str, help="Reference catalog", default="trilegal_093"
)
# submit
parser.add_argument("--submit", action="store_true", help="Push results", default=False)
# post-processing parameters
parser.add_argument(
"--final-prc-status", type=int, help="Final processing status", default=-2
)
# additional options
parser.add_argument("--force", action="store_true", help="Force success", default=False)
parser.add_argument("--verbose", action="store_true", help="Force success", default=False)
# submit top N
parser.add_argument(
"--top-n", type=int, help="Submit top N tasks", default=-1
)
# select DAGs
parser.add_argument('--dags', nargs='+', type=str, help="DAGs to select", default=None)
args = parser.parse_args()
# from csst_dag import DotDict
#
# args = DotDict(
# dataset="csst-msc-c9-25sqdeg-v3",
# instrument="MSC",
# obs_type="WIDE",
# obs_group="W2",
# obs_id="10100232366",
# detector=None,
# prc_status=None,
# batch_id="test-batch",
# priority=1,
# pmapname="csst_000070.pmap",
# ref_cat="trilegal_093",
# submit=False,
# )
print("CLI parameters: ", args)
plan_basis, data_basis = Dispatcher.find_plan_level0_basis(
dataset=args.dataset,
instrument=args.instrument,
obs_type=args.obs_type,
obs_group=args.obs_group,
obs_id=args.obs_id,
detector=args.detector,
prc_status=args.prc_status,
qc_status=args.qc_status,
)
print(f"{len(plan_basis)} plan basis, {len(data_basis)} data basis found")
# generate DAG group run
dag_group_run = BaseDAG.gen_dag_group_run(
dag_group="csst-l1-pipeline",
batch_id=args.batch_id,
priority=args.priority,
)
# generate DAG run list
print("\n")
print(">>> Matching DAGs ...")
dag_run_list = []
data_id_list = []
n_dag_run_all = 0
n_dag_run_success = 0
DEFAULT_DAGS = {
"csst-msc-l1-mbi",
"csst-msc-l1-ast",
"csst-msc-l1-sls",
"csst-msc-l1-qc0",
"csst-msc-l1-ooc",
"csst-mci-l1-qc0",
"csst-mci-l1",
"csst-ifs-l1",
"csst-cpic-l1-qc0",
"csst-cpic-l1",
"csst-hstdm-l1",
}
assert args.dags is None or set(args.dags).issubset(DEFAULT_DAGS), f"Selected DAGs: {args.dags}"
SELECTED_DAGS = DEFAULT_DAGS.intersection(args.dags) if args.dags is not None else DEFAULT_DAGS
print("Selected DAGs: ", SELECTED_DAGS)
for dag in SELECTED_DAGS:
this_task_list = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run,
plan_basis=plan_basis,
data_basis=data_basis,
pmapname=args.pmapname,
ref_cat=args.ref_cat,
force_success=args.force,
)
this_dag_run_list = [this_task["dag_run"] for this_task in this_task_list if this_task["dag_run"] is not None]
this_data_id_list = [this_task["relevant_data_id_list"] for this_task in this_task_list if
this_task["dag_run"] is not None]
dag_run_list.extend(this_dag_run_list)
data_id_list.extend(this_data_id_list)
this_n_dag_run_all = len(this_task_list)
this_n_dag_run_success = len(this_dag_run_list)
n_dag_run_all += this_n_dag_run_all
n_dag_run_success += this_n_dag_run_success
print(f"- `{dag}` : [ {this_n_dag_run_success} / {this_n_dag_run_all} ] dag_runs")
# print dag_group_run and dag_run_list
if args.verbose:
print("\n")
print(">>> `dag_group_run` :")
print(f"\t- {json.dumps(dag_group_run, separators=(',', ':'))}")
print(f">>> `dag_run_list` : [ {n_dag_run_success} / {n_dag_run_all} ]")
if len(dag_run_list) > 0:
for dag_run in dag_run_list:
try:
print(f"\t- {json.dumps(dag_run, separators=(',', ':'))}")
except:
print(f"\t- {dag_run}")
# dump dag_group_run
joblib.dump(
dict(
plan_basis=plan_basis,
data_basis=data_basis,
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
),
os.path.join(
os.getenv("HOME"),
"csst_dag",
f"{dag_group_run['dag_group_run']}.joblib",
),
)
# submit DAG group run
if args.submit:
res = dfs.dag.new_dag_group_run(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
print(res)
# Report the installed csst_dag package version.
import csst_dag

pkg_version = csst_dag.__version__
print("csst_dag version: ", pkg_version)
import os
import sys

from csst_dag import DFS
from csst_dfs_client import plan

# Upload a plan file to DFS. Usage: <script> <plan_path>
if len(sys.argv) != 2:
    raise SystemExit(f"usage: {os.path.basename(sys.argv[0])} <plan_path>")
plan_path = sys.argv[1]
# raise (not assert) so the check survives `python -O`
if not os.path.exists(plan_path):
    raise FileNotFoundError(f"Plan file {plan_path} does not exist")
res = plan.write_file(local_file=plan_path)
print(res)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment