level1.py 10.3 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
from typing import Optional, Tuple, Literal, IO, Union
Wei Shoulin's avatar
Wei Shoulin committed
2
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
3
4
5
6
7
8
9
10
11
import os

DateTimeTuple = Tuple[str, str]

def find(project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        level0_id: Optional[str] = None,
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector_no: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
12
        data_type: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
13
14
15
16
17
18
19
20
21
22
23
24
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc1_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        ra_obj: Optional[int] = None,
        dec_obj: Optional[int] = None,
        radius: Optional[float] = None,
        object_name: Optional[str] = None,
        rss_id: Optional[str] = None,
        cube_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
25
26
        dataset: Optional[str] = None,
        batch_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
27
28
29
        page: int = 1,
        limit: int = 0) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
30
    根据给定的参数搜索1级数据文件记录
Wei Shoulin's avatar
Wei Shoulin committed
31
32
33
34
35
36
    
    Args:
        project_id (Optional[str], optional): 项目ID. Defaults to None.
        obs_id (Optional[str], optional): 观测ID. Defaults to None.
        module_id (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
        detector_no (Optional[str], optional): 探测器编号. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
37
        data_type (Optional[str], optional): 文件类型,如'csst-msc-l1-mbi'. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
38
39
40
41
42
43
44
45
46
47
48
49
        filter (Optional[str], optional): 滤光片. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None.
        qc1_status (Optional[int], optional): QC1状态. Defaults to None.
        prc_status (Optional[int], optional): 处理状态. Defaults to None.
        file_name (Optional[str], optional): 文件名. Defaults to None.
        ra_obj (Optional[int], optional): 目标赤经. Defaults to None.
        dec_obj (Optional[int], optional): 目标赤纬. Defaults to None.
        radius (Optional[float], optional): 搜索半径. Defaults to None.
        object_name (Optional[str], optional): 天体名称. Defaults to None.
        rss_id (Optional[str], optional): RSS ID (IFS) Defaults to None.
        cube_id (Optional[str], optional): Cube ID (IFS). Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
50
51
        dataset (Optional[str], optional): 数据集名称. Defaults to None.
        batch_id (Optional[str], optional): 批次ID. Defaults to None.
Wei Shoulin's avatar
Wei Shoulin committed
52
53
54
55
56
57
58
59
60
61
62
63
64
65
        page (int, optional): 页码. Defaults to 1.
        limit (int, optional): 每页数量. Defaults to 0.
    
    Returns:
        Result: 搜索结果对象.
    
    """

    params = {
        'project_id': project_id,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
Wei Shoulin's avatar
Wei Shoulin committed
66
        'data_type': data_type,
Wei Shoulin's avatar
Wei Shoulin committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
        'filter': filter,
        'qc1_status': qc1_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_obj': ra_obj,
        'dec_obj': dec_obj,
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
Wei Shoulin's avatar
Wei Shoulin committed
81
82
        'dataset': dataset,
        'batch_id': batch_id,
Wei Shoulin's avatar
Wei Shoulin committed
83
84
85
86
87
88
        'page': page,
        'limit': limit,
    }
    
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
Wei Shoulin's avatar
Wei Shoulin committed
89
        utils.is_valid_datetime_format(params['obs_time_start']) or not utils.is_valid_datetime_format(params['obs_time_end'])
Wei Shoulin's avatar
Wei Shoulin committed
90
91
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
Wei Shoulin's avatar
Wei Shoulin committed
92
        utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end'])
Wei Shoulin's avatar
Wei Shoulin committed
93
94
95
96
97
    
    return request.post("/api/level1", params)

def find_by_level1_id(level1_id: str) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
98
    通过 level1 的 ID 查询1级数据
Wei Shoulin's avatar
Wei Shoulin committed
99
100
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
101
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
102
103
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
104
        Result: 查询结果
Wei Shoulin's avatar
Wei Shoulin committed
105
106
107
108
109
    
    """
    return request.get(f"/api/level1/{level1_id}")

def find_by_brick_id(brick_id: int) -> Result:
Wei Shoulin's avatar
plan    
Wei Shoulin committed
110
    """
Wei Shoulin's avatar
Wei Shoulin committed
111
    通过 brick 的 ID 查询1级数据
Wei Shoulin's avatar
plan    
Wei Shoulin committed
112
113
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
114
        brick_id (int): 天区ID
Wei Shoulin's avatar
plan    
Wei Shoulin committed
115
116
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
117
        Result: 查询结果
Wei Shoulin's avatar
plan    
Wei Shoulin committed
118
119
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
120
121
    return request.get(f"/api/level1/brick/{brick_id}")

Wei Shoulin's avatar
Wei Shoulin committed
122
123
def sls_find_by_qc1_status(qc1_status: int, limit: int = 1, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    return request.post(f"/api/level1/sls/qc1_status/{qc1_status}", {'limit': limit, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
124

Wei Shoulin's avatar
Wei Shoulin committed
125
def update_qc1_status(level1_id: str, data_type: str, qc1_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
126
127
128
129
130
    """
    更新1级数据的QC0状态
    
    Args:
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
131
        data_type (str): 文件类型
Wei Shoulin's avatar
Wei Shoulin committed
132
        qc1_status (int): QC0状态
Wei Shoulin's avatar
Wei Shoulin committed
133
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
134
135
136
137
    
    Returns:
        Result: 更新结果
    """
Wei Shoulin's avatar
Wei Shoulin committed
138
    return request.put(f"/api/level1/qc1_status/{level1_id}", {'data_type': data_type, 'qc1_status': qc1_status, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
139

Wei Shoulin's avatar
Wei Shoulin committed
140
def update_prc_status(level1_id: str, data_type: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
141
    """
Wei Shoulin's avatar
Wei Shoulin committed
142
    更新1级数据的处理状态
Wei Shoulin's avatar
Wei Shoulin committed
143
144
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
145
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
146
        data_type (str): 文件类型
Wei Shoulin's avatar
Wei Shoulin committed
147
148
        prc_status (int): 处理状态
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
149
150
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
151
        Result: 操作结果
Wei Shoulin's avatar
Wei Shoulin committed
152
    """
Wei Shoulin's avatar
Wei Shoulin committed
153
    return request.put(f"/api/level1/prc_status/{level1_id}", {'data_type': data_type, 'prc_status': prc_status, 'batch_id': batch_id})
Wei Shoulin's avatar
Wei Shoulin committed
154

Wei Shoulin's avatar
Wei Shoulin committed
155
def write(local_file: Union[IO, str], 
Wei Shoulin's avatar
Wei Shoulin committed
156
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
Wei Shoulin's avatar
Wei Shoulin committed
157
        level1_id: str,
Wei Shoulin's avatar
Wei Shoulin committed
158
        data_type: str,
Wei Shoulin's avatar
Wei Shoulin committed
159
        file_name: str,
160
        dag_id: str,
Wei Shoulin's avatar
Wei Shoulin committed
161
162
        pmapname: str,
        build: int,
Wei Shoulin's avatar
Wei Shoulin committed
163
        level0_id: Optional[str] = None,
Wei Shoulin's avatar
Wei Shoulin committed
164
165
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
Wei Shoulin's avatar
Wei Shoulin committed
166
        qc1_status: int = 0,
Wei Shoulin's avatar
Wei Shoulin committed
167
168
        **extra_kwargs) -> Result:
    '''
Wei Shoulin's avatar
Wei Shoulin committed
169
    将本地的1级数据文件写入到DFS中其他参数如rss_id, cube_id等,可通过extra_kwargs传入
Wei Shoulin's avatar
Wei Shoulin committed
170
171
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
172
        local_file (Union[IO, str]): 文件路径或文件对象
Wei Shoulin's avatar
Wei Shoulin committed
173
        module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID
Wei Shoulin's avatar
| bug    
Wei Shoulin committed
174
        level0_id (Optional[str]): 0级数据的ID默认为 None
Wei Shoulin's avatar
Wei Shoulin committed
175
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
176
        data_type (str): 文件类型
Wei Shoulin's avatar
Wei Shoulin committed
177
        file_name (str): 1级数据文件名
178
        dag_id (str): 管线ID
Wei Shoulin's avatar
Wei Shoulin committed
179
180
181
182
        pmapname (str): CCDS pmap名称
        build (int): 构建号
        dataset (str): 数据集名称
        batch_id (str): 批次ID
Wei Shoulin's avatar
Wei Shoulin committed
183
        qc1_status (int): QC1状态
Wei Shoulin's avatar
Wei Shoulin committed
184
        **kwargs: 额外的关键字参数,这些参数将传递给DFS
Wei Shoulin's avatar
Wei Shoulin committed
185
186
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
187
        Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象
Wei Shoulin's avatar
Wei Shoulin committed
188
    '''
Wei Shoulin's avatar
Wei Shoulin committed
189
190
191
192
    params = {
        'module_id': module_id, 
        'level0_id': level0_id, 
        'level1_id': level1_id, 
Wei Shoulin's avatar
Wei Shoulin committed
193
        'data_type': data_type, 
Wei Shoulin's avatar
Wei Shoulin committed
194
        'file_name': file_name, 
195
        'dag_id': dag_id, 
Wei Shoulin's avatar
Wei Shoulin committed
196
197
198
        'pmapname': pmapname, 
        'build': build,
        'dataset': dataset,
Wei Shoulin's avatar
Wei Shoulin committed
199
200
        'batch_id': batch_id,
        'qc1_status': qc1_status
Wei Shoulin's avatar
Wei Shoulin committed
201
202
203
    }
    if not dataset or not batch_id:
        raise ValueError("dataset and batch_id is required")
Wei Shoulin's avatar
Wei Shoulin committed
204
    params.update(extra_kwargs)
Wei Shoulin's avatar
Wei Shoulin committed
205
206
    if local_file is None:
        raise ValueError("local_file is required")
Wei Shoulin's avatar
Wei Shoulin committed
207
208
209
210
211
    if isinstance(local_file, str):
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)        
        return request.post_file("/api/level1/file", local_file, params)
    return request.post_bytesio("/api/level1/file", local_file, params)
Wei Shoulin's avatar
Wei Shoulin committed
212
213
214

def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'], 
                    level1_id: str, 
Wei Shoulin's avatar
Wei Shoulin committed
215
216
                    dataset: str = constants.DEFAULT_DATASET,
                    batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
Wei Shoulin's avatar
Wei Shoulin committed
217
    """
Wei Shoulin's avatar
Wei Shoulin committed
218
    生成流水线的处理消息
Wei Shoulin's avatar
Wei Shoulin committed
219
220
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
221
222
        module_id (str): 模块ID
        level1_id (str): 1级数据的ID
223
        dag_id (str): 流水管线ID,默认为空字符串
Wei Shoulin's avatar
Wei Shoulin committed
224
225
226
        dataset (str): 数据集
        batch_id (str): 批次ID

Wei Shoulin's avatar
Wei Shoulin committed
227
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
228
        Result: 处理消息生成的结果,是否成功以及相关的错误信息
Wei Shoulin's avatar
Wei Shoulin committed
229
230
    
    """
Wei Shoulin's avatar
Wei Shoulin committed
231
232
233
234
235
236
237
    params = {
        'dataset': dataset,
        'batch_id': batch_id,
        'level1_id': level1_id,
    }
    
    return request.put(f"/api/level1/prc/{module_id}", params)
Wei Shoulin's avatar
Wei Shoulin committed
238
239
240

def process_list(level1_id: str) -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
241
    通过 level1 的 ID 查询1级数据处理记录
Wei Shoulin's avatar
Wei Shoulin committed
242
243
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
244
        level1_id (str): 1级数据的ID
Wei Shoulin's avatar
Wei Shoulin committed
245
246
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
247
        Result: 查询结果
Wei Shoulin's avatar
Wei Shoulin committed
248
249
250
251
252
    
    """
    return request.get(f"/api/level1/prc/{level1_id}")

def add_process(level1_id: str, 
253
254
                dag_id: str, 
                dag_run_id: str, 
Wei Shoulin's avatar
Wei Shoulin committed
255
256
                dataset: str = constants.DEFAULT_DATASET,
                batch_id: str = constants.DEFAULT_BATCH_ID,
Wei Shoulin's avatar
Wei Shoulin committed
257
                prc_time: str = utils.get_current_time(),
Wei Shoulin's avatar
Wei Shoulin committed
258
259
260
261
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """
Wei Shoulin's avatar
Wei Shoulin committed
262
    添加1级数据处理记录
Wei Shoulin's avatar
Wei Shoulin committed
263
264
    
    Args:
Wei Shoulin's avatar
Wei Shoulin committed
265
        level1_id (str): 1级数据的ID
266
267
        dag_id (str): 管线ID
        dag_run_id (str): 运行ID
Wei Shoulin's avatar
Wei Shoulin committed
268
269
270
271
272
273
        dataset (str): 数据集
        batch_id (str): 批次ID
        prc_time (str): 处理时间,格式为"YYYY-MM-DD HH:MM:SS"
        prc_status (int): 处理状态
        prc_module (str): 处理模块
        message (str): 处理消息
Wei Shoulin's avatar
Wei Shoulin committed
274
275
    
    Returns:
Wei Shoulin's avatar
Wei Shoulin committed
276
        Result: 成功后,Result.data为写入记录,失败message为失败原因
Wei Shoulin's avatar
Wei Shoulin committed
277
278
279
280
    
    """
    params = {
        'level1_id': level1_id,
281
282
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
Wei Shoulin's avatar
Wei Shoulin committed
283
284
        'dataset': dataset,
        'batch_id': batch_id,
Wei Shoulin's avatar
Wei Shoulin committed
285
286
287
288
289
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
Wei Shoulin's avatar
Wei Shoulin committed
290
    utils.is_valid_datetime_format(prc_time)
Wei Shoulin's avatar
Wei Shoulin committed
291
    return request.post("/api/level1/prc", params)