dag.py 2.09 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from typing import Optional, Tuple
from .common import request, Result, utils

DateTimeTuple = Tuple[str, str]

def new_dag_group_run(dag_group_run: dict, dag_run_list: Optional[list] = None)  -> Result:
    """
    新建DAG处理组
    
    Args:
        dag_group_run (dict): DAG处理组的字典表示,包含dag_group, dag_group_run, batch_id, priority字段。
        dag_run_list (Optional[list], optional): DAG运行列表. Defaults to None.
    
    Returns:
        Result: 成功后,Result.data为写入记录,失败message为失败原因。
    
    """
    return request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': dag_run_list})

def find_group_run(dag_group: Optional[str] = None,
        batch_id: Optional[str] = None,
        queue_time: Optional[DateTimeTuple] = None,
        prc_status: Optional[int] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    根据给定的参数搜索DAG组的记录
    
    Args:
        dag_group (Optional[str], optional): DAG处理组. Defaults to None.
        batch_id (Optional[str], optional): 批次号. Defaults to None.
        queue_time (Optional[DateTimeTuple], optional): 入队时间范围. Defaults to None.
        prc_status (Optional[int], optional): 处理状态. Defaults to None.
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0,不限制.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
        'dag_group': dag_group,
        'batch_id': batch_id,
        'prc_status': prc_status,
        'queue_time_start': None,
        'queue_time_end': None,
        'page': page,
        'limit': limit,
    }
    
    if queue_time is not None:
        params['queue_time_start'], params['queue_time_end'] = queue_time
        if params['queue_time_start'] and utils.is_valid_datetime_format(params['queue_time_start']):
            pass
        if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']):
            pass 
    return request.post("/api/dag/group_run", params)