Commit 62ba143a authored by Wei Shoulin's avatar Wei Shoulin
Browse files

dataset and batch_id

parent bbf900f8
Pipeline #7631 failed with stages
DEFAULT_DATASET = 'routine'
DEFAULT_BATCH_ID = 'auto'
\ No newline at end of file
...@@ -14,3 +14,6 @@ def is_valid_datetime_format(date_str: str, format='%Y-%m-%d %H:%M:%S') -> bool: ...@@ -14,3 +14,6 @@ def is_valid_datetime_format(date_str: str, format='%Y-%m-%d %H:%M:%S') -> bool:
return True return True
except ValueError: except ValueError:
raise ValueError(f"Incorrect data format, should be {format}") raise ValueError(f"Incorrect data format, should be {format}")
def get_current_time() -> str:
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
import os import os
from typing import Optional, Tuple, Literal from typing import Optional, Tuple, Literal
from .common import request, Result, utils from .common import request, Result, utils, constants
DateTimeTuple = Tuple[str, str] DateTimeTuple = Tuple[str, str]
...@@ -19,10 +19,11 @@ def find(project_id: Optional[str] = None, ...@@ -19,10 +19,11 @@ def find(project_id: Optional[str] = None,
dec_obj: Optional[int] = None, dec_obj: Optional[int] = None,
radius: Optional[float] = None, radius: Optional[float] = None,
object_name: Optional[str] = None, object_name: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET,
page: int = 1, page: int = 1,
limit: int = 0) -> Result: limit: int = 0) -> Result:
""" """
根据给定的参数搜索0级数据文件记录 根据给定的参数搜索0级数据文件记录
Args: Args:
project_id (Optional[str], optional): 项目ID. Defaults to None. project_id (Optional[str], optional): 项目ID. Defaults to None.
...@@ -40,6 +41,7 @@ def find(project_id: Optional[str] = None, ...@@ -40,6 +41,7 @@ def find(project_id: Optional[str] = None,
dec_obj (Optional[int], optional): 目标赤纬. Defaults to None. dec_obj (Optional[int], optional): 目标赤纬. Defaults to None.
radius (Optional[float], optional): 搜索半径. Defaults to None. radius (Optional[float], optional): 搜索半径. Defaults to None.
object_name (Optional[str], optional): 目标名称. Defaults to None. object_name (Optional[str], optional): 目标名称. Defaults to None.
dataset (Optional[str], optional): 数据集名称. Defaults to constants.DEFAULT_DATASET.
page (int, optional): 页码. Defaults to 1. page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0,不限制. limit (int, optional): 每页数量. Defaults to 0,不限制.
...@@ -66,6 +68,7 @@ def find(project_id: Optional[str] = None, ...@@ -66,6 +68,7 @@ def find(project_id: Optional[str] = None,
'obs_time_end': None, 'obs_time_end': None,
'create_time_start': None, 'create_time_start': None,
'create_time_end': None, 'create_time_end': None,
'dataset': dataset,
'page': page, 'page': page,
'limit': limit, 'limit': limit,
} }
...@@ -83,18 +86,18 @@ def get_by_id(_id: str) -> Result: ...@@ -83,18 +86,18 @@ def get_by_id(_id: str) -> Result:
def find_by_level0_id(level0_id: str) -> Result: def find_by_level0_id(level0_id: str) -> Result:
""" """
通过 level0 的 ID 查询0级数据 通过 level0 的 ID 查询0级数据
Args: Args:
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
Returns: Returns:
Result: 查询结果 Result: 查询结果
""" """
return request.get(f"/api/level0/{level0_id}") return request.get(f"/api/level0/{level0_id}")
def update_qc0_status(level0_id: str, file_type: str, qc0_status: int) -> Result: def update_qc0_status(level0_id: str, file_type: str, qc0_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result:
""" """
更新0级数据的QC0状态 更新0级数据的QC0状态
...@@ -102,87 +105,114 @@ def update_qc0_status(level0_id: str, file_type: str, qc0_status: int) -> Result ...@@ -102,87 +105,114 @@ def update_qc0_status(level0_id: str, file_type: str, qc0_status: int) -> Result
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
file_type (str): 文件类型 file_type (str): 文件类型
qc0_status (int): QC0状态 qc0_status (int): QC0状态
dataset (str): 数据集名称
Returns: Returns:
Result: 更新结果 Result: 更新结果
""" """
return request.put(f"/api/level0/qc0_status/{level0_id}", {'file_type': file_type, 'qc0_status': qc0_status}) return request.put(f"/api/level0/qc0_status/{level0_id}", {'file_type': file_type, 'qc0_status': qc0_status, 'dataset': dataset})
def update_prc_status(level0_id: str, file_type: str, prc_status: int) -> Result: def update_prc_status(level0_id: str, file_type: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result:
""" """
更新0级数据的处理状态 更新0级数据的处理状态
Args: Args:
level0_id (str): 0级数据的ID。 level0_id (str): 0级数据的ID
file_type (str): 文件类型。 file_type (str): 文件类型
prc_status (int): 处理状态。 prc_status (int): 处理状态
dataset (str): 数据集名称
Returns: Returns:
Result: 操作结果 Result: 操作结果
""" """
return request.put(f"/api/level0/prc_status/{level0_id}", {'file_type': file_type, 'prc_status': prc_status}) return request.put(f"/api/level0/prc_status/{level0_id}", {'file_type': file_type, 'prc_status': prc_status, 'dataset': dataset})
def write(local_file: str, **kwargs) -> Result: def write(local_file: str,
dataset: str = constants.DEFAULT_DATASET,
**kwargs) -> Result:
""" """
将本地文件写入DFS中 将本地文件写入DFS中
Args: Args:
local_file (str]): 文件路径。 local_file (str]): 文件路径
**kwargs: 额外的关键字参数,这些参数将传递给DFS。 dataset (Optional[str], optional): 数据集名称. Defaults to None.
**kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns: Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象 Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象
""" """
params = {
'dataset': dataset,
}
params.update(kwargs)
if not os.path.exists(local_file): if not os.path.exists(local_file):
raise FileNotFoundError(local_file) raise FileNotFoundError(local_file)
return request.post_file("/api/level0/file", local_file, kwargs) return request.post_file("/api/level0/file", local_file, params)
def write_cat(local_file: str, **kwargs) -> Result: def write_cat(local_file: str,
dataset: str = constants.DEFAULT_DATASET,
**kwargs) -> Result:
""" """
主巡天仿真数据的星表本地文件写入DFS中 主巡天仿真数据的星表本地文件写入DFS中
Args: Args:
local_file (str]): 文件路径。 local_file (str]): 文件路径
**kwargs: 额外的关键字参数,这些参数将传递给DFS。 dataset (Optional[str], optional): 数据集名称. Defaults to None.
**kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns: Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象 Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象
""" """
params = {
'dataset': dataset,
}
params.update(kwargs)
if not os.path.exists(local_file): if not os.path.exists(local_file):
raise FileNotFoundError(local_file) raise FileNotFoundError(local_file)
return request.post_file("/api/level0/cat/file", local_file, kwargs) return request.post_file("/api/level0/cat/file", local_file, params)
def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
obs_id: str, obs_id: str,
detector_no: str, detector_no: str,
pipeline_id: Optional[str] = '') -> Result: batch_id: str = constants.DEFAULT_BATCH_ID,
dataset: str = constants.DEFAULT_DATASET,
pipeline_id: str = '') -> Result:
""" """
生成流水线的处理消息 生成流水线的处理消息
Args: Args:
module_id (str): 模块ID。 module_id (str): 模块ID
obs_id (str): 观测ID。 obs_id (str): 观测ID
detector_no (str): 探测器编号。 detector_no (str): 探测器编号
pipeline_id (str): 流水管线ID,默认为空字符串。 pipeline_id (str): 流水管线ID,默认为空字符串
dataset (Optional[str], optional): 数据集名称..
batch_id (Optional[str], optional): 批次ID. Defaults to 'auto'.
Returns: Returns:
Result: 处理消息生成的结果,是否成功以及相关的错误信息 Result: 处理消息生成的结果,是否成功以及相关的错误信息
""" """
params = {
'dataset': dataset,
'batch_id': batch_id,
'pipeline_id': pipeline_id,
'obs_id': obs_id,
'detector_no': detector_no,
}
return request.put(f"/api/level0/prc/{module_id}/{obs_id}/{detector_no}", {'pipeline_id': pipeline_id}) return request.put(f"/api/level0/prc/{module_id}", params)
def process_list(level0_id: str) -> Result: def process_list(level0_id: str) -> Result:
""" """
通过 level0 的 ID 查询0级数据处理过程 通过 level0 的 ID 查询0级数据处理过程
Args: Args:
level0_id (str): 0级数据的ID level0_id (str): 0级数据的ID
Returns: Returns:
Result: 成功后,Result.data为数据列表,失败message为失败原因 Result: 成功后,Result.data为数据列表,失败message为失败原因
""" """
return request.get(f"/api/level0/prc/{level0_id}") return request.get(f"/api/level0/prc/{level0_id}")
...@@ -190,30 +220,36 @@ def process_list(level0_id: str) -> Result: ...@@ -190,30 +220,36 @@ def process_list(level0_id: str) -> Result:
def add_process(level0_id: str, def add_process(level0_id: str,
pipeline_id: str, pipeline_id: str,
run_id: str, run_id: str,
prc_time: str, batch_id: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET,
prc_status: int = -1024, prc_status: int = -1024,
prc_time: str = utils.get_current_time(),
prc_module: str = "", prc_module: str = "",
message: str = "") -> Result: message: str = "") -> Result:
""" """
添加0级数据处理过程 添加0级数据处理过程
Args: Args:
level0_id (str): 0级数据的ID。 level0_id (str): 0级数据的ID
pipeline_id (str): 管线ID。 pipeline_id (str): 管线ID
run_id (str): 运行ID。 run_id (str): 运行ID
prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"。 dataset (str): 数据集
prc_status (int): 处理状态。 batch_id (str): 批次ID
prc_module (str): 处理模块。 prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
message (str): 处理消息。 prc_status (int): 处理状态
prc_module (str): 处理模块
message (str): 处理消息
Returns: Returns:
Result: 成功后,Result.data为写入记录,失败message为失败原因 Result: 成功后,Result.data为写入记录,失败message为失败原因
""" """
params = { params = {
'level0_id': level0_id, 'level0_id': level0_id,
'pipeline_id': pipeline_id, 'pipeline_id': pipeline_id,
'run_id': run_id, 'run_id': run_id,
'dataset': dataset,
'batch_id': batch_id,
'prc_time': prc_time, 'prc_time': prc_time,
'prc_status': prc_status, 'prc_status': prc_status,
'prc_module': prc_module, 'prc_module': prc_module,
...@@ -224,13 +260,13 @@ def add_process(level0_id: str, ...@@ -224,13 +260,13 @@ def add_process(level0_id: str,
def new(data: dict) -> Result: def new(data: dict) -> Result:
""" """
新建0级数据,用于仿真数据测试 新建0级数据,用于仿真数据测试
Args: Args:
data (dict): 0级数据的字典表示 data (dict): 0级数据的字典表示
Returns: Returns:
Result: 成功后,Result.data为写入记录,失败message为失败原因 Result: 成功后,Result.data为写入记录,失败message为失败原因
""" """
return request.post("/api/level0/new", data) return request.post("/api/level0/new", data)
\ No newline at end of file
from typing import Optional, Tuple, Literal, IO, Union from typing import Optional, Tuple, Literal, IO, Union
from .common import request, Result, utils from .common import request, Result, utils, constants
import os import os
DateTimeTuple = Tuple[str, str] DateTimeTuple = Tuple[str, str]
...@@ -22,10 +22,12 @@ def find(project_id: Optional[str] = None, ...@@ -22,10 +22,12 @@ def find(project_id: Optional[str] = None,
object_name: Optional[str] = None, object_name: Optional[str] = None,
rss_id: Optional[str] = None, rss_id: Optional[str] = None,
cube_id: Optional[str] = None, cube_id: Optional[str] = None,
dataset: Optional[str] = None,
batch_id: Optional[str] = None,
page: int = 1, page: int = 1,
limit: int = 0) -> Result: limit: int = 0) -> Result:
""" """
根据给定的参数搜索1级数据文件记录 根据给定的参数搜索1级数据文件记录
Args: Args:
project_id (Optional[str], optional): 项目ID. Defaults to None. project_id (Optional[str], optional): 项目ID. Defaults to None.
...@@ -45,6 +47,8 @@ def find(project_id: Optional[str] = None, ...@@ -45,6 +47,8 @@ def find(project_id: Optional[str] = None,
object_name (Optional[str], optional): 天体名称. Defaults to None. object_name (Optional[str], optional): 天体名称. Defaults to None.
rss_id (Optional[str], optional): RSS ID (IFS) Defaults to None. rss_id (Optional[str], optional): RSS ID (IFS) Defaults to None.
cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None. cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
dataset (Optional[str], optional): 数据集名称. Defaults to None.
batch_id (Optional[str], optional): 批次ID. Defaults to None.
page (int, optional): 页码. Defaults to 1. page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0. limit (int, optional): 每页数量. Defaults to 0.
...@@ -74,6 +78,8 @@ def find(project_id: Optional[str] = None, ...@@ -74,6 +78,8 @@ def find(project_id: Optional[str] = None,
'create_time_end': None, 'create_time_end': None,
'rss_id': rss_id, 'rss_id': rss_id,
'cube_id': cube_id, 'cube_id': cube_id,
'dataset': dataset,
'batch_id': batch_id,
'page': page, 'page': page,
'limit': limit, 'limit': limit,
} }
...@@ -89,34 +95,34 @@ def find(project_id: Optional[str] = None, ...@@ -89,34 +95,34 @@ def find(project_id: Optional[str] = None,
def find_by_level1_id(level1_id: str) -> Result: def find_by_level1_id(level1_id: str) -> Result:
""" """
通过 level1 的 ID 查询1级数据 通过 level1 的 ID 查询1级数据
Args: Args:
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
Returns: Returns:
Result: 查询结果 Result: 查询结果
""" """
return request.get(f"/api/level1/{level1_id}") return request.get(f"/api/level1/{level1_id}")
def find_by_brick_id(brick_id: int) -> Result: def find_by_brick_id(brick_id: int) -> Result:
""" """
通过 brick 的 ID 查询1级数据 通过 brick 的 ID 查询1级数据
Args: Args:
brick_id (int): 天区ID brick_id (int): 天区ID
Returns: Returns:
Result: 查询结果 Result: 查询结果
""" """
return request.get(f"/api/level1/brick/{brick_id}") return request.get(f"/api/level1/brick/{brick_id}")
def sls_find_by_qc1_status(qc1_status: int, limit: int = 1) -> Result: def sls_find_by_qc1_status(qc1_status: int, limit: int = 1, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
return request.post(f"/api/level1/sls/qc1_status/{qc1_status}", {'limit': limit}) return request.post(f"/api/level1/sls/qc1_status/{qc1_status}", {'limit': limit, 'batch_id': batch_id})
def update_qc1_status(level1_id: str, file_type: str, qc1_status: int) -> Result: def update_qc1_status(level1_id: str, file_type: str, qc1_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
""" """
更新1级数据的QC0状态 更新1级数据的QC0状态
...@@ -124,25 +130,27 @@ def update_qc1_status(level1_id: str, file_type: str, qc1_status: int) -> Result ...@@ -124,25 +130,27 @@ def update_qc1_status(level1_id: str, file_type: str, qc1_status: int) -> Result
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
file_type (str): 文件类型 file_type (str): 文件类型
qc1_status (int): QC0状态 qc1_status (int): QC0状态
batch_id (str): 批次ID
Returns: Returns:
Result: 更新结果 Result: 更新结果
""" """
return request.put(f"/api/level1/qc1_status/{level1_id}", {'file_type': file_type, 'qc1_status': qc1_status}) return request.put(f"/api/level1/qc1_status/{level1_id}", {'file_type': file_type, 'qc1_status': qc1_status, 'batch_id': batch_id})
def update_prc_status(level1_id: str, file_type: str, prc_status: int) -> Result: def update_prc_status(level1_id: str, file_type: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
""" """
更新1级数据的处理状态 更新1级数据的处理状态
Args: Args:
level1_id (str): 1级数据的ID。 level1_id (str): 1级数据的ID
file_type (str): 文件类型。 file_type (str): 文件类型
prc_status (int): 处理状态。 prc_status (int): 处理状态
batch_id (str): 批次ID
Returns: Returns:
Result: 操作结果 Result: 操作结果
""" """
return request.put(f"/api/level1/prc_status/{level1_id}", {'file_type': file_type, 'prc_status': prc_status}) return request.put(f"/api/level1/prc_status/{level1_id}", {'file_type': file_type, 'prc_status': prc_status, 'batch_id': batch_id})
def write(local_file: Union[IO | str], def write(local_file: Union[IO | str],
module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
...@@ -153,21 +161,25 @@ def write(local_file: Union[IO | str], ...@@ -153,21 +161,25 @@ def write(local_file: Union[IO | str],
pipeline_id: str, pipeline_id: str,
pmapname: str, pmapname: str,
build: int, build: int,
dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID,
**extra_kwargs) -> Result: **extra_kwargs) -> Result:
''' '''
将本地的1级数据文件写入到DFS中其他参数如rss_id, cube_id等,可通过extra_kwargs传入 将本地的1级数据文件写入到DFS中其他参数如rss_id, cube_id等,可通过extra_kwargs传入
Args: Args:
local_file (Union[IO | str]): 文件路径或文件对象 local_file (Union[IO | str]): 文件路径或文件对象
module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID。 module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID
level0_id (Optional[str | None]): 0级数据的ID。默认为 None。 level0_id (Optional[str | None]): 0级数据的ID默认为 None
level1_id (Optional[str | None]): 1级数据的ID。默认为 None。 level1_id (Optional[str | None]): 1级数据的ID默认为 None
file_type (str): 文件类型。 file_type (str): 文件类型
file_name (str): 1级数据文件名。 file_name (str): 1级数据文件名
pipeline_id (str): 管线ID。 pipeline_id (str): 管线ID
pmapname (str): CCDS pmap名称。 pmapname (str): CCDS pmap名称
build (int): 构建号。 build (int): 构建号
**kwargs: 额外的关键字参数,这些参数将传递给DFS。 dataset (str): 数据集名称
batch_id (str): 批次ID
**kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns: Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象 Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象
...@@ -176,7 +188,20 @@ def write(local_file: Union[IO | str], ...@@ -176,7 +188,20 @@ def write(local_file: Union[IO | str],
if utils.is_valid_filename(file_name): if utils.is_valid_filename(file_name):
raise ValueError(f"Incorrect file name [{file_name}], should be *.*") raise ValueError(f"Incorrect file name [{file_name}], should be *.*")
params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'file_type': file_type, 'file_name': file_name, 'pipeline_id': pipeline_id, 'pmapname': pmapname, 'build': build} params = {
'module_id': module_id,
'level0_id': level0_id,
'level1_id': level1_id,
'file_type': file_type,
'file_name': file_name,
'pipeline_id': pipeline_id,
'pmapname': pmapname,
'build': build,
'dataset': dataset,
'batch_id': batch_id
}
if not dataset or not batch_id:
raise ValueError("dataset and batch_id is required")
params.update(extra_kwargs) params.update(extra_kwargs)
if isinstance(local_file, str): if isinstance(local_file, str):
if not os.path.exists(local_file): if not os.path.exists(local_file):
...@@ -186,31 +211,41 @@ def write(local_file: Union[IO | str], ...@@ -186,31 +211,41 @@ def write(local_file: Union[IO | str],
def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
level1_id: str, level1_id: str,
pipeline_id: Optional[str] = '') -> Result: pipeline_id: Optional[str] = '',
dataset: Optional[str] = None,
batch_id: Optional[str] = None) -> Result:
""" """
生成流水线的处理消息 生成流水线的处理消息
Args: Args:
module_id (str): 模块ID。 module_id (str): 模块ID
level1_id (str): 1级数据的ID。 level1_id (str): 1级数据的ID
pipeline_id (str): 流水管线ID,默认为空字符串。 pipeline_id (str): 流水管线ID,默认为空字符串
dataset (str): 数据集
batch_id (str): 批次ID
Returns: Returns:
Result: 处理消息生成的结果,是否成功以及相关的错误信息 Result: 处理消息生成的结果,是否成功以及相关的错误信息
""" """
params = {
'dataset': dataset,
'batch_id': batch_id,
'pipeline_id': pipeline_id,
'level1_id': level1_id,
}
return request.put(f"/api/level1/prc/{module_id}/{level1_id}", {'pipeline_id': pipeline_id}) return request.put(f"/api/level1/prc/{module_id}", params)
def process_list(level1_id: str) -> Result: def process_list(level1_id: str) -> Result:
""" """
通过 level1 的 ID 查询1级数据处理记录 通过 level1 的 ID 查询1级数据处理记录
Args: Args:
level1_id (str): 1级数据的ID level1_id (str): 1级数据的ID
Returns: Returns:
Result: 查询结果 Result: 查询结果
""" """
return request.get(f"/api/level1/prc/{level1_id}") return request.get(f"/api/level1/prc/{level1_id}")
...@@ -218,30 +253,36 @@ def process_list(level1_id: str) -> Result: ...@@ -218,30 +253,36 @@ def process_list(level1_id: str) -> Result:
def add_process(level1_id: str, def add_process(level1_id: str,
pipeline_id: str, pipeline_id: str,
run_id: str, run_id: str,
prc_time: str, dataset: Optional[str] = None,
batch_id: Optional[str] = None,
prc_time: str = utils.get_current_time(),
prc_status: int = -1024, prc_status: int = -1024,
prc_module: str = "", prc_module: str = "",
message: str = "") -> Result: message: str = "") -> Result:
""" """
添加1级数据处理记录 添加1级数据处理记录
Args: Args:
level1_id (str): 1级数据的ID。 level1_id (str): 1级数据的ID
pipeline_id (str): 管线ID。 pipeline_id (str): 管线ID
run_id (str): 运行ID。 run_id (str): 运行ID
prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"。 dataset (str): 数据集
prc_status (int): 处理状态。 batch_id (str): 批次ID
prc_module (str): 处理模块。 prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
message (str): 处理消息。 prc_status (int): 处理状态
prc_module (str): 处理模块
message (str): 处理消息
Returns: Returns:
Result: 成功后,Result.data为写入记录,失败message为失败原因 Result: 成功后,Result.data为写入记录,失败message为失败原因
""" """
params = { params = {
'level1_id': level1_id, 'level1_id': level1_id,
'pipeline_id': pipeline_id, 'pipeline_id': pipeline_id,
'run_id': run_id, 'run_id': run_id,
'dataset': dataset,
'batch_id': batch_id,
'prc_time': prc_time, 'prc_time': prc_time,
'prc_status': prc_status, 'prc_status': prc_status,
'prc_module': prc_module, 'prc_module': prc_module,
......
import os import os
import pickle import pickle
from typing import Optional, Tuple, Literal, Union, IO from typing import Optional, Tuple, Literal, Union, IO
from .common import request, Result, utils from .common import request, Result, utils, constants
DateTimeTuple = Tuple[str, str] DateTimeTuple = Tuple[str, str]
...@@ -18,6 +18,8 @@ def find( ...@@ -18,6 +18,8 @@ def find(
prc_status: Optional[int] = None, prc_status: Optional[int] = None,
file_name: Optional[str] = None, file_name: Optional[str] = None,
object_name: Optional[str] = None, object_name: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID,
page: int = 1, page: int = 1,
limit: int = 0) -> Result: limit: int = 0) -> Result:
""" """
...@@ -36,6 +38,8 @@ def find( ...@@ -36,6 +38,8 @@ def find(
prc_status (Optional[int], optional): 处理状态. Defaults to None. prc_status (Optional[int], optional): 处理状态. Defaults to None.
file_name (Optional[str], optional): 文件名. Defaults to None. file_name (Optional[str], optional): 文件名. Defaults to None.
object_name (Optional[str], optional): 天体名称. Defaults to None. object_name (Optional[str], optional): 天体名称. Defaults to None.
dataset (Optional[str], optional): 数据集名称. Defaults to None.
batch_id (Optional[str], optional): 最后一次成功的批次ID. Defaults to None.
page (int, optional): 页码. Defaults to 1. page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0. limit (int, optional): 每页数量. Defaults to 0.
...@@ -55,6 +59,8 @@ def find( ...@@ -55,6 +59,8 @@ def find(
'prc_status': prc_status, 'prc_status': prc_status,
'file_name': file_name, 'file_name': file_name,
'object_name': object_name, 'object_name': object_name,
'dataset': dataset,
'batch_id': batch_id,
'obs_time_start': None, 'obs_time_start': None,
'obs_time_end': None, 'obs_time_end': None,
'create_time_start': None, 'create_time_start': None,
...@@ -85,7 +91,7 @@ def find_by_level2_id(level2_id: str) -> Result: ...@@ -85,7 +91,7 @@ def find_by_level2_id(level2_id: str) -> Result:
""" """
return request.get(f"/api/level2/{level2_id}") return request.get(f"/api/level2/{level2_id}")
def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result: def update_qc2_status(level2_id: str, data_type: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
""" """
更新2级数据的QC0状态 更新2级数据的QC0状态
...@@ -97,9 +103,9 @@ def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result ...@@ -97,9 +103,9 @@ def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result
Returns: Returns:
Result: 更新结果 Result: 更新结果
""" """
return request.put(f"/api/level2/qc2_status/{level2_id}", {'data_type': data_type, 'qc2_status': qc2_status}) return request.put(f"/api/level2/qc2_status/{level2_id}", {'data_type': data_type, 'qc2_status': qc2_status, 'batch_id': batch_id})
def update_prc_status(level2_id: str, data_type: str, prc_status: int) -> Result: def update_prc_status(level2_id: str, data_type: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
""" """
更新2级数据的处理状态 更新2级数据的处理状态
...@@ -111,9 +117,9 @@ def update_prc_status(level2_id: str, data_type: str, prc_status: int) -> Result ...@@ -111,9 +117,9 @@ def update_prc_status(level2_id: str, data_type: str, prc_status: int) -> Result
Returns: Returns:
Result: 操作结果 Result: 操作结果
""" """
return request.put(f"/api/level2/prc_status/{level2_id}", {'data_type': data_type, 'prc_status': prc_status}) return request.put(f"/api/level2/prc_status/{level2_id}", {'data_type': data_type, 'prc_status': prc_status, 'batch_id': batch_id})
def update_qc2_status_by_file_name(file_name: str, qc2_status: int) -> Result: def update_qc2_status_by_file_name(file_name: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
""" """
更新2级数据的QC0状态 更新2级数据的QC0状态
...@@ -124,9 +130,9 @@ def update_qc2_status_by_file_name(file_name: str, qc2_status: int) -> Result: ...@@ -124,9 +130,9 @@ def update_qc2_status_by_file_name(file_name: str, qc2_status: int) -> Result:
Returns: Returns:
Result: 更新结果 Result: 更新结果
""" """
return request.put(f"/api/level2/qc2_status/file/{file_name}", {'qc2_status': qc2_status}) return request.put(f"/api/level2/qc2_status/file/{file_name}", {'qc2_status': qc2_status, 'batch_id': batch_id})
def update_prc_status_by_file_name(file_name: str, prc_status: int) -> Result: def update_prc_status_by_file_name(file_name: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
""" """
更新2级数据的处理状态 更新2级数据的处理状态
...@@ -137,7 +143,7 @@ def update_prc_status_by_file_name(file_name: str, prc_status: int) -> Result: ...@@ -137,7 +143,7 @@ def update_prc_status_by_file_name(file_name: str, prc_status: int) -> Result:
Returns: Returns:
Result: 操作结果 Result: 操作结果
""" """
return request.put(f"/api/level2/prc_status/file/{file_name}", {'prc_status': prc_status}) return request.put(f"/api/level2/prc_status/file/{file_name}", {'prc_status': prc_status, 'batch_id': batch_id})
def write(local_file: Union[IO | str], def write(local_file: Union[IO | str],
module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
...@@ -149,7 +155,8 @@ def write(local_file: Union[IO | str], ...@@ -149,7 +155,8 @@ def write(local_file: Union[IO | str],
file_name: str, file_name: str,
pipeline_id: str, pipeline_id: str,
build: int, build: int,
version: str, dataset: Optional[str] = None,
batch_id: Optional[str] = None,
**extra_kwargs) -> Result: **extra_kwargs) -> Result:
""" """
将本地的2级数据文件写入到DFS中 将本地的2级数据文件写入到DFS中
...@@ -164,8 +171,9 @@ def write(local_file: Union[IO | str], ...@@ -164,8 +171,9 @@ def write(local_file: Union[IO | str],
data_type (str): 数据类型,如'csst-msc-l2-mbi-cat' data_type (str): 数据类型,如'csst-msc-l2-mbi-cat'
file_name (str): 2级数据文件名 file_name (str): 2级数据文件名
pipeline_id (str): 管线ID pipeline_id (str): 管线ID
pmapname (str): CCDS pmap名称
build (int): 构建号 build (int): 构建号
dataset (Optional[str], optional): 数据集名称. Defaults to None.
batch_id (Optional[str], optional): 最后一次成功的批次ID. Defaults to None.
**kwargs: 额外的关键字参数,这些参数将传递给DFS **kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns: Returns:
...@@ -174,7 +182,19 @@ def write(local_file: Union[IO | str], ...@@ -174,7 +182,19 @@ def write(local_file: Union[IO | str],
""" """
if utils.is_valid_filename(file_name): if utils.is_valid_filename(file_name):
raise ValueError(f"Incorrect file name [{file_name}], should be *.*") raise ValueError(f"Incorrect file name [{file_name}], should be *.*")
params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'level2_id': level2_id, 'brick_id': brick_id, 'file_name': file_name, 'data_type': data_type, 'pipeline_id': pipeline_id, 'build': build, 'version': version} params = {
'module_id': module_id,
'level0_id': level0_id,
'level1_id': level1_id,
'level2_id': level2_id,
'brick_id': brick_id,
'file_name': file_name,
'data_type': data_type,
'pipeline_id': pipeline_id,
'build': build,
'dataset': dataset,
'batch_id': batch_id
}
params.update(extra_kwargs) params.update(extra_kwargs)
if isinstance(local_file, str): if isinstance(local_file, str):
if not os.path.exists(local_file): if not os.path.exists(local_file):
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
| qc0_time | datetime | QC0时间 | | qc0_time | datetime | QC0时间 |
| prc_status | int | 处理状态,默认为-1024未处理 | | prc_status | int | 处理状态,默认为-1024未处理 |
| prc_time | datetime | 处理时间 | | prc_time | datetime | 处理时间 |
| dataset | str | 数据集标识 |
| create_time | datetime | 创建时间 | | create_time | datetime | 创建时间 |
**附加的头部信息**:('RA_OBJ', 'DEC_OBJ', 'CRPIX1', 'CRPIX2', 'CRVAL1', 'CRVAL2', 'CTYPE1', 'CTYPE2', 'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2', 'FILTER', 'OBJECT') **附加的头部信息**:('RA_OBJ', 'DEC_OBJ', 'CRPIX1', 'CRPIX2', 'CRVAL1', 'CRVAL2', 'CTYPE1', 'CTYPE2', 'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2', 'FILTER', 'OBJECT')
...@@ -32,6 +33,8 @@ ...@@ -32,6 +33,8 @@
| level0_id | str | 0级数据的ID | | level0_id | str | 0级数据的ID |
| pipeline_id | str | 流水线的ID | | pipeline_id | str | 流水线的ID |
| run_id | str | run ID | | run_id | str | run ID |
| batch_id | str | 批次ID |
| dataset | str | 数据集标识 |
| prc_status | int | 处理状态 | | prc_status | int | 处理状态 |
| prc_time | datetime | 处理时间 | | prc_time | datetime | 处理时间 |
| prc_module | str | 处理模块 | | prc_module | str | 处理模块 |
...@@ -61,6 +64,8 @@ ...@@ -61,6 +64,8 @@
| pmapname | str | ccds pmap的名称 | | pmapname | str | ccds pmap的名称 |
| build | int | 构建版本编号 | | build | int | 构建版本编号 |
| run_id | str | 运行ID | | run_id | str | 运行ID |
| batch_id | str | 运行批次ID |
| dataset | str | 数据集标识 |
| create_time | datetime | 创建时间 | | create_time | datetime | 创建时间 |
**附加的头部信息** **附加的头部信息**
...@@ -84,6 +89,8 @@ ...@@ -84,6 +89,8 @@
| level1_id | str | 1级数据的ID | | level1_id | str | 1级数据的ID |
| pipeline_id | str | 流水线的ID | | pipeline_id | str | 流水线的ID |
| run_id | str | run的ID | | run_id | str | run的ID |
| dataset | str | 数据集 |
| batch_id | str | 批次ID |
| prc_status | int | 处理状态 | | prc_status | int | 处理状态 |
| prc_time | datetime | 处理时间 | | prc_time | datetime | 处理时间 |
| prc_module | str | 处理模块 | | prc_module | str | 处理模块 |
...@@ -114,7 +121,8 @@ ...@@ -114,7 +121,8 @@
| pipeline_id | str | 处理管道ID | | pipeline_id | str | 处理管道ID |
| build | int | 软件构建版本的编号 | | build | int | 软件构建版本的编号 |
| run_id | str | 运行ID | | run_id | str | 运行ID |
| version | str | 版本信息 | | dataset | str | 数据集标识 |
| batch_id | str | 批次ID |
| create_time | datetime | 创建的时间 | | create_time | datetime | 创建的时间 |
**注释** **注释**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment