from typing import Optional, Tuple, Literal, IO, Union
from .common import request, Result, utils, constants
import os

DateTimeTuple = Tuple[str, str]

def find(
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        dataset: str,
        batch_id: str,
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
        detector: Optional[str] = None,
        data_model: Optional[str] = None,
        obs_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_date: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        ra: Optional[int] = None,
        dec: Optional[int] = None,
        radius: Optional[float] = None,
        object: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
        build: Optional[int] = None,
        pmapname: Optional[str] = None,
        page: int = 1,
        limit: int = 0,
        **extra_kwargs) -> Result:
    """
    Search level-1 data file records matching the given criteria.

    All arguments are collected into one parameter dict and POSTed to
    ``/api/level1``. Optional filters left as ``None`` are still sent
    (with a ``None`` value).

    Args:
        instrument (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'. Required.
        dataset (str): Dataset name. Required.
        batch_id (str): Batch ID. Required.
        obs_group (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        level0_id (Optional[str], optional): Level-0 data ID. Defaults to None.
        detector (Optional[str], optional): Detector. Defaults to None.
        data_model (Optional[str], optional): Data type, e.g. 'csst-msc-l1-mbi'. Defaults to None.
        obs_type (Optional[str], optional): Observation type, e.g. '01'. Defaults to None.
        filter (Optional[str], optional): Filter. Defaults to None.
        obs_date (Optional[DateTimeTuple], optional): Observation time range as (start, end). Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range as (start, end). Defaults to None.
        qc_status (Optional[int], optional): QC1 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        ra (Optional[int], optional): Central right ascension; sent as 'ra_cen'. Defaults to None.
        dec (Optional[int], optional): Central declination; sent as 'dec_cen'. Defaults to None.
        radius (Optional[float], optional): Search radius. Defaults to None.
        object (Optional[str], optional): Celestial object name. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS). Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
        build (Optional[int], optional): Build version. Defaults to None.
        pmapname (Optional[str], optional): CCDS pmap name. Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Number of records per page. Defaults to 0.
        **extra_kwargs: Additional key/value pairs merged into the request
            parameters; they override the named arguments above on key clash.

    Returns:
        Result: The search result object.

    """

    params = {
        'obs_group': obs_group,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector': detector,
        'data_model': data_model,
        'obs_type': obs_type,
        'filter': filter,
        'qc_status': qc_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_cen': ra,
        'dec_cen': dec,
        'radius': radius,
        'object': object,
        'obs_date_start': None,
        'obs_date_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'build': build,
        'pmapname': pmapname,
        'page': page,
        'limit': limit,
    }
    params.update(extra_kwargs)

    # The range tuples are unpacked after extra_kwargs is merged, so an explicit
    # obs_date / create_time argument wins over same-named extra keys.
    if obs_date is not None:
        params['obs_date_start'], params['obs_date_end'] = obs_date
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time

    # Validate every supplied endpoint on its own. The previous
    # `valid(start) or valid(end)` form short-circuited, so the end value was
    # never checked once the start value passed. Empty endpoints are skipped,
    # matching find_process.
    # NOTE(review): the validator's return value is ignored, as before; it is
    # presumably expected to raise on malformed input -- confirm in utils.
    checked_keys = []
    if obs_date is not None:
        checked_keys += ['obs_date_start', 'obs_date_end']
    if create_time is not None:
        checked_keys += ['create_time_start', 'create_time_end']
    for key in checked_keys:
        if params[key]:
            utils.is_valid_datetime_format(params[key])

    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
    Look up a level-1 data record by its level-1 ID.

    Args:
        level1_id (str): ID of the level-1 data.

    Returns:
        Result: The query result.

    """
    url = f"/api/level1/{level1_id}"
    return request.get(url)

def find_by_brick_id(brick_id: int) -> Result:
    """
    Look up level-1 data by brick (sky region) ID.

    Args:
        brick_id (int): Sky region ID.

    Returns:
        Result: The query result.

    """
    url = f"/api/level1/brick/{brick_id}"
    return request.get(url)

def sls_find_by_qc_status(qc_status: int, batch_id: str, limit: int = 1) -> Result:
    """
    Query the level-1 ``sls`` endpoint for records with the given QC status.

    Args:
        qc_status (int): QC status; embedded in the request path.
        batch_id (str): Batch ID, sent in the request body.
        limit (int, optional): Sent in the request body as 'limit'. Defaults to 1.

    Returns:
        Result: The query result.
    """
    url = f"/api/level1/sls/qc_status/{qc_status}"
    payload = {'limit': limit, 'batch_id': batch_id}
    return request.post(url, payload)

def update_qc_status_by_ids(ids: list[str], qc_status: int) -> Result:
    """
    Batch-update the QC status of level-1 data, addressed by internal _id.

    Args:
        ids (List[str]): List of internal _id values.
        qc_status (int): QC status to set.

    Returns:
        Result: The update result.
    """
    payload = {'qc_status': qc_status, 'ids': ids}
    return request.put("/api/level1/qc_status/batch/update", payload)

def update_prc_status_by_ids(ids: list[str], prc_status: int) -> Result:
    """
    Batch-update the processing status of level-1 data, addressed by internal _id.

    Args:
        ids (List[str]): List of internal _id values.
        prc_status (int): Processing status to set.

    Returns:
        Result: The update result.
    """
    payload = {'prc_status': prc_status, 'ids': ids}
    return request.put("/api/level1/prc_status/batch/update", payload)

def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id: str) -> Result:
    """
    Update the QC status of a single level-1 data record.

    Args:
        level1_id (str): ID of the level-1 data; embedded in the request path.
        data_model (str): Data type.
        qc_status (int): QC status to set.
        batch_id (str): Batch ID.

    Returns:
        Result: The update result.
    """
    url = f"/api/level1/qc_status/{level1_id}"
    payload = {
        'data_model': data_model,
        'qc_status': qc_status,
        'batch_id': batch_id,
    }
    return request.put(url, payload)

def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str) -> Result:
    """
    Update the processing status of a single level-1 data record.

    Args:
        level1_id (str): ID of the level-1 data; embedded in the request path.
        data_model (str): Data type.
        prc_status (int): Processing status to set.
        batch_id (str): Batch ID.

    Returns:
        Result: The operation result.
    """
    url = f"/api/level1/prc_status/{level1_id}"
    payload = {
        'data_model': data_model,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    return request.put(url, payload)

def write(local_file: Union[IO, str]) -> Result:
    '''
    Upload a local level-1 data file to DFS.

    A string argument is treated as a file path and must exist on disk; any
    other non-None value is handed to the byte-stream upload. No additional
    request parameters are sent (an empty parameter dict is passed along).

    Args:
        local_file (Union[IO, str]): File path or file object.

    Returns:
        Result: Result object of the operation, carrying success state and any
            error information; on success ``data`` is the level-1 data object.

    Raises:
        ValueError: If ``local_file`` is None.
        FileNotFoundError: If ``local_file`` is a path that does not exist.
    '''
    if local_file is None:
        raise ValueError("local_file is required")

    endpoint = "/api/level1/file"
    params = {}

    if not isinstance(local_file, str):
        # Not a path: pass the object straight to the stream upload.
        return request.post_bytesio(endpoint, local_file, params)

    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file(endpoint, local_file, params)

def find_process(dag: Optional[str] = None,
                dag_run: Optional[str] = None,
                batch_id: Optional[str] = None,
                level1_id: Optional[str] = None,
                dataset: Optional[str] = None,
                prc_module: Optional[str] = None,
                prc_status: Optional[int] = None,
                prc_date: Optional[DateTimeTuple] = None,
                page: int = 1,
                limit: int = 0) -> Result:
    """
    Query level-1 data processing records.

    Args:
        dag (str): DAG identifier.
        dag_run (str): DAG run identifier.
        batch_id (str): Batch ID.
        level1_id (str): ID of the level-1 data.
        dataset (str): Dataset.
        prc_module (str): Processing module.
        prc_status (int): Processing status.
        prc_date (DateTimeTuple): Processing time range as (start, end).
        page (int): Page number, defaults to 1.
        limit (int): Number of records per page; 0 means no limit.

    Returns:
        Result: On success ``Result.data`` is the list of records; on failure
            ``message`` holds the reason.

    """
    date_start, date_end = (None, None) if prc_date is None else prc_date

    params = {
        'dag': dag,
        'dag_run': dag_run,
        'batch_id': batch_id,
        'level1_id': level1_id,
        'dataset': dataset,
        'prc_module': prc_module,
        'prc_status': prc_status,
        'prc_date_start': date_start,
        'prc_date_end': date_end,
        'page': page,
        'limit': limit
    }

    # Only non-empty endpoints are passed to the validator; its return value
    # is not used here.
    # NOTE(review): presumably the validator raises on malformed input -- confirm in utils.
    if date_start:
        utils.is_valid_datetime_format(date_start)
    if date_end:
        utils.is_valid_datetime_format(date_end)

    return request.post("/api/level1/process", params)

def add_process(level1_id: str,
                dag: str,
                dag_run: str,
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
                prc_date: Optional[str] = None,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
    Add a level-1 data processing record.

    Args:
        level1_id (str): ID of the level-1 data.
        dag (str): DAG identifier.
        dag_run (str): DAG run identifier.
        dataset (str): Dataset. Defaults to ``constants.DEFAULT_DATASET``.
        batch_id (str): Batch ID. Defaults to ``constants.DEFAULT_BATCH_ID``.
        prc_date (Optional[str]): Processing time, formatted as
            "YYYY-MM-DD HH:MM:SS". When omitted (None), the current time from
            ``utils.get_current_time()`` is taken at call time.
        prc_status (int): Processing status. Defaults to -1024.
        prc_module (str): Processing module. Defaults to "".
        message (str): Processing message. Defaults to "".

    Returns:
        Result: On success ``Result.data`` is the written record; on failure
            ``message`` holds the reason.

    """
    # A default of `utils.get_current_time()` in the signature is evaluated once,
    # when the module is imported, so every later call would reuse that stale
    # timestamp. Resolve the default per call instead.
    if prc_date is None:
        prc_date = utils.get_current_time()

    params = {
        'level1_id': level1_id,
        'dag': dag,
        'dag_run': dag_run,
        'dataset': dataset,
        'batch_id': batch_id,
        'prc_date': prc_date,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    # The validator's return value is not used here.
    # NOTE(review): presumably it raises on malformed input -- confirm in utils.
    utils.is_valid_datetime_format(prc_date)
    return request.post("/api/level1/prc", params)

def delete(dataset: str, batch_id: str, data_model: str) -> Result:
    """
    Send a delete request for level-1 data selected by dataset, batch and data model.

    NOTE(review): the original description speaks of deleting level-1
    processing records; the exact server-side scope is not visible here.

    Args:
        dataset (str): Dataset.
        batch_id (str): Batch ID.
        data_model (str): Data model.

    Returns:
        Result: Result object of the operation, carrying success state and any
            error information; on success it returns the data object.

    """
    payload = {"dataset": dataset, "batch_id": batch_id, "data_model": data_model}
    return request.delete("/api/level1/delete", payload)

def count_by_dataset(dataset: Optional[str] = None) -> Result:
    """
    Query the number of level-1 data records grouped by dataset.

    Args:
        dataset (str): Dataset. Defaults to None, which is sent as-is.

    Returns:
        Result: On success ``Result.data`` is the list of counts; on failure
            ``message`` holds the reason.

    """
    return request.post("/api/level1/count_by_dataset", {"dataset": dataset})