Commit 8373a725 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

update general l1 dags

parent e73e36ec
import json
from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0
MSC_DETECTORS = [
"01",
"02",
"03",
"04",
"05",
"06",
"07",
"08",
"09",
"10",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"21",
"22",
"23",
"24",
"25",
"26",
"27",
"28",
"29",
"30",
]
MSC_MBI_DETECTORS = [
"06",
"07",
"08",
"09",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"22",
"23",
"24",
"25",
]
MSC_SLS_DETECTORS = [
"01",
"02",
"03",
"04",
"05",
"10",
"21",
"26",
"27",
"28",
"29",
"30",
]
DAG_PARAMS = {
"csst-msc-l1-mbi": {
"detector": MSC_MBI_DETECTORS,
},
"csst-msc-l1-ast": {
"detector": MSC_MBI_DETECTORS,
},
"csst-msc-l1-sls": {
"detector": MSC_SLS_DETECTORS,
},
"csst-msc-l1-qc0": {
"detector": MSC_DETECTORS,
},
"csst-cpic-l1": {
"detector": ["VIS"],
},
"csst-cpic-l1-qc0": {
"detector": ["VIS"],
},
}
class GeneralL1DAG(BaseDAG):
def __init__(self, dag_group: str, dag: str):
super().__init__(dag_group=dag_group, dag=dag)
# set parameters
self.params = DAG_PARAMS.get(dag, {})
def schedule(
self,
batch_id: str | None = "-",
priority: int = 1,
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
obs_group="W1",
initial_prc_status: int = -1024, # level0 prc_status level1
final_prc_status: int = -2,
demo=True,
):
# no need to query plan
# plan.write_file(local_path="plan.json")
# plan.find(
# instrument="MSC",
# dataset=dataset,
# obs_type=obs_type,
# project_id=project_id,
# )
# generate a dag_group_run
dag_group_run = self.gen_dag_group_run(
dag_group=self.dag_group,
batch_id=batch_id,
priority=priority,
)
# find level0 data records
recs = level0.find(
instrument=self.instrument,
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
prc_status=initial_prc_status,
)
assert recs.success, recs.message
print(f"{len(recs.data)} records -> ", end="")
# generate DAG messages
dag_run_list = []
for this_rec in recs.data:
# filter level0 data records
is_selected = True
additional_keys = {}
for k, v in self.params.items():
is_selected = this_rec[k] in v and is_selected
additional_keys[k] = this_rec[k]
if is_selected:
# generate a DAG message if is_selected
this_dag_run = self.gen_dag_run(
dag_group_run=dag_group_run,
batch_id=batch_id,
dag_run=self.generate_sha1(),
dataset=dataset,
obs_type=obs_type,
obs_group=obs_group,
obs_id=this_rec["obs_id"],
**additional_keys,
)
dag_run_list.append(this_dag_run)
# update level0 prc_status
if not demo:
this_update = level0.update_prc_status(
level0_id=this_rec["level0_id"],
dag_run_id=this_dag_run["dag_run_id"],
prc_status=final_prc_status,
dataset=dataset,
)
assert this_update.success, this_update.message
if not demo:
# push and update
res_push = self.push_dag_group_run(dag_group_run, dag_run_list)
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))} -> "
f"{res_push}"
)
assert res_push.success, res_push.message
else:
# no push
print(
f"{len(dag_run_list)} DAG runs -> "
f"{json.dumps(dag_group_run, indent=None, separators=(',', ':'))}"
)
# TODO: `dag_group_run` and `dag_run_list` should be dumped to a text file in the future
return dag_group_run, dag_run_list
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment