Commit 46d721f1 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

tweaks

parent 47f70a8e
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from ._dag_list import DAG_LIST from ._dag_list import DAG_LIST
from ..dfs import DFS
import yaml import yaml
import os import os
import glob import glob
...@@ -38,6 +39,14 @@ class BaseDAG(ABC): ...@@ -38,6 +39,14 @@ class BaseDAG(ABC):
with open(json_path, "r") as f: with open(json_path, "r") as f:
self.msg_template = json.load(f) self.msg_template = json.load(f)
self.msg_keys = set(self.msg_template.keys()) self.msg_keys = set(self.msg_template.keys())
self.dfs = DFS(location=os.getenv("DFS_LOCATION"))
def gen_msg(self, **kwargs):
msg = self.msg_template.copy()
for k, v in kwargs.items():
assert k in self.msg_keys, f"{k} not in {self.msg_keys}"
msg[k] = v
return msg
# @abstractmethod # @abstractmethod
# def trigger(self, **kwargs) -> None: # def trigger(self, **kwargs) -> None:
...@@ -59,10 +68,6 @@ class BaseDAG(ABC): ...@@ -59,10 +68,6 @@ class BaseDAG(ABC):
dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)] dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
return dag_run_id return dag_run_id
# @abstractmethod
# @abstractmethod def push(self, msg: str) -> None:
# def push(self) -> None: return self.dfs.redis.push(msg)
# pass
# def __call__(self, **kwargs):
# self.trigger(**kwargs)
...@@ -51,6 +51,8 @@ class CsstMscL1Mbi(BaseDAG): ...@@ -51,6 +51,8 @@ class CsstMscL1Mbi(BaseDAG):
obs_type: str = "WIDE", obs_type: str = "WIDE",
project_id="none", project_id="none",
batch_id: str | None = "default", batch_id: str | None = "default",
prc_status: int = -1024,
demo=True,
**kwargs, **kwargs,
): ):
# dataset: str = "csst-msc-c9-25sqdeg-v3" # dataset: str = "csst-msc-c9-25sqdeg-v3"
...@@ -72,22 +74,33 @@ class CsstMscL1Mbi(BaseDAG): ...@@ -72,22 +74,33 @@ class CsstMscL1Mbi(BaseDAG):
dataset=dataset, dataset=dataset,
obs_type=obs_type, obs_type=obs_type,
project_id=project_id, project_id=project_id,
prc_status=prc_status,
) )
assert recs.success, recs.message assert recs.success, recs.message
msgs = [] msgs = []
for rec in recs.data: for this_rec in recs.data:
msg = self.msg_template.copy() if this_rec["detector_no"] in MSC_MBI_CHIPID:
msg.update(kwargs) this_msg = self.gen_msg(
msg.update(
dict(
dataset=dataset, dataset=dataset,
obs_type=obs_type, obs_type=obs_type,
project_id=project_id, project_id=project_id,
batch_id=batch_id, batch_id=batch_id,
obs_id=rec["obs_id"], obs_id=this_rec["obs_id"],
chipid=rec["detector_no"], chipid=this_rec["detector_no"],
dag_run_id=self.gen_dag_run_id(), dag_run_id=self.gen_dag_run_id(),
**kwargs,
) )
if not demo:
level0.update_prc_status(
this_rec["level0_id"],
this_msg["dag_run_id"],
prc_status=-2,
dataset=dataset,
) )
msgs.append(msg) self.push(this_msg)
msgs.append(this_msg)
# self.push(msg)
return msgs return msgs
# push msg
# update prc_status
from csst_dag.dfs import DFS from csst_dag.dfs import DFS
import os
dfs = DFS()
def test_dfs_naoc(): os.environ
dfs = DFS(location="naoc")
print(dfs.config)
assert len(dfs.redis.get_all()) == 0
from csst_dag import DFS, dags
dfs = DFS()
dag = dags.get_dag("csst-msc-l1-mbi")
dag.schedule(
dataset="csst-msc-c9-25sqdeg-v3",
obs_type="WIDE",
project_id="none",
batch_id="default",
prc_status=-1024,
demo=True,
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment