_dag_utils.py 4.27 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import glob
import hashlib
import itertools
import json
import os
import random
import string
import uuid
from collections.abc import Iterable

import numpy as np
from astropy import table
from astropy import time

# Directory, located next to this module, that holds the DAG message templates.
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_config")

# Paths of every ``*.yml`` file found directly inside the template directory.
DAG_YAML_LIST = glob.glob(f"{DAG_MESSAGE_TEMPLATE_DIRECTORY}/*.yml")

# DAG identifiers: the YAML file names with directory and extension stripped.
DAG_LIST = [
    os.path.splitext(os.path.basename(yaml_path))[0] for yaml_path in DAG_YAML_LIST
]


def generate_dag_run_id(digits=6):
    """
    Build a run_id for a dag_cfg from the current time and a random suffix.

    Parameters
    ----------
    digits : int, optional
        Number of random lowercase ASCII letters appended after the
        timestamp prefix. Defaults to 6.

    Returns
    -------
    str
        The current time formatted as ``%Y%m%d-%H%M%S-`` followed by
        ``digits`` random lowercase letters.

    Notes
    -----
    The suffix is drawn from NumPy's global random state, so uniqueness is
    very likely but not strictly guaranteed.
    """
    alphabet = string.ascii_lowercase
    prefix = time.Time.now().strftime("%Y%m%d-%H%M%S-")
    # One randint call per letter, in order, exactly as many as requested.
    suffix = "".join(
        alphabet[np.random.randint(low=0, high=len(alphabet))] for _ in range(digits)
    )
    return prefix + suffix


