"""Client helpers for the ``/api/dag`` endpoints: DAG group runs and DAG runs."""
from typing import Optional, Tuple
from .common import request, Result, utils

# (start, end) pair of datetime strings; used as the ``queue_time`` search
# range, where it is unpacked into queue_time_start / queue_time_end.
DateTimeTuple = Tuple[str, str]

def new_dag_group_run(dag_group_run: dict, dag_run_list: Optional[list] = None,
                      batch_size: int = 512) -> Result:
    """
    Create a DAG processing group run, optionally together with its DAG runs.

    DAG runs are sent to ``PUT /api/dag/group_run`` in chunks of at most
    ``batch_size`` items. Every request repeats the same ``dag_group_run``
    payload alongside one chunk of ``dag_run_list``.

    Args:
        dag_group_run (dict): The DAG group run record, containing the
            dag_group, dag_group_run, batch_id and priority fields.
        dag_run_list (Optional[list], optional): DAG runs to create with the
            group. ``None`` or an empty list results in a single request with
            an empty ``dag_run_list``. Defaults to None.
        batch_size (int, optional): Maximum number of DAG runs sent per
            request. Must be positive. Defaults to 512.

    Returns:
        Result: On success, ``Result.data`` holds the written record; when
        several batches are sent, this is the Result of the last request.
        On failure, the Result of the first failed request is returned and
        its ``message`` holds the reason. No further batches are sent after
        a failure.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    # An empty list must not fall through the batching loop: with no
    # iterations there would be no Result to return.
    if not dag_run_list:
        return request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': []})

    result = None
    for start in range(0, len(dag_run_list), batch_size):
        batch = dag_run_list[start:start + batch_size]
        result = request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': batch})
        if not result.success:
            # Stop at the first failed batch. This function does not attempt
            # to undo batches that were already accepted.
            return result

    # All batches succeeded; report the last request's Result.
    return result

def find_group_run(dag_group: Optional[str] = None,
        batch_id: Optional[str] = None,
        queue_time: Optional[DateTimeTuple] = None,
        prc_status: Optional[int] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search DAG group run records matching the given filters.

    The query is sent to ``POST /api/dag/group_run``. Every filter key is
    always present in the request parameters; unset filters are ``None``.

    Args:
        dag_group (Optional[str], optional): DAG processing group. Defaults to None.
        batch_id (Optional[str], optional): Batch ID. Defaults to None.
        queue_time (Optional[DateTimeTuple], optional): (start, end) range of
            enqueue times; sent as queue_time_start / queue_time_end.
            Must have exactly two elements. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Records per page. Defaults to 0 (no limit).

    Returns:
        Result: The search result.

    Raises:
        ValueError: If ``queue_time`` does not have exactly two elements.
    """
    # NOTE(review): the datetime strings are forwarded as given, without
    # client-side format validation. If validation is wanted, check them with
    # utils.is_valid_datetime_format and decide how failures are reported.
    queue_time_start, queue_time_end = queue_time if queue_time is not None else (None, None)

    params = {
        'dag_group': dag_group,
        'batch_id': batch_id,
        'prc_status': prc_status,
        'queue_time_start': queue_time_start,
        'queue_time_end': queue_time_end,
        'page': page,
        'limit': limit,
    }
    return request.post("/api/dag/group_run", params)

def find_dag_run(dag_group: Optional[str] = None,
        dag_group_run: Optional[str] = None,
        batch_id: Optional[str] = None,
        dag: Optional[str] = None,
        dataset: Optional[str] = None,
        status_code: Optional[int] = None,
        queue_time: Optional[DateTimeTuple] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search DAG run records matching the given filters.

    The query is sent to ``POST /api/dag/dag_run``. Every filter key is
    always present in the request parameters; unset filters are ``None``.

    Args:
        dag_group (Optional[str], optional): DAG processing group. Defaults to None.
        dag_group_run (Optional[str], optional): DAG group run ID. Defaults to None.
        batch_id (Optional[str], optional): Batch ID. Defaults to None.
        dag (Optional[str], optional): DAG. Defaults to None.
        dataset (Optional[str], optional): Dataset. Defaults to None.
        status_code (Optional[int], optional): Status code. Defaults to None.
        queue_time (Optional[DateTimeTuple], optional): (start, end) range of
            enqueue times; sent as queue_time_start / queue_time_end.
            Must have exactly two elements. Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Records per page. Defaults to 0 (no limit).

    Returns:
        Result: The search result.

    Raises:
        ValueError: If ``queue_time`` does not have exactly two elements.
    """
    # NOTE(review): the datetime strings are forwarded as given, without
    # client-side format validation. If validation is wanted, check them with
    # utils.is_valid_datetime_format and decide how failures are reported.
    queue_time_start, queue_time_end = queue_time if queue_time is not None else (None, None)

    params = {
        'dag_group': dag_group,
        'dag_group_run': dag_group_run,
        'batch_id': batch_id,
        'dag': dag,
        'dataset': dataset,
        'status_code': status_code,
        'queue_time_start': queue_time_start,
        'queue_time_end': queue_time_end,
        'page': page,
        'limit': limit,
    }
    return request.post("/api/dag/dag_run", params)

def update_dag_run_status_code(dag_run: str, status_code: int) -> Result:
    """
    Set the status code of a single DAG run.

    Args:
        dag_run (str): Identifier of the DAG run to update.
        status_code (int): New status code.

    Returns:
        Result: Result of the ``PUT /api/dag/dag_run/status`` request.
    """
    payload = {'dag_run': dag_run, 'status_code': status_code}
    return request.put("/api/dag/dag_run/status", payload)