level1.py 12.1 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
from typing import Optional, Tuple, Literal, IO, Union
Wei Shoulin's avatar
Wei Shoulin committed
2
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
3
4
5
6
import os

DateTimeTuple = Tuple[str, str]

7
8
def find(
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
9
10
        dataset: str,
        batch_id: str,        
11
        obs_group: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
12
13
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
14
        detector: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
15
        data_model: Optional[str] = None,
16
        obs_type: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
17
18
19
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
20
        qc_status: Optional[int] = None,
Wei Shoulin's avatar
Wei Shoulin committed
21
22
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
23
24
        ra_cen: Optional[int] = None,
        dec_cen: Optional[int] = None,
Wei Shoulin's avatar
Wei Shoulin committed
25
26
27
28
        radius: Optional[float] = None,
        object_name: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
29
30
        build: Optional[int] = None,
        pmapname: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
31
32
33
        page: int = 1,
        limit: int = 0) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
34
    根据给定的参数搜索1级数据文件记录
Wei Shoulin's avatar
Wei Shoulin committed
35
36
    
    Args:
37
        obs_group (Optional[str], optional): 项目ID. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
38
        obs_id (Optional[str], optional): 观测ID. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
39
        instrument (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
40
        detector (Optional[str], optional): 探测器. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
41
        data_model (Optional[str], optional): 数据类型,如'csst-msc-l1-mbi'. Defaults to None.
42
        obs_type (Optional[str], optional): 观测类型,如'01'. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
43
44
45
        filter (Optional[str], optional): 滤光片. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None.
46
        qc_status (Optional[int], optional): QC1状态. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
47
48
        prc_status (Optional[int], optional): 处理状态. Defaults to None.
        file_name (Optional[str], optional): 文件名. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
49
50
        ra_cen (Optional[int], optional): 中心赤经. Defaults to None.
        dec_cen (Optional[int], optional): 中心赤纬. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
51
52
53
54
        radius (Optional[float], optional): 搜索半径. Defaults to None.
        object_name (Optional[str], optional): 天体名称. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS) Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
55
56
        dataset (Optional[str], optional): 数据集名称. Defaults to None.
        batch_id (Optional[str], optional): 批次ID. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
57
58
        build (Optional[int], optional): 构建版本. Defaults to None.
        pmapname (Optional[str], optional): CCDS pmap名. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
59
60
61
62
63
64
65
66
67
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
68
        'obs_group': obs_group,
Wei Shoulin's avatar
Wei Shoulin committed
69
70
        'level0_id': level0_id,
        'obs_id': obs_id,
Wei Shoulin's avatar
Wei Shoulin committed
71
        'instrument': instrument,
72
        'detector': detector,
Wei Shoulin's avatar
Wei Shoulin committed
73
        'data_model': data_model,
74
        'obs_type': obs_type,
Wei Shoulin's avatar
Wei Shoulin committed
75
        'filter': filter,
76
        'qc_status': qc_status,
Wei Shoulin's avatar
Wei Shoulin committed
77
78
        'prc_status': prc_status,
        'file_name': file_name,
Wei Shoulin's avatar
Wei Shoulin committed
79
80
        'ra_cen': ra_cen,
        'dec_cen': dec_cen,
Wei Shoulin's avatar
Wei Shoulin committed
81
82
83
84
85
86
87
88
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
Wei Shoulin's avatar
Wei Shoulin committed
89
90
        'dataset': dataset,
        'batch_id': batch_id,
Wei Shoulin's avatar
Wei Shoulin committed
91
92
        'build': build,
        'pmapname': pmapname,
Wei Shoulin's avatar
Wei Shoulin committed
93
94
95
96
97
98
        'page': page,
        'limit': limit,
    }
    
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
Wei Shoulin's avatar
Wei Shoulin committed
99
        utils.is_valid_datetime_format(params['obs_time_start']) or not utils.is_valid_datetime_format(params['obs_time_end'])
Wei Shoulin's avatar
Wei Shoulin committed
100
101
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
Wei Shoulin's avatar
Wei Shoulin committed
102
        utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end'])
Wei Shoulin's avatar
Wei Shoulin committed
103
104
105
106
107
    
    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
108
    通过 level1 的 ID 查询1级数据
Wei Shoulin's avatar
Wei Shoulin committed
109
110
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
111
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
112
113
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
114
        Result: 查询结果
Wei Shoulin's avatar
Wei Shoulin committed
115
116
117
118
119
    
    """
    return request.get(f"/api/level1/{level1_id}")

def find_by_brick_id(brick_id: int) -> Result:
Wei Shoulin's avatar
plan    
Wei Shoulin committed
120
    """
Wei Shoulin's avatar
Wei Shoulin committed
121
    通过 brick 的 ID 查询1级数据
Wei Shoulin's avatar
plan    
Wei Shoulin committed
122
123
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
124
        brick_id (int): 天区ID
Wei Shoulin's avatar
plan    
Wei Shoulin committed
125
126
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
127
        Result: 查询结果
Wei Shoulin's avatar
plan    
Wei Shoulin committed
128
129
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
130
131
    return request.get(f"/api/level1/brick/{brick_id}")

132
def sls_find_by_qc_status(qc_status: int, batch_id: str, limit: int = 1) -> Result:
133
    return request.post(f"/api/level1/sls/qc_status/{qc_status}", {'limit': limit, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
134

135
def update_qc_status_by_ids(ids: list[str], qc_status: int) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
136
    """
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
    根据内部_id,批量更新1级数据的QC状态
    
    Args:
        ids (List[str]): 内部_id列表
        qc_status (int): QC状态
    
    Returns:
        Result: 更新结果
    """
    return request.put("/api/level1/qc_status/batch/update", {'qc_status': qc_status, 'ids': ids})

def update_prc_status_by_ids(ids: list[str], prc_status: int) -> Result:
    """
    根据内部_id,批量更新1级数据的处理状态
    
    Args:
        ids (List[str]): 内部_id列表
        prc_status (int): 处理状态
    
    Returns:
        Result: 更新结果
    """
    return request.put("/api/level1/prc_status/batch/update", {'prc_status': prc_status, 'ids': ids})

def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id: str) -> Result:
    """
    更新1级数据的QC状态
Wei Shoulin's avatar
Wei Shoulin committed
164
165
166
    
    Args:
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
167
        data_model (str): 数据类型
168
        qc_status (int): QC状态
Wei Shoulin's avatar
Wei Shoulin committed
169
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
170
171
172
173
    
    Returns:
        Result: 更新结果
    """
174
    return request.put(f"/api/level1/qc_status/{level1_id}", {'data_model': data_model, 'qc_status': qc_status, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
175

176
def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
177
    """
Wei Shoulin's avatar
Wei Shoulin committed
178
    更新1级数据的处理状态
Wei Shoulin's avatar
Wei Shoulin committed
179
180
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
181
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
182
        data_model (str): 数据类型
Wei Shoulin's avatar
Wei Shoulin committed
183
184
        prc_status (int): 处理状态
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
185
186
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
187
        Result: 操作结果
Wei Shoulin's avatar
Wei Shoulin committed
188
    """
Wei Shoulin's avatar
Wei Shoulin committed
189
    return request.put(f"/api/level1/prc_status/{level1_id}", {'data_model': data_model, 'prc_status': prc_status, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
190

Wei Shoulin's avatar
Wei Shoulin committed
191
def write(local_file: Union[IO, str], 
Wei Shoulin's avatar
Wei Shoulin committed
192
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
193
        obs_group: Optional[str],
Wei Shoulin's avatar
Wei Shoulin committed
194
        obs_type: str,
Wei Shoulin's avatar
Wei Shoulin committed
195
        level1_id: str,
Wei Shoulin's avatar
Wei Shoulin committed
196
        data_model: str,
Wei Shoulin's avatar
Wei Shoulin committed
197
        file_name: str,
198
        dag: str,
Wei Shoulin's avatar
Wei Shoulin committed
199
200
        pmapname: str,
        build: int,
201
202
        dataset: str,
        batch_id: str,        
Wei Shoulin's avatar
Wei Shoulin committed
203
        level0_id: Optional[str] = None,
204
        qc_status: int = 0,
Wei Shoulin's avatar
Wei Shoulin committed
205
206
        **extra_kwargs) -> Result:
    '''
Wei Shoulin's avatar
Wei Shoulin committed
207
    将本地的1级数据文件写入到DFS中其他参数如rss_id, cube_id等,可通过extra_kwargs传入
Wei Shoulin's avatar
Wei Shoulin committed
208
209
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
210
        local_file (Union[IO, str]): 文件路径或文件对象
Wei Shoulin's avatar
Wei Shoulin committed
211
        instrument ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID
212
        obs_group (str): 项目ID
Wei Shoulin's avatar
Wei Shoulin committed
213
        obs_type (str): 观测类型
Wei Shoulin's avatar
| bug    
Wei Shoulin committed
214
        level0_id (Optional[str]): 0级数据的ID默认为 None
Wei Shoulin's avatar
Wei Shoulin committed
215
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
216
        data_model (str): 数据类型
Wei Shoulin's avatar
Wei Shoulin committed
217
        file_name (str): 1级数据文件名
218
        dag (str): DAG标识
Wei Shoulin's avatar
Wei Shoulin committed
219
220
221
222
        pmapname (str): CCDS pmap名称
        build (int): 构建号
        dataset (str): 数据集名称
        batch_id (str): 批次ID
223
        qc_status (int): QC状态
Wei Shoulin's avatar
Wei Shoulin committed
224
        **kwargs: 额外的关键字参数,这些参数将传递给DFS
Wei Shoulin's avatar
Wei Shoulin committed
225
226
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
227
        Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象
Wei Shoulin's avatar
Wei Shoulin committed
228
    '''
Wei Shoulin's avatar
Wei Shoulin committed
229
    params = {
Wei Shoulin's avatar
Wei Shoulin committed
230
        'instrument': instrument, 
231
        'obs_group': obs_group,
Wei Shoulin's avatar
Wei Shoulin committed
232
        'obs_type': obs_type,
Wei Shoulin's avatar
Wei Shoulin committed
233
234
        'level0_id': level0_id, 
        'level1_id': level1_id, 
Wei Shoulin's avatar
Wei Shoulin committed
235
        'data_model': data_model, 
Wei Shoulin's avatar
Wei Shoulin committed
236
        'file_name': file_name, 
237
        'dag': dag, 
Wei Shoulin's avatar
Wei Shoulin committed
238
239
240
        'pmapname': pmapname, 
        'build': build,
        'dataset': dataset,
Wei Shoulin's avatar
Wei Shoulin committed
241
        'batch_id': batch_id,
242
        'qc_status': qc_status
Wei Shoulin's avatar
Wei Shoulin committed
243
244
245
    }
    if not dataset or not batch_id:
        raise ValueError("dataset and batch_id is required")
Wei Shoulin's avatar
Wei Shoulin committed
246
    params.update(extra_kwargs)
Wei Shoulin's avatar
Wei Shoulin committed
247
248
    if local_file is None:
        raise ValueError("local_file is required")
Wei Shoulin's avatar
Wei Shoulin committed
249
250
251
252
253
    if isinstance(local_file, str):
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)        
        return request.post_file("/api/level1/file", local_file, params)
    return request.post_bytesio("/api/level1/file", local_file, params)
Wei Shoulin's avatar
Wei Shoulin committed
254

255
256
def find_process(dag: Optional[str] = None,  
                dag_run: Optional[str] = None, 
Wei Shoulin's avatar
Wei Shoulin committed
257
258
259
260
261
262
263
264
                batch_id: Optional[str] = None, 
                level1_id: Optional[str] = None,
                dataset: Optional[str] = None,
                prc_module: Optional[str] = None,
                prc_status: Optional[int] = None,
                prc_time: Optional[DateTimeTuple] = None,
                page: int = 1,
                limit: int = 0) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
265
    """
Wei Shoulin's avatar
Wei Shoulin committed
266
    查询0级数据处理过程
Wei Shoulin's avatar
Wei Shoulin committed
267
268
    
    Args:
269
270
        dag (str): DAG标识
        dag_run (str): DAG运行标识
Wei Shoulin's avatar
Wei Shoulin committed
271
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
272
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
273
274
275
276
277
278
        dataset (str): 数据集
        prc_module (str): 处理模块
        prc_status (int): 处理状态
        prc_time (DateTimeTuple): 处理时间范围
        page (int): 页码,默认为1
        limit (int): 每页数量 0: 不限制
Wei Shoulin's avatar
Wei Shoulin committed
279
280
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
281
        Result: 成功后,Result.data为数据列表,失败message为失败原因
Wei Shoulin's avatar
Wei Shoulin committed
282
283
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
284
    params = {
285
286
        'dag': dag,
        'dag_run': dag_run,
Wei Shoulin's avatar
Wei Shoulin committed
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
        'batch_id': batch_id,
        'level1_id': level1_id,
        'dataset': dataset,
        'prc_module': prc_module,
        'prc_status': prc_status,
        'prc_time_start': None,
        'prc_time_end': None,
        'page': page,
        'limit': limit
    }
    if prc_time is not None:
        params['prc_time_start'], params['prc_time_end'] = prc_time
        if params['prc_time_start'] and utils.is_valid_datetime_format(params['prc_time_start']):
            pass
        if params['prc_time_end'] and utils.is_valid_datetime_format(params['prc_time_end']):
            pass 
    return request.post("/api/level1/process", params)
Wei Shoulin's avatar
Wei Shoulin committed
304
305

def add_process(level1_id: str, 
306
307
                dag: str, 
                dag_run: str, 
Wei Shoulin's avatar
Wei Shoulin committed
308
309
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
Wei Shoulin's avatar
Wei Shoulin committed
310
                prc_time: str = utils.get_current_time(),
Wei Shoulin's avatar
Wei Shoulin committed
311
312
313
314
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
315
    添加1级数据处理记录
Wei Shoulin's avatar
Wei Shoulin committed
316
317
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
318
        level1_id (str): 1级数据的ID
319
320
        dag (str): DAG标识
        dag_run (str): DAG运行标识
Wei Shoulin's avatar
Wei Shoulin committed
321
322
323
324
325
326
        dataset (str): 数据集
        batch_id (str): 批次ID
        prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
        prc_status (int): 处理状态
        prc_module (str): 处理模块
        message (str): 处理消息
Wei Shoulin's avatar
Wei Shoulin committed
327
328
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
329
        Result: 成功后,Result.data为写入记录,失败message为失败原因
Wei Shoulin's avatar
Wei Shoulin committed
330
331
332
333
    
    """
    params = {
        'level1_id': level1_id,
334
335
        'dag': dag,
        'dag_run': dag_run,
Wei Shoulin's avatar
Wei Shoulin committed
336
337
        'dataset': dataset,
        'batch_id': batch_id,
Wei Shoulin's avatar
Wei Shoulin committed
338
339
340
341
342
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
Wei Shoulin's avatar
Wei Shoulin committed
343
    utils.is_valid_datetime_format(prc_time)
Wei Shoulin's avatar
Wei Shoulin committed
344
    return request.post("/api/level1/prc", params)