from abc import ABC, abstractmethod
from ._dag_list import DAG_LIST
from ..dfs import DFS
import copy
import yaml
import os
import glob
import string
import json
import numpy as np
from astropy import time

# Default number of random lowercase letters appended to a generated run id.
DAG_RUN_ID_DIGITS = 6

# Directory holding one "<dag_id>.yml" and one "<dag_id>.json" file per DAG;
# resolved as "<parent of this file's directory>/dag_config".
DAG_CONFIG_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "dag_config",
)

"""
- BaseTrigger
    - AutomaticTrigger
    - ManualTrigger
        - with Parameters
        - without Parameters
"""


class BaseDAG:
    """Base class that loads a DAG's configuration and builds/pushes messages.

    Attributes set by ``__init__``:
        dag_id: the identifier passed by the caller.
        dag: first entry of the YAML document ``<dag_id>.yml``.
        msg_template: object loaded from ``<dag_id>.json``.
        msg_keys: set of the keys of ``msg_template``.
        dfs: ``DFS`` instance created with the ``DFS_LOCATION`` environment
            variable as its ``location``.
    """

    def __init__(self, dag_id: str):
        """Load the YAML definition and JSON message template for ``dag_id``.

        Raises:
            AssertionError: if ``dag_id`` is not in ``DAG_LIST``, or if the
                ``dag_id`` stored in the YAML file differs from ``dag_id``.
        """
        self.dag_id = dag_id
        assert dag_id in DAG_LIST, f"{dag_id} not in DAG_LIST"

        yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag_id}.yml")
        json_path = os.path.join(DAG_CONFIG_DIR, f"{dag_id}.json")

        # Explicit UTF-8 so loading does not depend on the process locale.
        with open(yml_path, "r", encoding="utf-8") as f:
            # The YAML document is a sequence; only its first item is used.
            self.dag = yaml.safe_load(f)[0]
        assert (
            self.dag["dag_id"] == self.dag_id
        ), f"{self.dag}"  # , f"{dag_id} not consistent with definition in .yml file."

        with open(json_path, "r", encoding="utf-8") as f:
            self.msg_template = json.load(f)
        self.msg_keys = set(self.msg_template.keys())

        self.dfs = DFS(location=os.getenv("DFS_LOCATION"))

    def gen_msg(self, **kwargs):
        """Return a copy of the message template with ``kwargs`` applied.

        Args:
            **kwargs: values to override; every key must already exist in the
                template.

        Returns:
            A new message object independent of ``self.msg_template``.

        Raises:
            AssertionError: if a keyword is not one of ``self.msg_keys``.
        """
        # Deep copy: a shallow copy would share nested lists/dicts with the
        # template, so mutating a returned message could corrupt later ones.
        msg = copy.deepcopy(self.msg_template)
        for k, v in kwargs.items():
            assert k in self.msg_keys, f"{k} not in {self.msg_keys}"
            msg[k] = v
        return msg

    # @abstractmethod
    # def trigger(self, **kwargs) -> None:
    #     pass
    #
    def schedule(self, **kwargs):
        """Do nothing; placeholder that accepts arbitrary keyword arguments."""
        pass

    @staticmethod
    def gen_dag_run_id(digits=DAG_RUN_ID_DIGITS):
        """
        Generate a unique run_id for a dag.

        The id is the current time formatted as ``%Y%m%d-%H%M%S-`` followed by
        ``digits`` lowercase ASCII letters drawn with ``np.random.randint``.
        """
        now = time.Time.now()
        prefix = now.strftime("%Y%m%d-%H%M%S-")
        n = len(string.ascii_lowercase)
        # One randint call per letter keeps the draw sequence identical to the
        # original loop, so seeded runs produce the same ids.
        suffix = "".join(
            string.ascii_lowercase[np.random.randint(low=0, high=n)]
            for _ in range(digits)
        )
        return prefix + suffix

    # NOTE(review): BaseDAG does not use ABCMeta, so @abstractmethod does not
    # prevent instantiation here; it is kept only to leave the interface as is.
    @abstractmethod
    def push(self, msg_dict: dict):
        """Serialize ``msg_dict`` to JSON and push it via ``self.dfs.redis``.

        Non-ASCII characters are emitted unescaped (``ensure_ascii=False``) and
        the JSON is written without indentation.

        Returns:
            Whatever ``self.dfs.redis.push`` returns.
        """
        msg_str = json.dumps(msg_dict, ensure_ascii=False, indent=None)
        return self.dfs.redis.push(msg_str)