Commit beea95ad authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

update msc DAGs

parent f7806201
from ._base_dag import BaseDAG from ._base_dag import BaseDAG
from ._dag_list import DAG_LIST from ._dag_list import DAG_LIST
from .msc import CsstMscL1Mbi from .msc import CsstMscL1
DAG_MAP = { DAG_MAP = {
"csst-msc-l1-mbi": CsstMscL1Mbi(), "csst-msc-l1-qc0": CsstMscL1(dag_id="csst-msc-l1-qc0"),
# "csst-msc-l1-qc0": CsstMscL1Qc0(), "csst-msc-l1-mbi": CsstMscL1(dag_id="csst-msc-l1-mbi"),
"csst-msc-l1-sls": CsstMscL1(dag_id="csst-msc-l1-sls"),
} }
......
...@@ -42,6 +42,7 @@ class BaseDAG: ...@@ -42,6 +42,7 @@ class BaseDAG:
self.dfs = DFS(location=os.getenv("DFS_LOCATION")) self.dfs = DFS(location=os.getenv("DFS_LOCATION"))
def gen_msg(self, **kwargs): def gen_msg(self, **kwargs):
"""Load message template and generate message dictionary."""
msg = self.msg_template.copy() msg = self.msg_template.copy()
for k, v in kwargs.items(): for k, v in kwargs.items():
assert k in self.msg_keys, f"{k} not in {self.msg_keys}" assert k in self.msg_keys, f"{k} not in {self.msg_keys}"
...@@ -53,7 +54,7 @@ class BaseDAG: ...@@ -53,7 +54,7 @@ class BaseDAG:
# pass # pass
# #
def schedule(self, **kwargs): def schedule(self, **kwargs):
pass raise NotImplementedError("Not implemented yet")
@staticmethod @staticmethod
def gen_dag_run_id(digits=6): def gen_dag_run_id(digits=6):
......
...@@ -2,8 +2,9 @@ import os ...@@ -2,8 +2,9 @@ import os
import glob import glob
DAG_LIST = [ DAG_LIST = [
"csst-msc-l1-mbi",
"csst-msc-l1-qc0", "csst-msc-l1-qc0",
"csst-msc-l1-mbi",
"csst-msc-l1-sls",
] ]
EXISTENCE_MAPPING = {True: "✅", False: "❌"} EXISTENCE_MAPPING = {True: "✅", False: "❌"}
......
...@@ -2,7 +2,6 @@ import json ...@@ -2,7 +2,6 @@ import json
from ._base_dag import BaseDAG from ._base_dag import BaseDAG
from csst_dfs_client import plan, level0 from csst_dfs_client import plan, level0
__all__ = ["CsstMscL1Mbi"]
MSC_MBI_CHIPID = [ MSC_MBI_CHIPID = [
"06", "06",
...@@ -39,19 +38,32 @@ MSC_SLS_CHIPID = [ ...@@ -39,19 +38,32 @@ MSC_SLS_CHIPID = [
"29", "29",
"30", "30",
] ]
MSC_CHIPID = MSC_MBI_CHIPID + MSC_SLS_CHIPID
CHIPID_MAP = {
"csst-msc-l1-mbi": MSC_MBI_CHIPID,
"csst-msc-l1-sls": MSC_SLS_CHIPID,
"csst-msc-l1-qc0": MSC_CHIPID,
}
class CsstMscL1Mbi(BaseDAG):
def __init__(self):
super().__init__("csst-msc-l1-mbi")
def schedule( class CsstMscL1(BaseDAG):
def __init__(self, dag_id: str):
super().__init__(dag_id)
self.CHIPID = CHIPID_MAP[dag_id]
def schedule(self, **kwargs):
return self._base_schedule(**kwargs)
def _base_schedule(
self, self,
dataset: str = "csst-msc-c9-25sqdeg-v3", dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE", obs_type: str = "WIDE",
project_id="none", project_id="none",
batch_id: str | None = "default", batch_id: str | None = "default",
prc_status: int = -1024, initial_prc_status: int = -1024,
final_prc_status: int = -2,
demo=True, demo=True,
**kwargs, **kwargs,
): ):
...@@ -74,12 +86,12 @@ class CsstMscL1Mbi(BaseDAG): ...@@ -74,12 +86,12 @@ class CsstMscL1Mbi(BaseDAG):
dataset=dataset, dataset=dataset,
obs_type=obs_type, obs_type=obs_type,
project_id=project_id, project_id=project_id,
prc_status=prc_status, prc_status=initial_prc_status,
) )
assert recs.success, recs.message assert recs.success, recs.message
msgs = [] msgs = []
for this_rec in recs.data: for this_rec in recs.data:
if this_rec["detector_no"] in MSC_MBI_CHIPID: if this_rec["detector_no"] in self.CHIPID:
this_msg = self.gen_msg( this_msg = self.gen_msg(
dataset=dataset, dataset=dataset,
obs_type=obs_type, obs_type=obs_type,
...@@ -96,7 +108,7 @@ class CsstMscL1Mbi(BaseDAG): ...@@ -96,7 +108,7 @@ class CsstMscL1Mbi(BaseDAG):
this_update = level0.update_prc_status( this_update = level0.update_prc_status(
level0_id=this_rec["level0_id"], level0_id=this_rec["level0_id"],
dag_run_id=this_msg["dag_run_id"], dag_run_id=this_msg["dag_run_id"],
prc_status=-2, prc_status=final_prc_status,
dataset=dataset, dataset=dataset,
) )
assert this_update.success, this_update.message assert this_update.success, this_update.message
......
{ {
"priority": 1, "priority": 1,
"dag_id": "csst-msc-l1-sls", "dag_id": "csst-msc-l1-qc0",
"dag_run_id": "12345", "dag_run_id": "12345",
"dataset": "csst-msc-c9-25sqdeg-v3", "dataset": "csst-msc-c9-25sqdeg-v3",
"obs_type": "BIAS", "obs_type": "BIAS",
......
...@@ -55,21 +55,23 @@ class DFS: ...@@ -55,21 +55,23 @@ class DFS:
print(status_table) print(status_table)
print("\n") print("\n")
if status_table["status"].sum() == 0: if status_table["status"].sum() == 0:
raise ValueError("No DFS location is available") print("No DFS location is available")
elif status_table["status"].sum() > 1: elif status_table["status"].sum() > 1:
print("Multiple DFS locations are available, please specify one") print("Multiple DFS locations are available, please specify one")
raise ValueError("Multiple DFS locations are available") elif location is None:
else: # set DFS automatically
if location is None: if status_table["status"].sum() == 1:
# set DFS automatically print("One DFS locations are available, good")
assert (
status_table["status"].sum() == 1
), "Multiple DFS locations are available"
location = status_table["location"][status_table["status"]][0] location = status_table["location"][status_table["status"]][0]
print(f"Using DFS location: {location}") elif status_table["status"].sum() == 0:
print("No DFS location is available, using csu")
location = "csu"
else:
raise ValueError("Multiple DFS locations are available")
self.location = location self.location = location
self.config = CONFIG[self.location] self.config = CONFIG[self.location]
print(f"Using DFS location: {location}")
for k, v in CONFIG[self.location]["dfs"].items(): for k, v in CONFIG[self.location]["dfs"].items():
os.environ.setdefault(k, str(v)) os.environ.setdefault(k, str(v))
# print("Setting redis config:") # print("Setting redis config:")
......
from csst_dag import DFS, dags from csst_dag import DFS, dags
dfs = DFS() # dfs = DFS()
dag = dags.get_dag("csst-msc-l1-qc0")
dag.msg_template
dag = dags.get_dag("csst-msc-l1-mbi") dag = dags.get_dag("csst-msc-l1-mbi")
dag.msg_template
dag = dags.get_dag("csst-msc-l1-sls")
dag.msg_template
dag.schedule( dag.schedule(
dataset="csst-msc-c9-25sqdeg-v3", dataset="csst-msc-c9-25sqdeg-v3",
obs_type="WIDE", obs_type="WIDE",
project_id="none", project_id="none",
batch_id="default", batch_id="default",
prc_status=-1024, initial_prc_status=-2,
demo=True, final_prc_status=-2,
demo=False,
) )
from csst_dag.dag.msc import CsstMscL1Mbi
dag = dags.get_dag("csst-msc-l1-mbi")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment