An error occurred while loading the file. Please try again.
-
Wei Shoulin authoredbe8cb11e
import os
from typing import Optional, IO, Tuple, Literal, Union
from .common import request, Result
DateTimeTuple = Tuple[str, str]
def find(mode: Optional[str] = None,
obsid: Optional[str] = None,
backend: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
obstype: Optional[str] = None,
obstime: Optional[DateTimeTuple] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
根据给定的参数在DFS中搜索编排数据。
Args:
mode (Optional[str], optional): 观测模式. Defaults to None.
obsid (Optional[str], optional): 观测ID,支持模糊搜索. Defaults to None.
backend (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
obstype (Optional[str], optional): 观测类型,如'SCIE'. Defaults to None.
obstime (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0,不限制.
Returns:
Result: 搜索结果对象.
"""
params = {
'mode': mode,
'obsid': obsid,
'backend': backend,
'obstype': obstype,
'obs_time_start': None,
'obs_time_end': None,
'create_time_start': None,
'create_time_end': None,
'page': page,
'limit': limit,
}
if obstime is not None:
params['obs_time_start'], params['obs_time_end'] = obstime
return request.post("/api/plan", params)
def get_by_id(_id: int) -> Result:
"""
通过计划ID获取编排数据。
Args:
_id (int): 计划ID。
Returns:
Result: 查询结果。
"""
return request.get(f"/api/plan/id/{_id}")
def find_by_opid(opid: str) -> Result:
"""
通过 plan 的 opid 查询编排数据。
Args:
opid (str): opid。
Returns:
Result: 查询结果。
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
return request.get(f"/api/plan/{opid}")
def write_file(local_file: Union[IO, str], **kwargs) -> Result:
"""
将本地json文件或json数据流写入DFS中。
Args:
local_file (str]): 文件路径。
**kwargs: 额外的关键字参数,这些参数将传递给DFS。
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。
"""
if isinstance(local_file, str):
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/plan/file", local_file, kwargs)
return request.post_bytesio("/api/plan/file", local_file, kwargs)
def new(data: dict) -> Result:
"""
新建编排数据
Args:
data (dict): 编排数据的字典表示,如:{'id': 394, 'opid': 'xxx', 'backend': 'MSC', ...}
Returns:
Result: 成功后,Result.data为写入记录,失败message为失败原因。
"""
return request.post("/api/plan/new", data)