from typing import Optional, Tuple, Literal, IO, Union from .common import request, Result, utils import os DateTimeTuple = Tuple[str, str] def find(project_id: Optional[str] = None, obs_id: Optional[str] = None, level0_id: Optional[str] = None, module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC', detector_no: Optional[str] = None, file_type: Optional[str] = None, filter: Optional[str] = None, obs_time: Optional[DateTimeTuple] = None, create_time: Optional[DateTimeTuple] = None, qc1_status: Optional[int] = None, prc_status: Optional[int] = None, file_name: Optional[str] = None, ra_obj: Optional[int] = None, dec_obj: Optional[int] = None, radius: Optional[float] = None, object_name: Optional[str] = None, rss_id: Optional[str] = None, cube_id: Optional[str] = None, page: int = 1, limit: int = 0) -> Result: """ 根据给定的参数搜索1级数据文件记录。 Args: project_id (Optional[str], optional): 项目ID. Defaults to None. obs_id (Optional[str], optional): 观测ID. Defaults to None. module_id (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None. detector_no (Optional[str], optional): 探测器编号. Defaults to None. file_type (Optional[str], optional): 文件类型,如'SCI'. Defaults to None. filter (Optional[str], optional): 滤光片. Defaults to None. obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None. create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None. qc1_status (Optional[int], optional): QC1状态. Defaults to None. prc_status (Optional[int], optional): 处理状态. Defaults to None. file_name (Optional[str], optional): 文件名. Defaults to None. ra_obj (Optional[int], optional): 目标赤经. Defaults to None. dec_obj (Optional[int], optional): 目标赤纬. Defaults to None. radius (Optional[float], optional): 搜索半径. Defaults to None. object_name (Optional[str], optional): 天体名称. Defaults to None. rss_id (Optional[str], optional): RSS ID (IFS) Defaults to None. cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None. page (int, optional): 页码. Defaults to 1. limit (int, optional): 每页数量. Defaults to 0. Returns: Result: 搜索结果对象. """ params = { 'project_id': project_id, 'level0_id': level0_id, 'obs_id': obs_id, 'module_id': module_id, 'detector_no': detector_no, 'file_type': file_type, 'filter': filter, 'qc1_status': qc1_status, 'prc_status': prc_status, 'file_name': file_name, 'ra_obj': ra_obj, 'dec_obj': dec_obj, 'radius': radius, 'object_name': object_name, 'obs_time_start': None, 'obs_time_end': None, 'create_time_start': None, 'create_time_end': None, 'rss_id': rss_id, 'cube_id': cube_id, 'page': page, 'limit': limit, } if obs_time is not None: params['obs_time_start'], params['obs_time_end'] = obs_time utils.is_valid_datetime_format(params['obs_time_start']) or not utils.is_valid_datetime_format(params['obs_time_end']) if create_time is not None: params['create_time_start'], params['create_time_end'] = create_time utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end']) return request.post("/api/level1", params) def find_by_level1_id(level1_id: str) -> Result: """ 通过 level1 的 ID 查询1级数据。 Args: level1_id (str): 1级数据的ID。 Returns: Result: 查询结果。 """ return request.get(f"/api/level1/{level1_id}") def find_by_brick_id(brick_id: int) -> Result: """ 通过 brick 的 ID 查询1级数据。 Args: brick_id (int): 天区ID。 Returns: Result: 查询结果。 """ return request.get(f"/api/level1/brick/{brick_id}") def sls_find_by_qc1_status(qc1_status: int, limit: int = 1) -> Result: return request.post(f"/api/level1/sls/qc1_status/{qc1_status}", {'limit': limit}) def update_qc1_status(level1_id: str, file_type: str, qc1_status: int) -> Result: """ 更新1级数据的QC0状态 Args: level1_id (str): 1级数据的ID file_type (str): 文件类型 qc1_status (int): QC0状态 Returns: Result: 更新结果 """ return request.put(f"/api/level1/qc1_status/{level1_id}", {'file_type': file_type, 'qc1_status': qc1_status}) def update_prc_status(level1_id: str, file_type: str, prc_status: int) -> Result: """ 更新1级数据的处理状态。 Args: level1_id (str): 1级数据的ID。 file_type (str): 文件类型。 prc_status (int): 处理状态。 Returns: Result: 操作结果。 """ return request.put(f"/api/level1/prc_status/{level1_id}", {'file_type': file_type, 'prc_status': prc_status}) def write(local_file: Union[IO | str], module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], level0_id: Optional[str | None], level1_id: Optional[str | None], file_type: str, file_name: str, pipeline_id: str, pmapname: str, build: int, **extra_kwargs) -> Result: ''' 将本地的1级数据文件写入到DFS中。其他参数如rss_id, cube_id等,可通过extra_kwargs传入。 Args: local_file (Union[IO | str]): 文件路径或文件对象 module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID。 level0_id (Optional[str | None]): 0级数据的ID。默认为 None。 level1_id (Optional[str | None]): 1级数据的ID。默认为 None。 file_type (str): 文件类型。 file_name (str): 1级数据文件名。 pipeline_id (str): 管线ID。 pmapname (str): CCDS pmap名称。 build (int): 构建号。 **kwargs: 额外的关键字参数,这些参数将传递给DFS。 Returns: Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象 ''' if utils.is_valid_filename(file_name): raise ValueError(f"Incorrect file name [{file_name}], should be *.*") params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'file_type': file_type, 'file_name': file_name, 'pipeline_id': pipeline_id, 'pmapname': pmapname, 'build': build} params.update(extra_kwargs) if isinstance(local_file, str): if not os.path.exists(local_file): raise FileNotFoundError(local_file) return request.post_file("/api/level1/file", local_file, params) return request.post_bytesio("/api/level1/file", local_file, params) def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], level1_id: str, pipeline_id: Optional[str] = '') -> Result: """ 生成流水线的处理消息。 Args: module_id (str): 模块ID。 level1_id (str): 1级数据的ID。 pipeline_id (str): 流水管线ID,默认为空字符串。 Returns: Result: 处理消息生成的结果,是否成功以及相关的错误信息。 """ return request.put(f"/api/level1/prc/{module_id}/{level1_id}", {'pipeline_id': pipeline_id}) def process_list(level1_id: str) -> Result: """ 通过 level1 的 ID 查询1级数据处理记录。 Args: level1_id (str): 1级数据的ID。 Returns: Result: 查询结果。 """ return request.get(f"/api/level1/prc/{level1_id}") def add_process(level1_id: str, pipeline_id: str, run_id: str, prc_time: str, prc_status: int = -1024, prc_module: str = "", message: str = "") -> Result: """ 添加1级数据处理记录。 Args: level1_id (str): 1级数据的ID。 pipeline_id (str): 管线ID。 run_id (str): 运行ID。 prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"。 prc_status (int): 处理状态。 prc_module (str): 处理模块。 message (str): 处理消息。 Returns: Result: 成功后,Result.data为写入记录,失败message为失败原因。 """ params = { 'level1_id': level1_id, 'pipeline_id': pipeline_id, 'run_id': run_id, 'prc_time': prc_time, 'prc_status': prc_status, 'prc_module': prc_module, 'message': message, } utils.is_valid_datetime_format(prc_time) return request.post("/api/level1/prc", params)