Commit 56c69913 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

Refactor DAG parameter names for consistency. Changed `dag_id` to `dag` and...

Refactor DAG parameter names for consistency. Changed `dag_id` to `dag` and `dag_run_id` to `dag_run` across level0, level1, and level2 modules and tests.
parent c4f41f74
Pipeline #8745 failed with stages
in 0 seconds
...@@ -124,20 +124,20 @@ def update_qc0_status(level0_id: str, qc0_status: int, dataset: str = constants. ...@@ -124,20 +124,20 @@ def update_qc0_status(level0_id: str, qc0_status: int, dataset: str = constants.
""" """
return request.put(f"/api/level0/qc0_status/{level0_id}", {'qc0_status': qc0_status, 'dataset': dataset}) return request.put(f"/api/level0/qc0_status/{level0_id}", {'qc0_status': qc0_status, 'dataset': dataset})
def update_prc_status(level0_id: str, dag_run_id: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result: def update_prc_status(level0_id: str, dag_run: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result:
""" """
更新0级数据的处理状态 更新0级数据的处理状态
Args: Args:
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
dag_run_id (str): 运行ID dag_run (str): DAG运行标识
prc_status (int): 处理状态 prc_status (int): 处理状态
dataset (str): 数据集名称 dataset (str): 数据集名称
Returns: Returns:
Result: 操作结果 Result: 操作结果
""" """
return request.put(f"/api/level0/prc_status/{level0_id}/{dag_run_id}", {'prc_status': prc_status, 'dataset': dataset}) return request.put(f"/api/level0/prc_status/{level0_id}/{dag_run}", {'prc_status': prc_status, 'dataset': dataset})
def write(local_file: str, def write(local_file: str,
dataset: str = constants.DEFAULT_DATASET, dataset: str = constants.DEFAULT_DATASET,
...@@ -185,8 +185,8 @@ def write_cat(local_file: str, ...@@ -185,8 +185,8 @@ def write_cat(local_file: str,
raise FileNotFoundError(local_file) raise FileNotFoundError(local_file)
return request.post_file("/api/level0/cat/file", local_file, params) return request.post_file("/api/level0/cat/file", local_file, params)
def find_process(dag_id: Optional[str] = None, def find_process(dag: Optional[str] = None,
dag_run_id: Optional[str] = None, dag_run: Optional[str] = None,
batch_id: Optional[str] = None, batch_id: Optional[str] = None,
level0_id: Optional[str] = None, level0_id: Optional[str] = None,
dataset: Optional[str] = None, dataset: Optional[str] = None,
...@@ -199,8 +199,8 @@ def find_process(dag_id: Optional[str] = None, ...@@ -199,8 +199,8 @@ def find_process(dag_id: Optional[str] = None,
查询0级数据处理过程 查询0级数据处理过程
Args: Args:
dag_id (str): 管线ID dag (str): DAG标识
dag_run_id (str): 运行ID dag_run (str): DAG运行标识
batch_id (str): 批次ID batch_id (str): 批次ID
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
dataset (str): 数据集 dataset (str): 数据集
...@@ -215,8 +215,8 @@ def find_process(dag_id: Optional[str] = None, ...@@ -215,8 +215,8 @@ def find_process(dag_id: Optional[str] = None,
""" """
params = { params = {
'dag_id': dag_id, 'dag': dag,
'dag_run_id': dag_run_id, 'dag_run': dag_run,
'batch_id': batch_id, 'batch_id': batch_id,
'level0_id': level0_id, 'level0_id': level0_id,
'dataset': dataset, 'dataset': dataset,
...@@ -236,8 +236,8 @@ def find_process(dag_id: Optional[str] = None, ...@@ -236,8 +236,8 @@ def find_process(dag_id: Optional[str] = None,
return request.post("/api/level0/process", params) return request.post("/api/level0/process", params)
def add_process(level0_id: str, def add_process(level0_id: str,
dag_id: str, dag: str,
dag_run_id: str, dag_run: str,
batch_id: Optional[str] = None, batch_id: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET, dataset: str = constants.DEFAULT_DATASET,
prc_status: int = -1024, prc_status: int = -1024,
...@@ -249,8 +249,8 @@ def add_process(level0_id: str, ...@@ -249,8 +249,8 @@ def add_process(level0_id: str,
Args: Args:
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
dag_id (str): 管线ID dag (str): DAG标识
dag_run_id (str): 运行ID dag_run (str): DAG运行标识
dataset (str): 数据集 dataset (str): 数据集
batch_id (str): 批次ID batch_id (str): 批次ID
prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS" prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
...@@ -264,8 +264,8 @@ def add_process(level0_id: str, ...@@ -264,8 +264,8 @@ def add_process(level0_id: str,
""" """
params = { params = {
'level0_id': level0_id, 'level0_id': level0_id,
'dag_id': dag_id, 'dag': dag,
'dag_run_id': dag_run_id, 'dag_run': dag_run,
'dataset': dataset, 'dataset': dataset,
'batch_id': batch_id, 'batch_id': batch_id,
'prc_time': prc_time, 'prc_time': prc_time,
......
...@@ -162,7 +162,7 @@ def write(local_file: Union[IO, str], ...@@ -162,7 +162,7 @@ def write(local_file: Union[IO, str],
level1_id: str, level1_id: str,
data_model: str, data_model: str,
file_name: str, file_name: str,
dag_id: str, dag: str,
pmapname: str, pmapname: str,
build: int, build: int,
level0_id: Optional[str] = None, level0_id: Optional[str] = None,
...@@ -182,7 +182,7 @@ def write(local_file: Union[IO, str], ...@@ -182,7 +182,7 @@ def write(local_file: Union[IO, str],
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
data_model (str): 数据类型 data_model (str): 数据类型
file_name (str): 1级数据文件名 file_name (str): 1级数据文件名
dag_id (str): 管线ID dag (str): DAG标识
pmapname (str): CCDS pmap名称 pmapname (str): CCDS pmap名称
build (int): 构建号 build (int): 构建号
dataset (str): 数据集名称 dataset (str): 数据集名称
...@@ -201,7 +201,7 @@ def write(local_file: Union[IO, str], ...@@ -201,7 +201,7 @@ def write(local_file: Union[IO, str],
'level1_id': level1_id, 'level1_id': level1_id,
'data_model': data_model, 'data_model': data_model,
'file_name': file_name, 'file_name': file_name,
'dag_id': dag_id, 'dag': dag,
'pmapname': pmapname, 'pmapname': pmapname,
'build': build, 'build': build,
'dataset': dataset, 'dataset': dataset,
...@@ -219,8 +219,8 @@ def write(local_file: Union[IO, str], ...@@ -219,8 +219,8 @@ def write(local_file: Union[IO, str],
return request.post_file("/api/level1/file", local_file, params) return request.post_file("/api/level1/file", local_file, params)
return request.post_bytesio("/api/level1/file", local_file, params) return request.post_bytesio("/api/level1/file", local_file, params)
def find_process(dag_id: Optional[str] = None, def find_process(dag: Optional[str] = None,
dag_run_id: Optional[str] = None, dag_run: Optional[str] = None,
batch_id: Optional[str] = None, batch_id: Optional[str] = None,
level1_id: Optional[str] = None, level1_id: Optional[str] = None,
dataset: Optional[str] = None, dataset: Optional[str] = None,
...@@ -233,8 +233,8 @@ def find_process(dag_id: Optional[str] = None, ...@@ -233,8 +233,8 @@ def find_process(dag_id: Optional[str] = None,
查询0级数据处理过程 查询0级数据处理过程
Args: Args:
dag_id (str): 管线ID dag (str): DAG标识
dag_run_id (str): 运行ID dag_run (str): DAG运行标识
batch_id (str): 批次ID batch_id (str): 批次ID
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
dataset (str): 数据集 dataset (str): 数据集
...@@ -249,8 +249,8 @@ def find_process(dag_id: Optional[str] = None, ...@@ -249,8 +249,8 @@ def find_process(dag_id: Optional[str] = None,
""" """
params = { params = {
'dag_id': dag_id, 'dag': dag,
'dag_run_id': dag_run_id, 'dag_run': dag_run,
'batch_id': batch_id, 'batch_id': batch_id,
'level1_id': level1_id, 'level1_id': level1_id,
'dataset': dataset, 'dataset': dataset,
...@@ -270,8 +270,8 @@ def find_process(dag_id: Optional[str] = None, ...@@ -270,8 +270,8 @@ def find_process(dag_id: Optional[str] = None,
return request.post("/api/level1/process", params) return request.post("/api/level1/process", params)
def add_process(level1_id: str, def add_process(level1_id: str,
dag_id: str, dag: str,
dag_run_id: str, dag_run: str,
dataset: str = constants.DEFAULT_DATASET, dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID, batch_id: str = constants.DEFAULT_BATCH_ID,
prc_time: str = utils.get_current_time(), prc_time: str = utils.get_current_time(),
...@@ -283,8 +283,8 @@ def add_process(level1_id: str, ...@@ -283,8 +283,8 @@ def add_process(level1_id: str,
Args: Args:
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
dag_id (str): 管线ID dag (str): DAG标识
dag_run_id (str): 运行ID dag_run (str): DAG运行标识
dataset (str): 数据集 dataset (str): 数据集
batch_id (str): 批次ID batch_id (str): 批次ID
prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS" prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
...@@ -298,8 +298,8 @@ def add_process(level1_id: str, ...@@ -298,8 +298,8 @@ def add_process(level1_id: str,
""" """
params = { params = {
'level1_id': level1_id, 'level1_id': level1_id,
'dag_id': dag_id, 'dag': dag,
'dag_run_id': dag_run_id, 'dag_run': dag_run,
'dataset': dataset, 'dataset': dataset,
'batch_id': batch_id, 'batch_id': batch_id,
'prc_time': prc_time, 'prc_time': prc_time,
......
...@@ -150,7 +150,7 @@ def write(local_file: Union[IO, str], ...@@ -150,7 +150,7 @@ def write(local_file: Union[IO, str],
level2_id: str, level2_id: str,
data_model: str, data_model: str,
file_name: str, file_name: str,
dag_id: str, dag: str,
build: int, build: int,
level0_id: Optional[str] = None, level0_id: Optional[str] = None,
level1_id: Optional[str] = None, level1_id: Optional[str] = None,
...@@ -168,7 +168,7 @@ def write(local_file: Union[IO, str], ...@@ -168,7 +168,7 @@ def write(local_file: Union[IO, str],
level2_id (str): 2级数据的ID level2_id (str): 2级数据的ID
data_model (str): 数据类型,如'csst-msc-l2-mbi-cat' data_model (str): 数据类型,如'csst-msc-l2-mbi-cat'
file_name (str): 2级数据文件名 file_name (str): 2级数据文件名
dag_id (str): 管线ID dag (str): DAG标识
build (int): 构建号 build (int): 构建号
level0_id (Optional[str]): 0级数据的ID默认为 None level0_id (Optional[str]): 0级数据的ID默认为 None
level1_id (Optional[str]): 1级数据的ID默认为 None level1_id (Optional[str]): 1级数据的ID默认为 None
...@@ -191,7 +191,7 @@ def write(local_file: Union[IO, str], ...@@ -191,7 +191,7 @@ def write(local_file: Union[IO, str],
'brick_id': brick_id, 'brick_id': brick_id,
'file_name': file_name, 'file_name': file_name,
'data_model': data_model, 'data_model': data_model,
'dag_id': dag_id, 'dag': dag,
'build': build, 'build': build,
'dataset': dataset, 'dataset': dataset,
'batch_id': batch_id, 'batch_id': batch_id,
......
...@@ -35,7 +35,7 @@ class Level0TestCase(unittest.TestCase): ...@@ -35,7 +35,7 @@ class Level0TestCase(unittest.TestCase):
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_update_prc_status(self): def test_update_prc_status(self):
result = level0.update_prc_status(level0_id = "1010910015799127", dag_run_id="202411071002481234", prc_status=3) result = level0.update_prc_status(level0_id = "1010910015799127", dag_run="202411071002481234", prc_status=3)
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
...@@ -51,8 +51,8 @@ class Level0TestCase(unittest.TestCase): ...@@ -51,8 +51,8 @@ class Level0TestCase(unittest.TestCase):
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_add_process(self): def test_add_process(self):
result = level0.add_process(level0_id="1060940003452925", result = level0.add_process(level0_id="1060940003452925",
dag_id="csst-msc-l1-mbi", dag="csst-msc-l1-mbi",
dag_run_id="202411071002481234", dag_run="202411071002481234",
dataset="v93", dataset="v93",
batch_id="v930batch", batch_id="v930batch",
prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="") prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="")
......
...@@ -45,7 +45,7 @@ class Level1TestCase(unittest.TestCase): ...@@ -45,7 +45,7 @@ class Level1TestCase(unittest.TestCase):
instrument = "MSC", instrument = "MSC",
level0_id="1010910015799127", level0_id="1010910015799127",
level1_id = "1010910015799127", level1_id = "1010910015799127",
dag_id = "csst-msc-l1-mbi", dag = "csst-msc-l1-mbi",
file_name= "CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits", file_name= "CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits",
pmapname="csst_000128.pmap", pmapname="csst_000128.pmap",
data_model='csst-msc-l1-mbi', data_model='csst-msc-l1-mbi',
...@@ -60,7 +60,7 @@ class Level1TestCase(unittest.TestCase): ...@@ -60,7 +60,7 @@ class Level1TestCase(unittest.TestCase):
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_add_process(self): def test_add_process(self):
result = level1.add_process(level1_id="1060940003452925", dag_id="csst-msc-l1-mbi", dag_run_id="202411071002481234", prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="") result = level1.add_process(level1_id="1060940003452925", dag="csst-msc-l1-mbi", dag_run="202411071002481234", prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="")
print(result) print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
......
...@@ -50,7 +50,7 @@ class Level2TestCase(unittest.TestCase): ...@@ -50,7 +50,7 @@ class Level2TestCase(unittest.TestCase):
file_name = "CSST_MSC_MS_SCI_20310423084104_20310423084334_10109400638867_12_L2_V01_CATMIX.fits", file_name = "CSST_MSC_MS_SCI_20310423084104_20310423084334_10109400638867_12_L2_V01_CATMIX.fits",
brick_id = 254, brick_id = 254,
qc2_status=12, qc2_status=12,
dag_id = "csst-msc-l2-mbi", dag = "csst-msc-l2-mbi",
build=1, build=1,
dataset="094", dataset="094",
prc_status=1) prc_status=1)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment