level2.py 8.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import os
import pickle
from typing import Optional, Tuple, Literal, Union, IO
from .common import request, Result, utils, constants

# Two-element (start, end) pair of datetime strings. `find` unpacks a value of
# this type into the `*_time_start` / `*_time_end` request parameters.
DateTimeTuple = Tuple[str, str]

def find(
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector: Optional[str] = None,
        data_model: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc2_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        object_name: Optional[str] = None,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-2 data file records matching the given filters.

    All filters are sent as-is in a POST to ``/api/level2``; a filter left at
    ``None`` is forwarded as ``None``.

    Args:
        obs_group (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        instrument (str, optional): Module ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'. Defaults to 'MSC'.
        detector (Optional[str], optional): Detector. Defaults to None.
        data_model (Optional[str], optional): Data type, e.g.
            'csst-msc-l2-mbi-cat'. Defaults to None.
        filter (Optional[str], optional): Filter (optical band). Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): (start, end) observation
            time range. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): (start, end) creation
            time range. Defaults to None.
        qc2_status (Optional[int], optional): QC2 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        object_name (Optional[str], optional): Celestial object name.
            Defaults to None.
        dataset (str, optional): Dataset name. Defaults to
            ``constants.DEFAULT_DATASET``.
        batch_id (str, optional): Batch ID. Defaults to
            ``constants.DEFAULT_BATCH_ID``.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: The value returned by ``request.post`` for this query.

    Raises:
        ValueError: If a non-None bound of ``obs_time`` / ``create_time`` is
            rejected by ``utils.is_valid_datetime_format``, or if a range does
            not unpack into exactly two items.

    """

    params = {
        'obs_group': obs_group,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector': detector,
        'data_model': data_model,
        'filter': filter,
        'qc2_status': qc2_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'object_name': object_name,
        'dataset': dataset,
        'batch_id': batch_id,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'page': page,
        'limit': limit,
    }

    for prefix, time_range in (('obs_time', obs_time), ('create_time', create_time)):
        if time_range is None:
            continue
        start, end = time_range
        params[f'{prefix}_start'] = start
        params[f'{prefix}_end'] = end
        # Validate each bound on its own: the previous `a or b` expression
        # discarded the result and, through short-circuiting, never looked at
        # the end bound once the start bound was accepted.
        for key, bound in ((f'{prefix}_start', start), (f'{prefix}_end', end)):
            if bound is None:
                # An open-ended bound is forwarded as None, like an unset filter.
                continue
            # NOTE(review): the helper is defined outside this file; it may
            # raise by itself or return a bool. `is False` supports both
            # conventions without rejecting a helper that returns None.
            if utils.is_valid_datetime_format(bound) is False:
                raise ValueError(f"invalid datetime format for {key}: {bound!r}")

    return request.post("/api/level2", params)

def find_by_level2_id(level2_id: str) -> Result:
    """
    Look up a single level-2 data record by its ID.

    Args:
        level2_id (str): ID of the level-2 data record; it is interpolated
            into the request path unchanged.

    Returns:
        Result: The value returned by ``request.get`` for this record.

    """
    endpoint = "/api/level2/" + f"{level2_id}"
    return request.get(endpoint)

def update_qc2_status(level2_id: str, data_model: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC2 status of a level-2 data record identified by ID.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        qc2_status (int): New QC2 status value.
        batch_id (str, optional): Batch ID sent with the request. Defaults to
            ``constants.DEFAULT_BATCH_ID``.

    Returns:
        Result: The value returned by ``request.put`` for this update.
    """
    endpoint = f"/api/level2/qc2_status/{level2_id}"
    payload = dict(data_model=data_model, qc2_status=qc2_status, batch_id=batch_id)
    return request.put(endpoint, payload)

def update_prc_status(level2_id: str, data_model: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by ID.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        prc_status (int): New processing status value.
        batch_id (str, optional): Batch ID sent with the request. Defaults to
            ``constants.DEFAULT_BATCH_ID``.

    Returns:
        Result: The value returned by ``request.put`` for this update.
    """
    endpoint = f"/api/level2/prc_status/{level2_id}"
    payload = dict(data_model=data_model, prc_status=prc_status, batch_id=batch_id)
    return request.put(endpoint, payload)

def update_qc2_status_by_file_name(file_name: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC2 status of a level-2 data record identified by file name.

    Args:
        file_name (str): Level-2 data file name; it is interpolated into the
            request path unchanged.
        qc2_status (int): New QC2 status value.
        batch_id (str, optional): Batch ID sent with the request. Defaults to
            ``constants.DEFAULT_BATCH_ID``.

    Returns:
        Result: The value returned by ``request.put`` for this update.
    """
    endpoint = f"/api/level2/qc2_status/file/{file_name}"
    payload = dict(qc2_status=qc2_status, batch_id=batch_id)
    return request.put(endpoint, payload)

def update_prc_status_by_file_name(file_name: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by file name.

    Args:
        file_name (str): Level-2 data file name; it is interpolated into the
            request path unchanged.
        prc_status (int): New processing status value.
        batch_id (str, optional): Batch ID sent with the request. Defaults to
            ``constants.DEFAULT_BATCH_ID``.

    Returns:
        Result: The value returned by ``request.put`` for this update.
    """
    endpoint = f"/api/level2/prc_status/file/{file_name}"
    payload = dict(prc_status=prc_status, batch_id=batch_id)
    return request.put(endpoint, payload)

def write(local_file: Union[IO, str],
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        level2_id: str,
        data_model: str,
        file_name: str,
        dag: str,
        build: int,
        level0_id: Optional[str] = None,
        level1_id: Optional[str] = None,
        brick_id: Optional[int] = 0,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        qc2_status: int = 0,
        **extra_kwargs) -> Result:
    """
    Upload a local level-2 data file to DFS.

    Args:
        local_file (Union[IO, str]): File path or file object. A ``str`` is
            treated as a path and must exist; anything else is handed to
            ``request.post_bytesio``.
        instrument (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level2_id (str): ID of the level-2 data.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        file_name (str): Level-2 data file name.
        dag (str): DAG identifier.
        build (int): Build number.
        level0_id (Optional[str]): ID of the level-0 data. Defaults to None.
        level1_id (Optional[str]): ID of the level-1 data. Defaults to None.
        brick_id (Optional[int]): Sky-region (brick) ID. Defaults to 0.
        dataset (str, optional): Dataset name. Defaults to
            ``constants.DEFAULT_DATASET``.
        batch_id (str, optional): Batch ID. Defaults to
            ``constants.DEFAULT_BATCH_ID``.
        qc2_status (int): QC2 status. Defaults to 0.
        **extra_kwargs: Additional key/value pairs merged into the request
            parameters and passed on to DFS.

    Returns:
        Result: The value returned by ``request.post_file`` (path input) or
        ``request.post_bytesio`` (file-object input).

    Raises:
        ValueError: If ``local_file`` is None.
        FileNotFoundError: If ``local_file`` is a ``str`` path that does not exist.

    """
    # Reject unusable input before assembling the request.
    if local_file is None:
        raise ValueError("local_file is required")
    given_as_path = isinstance(local_file, str)
    if given_as_path and not os.path.exists(local_file):
        raise FileNotFoundError(local_file)

    base_fields = dict(
        instrument=instrument,
        level0_id=level0_id,
        level1_id=level1_id,
        level2_id=level2_id,
        brick_id=brick_id,
        file_name=file_name,
        data_model=data_model,
        dag=dag,
        build=build,
        dataset=dataset,
        batch_id=batch_id,
        qc2_status=qc2_status,
    )
    params = {**base_fields, **extra_kwargs}

    endpoint = "/api/level2/file"
    uploader = request.post_file if given_as_path else request.post_bytesio
    return uploader(endpoint, local_file, params)

def catalog_query(sql: str, limit: int = 0) -> Result:
    """
    Run a SQL query against the level-2 science catalog and return the rows.

    The query is POSTed to ``/api/level2/catalog/query``. If the request layer
    already hands back a ``Result``, it is returned unchanged; otherwise the
    raw response body is unpickled and wrapped in a new ``Result``.

    Args:
        sql (str): SQL query statement to execute.
        limit (int, optional): Maximum number of rows to return. Defaults to
            0, documented by the original author as "no limit".

    Returns:
        Result: On the unpickle path, a ``Result`` whose data is the
        ``'records'`` entry of the payload (a ``pd.DataFrame`` according to
        the original documentation) with ``totalCount`` appended.

    """

    response = request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit})
    # Decide on the type alone: the former `datas and isinstance(...)` guard
    # let a falsy Result fall through to the `_content` access below.
    if isinstance(response, Result):
        return response
    # SECURITY NOTE(review): pickle.loads executes arbitrary code embedded in
    # the payload, so this is only safe while the DFS server and the transport
    # are fully trusted. Consider a non-executable format for this endpoint.
    # NOTE(review): `_content` is a private attribute of the response object;
    # presumably a public accessor exists - confirm against `request.post`.
    records = pickle.loads(response._content)
    df, total_count = records['records'], records['totalCount']
    return Result.ok_data(data=df).append("totalCount", total_count)