level1.py 11.5 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
from typing import Optional, Tuple, Literal, IO, Union
Wei Shoulin's avatar
Wei Shoulin committed
2
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
3
4
5
6
import os

# (start, end) pair of datetime strings describing a time range; the functions
# below unpack it into the '<field>_start' / '<field>_end' request parameters.
DateTimeTuple = Tuple[str, str]

7
8
def find(
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        dataset: str,
        batch_id: str,
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
        detector: Optional[str] = None,
        data_model: Optional[str] = None,
        obs_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_date: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        ra: Optional[int] = None,
        dec: Optional[int] = None,
        radius: Optional[float] = None,
        object: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
        build: Optional[int] = None,
        pmapname: Optional[str] = None,
        page: int = 1,
        limit: int = 0,
        **extra_kwargs) -> Result:
    """
    Search level-1 data file records matching the given criteria.

    Args:
        instrument (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'. Required.
        dataset (str): Dataset name. Required.
        batch_id (str): Batch ID. Required.
        obs_group (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        level0_id (Optional[str], optional): Sent as the 'level0_id' filter;
            presumably the ID of the source level-0 record (confirm). Defaults to None.
        detector (Optional[str], optional): Detector. Defaults to None.
        data_model (Optional[str], optional): Data type, e.g. 'csst-msc-l1-mbi'. Defaults to None.
        obs_type (Optional[str], optional): Observation type, e.g. '01'. Defaults to None.
        filter (Optional[str], optional): Filter. Defaults to None.
        obs_date (Optional[DateTimeTuple], optional): Observation time range as (start, end). Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range as (start, end). Defaults to None.
        qc_status (Optional[int], optional): QC1 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        ra (Optional[int], optional): Right ascension of the search centre (sent as 'ra_cen'). Defaults to None.
        dec (Optional[int], optional): Declination of the search centre (sent as 'dec_cen'). Defaults to None.
        radius (Optional[float], optional): Search radius. Defaults to None.
        object (Optional[str], optional): Object name. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS). Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
        build (Optional[int], optional): Build version. Defaults to None.
        pmapname (Optional[str], optional): CCDS pmap name. Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Number of records per page. Defaults to 0.
        **extra_kwargs: Additional request parameters; they are merged into the
            request and override same-named keys built from the arguments above.

    Returns:
        Result: Search result object.

    Raises:
        Whatever utils.is_valid_datetime_format raises for a malformed bound
        (NOTE(review): the helper is assumed to signal bad input by raising — confirm).
    """
    params = {
        'obs_group': obs_group,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector': detector,
        'data_model': data_model,
        'obs_type': obs_type,
        'filter': filter,
        'qc_status': qc_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_cen': ra,
        'dec_cen': dec,
        'radius': radius,
        'object': object,
        'obs_date_start': None,
        'obs_date_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'build': build,
        'pmapname': pmapname,
        'page': page,
        'limit': limit,
    }
    params.update(extra_kwargs)

    time_ranges = (
        ('obs_date_start', 'obs_date_end', obs_date),
        ('create_time_start', 'create_time_end', create_time),
    )
    for start_key, end_key, time_range in time_ranges:
        if time_range is None:
            continue
        start, end = time_range
        params[start_key] = start
        params[end_key] = end
        # Check each bound on its own: chaining the two checks with `or`
        # short-circuits and skips the end bound whenever the start check
        # returns a truthy value. Unset bounds are skipped, as in find_process.
        for bound in (start, end):
            if bound:
                utils.is_valid_datetime_format(bound)

    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
    Look up level-1 data by its level-1 ID.

    Args:
        level1_id (str): ID of the level-1 data; it is placed in the request path.

    Returns:
        Result: Query result.
    """
    endpoint = f"/api/level1/{level1_id}"
    return request.get(endpoint)

def find_by_brick_id(brick_id: int) -> Result:
    """
    Look up level-1 data by brick ID.

    Args:
        brick_id (int): Sky-area (brick) ID; it is placed in the request path.

    Returns:
        Result: Query result.
    """
    endpoint = f"/api/level1/brick/{brick_id}"
    return request.get(endpoint)

134
def sls_find_by_qc_status(qc_status: int, batch_id: str, limit: int = 1) -> Result:
    """
    Query the SLS level-1 endpoint for records with the given QC status.

    Args:
        qc_status (int): QC status; it is placed in the request path.
        batch_id (str): Batch ID, sent in the request payload.
        limit (int, optional): Sent in the request payload as 'limit'. Defaults to 1.

    Returns:
        Result: Result object returned by the request.
    """
    endpoint = f"/api/level1/sls/qc_status/{qc_status}"
    payload = {'limit': limit, 'batch_id': batch_id}
    return request.post(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
136

137
def update_qc_status_by_ids(ids: list[str], qc_status: int) -> Result:
    """
    Update the QC status of several level-1 records in one request,
    selecting them by their internal _id.

    Args:
        ids (List[str]): Internal _id values of the records to update.
        qc_status (int): QC status to set.

    Returns:
        Result: Update result.
    """
    payload = {'qc_status': qc_status, 'ids': ids}
    return request.put("/api/level1/qc_status/batch/update", payload)

def update_prc_status_by_ids(ids: list[str], prc_status: int) -> Result:
    """
    Update the processing status of several level-1 records in one request,
    selecting them by their internal _id.

    Args:
        ids (List[str]): Internal _id values of the records to update.
        prc_status (int): Processing status to set.

    Returns:
        Result: Update result.
    """
    payload = {'prc_status': prc_status, 'ids': ids}
    return request.put("/api/level1/prc_status/batch/update", payload)

def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id: str) -> Result:
    """
    Update the QC status of a single level-1 record.

    Args:
        level1_id (str): ID of the level-1 data; it is placed in the request path.
        data_model (str): Data type.
        qc_status (int): QC status to set.
        batch_id (str): Batch ID.

    Returns:
        Result: Update result.
    """
    endpoint = f"/api/level1/qc_status/{level1_id}"
    payload = {
        'data_model': data_model,
        'qc_status': qc_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
177

178
def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str) -> Result:
    """
    Update the processing status of a single level-1 record.

    Args:
        level1_id (str): ID of the level-1 data; it is placed in the request path.
        data_model (str): Data type.
        prc_status (int): Processing status to set.
        batch_id (str): Batch ID.

    Returns:
        Result: Operation result.
    """
    endpoint = f"/api/level1/prc_status/{level1_id}"
    payload = {
        'data_model': data_model,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
192

193
def write(local_file: Union[IO, str, os.PathLike], **extra_kwargs) -> Result:
    """
    Write a local level-1 data file into DFS. Other parameters such as
    rss_id, cube_id, etc. can be supplied through extra_kwargs.

    Args:
        local_file (Union[IO, str, os.PathLike]): File path (a str or any
            os.PathLike such as pathlib.Path) or an open file object.
        **extra_kwargs: Extra keyword arguments; they are forwarded to DFS as
            request parameters.

    Returns:
        Result: Result object of the operation, carrying the success flag and
        any error information; on success its data is the level-1 data object.

    Raises:
        ValueError: If local_file is None.
        FileNotFoundError: If local_file is a path that does not exist.
    """
    if local_file is None:
        raise ValueError("local_file is required")

    params = dict(extra_kwargs)

    # Path-like objects are paths, not file objects; without this branch they
    # would be handed to the file-object upload below.
    if isinstance(local_file, (str, os.PathLike)):
        path = os.fspath(local_file)
        if not os.path.exists(path):
            raise FileNotFoundError(path)
        return request.post_file("/api/level1/file", path, params)
    return request.post_bytesio("/api/level1/file", local_file, params)
Wei Shoulin's avatar
Wei Shoulin committed
213

214
215
def find_process(dag: Optional[str] = None,
                dag_run: Optional[str] = None,
                batch_id: Optional[str] = None,
                level1_id: Optional[str] = None,
                dataset: Optional[str] = None,
                prc_module: Optional[str] = None,
                prc_status: Optional[int] = None,
                prc_date: Optional[DateTimeTuple] = None,
                page: int = 1,
                limit: int = 0) -> Result:
    """
    Query data-processing records through the level-1 process endpoint.

    Args:
        dag (str): DAG identifier.
        dag_run (str): DAG run identifier.
        batch_id (str): Batch ID.
        level1_id (str): ID of the level-1 data.
        dataset (str): Dataset.
        prc_module (str): Processing module.
        prc_status (int): Processing status.
        prc_date (DateTimeTuple): Processing time range as (start, end).
        page (int): Page number, 1 by default.
        limit (int): Number of records per page; 0 means no limit.

    Returns:
        Result: On success Result.data is the list of records; on failure
        message holds the reason.
    """
    date_start, date_end = (None, None) if prc_date is None else prc_date

    # Only bounds that are actually set are passed to the format check.
    for bound in (date_start, date_end):
        if bound:
            utils.is_valid_datetime_format(bound)

    query = {
        'dag': dag,
        'dag_run': dag_run,
        'batch_id': batch_id,
        'level1_id': level1_id,
        'dataset': dataset,
        'prc_module': prc_module,
        'prc_status': prc_status,
        'prc_date_start': date_start,
        'prc_date_end': date_end,
        'page': page,
        'limit': limit
    }
    return request.post("/api/level1/process", query)
Wei Shoulin's avatar
Wei Shoulin committed
263
264

def add_process(level1_id: str,
                dag: str,
                dag_run: str,
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
                prc_date: Optional[str] = None,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
    Add a level-1 data processing record.

    Args:
        level1_id (str): ID of the level-1 data.
        dag (str): DAG identifier.
        dag_run (str): DAG run identifier.
        dataset (str): Dataset. Defaults to constants.DEFAULT_DATASET.
        batch_id (str): Batch ID. Defaults to constants.DEFAULT_BATCH_ID.
        prc_date (Optional[str]): Processing time, formatted as
            "YYYY-MM-DD HH:MM:SS". When omitted (None), the value of
            utils.get_current_time() at call time is used.
        prc_status (int): Processing status. Defaults to -1024.
        prc_module (str): Processing module. Defaults to "".
        message (str): Processing message. Defaults to "".

    Returns:
        Result: On success Result.data is the written record; on failure
        message holds the reason.
    """
    # Resolve the timestamp per call. A default of utils.get_current_time() in
    # the signature is evaluated once, when the function is defined, so every
    # call that omitted prc_date would share the module import time.
    if prc_date is None:
        prc_date = utils.get_current_time()
    utils.is_valid_datetime_format(prc_date)

    params = {
        'level1_id': level1_id,
        'dag': dag,
        'dag_run': dag_run,
        'dataset': dataset,
        'batch_id': batch_id,
        'prc_date': prc_date,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    return request.post("/api/level1/prc", params)

def delete(dataset: str, batch_id: str, data_model: str) -> Result:
    """
    Send a delete request to the level-1 endpoint for the given dataset,
    batch and data model.

    Args:
        dataset (str): Dataset.
        batch_id (str): Batch ID.
        data_model (str): Data model.

    Returns:
        Result: Result object of the operation, carrying the success flag and
        any error information; on success it returns the data object.
    """
    criteria = {
        "dataset": dataset,
        "batch_id": batch_id,
        "data_model": data_model,
    }
    return request.delete("/api/level1/delete", criteria)
319

320
def count_by_dataset(dataset: Optional[str] = None) -> Result:
    """
    Request record counts grouped by dataset from the level-1 endpoint.

    Args:
        dataset (str): Dataset; sent as the 'dataset' parameter. Defaults to None.

    Returns:
        Result: On success Result.data is the list of records; on failure
        message holds the reason.
    """
    return request.post("/api/level1/count_by_dataset", {"dataset": dataset})