"""Abstract base class for DAG objects.

A DAG object loads its YAML definition and its JSON message template when it
is constructed; see :class:`BaseDAG`.
"""

from abc import ABC, abstractmethod
import glob
import json
import os
import string

import numpy as np
import yaml
from astropy import time

from ._dag_list import DAG_LIST

# Default number of random letters appended to a DAG run id.
DAG_RUN_ID_DIGITS = 6

# ``dag_config`` directory located next to the directory containing this file.
DAG_CONFIG_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "dag_config",
)

# Trigger hierarchy sketch carried over from the original source
# (NOTE(review): nesting is presumed, the original layout was lost):
#
# - BaseTrigger
#     - AutomaticTrigger
#     - ManualTrigger
#         - with Parameters
#         - without Parameters


class BaseDAG(ABC):
    """Base class holding the definition and message template of one DAG.

    Attributes set by ``__init__``:
        dag_id: the identifier passed by the caller.
        dag: content of ``<dag_id>.yml`` as returned by ``yaml.safe_load``.
        msg_template: content of ``<dag_id>.json`` as returned by
            ``json.load``.
        msg_keys: set of the keys of ``msg_template``.
    """

    def __init__(self, dag_id: str):
        """Load the YAML definition and JSON message template of *dag_id*.

        Args:
            dag_id: identifier of the DAG; must be a member of ``DAG_LIST``.

        Raises:
            AssertionError: if *dag_id* is not in ``DAG_LIST``, or if the
                ``dag_id`` entry of the YAML file differs from *dag_id*.
            FileNotFoundError: if ``<dag_id>.yml`` or ``<dag_id>.json`` is
                found neither in the current working directory nor in
                ``DAG_CONFIG_DIR``.
        """
        self.dag_id = dag_id
        # Raised explicitly instead of via ``assert`` so that the checks are
        # not stripped under ``python -O``; exception type and message are
        # the same as before. This check also guards the file names built
        # from ``dag_id`` below.
        if dag_id not in DAG_LIST:
            raise AssertionError(f"{dag_id} not in DAG_LIST")

        with open(self._config_path(f"{dag_id}.yml"), "r", encoding="utf-8") as f:
            self.dag = yaml.safe_load(f)
        if self.dag["dag_id"] != self.dag_id:
            raise AssertionError(
                f"{dag_id} not consistent with definition in .yml file."
            )

        with open(self._config_path(f"{dag_id}.json"), "r", encoding="utf-8") as f:
            self.msg_template = json.load(f)
        self.msg_keys = set(self.msg_template.keys())

    @staticmethod
    def _config_path(filename: str) -> str:
        """Return the path from which the config file *filename* is read.

        A file of that name in the current working directory takes
        precedence, because that is where this class has always looked.
        Otherwise the path inside ``DAG_CONFIG_DIR`` is returned, so that
        construction no longer depends on the caller's working directory.
        """
        if os.path.isfile(filename):
            return filename
        return os.path.join(DAG_CONFIG_DIR, filename)

    # NOTE(review): disabled hook kept from the original source.
    # @abstractmethod
    # def trigger(self, **kwargs) -> None:
    #     pass
    #
    def schedule(self, **kwargs):
        """Placeholder; currently does nothing and returns ``None``."""
        pass

    @staticmethod
    def gen_dag_run_id(digits: int = DAG_RUN_ID_DIGITS) -> str:
        """Generate a run id for a DAG.

        The id is the current time (``astropy.time.Time.now()``) formatted as
        ``YYYYMMDD-HHMMSS-`` followed by *digits* random lowercase ASCII
        letters. Uniqueness is likely but not guaranteed: two calls within
        the same second can draw the same suffix.

        The letters are drawn from NumPy's global random state, so they are
        affected by ``np.random.seed``.

        Args:
            digits: number of random letters appended after the timestamp.

        Returns:
            The run id string.
        """
        now = time.Time.now()
        prefix = now.strftime("%Y%m%d-%H%M%S-")
        letters = string.ascii_lowercase
        n = len(letters)
        # One randint call per letter keeps the random stream consumption
        # identical to a character-by-character loop.
        suffix = "".join(
            letters[np.random.randint(low=0, high=n)] for _ in range(digits)
        )
        return prefix + suffix

    # NOTE(review): disabled hooks kept from the original source.
    #
    # @abstractmethod
    # def push(self) -> None:
    #     pass

    # def __call__(self, **kwargs):
    #     self.trigger(**kwargs)