Commit c4f41f74 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

"add DAG group run creation and search functionality"

parent 3bafb4be
Pipeline #8708 failed with stages
in 0 seconds
from typing import Optional, Tuple
from .common import request, Result, utils
DateTimeTuple = Tuple[str, str]
def new_dag_group_run(dag_group_run: dict, dag_run_list: Optional[list] = None) -> Result:
"""
新建DAG处理组
Args:
dag_group_run (dict): DAG处理组的字典表示,包含dag_group, dag_group_run, batch_id, priority字段。
dag_run_list (Optional[list], optional): DAG运行列表. Defaults to None.
Returns:
Result: 成功后,Result.data为写入记录,失败message为失败原因。
"""
return request.put("/api/dag/group_run", {'dag_group_run': dag_group_run, 'dag_run_list': dag_run_list})
def find_group_run(dag_group: Optional[str] = None,
batch_id: Optional[str] = None,
queue_time: Optional[DateTimeTuple] = None,
prc_status: Optional[int] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
根据给定的参数搜索DAG组的记录
Args:
dag_group (Optional[str], optional): DAG处理组. Defaults to None.
batch_id (Optional[str], optional): 批次号. Defaults to None.
queue_time (Optional[DateTimeTuple], optional): 入队时间范围. Defaults to None.
prc_status (Optional[int], optional): 处理状态. Defaults to None.
page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0,不限制.
Returns:
Result: 搜索结果对象.
"""
params = {
'dag_group': dag_group,
'batch_id': batch_id,
'prc_status': prc_status,
'queue_time_start': None,
'queue_time_end': None,
'page': page,
'limit': limit,
}
if queue_time is not None:
params['queue_time_start'], params['queue_time_end'] = queue_time
if params['queue_time_start'] and utils.is_valid_datetime_format(params['queue_time_start']):
pass
if params['queue_time_end'] and utils.is_valid_datetime_format(params['queue_time_end']):
pass
return request.post("/api/dag/group_run", params)
\ No newline at end of file
...@@ -185,37 +185,6 @@ def write_cat(local_file: str, ...@@ -185,37 +185,6 @@ def write_cat(local_file: str,
raise FileNotFoundError(local_file) raise FileNotFoundError(local_file)
return request.post_file("/api/level0/cat/file", local_file, params) return request.post_file("/api/level0/cat/file", local_file, params)
def generate_prc_msg(instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
obs_id: str,
detector: str,
level0_id: str,
dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
"""
生成流水线的处理消息
Args:
instrument (str): 模块ID
obs_id (str): 观测ID
detector (str): 探测器
level0_id (str): 0级数据ID
dataset (Optional[str], optional): 数据集名称..
batch_id (Optional[str], optional): 批次ID. Defaults to 'auto'.
Returns:
Result: 处理消息生成的结果,是否成功以及相关的错误信息
"""
params = {
'dataset': dataset,
'batch_id': batch_id,
'obs_id': obs_id,
'detector': detector,
"level0_id": level0_id,
}
return request.put(f"/api/level0/prc/{instrument}", params)
def find_process(dag_id: Optional[str] = None, def find_process(dag_id: Optional[str] = None,
dag_run_id: Optional[str] = None, dag_run_id: Optional[str] = None,
batch_id: Optional[str] = None, batch_id: Optional[str] = None,
......
...@@ -219,32 +219,6 @@ def write(local_file: Union[IO, str], ...@@ -219,32 +219,6 @@ def write(local_file: Union[IO, str],
return request.post_file("/api/level1/file", local_file, params) return request.post_file("/api/level1/file", local_file, params)
return request.post_bytesio("/api/level1/file", local_file, params) return request.post_bytesio("/api/level1/file", local_file, params)
def generate_prc_msg(instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
level1_id: str,
dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
"""
生成流水线的处理消息
Args:
instrument (str): 模块ID
level1_id (str): 1级数据的ID
dag_id (str): 流水管线ID,默认为空字符串
dataset (str): 数据集
batch_id (str): 批次ID
Returns:
Result: 处理消息生成的结果,是否成功以及相关的错误信息
"""
params = {
'dataset': dataset,
'batch_id': batch_id,
'level1_id': level1_id,
}
return request.put(f"/api/level1/prc/{instrument}", params)
def find_process(dag_id: Optional[str] = None, def find_process(dag_id: Optional[str] = None,
dag_run_id: Optional[str] = None, dag_run_id: Optional[str] = None,
batch_id: Optional[str] = None, batch_id: Optional[str] = None,
......
import unittest
from csst_dfs_client import dag
class DAGTestCase(unittest.TestCase):
def setUp(self):
pass
def test_new_group_run(self):
dag_group_run = {"dag_group": "csst_dag.cli.msc_l1",
"dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
"batch_id": "inttest",
"priority": 1
}
dag_run_list = [{
"dag_group": "csst_dag.cli.msc_l1",
"dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
"batch_id": "inttest",
"priority": 1,
"dag": "csst-msc-l1-mbi",
"dag_run": "61b622a5d256806082c668b2d1273668a1eee3ec",
"dataset": "csst-msc-c9-25sqdeg-v3",
"obs_type": "WIDE",
"obs_group": "none",
"obs_id": "10100232366",
"detector": "09"
}]
result = dag.new_dag_group_run(dag_group_run = dag_group_run, dag_run_list = dag_run_list)
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_find_dag_group_run(self):
result = dag.find_group_run(dag_group = "csst_dag.cli.msc_l1", batch_id = "inttest")
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
...@@ -45,15 +45,6 @@ class Level0TestCase(unittest.TestCase): ...@@ -45,15 +45,6 @@ class Level0TestCase(unittest.TestCase):
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_generate_prc_message(self):
result = level0.generate_prc_msg(
instrument='MSC',
level0_id="1060940003452930",
obs_id="10609400034529",
detector="30")
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_find_process(self): def test_find_process(self):
result = level0.find_process(level0_id="1060940003452925") result = level0.find_process(level0_id="1060940003452925")
print(result) print(result)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment