# An error occurred while loading the file. Please try again.
# level1.py 10.27 KiB
from typing import Optional, Tuple, Literal, IO, Union
from .common import request, Result, utils, constants
import os
DateTimeTuple = Tuple[str, str]  # (start, end) pair of datetime strings


def _validate_datetime_range(start: str, end: str) -> None:
    """Run the shared datetime-format check on both ends of a time range."""
    # Each bound is checked in its own statement: chaining the two calls with
    # `or` lets a truthy first result short-circuit the second check away.
    # NOTE(review): presumably utils.is_valid_datetime_format raises on a
    # malformed value (add_process also relies on a bare call) - confirm.
    utils.is_valid_datetime_format(start)
    utils.is_valid_datetime_format(end)


def find(project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector_no: Optional[str] = None,
        data_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc1_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        ra_obj: Optional[int] = None,
        dec_obj: Optional[int] = None,
        radius: Optional[float] = None,
        object_name: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
        dataset: Optional[str] = None,
        batch_id: Optional[str] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-1 data file records matching the given parameters.

    Args:
        project_id (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        level0_id (Optional[str], optional): Level-0 data ID. Defaults to None.
        module_id (str, optional): Module ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'. Defaults to 'MSC'.
        detector_no (Optional[str], optional): Detector number. Defaults to None.
        data_type (Optional[str], optional): Data type, e.g. 'csst-msc-l1-mbi'.
            Defaults to None.
        filter (Optional[str], optional): Filter. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): Observation time range
            as (start, end). Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range
            as (start, end). Defaults to None.
        qc1_status (Optional[int], optional): QC1 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        ra_obj (Optional[int], optional): Target right ascension. Defaults to None.
        dec_obj (Optional[int], optional): Target declination. Defaults to None.
        radius (Optional[float], optional): Search radius. Defaults to None.
        object_name (Optional[str], optional): Object name. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS). Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
        dataset (Optional[str], optional): Dataset name. Defaults to None.
        batch_id (Optional[str], optional): Batch ID. Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: The search result object.
    """
    params = {
        'project_id': project_id,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
        'data_type': data_type,
        'filter': filter,
        'qc1_status': qc1_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_obj': ra_obj,
        'dec_obj': dec_obj,
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'page': page,
        'limit': limit,
    }

    # Time ranges are sent as separate *_start / *_end fields.
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
        _validate_datetime_range(params['obs_time_start'], params['obs_time_end'])

    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
        _validate_datetime_range(params['create_time_start'], params['create_time_end'])

    return request.post("/api/level1", params)


def find_by_level1_id(level1_id: str) -> Result:
    """
    Look up level-1 data by its level-1 ID.

    Args:
        level1_id (str): ID of the level-1 data.

    Returns:
        Result: The query result.
    """
    return request.get(f"/api/level1/{level1_id}")


def find_by_brick_id(brick_id: int) -> Result:
    """
    Look up level-1 data by brick ID.

    Args:
        brick_id (int): Sky-region (brick) ID.

    Returns:
        Result: The query result.
    """
    return request.get(f"/api/level1/brick/{brick_id}")


def sls_find_by_qc1_status(qc1_status: int,
                           limit: int = 1,
                           batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Query the level-1 SLS endpoint for records with the given QC1 status.

    Args:
        qc1_status (int): QC1 status; placed in the request path.
        limit (int): Sent as 'limit' in the request body. Defaults to 1.
        batch_id (str): Batch ID sent in the request body.

    Returns:
        Result: The query result.
    """
    return request.post(f"/api/level1/sls/qc1_status/{qc1_status}",
                        {'limit': limit, 'batch_id': batch_id})


def update_qc1_status(level1_id: str,
                      data_type: str,
                      qc1_status: int,
                      batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC1 status of level-1 data.

    Args:
        level1_id (str): ID of the level-1 data.
        data_type (str): File type.
        qc1_status (int): New QC1 status.
        batch_id (str): Batch ID.

    Returns:
        Result: The update result.
    """
    return request.put(f"/api/level1/qc1_status/{level1_id}",
                       {'data_type': data_type,
                        'qc1_status': qc1_status,
                        'batch_id': batch_id})


def update_prc_status(level1_id: str,
                      data_type: str,
                      prc_status: int,
                      batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of level-1 data.

    Args:
        level1_id (str): ID of the level-1 data.
        data_type (str): File type.
        prc_status (int): New processing status.
        batch_id (str): Batch ID.

    Returns:
        Result: The operation result.
    """
    return request.put(f"/api/level1/prc_status/{level1_id}",
                       {'data_type': data_type,
                        'prc_status': prc_status,
                        'batch_id': batch_id})


def write(local_file: Union[IO, str],
          module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
          level1_id: str,
          data_type: str,
          file_name: str,
          dag_id: str,
          pmapname: str,
          build: int,
          level0_id: Optional[str] = None,
          dataset: str = constants.DEFAULT_DATASET,
          batch_id: str = constants.DEFAULT_BATCH_ID,
          qc1_status: int = 0,
          **extra_kwargs) -> Result:
    """
    Write a local level-1 data file into DFS.

    Other parameters such as rss_id or cube_id can be supplied through
    ``extra_kwargs``.

    Args:
        local_file (Union[IO, str]): File path or file object.
        module_id (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level1_id (str): ID of the level-1 data.
        data_type (str): Data type.
        file_name (str): Level-1 data file name.
        dag_id (str): Pipeline ID.
        pmapname (str): CCDS pmap name.
        build (int): Build number.
        level0_id (Optional[str]): ID of the level-0 data. Defaults to None.
        dataset (str): Dataset name.
        batch_id (str): Batch ID.
        qc1_status (int): QC1 status. Defaults to 0.
        **extra_kwargs: Extra keyword arguments merged into the request
            parameters; a key that matches a named parameter overrides it.

    Returns:
        Result: The operation result, carrying success/failure and any error
            message; on success ``data`` is the level-1 data object.

    Raises:
        ValueError: If ``dataset`` or ``batch_id`` is empty, or ``local_file``
            is None.
        FileNotFoundError: If ``local_file`` is a path that does not exist.
    """
    params = {
        'module_id': module_id,
        'level0_id': level0_id,
        'level1_id': level1_id,
        'data_type': data_type,
        'file_name': file_name,
        'dag_id': dag_id,
        'pmapname': pmapname,
        'build': build,
        'dataset': dataset,
        'batch_id': batch_id,
        'qc1_status': qc1_status
    }
    if not dataset or not batch_id:
        raise ValueError("dataset and batch_id is required")
    params.update(extra_kwargs)

    if local_file is None:
        raise ValueError("local_file is required")

    # A string is treated as a filesystem path; anything else is handed to
    # the file-object upload call.
    if isinstance(local_file, str):
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)
        return request.post_file("/api/level1/file", local_file, params)
    return request.post_bytesio("/api/level1/file", local_file, params)


def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
                     level1_id: str,
                     dataset: str = constants.DEFAULT_DATASET,
                     batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Generate a pipeline processing message.

    Args:
        module_id (str): Module ID; placed in the request path.
        level1_id (str): ID of the level-1 data.
        dataset (str): Dataset name.
        batch_id (str): Batch ID.

    Returns:
        Result: Outcome of generating the message, carrying success/failure
            and any error message.
    """
    params = {
        'dataset': dataset,
        'batch_id': batch_id,
        'level1_id': level1_id,
    }
    return request.put(f"/api/level1/prc/{module_id}", params)


def process_list(level1_id: str) -> Result:
    """
    Look up the processing records of level-1 data by its level-1 ID.

    Args:
        level1_id (str): ID of the level-1 data.

    Returns:
        Result: The query result.
    """
    return request.get(f"/api/level1/prc/{level1_id}")


def add_process(level1_id: str,
                dag_id: str,
                dag_run_id: str,
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
                prc_time: Optional[str] = None,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
    Add a processing record for level-1 data.

    Args:
        level1_id (str): ID of the level-1 data.
        dag_id (str): Pipeline ID.
        dag_run_id (str): Run ID.
        dataset (str): Dataset name.
        batch_id (str): Batch ID.
        prc_time (Optional[str]): Processing time, formatted as
            "YYYY-MM-DD HH:MM:SS". Defaults to the current time, taken from
            utils.get_current_time() when the call is made.
        prc_status (int): Processing status. Defaults to -1024.
        prc_module (str): Processing module.
        message (str): Processing message.

    Returns:
        Result: On success ``Result.data`` is the written record; on failure
            ``message`` holds the reason.
    """
    # Resolve the default per call: a default written in the signature is
    # evaluated once at import time and would freeze the timestamp.
    if prc_time is None:
        prc_time = utils.get_current_time()

    params = {
        'level1_id': level1_id,
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    utils.is_valid_datetime_format(prc_time)
    return request.post("/api/level1/prc", params)