plan.py 2.88 KB
Newer Older
Wei Shoulin's avatar
plan  
Wei Shoulin committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
from typing import Optional, IO, Tuple, Literal
from .common import request, Result

DateTimeTuple = Tuple[str, str]

def find(mode: Optional[str] = None,
        obsid: Optional[str] = None,
        backend: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC', 
        obstype: Optional[str] = None,
        obstime: Optional[DateTimeTuple] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    根据给定的参数在DFS中搜索编排数据。
    
    Args:
        mode (Optional[str], optional): 观测模式. Defaults to None.
        obsid (Optional[str], optional): 观测ID,支持模糊搜索. Defaults to None.
        backend (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
        obstype (Optional[str], optional): 观测类型,如'SCIE'. Defaults to None.
        obstime (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0,不限制.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
        'mode': mode,
        'obsid': obsid,
        'backend': backend,
        'obstype': obstype,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'page': page,
        'limit': limit,
    }
    
    if obstime is not None:
        params['obs_time_start'], params['obs_time_end'] = obstime
    
    return request.post("/api/plan", params)

def get_by_id(_id: str) -> Result:
    return request.get(f"/api/plan/_id/{_id}")

def find_by_opid(opid: str) -> Result:
    """
    通过 plan 的 opid 查询编排数据。
    
    Args:
        opid (str): opid。
    
    Returns:
        Result: 查询结果。
    
    """
    return request.get(f"/api/plan/{opid}")

def write_file(local_file: Optional[IO | str], **kwargs) -> Result:
    """
    将本地json文件或json数据流写入DFS中。
    
    Args:
        local_file (str]): 文件路径。
        **kwargs: 额外的关键字参数,这些参数将传递给DFS。
    
    Returns:
        Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。
    
    """    
    if isinstance(local_file, str):
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)                
        return request.post_file("/api/plan/file", local_file, kwargs)
    return request.post_bytesio("/api/plan/file", local_file, kwargs)

def new(data: dict) -> Result:
    """
    新建编排数据
    
    Args:
        data (dict): 编排数据的字典表示,如:{'opid': 'xxx', 'backend': 'MSC', ...}
    
    Returns:
        Result: 成功后,Result.data为写入记录,失败message为失败原因。
    
    """
    return request.post("/api/plan/new", data)