import os from typing import Optional, Tuple, Literal from .common import request, Result, utils, constants DateTimeTuple = Tuple[str, str] def find(project_id: Optional[str] = None, obs_id: Optional[str] = None, module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC', detector_no: Optional[str] = None, file_type: Optional[str] = None, filter: Optional[str] = None, obs_time: Optional[DateTimeTuple] = None, create_time: Optional[DateTimeTuple] = None, qc0_status: Optional[int] = None, prc_status: Optional[int] = None, file_name: Optional[str] = None, ra_obj: Optional[int] = None, dec_obj: Optional[int] = None, radius: Optional[float] = None, object_name: Optional[str] = None, dataset: str = constants.DEFAULT_DATASET, page: int = 1, limit: int = 0) -> Result: """ 根据给定的参数搜索0级数据文件记录 Args: project_id (Optional[str], optional): 项目ID. Defaults to None. obs_id (Optional[str], optional): 观测ID. Defaults to None. module_id (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to 'MSC'. detector_no (Optional[str], optional): 探测器编号. Defaults to None. file_type (Optional[str], optional): 文件类型,如'SCI'. Defaults to None. filter (Optional[str], optional): 滤光片. Defaults to None. obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None. create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None. qc0_status (Optional[int], optional): QC0状态. Defaults to None. prc_status (Optional[int], optional): 处理状态. Defaults to None. file_name (Optional[str], optional): 文件名. Defaults to None. ra_obj (Optional[int], optional): 目标赤经. Defaults to None. dec_obj (Optional[int], optional): 目标赤纬. Defaults to None. radius (Optional[float], optional): 搜索半径. Defaults to None. object_name (Optional[str], optional): 目标名称. Defaults to None. dataset (Optional[str], optional): 数据集名称. Defaults to constants.DEFAULT_DATASET. page (int, optional): 页码. Defaults to 1. limit (int, optional): 每页数量. Defaults to 0,不限制. Returns: Result: 搜索结果对象. """ params = { 'project_id': project_id, 'obs_id': obs_id, 'module_id': module_id, 'detector_no': detector_no, 'file_type': file_type, 'filter': filter, 'qc0_status': qc0_status, 'prc_status': prc_status, 'file_name': file_name, 'ra_obj': ra_obj, 'dec_obj': dec_obj, 'radius': radius, 'object_name': object_name, 'obs_time_start': None, 'obs_time_end': None, 'create_time_start': None, 'create_time_end': None, 'dataset': dataset, 'page': page, 'limit': limit, } if obs_time is not None: params['obs_time_start'], params['obs_time_end'] = obs_time if params['obs_time_start'] and utils.is_valid_datetime_format(params['obs_time_start']): pass if params['obs_time_end'] and utils.is_valid_datetime_format(params['obs_time_end']): pass if create_time is not None: params['create_time_start'], params['create_time_end'] = create_time utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end']) return request.post("/api/level0", params) def get_by_id(_id: str) -> Result: """ 根据内部ID获取0级数据 Args: _id (str): 0级数据的内部ID Returns: Result: 查询结果 """ return request.get(f"/api/level0/_id/{_id}") def find_by_level0_id(level0_id: str) -> Result: """ 通过 level0 的 ID 查询0级数据 Args: level0_id (str): 0级数据的ID Returns: Result: 查询结果 """ return request.get(f"/api/level0/{level0_id}") def update_qc0_status(level0_id: str, file_type: str, qc0_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result: """ 更新0级数据的QC0状态 Args: level0_id (str): 0级数据的ID file_type (str): 文件类型 qc0_status (int): QC0状态 dataset (str): 数据集名称 Returns: Result: 更新结果 """ return request.put(f"/api/level0/qc0_status/{level0_id}", {'file_type': file_type, 'qc0_status': qc0_status, 'dataset': dataset}) def update_prc_status(level0_id: str, file_type: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result: """ 更新0级数据的处理状态 Args: level0_id (str): 0级数据的ID file_type (str): 文件类型 prc_status (int): 处理状态 dataset (str): 数据集名称 Returns: Result: 操作结果 """ return request.put(f"/api/level0/prc_status/{level0_id}", {'file_type': file_type, 'prc_status': prc_status, 'dataset': dataset}) def write(local_file: str, dataset: str = constants.DEFAULT_DATASET, **kwargs) -> Result: """ 将本地文件写入DFS中 Args: local_file (str]): 文件路径 dataset (Optional[str], optional): 数据集名称. Defaults to None. **kwargs: 额外的关键字参数,这些参数将传递给DFS Returns: Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象 """ params = { 'dataset': dataset, } params.update(kwargs) if not os.path.exists(local_file): raise FileNotFoundError(local_file) return request.post_file("/api/level0/file", local_file, params) def write_cat(local_file: str, dataset: str = constants.DEFAULT_DATASET, **kwargs) -> Result: """ 主巡天仿真数据的星表本地文件写入DFS中 Args: local_file (str]): 文件路径 dataset (Optional[str], optional): 数据集名称. Defaults to None. **kwargs: 额外的关键字参数,这些参数将传递给DFS Returns: Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象 """ params = { 'dataset': dataset, } params.update(kwargs) if not os.path.exists(local_file): raise FileNotFoundError(local_file) return request.post_file("/api/level0/cat/file", local_file, params) def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], obs_id: str, detector_no: str, dataset: str = constants.DEFAULT_DATASET, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result: """ 生成流水线的处理消息 Args: module_id (str): 模块ID obs_id (str): 观测ID detector_no (str): 探测器编号 dataset (Optional[str], optional): 数据集名称.. batch_id (Optional[str], optional): 批次ID. Defaults to 'auto'. Returns: Result: 处理消息生成的结果,是否成功以及相关的错误信息 """ params = { 'dataset': dataset, 'batch_id': batch_id, 'obs_id': obs_id, 'detector_no': detector_no, } return request.put(f"/api/level0/prc/{module_id}", params) def process_list(level0_id: str) -> Result: """ 通过 level0 的 ID 查询0级数据处理过程 Args: level0_id (str): 0级数据的ID Returns: Result: 成功后,Result.data为数据列表,失败message为失败原因 """ return request.get(f"/api/level0/prc/{level0_id}") def add_process(level0_id: str, pipeline_id: str, run_id: str, batch_id: Optional[str] = None, dataset: str = constants.DEFAULT_DATASET, prc_status: int = -1024, prc_time: str = utils.get_current_time(), prc_module: str = "", message: str = "") -> Result: """ 添加0级数据处理过程 Args: level0_id (str): 0级数据的ID pipeline_id (str): 管线ID run_id (str): 运行ID dataset (str): 数据集 batch_id (str): 批次ID prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS" prc_status (int): 处理状态 prc_module (str): 处理模块 message (str): 处理消息 Returns: Result: 成功后,Result.data为写入记录,失败message为失败原因 """ params = { 'level0_id': level0_id, 'pipeline_id': pipeline_id, 'run_id': run_id, 'dataset': dataset, 'batch_id': batch_id, 'prc_time': prc_time, 'prc_status': prc_status, 'prc_module': prc_module, 'message': message, } utils.is_valid_datetime_format(prc_time) return request.post("/api/level0/prc", params) def new(data: dict) -> Result: """ 新建0级数据,用于仿真数据测试 Args: data (dict): 0级数据的字典表示 Returns: Result: 成功后,Result.data为写入记录,失败message为失败原因 """ return request.post("/api/level0/new", data)