level2.py 8.52 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
import os
Wei Shoulin's avatar
pkl    
Wei Shoulin committed
2
import pickle
Wei Shoulin's avatar
Wei Shoulin committed
3
from typing import Optional, Tuple, Literal, Union, IO
Wei Shoulin's avatar
Wei Shoulin committed
4
from .common import request, Result, utils, constants
Wei Shoulin's avatar
Wei Shoulin committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# (start, end) pair of datetime strings; `find` unpacks it into the
# `*_time_start` / `*_time_end` request parameters.
DateTimeTuple = Tuple[str, str]

def find(
        project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector_no: Optional[str] = None,
        data_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc2_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        object_name: Optional[str] = None,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-2 data file records matching the given criteria.

    Args:
        project_id (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        module_id (str, optional): Module ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'. Defaults to 'MSC'.
        detector_no (Optional[str], optional): Detector number. Defaults to None.
        data_type (Optional[str], optional): File type, e.g.
            'csst-msc-l2-mbi-cat'. Defaults to None.
        filter (Optional[str], optional): Filter name. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): (start, end) observation
            time range. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): (start, end) creation
            time range. Defaults to None.
        qc2_status (Optional[int], optional): QC2 status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        object_name (Optional[str], optional): Object name. Defaults to None.
        dataset (str, optional): Dataset name. Defaults to
            constants.DEFAULT_DATASET.
        batch_id (str, optional): ID of the last successful batch. Defaults to
            constants.DEFAULT_BATCH_ID.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: The search result object.

    Raises:
        ValueError: If a time range is not a 2-element sequence, or if
            utils.is_valid_datetime_format reports one of its bounds as
            invalid. Exceptions raised by the validator itself propagate
            unchanged.
    """
    params = {
        'project_id': project_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
        'data_type': data_type,
        'filter': filter,
        'qc2_status': qc2_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'object_name': object_name,
        'dataset': dataset,
        'batch_id': batch_id,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'page': page,
        'limit': limit,
    }

    for name, time_range in (('obs_time', obs_time), ('create_time', create_time)):
        if time_range is None:
            continue
        start, end = time_range
        for bound in (start, end):
            # A None bound is forwarded as-is: it is the same value the
            # request carries for this key when no range is given at all.
            if bound is None:
                continue
            # Validate every bound on its own statement. The previous
            # `check(start) or check(end)` expression discarded its result
            # and skipped the end bound whenever the first check was truthy.
            # `is False` keeps this working whether the validator returns a
            # bool or signals invalid input by raising.
            if utils.is_valid_datetime_format(bound) is False:
                raise ValueError(f"Invalid datetime value {bound!r} for {name}")
        params[f'{name}_start'], params[f'{name}_end'] = start, end

    return request.post("/api/level2", params)

def find_by_level2_id(level2_id: str) -> Result:
    """
    Look up a single level-2 data record by its ID.

    Args:
        level2_id (str): ID of the level-2 data record.

    Returns:
        Result: The query result.
    """
    endpoint = f"/api/level2/{level2_id}"
    return request.get(endpoint)

Wei Shoulin's avatar
Wei Shoulin committed
94
def update_qc2_status(level2_id: str, data_type: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC2 status of a level-2 data record identified by its ID.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_type (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        qc2_status (int): New QC2 status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The update result.
    """
    endpoint = f"/api/level2/qc2_status/{level2_id}"
    payload = {
        'data_type': data_type,
        'qc2_status': qc2_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
107

Wei Shoulin's avatar
Wei Shoulin committed
108
def update_prc_status(level2_id: str, data_type: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by its ID.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_type (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        prc_status (int): New processing status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The operation result.
    """
    endpoint = f"/api/level2/prc_status/{level2_id}"
    payload = {
        'data_type': data_type,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
121

Wei Shoulin's avatar
Wei Shoulin committed
122
def update_qc2_status_by_file_name(file_name: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC2 status of a level-2 data record identified by file name.

    Args:
        file_name (str): Name of the level-2 data file.
        qc2_status (int): New QC2 status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The update result.
    """
    endpoint = f"/api/level2/qc2_status/file/{file_name}"
    payload = {'qc2_status': qc2_status, 'batch_id': batch_id}
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
134

Wei Shoulin's avatar
Wei Shoulin committed
135
def update_prc_status_by_file_name(file_name: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by file name.

    Args:
        file_name (str): Name of the level-2 data file.
        prc_status (int): New processing status.
        batch_id (str, optional): Batch ID sent with the update. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: The operation result.
    """
    endpoint = f"/api/level2/prc_status/file/{file_name}"
    payload = {'prc_status': prc_status, 'batch_id': batch_id}
    return request.put(endpoint, payload)
Wei Shoulin's avatar
Wei Shoulin committed
147

Wei Shoulin's avatar
Wei Shoulin committed
148
def write(local_file: Union[IO, str],
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
        level0_id: Optional[str],
        level1_id: Optional[str],
        level2_id: Optional[str],
        brick_id: Optional[int],
        data_type: str,
        file_name: str,
        pipeline_id: str,
        build: int,
        dataset: Optional[str] = None,
        batch_id: Optional[str] = None,
        **extra_kwargs) -> Result:
    """
    Upload a local level-2 data file to DFS.

    Args:
        local_file (Union[IO, str]): Path of the file, or an open file object.
        module_id (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level0_id (Optional[str]): ID of the level-0 data; required, may be None.
        level1_id (Optional[str]): ID of the level-1 data; required, may be None.
        level2_id (Optional[str]): ID of the level-2 data; required, may be None.
        brick_id (Optional[int]): ID of the sky brick; required, may be None.
        data_type (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        file_name (str): Name of the level-2 data file.
        pipeline_id (str): Pipeline ID.
        build (int): Build number.
        dataset (Optional[str], optional): Dataset name. Defaults to None.
        batch_id (Optional[str], optional): ID of the last successful batch.
            Defaults to None.
        **extra_kwargs: Additional fields merged into the request parameters;
            a key that matches one of the fields above overrides it.

    Returns:
        Result: The operation result; per the original documentation, on
            success its data is the level-2 data object.

    Raises:
        ValueError: If `file_name` is rejected by utils.is_valid_filename.
        FileNotFoundError: If `local_file` is a path that does not exist.
    """
    # Reject names the validator does NOT accept. The guard was previously
    # inverted and raised for valid names.
    if not utils.is_valid_filename(file_name):
        raise ValueError(f"Incorrect file name [{file_name}], should be *.*")

    params = {
        'module_id': module_id,
        'level0_id': level0_id,
        'level1_id': level1_id,
        'level2_id': level2_id,
        'brick_id': brick_id,
        'file_name': file_name,
        'data_type': data_type,
        'pipeline_id': pipeline_id,
        'build': build,
        'dataset': dataset,
        'batch_id': batch_id,
    }
    params.update(extra_kwargs)

    # A string is treated as a filesystem path; anything else is handed to
    # the file-object upload helper.
    if isinstance(local_file, str):
        if not os.path.exists(local_file):
            raise FileNotFoundError(local_file)
        return request.post_file("/api/level2/file", local_file, params)
    return request.post_bytesio("/api/level2/file", local_file, params)
Wei Shoulin's avatar
Wei Shoulin committed
204
205

def catalog_query(sql: str, limit: int = 0) -> Result:
    """
    Run a SQL query against the level-2 science catalog and return the rows.

    Args:
        sql (str): SQL statement to execute.
        limit (int, optional): Maximum number of rows to return. Defaults to
            0, which per the original documentation means no limit.

    Returns:
        Result: If the request helper already returned a non-empty Result,
            that object is returned unchanged. Otherwise a Result whose data
            is the unpickled 'records' entry (documented as a pandas
            DataFrame) with the 'totalCount' entry appended under
            "totalCount".
    """
    query = {'sql': sql, 'limit': limit}
    response = request.post("/api/level2/catalog/query", query)

    # The request helper may hand back a Result directly; pass it through.
    if isinstance(response, Result) and response:
        return response

    # NOTE(review): pickle.loads can execute arbitrary code while
    # deserializing, so this is only safe if the DFS server and the transport
    # are fully trusted. It also reads the private `_content` attribute of the
    # response object -- confirm whether a public accessor should be used.
    payload = pickle.loads(response._content)
    frame = payload['records']
    total = payload['totalCount']
    return Result.ok_data(data=frame).append("totalCount", total)