level1.py 11.8 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
from typing import Optional, Tuple, Literal, IO, Union
Wei Shoulin's avatar
Wei Shoulin committed
2
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
3
4
5
6
import os

# A (start, end) pair of datetime strings; used as the type of the
# time-range filter parameters (e.g. obs_time, create_time, prc_time).
DateTimeTuple = Tuple[str, str]

7
8
def find(
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        dataset: str,
        batch_id: str,
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
        detector: Optional[str] = None,
        data_model: Optional[str] = None,
        obs_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        ra_cen: Optional[int] = None,
        dec_cen: Optional[int] = None,
        radius: Optional[float] = None,
        object_name: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-1 data file records matching the given criteria.

    All criteria are posted to ``/api/level1``; criteria left as None are
    sent as None.

    Args:
        instrument (str): Instrument (module) ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'.
        dataset (str): Dataset name.
        batch_id (str): Batch ID.
        obs_group (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        level0_id (Optional[str], optional): Level-0 data ID. Defaults to None.
        detector (Optional[str], optional): Detector. Defaults to None.
        data_model (Optional[str], optional): Data type, e.g. 'csst-msc-l1-mbi'.
            Defaults to None.
        obs_type (Optional[str], optional): Observation type, e.g. '01'.
            Defaults to None.
        filter (Optional[str], optional): Filter. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): Observation time range
            as (start, end). Defaults to None.
        create_time (Optional[DateTimeTuple], optional): Creation time range
            as (start, end). Defaults to None.
        qc_status (Optional[int], optional): QC1 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        ra_cen (Optional[int], optional): Central right ascension. Defaults to None.
        dec_cen (Optional[int], optional): Central declination. Defaults to None.
        radius (Optional[float], optional): Search radius. Defaults to None.
        object_name (Optional[str], optional): Object name. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS). Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: The search result object.

    Raises:
        ValueError: If a supplied time bound is not accepted by
            ``utils.is_valid_datetime_format``.
    """
    params = {
        'obs_group': obs_group,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector': detector,
        'data_model': data_model,
        'obs_type': obs_type,
        'filter': filter,
        'qc_status': qc_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_cen': ra_cen,
        'dec_cen': dec_cen,
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
        'dataset': dataset,
        'batch_id': batch_id,
        'page': page,
        'limit': limit,
    }

    for prefix, time_range in (('obs_time', obs_time), ('create_time', create_time)):
        if time_range is None:
            continue
        params[f'{prefix}_start'], params[f'{prefix}_end'] = time_range
        # Validate each bound on its own statement: chaining the checks with
        # `or` would skip the end bound whenever the start bound is valid.
        # A None/empty bound is forwarded as-is without validation.
        for key in (f'{prefix}_start', f'{prefix}_end'):
            value = params[key]
            if value and not utils.is_valid_datetime_format(value):
                raise ValueError(f"{key} has an invalid datetime format: {value!r}")

    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
    Query a level-1 data record by its level-1 ID.

    Args:
        level1_id (str): ID of the level-1 data.

    Returns:
        Result: The query result.
    """
    endpoint = f"/api/level1/{level1_id}"
    return request.get(endpoint)

def find_by_brick_id(brick_id: int) -> Result:
    """
    Query level-1 data by brick ID.

    Args:
        brick_id (int): Sky-area (brick) ID.

    Returns:
        Result: The query result.
    """
    endpoint = f"/api/level1/brick/{brick_id}"
    return request.get(endpoint)

126
def sls_find_by_qc_status(qc_status: int, batch_id: str, limit: int = 1) -> Result:
    """
    Query the SLS level-1 endpoint for records with the given QC status.

    Args:
        qc_status (int): QC status; placed in the request path.
        batch_id (str): Batch ID; sent in the request body.
        limit (int, optional): Limit sent in the request body. Defaults to 1.

    Returns:
        Result: The result returned by the POST request.
    """
    endpoint = f"/api/level1/sls/qc_status/{qc_status}"
    payload = {'limit': limit, 'batch_id': batch_id}
    return request.post(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
128

129
def update_qc_status_by_ids(ids: list[str], qc_status: int) -> Result:
    """
    Batch-update the QC status of level-1 data, selected by internal _id.

    Args:
        ids (List[str]): List of internal _id values.
        qc_status (int): QC status to set.

    Returns:
        Result: The update result.
    """
    payload = {'qc_status': qc_status, 'ids': ids}
    return request.put("/api/level1/qc_status/batch/update", payload)

def update_prc_status_by_ids(ids: list[str], prc_status: int) -> Result:
    """
    Batch-update the processing status of level-1 data, selected by internal _id.

    Args:
        ids (List[str]): List of internal _id values.
        prc_status (int): Processing status to set.

    Returns:
        Result: The update result.
    """
    payload = {'prc_status': prc_status, 'ids': ids}
    return request.put("/api/level1/prc_status/batch/update", payload)

def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id: str) -> Result:
    """
    Update the QC status of a single level-1 data record.

    Args:
        level1_id (str): ID of the level-1 data; placed in the request path.
        data_model (str): Data type.
        qc_status (int): QC status to set.
        batch_id (str): Batch ID.

    Returns:
        Result: The update result.
    """
    endpoint = f"/api/level1/qc_status/{level1_id}"
    payload = {
        'data_model': data_model,
        'qc_status': qc_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
169

170
def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str) -> Result:
    """
    Update the processing status of a single level-1 data record.

    Args:
        level1_id (str): ID of the level-1 data; placed in the request path.
        data_model (str): Data type.
        prc_status (int): Processing status to set.
        batch_id (str): Batch ID.

    Returns:
        Result: The operation result.
    """
    endpoint = f"/api/level1/prc_status/{level1_id}"
    payload = {
        'data_model': data_model,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
184

Wei Shoulin's avatar
Wei Shoulin committed
185
def write(local_file: Union[IO, str],
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        obs_group: Optional[str],
        obs_type: str,
        level1_id: str,
        data_model: str,
        file_name: str,
        dag: str,
        pmapname: str,
        build: int,
        dataset: str,
        batch_id: str,
        level0_id: Optional[str] = None,
        qc_status: int = 0,
        **extra_kwargs) -> Result:
    """
    Upload a local level-1 data file to DFS.

    Additional fields such as rss_id or cube_id can be supplied through
    ``extra_kwargs``; they are merged into the request parameters after the
    named fields, so an extra key with the same name replaces the named value.

    Args:
        local_file (Union[IO, str]): File path or file object.
        instrument (str): Instrument (module) ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'.
        obs_group (str): Project ID.
        obs_type (str): Observation type.
        level1_id (str): ID of the level-1 data.
        data_model (str): Data type.
        file_name (str): Level-1 data file name.
        dag (str): DAG identifier.
        pmapname (str): CCDS pmap name.
        build (int): Build number.
        dataset (str): Dataset name.
        batch_id (str): Batch ID.
        level0_id (Optional[str]): ID of the level-0 data. Defaults to None.
        qc_status (int): QC status. Defaults to 0.
        **extra_kwargs: Extra keyword arguments forwarded to DFS.

    Returns:
        Result: Result object carrying success/failure and any error message;
            on success its data is the level-1 data object.

    Raises:
        ValueError: If dataset or batch_id is empty, or local_file is None.
        FileNotFoundError: If local_file is a path that does not exist.
    """
    # Guard clauses, in the same order the checks have always run.
    if not (dataset and batch_id):
        raise ValueError("dataset and batch_id is required")
    if local_file is None:
        raise ValueError("local_file is required")

    payload = {
        'instrument': instrument,
        'obs_group': obs_group,
        'obs_type': obs_type,
        'level0_id': level0_id,
        'level1_id': level1_id,
        'data_model': data_model,
        'file_name': file_name,
        'dag': dag,
        'pmapname': pmapname,
        'build': build,
        'dataset': dataset,
        'batch_id': batch_id,
        'qc_status': qc_status,
    }
    payload.update(extra_kwargs)

    endpoint = "/api/level1/file"
    # Anything that is not a path string is handed over as a file object.
    if not isinstance(local_file, str):
        return request.post_bytesio(endpoint, local_file, payload)
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file(endpoint, local_file, payload)
Wei Shoulin's avatar
Wei Shoulin committed
248

249
250
def find_process(dag: Optional[str] = None,
                dag_run: Optional[str] = None,
                batch_id: Optional[str] = None,
                level1_id: Optional[str] = None,
                dataset: Optional[str] = None,
                prc_module: Optional[str] = None,
                prc_status: Optional[int] = None,
                prc_time: Optional[DateTimeTuple] = None,
                page: int = 1,
                limit: int = 0) -> Result:
    """
    Query level-1 data processing records.

    Args:
        dag (str): DAG identifier.
        dag_run (str): DAG run identifier.
        batch_id (str): Batch ID.
        level1_id (str): ID of the level-1 data.
        dataset (str): Dataset.
        prc_module (str): Processing module.
        prc_status (int): Processing status.
        prc_time (DateTimeTuple): Processing time range as (start, end).
        page (int): Page number. Defaults to 1.
        limit (int): Page size; 0 means no limit.

    Returns:
        Result: On success, Result.data is the list of records; on failure,
            message holds the reason.

    Raises:
        ValueError: If a supplied prc_time bound is not accepted by
            ``utils.is_valid_datetime_format``.
    """
    params = {
        'dag': dag,
        'dag_run': dag_run,
        'batch_id': batch_id,
        'level1_id': level1_id,
        'dataset': dataset,
        'prc_module': prc_module,
        'prc_status': prc_status,
        'prc_time_start': None,
        'prc_time_end': None,
        'page': page,
        'limit': limit
    }
    if prc_time is not None:
        params['prc_time_start'], params['prc_time_end'] = prc_time
        # A None/empty bound is forwarded unvalidated; a supplied bound must
        # pass the format check instead of being silently accepted.
        for key in ('prc_time_start', 'prc_time_end'):
            value = params[key]
            if value and not utils.is_valid_datetime_format(value):
                raise ValueError(f"{key} has an invalid datetime format: {value!r}")
    return request.post("/api/level1/process", params)
Wei Shoulin's avatar
Wei Shoulin committed
298
299

def add_process(level1_id: str,
                dag: str,
                dag_run: str,
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
                prc_time: Optional[str] = None,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
    Add a processing record for level-1 data.

    Args:
        level1_id (str): ID of the level-1 data.
        dag (str): DAG identifier.
        dag_run (str): DAG run identifier.
        dataset (str): Dataset. Defaults to constants.DEFAULT_DATASET.
        batch_id (str): Batch ID. Defaults to constants.DEFAULT_BATCH_ID.
        prc_time (Optional[str]): Processing time, formatted as
            "YYYY-MM-DD HH:MM:SS". When omitted (None), the value of
            ``utils.get_current_time()`` at call time is used.
        prc_status (int): Processing status. Defaults to -1024.
        prc_module (str): Processing module. Defaults to "".
        message (str): Processing message. Defaults to "".

    Returns:
        Result: On success, Result.data is the written record; on failure,
            message holds the reason.

    Raises:
        ValueError: If prc_time is not accepted by
            ``utils.is_valid_datetime_format``.
    """
    # Resolve the timestamp per call: a default of utils.get_current_time()
    # in the signature is evaluated once at import time and then reused.
    if prc_time is None:
        prc_time = utils.get_current_time()
    if not utils.is_valid_datetime_format(prc_time):
        raise ValueError(f"prc_time has an invalid datetime format: {prc_time!r}")

    params = {
        'level1_id': level1_id,
        'dag': dag,
        'dag_run': dag_run,
        'dataset': dataset,
        'batch_id': batch_id,
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    return request.post("/api/level1/prc", params)