level1.py 12.1 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
from typing import Optional, Tuple, Literal, IO, Union
Wei Shoulin's avatar
Wei Shoulin committed
2
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
3
4
5
6
7
8
9
import os

DateTimeTuple = Tuple[str, str]

def find(project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
10
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
Wei Shoulin's avatar
Wei Shoulin committed
11
        detector_no: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
12
        data_model: Optional[str] = None,
13
        obs_type: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
14
15
16
17
18
19
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc1_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
20
21
        ra_cen: Optional[int] = None,
        dec_cen: Optional[int] = None,
Wei Shoulin's avatar
Wei Shoulin committed
22
23
24
25
        radius: Optional[float] = None,
        object_name: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
26
27
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
Wei Shoulin's avatar
Wei Shoulin committed
28
29
30
        page: int = 1,
        limit: int = 0) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
31
    根据给定的参数搜索1级数据文件记录
Wei Shoulin's avatar
Wei Shoulin committed
32
33
34
35
    
    Args:
        project_id (Optional[str], optional): 项目ID. Defaults to None.
        obs_id (Optional[str], optional): 观测ID. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
36
        instrument (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
37
        detector_no (Optional[str], optional): 探测器编号. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
38
        data_model (Optional[str], optional): 数据类型,如'csst-msc-l1-mbi'. Defaults to None.
39
        obs_type (Optional[str], optional): 观测类型,如'01'. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
40
41
42
43
44
45
        filter (Optional[str], optional): 滤光片. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None.
        qc1_status (Optional[int], optional): QC1状态. Defaults to None.
        prc_status (Optional[int], optional): 处理状态. Defaults to None.
        file_name (Optional[str], optional): 文件名. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
46
47
        ra_cen (Optional[int], optional): 中心赤经. Defaults to None.
        dec_cen (Optional[int], optional): 中心赤纬. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
48
49
50
51
        radius (Optional[float], optional): 搜索半径. Defaults to None.
        object_name (Optional[str], optional): 天体名称. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS) Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
52
53
        dataset (Optional[str], optional): 数据集名称. Defaults to None.
        batch_id (Optional[str], optional): 批次ID. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
54
55
56
57
58
59
60
61
62
63
64
65
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
        'project_id': project_id,
        'level0_id': level0_id,
        'obs_id': obs_id,
Wei Shoulin's avatar
Wei Shoulin committed
66
        'instrument': instrument,
Wei Shoulin's avatar
Wei Shoulin committed
67
        'detector_no': detector_no,
Wei Shoulin's avatar
Wei Shoulin committed
68
        'data_model': data_model,
69
        'obs_type': obs_type,
Wei Shoulin's avatar
Wei Shoulin committed
70
71
72
73
        'filter': filter,
        'qc1_status': qc1_status,
        'prc_status': prc_status,
        'file_name': file_name,
Wei Shoulin's avatar
Wei Shoulin committed
74
75
        'ra_cen': ra_cen,
        'dec_cen': dec_cen,
Wei Shoulin's avatar
Wei Shoulin committed
76
77
78
79
80
81
82
83
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
Wei Shoulin's avatar
Wei Shoulin committed
84
85
        'dataset': dataset,
        'batch_id': batch_id,
Wei Shoulin's avatar
Wei Shoulin committed
86
87
88
89
90
91
        'page': page,
        'limit': limit,
    }
    
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
Wei Shoulin's avatar
Wei Shoulin committed
92
        utils.is_valid_datetime_format(params['obs_time_start']) or not utils.is_valid_datetime_format(params['obs_time_end'])
Wei Shoulin's avatar
Wei Shoulin committed
93
94
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
Wei Shoulin's avatar
Wei Shoulin committed
95
        utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end'])
Wei Shoulin's avatar
Wei Shoulin committed
96
97
98
99
100
    
    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
101
    通过 level1 的 ID 查询1级数据
Wei Shoulin's avatar
Wei Shoulin committed
102
103
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
104
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
105
106
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
107
        Result: 查询结果
Wei Shoulin's avatar
Wei Shoulin committed
108
109
110
111
112
    
    """
    return request.get(f"/api/level1/{level1_id}")

def find_by_brick_id(brick_id: int) -> Result:
Wei Shoulin's avatar
plan    
Wei Shoulin committed
113
    """
Wei Shoulin's avatar
Wei Shoulin committed
114
    通过 brick 的 ID 查询1级数据
Wei Shoulin's avatar
plan    
Wei Shoulin committed
115
116
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
117
        brick_id (int): 天区ID
Wei Shoulin's avatar
plan    
Wei Shoulin committed
118
119
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
120
        Result: 查询结果
Wei Shoulin's avatar
plan    
Wei Shoulin committed
121
122
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
123
124
    return request.get(f"/api/level1/brick/{brick_id}")

Wei Shoulin's avatar
Wei Shoulin committed
125
126
def sls_find_by_qc1_status(qc1_status: int, limit: int = 1, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    return request.post(f"/api/level1/sls/qc1_status/{qc1_status}", {'limit': limit, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
127

Wei Shoulin's avatar
Wei Shoulin committed
128
def update_qc1_status(level1_id: str, data_model: str, qc1_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
129
130
131
132
133
    """
    更新1级数据的QC0状态
    
    Args:
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
134
        data_model (str): 数据类型
Wei Shoulin's avatar
Wei Shoulin committed
135
        qc1_status (int): QC0状态
Wei Shoulin's avatar
Wei Shoulin committed
136
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
137
138
139
140
    
    Returns:
        Result: 更新结果
    """
Wei Shoulin's avatar
Wei Shoulin committed
141
    return request.put(f"/api/level1/qc1_status/{level1_id}", {'data_model': data_model, 'qc1_status': qc1_status, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
142

Wei Shoulin's avatar
Wei Shoulin committed
143
def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
144
    """
Wei Shoulin's avatar
Wei Shoulin committed
145
    更新1级数据的处理状态
Wei Shoulin's avatar
Wei Shoulin committed
146
147
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
148
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
149
        data_model (str): 数据类型
Wei Shoulin's avatar
Wei Shoulin committed
150
151
        prc_status (int): 处理状态
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
152
153
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
154
        Result: 操作结果
Wei Shoulin's avatar
Wei Shoulin committed
155
    """
Wei Shoulin's avatar
Wei Shoulin committed
156
    return request.put(f"/api/level1/prc_status/{level1_id}", {'data_model': data_model, 'prc_status': prc_status, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
157

Wei Shoulin's avatar
Wei Shoulin committed
158
def write(local_file: Union[IO, str], 
Wei Shoulin's avatar
Wei Shoulin committed
159
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
Wei Shoulin's avatar
Wei Shoulin committed
160
161
        project_id: Optional[str],
        obs_type: str,
Wei Shoulin's avatar
Wei Shoulin committed
162
        level1_id: str,
Wei Shoulin's avatar
Wei Shoulin committed
163
        data_model: str,
Wei Shoulin's avatar
Wei Shoulin committed
164
        file_name: str,
165
        dag_id: str,
Wei Shoulin's avatar
Wei Shoulin committed
166
167
        pmapname: str,
        build: int,
Wei Shoulin's avatar
Wei Shoulin committed
168
        level0_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
169
170
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
Wei Shoulin's avatar
Wei Shoulin committed
171
        qc1_status: int = 0,
Wei Shoulin's avatar
Wei Shoulin committed
172
173
        **extra_kwargs) -> Result:
    '''
Wei Shoulin's avatar
Wei Shoulin committed
174
    将本地的1级数据文件写入到DFS中其他参数如rss_id, cube_id等,可通过extra_kwargs传入
Wei Shoulin's avatar
Wei Shoulin committed
175
176
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
177
        local_file (Union[IO, str]): 文件路径或文件对象
Wei Shoulin's avatar
Wei Shoulin committed
178
        instrument ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID
Wei Shoulin's avatar
Wei Shoulin committed
179
180
        project_id (str): 项目ID
        obs_type (str): 观测类型
Wei Shoulin's avatar
| bug    
Wei Shoulin committed
181
        level0_id (Optional[str]): 0级数据的ID默认为 None
Wei Shoulin's avatar
Wei Shoulin committed
182
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
183
        data_model (str): 数据类型
Wei Shoulin's avatar
Wei Shoulin committed
184
        file_name (str): 1级数据文件名
185
        dag_id (str): 管线ID
Wei Shoulin's avatar
Wei Shoulin committed
186
187
188
189
        pmapname (str): CCDS pmap名称
        build (int): 构建号
        dataset (str): 数据集名称
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
190
        qc1_status (int): QC1状态
Wei Shoulin's avatar
Wei Shoulin committed
191
        **kwargs: 额外的关键字参数,这些参数将传递给DFS
Wei Shoulin's avatar
Wei Shoulin committed
192
193
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
194
        Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象
Wei Shoulin's avatar
Wei Shoulin committed
195
    '''
Wei Shoulin's avatar
Wei Shoulin committed
196
    params = {
Wei Shoulin's avatar
Wei Shoulin committed
197
        'instrument': instrument, 
Wei Shoulin's avatar
Wei Shoulin committed
198
199
        'project_id': project_id,
        'obs_type': obs_type,
Wei Shoulin's avatar
Wei Shoulin committed
200
201
        'level0_id': level0_id, 
        'level1_id': level1_id, 
Wei Shoulin's avatar
Wei Shoulin committed
202
        'data_model': data_model, 
Wei Shoulin's avatar
Wei Shoulin committed
203
        'file_name': file_name, 
204
        'dag_id': dag_id, 
Wei Shoulin's avatar
Wei Shoulin committed
205
206
207
        'pmapname': pmapname, 
        'build': build,
        'dataset': dataset,
Wei Shoulin's avatar
Wei Shoulin committed
208
209
        'batch_id': batch_id,
        'qc1_status': qc1_status
Wei Shoulin's avatar
Wei Shoulin committed
210
211
212
    }
    if not dataset or not batch_id:
        raise ValueError("dataset and batch_id is required")
Wei Shoulin's avatar
Wei Shoulin committed
213
    params.update(extra_kwargs)
Wei Shoulin's avatar
Wei Shoulin committed
214
215
    if local_file is None:
        raise ValueError("local_file is required")
Wei Shoulin's avatar
Wei Shoulin committed
216
217
218
219
220
    if isinstance(local_file, str):
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)        
        return request.post_file("/api/level1/file", local_file, params)
    return request.post_bytesio("/api/level1/file", local_file, params)
Wei Shoulin's avatar
Wei Shoulin committed
221

Wei Shoulin's avatar
Wei Shoulin committed
222
def generate_prc_msg(instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], 
Wei Shoulin's avatar
Wei Shoulin committed
223
                    level1_id: str, 
Wei Shoulin's avatar
Wei Shoulin committed
224
225
                    dataset: str = constants.DEFAULT_DATASET,
                    batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
226
    """
Wei Shoulin's avatar
Wei Shoulin committed
227
    生成流水线的处理消息
Wei Shoulin's avatar
Wei Shoulin committed
228
229
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
230
        instrument (str): 模块ID
Wei Shoulin's avatar
Wei Shoulin committed
231
        level1_id (str): 1级数据的ID
232
        dag_id (str): 流水管线ID,默认为空字符串
Wei Shoulin's avatar
Wei Shoulin committed
233
234
235
        dataset (str): 数据集
        batch_id (str): 批次ID

Wei Shoulin's avatar
Wei Shoulin committed
236
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
237
        Result: 处理消息生成的结果,是否成功以及相关的错误信息
Wei Shoulin's avatar
Wei Shoulin committed
238
239
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
240
241
242
243
244
245
    params = {
        'dataset': dataset,
        'batch_id': batch_id,
        'level1_id': level1_id,
    }
    
Wei Shoulin's avatar
Wei Shoulin committed
246
    return request.put(f"/api/level1/prc/{instrument}", params)
Wei Shoulin's avatar
Wei Shoulin committed
247

Wei Shoulin's avatar
Wei Shoulin committed
248
249
250
251
252
253
254
255
256
257
def find_process(dag_id: Optional[str] = None,  
                dag_run_id: Optional[str] = None, 
                batch_id: Optional[str] = None, 
                level1_id: Optional[str] = None,
                dataset: Optional[str] = None,
                prc_module: Optional[str] = None,
                prc_status: Optional[int] = None,
                prc_time: Optional[DateTimeTuple] = None,
                page: int = 1,
                limit: int = 0) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
258
    """
Wei Shoulin's avatar
Wei Shoulin committed
259
    查询0级数据处理过程
Wei Shoulin's avatar
Wei Shoulin committed
260
261
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
262
263
264
        dag_id (str): 管线ID
        dag_run_id (str): 运行ID
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
265
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
266
267
268
269
270
271
        dataset (str): 数据集
        prc_module (str): 处理模块
        prc_status (int): 处理状态
        prc_time (DateTimeTuple): 处理时间范围
        page (int): 页码,默认为1
        limit (int): 每页数量 0: 不限制
Wei Shoulin's avatar
Wei Shoulin committed
272
273
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
274
        Result: 成功后,Result.data为数据列表,失败message为失败原因
Wei Shoulin's avatar
Wei Shoulin committed
275
276
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    params = {
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
        'batch_id': batch_id,
        'level1_id': level1_id,
        'dataset': dataset,
        'prc_module': prc_module,
        'prc_status': prc_status,
        'prc_time_start': None,
        'prc_time_end': None,
        'page': page,
        'limit': limit
    }
    if prc_time is not None:
        params['prc_time_start'], params['prc_time_end'] = prc_time
        if params['prc_time_start'] and utils.is_valid_datetime_format(params['prc_time_start']):
            pass
        if params['prc_time_end'] and utils.is_valid_datetime_format(params['prc_time_end']):
            pass 
    return request.post("/api/level1/process", params)
Wei Shoulin's avatar
Wei Shoulin committed
297
298

def add_process(level1_id: str, 
299
300
                dag_id: str, 
                dag_run_id: str, 
Wei Shoulin's avatar
Wei Shoulin committed
301
302
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
Wei Shoulin's avatar
Wei Shoulin committed
303
                prc_time: str = utils.get_current_time(),
Wei Shoulin's avatar
Wei Shoulin committed
304
305
306
307
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
308
    添加1级数据处理记录
Wei Shoulin's avatar
Wei Shoulin committed
309
310
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
311
        level1_id (str): 1级数据的ID
312
313
        dag_id (str): 管线ID
        dag_run_id (str): 运行ID
Wei Shoulin's avatar
Wei Shoulin committed
314
315
316
317
318
319
        dataset (str): 数据集
        batch_id (str): 批次ID
        prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
        prc_status (int): 处理状态
        prc_module (str): 处理模块
        message (str): 处理消息
Wei Shoulin's avatar
Wei Shoulin committed
320
321
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
322
        Result: 成功后,Result.data为写入记录,失败message为失败原因
Wei Shoulin's avatar
Wei Shoulin committed
323
324
325
326
    
    """
    params = {
        'level1_id': level1_id,
327
328
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
Wei Shoulin's avatar
Wei Shoulin committed
329
330
        'dataset': dataset,
        'batch_id': batch_id,
Wei Shoulin's avatar
Wei Shoulin committed
331
332
333
334
335
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
Wei Shoulin's avatar
Wei Shoulin committed
336
    utils.is_valid_datetime_format(prc_time)
Wei Shoulin's avatar
Wei Shoulin committed
337
    return request.post("/api/level1/prc", params)