Commit e0c4aa35 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

write IO

parent 291e3a43
from typing import Optional, Tuple, Literal
from .common import request, Result
from typing import Optional, Tuple, Literal, IO, Union
from .common import request, Result, utils
import os
DateTimeTuple = Tuple[str, str]
......@@ -25,7 +25,7 @@ def find(project_id: Optional[str] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
根据给定的参数搜索1级数据。
根据给定的参数搜索1级数据文件记录
Args:
project_id (Optional[str], optional): 项目ID. Defaults to None.
......@@ -80,8 +80,10 @@ def find(project_id: Optional[str] = None,
if obs_time is not None:
params['obs_time_start'], params['obs_time_end'] = obs_time
utils.is_valid_datetime_format(params['obs_time_start']) or not utils.is_valid_datetime_format(params['obs_time_end'])
if create_time is not None:
params['create_time_start'], params['create_time_end'] = create_time
utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end'])
return request.post("/api/level1", params)
......@@ -142,11 +144,12 @@ def update_prc_status(level1_id: str, file_type: str, prc_status: int) -> Result
"""
return request.put(f"/api/level1/prc_status/{level1_id}", {'file_type': file_type, 'prc_status': prc_status})
def write(local_file: str,
def write(local_file: Union[IO | str],
module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
level0_id: Optional[str | None],
level1_id: Optional[str | None],
file_type: str,
file_name: str,
pipeline_id: str,
pmapname: str,
build: int,
......@@ -155,25 +158,31 @@ def write(local_file: str,
将本地的1级数据文件写入到DFS中。其他参数如rss_id, cube_id等,可通过extra_kwargs传入。
Args:
local_file (str]): 文件路径
local_file (Union[IO | str]): 文件路径或文件对象
module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID。
level0_id (Optional[str | None]): 0级数据的ID。默认为 None。
level1_id (Optional[str | None]): 1级数据的ID。默认为 None。
file_type (str): 文件类型。
file_name (str): 1级数据文件名。
pipeline_id (str): 管线ID。
pmapname (str): CCDS pmap名称。
build (int): 构建号。
**kwargs: 额外的关键字参数,这些参数将传递给DFS。
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为1级数据对象
'''
params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'file_type': file_type, 'pipeline_id': pipeline_id, 'pmapname': pmapname, 'build': build}
if utils.is_valid_filename(file_name):
raise ValueError(f"Incorrect file name [{file_name}], should be *.*")
params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'file_type': file_type, 'file_name': file_name, 'pipeline_id': pipeline_id, 'pmapname': pmapname, 'build': build}
params.update(extra_kwargs)
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/level1/file", local_file, params)
if isinstance(local_file, str):
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/level1/file", local_file, params)
return request.post_bytesio("/api/level1/file", local_file, params)
def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
level1_id: str,
......@@ -195,7 +204,7 @@ def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
def process_list(level1_id: str) -> Result:
"""
通过 level1 的 ID 查询1级数据处理过程
通过 level1 的 ID 查询1级数据处理记录
Args:
level1_id (str): 1级数据的ID。
......@@ -214,7 +223,7 @@ def add_process(level1_id: str,
prc_module: str = "",
message: str = "") -> Result:
"""
添加1级数据处理过程
添加1级数据处理记录
Args:
level1_id (str): 1级数据的ID。
......@@ -238,4 +247,5 @@ def add_process(level1_id: str,
'prc_module': prc_module,
'message': message,
}
utils.is_valid_datetime_format(prc_time)
return request.post("/api/level1/prc", params)
\ No newline at end of file
import os
import pickle
from typing import Optional, Tuple, Literal
from .common import request, Result
from typing import Optional, Tuple, Literal, Union, IO
from .common import request, Result, utils
DateTimeTuple = Tuple[str, str]
......@@ -21,14 +21,14 @@ def find(
page: int = 1,
limit: int = 0) -> Result:
"""
根据给定的参数搜索2级数据
根据给定的参数搜索2级数据文件记录
Args:
project_id (Optional[str], optional): 项目ID. Defaults to None.
obs_id (Optional[str], optional): 观测ID. Defaults to None.
module_id (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
detector_no (Optional[str], optional): 探测器编号. Defaults to None.
data_type (Optional[str], optional): 文件类型,如'SCI'. Defaults to None.
data_type (Optional[str], optional): 文件类型,如'csst-msc-l2-mbi-cat'. Defaults to None.
filter (Optional[str], optional): 滤光片. Defaults to None.
obs_time (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None.
......@@ -65,20 +65,22 @@ def find(
if obs_time is not None:
params['obs_time_start'], params['obs_time_end'] = obs_time
utils.is_valid_datetime_format(params['obs_time_start']) or not utils.is_valid_datetime_format(params['obs_time_end'])
if create_time is not None:
params['create_time_start'], params['create_time_end'] = create_time
utils.is_valid_datetime_format(params['create_time_start']) or utils.is_valid_datetime_format(params['create_time_end'])
return request.post("/api/level2", params)
def find_by_level2_id(level2_id: str) -> Result:
"""
通过 level2 的 ID 查询2级数据
通过 level2 的 ID 查询2级数据
Args:
level2_id (str): 2级数据的ID
level2_id (str): 2级数据的ID
Returns:
Result: 查询结果
Result: 查询结果
"""
return request.get(f"/api/level2/{level2_id}")
......@@ -89,7 +91,7 @@ def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result
Args:
level2_id (str): 2级数据文件的ID
data_type (str): 文件类型
data_type (str): 数据类型,如'csst-msc-l2-mbi-cat'
qc2_status (int): QC0状态
Returns:
......@@ -99,15 +101,15 @@ def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result
def update_prc_status(level2_id: str, data_type: str, prc_status: int) -> Result:
"""
更新2级数据的处理状态
更新2级数据的处理状态
Args:
level2_id (str): 2级数据文件的ID
data_type (str): 文件类型。
prc_status (int): 处理状态
level2_id (str): 2级数据文件的ID
data_type (str): 数据类型,如'csst-msc-l2-mbi-cat'
prc_status (int): 处理状态
Returns:
Result: 操作结果
Result: 操作结果
"""
return request.put(f"/api/level2/prc_status/{level2_id}", {'data_type': data_type, 'prc_status': prc_status})
......@@ -126,55 +128,73 @@ def update_qc2_status_by_file_name(file_name: str, qc2_status: int) -> Result:
def update_prc_status_by_file_name(file_name: str, prc_status: int) -> Result:
"""
更新2级数据的处理状态
更新2级数据的处理状态
Args:
file_name (str): 2级数据文件名
prc_status (int): 处理状态
file_name (str): 2级数据文件名
prc_status (int): 处理状态
Returns:
Result: 操作结果
Result: 操作结果
"""
return request.put(f"/api/level2/prc_status/file/{file_name}", {'prc_status': prc_status})
def write(local_file: str,
def write(local_file: Union[IO | str],
module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
level0_id: Optional[str | None],
level1_id: Optional[str | None],
level2_id: Optional[str | None],
brick_id: Optional[int | None],
data_type: str,
file_name: str,
pipeline_id: str,
build: int,
version: str,
**extra_kwargs) -> Result:
"""
将本地的2级数据文件写入到DFS中
将本地的2级数据文件写入到DFS中
Args:
local_file (str): 文件路径。
module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID。
level0_id (Optional[str | None]): 0级数据的ID。默认为 None。
level1_id (Optional[str | None]): 1级数据的ID。默认为 None。
level2_id (Optional[str | None]): 2级数据的ID。默认为 None。
brick_id (Optional[int | None]): 天区的ID。默认为 None。
data_type (str): 数据文件类型。
pipeline_id (str): 管线ID。
pmapname (str): CCDS pmap名称。
build (int): 构建号。
**kwargs: 额外的关键字参数,这些参数将传递给DFS。
local_file (Union[IO | str]): 文件路径 或 文件对象
module_id ['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC']其中一个,代表: 模块ID
level0_id (Optional[str | None]): 0级数据的ID默认为 None
level1_id (Optional[str | None]): 1级数据的ID默认为 None
level2_id (Optional[str | None]): 2级数据的ID默认为 None
brick_id (Optional[int | None]): 天区的ID默认为 None
data_type (str): 数据类型,如'csst-msc-l2-mbi-cat'
file_name (str): 2级数据文件名
pipeline_id (str): 管线ID
pmapname (str): CCDS pmap名称
build (int): 构建号
**kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回data为2级数据对象
"""
params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'level2_id': level2_id, 'brick_id': brick_id, 'data_type': data_type, 'pipeline_id': pipeline_id, 'build': build, 'version': version}
if utils.is_valid_filename(file_name):
raise ValueError(f"Incorrect file name [{file_name}], should be *.*")
params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'level2_id': level2_id, 'brick_id': brick_id, 'file_name': file_name, 'data_type': data_type, 'pipeline_id': pipeline_id, 'build': build, 'version': version}
params.update(extra_kwargs)
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/level2/file", local_file, params)
if isinstance(local_file, str):
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/level2/file", local_file, params)
return request.post_bytesio("/api/level2/file", local_file, params)
def catalog_query(sql: str, limit: int = 0) -> Result:
"""
根据给定的SQL查询语句和限制条件,从数据库中查询2级科学数据并返回查询结果。
Args:
sql (str): 要执行的SQL查询语句。
limit (int, optional): 查询结果的最大数量。默认为0,表示不限制数量。
Returns:
Result: 包含查询结果的Result对象,data为pd.DataFrame对象。
"""
datas = request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit})
if datas and isinstance(datas, Result):
return datas
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment