Commit b21bc8d9 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

feat(client): add other data file operations support

parent d5a88729
Pipeline #11192 canceled with stages
in 0 seconds
import os
from typing import Literal, Optional, Tuple
from .common import Result, request, utils
DateTimeTuple = Tuple[str, str]
def find(
instrument: Literal["MSC", "IFS", "MCI", "HSTDM", "CPIC"],
dag_run: Optional[str] = None,
create_time: Optional[DateTimeTuple] = None,
page: int = 1,
limit: int = 0,
**extra_kwargs,
) -> Result:
"""
根据给定的参数搜索其他数据文件记录
Args:
instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一.
dag_run (Optional[str], optional): DAG运行标识. Defaults to None.
create_time (Optional[DateTimeTuple], optional): 创建时间范围. Defaults to None.
page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0,不限制.
Returns:
Result: 搜索结果对象.
"""
params = {
"instrument": instrument,
"dag_run": dag_run,
"page": page,
"limit": limit,
}
params.update(extra_kwargs)
if create_time is not None:
params["create_time_start"], params["create_time_end"] = create_time
utils.is_valid_datetime_format(
params["create_time_start"]
) or utils.is_valid_datetime_format(params["create_time_end"])
return request.post("/api/other", params)
def get_by_id(_id: str) -> Result:
"""
根据内部ID获取其他数据
Args:
_id (str):其他数据的内部ID
Returns:
Result: 查询结果
"""
return request.get(f"/api/other/_id/{_id}")
def write(
local_file: str, instrument: Literal["MSC", "IFS", "MCI", "HSTDM", "CPIC"], **kwargs
) -> Result:
"""
将本地文件写入DFS中
Args:
local_file (str]): 文件路径
**kwargs: 额外的关键字参数,这些参数将传递给DFS
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象
"""
params = {"instrument": instrument}
params.update(kwargs)
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/other/file", local_file, params)
def delete(instrument: str, dag_run: Optional[str] = None) -> Result:
"""
删除其他数据,用于数据测试
Args:
instrument (str): 设备,必需为'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'之一.
dag_run (Optional[str], optional): DAG运行标识. Defaults to None.
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象
"""
return request.delete("/api/other/delete", {"instrument": instrument, "dag_run": dag_run})
import unittest
from csst_dfs_client import other
class OtherTestCase(unittest.TestCase):
def setUp(self):
pass
def test_find(self):
try:
result = other.find(
instrument='MSC',
dag_run="test_dag_run",
create_time=("2024-01-01 00:00:00", "2024-12-31 23:59:59"),
page=1,
limit=10
)
print(f"find result: {result}")
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
if result.success:
self.assertIsNotNone(result.data, "data is None")
except Exception as e:
print(f"find skip, reason: {e}")
def test_get_by_id(self):
try:
test_id = "1212"
result = other.get_by_id(test_id)
print(f"get_by_id result: {result}")
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
if result.success:
self.assertIsNotNone(result.data, "data is None")
except Exception as e:
print(f"get_by_id skip, reason: {e}")
def test_write(self):
try:
test_file_path = "/path/to/test/file.txt"
result = other.write(
local_file=test_file_path,
test_param="test_value"
)
print(f"write result: {result}")
except FileNotFoundError:
print("write skip, reason: file not found")
except Exception as e:
print(f"write skip, reason: {e}")
def test_delete(self):
try:
result = other.delete(
instrument='MSC',
dag_run="test_dag_run_for_deletion"
)
print(f"delete result: {result}")
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
except Exception as e:
print(f"delete skip, reason: {e}")
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment