Commit 727f17da authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

remove useless modules: dags dag_list

parent c1a52e53
import os
import glob
DAG_LIST = [
"csst-msc-l1-qc0",
"csst-msc-l1-mbi",
"csst-msc-l1-ast",
"csst-msc-l1-sls",
"csst-msc-l1-ooc",
"csst-cpic-l1",
"csst-cpic-l1-qc0",
]
EXISTENCE_MAPPING = {True: "✅", False: "❌"}
print(" DAG \t definition task")
print("--------------------------------------------")
for this_dag_id in DAG_LIST:
this_dag_definition_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"dag_config",
f"{this_dag_id}.yml",
)
this_dag_task_template_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"dag_config",
f"{this_dag_id}.json",
)
status_def = EXISTENCE_MAPPING[os.path.exists(this_dag_definition_file)]
status_task = EXISTENCE_MAPPING[os.path.exists(this_dag_task_template_file)]
print(f"{this_dag_id} \t {status_def} {status_task}")
import json
import numpy as np
from csst_dfs_client import plan, level0
from .._csst import csst, CsstPlanObsid, CsstPlanObsgroup
from ._base_dag import BaseDAG
from .._csst import csst
# DAG_DETECTOR_NAMES = {
# "csst-msc-l1-mbi": {"detector": csst["msc"]["mbi"].effective_detector_names},
# "csst-msc-l1-ast": {"detector": csst["msc"]["mbi"].effective_detector_names},
# "csst-msc-l1-sls": {"detector": csst["msc"]["sls"].effective_detector_names},
# "csst-msc-l1-qc0": {"detector": csst["msc"].effective_detector_names},
# "csst-msc-l1-ooc": {"detector": csst["msc"].effective_detector_names},
# "csst-mci-l1": {"detector": csst["mci"].effective_detector_names},
# "csst-ifs-l1-rss": {"detector": csst["ifs"].effective_detector_names},
# "csst-cpic-l1": {"detector": csst["cpic"].effective_detector_names},
# "csst-cpic-l1-qc0": {"detector": csst["cpic"].effective_detector_names},
# "csst-hstdm-l1": {"detector": csst["cpic"].effective_detector_names},
# }
def get_detector_names_via_dag(dag: str) -> list[str]:
if "msc" in dag:
if "mbi" in dag or "ast" in dag:
return csst["MSC"]["MBI"].effective_detector_names
elif "sls" in dag:
return csst["MSC"]["SLS"].effective_detector_names
else:
return csst["MSC"].effective_detector_names
elif "mci" in dag:
return csst["MCI"].effective_detector_names
elif "ifs" in dag:
return csst["IFS"].effective_detector_names
elif "cpic" in dag:
return csst["CPIC"].effective_detector_names
elif "hstdm" in dag:
return csst["HSTDM"].effective_detector_names
class GeneralDAGViaObsid(BaseDAG):
def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
super().__init__(dag_group=dag_group, dag=dag, use_detector=use_detector)
# set effective detector names
self.effective_detector_names = get_detector_names_via_dag(dag)
def schedule(
self,
batch_id: str | None = "-",
priority: int = 1,
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
obs_group: str = "W1",
pmapname: str = "csst_000001.map",
initial_prc_status: int = -1024, # level0 prc_status level1
final_prc_status: int = -2,
demo=True,
):
# no need to query plan
# plan.write_file(local_path="plan.json")
# plan.find(
# instrument="MSC",
# dataset=dataset,
# obs_type=obs_type,
# project_id=project_id,
# )
# generate a dag_group_run
dag_group_run = self.gen_dag_group_run(
dag_group=self.dag_group,
batch_id=batch_id,
priority=priority,
)
dag_run_list = []
# find level0 data records
recs = level0.find(
instrument=self.instrument,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
prc_status=initial_prc_status,
)
assert recs.success, recs
print(f"{len(recs.data)} records -> ", end="")
if self.use_detector:
# generate DAG messages via obs_id-detector
for i_rec, this_rec in enumerate(recs.data):
# print(i_rec, this_rec, self.effective_detector_names)
# filter level0 data records: detector in expected list
if this_rec["detector"] in self.effective_detector_names:
# generate a DAG message if is_selected
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
dag_run=self.generate_sha1(),
batch_id=batch_id,
pmapname=pmapname,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
obs_id=this_rec["obs_id"],
detector=this_rec["detector"],
)
dag_run_list.append(this_dag_run)
# update level0 prc_status
if not demo:
this_update = level0.update_prc_status(
level0_id=this_rec["level0_id"],
dag_run=this_dag_run["dag_run"],
prc_status=final_prc_status,
dataset=dataset,
)
assert this_update.success, this_update.message
# generate DAG messages via obs_id
else:
u_obsid, c_obsid = np.unique(
[this_rec["obs_id"] for this_rec in recs.data],
return_counts=True,
)
# select those obs_ids with `counts == effective detector number`
u_obsid_selected = u_obsid[c_obsid == len(self.effective_detector_names)]
for this_obsid in u_obsid[u_obsid_selected]:
# generate a DAG message if is_selected
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
dag_run=self.generate_sha1(),
batch_id=batch_id,
pmapname=pmapname,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
obs_id=this_obsid,
)
dag_run_list.append(this_dag_run)
if not demo:
# push and update
res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
f"{res_push}"
)
assert res_push.success, res_push.message
else:
# no push
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))}"
)
# TODO: `dag_group_run` and `dag_run_list` should be dumped to a text file in the future
return dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
class GeneralDAGViaObsgroup(BaseDAG):
def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
super().__init__(dag_group=dag_group, dag=dag, use_detector=use_detector)
# set effective detector names
# self.effective_detector_names = get_detector_names_via_dag(dag)
def schedule(
self,
batch_id: str | None = "-",
priority: int = 1,
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
obs_group: str = "W1",
pmapname: str = "csst_000001.pmap",
initial_prc_status: int = -1024, # level0 prc_status level1
final_prc_status: int = -2,
demo=True,
):
# generate a dag_group_run
dag_group_run = self.gen_dag_group_run(
dag_group=self.dag_group,
batch_id=batch_id,
priority=priority,
)
dag_run_list = []
instrument = "MSC"
obs_type = "BIAS"
obs_group = "bias-temperature-var"
dataset = "csst-msc-c9-ooc-v1"
initial_prc_status = -1024
final_prc_status = -2
# find plan with compact mode
qr_count = plan.count_plan_level0(
instrument=instrument,
obs_type=obs_type,
obs_group=obs_group,
dataset=dataset,
prc_status=initial_prc_status,
)
assert qr_count.success, qr_count
n_plan = qr_count.data["plan_count"]
n_level0 = qr_count.data["level0_count"]
assert n_plan > 0, f"No plan found for {obs_type} {obs_group} {dataset}"
assert (
n_level0 > 0
), f"No level0 record found for {obs_type} {obs_group} {dataset}"
# find plan and level0 data
qr_plan = plan.find(
instrument=instrument,
obs_type=obs_type,
obs_group=obs_group,
dataset=dataset,
)
qr_level0 = level0.find(
instrument=instrument,
obs_type=obs_type,
obs_group=obs_group,
dataset=dataset,
prc_status=initial_prc_status,
)
obs_group = CsstPlanObsgroup.from_plan(qr_plan.data)
# find level0 records
if n_plan == 0:
print(f"No plan found for {obs_type} {obs_group} {dataset}")
if n_level0 < n_plan * self.n_effective_detector:
print(
f"Plan {obs_type} {obs_group} {dataset} has {n_plan} plans, "
f"but {n_level0} level0 records found"
)
if n_plan == n_level0 * self.n_effective_detector:
print(
f"Plan {obs_type} {obs_group} {dataset} has {n_plan} plans, "
f"and {n_level0} level0 records found"
)
# generate DAG run list
if not self.use_detector:
# generate a DAG run via obs_group
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
batch_id=batch_id,
dag_run=self.generate_sha1(),
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
pmapname=pmapname,
)
dag_run_list.append(this_dag_run)
else:
# generate DAG runs via obs_group-detectors
for this_detector in self.effective_detector_names:
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
batch_id=batch_id,
dag_run=self.generate_sha1(),
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
detector=this_detector,
pmapname=pmapname,
)
dag_run_list.append(this_dag_run)
if not demo:
# push and update
res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
f"{res_push}"
)
return dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment