Commit be6f7bc2 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

tweaks

parent 4908426d
from .dfs import DFS
from .utils import dump_message_list
from .dag import DAG_LIST
# print("Available DAG_LIST: ")
# for _ in DAG_LIST:
# print(f" - {_}")
from .dag import CsstDAG, dags
from ._base_dag import BaseDAG
from ._dag_list import DAG_LIST
from .msc import CsstMscL1Mbi
DAG_MAP = {
"csst-msc-l1-mbi": CsstMscL1Mbi(),
# "csst-msc-l1-qc0": CsstMscL1Qc0(),
}
class CsstDAG:
def __init__(self):
pass
@staticmethod
def ls():
print(DAG_MAP.keys())
@staticmethod
def get_dag(dag_id: str = ""):
assert dag_id in DAG_LIST, f"{dag_id} not in DAG_LIST"
return DAG_MAP[dag_id]
def trigger(
self,
dag_id: str = "",
**kwargs,
):
pass
def schedule(
self,
dag_id: str = "",
**kwargs,
):
pass
def push(
self,
dag_id: str = "",
**kwargs,
):
pass
dags = CsstDAG()
from abc import ABC, abstractmethod
from ._dag_list import DAG_LIST
import json
import yaml
"""
- BaseTrigger
- AutomaticTrigger
- ManualTrigger
- with Parameters
- without Parameters
"""
class BaseDAG(ABC):
def __init__(self, dag_id: str):
self.dag_id = dag_id
assert dag_id in DAG_LIST, f"{dag_id} not in DAG_LIST"
with open(f"{dag_id}.yml", "r") as f:
self.dag = yaml.safe_load(f)
assert (
self.dag["dag_id"] == self.dag_id
), f"{dag_id} not consistent with definition in .yml file."
with open(f"{dag_id}.json", "r") as f:
self.task_template = json.load(f)
self.task_params = set(self.task_template.keys())
@abstractmethod
def trigger(self, **kwargs) -> None:
pass
@abstractmethod
def schedule(self, **kwargs) -> None:
pass
@abstractmethod
def push(self) -> None:
pass
# def __call__(self, **kwargs):
# self.trigger(**kwargs)
import os
import glob
DAG_LIST = [
"csst-msc-l1-mbi",
"csst-msc-l1-qc0",
]
EXISTENCE_MAPPING = {True: "✅", False: "❌"}
print(" DAG \t definition task")
print("--------------------------------------------")
for this_dag_id in DAG_LIST:
this_dag_definition_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"dag_config",
f"{this_dag_id}.yml",
)
this_dag_task_template_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"dag_config",
f"{this_dag_id}.json",
)
status_def = EXISTENCE_MAPPING[os.path.exists(this_dag_definition_file)]
status_task = EXISTENCE_MAPPING[os.path.exists(this_dag_task_template_file)]
print(f"{this_dag_id} \t {status_def} {status_task}")
from csst_dfs_client import plan, level0
__all__ = ["CsstMscL1Mbi"]
class CsstMscL1Mbi:
def __init__(self):
pass
@staticmethod
def schedule(
dataset: str = "csst-msc-c9-25sqdeg-v3",
obs_type: str = "WIDE",
project_id="none",
batch_id: str | None = "default",
**kwargs,
):
pass
#
# dataset: str = "csst-msc-c9-25sqdeg-v3"
# obs_type: str = "WIDE"
# project_id="none"
# batch_id: str | None = "default"
#
# plan.find(instrument="MSC")
{
{
"dag_id": "csst-hstdm-l1",
"dag_run_id": "12345",
"batch_id": "hstdm-001-reduction-v2",
......
- dag_id: diamond
- dag_id: csst-msc-l1-mbi
dag:
tasks:
- name: QC0
......
import redis
import toml
import os
from csst_dfs_client import plan, level0
from astropy.table import Table
import socket
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment