_base_dag.py 6.01 KB
Newer Older
BO ZHANG's avatar
tweaks    
BO ZHANG committed
1
import json
BO ZHANG's avatar
BO ZHANG committed
2
import os
BO ZHANG's avatar
tweaks    
BO ZHANG committed
3
from typing import Callable
BO ZHANG's avatar
BO ZHANG committed
4
5

import yaml
BO ZHANG's avatar
BO ZHANG committed
6
from astropy import table
BO ZHANG's avatar
BO ZHANG committed
7

BO ZHANG's avatar
tweaks    
BO ZHANG committed
8
from ._dispatcher import override_common_keys
BO ZHANG's avatar
BO ZHANG committed
9
10
from .._dfs import DFS, dfs
from ..hash import generate_sha1_from_time
BO ZHANG's avatar
tweaks    
BO ZHANG committed
11
12
13
14
15

# Directory holding the per-DAG ``<name>.yml`` / ``<name>.json`` files: the
# ``dag_config`` folder located one level above this module's own directory.
DAG_CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "dag_config")
BO ZHANG's avatar
tweaks  
BO ZHANG committed
16
17


BO ZHANG's avatar
BO ZHANG committed
18
class BaseDAG:
    """Base class for all Directed Acyclic Graph (DAG) implementations.

    This class provides core functionality for DAG configuration, message generation,
    and execution management within the CSST dlist processing system.

    Attributes
    ----------
    dag : str
        Name of the DAG; ``<dag>.yml`` and ``<dag>.json`` are read from
        ``DAG_CONFIG_DIR``
    pattern : table.Table
        Table joined against the data basis (on all of its columns) to select
        the rows this DAG handles
    dispatcher : Callable
        Callable invoked as ``dispatcher(plan_basis, data_basis)`` that returns
        the list of tasks to schedule
    dag_cfg : dict
        Configuration loaded from YAML file
    dag_run_template : dict
        Message template structure loaded from JSON file
    dfs : DFS
        Data Flow System instance for execution

    Raises
    ------
    FileNotFoundError
        If the YAML or JSON config file for the DAG does not exist
    AssertionError
        If the ``name`` entry of the YAML config differs from the DAG name
    """

    def __init__(
        self,
        dag: str,
        pattern: table.Table,
        dispatcher: Callable,
    ):
        """Initialize a DAG instance with configuration loading.

        Parameters
        ----------
        dag : str
            DAG name; selects ``<dag>.yml`` and ``<dag>.json`` in
            ``DAG_CONFIG_DIR``
        pattern : table.Table
            Pattern table used by :meth:`filter_basis` to select matching rows
            of the data basis
        dispatcher : Callable
            Callable used by :meth:`schedule` to turn the filtered plan and
            data bases into a list of tasks

        Raises
        ------
        FileNotFoundError
            If either config file is missing
        AssertionError
            If the ``name`` entry of the YAML config differs from ``dag``
        """
        # Set DAG name, pattern and dispatcher
        self.dag = dag
        self.pattern = pattern
        self.dispatcher = dispatcher

        # Load yaml and json config
        yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.yml")
        json_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.json")

        # Read explicitly as UTF-8 so loading does not depend on the platform's
        # default locale encoding.
        with open(yml_path, "r", encoding="utf-8") as f:
            self.dag_cfg = yaml.safe_load(f)
        # Raised explicitly (instead of via ``assert``) so the consistency check
        # still runs when Python is started with ``-O``.
        if self.dag_cfg["name"] != self.dag:
            raise AssertionError(f"{self.dag_cfg['name']} != {self.dag}")
        with open(json_path, "r", encoding="utf-8") as f:
            self.dag_run_template = json.load(f)

        # DFS instance
        self.dfs = dfs

    def filter_basis(
        self,
        plan_basis: table.Table,
        data_basis: table.Table,
    ) -> tuple[table.Table, table.Table]:
        """Restrict the plan and data bases to the rows matching ``self.pattern``.

        Parameters
        ----------
        plan_basis : table.Table
            Plan basis; must contain the ``dataset`` and ``obs_id`` columns
        data_basis : table.Table
            Data basis; must contain every column of ``self.pattern`` as well as
            the ``dataset`` and ``obs_id`` columns

        Returns
        -------
        tuple
            ``(filtered_plan_basis, filtered_data_basis)``. When no data row
            matches the pattern, the plan basis is returned as an empty slice.
        """
        # keep only the data rows that match the pattern on all pattern columns
        filtered_data_basis = table.join(
            self.pattern,
            data_basis,
            keys=self.pattern.colnames,
            join_type="inner",
        )
        if len(filtered_data_basis) == 0:
            # nothing matched: return an empty plan basis with the same columns
            return plan_basis[:0], filtered_data_basis
        # keep only the plan rows whose (dataset, obs_id) appears in the matched data
        u_data_basis = table.unique(filtered_data_basis["dataset", "obs_id"])
        filtered_plan_basis = table.join(
            u_data_basis,
            plan_basis,
            keys=["dataset", "obs_id"],
            join_type="inner",
        )
        return filtered_plan_basis, filtered_data_basis

    def schedule(
        self,
        dag_group_run: dict,
        data_basis: table.Table,
        plan_basis: table.Table,
        force_success: bool = False,
        **kwargs,
    ) -> tuple[list, set]:
        """Filter the bases, dispatch tasks and convert them to DAG run messages.

        Parameters
        ----------
        dag_group_run : dict
            Group run configuration; its items are passed to :meth:`gen_dag_run`
        data_basis : table.Table
            Data basis to schedule from
        plan_basis : table.Table
            Plan basis to schedule from
        force_success : bool, optional
            If True, convert every dispatched task regardless of its
            ``"success"`` flag (default: False)
        **kwargs
            Additional keyword arguments passed to :meth:`gen_dag_run`

        Returns
        -------
        tuple[list, set]
            ``(dag_run_list, data_id_set)``: the generated DAG run messages and
            the ``_id`` values of the relevant data of every converted task

        Notes
        -----
        ``dag_group_run``, ``task["task"]`` and ``kwargs`` are unpacked into one
        call, so a key present in more than one of them raises ``TypeError``.
        """
        # filter plan and data basis
        filtered_plan_basis, filtered_data_basis = self.filter_basis(
            plan_basis, data_basis
        )
        # dispatch tasks
        task_list = self.dispatcher(filtered_plan_basis, filtered_data_basis)
        # convert tasks to dag_run_list
        dag_run_list = []  # each element is a dag_run (dict)
        data_id_set = set()  # each element is a data_id
        for this_task in task_list:
            # only convert success tasks
            if not (force_success or this_task["success"]):
                continue
            dag_run = self.gen_dag_run(
                **dag_group_run,
                **this_task["task"],
                **kwargs,
            )
            dag_run_list.append(dag_run)
            # ``set.union`` returns a new set and leaves the receiver untouched;
            # ``update`` is required to actually accumulate the ids.
            data_id_set.update(this_task["relevant_data"]["_id"])
        return dag_run_list, data_id_set

    @staticmethod
    def generate_sha1():
        """Generate an identifier via ``generate_sha1_from_time``.

        Returns
        -------
        str
            SHA1 hash string (presumably derived from the current time, as the
            helper's name suggests; see ``generate_sha1_from_time``)
        """
        return generate_sha1_from_time(verbose=False)

    @staticmethod
    def gen_dag_group_run(
        dag_group: str = "-",
        batch_id: str = "-",
        priority: int = 1,
    ):
        """Generate a DAG group run configuration.

        Parameters
        ----------
        dag_group : str, optional
            Group identifier (default: "-")
        batch_id : str, optional
            Batch identifier (default: "-")
        priority : int, optional
            Execution priority (default: 1)

        Returns
        -------
        dict
            Dictionary containing:
            - dag_group: Original group name
            - dag_group_run: Generated SHA1 identifier
            - batch_id: Batch identifier
            - priority: Execution priority
        """
        return dict(
            dag_group=dag_group,
            dag_group_run=BaseDAG.generate_sha1(),
            batch_id=batch_id,
            priority=priority,
        )

    def gen_dag_run(self, **kwargs) -> dict:
        """Generate a complete DAG run message.

        The message starts from a copy of ``self.dag_run_template``, is updated
        with ``kwargs`` through ``override_common_keys``, and finally gets its
        ``dag`` and ``dag_run`` entries set by this method. Because that last
        step runs after the ``kwargs`` step, those two values are always the
        ones chosen here.

        Parameters
        ----------
        kwargs : dict
            Additional keyword arguments to override.

        Returns
        -------
        dict
            Complete DAG run message

        Notes
        -----
        How keys missing from the template are treated is decided by
        ``override_common_keys``; NOTE(review): an earlier version of this
        docstring stated that such keys raise ``AssertionError`` - confirm
        against that helper.
        """
        # copy template
        # NOTE(review): this is a shallow copy; if the template holds nested
        # containers and ``override_common_keys`` mutates them in place, the
        # shared template would be affected - confirm against that helper.
        dag_run = self.dag_run_template.copy()
        # update values
        dag_run = override_common_keys(dag_run, kwargs)
        # set hash
        dag_run = override_common_keys(
            dag_run,
            {"dag": self.dag, "dag_run": self.generate_sha1()},
        )
        return dag_run

    @staticmethod
    def push_dag_group_run(
        dag_group_run: dict,
        dag_run_list: list[dict],
    ):
        """Submit a DAG group run to the DFS system.

        Uses the module-level ``dfs`` object, not an instance attribute.

        Parameters
        ----------
        dag_group_run : dict
            Group run configuration
        dag_run_list : list[dict]
            List of individual DAG run messages

        Returns
        -------
        Any
            Result from dfs.dag.new_dag_group_run()
        """
        return dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )