_base_dag.py 5.61 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
"""Base DAG definition for the CSST data processing system.

Provides ``BaseDAG``, which loads a DAG's ``<dag>.yml`` / ``<dag>.json``
configuration from ``DAG_CONFIG_DIR``, builds DAG group-run and DAG-run
messages, and submits them through the DFS client.
"""

BO ZHANG's avatar
tweaks    
BO ZHANG committed
3
import json
BO ZHANG's avatar
BO ZHANG committed
4
5
6
7
8
9
10
11
12
import os
from abc import abstractmethod

import yaml
from typing import Any

from ._dag_list import DAG_LIST
from .._dfs import DFS, dfs
from ..hash import generate_sha1_from_time
BO ZHANG's avatar
tweaks    
BO ZHANG committed
13
14
15
16
17

# Directory holding the per-DAG config files (``<dag>.yml`` and ``<dag>.json``).
# It is the ``dag_config`` folder that sits beside the directory containing
# this module, i.e. two ``split`` steps up from this file, then "dag_config".
DAG_CONFIG_DIR = os.path.join(
    os.path.split(os.path.split(__file__)[0])[0],
    "dag_config",
)
BO ZHANG's avatar
tweaks  
BO ZHANG committed
18
19
20
21
22
23
24
25
26
27

"""
- BaseTrigger
  - AutomaticTrigger
  - ManualTrigger
    - with Parameters
    - without Parameters
"""


BO ZHANG's avatar
BO ZHANG committed
28
class BaseDAG:
    """Base class for all Directed Acyclic Graph (DAG) implementations.

    This class provides core functionality for DAG configuration, message generation,
    and execution management within the CSST data processing system.

    Attributes
    ----------
    dag_group : str
        Name of the DAG group this DAG belongs to
    dag : str
        Name of the DAG, must exist in DAG_LIST
    instrument : str
        Upper-cased second dash-separated token of the DAG name; must be one
        of INSTRUMENT_ENUM
    dag_cfg : dict
        Configuration loaded from YAML file
    dag_run_template : dict
        Message template structure loaded from JSON file
    dag_run_keys : set
        Set of all valid message keys from the template
    dfs : DFS
        Data Flow System instance for execution

    Raises
    ------
    AssertionError
        If the DAG name is not in DAG_LIST, the instrument is unknown, or the
        name in the YAML config does not match the DAG name
    """

    INSTRUMENT_ENUM = ("MSC", "MCI", "IFS", "CPIC", "HSTDM")

    def __init__(self, dag_group: str, dag: str):
        """Initialize a DAG instance with configuration loading.

        Parameters
        ----------
        dag_group : str
            Name of the DAG group.
        dag : str
            Name of the DAG.

        Raises
        ------
        AssertionError
            If DAG name is invalid or config files are inconsistent
        """
        # Set DAG name
        self.dag_group = dag_group
        self.dag = dag
        # Validation uses explicit raises rather than ``assert`` so it still
        # runs under ``python -O``; AssertionError is kept for compatibility.
        if dag not in DAG_LIST:
            raise AssertionError(f"{dag} not in DAG_LIST")

        # Determine the instrument from the second dash-separated token of the
        # DAG name (e.g. "MSC"); a name without a dash yields "" and fails below.
        name_parts = dag.split("-")
        self.instrument = name_parts[1].upper() if len(name_parts) > 1 else ""
        if self.instrument not in self.INSTRUMENT_ENUM:
            raise AssertionError(
                f"{self.instrument!r} (from {dag}) not in {self.INSTRUMENT_ENUM}"
            )

        # Load yaml and json config
        yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.yml")
        json_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.json")

        # Explicit UTF-8 so loading does not depend on the platform locale.
        with open(yml_path, "r", encoding="utf-8") as f:
            self.dag_cfg = yaml.safe_load(f)
        # safe_load returns None for an empty file; treat a missing/empty
        # config or missing "name" key as a mismatch instead of crashing.
        cfg_name = self.dag_cfg.get("name") if isinstance(self.dag_cfg, dict) else None
        if cfg_name != self.dag:
            raise AssertionError(f"{cfg_name} != {self.dag}")

        with open(json_path, "r", encoding="utf-8") as f:
            self.dag_run_template = json.load(f)

        # Summarize DAG run keys: only these keys may be set in a DAG run.
        self.dag_run_keys = set(self.dag_run_template.keys())

        # Shared module-level DFS client
        self.dfs = dfs

    def schedule(self, **kwargs):
        """Placeholder for DAG scheduling logic.

        Notes
        -----
        This method must be implemented by concrete DAG subclasses.

        Raises
        ------
        NotImplementedError
            Always raises as this is an abstract method
        """
        raise NotImplementedError("Not implemented yet")

    @staticmethod
    def generate_sha1():
        """Generate a unique SHA1 hash based on current timestamp.

        Returns
        -------
        str
            SHA1 hash string
        """
        return generate_sha1_from_time(verbose=False)

    @staticmethod
    def gen_dag_group_run(
        dag_group: str = "-",
        batch_id: str = "-",
        priority: int = 1,
    ):
        """Generate a DAG group run configuration.

        Parameters
        ----------
        dag_group : str, optional
            Group identifier (default: "-")
        batch_id : str, optional
            Batch identifier (default: "-")
        priority : int, optional
            Execution priority (default: 1)

        Returns
        -------
        dict
            Dictionary containing:
            - dag_group: Original group name
            - dag_group_run: Generated SHA1 identifier
            - batch_id: Batch identifier
            - priority: Execution priority
        """
        return dict(
            dag_group=dag_group,
            dag_group_run=BaseDAG.generate_sha1(),
            batch_id=batch_id,
            priority=priority,
        )

    def gen_dag_run(self, dag_group_run: dict, **dag_run_kwargs: Any):
        """Generate a complete DAG run message.

        The message starts as an independent (deep) copy of the JSON template,
        then is overlaid first with ``dag_group_run`` and then with
        ``dag_run_kwargs``; later values win. Overlay values are stored as-is.

        Parameters
        ----------
        dag_group_run : dict
            Output from gen_dag_group_run()
        **dag_run_kwargs : Any
            Additional run-specific parameters

        Returns
        -------
        dict
            Complete DAG run message

        Raises
        ------
        AssertionError
            If any key is not in the message template
        """
        import copy

        # Deep copy: a shallow ``dict.copy()`` would share nested lists/dicts
        # between every generated message and the cached template, so mutating
        # one run would silently corrupt all later runs.
        dag_run = copy.deepcopy(self.dag_run_template)

        # Apply group-run info first, then run-specific kwargs (which override).
        for overlay in (dag_group_run, dag_run_kwargs):
            for k, v in overlay.items():
                if k not in self.dag_run_keys:
                    raise AssertionError(f"{k} not in {self.dag_run_keys}")
                dag_run[k] = v

        return dag_run

    @staticmethod
    def push_dag_group_run(
        dag_group_run: dict,
        dag_run_list: list[dict],
    ):
        """Submit a DAG group run to the DFS system.

        Parameters
        ----------
        dag_group_run : dict
            Group run configuration
        dag_run_list : list[dict]
            List of individual DAG run messages

        Returns
        -------
        Any
            Result from dfs.dag.new_dag_group_run()
        """
        return dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )