Commit e73e36ec authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

update general l1 dags

parent 0eceb6af
from ._base_dag import BaseDAG from ._base_dag import BaseDAG
from ._dag_list import DAG_LIST from ._dag_list import DAG_LIST
from .l1 import GeneralL1DAG from .dags import GeneralDAGViaObsid
class CsstDAGs(dict): class CsstDAGs(dict):
...@@ -9,13 +9,25 @@ class CsstDAGs(dict): ...@@ -9,13 +9,25 @@ class CsstDAGs(dict):
""" """
dag_list = { dag_list = {
"csst-msc-l1-qc0": GeneralL1DAG(dag_group="msc-l1", dag="csst-msc-l1-qc0"), "csst-msc-l1-qc0": GeneralDAGViaObsid(
"csst-msc-l1-mbi": GeneralL1DAG(dag_group="msc-l1", dag="csst-msc-l1-mbi"), dag_group="msc-l1", dag="csst-msc-l1-qc0"
"csst-msc-l1-ast": GeneralL1DAG(dag_group="msc-l1", dag="csst-msc-l1-ast"), ),
"csst-msc-l1-sls": GeneralL1DAG(dag_group="msc-l1", dag="csst-msc-l1-sls"), "csst-msc-l1-mbi": GeneralDAGViaObsid(
"csst-msc-l1-ooc": GeneralL1DAG(dag_group="msc-l1", dag="csst-msc-l1-ooc"), dag_group="msc-l1", dag="csst-msc-l1-mbi"
"csst-cpic-l1": GeneralL1DAG(dag_group="cpic-l1", dag="csst-cpic-l1"), ),
"csst-cpic-l1-qc0": GeneralL1DAG(dag_group="cpic-l1", dag="csst-cpic-l1-qc0"), "csst-msc-l1-ast": GeneralDAGViaObsid(
dag_group="msc-l1", dag="csst-msc-l1-ast"
),
"csst-msc-l1-sls": GeneralDAGViaObsid(
dag_group="msc-l1", dag="csst-msc-l1-sls"
),
"csst-msc-l1-ooc": GeneralDAGViaObsid(
dag_group="msc-l1", dag="csst-msc-l1-ooc"
),
"csst-cpic-l1": GeneralDAGViaObsid(dag_group="cpic-l1", dag="csst-cpic-l1"),
"csst-cpic-l1-qc0": GeneralDAGViaObsid(
dag_group="cpic-l1", dag="csst-cpic-l1-qc0"
),
} }
def __init__(self): def __init__(self):
......
...@@ -52,7 +52,7 @@ class BaseDAG: ...@@ -52,7 +52,7 @@ class BaseDAG:
INSTRUMENT_ENUM = ("MSC", "MCI", "IFS", "CPIC", "HSTDM") INSTRUMENT_ENUM = ("MSC", "MCI", "IFS", "CPIC", "HSTDM")
def __init__(self, dag_group: str, dag: str): def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
"""Initialize a DAG instance with configuration loading. """Initialize a DAG instance with configuration loading.
Parameters Parameters
...@@ -61,6 +61,8 @@ class BaseDAG: ...@@ -61,6 +61,8 @@ class BaseDAG:
Name of the DAG group. Name of the DAG group.
dag : str dag : str
Name of the DAG. Name of the DAG.
use_detector : bool, optional
Flag to determine if `detector` is used.
Raises Raises
------ ------
...@@ -70,7 +72,9 @@ class BaseDAG: ...@@ -70,7 +72,9 @@ class BaseDAG:
# Set DAG name # Set DAG name
self.dag_group = dag_group self.dag_group = dag_group
self.dag = dag self.dag = dag
self.use_detector = use_detector
assert dag in DAG_LIST, f"{dag} not in DAG_MAP" assert dag in DAG_LIST, f"{dag} not in DAG_MAP"
# determine instrument # determine instrument
self.instrument = dag.split("-")[1].upper() # e.g., "MSC" self.instrument = dag.split("-")[1].upper() # e.g., "MSC"
assert self.instrument in self.INSTRUMENT_ENUM, self.instrument assert self.instrument in self.INSTRUMENT_ENUM, self.instrument
......
import json
import numpy as np
from csst_dfs_client import plan, level0
from ._base_dag import BaseDAG
from ..csst import csst
# DAG_DETECTOR_NAMES = {
# "csst-msc-l1-mbi": {"detector": csst["msc"]["mbi"].effective_detector_names},
# "csst-msc-l1-ast": {"detector": csst["msc"]["mbi"].effective_detector_names},
# "csst-msc-l1-sls": {"detector": csst["msc"]["sls"].effective_detector_names},
# "csst-msc-l1-qc0": {"detector": csst["msc"].effective_detector_names},
# "csst-msc-l1-ooc": {"detector": csst["msc"].effective_detector_names},
# "csst-mci-l1": {"detector": csst["mci"].effective_detector_names},
# "csst-ifs-l1-rss": {"detector": csst["ifs"].effective_detector_names},
# "csst-cpic-l1": {"detector": csst["cpic"].effective_detector_names},
# "csst-cpic-l1-qc0": {"detector": csst["cpic"].effective_detector_names},
# "csst-hstdm-l1": {"detector": csst["cpic"].effective_detector_names},
# }
def get_detector_names_via_dag(dag: str) -> list[str]:
if "msc" in dag:
if "mbi" in dag:
return csst["msc"]["mbi"].effective_detector_names
elif "sls" in dag:
return csst["msc"]["sls"].effective_detector_names
else:
return csst["msc"].effective_detector_names
elif "mci" in dag:
return csst["mci"].effective_detector_names
elif "ifs" in dag:
return csst["ifs"].effective_detector_names
elif "cpic" in dag:
return csst["cpic"].effective_detector_names
elif "hstdm" in dag:
return csst["hstdm"].effective_detector_names
class GeneralDAGViaObsid(BaseDAG):
def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
super().__init__(dag_group=dag_group, dag=dag, use_detector=use_detector)
# set effective detector names
self.effective_detector_names = get_detector_names_via_dag(dag)
def schedule(
self,
batch_id: str | None = "-",
priority: int = 1,
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
obs_group: str = "W1",
pmapname: str = "csst_000001.map",
initial_prc_status: int = -1024, # level0 prc_status level1
final_prc_status: int = -2,
demo=True,
):
# no need to query plan
# plan.write_file(local_path="plan.json")
# plan.find(
# instrument="MSC",
# dataset=dataset,
# obs_type=obs_type,
# project_id=project_id,
# )
# generate a dag_group_run
dag_group_run = self.gen_dag_group_run(
dag_group=self.dag_group,
batch_id=batch_id,
priority=priority,
)
dag_run_list = []
# find level0 data records
recs = level0.find(
instrument=self.instrument,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
prc_status=initial_prc_status,
)
assert recs.success, recs
print(f"{len(recs.data)} records -> ", end="")
if self.use_detector:
# generate DAG messages via obs_id-detector
for this_rec in recs.data:
# filter level0 data records: detector in expected list
if this_rec["detector"] in self.effective_detector_names:
# generate a DAG message if is_selected
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
dag_run=self.generate_sha1(),
batch_id=batch_id,
pmapname=pmapname,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
obs_id=this_rec["obs_id"],
detector=this_rec["detector"],
)
dag_run_list.append(this_dag_run)
# update level0 prc_status
if not demo:
this_update = level0.update_prc_status(
level0_id=this_rec["level0_id"],
dag_run=this_dag_run["dag_run"],
prc_status=final_prc_status,
dataset=dataset,
)
assert this_update.success, this_update.message
# generate DAG messages via obs_id
else:
u_obsid, c_obsid = np.unique(
[this_rec["obs_id"] for this_rec in recs.data],
return_counts=True,
)
# select those obs_ids with `counts == effective detector number`
u_obsid_selected = u_obsid[
c_obsid == len(self.effective_detector_names)
]
for this_obsid in u_obsid[u_obsid_selected]:
# generate a DAG message if is_selected
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
dag_run=self.generate_sha1(),
batch_id=batch_id,
pmapname=pmapname,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
obs_id=this_obsid,
)
dag_run_list.append(this_dag_run)
if not demo:
# push and update
res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
f"{res_push}"
)
assert res_push.success, res_push.message
else:
# no push
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))}"
)
# TODO: `dag_group_run` and `dag_run_list` should be dumped to a text file in the future
return dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
class GeneralDAGViaObsgroup(BaseDAG):
def __init__(self, dag_group: str, dag: str, use_detector: bool = False):
super().__init__(dag_group=dag_group, dag=dag, use_detector=use_detector)
# set effective detector names
self.effective_detector_names = get_detector_names_via_dag(dag)
def schedule(
self,
batch_id: str | None = "-",
priority: int = 1,
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
obs_group: str = "W1",
pmapname: str = "csst_000001.map",
initial_prc_status: int = -1024, # level0 prc_status level1
final_prc_status: int = -2,
demo=True,
):
# generate a dag_group_run
dag_group_run = self.gen_dag_group_run(
dag_group=self.dag_group,
batch_id=batch_id,
priority=priority,
)
dag_run_list = []
# find plan with compact mode
plan.count_plan_level0(
instrument=self.instrument,
obs_type=obs_type,
obs_group=obs_group,
dataset=dataset,
prc_status=initial_prc_status,
)
res_plan_level0 = plan.count_plan_level0(
instrument="MSC",
obs_type="WIDE",
obs_group="W1",
dataset="csst-msc-c9-25sqdeg-v3",
prc_status=-1024,
)
assert res_plan_level0.success, res_plan_level0
n_plan = res_plan_level0.data["plan_count"]
n_level0 = res_plan_level0.data["level0_count"]
# find level0 records
if n_plan == 0:
print(f"No plan found for {obs_type} {obs_group} {dataset}")
if n_level0 < n_plan * self.n_effective_detector:
print(
f"Plan {obs_type} {obs_group} {dataset} has {n_plan} plans, "
f"but {n_level0} level0 records found"
)
if n_plan == n_level0 * self.n_effective_detector:
print(
f"Plan {obs_type} {obs_group} {dataset} has {n_plan} plans, "
f"and {n_level0} level0 records found"
)
# generate DAG run list
if not self.use_detector:
# generate a DAG run via obs_group
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
batch_id=batch_id,
dag_run=self.generate_sha1(),
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
pmapname=pmapname,
)
dag_run_list.append(this_dag_run)
else:
# generate DAG runs via obs_group-detectors
for this_detector in self.effective_detector_names:
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
batch_id=batch_id,
dag_run=self.generate_sha1(),
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
detector=this_detector,
pmapname=pmapname,
)
dag_run_list.append(this_dag_run)
if not demo:
# push and update
res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
f"{res_push}"
)
return dict(
dag_group_run=dag_group_run,
dag_run_list=dag_run_list,
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment