import os from typing import Literal, Optional, Tuple from .common import Result, request, utils DateTimeTuple = Tuple[str, str] def find( instrument: Literal["MSC", "IFS", "MCI", "HSTDM", "CPIC"], dag_run: Optional[str] = None, create_time: Optional[DateTimeTuple] = None, page: int = 1, limit: int = 0, **extra_kwargs, ) -> Result: """ 根据给定的参数搜索其他数据文件记录 Args: instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一. dag_run (Optional[str], optional): DAG运行标识. Defaults to None. create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None. page (int, optional): 页码. Defaults to 1. limit (int, optional): 每页数量. Defaults to 0,不限制. Returns: Result: 搜索结果对象. """ params = { "instrument": instrument, "dag_run": dag_run, "page": page, "limit": limit, } params.update(extra_kwargs) if create_time is not None: params["create_time_start"], params["create_time_end"] = create_time utils.is_valid_datetime_format( params["create_time_start"] ) or utils.is_valid_datetime_format(params["create_time_end"]) return request.post("/api/other", params) def get_by_id(_id: str) -> Result: """ 根据内部ID获取其他数据 Args: _id (str):其他数据的内部ID Returns: Result: 查询结果 """ return request.get(f"/api/other/_id/{_id}") def write( local_file: str, instrument: Literal["MSC", "IFS", "MCI", "HSTDM", "CPIC"], dag_run: str, **kwargs ) -> Result: """ 将本地文件写入DFS中 Args: local_file (str]): 文件路径 instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一. dag_run (str): DAG运行标识. **kwargs: 额外的关键字参数,这些参数将传递给DFS Returns: Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象 """ params = {"instrument": instrument, "dag_run": dag_run} params.update(kwargs) if not os.path.exists(local_file): raise FileNotFoundError(local_file) return request.post_file("/api/other/file", local_file, params) def delete(instrument: str, dag_run: Optional[str] = None) -> Result: """ 删除其他数据,用于数据测试 Args: instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一. dag_run (Optional[str], optional): DAG运行标识. Defaults to None. Returns: Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象 """ return request.delete("/api/other/delete", {"instrument": instrument, "dag_run": dag_run})