import glob
import hashlib
import itertools
import json
import os
import random
import string
import uuid
from collections.abc import Iterable

import numpy as np
from astropy import table
from astropy import time

# Directory (next to this module) holding the DAG configuration files.
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_config")
DAG_YAML_LIST = glob.glob(DAG_MESSAGE_TEMPLATE_DIRECTORY + "/*.yml")

# DAG list: the base name (without extension) of every YAML file found above.
DAG_LIST = [os.path.splitext(os.path.basename(path))[0] for path in DAG_YAML_LIST]


def generate_dag_run_id(digits=6):
    """
    Generate a run id for a DAG run.

    The id is the current time formatted as ``%Y%m%d-%H%M%S-`` followed by
    ``digits`` random lowercase ASCII letters. The random suffix makes
    collisions unlikely but does not guarantee uniqueness.

    Parameters
    ----------
    digits : int
        Number of random letters appended after the timestamp.

    Returns
    -------
    str
        The generated run id.
    """
    now = time.Time.now()
    prefix = now.strftime("%Y%m%d-%H%M%S-")
    letters = string.ascii_lowercase
    n = len(letters)
    # One randint call per letter, drawn from numpy's global random state.
    suffix = "".join(letters[np.random.randint(low=0, high=n)] for _ in range(digits))
    return prefix + suffix


def load_default_dag_run_message():
    """
    Load the default DAG run message template.

    Returns
    -------
    object
        The parsed content of ``default-dag-run.json`` in the template
        directory.
    """
    path = os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, "default-dag-run.json")
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_dag_message_template(dag_id):
    """
    Get the message template for a given DAG.

    Parameters
    ----------
    dag_id : str
        Name of the DAG; must be listed in ``DAG_LIST``.

    Returns
    -------
    object
        The parsed content of ``<dag_id>.json`` in the template directory.

    Raises
    ------
    ValueError
        If ``dag_id`` is not in ``DAG_LIST``.
    """
    # NOTE(review): DAG_LIST is built from the ``*.yml`` files while the
    # template is read from ``<dag_id>.json`` -- presumably both files exist
    # side by side; confirm.
    if dag_id not in DAG_LIST:
        raise ValueError(f"Unknown dag_cfg: {dag_id}")
    path = os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json")
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def generate_dag_run_message(dag_id, **kwargs):
    """
    Generate a DAG run message.

    The template for ``dag_id`` is loaded and every keyword argument
    overrides the template entry of the same name.

    Parameters
    ----------
    dag_id : str
        Name of the DAG whose template is used.
    **kwargs
        Template entries to override.

    Returns
    -------
    object
        The template with the overrides applied.

    Raises
    ------
    ValueError
        If ``dag_id`` is not a known DAG.
    AssertionError
        If a keyword is missing from the template or its template value is
        ``None``.
    """
    # NOTE(review): earlier code generated a run id here via
    # generate_dag_run_id() but never stored it in the message. If the
    # message should carry a run id, pass it explicitly as a keyword.
    msg = get_dag_message_template(dag_id)
    for key, value in kwargs.items():
        # Raised explicitly (not via ``assert``) so the check is still
        # enforced when Python runs with -O.
        if msg.get(key, None) is None:
            raise AssertionError(f"Unknown key: {key}")
        msg[key] = value
    return msg


def generate_uuid():
    """Generate a random UUID (version 4) and return it as a string."""
    return str(uuid.uuid4())


def force_string(d):
    """
    Recursively convert string-like values inside nested containers.

    Parameters
    ----------
    d : object
        A dict, a ``str``/``bytes`` value, another iterable, or any other
        object.

    Returns
    -------
    object
        * dict: a new dict with the same keys and converted values;
        * ``str``/``bytes`` (including subclasses): ``str(d)``;
        * any other iterable: a list of the converted items;
        * anything else: ``d`` unchanged.
    """
    if isinstance(d, dict):
        return {k: force_string(v) for k, v in d.items()}
    if isinstance(d, (str, bytes)):
        # NOTE(review): for bytes, str() yields the repr form ("b'...'"),
        # not decoded text -- confirm this is what callers expect.
        return str(d)
    if isinstance(d, Iterable):
        return [force_string(item) for item in d]
    return d


def generate_permutations(**kwargs) -> table.Table:
    """
    Build a table with every combination of the keyword-argument values.

    Parameters
    ----------
    **kwargs
        Each value should be a list, tuple or set of candidate values. Any
        other value is treated as a single candidate.

    Returns
    -------
    astropy.table.Table
        A table built from one ``{key: value}`` dict per combination
        (Cartesian product of all candidate lists).
    """
    # Normalise into a fresh dict so kwargs is not modified while iterating.
    candidates = {
        key: values if isinstance(values, (list, tuple, set)) else [values]
        for key, values in kwargs.items()
    }
    keys = list(candidates)
    # Cartesian product of all candidate lists, one dict per combination.
    rows = [
        dict(zip(keys, combination))
        for combination in itertools.product(*candidates.values())
    ]
    return table.Table(rows)


def override_common_keys(d1: dict, d2: dict) -> dict:
    """
    Construct a new dictionary with the keys of the first dictionary, taking
    each value from the second dictionary when the key exists there.

    Parameters
    ----------
    d1 : dict
        The first dictionary; it defines the keys of the result.
    d2 : dict
        The second dictionary; it supplies overriding values.

    Returns
    -------
    dict:
        The updated dictionary. Keys that only exist in ``d2`` are ignored.
    """
    return {k: d2[k] if k in d2 else v for k, v in d1.items()}


def generate_sha1_from_time(verbose=False):
    """
    Generate a SHA-1 hash from the current time plus a random string.

    Parameters
    ----------
    verbose : bool
        If true, also return the timestamp and the random string.

    Returns
    -------
    str or tuple
        The SHA-1 hex digest, or ``(timestamp, random string, digest)`` when
        ``verbose`` is true.
    """
    # Current time as an ISO-format ("isot") string.
    timestamp = time.Time.now().isot

    # 40 random ASCII letters and digits.
    random_str = "".join(random.choices(string.ascii_letters + string.digits, k=40))

    # Hash the combined "<timestamp>_<random string>" text.
    combined_str = f"{timestamp}_{random_str}"
    sha_value = hashlib.sha1(combined_str.encode("utf-8")).hexdigest()

    if verbose:
        return timestamp, random_str, sha_value
    return sha_value