from typing import Optional, Tuple from .common import request, Result, utils DateTimeTuple = Tuple[str, str] def new_dag_group_run(dag_group_run: dict, dag_run_list: Optional[list] = None) -> Result: """ 新建DAG处理组 Args: dag_group_run (dict): DAG处理组的字典表示,包含dag_group, dag_group_run, batch_id, priority字段。 dag_run_list (Optional[list], optional): DAG运行列表. Defaults to None. Returns: Result: 成功后,Result.data为写入记录,失败message为失败原因。 """ return request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': dag_run_list}) def find_group_run(dag_group: Optional[str] = None, batch_id: Optional[str] = None, queue_time: Optional[DateTimeTuple] = None, prc_status: Optional[int] = None, page: int = 1, limit: int = 0) -> Result: """ 根据给定的参数搜索DAG组的记录 Args: dag_group (Optional[str], optional): DAG处理组. Defaults to None. batch_id (Optional[str], optional): 批次号. Defaults to None. queue_time (Optional[DateTimeTuple], optional): 入队时间范围. Defaults to None. prc_status (Optional[int], optional): 处理状态. Defaults to None. page (int, optional): 页码. Defaults to 1. limit (int, optional): 每页数量. Defaults to 0,不限制. Returns: Result: 搜索结果对象. """ params = { 'dag_group': dag_group, 'batch_id': batch_id, 'prc_status': prc_status, 'queue_time_start': None, 'queue_time_end': None, 'page': page, 'limit': limit, } if queue_time is not None: params['queue_time_start'], params['queue_time_end'] = queue_time if params['queue_time_start'] and utils.is_valid_datetime_format(params['queue_time_start']): pass if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']): pass return request.post("/api/dag/group_run", params)