Commit 6612f40d authored by Wei Shoulin's avatar Wei Shoulin
Browse files

run_id -> dag_run_id pipeline_run_id -> dag_id

#11599
merged 1 commit into
apache :  master
from
sridhar-rl :  fix-run-id
run_id -> dag_run_id, pipeline_run_id -> dag_id #11599
<here is a image 52677f8958282441-0e06207544895614>
parent 0c4e9bf0
Pipeline #8004 canceled with stages
...@@ -125,21 +125,21 @@ def update_qc0_status(level0_id: str, file_type: str, qc0_status: int, dataset: ...@@ -125,21 +125,21 @@ def update_qc0_status(level0_id: str, file_type: str, qc0_status: int, dataset:
""" """
return request.put(f"/api/level0/qc0_status/{level0_id}", {'file_type': file_type, 'qc0_status': qc0_status, 'dataset': dataset}) return request.put(f"/api/level0/qc0_status/{level0_id}", {'file_type': file_type, 'qc0_status': qc0_status, 'dataset': dataset})
def update_prc_status(level0_id: str, file_type: str, run_id: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result: def update_prc_status(level0_id: str, file_type: str, dag_run_id: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result:
""" """
更新0级数据的处理状态 更新0级数据的处理状态
Args: Args:
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
file_type (str): 文件类型 file_type (str): 文件类型
run_id (str): 运行ID dag_run_id (str): 运行ID
prc_status (int): 处理状态 prc_status (int): 处理状态
dataset (str): 数据集名称 dataset (str): 数据集名称
Returns: Returns:
Result: 操作结果 Result: 操作结果
""" """
return request.put(f"/api/level0/prc_status/{level0_id}/{run_id}", {'file_type': file_type, 'prc_status': prc_status, 'dataset': dataset}) return request.put(f"/api/level0/prc_status/{level0_id}/{dag_run_id}", {'file_type': file_type, 'prc_status': prc_status, 'dataset': dataset})
def write(local_file: str, def write(local_file: str,
dataset: str = constants.DEFAULT_DATASET, dataset: str = constants.DEFAULT_DATASET,
...@@ -232,8 +232,8 @@ def process_list(level0_id: str) -> Result: ...@@ -232,8 +232,8 @@ def process_list(level0_id: str) -> Result:
return request.get(f"/api/level0/prc/{level0_id}") return request.get(f"/api/level0/prc/{level0_id}")
def add_process(level0_id: str, def add_process(level0_id: str,
pipeline_id: str, dag_id: str,
run_id: str, dag_run_id: str,
batch_id: Optional[str] = None, batch_id: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET, dataset: str = constants.DEFAULT_DATASET,
prc_status: int = -1024, prc_status: int = -1024,
...@@ -245,8 +245,8 @@ def add_process(level0_id: str, ...@@ -245,8 +245,8 @@ def add_process(level0_id: str,
Args: Args:
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
pipeline_id (str): 管线ID dag_id (str): 管线ID
run_id (str): 运行ID dag_run_id (str): 运行ID
dataset (str): 数据集 dataset (str): 数据集
batch_id (str): 批次ID batch_id (str): 批次ID
prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS" prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
...@@ -260,8 +260,8 @@ def add_process(level0_id: str, ...@@ -260,8 +260,8 @@ def add_process(level0_id: str,
""" """
params = { params = {
'level0_id': level0_id, 'level0_id': level0_id,
'pipeline_id': pipeline_id, 'dag_id': dag_id,
'run_id': run_id, 'dag_run_id': dag_run_id,
'dataset': dataset, 'dataset': dataset,
'batch_id': batch_id, 'batch_id': batch_id,
'prc_time': prc_time, 'prc_time': prc_time,
......
...@@ -157,7 +157,7 @@ def write(local_file: Union[IO, str], ...@@ -157,7 +157,7 @@ def write(local_file: Union[IO, str],
level1_id: str, level1_id: str,
file_type: str, file_type: str,
file_name: str, file_name: str,
pipeline_id: str, dag_id: str,
pmapname: str, pmapname: str,
build: int, build: int,
level0_id: Optional[str] = None, level0_id: Optional[str] = None,
...@@ -175,7 +175,7 @@ def write(local_file: Union[IO, str], ...@@ -175,7 +175,7 @@ def write(local_file: Union[IO, str],
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
file_type (str): 文件类型 file_type (str): 文件类型
file_name (str): 1级数据文件名 file_name (str): 1级数据文件名
pipeline_id (str): 管线ID dag_id (str): 管线ID
pmapname (str): CCDS pmap名称 pmapname (str): CCDS pmap名称
build (int): 构建号 build (int): 构建号
dataset (str): 数据集名称 dataset (str): 数据集名称
...@@ -192,7 +192,7 @@ def write(local_file: Union[IO, str], ...@@ -192,7 +192,7 @@ def write(local_file: Union[IO, str],
'level1_id': level1_id, 'level1_id': level1_id,
'file_type': file_type, 'file_type': file_type,
'file_name': file_name, 'file_name': file_name,
'pipeline_id': pipeline_id, 'dag_id': dag_id,
'pmapname': pmapname, 'pmapname': pmapname,
'build': build, 'build': build,
'dataset': dataset, 'dataset': dataset,
...@@ -220,7 +220,7 @@ def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], ...@@ -220,7 +220,7 @@ def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
Args: Args:
module_id (str): 模块ID module_id (str): 模块ID
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
pipeline_id (str): 流水管线ID,默认为空字符串 dag_id (str): 流水管线ID,默认为空字符串
dataset (str): 数据集 dataset (str): 数据集
batch_id (str): 批次ID batch_id (str): 批次ID
...@@ -250,8 +250,8 @@ def process_list(level1_id: str) -> Result: ...@@ -250,8 +250,8 @@ def process_list(level1_id: str) -> Result:
return request.get(f"/api/level1/prc/{level1_id}") return request.get(f"/api/level1/prc/{level1_id}")
def add_process(level1_id: str, def add_process(level1_id: str,
pipeline_id: str, dag_id: str,
run_id: str, dag_run_id: str,
dataset: str = constants.DEFAULT_DATASET, dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID, batch_id: str = constants.DEFAULT_BATCH_ID,
prc_time: str = utils.get_current_time(), prc_time: str = utils.get_current_time(),
...@@ -263,8 +263,8 @@ def add_process(level1_id: str, ...@@ -263,8 +263,8 @@ def add_process(level1_id: str,
Args: Args:
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
pipeline_id (str): 管线ID dag_id (str): 管线ID
run_id (str): 运行ID dag_run_id (str): 运行ID
dataset (str): 数据集 dataset (str): 数据集
batch_id (str): 批次ID batch_id (str): 批次ID
prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS" prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
...@@ -278,8 +278,8 @@ def add_process(level1_id: str, ...@@ -278,8 +278,8 @@ def add_process(level1_id: str,
""" """
params = { params = {
'level1_id': level1_id, 'level1_id': level1_id,
'pipeline_id': pipeline_id, 'dag_id': dag_id,
'run_id': run_id, 'dag_run_id': dag_run_id,
'dataset': dataset, 'dataset': dataset,
'batch_id': batch_id, 'batch_id': batch_id,
'prc_time': prc_time, 'prc_time': prc_time,
......
...@@ -150,7 +150,7 @@ def write(local_file: Union[IO, str], ...@@ -150,7 +150,7 @@ def write(local_file: Union[IO, str],
level2_id: str, level2_id: str,
data_type: str, data_type: str,
file_name: str, file_name: str,
pipeline_id: str, dag_id: str,
build: int, build: int,
level0_id: Optional[str] = None, level0_id: Optional[str] = None,
level1_id: Optional[str] = None, level1_id: Optional[str] = None,
...@@ -168,7 +168,7 @@ def write(local_file: Union[IO, str], ...@@ -168,7 +168,7 @@ def write(local_file: Union[IO, str],
level2_id (str): 2级数据的ID level2_id (str): 2级数据的ID
data_type (str): 数据类型,如'csst-msc-l2-mbi-cat' data_type (str): 数据类型,如'csst-msc-l2-mbi-cat'
file_name (str): 2级数据文件名 file_name (str): 2级数据文件名
pipeline_id (str): 管线ID dag_id (str): 管线ID
build (int): 构建号 build (int): 构建号
level0_id (Optional[str]): 0级数据的ID默认为 None level0_id (Optional[str]): 0级数据的ID默认为 None
level1_id (Optional[str]): 1级数据的ID默认为 None level1_id (Optional[str]): 1级数据的ID默认为 None
...@@ -191,7 +191,7 @@ def write(local_file: Union[IO, str], ...@@ -191,7 +191,7 @@ def write(local_file: Union[IO, str],
'brick_id': brick_id, 'brick_id': brick_id,
'file_name': file_name, 'file_name': file_name,
'data_type': data_type, 'data_type': data_type,
'pipeline_id': pipeline_id, 'dag_id': dag_id,
'build': build, 'build': build,
'dataset': dataset, 'dataset': dataset,
'batch_id': batch_id, 'batch_id': batch_id,
......
...@@ -136,8 +136,8 @@ print(result)# result.data为list类型,包含0级数据处理记录列表 ...@@ -136,8 +136,8 @@ print(result)# result.data为list类型,包含0级数据处理记录列表
from csst_dfs_client import level0 from csst_dfs_client import level0
result = level0.add_process( result = level0.add_process(
level0_id='some_level0_id', level0_id='some_level0_id',
pipeline_id='some_pipeline_id', dag_id='some_dag_id',
run_id='some_run_id', dag_run_id='some_dag_run_id',
batch_id='some_batch_id', batch_id='some_batch_id',
dataset='default_dataset', dataset='default_dataset',
prc_status=1, prc_status=1,
......
...@@ -117,7 +117,7 @@ module_id = 'MSC' ...@@ -117,7 +117,7 @@ module_id = 'MSC'
level1_id = 'some_level1_id' level1_id = 'some_level1_id'
file_type = 'SCI' file_type = 'SCI'
file_name = 'level1_data.fits' file_name = 'level1_data.fits'
pipeline_id = 'some_pipeline_id' dag_id = 'some_dag_id'
pmapname = 'some_pmapname' pmapname = 'some_pmapname'
build = 1 build = 1
level0_id = 'some_level0_id' level0_id = 'some_level0_id'
...@@ -125,7 +125,7 @@ dataset = 'default_dataset' ...@@ -125,7 +125,7 @@ dataset = 'default_dataset'
batch_id = 'default_batch_id' batch_id = 'default_batch_id'
qc1_status = 0 qc1_status = 0
result = level1.write(local_file_path, module_id, level1_id, file_type, file_name, pipeline_id, pmapname, build, level0_id, dataset, batch_id, qc1_status) result = level1.write(local_file_path, module_id, level1_id, file_type, file_name, dag_id, pmapname, build, level0_id, dataset, batch_id, qc1_status)
print(result) print(result)
``` ```
...@@ -152,8 +152,8 @@ print(result) # result.data为list类型,包含1级数据处理记录列表 ...@@ -152,8 +152,8 @@ print(result) # result.data为list类型,包含1级数据处理记录列表
from csst_dfs_client import level1 from csst_dfs_client import level1
result = level1.add_process( result = level1.add_process(
level1_id='some_level1_id', level1_id='some_level1_id',
pipeline_id='some_pipeline_id', dag_id='some_dag_id',
run_id='some_run_id', dag_run_id='some_dag_run_id',
batch_id='some_batch_id', batch_id='some_batch_id',
dataset='default_dataset', dataset='default_dataset',
prc_status=1, prc_status=1,
......
...@@ -84,7 +84,7 @@ module_id = 'MSC' ...@@ -84,7 +84,7 @@ module_id = 'MSC'
level2_id = 'some_level2_id' level2_id = 'some_level2_id'
file_type = 'SCI' file_type = 'SCI'
file_name = 'level2_data.fits' file_name = 'level2_data.fits'
pipeline_id = 'some_pipeline_id' dag_id = 'some_dag_id'
build = 1 build = 1
level0_id = 'some_level0_id' level0_id = 'some_level0_id'
level1_id = 'some_level1_id' level1_id = 'some_level1_id'
...@@ -92,7 +92,7 @@ dataset = 'default_dataset' ...@@ -92,7 +92,7 @@ dataset = 'default_dataset'
batch_id = 'default_batch_id' batch_id = 'default_batch_id'
qc2_status = 0 qc2_status = 0
result = level2.write(local_file_path, module_id, level2_id, file_type, file_name, pipeline_id, build, level0_id, level1_id, dataset, batch_id, qc2_status) result = level2.write(local_file_path, module_id, level2_id, file_type, file_name, dag_id, build, level0_id, level1_id, dataset, batch_id, qc2_status)
print(result) print(result)
``` ```
......
...@@ -31,8 +31,8 @@ ...@@ -31,8 +31,8 @@
| 列名 | 类型 | 说明 | | 列名 | 类型 | 说明 |
|------------|-------------------------|----------------------| |------------|-------------------------|----------------------|
| level0_id | str | 0级数据的ID | | level0_id | str | 0级数据的ID |
| pipeline_id | str | 流水线的ID | | dag_id | str | 流水线的ID |
| run_id | str | run ID | | dag_run_id | str | run ID |
| batch_id | str | 批次ID | | batch_id | str | 批次ID |
| dataset | str | 数据集标识 | | dataset | str | 数据集标识 |
| prc_status | int | 处理状态 | | prc_status | int | 处理状态 |
...@@ -60,10 +60,10 @@ ...@@ -60,10 +60,10 @@
| qc1_time | datetime | QC1的时间 | | qc1_time | datetime | QC1的时间 |
| prc_status | int | 处理状态,默认为-1024未处理 | | prc_status | int | 处理状态,默认为-1024未处理 |
| prc_time | datetime | 处理时间 | | prc_time | datetime | 处理时间 |
| pipeline_id | str | 处理管线ID | | dag_id | str | 处理管线ID |
| pmapname | str | ccds pmap的名称 | | pmapname | str | ccds pmap的名称 |
| build | int | 构建版本编号 | | build | int | 构建版本编号 |
| run_id | str | 运行ID | | dag_run_id | str | 运行ID |
| batch_id | str | 运行批次ID | | batch_id | str | 运行批次ID |
| dataset | str | 数据集标识 | | dataset | str | 数据集标识 |
| create_time | datetime | 创建时间 | | create_time | datetime | 创建时间 |
...@@ -87,8 +87,8 @@ ...@@ -87,8 +87,8 @@
| 列名 | 类型 | 说明 | | 列名 | 类型 | 说明 |
|------------|-------------------------|----------------| |------------|-------------------------|----------------|
| level1_id | str | 1级数据的ID | | level1_id | str | 1级数据的ID |
| pipeline_id | str | 流水线的ID | | dag_id | str | 流水线的ID |
| run_id | str | run的ID | | dag_run_id | str | run的ID |
| dataset | str | 数据集 | | dataset | str | 数据集 |
| batch_id | str | 批次ID | | batch_id | str | 批次ID |
| prc_status | int | 处理状态 | | prc_status | int | 处理状态 |
...@@ -118,9 +118,9 @@ ...@@ -118,9 +118,9 @@
| qc2_time | datetime | QC2时间 | | qc2_time | datetime | QC2时间 |
| prc_status | int | 处理状态,默认为-1024 | | prc_status | int | 处理状态,默认为-1024 |
| prc_time | datetime | 数据处理时间 | | prc_time | datetime | 数据处理时间 |
| pipeline_id | str | 处理管道ID | | dag_id | str | 处理管道ID |
| build | int | 软件构建版本的编号 | | build | int | 软件构建版本的编号 |
| run_id | str | 运行ID | | dag_run_id | str | 运行ID |
| dataset | str | 数据集标识 | | dataset | str | 数据集标识 |
| batch_id | str | 批次ID | | batch_id | str | 批次ID |
| create_time | datetime | 创建的时间 | | create_time | datetime | 创建的时间 |
......
...@@ -37,7 +37,7 @@ class Level0TestCase(unittest.TestCase): ...@@ -37,7 +37,7 @@ class Level0TestCase(unittest.TestCase):
def test_update_prc_status(self): def test_update_prc_status(self):
result = level0.update_prc_status(level0_id = "1010910015799127", file_type='SCI', result = level0.update_prc_status(level0_id = "1010910015799127", file_type='SCI',
run_id="202411071002481234", prc_status=3) dag_run_id="202411071002481234", prc_status=3)
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
...@@ -62,8 +62,8 @@ class Level0TestCase(unittest.TestCase): ...@@ -62,8 +62,8 @@ class Level0TestCase(unittest.TestCase):
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_add_process(self): def test_add_process(self):
result = level0.add_process(level0_id="1060940003452925", result = level0.add_process(level0_id="1060940003452925",
pipeline_id="csst-msc-l1-mbi", dag_id="csst-msc-l1-mbi",
run_id="202411071002481234", dag_run_id="202411071002481234",
dataset="v93", dataset="v93",
batch_id="v930batch", batch_id="v930batch",
prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="") prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="")
......
...@@ -43,7 +43,7 @@ class Level1TestCase(unittest.TestCase): ...@@ -43,7 +43,7 @@ class Level1TestCase(unittest.TestCase):
module_id = "MSC", module_id = "MSC",
level0_id="1010910015799127", level0_id="1010910015799127",
level1_id = "1010910015799127", level1_id = "1010910015799127",
pipeline_id = "csst-msc-l1-mbi", dag_id = "csst-msc-l1-mbi",
file_name= "CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits", file_name= "CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits",
pmapname="csst_000128.pmap", pmapname="csst_000128.pmap",
file_type='SCI', file_type='SCI',
...@@ -58,7 +58,7 @@ class Level1TestCase(unittest.TestCase): ...@@ -58,7 +58,7 @@ class Level1TestCase(unittest.TestCase):
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_add_process(self): def test_add_process(self):
result = level1.add_process(level1_id="1060940003452925", pipeline_id="csst-msc-l1-mbi", run_id="202411071002481234", prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="") result = level1.add_process(level1_id="1060940003452925", dag_id="csst-msc-l1-mbi", dag_run_id="202411071002481234", prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="")
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
......
...@@ -50,7 +50,7 @@ class Level2TestCase(unittest.TestCase): ...@@ -50,7 +50,7 @@ class Level2TestCase(unittest.TestCase):
file_name = "CSST_MSC_MS_SCI_20310423084104_20310423084334_10109400638867_12_L2_V01_CATMIX.fits", file_name = "CSST_MSC_MS_SCI_20310423084104_20310423084334_10109400638867_12_L2_V01_CATMIX.fits",
brick_id = 254, brick_id = 254,
qc2_status=12, qc2_status=12,
pipeline_id = "csst-msc-l2-mbi", dag_id = "csst-msc-l2-mbi",
build=1, build=1,
dataset="094", dataset="094",
prc_status=1) prc_status=1)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment