Commit 1d237193 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

update DAG messages

parent 84bfc3c1
10109200100412
10109200100413
10109200100414
10109200107587
10109200155157
10109200159781
10109200204333
10109200206112
10109200244032
10109200244125
10109200244146
10109200244172
10109200244757
10109200245216
10109200249013
10109200250794
10109200261370
10109200261371
10109200261437
10109200261457
10109200261477
10109200261495
10109200261496
10109200261514
10109200261515
10109200261625
10109200261626
10109200261644
10109200261645
10109200261663
10109200261702
10109200261757
10109200261800
10109200261859
10109200261860
10109200261938
10109200261992
10109200262712
10109200262991
10109200263242
10109200263310
10109200263688
10109200264030
10109200264264
10109200264310
10109200264311
10109200264352
10109200264394
10109200264395
10109200264493
10109200264516
10109200264517
10109200264562
10109200264563
10109200264607
10109200264653
10109200264671
10109200264815
10109200264883
10109200264906
10109200265055
10109200265266
10109200265289
10109200265472
10109200265495
10109200265540
10109200265563
10109200265609
10109200267605
10109200267786
10109200268067
10109200268103
10109200268396
10109200273030
10109200273318
10109200273355
10109200273375
10109200273417
10109200273439
10109200273482
10109200273549
10109200273642
10109200273664
10109200273683
10109200273726
10109200273794
10109200273815
10109200273860
10109200273980
10109200273999
10109200274059
10109200274083
10109200274104
10109200274149
10109200274173
10109200274194
10109200274294
10109200274315
10109200274370
10109200274391
10109200274414
10109200274782
10109200274828
10109200274943
10109200274982
10109200275023
10109200275112
10109200275274
10109200275646
10109200277563
10109200277608
10109200305571
10109200305596
10109200305646
10109200305800
10109200311461
10109200322269
10109200341927
10109200342216
10109200343719
10109200349710
10109200349732
10109200349871
10109200350531
10109200350532
10109200351315
10109200351452
10109200351515
10109200352179
10109200352619
10109200352620
10109200352641
10109200352664
10109200352754
10109200352777
10109200352800
10109200352820
10109200352962
10109200353051
10109200353074
10109200353097
10109200353138
10109200353388
10109200353411
10109200353434
10109200353475
10109200353515
10109200353620
10109200353665
10109200353704
10109200353770
10109200353993
10109200354446
10109200354467
10109200355570
10109200355881
10109200355936
10109200356137
10109200356294
10109200356509
10109200356749
10109200356796
10109200360964
10109200360985
10109200361007
10109200361072
10109200361170
10109200361246
10109200361314
10109200361336
10109200361381
10109200361624
10109200361814
10109200361853
10109200361873
10109200362000
10109200362021
10109200362045
10109200362164
10109200362203
10109200362291
10109200362313
10109200363418
10109200363514
10109200363554
10109200363577
10109200363599
10109200363942
import numpy as np
from csst_dag import CSST_DAG_LIST, Redis
from csst_dag.constants import MSC_MBI_CHIPID
# set BATCH_ID and DATASET
BATCH_ID = "msc-v093-rdx-zjlab-v1"
DATASET = "msc-v093"
# get OBSID
with open("batch/msc-v093/C9_W1_Phot.obsid", "r") as f:
OBSID_LIST = [_.strip() for _ in f.readlines()]
OBSID_LIST_CORRECTED = [_[:3] + "093" + _[6:] for _ in OBSID_LIST]
# get DAG
dag = CSST_DAG_LIST.get("csst-msc-l1-mbi")
# get redis
r = Redis(location="zjlab")
# generate messages
message_list = []
for this_obsid in OBSID_LIST_CORRECTED:
for this_chipid in MSC_MBI_CHIPID:
this_message = dag.gen_message(
batch_id=BATCH_ID,
dataset=DATASET,
obsid=this_obsid,
chipid=this_chipid,
)
message_list.append(this_message)
# push messages to redis
for msg in message_list:
r.push(msg)
......@@ -2,6 +2,9 @@
COSMOS
"""
# cd /nfsdata/share/simulation-collection/csst-dag
# http://159.226.170.52:3000/scalebox/TaskList?app=csst-msc-l1-mbi.apps2
from csst_dag import get_redis, gen_level1_dag_message
from csst_dag.msc import MBI_CHIPID, SLS_CHIPID
import numpy as np
......@@ -16,4 +19,3 @@ for obsid in obsid_list:
break
r.get_all()
# cd /nfsdata/share/simulation-collection/csst-dag
from .dag import gen_dag_run_id, gen_level1_dag_message
from .redis import Redis, get_redis
# from .dag import gen_dag_run_id, gen_level1_dag_message, CSST_DAG_LIST
from .redis import Redis
from .dag import CSST_DAG_LIST # all DAGs
MSC_MBI_CHIPID = [
"06",
"07",
"08",
"09",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"22",
"23",
"24",
"25",
]
import os
import json
import yaml
import glob
import toml
from typing import Optional
from .message import gen_dag_run_id
RUN_ID_DIGITS = 15
DAG_RULE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_rules")
DAG_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_templates")
# DAG_RULE_DIRECTORY = "/Users/cham/CsstProjects/csst-dag/csst_dag/dag_rules"
# DAG_TEMPLATE_DIRECTORY = "/Users/cham/CsstProjects/csst-dag/csst_dag/dag_templates"
def match_string_headers(s: str, headers: set[str]) -> bool:
status = False
for header in headers:
if s.startswith(header):
status = True
return status
# define CsstDAG and CsstDAGList
class CsstDAG:
def __init__(self, name="", rules: Optional[dict] = None):
self.name = name
self.rules = rules
self.keys = set(rules.keys())
# load message template
dag_def_path = os.path.join(DAG_TEMPLATE_DIRECTORY, f"{name}.yml")
dag_msg_path = os.path.join(DAG_TEMPLATE_DIRECTORY, f"{name}.json")
if os.path.exists(dag_def_path):
with open(dag_def_path, "r") as f:
self.definition = yaml.safe_load(f)
else:
raise FileNotFoundError(f"{dag_def_path} not found")
if os.path.exists(dag_msg_path):
with open(dag_msg_path, "r") as f:
self.message_template = json.load(f)
else:
raise FileNotFoundError(f"{dag_msg_path} not found")
def match(self, **kwargs: dict) -> bool:
# check if all required keys are present
if set(kwargs.keys()) != self.keys:
return False
# check if all values are valid
for k, v in kwargs.items():
# check if v is in self.rules[k]
if not match_string_headers(v, self.rules[k]):
return False
# all checks passed
return True
def __repr__(self):
return f"CsstDAG({self.name})"
def pprint(self):
print(f"Name: {self.name}")
print(f"Keys: {self.keys}")
print(f"Definition: {self.definition}")
print(f"Message template: {self.message_template}")
def gen_message(
self,
batch_id: str = "msc-v093-rdx-naoc-v1",
dataset: str = "msc-v093",
**kwargs, # required keywords for DAG
) -> str:
"""Generate DAG message"""
if not self.match(**kwargs):
raise ValueError(f"Cannot generate DAG message for {self.name}")
dag_id = self.name
this_dag_run_id = gen_dag_run_id(RUN_ID_DIGITS)
this_message = dict(
dag_id=dag_id,
dag_run_id=this_dag_run_id,
batch_id=batch_id,
message=dict(
dataset=dataset,
batch_id=batch_id,
**kwargs,
),
)
message_string = json.dumps(this_message, ensure_ascii=False, indent=None)
return message_string
class CsstDAGList(list):
def __init__(self, *args):
super().__init__(*args)
def get(self, name: str) -> CsstDAG:
"""Get DAG by name"""
for dag in self:
if dag.name == name:
return dag
raise ValueError(f"Cannot find DAG: {name}")
def match(self, **kwargs: dict) -> list:
"""Match DAGs by kwargs"""
matched_dag_list = []
for dag in self:
if dag.match(**kwargs):
matched_dag_list.append(dag.name)
return matched_dag_list
def match_dag(self, **kwargs: dict) -> list:
"""Match DAGs by kwargs"""
matched_dag_list = []
for dag in self:
if dag.match(**kwargs):
matched_dag_list.append(dag)
return matched_dag_list
# load all DAG templates
DAG_RULES = glob.glob(os.path.join(DAG_RULE_DIRECTORY, "*.toml"))
CSST_DAG_LIST = CsstDAGList()
print(f"DAG_TRIGGER_DIRECTORY: {DAG_RULE_DIRECTORY}")
print(f"DAG_TEMPLATE_DIRECTORY: {DAG_TEMPLATE_DIRECTORY}")
for dag_file in DAG_RULES:
with open(os.path.join(DAG_RULE_DIRECTORY, dag_file), "r") as f:
dags = toml.load(f)
for name, dag in dags.items():
print(f" - Add DAG: name={name}, required_keys={tuple(dag.keys())}")
CSST_DAG_LIST.append(CsstDAG(name, dag))
import json
from astropy import time
import numpy as np
import string
import os
from ..msc import MBI_CHIPID, SLS_CHIPID
RUN_ID_DIGITS = 10
DAG_TEMPLATE_DIRECTORY = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "dags"
)
def gen_dag_run_id(digits=10):
"""
Generate a unique run_id for a dag.
"""
now = time.Time.now()
dag_run_id = now.iso[:10].replace("-", "")
n = len(string.ascii_lowercase)
for i in range(digits):
dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
return dag_run_id
def gen_mbi_level1_dag_message():
pass
def gen_level1_dag_message(
# dag_id: str = "csst-msc-l1-mbi",
obsid: str = "11009101682009",
**kwargs,
):
if obsid.startswith("1"):
# MSC: + chipid
assert "chipid" in kwargs.keys(), "chipid is required for MSC obsid"
assert len(obsid) in (11, 14)
if obsid[1] not in "01":
return None
chipid = kwargs.get("chipid")
if chipid in MBI_CHIPID:
dag_id = "csst-msc-l1-mbi"
run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
with open(os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json"), "r") as f:
message = json.load(f)
# set message values
message["dag_id"] = "csst-msc-l1-mbi"
message["dag_run_id"] = run_id
message["message"]["obsid"] = obsid
message["message"]["chipid"] = chipid
elif chipid in SLS_CHIPID:
dag_id = "csst-msc-l1-sls"
run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
with open(os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json"), "r") as f:
message = json.load(f)
# set message values
message["dag_id"] = "csst-msc-l1-sls"
message["dag_run_id"] = run_id
message["message"]["obsid"] = obsid
message["message"]["chipid"] = chipid
else:
raise ValueError(f"Invalid chipid: {chipid}")
elif obsid.startswith("2"):
# MCI: + ?
pass
elif obsid.startswith("3"):
# IFS: + ?
pass
elif obsid.startswith("4"):
# CPIC: + ?
pass
elif obsid.startswith("5"):
# HSTDM: + ?
pass
else:
raise ValueError(f"Unknown obsid: {obsid}")
if kwargs.get("print_query_link", False):
# print(f"http://localhost:3000/scalebox/run/level0?id={data['dag_run_id']}")
pass
if kwargs.get("return_dict", False):
return message
else:
message_string = json.dumps(message, ensure_ascii=False, indent=None)
return message_string
import json
from astropy import time
import numpy as np
import string
import os
def gen_dag_run_id(digits=10):
"""
Generate a unique run_id for a dag.
"""
now = time.Time.now()
dag_run_id = now.iso[:10].replace("-", "")
n = len(string.ascii_lowercase)
for i in range(digits):
dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
return dag_run_id
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment