_base_dag.py 2.08 KB
Newer Older
BO ZHANG's avatar
tweaks  
BO ZHANG committed
1
2
from abc import ABC, abstractmethod
from ._dag_list import DAG_LIST
BO ZHANG's avatar
BO ZHANG committed
3
from ..dfs import dfs
BO ZHANG's avatar
tweaks  
BO ZHANG committed
4
import yaml
BO ZHANG's avatar
tweaks    
BO ZHANG committed
5
6
7
8
9
10
11
12
13
14
15
16
import os
import glob
import string
import json
import numpy as np
from astropy import time

# Length of the random suffix of a DAG run id.
# NOTE(review): not referenced elsewhere in this module -- presumably meant to
# mirror the ``digits`` default of ``BaseDAG.gen_dag_run_id``; confirm.
DAG_RUN_ID_DIGITS = 6

# Directory named ``dag_config`` located one level above this module's own
# directory; ``BaseDAG.__init__`` reads ``<dag_id>.yml`` / ``<dag_id>.json``
# from it.
_PARENT_OF_MODULE_DIR = os.path.dirname(os.path.dirname(__file__))
DAG_CONFIG_DIR = os.path.join(_PARENT_OF_MODULE_DIR, "dag_config")
BO ZHANG's avatar
tweaks  
BO ZHANG committed
17
18
19
20
21
22
23
24
25
26

"""
- BaseTrigger
  - AutomaticTrigger
  - ManualTrigger
    - with Parameters
    - without Parameters
"""


BO ZHANG's avatar
BO ZHANG committed
27
class BaseDAG:
    """Base class for a DAG described by a YAML definition and a JSON message template.

    On construction the instance loads ``<dag_id>.yml`` and ``<dag_id>.json``
    from ``DAG_CONFIG_DIR`` and exposes:

    - ``dag_id``: the id passed to the constructor.
    - ``dag``: the first element of the parsed YAML document.
    - ``msg_template``: the parsed JSON message template.
    - ``msg_keys``: the set of top-level keys of ``msg_template``.
    - ``dfs``: the module-level ``dfs`` object imported by this file.
    """

    def __init__(self, dag_id: str):
        """Load and validate the configuration of the DAG named ``dag_id``.

        Raises:
            AssertionError: if ``dag_id`` is not in ``DAG_LIST``, or if the
                ``dag_id`` stored in the YAML file differs from ``dag_id``.
        """
        self.dag_id = dag_id
        # Explicit raise instead of ``assert`` so the check survives ``python -O``;
        # the exception type is kept for backward compatibility.
        if dag_id not in DAG_LIST:
            raise AssertionError(f"{dag_id} not in DAG_LIST")

        yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag_id}.yml")
        json_path = os.path.join(DAG_CONFIG_DIR, f"{dag_id}.json")
        # Read with an explicit encoding so parsing does not depend on the
        # platform's default locale.
        with open(yml_path, "r", encoding="utf-8") as f:
            # The YAML document is a sequence; only its first element is used.
            self.dag = yaml.safe_load(f)[0]
        if self.dag["dag_id"] != self.dag_id:
            # , f"{dag_id} not consistent with definition in .yml file."
            raise AssertionError(f"{self.dag}")
        with open(json_path, "r", encoding="utf-8") as f:
            self.msg_template = json.load(f)
        self.msg_keys = set(self.msg_template.keys())
        self.dfs = dfs

    def gen_msg(self, **kwargs):
        """Build a message dictionary from the template, overriding given keys.

        Args:
            **kwargs: values to set in the message; every key must be one of
                the template's top-level keys (``self.msg_keys``).

        Returns:
            A new dict, independent of ``self.msg_template``.

        Raises:
            AssertionError: if a keyword is not a key of the template.
        """
        # Local import: keeps this block self-contained without touching the
        # file-level import section.
        import copy

        # Deep copy so nested containers in the template are never shared with
        # (and cannot be mutated through) a generated message.
        msg = copy.deepcopy(self.msg_template)
        for k, v in kwargs.items():
            if k not in self.msg_keys:
                raise AssertionError(f"{k} not in {self.msg_keys}")
            msg[k] = v
        return msg

    # @abstractmethod
    # def trigger(self, **kwargs) -> None:
    #     pass
    #
    def schedule(self, **kwargs):
        """Placeholder for scheduling; always raises ``NotImplementedError``."""
        raise NotImplementedError("Not implemented yet")

    @staticmethod
    def gen_dag_run_id(digits=6):
        """Generate a run id of the form ``YYYYmmdd-HHMMSS-<random letters>``.

        The timestamp is the current time as returned by astropy's
        ``Time.now()``; the suffix is ``digits`` lowercase ASCII letters drawn
        with ``np.random.randint``. Collisions are unlikely but not impossible:
        the id is random, not guaranteed unique.

        Args:
            digits: number of random letters appended after the timestamp.

        Returns:
            The run id string.
        """
        now = time.Time.now()
        prefix = now.strftime("%Y%m%d-%H%M%S-")

        alphabet = string.ascii_lowercase
        n = len(alphabet)
        # One RNG draw per character, in order, exactly as before.
        suffix = "".join(
            alphabet[np.random.randint(low=0, high=n)] for _ in range(digits)
        )
        return prefix + suffix

    # NOTE(review): ``BaseDAG`` does not derive from ``ABC``, so this decorator
    # does not prevent instantiation, and the method has a concrete body.
    # The decorator is kept so behaviour for existing subclasses is unchanged.
    @abstractmethod
    def push(self, msg_dict: dict) -> None:
        """Serialize ``msg_dict`` to compact JSON and push it via ``self.dfs.redis``.

        Non-ASCII characters are emitted as-is (``ensure_ascii=False``).

        NOTE(review): despite the ``-> None`` annotation, the result of
        ``self.dfs.redis.push`` is returned to the caller.
        """
        msg_str = json.dumps(msg_dict, ensure_ascii=False, indent=None)
        return self.dfs.redis.push(msg_str)