"""Base DAG definition used to build and submit DAG run messages.

Each DAG is described by two files in ``DAG_CONFIG_DIR``: ``<dag>.yml``
(DAG configuration) and ``<dag>.json`` (run-message template).
"""

import copy
import json
import os
from abc import abstractmethod
from typing import Any

import yaml

from ._dag_list import DAG_LIST
from .._dfs import DFS, dfs
from ..hash import generate_sha1_from_time

# ``dag_config`` directory located in the parent directory of this module's
# package (two ``dirname`` calls up from this file).
DAG_CONFIG_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "dag_config",
)

# Trigger types (design note; no trigger classes are defined in this module):
# - BaseTrigger
# - AutomaticTrigger
# - ManualTrigger
#   - with Parameters
#   - without Parameters


class BaseDAG:
    """Base class for all Directed Acyclic Graph (DAG) implementations.

    This class provides core functionality for DAG configuration, message
    generation, and execution management within the CSST data processing
    system.

    Attributes
    ----------
    dag_group : str
        Name of the DAG group.
    dag : str
        Name of the DAG; must be listed in ``DAG_LIST``.
    instrument : str
        Instrument code taken from the second ``-``-separated field of
        ``dag``; must be one of ``INSTRUMENT_ENUM``.
    dag_cfg : dict
        First entry of the configuration loaded from ``<dag>.yml``.
    dag_run_template : dict
        Message template structure loaded from ``<dag>.json``.
    dag_run_keys : set
        Keys of ``dag_run_template``; the only keys a run message may set.
    dfs : DFS
        Data Flow System instance used for execution.

    Raises
    ------
    AssertionError
        If the DAG name is not in ``DAG_LIST``, the instrument code is not in
        ``INSTRUMENT_ENUM``, or the YAML ``name`` differs from the DAG name.
    """

    INSTRUMENT_ENUM = ("MSC", "MCI", "IFS", "CPIC", "HSTDM")

    def __init__(self, dag_group: str, dag: str):
        """Initialize a DAG instance and load its configuration files.

        Parameters
        ----------
        dag_group : str
            Name of the DAG group.
        dag : str
            Name of the DAG.

        Raises
        ------
        AssertionError
            If the DAG name is invalid or the config files are inconsistent.
        """
        self.dag_group = dag_group
        self.dag = dag
        # Explicit raises (instead of `assert`) so validation still runs
        # under `python -O`; the exception type is kept for existing callers.
        if dag not in DAG_LIST:
            raise AssertionError(f"{dag} not in DAG_LIST")

        # The instrument code is the second "-"-separated field, e.g. "MSC".
        self.instrument = dag.split("-")[1]
        if self.instrument not in self.INSTRUMENT_ENUM:
            raise AssertionError(self.instrument)

        yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.yml")
        json_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.json")

        # YAML and JSON are UTF-8 by specification; don't depend on the
        # platform's default locale encoding.
        with open(yml_path, "r", encoding="utf-8") as f:
            # The loaded YAML document is indexed with [0]: only its first
            # entry is used as this DAG's configuration.
            self.dag_cfg = yaml.safe_load(f)[0]
        if self.dag_cfg["name"] != self.dag:
            raise AssertionError(f"{self.dag_cfg['name']} != {self.dag}")

        with open(json_path, "r", encoding="utf-8") as f:
            self.dag_run_template = json.load(f)

        # Keys a run message is allowed to set (see gen_dag_run).
        self.dag_run_keys = set(self.dag_run_template.keys())

        # Shared DFS instance.
        self.dfs = dfs

    def schedule(self, **kwargs):
        """Placeholder for DAG scheduling logic.

        Notes
        -----
        This method must be implemented by concrete DAG subclasses.

        Raises
        ------
        NotImplementedError
            Always, until overridden by a subclass.
        """
        raise NotImplementedError("Not implemented yet")

    @staticmethod
    def generate_sha1() -> str:
        """Generate a SHA1 hash from the current time.

        Returns
        -------
        str
            SHA1 hash string produced by ``generate_sha1_from_time``.
        """
        return generate_sha1_from_time(verbose=False)

    @staticmethod
    def gen_dag_group_run(
        dag_group: str = "-",
        batch_id: str = "-",
        priority: int = 1,
    ) -> dict:
        """Generate a DAG group run configuration.

        Parameters
        ----------
        dag_group : str, optional
            Group identifier (default: "-").
        batch_id : str, optional
            Batch identifier (default: "-").
        priority : int, optional
            Execution priority (default: 1).

        Returns
        -------
        dict
            Dictionary containing:
            - dag_group: Original group name
            - dag_group_run: Generated SHA1 identifier
            - batch_id: Batch identifier
            - priority: Execution priority
        """
        return dict(
            dag_group=dag_group,
            dag_group_run=BaseDAG.generate_sha1(),
            batch_id=batch_id,
            priority=priority,
        )

    def gen_dag_run(self, dag_group_run: dict, **dag_run_kwargs: Any) -> dict:
        """Generate a complete DAG run message.

        The message starts as an independent deep copy of the JSON template,
        then is updated with ``dag_group_run`` and finally with
        ``dag_run_kwargs`` (so keyword arguments win on key collisions).

        Parameters
        ----------
        dag_group_run : dict
            Output from gen_dag_group_run().
        **dag_run_kwargs : Any
            Additional run-specific parameters.

        Returns
        -------
        dict
            Complete DAG run message.

        Raises
        ------
        AssertionError
            If any key is not in the message template.
        """
        # Deep copy: a shallow copy would share nested lists/dicts with the
        # template, so mutating one message would corrupt all later ones.
        dag_run = copy.deepcopy(self.dag_run_template)
        # Apply group-run info first, then run-specific overrides.
        for updates in (dag_group_run, dag_run_kwargs):
            for k, v in updates.items():
                if k not in self.dag_run_keys:
                    raise AssertionError(f"{k} not in {self.dag_run_keys}")
                dag_run[k] = v
        return dag_run

    @staticmethod
    def push_dag_group_run(
        dag_group_run: dict,
        dag_run_list: list[dict],
    ):
        """Submit a DAG group run to the DFS system.

        Parameters
        ----------
        dag_group_run : dict
            Group run configuration.
        dag_run_list : list[dict]
            List of individual DAG run messages.

        Returns
        -------
        Any
            Result from dfs.dag.new_dag_group_run().
        """
        return dfs.dag.new_dag_group_run(
            dag_group_run=dag_group_run,
            dag_run_list=dag_run_list,
        )