Commit 26305abf authored by Wei Shoulin's avatar Wei Shoulin
Browse files

find process

parent fe778337
Pipeline #8214 failed with stages
in 0 seconds
......@@ -221,25 +221,62 @@ def generate_prc_msg(instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
return request.put(f"/api/level0/prc/{instrument}", params)
def process_list(level0_id: str) -> Result:
def find_process(dag_id: Optional[str] = None,
dag_run_id: Optional[str] = None,
batch_id: Optional[str] = None,
level0_id: Optional[str] = None,
dataset: Optional[str] = None,
prc_module: Optional[str] = None,
prc_status: Optional[int] = None,
prc_time: Optional[DateTimeTuple] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
通过 level0 的 ID 查询0级数据处理过程
查询0级数据处理过程
Args:
dag_id (str): 管线ID
dag_run_id (str): 运行ID
batch_id (str): 批次ID
level0_id (str): 0级数据的ID
dataset (str): 数据集
prc_module (str): 处理模块
prc_status (int): 处理状态
prc_time (DateTimeTuple): 处理时间范围
page (int): 页码,默认为1
limit (int): 每页数量 0: 不限制
Returns:
Result: 成功后,Result.data为数据列表,失败message为失败原因
"""
return request.get(f"/api/level0/prc/{level0_id}")
params = {
'dag_id': dag_id,
'dag_run_id': dag_run_id,
'batch_id': batch_id,
'level0_id': level0_id,
'dataset': dataset,
'prc_module': prc_module,
'prc_status': prc_status,
'prc_time_start': None,
'prc_time_end': None,
'page': page,
'limit': limit
}
if prc_time is not None:
params['prc_time_start'], params['prc_time_end'] = prc_time
if params['prc_time_start'] and utils.is_valid_datetime_format(params['prc_time_start']):
pass
if params['prc_time_end'] and utils.is_valid_datetime_format(params['prc_time_end']):
pass
return request.post("/api/level0/process", params)
def add_process(level0_id: str,
dag_id: str,
dag_run_id: str,
batch_id: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET,
prc_status: int = -1024,
prc_status: int = -1024,
prc_time: str = utils.get_current_time(),
prc_module: str = "",
message: str = "") -> Result:
......
......@@ -239,18 +239,55 @@ def generate_prc_msg(instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
return request.put(f"/api/level1/prc/{instrument}", params)
def process_list(level1_id: str) -> Result:
def find_process(dag_id: Optional[str] = None,
dag_run_id: Optional[str] = None,
batch_id: Optional[str] = None,
level1_id: Optional[str] = None,
dataset: Optional[str] = None,
prc_module: Optional[str] = None,
prc_status: Optional[int] = None,
prc_time: Optional[DateTimeTuple] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
通过 level1 的 ID 查询1级数据处理记录
查询0级数据处理过程
Args:
dag_id (str): 管线ID
dag_run_id (str): 运行ID
batch_id (str): 批次ID
level1_id (str): 1级数据的ID
dataset (str): 数据集
prc_module (str): 处理模块
prc_status (int): 处理状态
prc_time (DateTimeTuple): 处理时间范围
page (int): 页码,默认为1
limit (int): 每页数量 0: 不限制
Returns:
Result: 查询结果
Result: 成功后,Result.data为数据列表,失败message为失败原因
"""
return request.get(f"/api/level1/prc/{level1_id}")
params = {
'dag_id': dag_id,
'dag_run_id': dag_run_id,
'batch_id': batch_id,
'level1_id': level1_id,
'dataset': dataset,
'prc_module': prc_module,
'prc_status': prc_status,
'prc_time_start': None,
'prc_time_end': None,
'page': page,
'limit': limit
}
if prc_time is not None:
params['prc_time_start'], params['prc_time_end'] = prc_time
if params['prc_time_start'] and utils.is_valid_datetime_format(params['prc_time_start']):
pass
if params['prc_time_end'] and utils.is_valid_datetime_format(params['prc_time_end']):
pass
return request.post("/api/level1/process", params)
def add_process(level1_id: str,
dag_id: str,
......
......@@ -55,8 +55,8 @@ class Level0TestCase(unittest.TestCase):
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_process_list(self):
result = level0.process_list(level0_id="1060940003452925")
def test_find_process(self):
result = level0.find_process(level0_id="1060940003452925")
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_add_process(self):
......
......@@ -53,8 +53,8 @@ class Level1TestCase(unittest.TestCase):
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_process_list(self):
result = level1.process_list(level1_id="1060940003452925")
def test_find_process(self):
result = level1.find_process(level1_id="1060940003452925")
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_add_process(self):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment