Commit 2cf4016e authored by Wei Shoulin
Browse files

refactor(client): remove unused Level2 API and update Level0/Level1 API

- Remove all Level2 API functions and related test cases
- Update Level0 and Level1 API functions to use more generic QC status instead of QC0/QC1
- Adjust function names and parameters for better consistency across levels
- Remove redundant default values for dataset and batch_id parameters
parent 02a2b773
Pipeline #9263 failed with stages
in 0 seconds
......@@ -117,7 +117,7 @@ def update_qc_status(level0_id: str, qc_status: int, dataset: str) -> Result:
Args:
level0_id (str): 0级数据的ID
qc_status (int): QC0状态
qc_status (int): QC状态
dataset (str): 数据集名称
Returns:
......@@ -127,18 +127,18 @@ def update_qc_status(level0_id: str, qc_status: int, dataset: str) -> Result:
def update_qc_status_by_ids(ids: List[str], qc_status: int) -> Result:
"""
根据内部_id,批量更新0级数据的QC0状态
根据内部_id,批量更新0级数据的QC状态
Args:
ids (List[str]): 内部_id列表
qc_status (int): QC0状态
qc_status (int): QC状态
Returns:
Result: 更新结果
"""
return request.put("/api/level0/qc_status/batch/update", {'qc_status': qc_status, 'ids': ids})
return request.put("/api/level0/qc_status/batch", {'qc_status': qc_status, 'ids': ids})
def update_prc_status(level0_id: str, dag_run: str, prc_status: int, dataset: str = constants.DEFAULT_DATASET) -> Result:
def update_prc_status(level0_id: str, dag_run: str, prc_status: int, dataset: str) -> Result:
"""
更新0级数据的处理状态
......@@ -164,7 +164,7 @@ def update_prc_status_by_ids(ids: List[str], prc_status: int) -> Result:
Returns:
Result: 操作结果
"""
return request.put("/api/level0/prc_status/batch", {'prc_status': prc_status, 'ids': ids})
return request.put("/api/level0/prc_status/batch/update", {'prc_status': prc_status, 'ids': ids})
def write(local_file: str,
dataset: str = constants.DEFAULT_DATASET,
......
......@@ -6,6 +6,8 @@ DateTimeTuple = Tuple[str, str]
def find(
instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
dataset: str,
batch_id: str,
obs_group: Optional[str] = None,
obs_id: Optional[str] = None,
level0_id: Optional[str] = None,
......@@ -24,8 +26,6 @@ def find(
object_name: Optional[str] = None,
rss_id: Optional[str] = None,
cube_id: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID,
page: int = 1,
limit: int = 0) -> Result:
"""
......@@ -123,17 +123,43 @@ def find_by_brick_id(brick_id: int) -> Result:
"""
return request.get(f"/api/level1/brick/{brick_id}")
def sls_find_by_qc_status(qc_status: int, limit: int = 1, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
def sls_find_by_qc_status(qc_status: int, batch_id: str, limit: int = 1) -> Result:
return request.post(f"/api/level1/sls/qc_status/{qc_status}", {'limit': limit, 'batch_id': batch_id})
def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
def update_qc_status_by_ids(ids: list[str], qc_status: int) -> Result:
    """
    Batch-update the QC status of level-1 data records selected by internal _id.

    Args:
        ids (list[str]): Internal _id values of the records to update.
        qc_status (int): QC status value to set.

    Returns:
        Result: Value returned by the PUT request.
    """
    endpoint = "/api/level1/qc_status/batch/update"
    payload = {'qc_status': qc_status, 'ids': ids}
    return request.put(endpoint, payload)
def update_prc_status_by_ids(ids: list[str], prc_status: int) -> Result:
    """
    Batch-update the processing status of level-1 data records selected by internal _id.

    Args:
        ids (list[str]): Internal _id values of the records to update.
        prc_status (int): Processing status value to set.

    Returns:
        Result: Value returned by the PUT request.
    """
    endpoint = "/api/level1/prc_status/batch/update"
    payload = {'prc_status': prc_status, 'ids': ids}
    return request.put(endpoint, payload)
def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id: str) -> Result:
"""
更新1级数据的QC状态
Args:
level1_id (str): 1级数据的ID
data_model (str): 数据类型
qc_status (int): QC0状态
qc_status (int): QC状态
batch_id (str): 批次ID
Returns:
......@@ -141,7 +167,7 @@ def update_qc_status(level1_id: str, data_model: str, qc_status: int, batch_id:
"""
return request.put(f"/api/level1/qc_status/{level1_id}", {'data_model': data_model, 'qc_status': qc_status, 'batch_id': batch_id})
def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
def update_prc_status(level1_id: str, data_model: str, prc_status: int, batch_id: str) -> Result:
"""
更新1级数据的处理状态
......@@ -166,9 +192,9 @@ def write(local_file: Union[IO, str],
dag: str,
pmapname: str,
build: int,
dataset: str,
batch_id: str,
level0_id: Optional[str] = None,
dataset: str = constants.DEFAULT_DATASET,
batch_id: str = constants.DEFAULT_BATCH_ID,
qc_status: int = 0,
**extra_kwargs) -> Result:
'''
......@@ -188,7 +214,7 @@ def write(local_file: Union[IO, str],
build (int): 构建号
dataset (str): 数据集名称
batch_id (str): 批次ID
qc_status (int): QC1状态
qc_status (int): QC状态
**kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns:
......
import os
import pickle
from typing import Optional, Tuple, Literal, Union, IO
from .common import request, Result, utils, constants
DateTimeTuple = Tuple[str, str]
def find(
        obs_group: Optional[str] = None,
        obs_id: Optional[str] = None,
        instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector: Optional[str] = None,
        data_model: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc2_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        object_name: Optional[str] = None,
        dataset: str = constants.DEFAULT_DATASET,
        batch_id: str = constants.DEFAULT_BATCH_ID,
        page: int = 1,
        limit: int = 0) -> Result:
    """
    Search level-2 data file records matching the given criteria.

    All criteria are collected into one dictionary and POSTed to
    ``/api/level2``. Criteria left at ``None`` are still sent, with a
    ``None`` value.

    Args:
        obs_group (Optional[str], optional): Project ID. Defaults to None.
        obs_id (Optional[str], optional): Observation ID. Defaults to None.
        instrument (str, optional): Module ID, one of 'MSC', 'IFS', 'MCI',
            'HSTDM', 'CPIC'. Defaults to 'MSC'.
        detector (Optional[str], optional): Detector. Defaults to None.
        data_model (Optional[str], optional): Data type, e.g.
            'csst-msc-l2-mbi-cat'. Defaults to None.
        filter (Optional[str], optional): Filter band. Defaults to None.
        obs_time (Optional[DateTimeTuple], optional): (start, end) range of
            the observation time. Defaults to None.
        create_time (Optional[DateTimeTuple], optional): (start, end) range
            of the creation time. Defaults to None.
        qc2_status (Optional[int], optional): Level-2 QC status. Defaults to None.
        prc_status (Optional[int], optional): Processing status. Defaults to None.
        file_name (Optional[str], optional): File name. Defaults to None.
        object_name (Optional[str], optional): Celestial object name.
            Defaults to None.
        dataset (str, optional): Dataset name. Defaults to
            constants.DEFAULT_DATASET.
        batch_id (str, optional): Batch ID. Defaults to
            constants.DEFAULT_BATCH_ID.
        page (int, optional): Page number. Defaults to 1.
        limit (int, optional): Page size. Defaults to 0.

    Returns:
        Result: Value returned by the POST request.
    """
    params = {
        'obs_group': obs_group,
        'obs_id': obs_id,
        'instrument': instrument,
        'detector': detector,
        'data_model': data_model,
        'filter': filter,
        'qc2_status': qc2_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'object_name': object_name,
        'dataset': dataset,
        'batch_id': batch_id,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'page': page,
        'limit': limit,
    }

    # Each end of a range is validated by its own statement. Chaining the two
    # calls with `or` would short-circuit and skip the end value whenever the
    # start value validates to a truthy result.
    # NOTE(review): the helper's return value is ignored here, as before;
    # presumably it raises on a malformed value - confirm in common.utils.
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
        utils.is_valid_datetime_format(params['obs_time_start'])
        utils.is_valid_datetime_format(params['obs_time_end'])

    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
        utils.is_valid_datetime_format(params['create_time_start'])
        utils.is_valid_datetime_format(params['create_time_end'])

    return request.post("/api/level2", params)
def find_by_level2_id(level2_id: str) -> Result:
    """
    Look up a level-2 data record by its level-2 ID.

    Args:
        level2_id (str): ID of the level-2 data.

    Returns:
        Result: Value returned by the GET request.
    """
    endpoint = f"/api/level2/{level2_id}"
    return request.get(endpoint)
def update_qc2_status(level2_id: str, data_model: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC status of a level-2 data record identified by its ID.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        qc2_status (int): QC status value to set.
        batch_id (str): Batch ID sent with the request. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: Value returned by the PUT request.
    """
    endpoint = f"/api/level2/qc2_status/{level2_id}"
    payload = {
        'data_model': data_model,
        'qc2_status': qc2_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
def update_prc_status(level2_id: str, data_model: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by its ID.

    Args:
        level2_id (str): ID of the level-2 data file.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        prc_status (int): Processing status value to set.
        batch_id (str): Batch ID sent with the request. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: Value returned by the PUT request.
    """
    endpoint = f"/api/level2/prc_status/{level2_id}"
    payload = {
        'data_model': data_model,
        'prc_status': prc_status,
        'batch_id': batch_id,
    }
    return request.put(endpoint, payload)
def update_qc2_status_by_file_name(file_name: str, qc2_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the QC status of a level-2 data record identified by its file name.

    Args:
        file_name (str): Level-2 data file name.
        qc2_status (int): QC status value to set.
        batch_id (str): Batch ID sent with the request. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: Value returned by the PUT request.
    """
    # The file name is placed into the URL path exactly as given (no escaping).
    endpoint = f"/api/level2/qc2_status/file/{file_name}"
    payload = {'qc2_status': qc2_status, 'batch_id': batch_id}
    return request.put(endpoint, payload)
def update_prc_status_by_file_name(file_name: str, prc_status: int, batch_id: str = constants.DEFAULT_BATCH_ID) -> Result:
    """
    Update the processing status of a level-2 data record identified by its file name.

    Args:
        file_name (str): Level-2 data file name.
        prc_status (int): Processing status value to set.
        batch_id (str): Batch ID sent with the request. Defaults to
            constants.DEFAULT_BATCH_ID.

    Returns:
        Result: Value returned by the PUT request.
    """
    # The file name is placed into the URL path exactly as given (no escaping).
    endpoint = f"/api/level2/prc_status/file/{file_name}"
    payload = {'prc_status': prc_status, 'batch_id': batch_id}
    return request.put(endpoint, payload)
def write(local_file: Union[IO, str],
          instrument: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
          level2_id: str,
          data_model: str,
          file_name: str,
          dag: str,
          build: int,
          level0_id: Optional[str] = None,
          level1_id: Optional[str] = None,
          brick_id: Optional[int] = 0,
          dataset: str = constants.DEFAULT_DATASET,
          batch_id: str = constants.DEFAULT_BATCH_ID,
          qc2_status: int = 0,
          **extra_kwargs) -> Result:
    """
    Upload a local level-2 data file, with its metadata, to DFS.

    Args:
        local_file (Union[IO, str]): File path or file object.
        instrument (str): Module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level2_id (str): ID of the level-2 data.
        data_model (str): Data type, e.g. 'csst-msc-l2-mbi-cat'.
        file_name (str): Level-2 data file name.
        dag (str): DAG identifier.
        build (int): Build number.
        level0_id (Optional[str]): ID of the level-0 data. Defaults to None.
        level1_id (Optional[str]): ID of the level-1 data. Defaults to None.
        brick_id (Optional[int]): Sky-region (brick) ID. Defaults to 0.
        dataset (str): Dataset name. Defaults to constants.DEFAULT_DATASET.
        batch_id (str): Batch ID. Defaults to constants.DEFAULT_BATCH_ID.
        qc2_status (int): QC status. Defaults to 0.
        **extra_kwargs: Additional fields merged into the request parameters;
            a key equal to one of the named fields replaces that field's value.

    Returns:
        Result: Value returned by the upload request.

    Raises:
        ValueError: If ``local_file`` is None.
        FileNotFoundError: If ``local_file`` is a path that does not exist.
    """
    if local_file is None:
        raise ValueError("local_file is required")

    endpoint = "/api/level2/file"
    base_fields = {
        'instrument': instrument,
        'level0_id': level0_id,
        'level1_id': level1_id,
        'level2_id': level2_id,
        'brick_id': brick_id,
        'file_name': file_name,
        'data_model': data_model,
        'dag': dag,
        'build': build,
        'dataset': dataset,
        'batch_id': batch_id,
        'qc2_status': qc2_status
    }
    payload = {**base_fields, **extra_kwargs}

    # Anything that is not a string path goes to the file-object upload.
    if not isinstance(local_file, str):
        return request.post_bytesio(endpoint, local_file, payload)

    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file(endpoint, local_file, payload)
def catalog_query(sql: str, limit: int = 0) -> Result:
    """
    Run a SQL query against the level-2 science catalog and return the result.

    Args:
        sql (str): SQL query statement to execute.
        limit (int, optional): Maximum number of rows. Defaults to 0, which
            the original documentation describes as "no limit".

    Returns:
        Result: If the request layer already returned a ``Result``, that
        object is returned unchanged. Otherwise a ``Result`` whose data is
        the unpickled ``records`` entry (documented as a pandas DataFrame),
        with ``totalCount`` appended.
    """
    datas = request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit})

    # Test the type only. A Result that evaluates falsy must still be returned
    # to the caller rather than falling through to the `_content` access
    # below, which is meant for the raw response object.
    if isinstance(datas, Result):
        return datas

    # SECURITY NOTE(review): pickle.loads can execute arbitrary code from the
    # payload. This is only safe if the DFS server and the transport are
    # fully trusted - consider a non-executable serialization format.
    records = pickle.loads(datas._content)
    df, total_count = records['records'], records['totalCount']
    return Result.ok_data(data=df).append("totalCount", total_count)
import unittest
from csst_dfs_client import level2
class Level2TestCase(unittest.TestCase):
    """
    Tests for the level-2 client API in ``csst_dfs_client.level2``.

    NOTE(review): every test calls the client directly, so these presumably
    need a reachable DFS service (and, for ``test_write``, the local file at
    ``LOCAL_FILE``) - confirm before running in CI.
    """

    # Identifier and file name shared by the test cases below.
    LEVEL2_ID = "1010940063886712"
    FILE_NAME = "CSST_MSC_MS_SCI_20310423084104_20310423084334_10109400638867_12_L2_V01_CATMIX.fits"
    LOCAL_FILE = "/Users/wsl/temp/csst/import/CSST_MSC_MS_SCI_20310423084104_20310423084334_10109400638867_12_L2_V01_CATMIX.fits"
    DATA_MODEL = 'csst-msc-l2-mbi-cat'

    def setUp(self):
        pass

    def _assert_ok(self, result):
        """Assert that ``result.code`` is 200, reporting code and message on failure."""
        # f-strings format any value, so a non-str message (e.g. None) cannot
        # raise TypeError while the failure message is being built.
        self.assertEqual(result.code, 200, f"error code: {result.code}, message: {result.message}")

    def test_find(self):
        result = level2.find(obs_id=self.LEVEL2_ID)
        print(result)
        self.assertEqual(result.code, 200, f"error code: {result.code}")
        self.assertIsNotNone(result.data, f"error message: {result.message}")

    def test_find_by_level2_id(self):
        result = level2.find_by_level2_id(level2_id=self.LEVEL2_ID)
        print(result)
        self._assert_ok(result)
        self.assertIsNotNone(result.data, f"error message: {result.message}")

    def test_update_qc2_status(self):
        result = level2.update_qc2_status(level2_id=self.LEVEL2_ID, data_model=self.DATA_MODEL, qc2_status=1)
        print(result)
        self._assert_ok(result)

    def test_update_prc_status(self):
        result = level2.update_prc_status(level2_id=self.LEVEL2_ID, data_model=self.DATA_MODEL, prc_status=3)
        print(result)
        self._assert_ok(result)

    def test_update_qc2_status_by_file_name(self):
        result = level2.update_qc2_status_by_file_name(file_name=self.FILE_NAME, qc2_status=1)
        print(result)
        self._assert_ok(result)

    def test_update_prc_status_by_file_name(self):
        result = level2.update_prc_status_by_file_name(file_name=self.FILE_NAME, prc_status=3)
        print(result)
        self._assert_ok(result)

    def test_write(self):
        result = level2.write(
            local_file=self.LOCAL_FILE,
            instrument="MSC",
            level0_id=self.LEVEL2_ID,
            level1_id=self.LEVEL2_ID,
            level2_id=self.LEVEL2_ID,
            data_model=self.DATA_MODEL,
            file_name=self.FILE_NAME,
            brick_id=254,
            qc2_status=12,
            dag="csst-msc-l2-mbi",
            build=1,
            dataset="094",
            # Not a named parameter of write(); passed through as an extra field.
            prc_status=1,
        )
        print(result)
        self._assert_ok(result)

    def test_catalog_query(self):
        result = level2.catalog_query(sql="select * from csst_msc_l2_mbi_cat")
        print(result)
        self._assert_ok(result)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment