Commit 9a8486ac authored by Wei Shoulin's avatar Wei Shoulin
Browse files

feat(dag): add find_dag_run and update_dag_run_status_code APIs

Introduce two new endpoints:
- find_dag_run: search DAG run records with filters
- update_dag_run_status_code: update status of a DAG run

Include corresponding unit tests.
parent 3947cac2
Pipeline #10272 failed with stages
in 0 seconds
...@@ -70,3 +70,65 @@ def find_group_run(dag_group: Optional[str] = None, ...@@ -70,3 +70,65 @@ def find_group_run(dag_group: Optional[str] = None,
if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']): if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']):
pass pass
return request.post("/api/dag/group_run", params) return request.post("/api/dag/group_run", params)
def find_dag_run(dag_group: Optional[str] = None,
dag_group_run: Optional[str] = None,
batch_id: Optional[str] = None,
dag: Optional[str] = None,
dataset: Optional[str] = None,
status_code: Optional[int] = None,
queue_time: Optional[DateTimeTuple] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
根据给定的参数搜索DAG组的记录
Args:
dag_group (Optional[str], optional): DAG处理组. Defaults to None.
dag_group_run (Optional[str], optional): DAG处理组运行ID. Defaults to None.
batch_id (Optional[str], optional): 批次号. Defaults to None.
dag (Optional[str], optional): DAG. Defaults to None.
dataset (Optional[str], optional): 数据集. Defaults to None.
status_code (Optional[int], optional): 状态码. Defaults to None.
queue_time (Optional[DateTimeTuple], optional): 入队时间范围. Defaults to None.
page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0,不限制.
Returns:
Result: 搜索结果对象.
"""
params = {
'dag_group': dag_group,
'dag_group_run': dag_group_run,
'batch_id': batch_id,
'dag': dag,
'dataset': dataset,
'status_code': status_code,
'queue_time_start': None,
'queue_time_end': None,
'page': page,
'limit': limit,
}
if queue_time is not None:
params['queue_time_start'], params['queue_time_end'] = queue_time
if params['queue_time_start'] and utils.is_valid_datetime_format(params['queue_time_start']):
pass
if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']):
pass
return request.post("/api/dag/dag_run", params)
def update_dag_run_status_code(dag_run: str, status_code: int) -> Result:
"""
更新DAG运行的处理状态
Args:
dag_run (str): DAG运行标识
status_code (int): 状态码
Returns:
Result: 操作结果
"""
return request.put("/api/dag/dag_run/status", {'dag_run': dag_run, 'status_code': status_code})
\ No newline at end of file
...@@ -34,3 +34,13 @@ class DAGTestCase(unittest.TestCase): ...@@ -34,3 +34,13 @@ class DAGTestCase(unittest.TestCase):
result = dag.find_group_run(dag_group = "csst_dag.cli.msc_l1", batch_id = "inttest") result = dag.find_group_run(dag_group = "csst_dag.cli.msc_l1", batch_id = "inttest")
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_find_dag_run(self):
result = dag.find_dag_run(dag_group = "csst_dag.cli.msc_l1", batch_id = "inttest")
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_update_dag_run_status_code(self):
result = dag.update_dag_run_status_code(dag_run = "195244ff176f923aec9a9328c75ecaeb4a8c4345", status_code = 1)
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment