Commit 1212f49a authored by Wei Shoulin's avatar Wei Shoulin
Browse files

plan

parent 46384fc9
......@@ -59,7 +59,7 @@ class Result(dict):
self["code"] = resp.status_code
if self.success:
content_dict = json.loads(resp.content.decode('utf-8'))
if 'total_count' in content_dict and 'records' in content_dict:
if isinstance(content_dict, dict) and 'total_count' in content_dict and 'records' in content_dict:
self["data"] = content_dict['records']
self["total_count"] = content_dict['total_count']
else:
......
from typing import Optional, IO, Tuple, Literal
from typing import Optional, Tuple, Literal
from .common import request, Result
import os
......@@ -99,6 +99,16 @@ def find_by_level1_id(level1_id: str) -> Result:
return request.get(f"/api/level1/{level1_id}")
def find_by_brick_id(brick_id: int) -> Result:
"""
通过 brick 的 ID 查询1级数据。
Args:
brick_id (int): 天区ID。
Returns:
Result: 查询结果。
"""
return request.get(f"/api/level1/brick/{brick_id}")
def sls_find_by_qc1_status(qc1_status: int, limit: int = 1) -> Result:
......
import os
from typing import Optional, IO, Tuple, Literal
from typing import Optional, Tuple, Literal
from .common import request, Result
DateTimeTuple = Tuple[str, str]
......
import os
from typing import Optional, IO, Tuple, Literal
from .common import request, Result
DateTimeTuple = Tuple[str, str]
def find(mode: Optional[str] = None,
obsid: Optional[str] = None,
backend: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
obstype: Optional[str] = None,
obstime: Optional[DateTimeTuple] = None,
page: int = 1,
limit: int = 0) -> Result:
"""
根据给定的参数在DFS中搜索编排数据。
Args:
mode (Optional[str], optional): 观测模式. Defaults to None.
obsid (Optional[str], optional): 观测ID,支持模糊搜索. Defaults to None.
backend (Optional[str], optional): 模块ID,如'MSC', 'IFS'. Defaults to None.
obstype (Optional[str], optional): 观测类型,如'SCIE'. Defaults to None.
obstime (Optional[DateTimeTuple], optional): 观测时间范围. Defaults to None.
page (int, optional): 页码. Defaults to 1.
limit (int, optional): 每页数量. Defaults to 0,不限制.
Returns:
Result: 搜索结果对象.
"""
params = {
'mode': mode,
'obsid': obsid,
'backend': backend,
'obstype': obstype,
'obs_time_start': None,
'obs_time_end': None,
'create_time_start': None,
'create_time_end': None,
'page': page,
'limit': limit,
}
if obstime is not None:
params['obs_time_start'], params['obs_time_end'] = obstime
return request.post("/api/plan", params)
def get_by_id(_id: str) -> Result:
return request.get(f"/api/plan/_id/{_id}")
def find_by_opid(opid: str) -> Result:
"""
通过 plan 的 opid 查询编排数据。
Args:
opid (str): opid。
Returns:
Result: 查询结果。
"""
return request.get(f"/api/plan/{opid}")
def write_file(local_file: Optional[IO | str], **kwargs) -> Result:
"""
将本地json文件或json数据流写入DFS中。
Args:
local_file (str]): 文件路径。
**kwargs: 额外的关键字参数,这些参数将传递给DFS。
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。
"""
if isinstance(local_file, str):
if not os.path.exists(local_file):
raise FileNotFoundError(local_file)
return request.post_file("/api/plan/file", local_file, kwargs)
return request.post_bytesio("/api/plan/file", local_file, kwargs)
def new(data: dict) -> Result:
"""
新建编排数据
Args:
data (dict): 编排数据的字典表示,如:{'opid': 'xxx', 'backend': 'MSC', ...}
Returns:
Result: 成功后,Result.data为写入记录,失败message为失败原因。
"""
return request.post("/api/plan/new", data)
\ No newline at end of file
requests==2.31.0
bumpver==2023.1129
boto3==1.35.24
# for testing
pytest==7.4.3
coverage==7.6.1
......
......@@ -6,19 +6,19 @@ class CommonTestCase(unittest.TestCase):
def setUp(self):
pass
# def test_download_level0_file(self):
# bytes_data = download_file(file_path= "L0/MSC/SCI/59900/10609400034529/MS/CSST_MSC_MS_SCI_20221117100218_20221117100448_10609400034529_25_L0_V01.fits")
# # save the downloaded file to a local directory
# tmp_file_dir = os.environ.get("UNIT_TEST_DATA_ROOT", "/tmp")
# tmp_file_path = os.path.join(tmp_file_dir, "test_downloaded_file.fits")
# if os.path.exists(tmp_file_path):
# os.remove(tmp_file_path)
def test_download_level0_file(self):
bytes_data = download_file(file_path= "L0/MSC/SCI/60470/10109100157991/MS/CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L0_V01.fits")
# save the downloaded file to a local directory
tmp_file_dir = os.environ.get("UNIT_TEST_DATA_ROOT", "/tmp")
tmp_file_path = os.path.join(tmp_file_dir, "test_downloaded_file.fits")
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
# with open(tmp_file_path, "wb") as f:
# f.write(bytes_data)
with open(tmp_file_path, "wb") as f:
f.write(bytes_data)
# assert os.path.exists(tmp_file_path)
# os.remove(tmp_file_path)
assert os.path.exists(tmp_file_path)
os.remove(tmp_file_path)
def test_read_file(self):
bytes_io = read_file(file_path= "L0/MSC/SCI/60470/10109100157991/MS/CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L0_V01.fits")
......
......@@ -37,11 +37,11 @@ class Level0TestCase(unittest.TestCase):
# print(result)
# self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_write(self):
file_path = "/Users/wsl/temp/csst/import/CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L0_V01.fits"
result = level0.write(local_file = file_path)
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
# def test_write(self):
# file_path = "/Users/wsl/temp/csst/import/CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L0_V01.fits"
# result = level0.write(local_file = file_path)
# print(result)
# self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
# def test_generate_prc_message(self):
# result = level0.generate_prc_msg(
......
import unittest
import time
from csst_dfs_client import plan
class Level0TestCase(unittest.TestCase):
def setUp(self):
pass
def test_find(self):
start_time = time.time()
result = plan.find(obsid = "101000003",
obstime = ("2021-08-30 00:00:00", "2024-12-30 23:59:59"))
if result.success:
print(f"time used: {time.time() - start_time} 's, count: {result['total_count']}")
else:
print(f"time used: {time.time() - start_time} 's, error message: " + result.message)
self.assertEqual(result.code, 200, "error code: " + str(result.code))
self.assertIsNotNone(result.data, "error message: " + result.message)
def test_find_by_opid(self):
result = plan.find_by_opid(opid = "101000000373")
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
self.assertIsNotNone(result.data, "error message: " + result.message)
def test_write(self):
file_path = "/Users/wsl/temp/csst/import/plan_test.json"
result = plan.write_file(local_file = file_path)
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
def test_new(self):
data = {'opid':'1', 'backend':'IFS', 'obstype': 'SCIE', 'obsid': '1'}
result = plan.new(data = data)
print(result)
self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment