level1.py 11.9 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
from typing import Optional, Tuple, Literal, IO, Union
Wei Shoulin's avatar
Wei Shoulin committed
2
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
3
4
5
6
7
8
9
import os

DateTimeTuple = Tuple[str, str]

def find(project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector_no: Optional[str] = None,
        data_type: Optional[str] = None,
        obs_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc1_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        ra_cen: Optional[int] = None,
        dec_cen: Optional[int] = None,
        radius: Optional[float] = None,
        object_name: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-1 data file records matching the given criteria.

    All arguments are forwarded as the body of a POST to ``/api/level1``.
    The two time ranges are split into ``*_start`` / ``*_end`` fields
    before being sent.

    Args:
        project_id (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        level0_id (Optional[str], optional): Level-0 data ID. Defaults to None.
        instrument (str, optional): Module ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'. Defaults to 'MSC'.
        detector_no (Optional[str], optional): Detector number. Defaults to None.
        data_type (Optional[str], optional): Data type, e.g. 'csst-msc-l1-mbi'.
            Defaults to None.
        obs_type (Optional[str], optional): Observation type, e.g. '01'.
            Defaults to None.
        filter (Optional[str], optional): Filter. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): Observation time range
            as a (start, end) pair. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range
            as a (start, end) pair. Defaults to None.
        qc1_status (Optional[int], optional): QC1 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        ra_cen (Optional[int], optional): Central right ascension. Defaults to None.
        dec_cen (Optional[int], optional): Central declination. Defaults to None.
        radius (Optional[float], optional): Search radius. Defaults to None.
        object_name (Optional[str], optional): Object name. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS). Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
        dataset (str, optional): Dataset name.
            Defaults to constants.DEFAULT_DATASET.
        batch_id (str, optional): Batch ID.
            Defaults to constants.DEFAULT_BATCH_ID.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: The search result object returned by ``request.post``.
    """
    params = {
        'project_id': project_id,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector_no': detector_no,
        'data_type': data_type,
        'obs_type': obs_type,
        'filter': filter,
        'qc1_status': qc1_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_cen': ra_cen,
        'dec_cen': dec_cen,
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'page': page,
        'limit': limit,
    }

    # Validate each bound with its own call. Chaining the two calls with
    # `or` short-circuits, so the end bound could be skipped entirely.
    # NOTE(review): assumes utils.is_valid_datetime_format raises on a bad
    # value, as its bare-statement use in add_process suggests - confirm.
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
        utils.is_valid_datetime_format(params['obs_time_start'])
        utils.is_valid_datetime_format(params['obs_time_end'])
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
        utils.is_valid_datetime_format(params['create_time_start'])
        utils.is_valid_datetime_format(params['create_time_end'])

    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
    Look up level-1 data by its level-1 ID.

    Args:
        level1_id (str): ID of the level-1 data.

    Returns:
        Result: The query result returned by ``request.get``.
    """
    endpoint = f"/api/level1/{level1_id}"
    return request.get(endpoint)

def find_by_brick_id(brick_id: int) -> Result:
    """
    Look up level-1 data by brick ID.

    Args:
        brick_id (int): Sky-area (brick) ID.

    Returns:
        Result: The query result returned by ``request.get``.
    """
    endpoint = f"/api/level1/brick/{brick_id}"
    return request.get(endpoint)

Wei Shoulin's avatar
Wei Shoulin committed
125
126
def sls_find_by_qc1_status(qc1_status: int, limit: int = 1, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Query the SLS level-1 endpoint for records with the given QC1 status.

    Args:
        qc1_status (int): QC1 status, placed in the request path.
        limit (int): Sent as ``limit`` in the request body. Defaults to 1.
        batch_id (str): Batch ID, sent in the request body.

    Returns:
        Result: The result returned by ``request.post``.
    """
    body = {'limit': limit, 'batch_id': batch_id}
    endpoint = f"/api/level1/sls/qc1_status/{qc1_status}"
    return request.post(endpoint, body)
Wei Shoulin's avatar
Wei Shoulin committed
127

Wei Shoulin's avatar
Wei Shoulin committed
128
def update_qc1_status(level1_id: str, data_type: str, qc1_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC1 status of a level-1 data record.

    Args:
        level1_id (str): ID of the level-1 data, placed in the request path.
        data_type (str): File type.
        qc1_status (int): New QC1 status.
        batch_id (str): Batch ID.

    Returns:
        Result: The update result returned by ``request.put``.
    """
    body = {
        'data_type': data_type,
        'qc1_status': qc1_status,
        'batch_id': batch_id,
    }
    endpoint = f"/api/level1/qc1_status/{level1_id}"
    return request.put(endpoint, body)
Wei Shoulin's avatar
Wei Shoulin committed
142

Wei Shoulin's avatar
Wei Shoulin committed
143
def update_prc_status(level1_id: str, data_type: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-1 data record.

    Args:
        level1_id (str): ID of the level-1 data, placed in the request path.
        data_type (str): File type.
        prc_status (int): New processing status.
        batch_id (str): Batch ID.

    Returns:
        Result: The operation result returned by ``request.put``.
    """
    body = {
        'data_type': data_type,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    endpoint = f"/api/level1/prc_status/{level1_id}"
    return request.put(endpoint, body)
Wei Shoulin's avatar
Wei Shoulin committed
157

Wei Shoulin's avatar
Wei Shoulin committed
158
def write(local_file: Union[IO, str],
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        level1_id: str,
        data_type: str,
        file_name: str,
        dag_id: str,
        pmapname: str,
        build: int,
        level0_id: Optional[str] = None,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        qc1_status: int = 0,
        **extra_kwargs) -> Result:
    """
    Write a local level-1 data file into DFS.

    Additional fields such as rss_id or cube_id can be supplied through
    ``extra_kwargs``; they are merged into the request parameters and
    override any same-named field built from the explicit arguments.

    Args:
        local_file (Union[IO, str]): File path or file object.
        instrument: Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level1_id (str): ID of the level-1 data.
        data_type (str): Data type.
        file_name (str): Level-1 data file name.
        dag_id (str): Pipeline ID.
        pmapname (str): CCDS pmap name.
        build (int): Build number.
        level0_id (Optional[str]): ID of the level-0 data. Defaults to None.
        dataset (str): Dataset name.
        batch_id (str): Batch ID.
        qc1_status (int): QC1 status. Defaults to 0.
        **extra_kwargs: Extra keyword arguments passed on to DFS.

    Returns:
        Result: Result object of the operation; per the original notes, on
        success its data is the level-1 data object.

    Raises:
        ValueError: If ``dataset`` or ``batch_id`` is empty, or if
            ``local_file`` is None.
        FileNotFoundError: If ``local_file`` is a path that does not exist.
    """
    # Reject bad arguments before assembling the request.
    if not (dataset and batch_id):
        raise ValueError("dataset and batch_id is required")
    if local_file is None:
        raise ValueError("local_file is required")

    payload = {
        'instrument': instrument,
        'level0_id': level0_id,
        'level1_id': level1_id,
        'data_type': data_type,
        'file_name': file_name,
        'dag_id': dag_id,
        'pmapname': pmapname,
        'build': build,
        'dataset': dataset,
        'batch_id': batch_id,
        'qc1_status': qc1_status,
        **extra_kwargs,
    }

    endpoint = "/api/level1/file"
    # Anything that is not a path string is handed over as a file object.
    if not isinstance(local_file, str):
        return request.post_bytesio(endpoint, local_file, payload)
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file(endpoint, local_file, payload)
Wei Shoulin's avatar
Wei Shoulin committed
215

Wei Shoulin's avatar
Wei Shoulin committed
216
def generate_prc_msg(instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
                    level1_id: str,
                    dataset: str = constants.DEFAULT_DATASET,
                    batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Generate a pipeline processing message for a level-1 data record.

    Args:
        instrument (str): Module ID, placed in the request path.
        level1_id (str): ID of the level-1 data.
        dataset (str): Dataset.
        batch_id (str): Batch ID.

    Returns:
        Result: Outcome of the message generation, as returned by
        ``request.put``.
    """
    body = dict(dataset=dataset, batch_id=batch_id, level1_id=level1_id)
    endpoint = f"/api/level1/prc/{instrument}"
    return request.put(endpoint, body)
Wei Shoulin's avatar
Wei Shoulin committed
241

Wei Shoulin's avatar
Wei Shoulin committed
242
243
244
245
246
247
248
249
250
251
def find_process(dag_id: Optional[str] = None,
                dag_run_id: Optional[str] = None,
                batch_id: Optional[str] = None,
                level1_id: Optional[str] = None,
                dataset: Optional[str] = None,
                prc_module: Optional[str] = None,
                prc_status: Optional[int] = None,
                prc_time: Optional[DateTimeTuple] = None,
                page: int = 1,
                limit: int = 0) -> Result:
    """
    Query level-1 data processing records.

    Args:
        dag_id (str): Pipeline ID.
        dag_run_id (str): Run ID.
        batch_id (str): Batch ID.
        level1_id (str): ID of the level-1 data.
        dataset (str): Dataset.
        prc_module (str): Processing module.
        prc_status (int): Processing status.
        prc_time (DateTimeTuple): Processing time range as a (start, end) pair.
        page (int): Page number. Defaults to 1.
        limit (int): Page size; 0 means no limit.

    Returns:
        Result: On success ``Result.data`` is the list of records; on
        failure ``message`` holds the reason.
    """
    start, end = (None, None) if prc_time is None else prc_time

    params = {
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
        'batch_id': batch_id,
        'level1_id': level1_id,
        'dataset': dataset,
        'prc_module': prc_module,
        'prc_status': prc_status,
        'prc_time_start': start,
        'prc_time_end': end,
        'page': page,
        'limit': limit
    }

    # Only non-empty bounds are handed to the validator; its return value
    # is not used here.
    for bound in (start, end):
        if bound:
            utils.is_valid_datetime_format(bound)

    return request.post("/api/level1/process", params)
Wei Shoulin's avatar
Wei Shoulin committed
291
292

def add_process(level1_id: str,
                dag_id: str,
                dag_run_id: str,
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
                prc_time: Optional[str] = None,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
    Add a processing record for a level-1 data item.

    Args:
        level1_id (str): ID of the level-1 data.
        dag_id (str): Pipeline ID.
        dag_run_id (str): Run ID.
        dataset (str): Dataset.
        batch_id (str): Batch ID.
        prc_time (Optional[str]): Processing time, formatted as
            "YYYY-MM-DD HH:MM:SS". When omitted (None), the current time
            from ``utils.get_current_time()`` is taken at call time.
        prc_status (int): Processing status. Defaults to -1024.
        prc_module (str): Processing module.
        message (str): Processing message.

    Returns:
        Result: On success ``Result.data`` is the written record; on
        failure ``message`` holds the reason.
    """
    # Resolve the default per call. A default written in the signature is
    # evaluated once at import time, so every record would otherwise carry
    # the timestamp of module load.
    if prc_time is None:
        prc_time = utils.get_current_time()
    utils.is_valid_datetime_format(prc_time)

    params = {
        'level1_id': level1_id,
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    return request.post("/api/level1/prc", params)