import os
import pickle
from typing import Optional, Tuple, Literal, Union, IO

from .common import request, Result, utils, constants

# A (start, end) pair of datetime strings describing a time range.
DateTimeTuple = Tuple[str, str]


def _split_time_range(
        time_range: Optional[DateTimeTuple]) -> Tuple[Optional[str], Optional[str]]:
    """Unpack *time_range* into (start, end), validating both bounds.

    Returns ``(None, None)`` when no range was supplied.
    """
    if time_range is None:
        return None, None
    start, end = time_range
    # Validate each bound with its own call. The earlier
    # ``is_valid(start) or [not] is_valid(end)`` expression short-circuited,
    # so the end bound was never checked once the start bound passed.
    # NOTE(review): the return value is ignored, as before; this relies on
    # utils.is_valid_datetime_format raising on malformed input -- confirm.
    utils.is_valid_datetime_format(start)
    utils.is_valid_datetime_format(end)
    return start, end


def find(
        project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector_no: Optional[str] = None,
        data_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc2_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        object_name: Optional[str] = None,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-2 data file records matching the given parameters.

    Args:
        project_id (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        module_id (str, optional): Module ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'. Defaults to 'MSC'.
        detector_no (Optional[str], optional): Detector number. Defaults to None.
        data_type (Optional[str], optional): File type, e.g. 'csst-msc-l2-mbi-cat'.
            Defaults to None.
        filter (Optional[str], optional): Filter name. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): Observation time range as
            (start, end). Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range as
            (start, end). Defaults to None.
        qc2_status (Optional[int], optional): QC2 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        object_name (Optional[str], optional): Object name. Defaults to None.
        dataset (str, optional): Dataset name. Defaults to
            constants.DEFAULT_DATASET.
        batch_id (str, optional): ID of the last successful batch. Defaults to
            constants.DEFAULT_BATCH_ID.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: The search result returned by the POST to "/api/level2".
    """
    # Both bounds of each supplied range are validated before the request
    # is sent; a range that is not supplied is forwarded as None/None.
    obs_start, obs_end = _split_time_range(obs_time)
    create_start, create_end = _split_time_range(create_time)

    params = {
        'project_id': project_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
        'data_type': data_type,
        'filter': filter,
        'qc2_status': qc2_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'object_name': object_name,
        'dataset': dataset,
        'batch_id': batch_id,
        'obs_time_start': obs_start,
        'obs_time_end': obs_end,
        'create_time_start': create_start,
        'create_time_end': create_end,
        'page': page,
        'limit': limit,
    }
    return request.post("/api/level2", params)


def find_by_level2_id(level2_id: str) -> Result:
    """
    Look up a level-2 data record by its level-2 ID.

    Args:
        level2_id (str): ID of the level-2 data.

    Returns:
        Result: The query result.
    """
    return request.get(f"/api/level2/{level2_id}")


def update_qc2_status(level2_id: str,
                      data_type: str,
                      qc2_status: int,
                      batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC2 status of a level-2 data record.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_type (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        qc2_status (int): QC2 status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The update result.
    """
    return request.put(
        f"/api/level2/qc2_status/{level2_id}",
        {'data_type': data_type, 'qc2_status': qc2_status, 'batch_id': batch_id})


def update_prc_status(level2_id: str,
                      data_type: str,
                      prc_status: int,
                      batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_type (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        prc_status (int): Processing status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The operation result.
    """
    return request.put(
        f"/api/level2/prc_status/{level2_id}",
        {'data_type': data_type, 'prc_status': prc_status, 'batch_id': batch_id})


def update_qc2_status_by_file_name(file_name: str,
                                   qc2_status: int,
                                   batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC2 status of a level-2 data record identified by file name.

    Args:
        file_name (str): Level-2 data file name.
        qc2_status (int): QC2 status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The update result.
    """
    return request.put(
        f"/api/level2/qc2_status/file/{file_name}",
        {'qc2_status': qc2_status, 'batch_id': batch_id})


def update_prc_status_by_file_name(file_name: str,
                                   prc_status: int,
                                   batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by file name.

    Args:
        file_name (str): Level-2 data file name.
        prc_status (int): Processing status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The operation result.
    """
    return request.put(
        f"/api/level2/prc_status/file/{file_name}",
        {'prc_status': prc_status, 'batch_id': batch_id})


def write(local_file: Union[IO, str],
          module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
          level0_id: Optional[str],
          level1_id: Optional[str],
          level2_id: Optional[str],
          brick_id: Optional[int],
          data_type: str,
          file_name: str,
          pipeline_id: str,
          build: int,
          dataset: Optional[str] = None,
          batch_id: Optional[str] = None,
          **extra_kwargs) -> Result:
    """
    Write a local level-2 data file into DFS.

    Args:
        local_file (Union[IO, str]): A file path or a file object.
        module_id (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level0_id (Optional[str]): ID of the level-0 data; may be None.
        level1_id (Optional[str]): ID of the level-1 data; may be None.
        level2_id (Optional[str]): ID of the level-2 data; may be None.
        brick_id (Optional[int]): ID of the sky brick; may be None.
        data_type (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        file_name (str): Level-2 data file name.
        pipeline_id (str): Pipeline ID.
        build (int): Build number.
        dataset (Optional[str], optional): Dataset name. Defaults to None.
        batch_id (Optional[str], optional): ID of the last successful batch.
            Defaults to None.
        **extra_kwargs: Extra keyword arguments forwarded to DFS. They are merged
            into the request parameters last, so a key that matches one of the
            named parameters overrides it.

    Returns:
        Result: The operation result; per the original documentation, on success
            its data is the level-2 data object.

    Raises:
        ValueError: If ``file_name`` is rejected by ``utils.is_valid_filename``.
        FileNotFoundError: If ``local_file`` is a path that does not exist.
    """
    # Reject *invalid* names. The condition was previously inverted, raising
    # for every name the validator accepted and letting bad names through.
    if not utils.is_valid_filename(file_name):
        raise ValueError(f"Incorrect file name [{file_name}], should be *.*")

    params = {
        'module_id': module_id,
        'level0_id': level0_id,
        'level1_id': level1_id,
        'level2_id': level2_id,
        'brick_id': brick_id,
        'file_name': file_name,
        'data_type': data_type,
        'pipeline_id': pipeline_id,
        'build': build,
        'dataset': dataset,
        'batch_id': batch_id,
    }
    params.update(extra_kwargs)

    if isinstance(local_file, str):
        # A string is treated as a filesystem path and uploaded from disk.
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)
        return request.post_file("/api/level2/file", local_file, params)
    # Anything else is handed over as a file-like object.
    return request.post_bytesio("/api/level2/file", local_file, params)


def catalog_query(sql: str, limit: int = 0) -> Result:
    """
    Run an SQL query against the level-2 science catalog and return the result.

    Args:
        sql (str): The SQL query to execute.
        limit (int, optional): Maximum number of rows to return. Defaults to 0,
            which means no limit.

    Returns:
        Result: A Result whose data holds the queried records (per the original
            documentation, a pandas DataFrame) with the total row count attached
            under "totalCount". If the request layer already produced a Result,
            that Result is returned unchanged.
    """
    response = request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit})
    # A Result from the request layer is passed straight through. Checking the
    # type alone (not truthiness) keeps a falsy Result from reaching the
    # ``_content`` access below, which only applies to raw responses.
    if isinstance(response, Result):
        return response

    # SECURITY NOTE: pickle.loads executes arbitrary code embedded in the
    # payload. This is only acceptable while the DFS server and the transport
    # are fully trusted; consider a safer wire format if that may not hold.
    records = pickle.loads(response._content)
    frame = records['records']
    total_count = records['totalCount']
    return Result.ok_data(data=frame).append("totalCount", total_count)