Commit b4bd79b5 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

major update: use triggers instead of rules

parent 618219c3
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="py.test" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="7">
<item index="0" class="java.lang.String" itemvalue="pyyaml" />
<item index="1" class="java.lang.String" itemvalue="csst_msc_common" />
<item index="2" class="java.lang.String" itemvalue="scipy" />
<item index="3" class="java.lang.String" itemvalue="astropy" />
<item index="4" class="java.lang.String" itemvalue="numpy" />
<item index="5" class="java.lang.String" itemvalue="psutil" />
<item index="6" class="java.lang.String" itemvalue="joblib" />
</list>
</value>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="/opt/homebrew/anaconda3 (2)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/csst-dag.iml" filepath="$PROJECT_DIR$/.idea/csst-dag.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>
\ No newline at end of file
...@@ -43,10 +43,9 @@ setuptools.setup( ...@@ -43,10 +43,9 @@ setuptools.setup(
include_package_data=True, # 设置包含随包数据 include_package_data=True, # 设置包含随包数据
package_data={ # 具体随包数据路径 package_data={ # 具体随包数据路径
"csst_dag": [ "csst_dag": [
"dag_rules/*",
"dag_templates/*",
"constants/*", "constants/*",
"redis_config.toml", "dag/*",
"config/*",
], ],
}, },
# 请注意检查,防止临时文件或其他不必要的文件被提交到仓库,否则会一同安装 # 请注意检查,防止临时文件或其他不必要的文件被提交到仓库,否则会一同安装
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# from .dag import gen_dag_run_id, gen_level1_dag_message, CSST_DAG_LIST from .dfs import DFS
from .redis import Redis
from .dag import CSST_DAG_LIST # all DAGs
from .utils import dump_message_list from .utils import dump_message_list
from .constants import MSC_MBI_CHIPID from .dag import DAG_LIST
print("Available DAG_LIST: ")
for _ in DAG_LIST:
print(f" - {_}")
import glob
import os import os
import string
import json import json
import yaml import numpy as np
import glob from astropy import time
import toml
from typing import Optional DAG_RUN_ID_DIGITS = 6
from .message import gen_dag_run_id
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag")
RUN_ID_DIGITS = 15 DAG_YAML_LIST = glob.glob(DAG_MESSAGE_TEMPLATE_DIRECTORY + "/*.yml")
DAG_LIST = [os.path.splitext(os.path.basename(_))[0] for _ in DAG_YAML_LIST]
DAG_RULE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_rules")
DAG_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_templates") # print(DAG_LIST)
# DAG_RULE_DIRECTORY = "/Users/cham/CsstProjects/csst-dag/csst_dag/dag_rules" # [
# DAG_TEMPLATE_DIRECTORY = "/Users/cham/CsstProjects/csst-dag/csst_dag/dag_templates" # "csst-msc-l1-ooc-bias",
# "csst-msc-l1-ooc-flat",
# "csst-msc-l1-ooc-dark",
def match_string_headers(s: str, headers: set[str]) -> bool: # "csst-msc-l1-ast",
status = False # "csst-msc-l1-qc0",
for header in headers: # "csst-hstdm-l1",
if s.startswith(header): # "csst-msc-l1-sls",
status = True # "csst-msc-l1-mbi",
return status # ]
# define CsstDAG and CsstDAGList def gen_dag_run_id(digits=6):
class CsstDAG: """
def __init__(self, name="", rules: Optional[dict] = None): Generate a unique run_id for a dag.
self.name = name """
self.rules = rules now = time.Time.now()
self.keys = set(rules.keys()) dag_run_id = now.strftime("%Y%m%d-%H%M%S-")
# load message template n = len(string.ascii_lowercase)
dag_def_path = os.path.join(DAG_TEMPLATE_DIRECTORY, f"{name}.yml") for i in range(digits):
dag_msg_path = os.path.join(DAG_TEMPLATE_DIRECTORY, f"{name}.json") dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
if os.path.exists(dag_def_path): return dag_run_id
with open(dag_def_path, "r") as f:
self.definition = yaml.safe_load(f)
else: def get_dag_message_template(dag_id):
raise FileNotFoundError(f"{dag_def_path} not found") """
if os.path.exists(dag_msg_path): Get the dag message template for a given dag_id.
with open(dag_msg_path, "r") as f: """
self.message_template = json.load(f) if dag_id not in DAG_LIST:
else: raise ValueError(f"Unknown dag_id: {dag_id}")
raise FileNotFoundError(f"{dag_msg_path} not found") with open(os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json"), "r") as f:
template = json.load(f)
def match(self, **kwargs: dict) -> bool: return template
# check if all required keys are present
if set(kwargs.keys()) != self.keys:
return False def gen_msg(dag_id, **kwargs):
# check if all values are valid """
for k, v in kwargs.items(): Generate a message
# check if v is in self.rules[k] """
if not match_string_headers(v, self.rules[k]): this_dag_run_id = gen_dag_run_id(DAG_RUN_ID_DIGITS)
return False msg = get_dag_message_template(dag_id)
# all checks passed
return True for k, v in kwargs.items():
assert msg.get(k, None) is not None, f"Unknown key: {k}"
def __repr__(self): msg[k] = v
return f"CsstDAG({self.name})" return msg
def pprint(self):
print(f"Name: {self.name}")
print(f"Keys: {self.keys}")
print(f"Definition: {self.definition}")
print(f"Message template: {self.message_template}")
def gen_message(
self,
batch_id: str = "msc-v093-rdx-naoc-v1",
dataset: str = "msc-v093",
**kwargs, # required keywords for DAG
) -> str:
"""Generate DAG message"""
if not self.match(**kwargs):
raise ValueError(f"Cannot generate DAG message for {self.name}")
dag_id = self.name
this_dag_run_id = gen_dag_run_id(RUN_ID_DIGITS)
this_message = dict(
dag_id=dag_id,
dag_run_id=this_dag_run_id,
batch_id=batch_id,
message=dict(
dataset=dataset,
batch_id=batch_id,
**kwargs,
),
)
message_string = json.dumps(this_message, ensure_ascii=False, indent=None)
return message_string
class CsstDAGList(list):
def __init__(self, *args):
super().__init__(*args)
def get(self, name: str) -> CsstDAG:
"""Get DAG by name"""
for dag in self:
if dag.name == name:
return dag
raise ValueError(f"Cannot find DAG: {name}")
def match(self, **kwargs: dict) -> list:
"""Match DAGs by kwargs"""
matched_dag_list = []
for dag in self:
if dag.match(**kwargs):
matched_dag_list.append(dag.name)
return matched_dag_list
def match_dag(self, **kwargs: dict) -> list:
"""Match DAGs by kwargs"""
matched_dag_list = []
for dag in self:
if dag.match(**kwargs):
matched_dag_list.append(dag)
return matched_dag_list
# load all DAG templates
DAG_RULES = glob.glob(os.path.join(DAG_RULE_DIRECTORY, "*.toml"))
CSST_DAG_LIST = CsstDAGList()
print(f"DAG_TRIGGER_DIRECTORY: {DAG_RULE_DIRECTORY}")
print(f"DAG_TEMPLATE_DIRECTORY: {DAG_TEMPLATE_DIRECTORY}")
for dag_file in DAG_RULES:
with open(os.path.join(DAG_RULE_DIRECTORY, dag_file), "r") as f:
dags = toml.load(f)
for name, dag in dags.items():
print(f" - Add DAG: name={name}, required_keys={tuple(dag.keys())}")
CSST_DAG_LIST.append(CsstDAG(name, dag))
{ {
"priority": 1,
"dag_id": "csst-msc-l1-mbi", "dag_id": "csst-msc-l1-mbi",
"dag_run_id": "12345", "dag_run_id": "12345",
"dataset": "csst-msc-c9-25sqdeg-v3",
"batch_id": "msc-093-reduction-v2", "batch_id": "msc-093-reduction-v2",
"message": { "obsid": "10109300543790",
"obsid": "10109300543790", "chipid": "09",
"chipid": "09",
"dataset": "msc-093",
"batch_id": "msc-093-reduction-v2"
}
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment