other.py 2.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
from typing import Literal, Optional, Tuple

from .common import Result, request, utils

DateTimeTuple = Tuple[str, str]


def find(
    instrument: Literal["MSC", "IFS", "MCI", "HSTDM", "CPIC"],
    dag_run: Optional[str] = None,
    create_time: Optional[DateTimeTuple] = None,
    page: int = 1,
    limit: int = 0,
    **extra_kwargs,
) -> Result:
    """
    根据给定的参数搜索其他数据文件记录

    Args:
        instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一.
        dag_run (Optional[str], optional): DAG运行标识. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None.
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0,不限制.

    Returns:
        Result: 搜索结果对象.

    """

    params = {
        "instrument": instrument,
        "dag_run": dag_run,
        "page": page,
        "limit": limit,
    }
    params.update(extra_kwargs)

    if create_time is not None:
        params["create_time_start"], params["create_time_end"] = create_time
        utils.is_valid_datetime_format(
            params["create_time_start"]
        ) or utils.is_valid_datetime_format(params["create_time_end"])
    return request.post("/api/other", params)


def get_by_id(_id: str) -> Result:
    """
    根据内部ID获取其他数据

    Args:
        _id (str):其他数据的内部ID

    Returns:
        Result: 查询结果

    """
    return request.get(f"/api/other/_id/{_id}")

def write(
    local_file: str, instrument: Literal["MSC", "IFS", "MCI", "HSTDM", "CPIC"], **kwargs
) -> Result:
    """
    将本地文件写入DFS中

    Args:
        local_file (str]): 文件路径
        **kwargs: 额外的关键字参数,这些参数将传递给DFS

    Returns:
        Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象

    """
    params = {"instrument": instrument}
    params.update(kwargs)
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file("/api/other/file", local_file, params)

def delete(instrument: str, dag_run: Optional[str] = None) -> Result:
    """
    删除其他数据,用于数据测试

    Args:
        instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一.
        dag_run (Optional[str], optional): DAG运行标识. Defaults to None.

    Returns:
        Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象

    """
    return request.delete("/api/other/delete", {"instrument": instrument, "dag_run": dag_run})