dag.py 6.24 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from typing import Optional, Tuple
from .common import request, Result, utils

DateTimeTuple = Tuple[str, str]

def new_dag_group_run(dag_group_run: dict, dag_run_list: Optional[list] = None)  -> Result:
    """
    新建DAG处理组
    
    Args:
        dag_group_run (dict): DAG处理组的字典表示,包含dag_group, dag_group_run, batch_id, priority字段。
        dag_run_list (Optional[list], optional): DAG运行列表. Defaults to None.
    
    Returns:
        Result: 成功后,Result.data为写入记录,失败message为失败原因。
    
    """
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    batch_size = 512
    if dag_run_list is None:
        return request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': []})
    
    results = []
    for i in range(0, len(dag_run_list), batch_size):
        batch = dag_run_list[i:i + batch_size]
        result = request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': batch})
        results.append(result)
        if not result.success:
            # If any batch fails, return the failed result immediately
            return result

    # If all batches succeed, return the last result
    return results[-1]
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

def find_group_run(dag_group: Optional[str] = None,
        batch_id: Optional[str] = None,
        queue_time: Optional[DateTimeTuple] = None,
        prc_status: Optional[int] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    根据给定的参数搜索DAG组的记录
    
    Args:
        dag_group (Optional[str], optional): DAG处理组. Defaults to None.
        batch_id (Optional[str], optional): 批次号. Defaults to None.
        queue_time (Optional[DateTimeTuple], optional): 入队时间范围. Defaults to None.
        prc_status (Optional[int], optional): 处理状态. Defaults to None.
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0,不限制.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
        'dag_group': dag_group,
        'batch_id': batch_id,
        'prc_status': prc_status,
        'queue_time_start': None,
        'queue_time_end': None,
        'page': page,
        'limit': limit,
    }
    
    if queue_time is not None:
        params['queue_time_start'], params['queue_time_end'] = queue_time
        if params['queue_time_start'] and utils.is_valid_datetime_format(params['queue_time_start']):
            pass
        if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']):
            pass 
72
73
74
75
76
77
78
    return request.post("/api/dag/group_run", params)

def find_dag_run(dag_group: Optional[str] = None,
        dag_group_run: Optional[str] = None,
        batch_id: Optional[str] = None,
        dag: Optional[str] = None,
        dataset: Optional[str] = None,
79
80
81
82
83
84
85
        instrument: Optional[str] = None,
        obs_type: Optional[str] = None,
        obs_group: Optional[str] = None,
        detector: Optional[str] = None,
        filter: Optional[str] = None,
        object: Optional[str] = None,
        proposal_id: Optional[str] = None,
86
87
88
89
90
91
92
93
94
95
96
97
98
        status_code: Optional[int] = None,
        queue_time: Optional[DateTimeTuple] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    根据给定的参数搜索DAG组的记录
    
    Args:
        dag_group (Optional[str], optional): DAG处理组. Defaults to None.
        dag_group_run (Optional[str], optional): DAG处理组运行ID. Defaults to None.
        batch_id (Optional[str], optional): 批次号. Defaults to None.
        dag (Optional[str], optional): DAG. Defaults to None.
        dataset (Optional[str], optional): 数据集. Defaults to None.
99
100
101
102
103
104
105
        instrument (Optional[str], optional): 设备. Defaults to None.
        obs_type (Optional[str], optional): 观测类型. Defaults to None.
        obs_group (Optional[str], optional): 观测组. Defaults to None.
        detector (Optional[str], optional): 探测器. Defaults to None.
        filter (Optional[str], optional): 滤波器. Defaults to None.
        object (Optional[str], optional): 观测对象. Defaults to None.
        proposal_id (Optional[str], optional): 提案ID. Defaults to None.
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
        status_code (Optional[int], optional): 状态码. Defaults to None.
        queue_time (Optional[DateTimeTuple], optional): 入队时间范围. Defaults to None.
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0,不限制.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
        'dag_group': dag_group,
        'dag_group_run': dag_group_run,
        'batch_id': batch_id,
        'dag': dag,
        'dataset': dataset,
122
123
124
125
126
127
128
        'instrument': instrument,
        'obs_type': obs_type,
        'obs_group': obs_group,
        'detector': detector,
        'filter': filter,
        'object': object,
        'proposal_id': proposal_id,
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
        'status_code': status_code,
        'queue_time_start': None,
        'queue_time_end': None,
        'page': page,
        'limit': limit,
    }
    
    if queue_time is not None:
        params['queue_time_start'], params['queue_time_end'] = queue_time
        if params['queue_time_start'] and utils.is_valid_datetime_format(params['queue_time_start']):
            pass
        if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']):
            pass 
    return request.post("/api/dag/dag_run", params)

144
145
def update_dag_run(dag_run: str, status_code: int, queue_time: Optional[str] = None, 
                            start_time: Optional[str] = None, end_time: Optional[str] = None) -> Result:
146
147
148
149
150
151
    """
    更新DAG运行的处理状态
    
    Args:
        dag_run (str): DAG运行标识
        status_code (int): 状态码
152
153
154
        queue_time (Optional[str], optional): 入队时间. Defaults to None.
        start_time (Optional[str], optional): 开始时间. Defaults to None.
        end_time (Optional[str], optional): 结束时间. Defaults to None.
155
156
157
158
    
    Returns:
        Result: 操作结果
    """
159
160
    return request.put("/api/dag/dag_run/status", {'dag_run': dag_run, 'status_code': status_code,
                                                    'queue_time': queue_time, 'start_time': start_time, 'end_time': end_time})