def load_default_dag_run_message():
    """Load the default dag_cfg run message template.

    Returns
    -------
    object
        The parsed content of ``default-dag-run.json`` located in
        ``DAG_MESSAGE_TEMPLATE_DIRECTORY``.

    Raises
    ------
    OSError
        If the template file cannot be opened (e.g. it does not exist).
    json.JSONDecodeError
        If the file content is not valid JSON.
    """
    template_path = os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, "default-dag-run.json")
    # JSON is UTF-8 by specification; do not rely on the locale's default
    # encoding, which differs between platforms.
    with open(template_path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_dag_message_template(dag_id):
    """Get the dag_cfg message template for a given dag_cfg.

    Parameters
    ----------
    dag_id : str
        Identifier of the DAG; must be one of the entries of ``DAG_LIST``.

    Returns
    -------
    object
        The parsed content of ``<dag_id>.json`` located in
        ``DAG_MESSAGE_TEMPLATE_DIRECTORY``.

    Raises
    ------
    ValueError
        If ``dag_id`` is not listed in ``DAG_LIST``.
    OSError
        If the template file cannot be opened (e.g. it does not exist).
    json.JSONDecodeError
        If the file content is not valid JSON.
    """
    if dag_id not in DAG_LIST:
        raise ValueError(f"Unknown dag_cfg: {dag_id}")
    # NOTE(review): DAG_LIST is derived from the ``*.yml`` files while the
    # template opened here is ``<dag_id>.json`` -- presumably both files exist
    # for every DAG; confirm.
    template_path = os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json")
    # JSON is UTF-8 by specification; do not rely on the locale's default
    # encoding, which differs between platforms.
    with open(template_path, "r", encoding="utf-8") as f:
        return json.load(f)


def generate_dag_run_message(dag_id, **kwargs):
    """Generate a DAG run message.

    The message template of ``dag_id`` is loaded with
    ``get_dag_message_template`` and every keyword argument replaces the
    template entry of the same name.

    Parameters
    ----------
    dag_id : str
        Identifier of the DAG whose message template is used.
    **kwargs
        Overrides for template entries. Each key must already be present in
        the template with a value other than ``None``.

    Returns
    -------
    object
        The loaded template with the overrides applied (used as a mapping
        whenever overrides are given).

    Raises
    ------
    ValueError
        Propagated from ``get_dag_message_template`` for an unknown ``dag_id``.
    AssertionError
        If a keyword argument does not match a non-``None`` template entry.
    """
    # NOTE(review): the previous version generated a run id here but never
    # stored it in the message; that dead code was removed. If the run id is
    # meant to be part of the message, add it explicitly under the proper key.
    msg = get_dag_message_template(dag_id)

    for key, value in kwargs.items():
        if msg.get(key, None) is None:
            # Raised explicitly rather than through ``assert`` so the check is
            # not stripped under ``python -O``; the exception type is kept so
            # existing callers keep working.
            raise AssertionError(f"Unknown key: {key}")
        msg[key] = value
    return msg


def generate_uuid():
    """Return a freshly generated random (version 4) UUID as a string."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)


def force_string(d):
    """Recursively convert string-like values inside nested containers.

    Parameters
    ----------
    d : object
        A value, or an arbitrarily nested structure of dicts and iterables.

    Returns
    -------
    object
        * ``str`` values (including subclasses) are returned as plain ``str``.
        * ``bytes`` values are decoded as UTF-8 into ``str``; undecodable
          bytes are kept as ``\\xNN`` escapes, so decoding never raises.
        * ``dict`` values are rebuilt with the same keys and converted values.
        * Any other iterable is returned as a ``list`` of converted items.
        * Everything else is returned unchanged.
    """
    if isinstance(d, str):
        return str(d)
    if isinstance(d, bytes):
        # ``str(b"abc")`` would yield the repr "b'abc'" rather than the text,
        # so decode explicitly.
        return d.decode("utf-8", errors="backslashreplace")
    if isinstance(d, dict):
        return {k: force_string(v) for k, v in d.items()}
    if isinstance(d, Iterable):
        return [force_string(item) for item in d]
    return d


def generate_permutations(**kwargs) -> table.Table:
    """
    Build a table holding every combination of the keyword-argument values.

    Parameters
    ----------
    **kwargs
        Each value is the collection of candidates for that keyword. A value
        that is not a ``list``, ``tuple`` or ``set`` is treated as a single
        candidate.

    Returns
    -------
    table.Table
        A table constructed from one ``{keyword: value}`` dict per element of
        the Cartesian product of all candidate collections.
    """
    # Wrap scalars (anything that is not a list/tuple/set) in a one-item list.
    candidates_by_name = {
        name: candidates if isinstance(candidates, (list, tuple, set)) else [candidates]
        for name, candidates in kwargs.items()
    }

    names = list(candidates_by_name)
    # One dict per element of the Cartesian product, keyed by keyword name.
    rows = [
        dict(zip(names, combo))
        for combo in itertools.product(*candidates_by_name.values())
    ]
    return table.Table(rows)


def override_common_keys(d1: dict, d2: dict) -> dict:
    """
    Return a copy of ``d1`` whose values are replaced by those of ``d2``
    for every key the two dictionaries share.

    Keys that only exist in ``d2`` are ignored; neither input is modified.

    Parameters
    ----------
    d1 : dict
        The base dictionary; it determines the keys (and their order) of the
        result.
    d2 : dict
        The dictionary providing the overriding values.

    Returns
    -------
    dict:
        A new dictionary with the keys of ``d1`` and the overridden values.
    """
    merged = {}
    for key, fallback in d1.items():
        merged[key] = d2[key] if key in d2 else fallback
    return merged


def generate_sha1_from_time(verbose=False):
    """
    Produce a SHA-1 hex digest from the current time plus a random string.

    The current time in ISO format (``Time.now().isot``) and 40 random ASCII
    letters/digits are joined with an underscore, encoded as UTF-8 and hashed.
    The random part keeps two calls at the same instant from colliding.

    Parameters
    ----------
    verbose : bool, optional
        When true, also return the ingredients of the hash. Defaults to False.

    Returns
    -------
    str or tuple
        The SHA-1 hex digest, or the tuple
        ``(timestamp, random string, SHA-1 hex digest)`` when ``verbose`` is
        true.
    """
    stamp = time.Time.now().isot
    alphabet = string.ascii_letters + string.digits
    salt = "".join(random.choices(alphabet, k=40))

    digest = hashlib.sha1(f"{stamp}_{salt}".encode("utf-8")).hexdigest()

    return (stamp, salt, digest) if verbose else digest