Commit 5aeb8fc5 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

update level1 dag trigger

parent 53e2a631
Pipeline #7550 failed with stages
in 2 minutes
from .dag import gen_dag_run_id, gen_level1_dag_message
from .redis import Redis, get_redis
import json
from astropy import time
import numpy as np
import string
import os
from ..msc import MBI_CHIPID, SLS_CHIPID
RUN_ID_DIGITS = 10
DAG_TEMPLATE_DIRECTORY = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "dags"
)
def gen_dag_run_id(digits=10):
"""
Generate a unique run_id for a dag.
"""
now = time.Time.now()
dag_run_id = now.iso[:10].replace("-", "")
n = len(string.ascii_lowercase)
for i in range(digits):
dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
return dag_run_id
def gen_mbi_level1_dag_message():
pass
def gen_level1_dag_message(
# dag_id: str = "csst-msc-l1-mbi",
obsid: str = "11009101682009",
**kwargs,
):
if obsid.startswith("1"):
# MSC: + chipid
assert "chipid" in kwargs.keys(), "chipid is required for MSC obsid"
assert len(obsid) in (11, 14)
if obsid[1] not in "01":
return None
chipid = kwargs.get("chipid")
if chipid in MBI_CHIPID:
dag_id = "csst-msc-l1-mbi"
run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
with open(os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json"), "r") as f:
message = json.load(f)
# set message values
message["dag_id"] = "csst-msc-l1-mbi"
message["dag_run_id"] = run_id
message["message"]["obsid"] = obsid
message["message"]["chipid"] = chipid
elif chipid in SLS_CHIPID:
dag_id = "csst-msc-l1-sls"
run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
with open(os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json"), "r") as f:
message = json.load(f)
# set message values
message["dag_id"] = "csst-msc-l1-sls"
message["dag_run_id"] = run_id
message["message"]["obsid"] = obsid
message["message"]["chipid"] = chipid
else:
raise ValueError(f"Invalid chipid: {chipid}")
elif obsid.startswith("2"):
# MCI: + ?
pass
elif obsid.startswith("3"):
# IFS: + ?
pass
elif obsid.startswith("4"):
# CPIC: + ?
pass
elif obsid.startswith("5"):
# HSTDM: + ?
pass
else:
raise ValueError(f"Unknown obsid: {obsid}")
if kwargs.get("print_query_link", False):
# print(f"http://localhost:3000/scalebox/run/level0?id={data['dag_run_id']}")
pass
if kwargs.get("return_dict", False):
return message
else:
message_string = json.dumps(message, ensure_ascii=False, indent=None)
return message_string
SLS_CHIPID = [
"01",
"02",
"03",
"04",
"05",
"10",
"21",
"26",
"27",
"28",
"29",
"30",
]
MBI_CHIPID = [
"06",
"07",
"08",
"09",
"11",
"12",
"13",
"14",
"15",
"16",
"17",
"18",
"19",
"20",
"22",
"23",
"24",
"25",
]
import redis
import toml
import os
redis_config_path = os.path.join(os.path.dirname(__file__), "redis_config.toml")
REDIS_CONFIG = toml.load(redis_config_path)
REDIS_QNAME = "csst_data_list"
class Redis(redis.Redis):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = REDIS_QNAME
def push(self, msg):
self.lpush(self.name, msg)
def pop(self):
return self.rpop(self.name)
def get_all(self):
return self.lrange(self.name, 0, -1)
def get_redis(location="naoc"):
print("Available redis locations: ", list(REDIS_CONFIG.keys()))
if location not in REDIS_CONFIG.keys():
raise ValueError(f"Unknown redis location: {location}")
return Redis(**REDIS_CONFIG[location])
# msgs = r.lrange(name, 0, -1)
#
# for chipid in range(6, 26):
# this_msg = gen_msg(
# dag_id="csst-msc-l1-mbi", obsid="11009101682009", chipid=f"{chipid:02d}"
# )
# print(this_msg)
# r.lpush(name, this_msg)
#
# msgs = r.lrange(name, 0, -1)
# print(msgs)
#
#
# msgs_later = r.lrange(name, 0, -1)
# print(msgs_later)
[naoc]
host="10.3.10.28"
port=26379
db=0
password="csst__2025"
astropy
numpy
redis
toml
\ No newline at end of file
import setuptools
# 读取README.md作为长描述
with open("README.md", "r") as f:
long_description = f.read()
# 读取依赖列表requirements.txt
# 忽略#开头或者版本号不明确指定的条目
with open("requirements.txt", "r") as f:
requirements = [
req.strip()
for req in f.readlines()
if not req.startswith("#") and req.__contains__("==")
]
# 配置、安装
setuptools.setup(
name="csst_dag", # 包名
version="0.0.1", # 版本号
author="Bo Zhang", # 作者
author_email="bozhang@nao.cas.cn", # 邮箱
description="CSST DAG", # 短描述
long_description=long_description, # 长描述
long_description_content_type="text/markdown", # 长描述类型
url="https://csst-tb.bao.ac.cn/code/csst-cicd/csst-dag", # 主页
packages=setuptools.find_packages(
where="."
), # 用setuptools工具自动发现带有__init__.py的包
license="MIT", # 证书类型
classifiers=[ # 程序分类, 参考 https://pypi.org/classifiers/
# How mature is this project?
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
"Development Status :: 3 - Alpha",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering :: Physics",
"Topic :: Scientific/Engineering :: Astronomy",
],
include_package_data=True, # 设置包含随包数据
package_data={ # 具体随包数据路径
"csst_dag": ["dags/*"],
},
# 请注意检查,防止临时文件或其他不必要的文件被提交到仓库,否则会一同安装
python_requires=">=3.11", # Python版本要求
install_requires=requirements,
)
import pytest
from csst_dag import gen_level1_dag_message
def test_gen_level1_dag_message_msc_sls():
msg = gen_level1_dag_message(obsid="11009101682009", chipid="01", return_dict=True)
assert msg["dag_id"] == "csst-msc-l1-sls"
def test_gen_level1_dag_message_msc_mbi():
msg = gen_level1_dag_message(obsid="11009101682009", chipid="09", return_dict=True)
assert msg["dag_id"] == "csst-msc-l1-mbi"
def test_gen_level1_dag_message_msc_invalid_chipid():
with pytest.raises(ValueError):
msg = gen_level1_dag_message(
obsid="11009101682009", chipid="00", return_dict=True
)
def test_gen_level1_dag_message_msc_none():
msg = gen_level1_dag_message(obsid="12009101682009", chipid="00", return_dict=True)
assert msg is None
from csst_dag import get_redis
def test_redis_naoc():
r = get_redis("naoc")
print(r)
assert r.name == "csst_data_list"
assert len(r.get_all()) == 0
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment