from abc import ABC, abstractmethod from ._dag_list import DAG_LIST import json import yaml """ - BaseTrigger - AutomaticTrigger - ManualTrigger - with Parameters - without Parameters """ class BaseDAG(ABC): def __init__(self, dag_id: str): self.dag_id = dag_id assert dag_id in DAG_LIST, f"{dag_id} not in DAG_LIST" with open(f"{dag_id}.yml", "r") as f: self.dag = yaml.safe_load(f) assert ( self.dag["dag_id"] == self.dag_id ), f"{dag_id} not consistent with definition in .yml file." with open(f"{dag_id}.json", "r") as f: self.task_template = json.load(f) self.task_params = set(self.task_template.keys()) @abstractmethod def trigger(self, **kwargs) -> None: pass @abstractmethod def schedule(self, **kwargs) -> None: pass @abstractmethod def push(self) -> None: pass # def __call__(self, **kwargs): # self.trigger(**kwargs)