Commit 5bc8755f authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

implement general l1

parent 55bacfe2
from ._base_dag import BaseDAG from ._base_dag import BaseDAG
from ._dag_list import DAG_LIST from ._dag_list import DAG_LIST
from .msc import CsstMscL1 from .l1 import CsstL1
DAG_MAP = { DAG_MAP = {
"csst-msc-l1-qc0": CsstMscL1(dag_id="csst-msc-l1-qc0"), "csst-msc-l1-qc0": CsstL1(dag_id="csst-msc-l1-qc0"),
"csst-msc-l1-mbi": CsstMscL1(dag_id="csst-msc-l1-mbi"), "csst-msc-l1-mbi": CsstL1(dag_id="csst-msc-l1-mbi"),
"csst-msc-l1-sls": CsstMscL1(dag_id="csst-msc-l1-sls"), "csst-msc-l1-sls": CsstL1(dag_id="csst-msc-l1-sls"),
"csst-cpic-l1": CsstL1(dag_id="csst-cpic-l1"),
"csst-cpic-l1-qc0": CsstL1(dag_id="csst-cpic-l1-qc0"),
} }
......
...@@ -35,7 +35,7 @@ class BaseDAG: ...@@ -35,7 +35,7 @@ class BaseDAG:
self.dag = yaml.safe_load(f)[0] self.dag = yaml.safe_load(f)[0]
assert ( assert (
self.dag["dag_id"] == self.dag_id self.dag["dag_id"] == self.dag_id
), f"{self.dag}" # , f"{dag_id} not consistent with definition in .yml file." ), f"{self.dag['dag_id']} != {self.dag_id}" # , f"{dag_id} not consistent with definition in .yml file."
with open(json_path, "r") as f: with open(json_path, "r") as f:
self.msg_template = json.load(f) self.msg_template = json.load(f)
self.msg_keys = set(self.msg_template.keys()) self.msg_keys = set(self.msg_template.keys())
......
...@@ -5,6 +5,8 @@ DAG_LIST = [ ...@@ -5,6 +5,8 @@ DAG_LIST = [
"csst-msc-l1-qc0", "csst-msc-l1-qc0",
"csst-msc-l1-mbi", "csst-msc-l1-mbi",
"csst-msc-l1-sls", "csst-msc-l1-sls",
"csst-cpic-l1",
"csst-cpic-l1-qc0",
] ]
EXISTENCE_MAPPING = {True: "✅", False: "❌"} EXISTENCE_MAPPING = {True: "✅", False: "❌"}
......
import json
from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0
# CHIPID_MAP = {
# "csst-msc-l1-mbi": MSC_MBI_CHIPID,
# "csst-msc-l1-sls": MSC_SLS_CHIPID,
# "csst-msc-l1-qc0": MSC_CHIPID,
# }
DAG_PARAMS = {
"csst-msc-l1-mbi": {
"instrument": "MSC",
"additional_keys": {
"chip_id": {
"key_in_dfs": "detector_no",
"key_in_dag": "chip_id",
"enum": [
"06",
"07",
"08",
"09",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"22",
"23",
"24",
"25",
],
}
},
},
"csst-msc-l1-sls": {
"instrument": "MSC",
"additional_keys": {
"chip_id": {
"key_in_dfs": "detector_no",
"key_in_dag": "chip_id",
"enum": [
"01",
"02",
"03",
"04",
"05",
"10",
"21",
"26",
"27",
"28",
"29",
"30",
],
},
},
},
"csst-msc-l1-qc0": {
"instrument": "MSC",
"additional_keys": {
"chip_id": {
"key_in_dfs": "detector_no",
"key_in_dag": "chip_id",
"enum": [
"01",
"02",
"03",
"04",
"05",
"06",
"07",
"08",
"09",
"10",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"21",
"22",
"23",
"24",
"25",
"26",
"27",
"28",
"29",
"30",
],
},
},
},
"csst-cpic-l1": {
"instrument": "CPIC",
"additional_keys": {},
},
"csst-cpic-l1-qc0": {
"instrument": "CPIC",
"additional_keys": {},
},
}
SCHEDULE_KWARGS = {"priority", "queue", "execution_date"}
class CsstL1(BaseDAG):
def __init__(self, dag_id: str):
super().__init__(dag_id)
self.params = DAG_PARAMS[dag_id] # MSC/MCI/IFS/CPIC/HSTDM
def schedule(
self,
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
project_id="none",
batch_id: str | None = "default",
initial_prc_status: int = -1024,
final_prc_status: int = -2,
demo=True,
**kwargs,
):
assert kwargs.keys() <= SCHEDULE_KWARGS, f"Unknown kwargs: {kwargs.keys()}"
# no need to query plan
# plan.find(
# instrument="MSC",
# dataset=dataset,
# obs_type=obs_type,
# project_id=project_id,
# )
# find level0 data records
recs = level0.find(
instrument=self.params["instrument"],
dataset=dataset,
obs_type=obs_type,
project_id=project_id,
prc_status=initial_prc_status,
)
assert recs.success, recs.message
# generate DAG messages
msgs = []
for this_rec in recs.data:
# filter level0 data records
is_selected = True
additional_keys = {}
for k, v in self.params["additional_keys"].items():
is_selected = is_selected and this_rec[v["key_in_dfs"]] in v["enum"]
additional_keys[v["key_in_dag"]] = this_rec[v["key_in_dfs"]]
if is_selected:
# generate a DAG message if is_selected
this_msg = self.gen_msg(
dataset=dataset,
obs_type=obs_type,
project_id=project_id,
batch_id=batch_id,
obs_id=this_rec["obs_id"],
# chip_id=this_rec["detector_no"],
dag_run_id=self.gen_dag_run_id(),
**additional_keys,
**kwargs,
)
print(json.dumps(this_msg, indent=4))
if not demo:
# push and update
self.push(this_msg)
this_update = level0.update_prc_status(
level0_id=this_rec["level0_id"],
dag_run_id=this_msg["dag_run_id"],
prc_status=final_prc_status,
dataset=dataset,
)
assert this_update.success, this_update.message
msgs.append(this_msg)
return msgs
- dag_id: csst-cpic-l1-qc0 - dag_id: csst-cpic-l1
dag: dag:
tasks: tasks:
- name: QC0 - name: QC0
image: csst-cpic-l1-qc0 image: csst-cpic-l1-qc0
- name: CPIC - name: CPIC
image: csst-cpic-l1-qc0 image: csst-cpic-l1
from csst_dag import DFS, dags from csst_dag import DFS, CsstDAG
# dfs = DFS() # dfs = DFS()
dag = dags.get_dag("csst-msc-l1-qc0") dag = CsstDAG.get_dag("csst-msc-l1-qc0")
dag.msg_template dag.msg_template
dag.params
dag = dags.get_dag("csst-msc-l1-mbi") dag = CsstDAG.get_dag("csst-msc-l1-mbi")
dag.msg_template dag.msg_template
dag = dags.get_dag("csst-msc-l1-sls") dag = CsstDAG.get_dag("csst-msc-l1-sls")
dag.msg_template dag.msg_template
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment