# level2.py
import os
import pickle
from typing import Optional, Tuple, Literal, Union, IO
from .common import request, Result, utils, constants

# A (start, end) pair of datetime strings describing a time range.
DateTimeTuple = Tuple[str, str]

def find(
        dataset: str,
        batch_id: str,
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        instrument: str = 'MSC',
        detector: Optional[str] = None,
        data_model: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        object_name: Optional[str] = None,
        brick_id: Optional[int] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-2 data file records matching the given criteria.

    Args:
        dataset (str): Dataset name. Required.
        batch_id (str): Batch ID. Required.
        obs_group (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        instrument (str, optional): Module ID, e.g. 'MSC', 'IFS'. Must be one
            of constants.INSTRUMENTS. Defaults to 'MSC'.
        detector (Optional[str], optional): Detector. Defaults to None.
        data_model (Optional[str], optional): Data type, e.g.
            'csst-msc-l2-mbi-cat'. Defaults to None.
        filter (Optional[str], optional): Filter. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): Observation time range
            as a (start, end) pair. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range
            as a (start, end) pair. Defaults to None.
        qc_status (Optional[int], optional): QC status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        object_name (Optional[str], optional): Celestial object name. Defaults to None.
        brick_id (Optional[int], optional): Brick ID. Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Number of records per page. Defaults to 0.

    Returns:
        Result: The search result object returned by the POST to /api/level2.

    Raises:
        ValueError: If ``instrument`` is not in constants.INSTRUMENTS, if a
            time range does not unpack into exactly two values, or if any
            time range endpoint is rejected by
            utils.is_valid_datetime_format.
    """
    if instrument not in constants.INSTRUMENTS:
        raise ValueError(f"Instrument {instrument} is not supported")

    params = {
        'obs_group': obs_group,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector': detector,
        'data_model': data_model,
        'filter': filter,
        'qc_status': qc_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'object_name': object_name,
        'dataset': dataset,
        'batch_id': batch_id,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'brick_id': brick_id,
        'page': page,
        'limit': limit,
    }

    for prefix, time_range in (('obs_time', obs_time), ('create_time', create_time)):
        if time_range is None:
            continue
        start, end = time_range
        # Validate BOTH endpoints. The validator's result used to be computed
        # and thrown away (and short-circuited by `or`), so malformed values
        # were never rejected. Comparing against False keeps this correct
        # whether the helper returns a bool or raises by itself on bad input.
        for value in (start, end):
            if utils.is_valid_datetime_format(value) is False:
                raise ValueError(f"Invalid datetime format for {prefix}: {value!r}")
        params[f'{prefix}_start'] = start
        params[f'{prefix}_end'] = end

    return request.post("/api/level2", params)

def find_by_level2_id(level2_id: str) -> Result:
    """
    Look up a level-2 data record by its level-2 ID.

    Args:
        level2_id (str): ID of the level-2 data record.

    Returns:
        Result: The query result returned by the GET request.

    """
    endpoint = f"/api/level2/{level2_id}"
    return request.get(endpoint)

98
def update_qc_status(level2_id: str, data_model: str, qc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC status of a level-2 data record.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        qc_status (int): QC status to set.
        batch_id (str, optional): Batch ID sent with the request.
            Defaults to constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The update result returned by the PUT request.
    """
    payload = {
        'data_model': data_model,
        'qc_status': qc_status,
        'batch_id': batch_id,
    }
    endpoint = f"/api/level2/qc_status/{level2_id}"
    return request.put(endpoint, payload)
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

def update_prc_status(level2_id: str, data_model: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        prc_status (int): Processing status to set.
        batch_id (str, optional): Batch ID sent with the request.
            Defaults to constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The operation result returned by the PUT request.
    """
    payload = {
        'data_model': data_model,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    endpoint = f"/api/level2/prc_status/{level2_id}"
    return request.put(endpoint, payload)

126
def update_qc_status_by_file_name(file_name: str, qc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC status of a level-2 data record identified by file name.

    Args:
        file_name (str): Level-2 data file name.
        qc_status (int): QC status to set.
        batch_id (str, optional): Batch ID sent with the request.
            Defaults to constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The update result returned by the PUT request.
    """
    payload = {
        'qc_status': qc_status,
        'batch_id': batch_id,
    }
    endpoint = f"/api/level2/qc_status/file/{file_name}"
    return request.put(endpoint, payload)
138
139
140
141
142
143
144
145
146
147
148
149
150
151

def update_prc_status_by_file_name(file_name: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by file name.

    Args:
        file_name (str): Level-2 data file name.
        prc_status (int): Processing status to set.
        batch_id (str, optional): Batch ID sent with the request.
            Defaults to constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The operation result returned by the PUT request.
    """
    payload = {
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    endpoint = f"/api/level2/prc_status/file/{file_name}"
    return request.put(endpoint, payload)

152
def write(local_file: Union[IO, str]) -> Result:
    """
    Upload a local level-2 data file to DFS.

    Args:
        local_file (Union[IO, str]): Either a file path or a file object.

    Returns:
        Result: The result of the upload request. Per the original
            documentation, it carries success/error information and, on
            success, the level-2 data object as its data.

    Raises:
        ValueError: If ``local_file`` is None.
        FileNotFoundError: If ``local_file`` is a path that does not exist.

    """
    if local_file is None:
        raise ValueError("local_file is required")

    endpoint = "/api/level2/file"
    extra_params = {}

    # Anything that is not a path string is handed over as a file object.
    if not isinstance(local_file, str):
        return request.post_bytesio(endpoint, local_file, extra_params)

    if os.path.exists(local_file):
        return request.post_file(endpoint, local_file, extra_params)
    raise FileNotFoundError(local_file)

def catalog_query(sql: str, limit: int = 0) -> Result:
    """
    Run an SQL query against the level-2 science catalog and return the rows.

    Args:
        sql (str): SQL statement to execute.
        limit (int, optional): Maximum number of rows to return. Defaults to
            0, which the original documentation describes as "no limit".

    Returns:
        Result: If the request layer returns a Result, it is returned
            unchanged. Otherwise a new Result whose data is the 'records'
            entry of the unpickled response (documented as a pandas
            DataFrame), with the 'totalCount' entry appended under the key
            "totalCount".

    """
    response = request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit})

    # A Result coming back from the request layer is passed through as-is
    # (presumably an error result; confirm against the request module). The
    # type check alone decides: a falsy Result must not fall through to the
    # unpickling below, where it would fail with AttributeError.
    if isinstance(response, Result):
        return response

    # SECURITY NOTE(review): pickle.loads can execute arbitrary code while
    # deserializing. This is only acceptable if the DFS server and the
    # transport are fully trusted; consider a safer wire format.
    records = pickle.loads(response._content)
    df, total_count = records['records'], records['totalCount']
    return Result.ok_data(data=df).append("totalCount", total_count)