import os import pickle from typing import Optional, Tuple, Literal from .common import request, Result DateTimeTuple = Tuple[str, str] def find( project_id: Optional[str] = None, obs_id: Optional[str] = None, module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC', detector_no: Optional[str] = None, data_type: Optional[str] = None, filter: Optional[str] = None, obs_time: Optional[DateTimeTuple] = None, create_time: Optional[DateTimeTuple] = None, qc2_status: Optional[int] = None, prc_status: Optional[int] = None, file_name: Optional[str] = None, object_name: Optional[str] = None, page: int = 1, limit: int = 0) -> Result: """ 根据给定的参数在搜索2级数据。 Args: project_id (Optional[str], optional): 项目ID. Defaults to None. obs_id (Optional[str], optional): 观测ID. Defaults to None. module_id (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None. detector_no (Optional[str], optional): 探测器编号. Defaults to None. data_type (Optional[str], optional): 文件类型,如'SCI'. Defaults to None. filter (Optional[str], optional): 滤光片. Defaults to None. obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None. create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None. qc2_status (Optional[int], optional): QC0状态. Defaults to None. prc_status (Optional[int], optional): 处理状态. Defaults to None. file_name (Optional[str], optional): 文件名. Defaults to None. object_name (Optional[str], optional): 天体名称. Defaults to None. page (int, optional): 页码. Defaults to 1. limit (int, optional): 每页数量. Defaults to 0. Returns: Result: 搜索结果对象. """ params = { 'project_id': project_id, 'obs_id': obs_id, 'module_id': module_id, 'detector_no': detector_no, 'data_type': data_type, 'filter': filter, 'qc2_status': qc2_status, 'prc_status': prc_status, 'file_name': file_name, 'object_name': object_name, 'obs_time_start': None, 'obs_time_end': None, 'create_time_start': None, 'create_time_end': None, 'page': page, 'limit': limit, } if obs_time is not None: params['obs_time_start'], params['obs_time_end'] = obs_time if create_time is not None: params['create_time_start'], params['create_time_end'] = create_time return request.post("/api/level2", params) def find_by_level2_id(level2_id: str) -> Result: """ 通过 level2 的 ID 查询2级数据。 Args: level2_id (str): 2级数据的ID。 Returns: Result: 查询结果。 """ return request.get(f"/api/level2/{level2_id}") def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result: """ 更新2级数据的QC0状态 Args: level2_id (str): 2级数据文件的ID data_type (str): 文件类型 qc2_status (int): QC0状态 Returns: Result: 更新结果 """ return request.put(f"/api/level2/qc2_status/{level2_id}", {'data_type': data_type, 'qc2_status': qc2_status}) def update_prc_status(level2_id: str, data_type: str, prc_status: int) -> Result: """ 更新2级数据的处理状态。 Args: level2_id (str): 2级数据文件的ID。 data_type (str): 文件类型。 prc_status (int): 处理状态。 Returns: Result: 操作结果。 """ return request.put(f"/api/level2/prc_status/{level2_id}", {'data_type': data_type, 'prc_status': prc_status}) def update_qc2_status_by_file_name(file_name: str, qc2_status: int) -> Result: """ 更新2级数据的QC0状态 Args: file_name (str): 2级数据文件名 qc2_status (int): QC0状态 Returns: Result: 更新结果 """ return request.put(f"/api/level2/qc2_status/file/{file_name}", {'qc2_status': qc2_status}) def update_prc_status_by_file_name(file_name: str, prc_status: int) -> Result: """ 更新2级数据的处理状态。 Args: file_name (str): 2级数据文件名。 prc_status (int): 处理状态。 Returns: Result: 操作结果。 """ return request.put(f"/api/level2/prc_status/file/{file_name}", {'prc_status': prc_status}) def write(local_file: str, module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], level0_id: Optional[str | None], level1_id: Optional[str | None], level2_id: Optional[str | None], brick_id: Optional[int | None], data_type: str, pipeline_id: str, build: int, version: str, **extra_kwargs) -> Result: """ 将本地的2级数据文件写入到DFS中。 Args: local_file (str): 文件路径。 module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID。 level0_id (Optional[str | None]): 0级数据的ID。默认为 None。 level1_id (Optional[str | None]): 1级数据的ID。默认为 None。 level2_id (Optional[str | None]): 2级数据的ID。默认为 None。 brick_id (Optional[int | None]): 天区的ID。默认为 None。 data_type (str): 数据文件类型。 pipeline_id (str): 管线ID。 pmapname (str): CCDS pmap名称。 build (int): 构建号。 **kwargs: 额外的关键字参数,这些参数将传递给DFS。 Returns: Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。 """ params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'level2_id': level2_id, 'brick_id': brick_id, 'data_type': data_type, 'pipeline_id': pipeline_id, 'build': build, 'version': version} params.update(extra_kwargs) if not os.path.exists(local_file): raise FileNotFoundError(local_file) return request.post_file("/api/level2/file", local_file, params) def catalog_query(sql: str, limit: int = 0) -> Result: datas = request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit}) if datas and isinstance(datas, Result): return datas records = pickle.loads(datas._content) df, total_count = records['records'], records['totalCount'] return Result.ok_data(data = df).append("totalCount", total_count)