Commit 46384fc9 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

Initial commit

parents
[run]
omit =
tests/*
setup.py
test_codestyle.py
csst_dfs_client/version.py
.vscode
.idea
*.DS_Store*
*.pyc
*~
**/dist
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit examples / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# Pyre type checker
.pyre/
# joblib temp
joblib*
\ No newline at end of file
stages:
- build
- code_check
- test
- deploy
build-job:
stage: build
image: python:3.11
variables:
GIT_CLONE_PATH: $CI_BUILDS_DIR/work
before_script:
- ls
after_script:
- ls
script:
- whoami
- echo $SHELL
- pwd
- make wheel
artifacts:
paths:
- dist/*.whl
only:
- main
tags:
- asos2-unit-test
test-job:
stage: test
image: python:3.11
variables:
GIT_CLONE_PATH: $CI_BUILDS_DIR/work
script:
- pip install -r requirements-test.txt -i https://mirrors.aliyun.com/pypi/simple
- pip install .
- make test
coverage: '/TOTAL.*\s+(\d+\%)/'
only:
- main
tags:
- asos2-unit-test
deploy-job:
stage: deploy
image: python:3.11
variables:
GIT_CLONE_PATH: $CI_BUILDS_DIR/work
script:
- pip install -r requirements-deploy.txt --trusted-host mirror.int.cnlab.net -i http://mirror.int.cnlab.net/repository/py/simple
- make wheel
- make publish
artifacts:
paths:
- dist/*.whl
only:
- main
tags:
- asos2-unit-test
\ No newline at end of file
This diff is collapsed.
all: clean install clean
clean:
rm -rf build dist csst_dfs_client.egg-info
rm -rf .coverage .pytest_cache
install: clean
pip install .
rm -rf build dist csst_dfs_client.egg-info
uninstall:
pip uninstall csst_dfs_client -y
test:
UNIT_TEST_DATA_ROOT=/opt/temp/csst CSST_DFS_GATEWAY=http://172.31.249.145:8000 CSST_DFS_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0 coverage run -m pytest -vs --import-mode=importlib --cov-report=html --cov-report=term-missing --cov-config=.coveragerc
coverage report -m
rm -rf .coverage .pytest_cache
build:
rm -rf dist/*.whl
python setup.py bdist_wheel --plat-name=manylinux1_x86_64
# SECURITY(review): registry credentials are hard-coded below; move them to
# CI/CD secret variables (e.g. $TWINE_USERNAME / $TWINE_PASSWORD) and rotate.
publish:
	twine upload --repository-url http://mirror.int.cnlab.net/repository/pypi/ -u admin -p gsdjsj.2024 dist/csst_dfs_client-*.whl
version:
bumpver update --patch
# ASOS2 数据服务
[![pipeline status](https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client/badges/main/pipeline.svg)](https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client/commits/main)
[![coverage](https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client/badges/main/coverage.svg?job=test-job)](https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client/-/jobs)
[![Static Badge](https://img.shields.io/badge/Python-3.11-green)](https://www.python.org/)
[![License](https://img.shields.io/badge/license-MIT-blue)](LICENSE)
[![Latest Release](https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client/-/badges/release.svg)](https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client/-/releases)
## 依赖
- bumpver==2023.1129
## 安装
`csst_dfs_client` 可用如下方式安装
```shell
git clone https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client.git
cd csst-dfs-client
pip install .
```
或者单行命令:
```shell
pip install --force-reinstall git+https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-client.git
```
## 配置环境变量
```shell
export CSST_DFS_GATEWAY=http://GATEWAY_IP:PORT/api
export CSST_DFS_TOKEN=XXXX
```
from .version import __version__
from .common import request, fs
from io import BytesIO
from .exceptions.exception import AppError
__all__ = [
"__version__",
"download_file",
"read_file",
]
def download_file(file_path: str) -> bytes:
    """Download a file from DFS and return its raw content.

    Args:
        file_path (str): relative file path inside DFS.

    Returns:
        bytes: the file content.

    Raises:
        AppError: if the download fails (non-OK HTTP response).
    """
    from urllib.parse import quote
    # Fixed: URL-encode file_path ('/' kept); characters such as '&', '#'
    # or spaces would previously corrupt the query string.
    response = request.download_file(f"/common/download/file?file_path={quote(file_path)}")
    if not response.ok:
        raise AppError("Failed to download the file. Reason: " + response.reason)
    return response.content
def read_file(file_path: str) -> BytesIO:
    """Read a DFS file fully into memory.

    Args:
        file_path (str): relative file path inside DFS.

    Returns:
        BytesIO: the file content, rewound to offset 0.

    Raises:
        AppError: if the file cannot be read.
    """
    with fs.get_file_storage().read_file(file_path) as iobytes_io:
        try:
            iobytes_io.seek(0)
        except OSError as e:
            # Chain the original error so the root cause stays visible.
            raise AppError("Failed to read file.") from e
        return iobytes_io
\ No newline at end of file
from .common import request, Result
import pickle
def search(ra: float,
           dec: float,
           radius: float,
           catalog_name: str,
           columns: tuple = ('*',),
           min_mag: float = -1,
           max_mag: float = -1,
           obstime: int = 0,
           limit: int = 0) -> Result:
    """Cone-search a catalog around (ra, dec) within the given radius.

    Args:
        ra (float): right ascension in degrees.
        dec (float): declination in degrees.
        radius (float): search radius in degrees.
        catalog_name (str): catalog table name.
        columns (tuple, optional): columns to return; ('*',) means all
            columns. (Fixed: the original default ('*') was a plain
            string, not a one-element tuple.)
        min_mag (float, optional): minimum magnitude; -1 means no limit.
        max_mag (float, optional): maximum magnitude; -1 means no limit.
        obstime (int, optional): observation time; currently unused.
        limit (int, optional): maximum row count; 0 means no limit.

    Returns:
        Result: on success, ``data`` holds the records and the extra key
        'totalCount' the total row count; on failure an error Result.
    """
    if not columns:
        columns = ('*',)
    try:
        datas = request.post('/api/catalog', {
            'ra': ra,
            'dec': dec,
            'radius': radius,
            'catalog_name': catalog_name,
            'columns': columns,
            'min_mag': min_mag,
            'max_mag': max_mag,
            'obstime': obstime,
            'limit': limit
        })
        # SECURITY: pickle.loads executes arbitrary code if the gateway is
        # compromised -- only use against a trusted DFS deployment.
        records = pickle.loads(datas._content)
        df, total_count = records['records'], records['totalCount']
        return Result.ok_data(data = df).append("totalCount", total_count)
    except Exception as e:
        # Fixed: the original called Result.error(str(e)), which passed the
        # message into the `code` parameter and left `message` empty.
        return Result.error(message=str(e))
import requests
import json
from enum import Enum
class Result(dict):
    """
    Dict-backed result wrapper returned by all client API calls.

    Keys:
        code (int): HTTP-like status code; 200 means success.
        message (str): human-readable message (error detail or request URL).
        data: payload on success, None otherwise.

    Keys are also readable/writable as attributes (aliases of
    dict.__getitem__ / dict.__setitem__). NOTE: attribute access on a
    missing key raises KeyError, not AttributeError.
    """
    __setattr__ = dict.__setitem__
    __getattr__ = dict.__getitem__

    def __init__(self):
        super(Result, self).__init__()
        self["code"] = 200
        self["message"] = ""
        self["data"] = None

    @property
    def success(self) -> bool:
        """True when the stored code is exactly 200."""
        return self["code"] == 200

    @property
    def data(self):
        # Fixed: the original annotation `-> any` referenced the builtin
        # function `any`, which is not a type; annotation removed.
        return self["data"]

    @property
    def message(self) -> str:
        return str(self["message"])

    @staticmethod
    def error(code: int = 500, message: str = "") -> "Result":
        """Build a failed Result with the given code and message."""
        r = Result()
        r["code"] = code
        r["message"] = message
        return r

    @staticmethod
    def ok_data(data = None) -> "Result":
        """Build a successful (code 200) Result carrying *data*."""
        r = Result()
        r["code"] = 200
        r["data"] = data
        return r

    @staticmethod
    def ok_msg(message: str) -> "Result":
        """Build a successful Result carrying only a message."""
        r = Result()
        r["message"] = message
        return r

    def append(self, k: str, v) -> "Result":
        """Attach an extra key/value pair and return self (fluent style)."""
        self[k] = v
        return self

    @classmethod
    def from_response(cls, resp: "requests.Response") -> "Result":
        """Convert an HTTP response into a Result.

        On success the body is JSON-decoded; paginated payloads (dicts with
        'records' and 'total_count') are unpacked, and data is wrapped in
        DictObject for attribute access; message is set to the request URL.
        On failure the 'detail' field of the JSON body (if decodable)
        becomes the message. The annotation is a string literal so the
        class loads even before `requests` is imported.
        """
        self = cls()
        self["code"] = resp.status_code
        if self.success:
            content_dict = json.loads(resp.content.decode('utf-8'))
            if 'total_count' in content_dict and 'records' in content_dict:
                self["data"] = content_dict['records']
                self["total_count"] = content_dict['total_count']
            else:
                self["data"] = content_dict
            self['data'] = DictObject.from_json(self['data'])
            self['message'] = resp.url
        else:
            try:
                content_dict = json.loads(resp.content.decode('utf-8'))
                self["message"] = content_dict.get("detail", "")
            except Exception:
                # Best effort: non-JSON error bodies keep the default message.
                pass
        return self
class DictObject(dict):
    """Dict whose keys are also readable as attributes; missing keys yield None."""

    def __init__(self, **kwargs):
        super(DictObject, self).__init__(**kwargs)

    def __getattr__(self, name):
        if name in self.keys():
            return self[name]
        return None

    @staticmethod
    def from_json(data):
        """Recursively wrap dicts (including dicts inside lists) as DictObject.

        Falsy values (None, {}, [], '') and scalars are returned unchanged.
        Fixed: nested dict *values* are now converted too, so chained
        attribute access (obj.a.b) works at any depth; the original only
        wrapped the top level.
        """
        if not data:
            return data
        if isinstance(data, list):
            return [DictObject.from_json(i) for i in data]
        elif isinstance(data, dict):
            return DictObject(**{k: DictObject.from_json(v) for k, v in data.items()})
        else:
            return data
class ModuleID(Enum):
    """Valid CSST module identifiers.

    Mirrors the Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] values
    accepted by the level-0/1/2 API functions.
    """
    MSC = "MSC"
    IFS = "IFS"
    MCI = "MCI"
    HSTDM = "HSTDM"
    CPIC = "CPIC"
\ No newline at end of file
from . import request, Result
def get_storage_config() -> Result:
    """Fetch the DFS storage-backend configuration from the gateway.

    Returns:
        Result: on success, ``data`` holds the storage settings consumed by
        ``fs.get_file_storage`` (FILE_STORAGE_TYPE, S3_* keys, CSST_DFS_ROOT).
    """
    return request.get("/api/common/config/storage")
\ No newline at end of file
from contextlib import contextmanager
from abc import ABCMeta, abstractmethod
from io import BytesIO
import os
import logging
import threading
import boto3
from typing import Generator, Any
from urllib.parse import urlparse
from .config import get_storage_config
class FileStorage(metaclass=ABCMeta):
    """Abstract interface for file-storage backends (POSIX directory, S3)."""

    @abstractmethod
    def get_full_file_path(self, file_path: str) -> str:
        """Return the storage-qualified path for *file_path*."""
        # Fixed: the original `return NotImplemented` returns the binary-op
        # sentinel; abstract stubs should raise NotImplementedError.
        raise NotImplementedError

    @abstractmethod
    def read_file(self, file_path: str) -> Generator[Any, Any, Any]:
        """Context manager yielding the file content (BytesIO in subclasses)."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Release any resources held by the backend."""
        raise NotImplementedError
class PosixFileStorage(FileStorage):
    """File storage backed by a local POSIX directory tree."""

    def __init__(self, root: str):
        self.root = root
        if not os.path.isdir(root):
            raise ValueError("Root is not a directory")

    def get_full_file_path(self, file_path: str) -> str:
        """Absolute paths pass through; relative ones are joined to the root."""
        if file_path.startswith('/'):
            return file_path
        return os.path.join(self.root, file_path)

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, Any, Any]:
        """Yield the whole file content as an in-memory BytesIO."""
        with open(self.get_full_file_path(file_path), "rb") as handle:
            payload = handle.read()
        yield BytesIO(payload)

    def close(self) -> None:
        """Local files need no teardown."""
        pass
class S3FileStorage(FileStorage):
    """S3-backed file storage.

    Thread-safe, lazily-initialized singleton: the first construction wins;
    later constructions with *different* credentials silently reuse the
    first instance. NOTE(review): confirm that is intended.
    """
    _instance = None         # singleton instance
    _is_initialized = False  # guards one-time __init__
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking: only one instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, s3_endpoint: str, s3_access_key: str, s3_secret_key: str, s3_storage: str):
        # s3_storage is a URL such as "s3://bucket/prefix": netloc is the
        # bucket, the path (without its leading '/') is the key prefix.
        if not self._is_initialized:
            with self._lock:
                if not self._is_initialized:
                    self.s3_storage = s3_storage
                    parsed_url = urlparse(s3_storage)
                    bucket_name = parsed_url.netloc
                    self.storage_prefix = parsed_url.path.lstrip('/')
                    self.bucket_name = bucket_name
                    self.s3_endpoint = s3_endpoint
                    self.s3 = boto3.client('s3', endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key)
                    self._is_initialized = True

    def get_full_file_path(self, file_path: str) -> str:
        """Return the full S3 URL of the object (not a local path)."""
        return f"{self.s3_storage}/{file_path}"

    def add_prefix(self, file_path: str) -> str:
        """Build the object key: storage prefix + relative path."""
        if self.storage_prefix == '':
            return file_path
        return f"{self.storage_prefix}/{file_path}"

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, None, None]:
        """Yield file content as BytesIO; absolute paths fall back to POSIX."""
        if file_path.startswith('/'):
            with PosixFileStorage("/").read_file(file_path) as content:
                yield content
        else:
            file_obj = self.s3.get_object(Bucket=self.bucket_name, Key=self.add_prefix(file_path))
            try:
                file_content = file_obj['Body'].read()
                yield BytesIO(file_content)
            finally:
                pass

    def close(self) -> None:
        """Close the underlying boto3 client."""
        self.s3.close()
def get_file_storage() -> FileStorage:
    """Instantiate the storage backend described by the DFS gateway config.

    Returns:
        FileStorage: an S3FileStorage or PosixFileStorage.

    Raises:
        ValueError: on an unknown FILE_STORAGE_TYPE.
    """
    storage_settings = get_storage_config().data
    if storage_settings.FILE_STORAGE_TYPE == "s3":
        return S3FileStorage(storage_settings.S3_ENDPOINT, storage_settings.S3_ACCESS_KEY, storage_settings.S3_SECRET_KEY, storage_settings.S3_STORAGE)
    elif storage_settings.FILE_STORAGE_TYPE == "posix":
        # Fixed: read the env var once instead of twice; the gateway-provided
        # root is only a fallback. Lazy %-style logging args avoid building
        # the message when the warning is filtered out.
        root = os.environ.get("CSST_DFS_ROOT")
        if not root:
            logging.warning(
                "CSST_DFS_ROOT environment variable is not set, using default value from DFS config [%s].",
                storage_settings.CSST_DFS_ROOT)
            root = storage_settings.CSST_DFS_ROOT
        return PosixFileStorage(root)
    else:
        raise ValueError(f"Invalid storage type: {storage_settings.FILE_STORAGE_TYPE}")
import requests
import os
from typing import IO, Optional
from . import Result
def auth_header() -> dict:
    """Build the Bearer-token Authorization header for gateway requests.

    Raises:
        ValueError: when the CSST_DFS_TOKEN environment variable is unset.
    """
    token = os.environ.get("CSST_DFS_TOKEN")
    if token:
        return {"Authorization": f"Bearer {token}"}
    raise ValueError("env CSST_DFS_TOKEN is not set.")
def get_request_url(endpoint: str) -> str:
    """Join the gateway base URL (CSST_DFS_GATEWAY) with *endpoint*.

    Raises:
        ValueError: when the CSST_DFS_GATEWAY environment variable is unset.
    """
    base_url = os.environ.get("CSST_DFS_GATEWAY")
    if base_url:
        return f"{base_url}{endpoint}"
    raise ValueError("env CSST_DFS_GATEWAY is not set.")
def request_error_handler_decorator(func):
    """Wrap a requests call: convert its Response into a Result and map errors.

    Octet-stream responses are returned untouched (file downloads); all
    other responses go through Result.from_response. Connection failures
    become code-502 Results; any other exception becomes a generic error
    Result.
    """
    from functools import wraps

    @wraps(func)  # Fixed: preserve the wrapped function's name/docstring.
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            # Fixed: .get avoids a KeyError when the server omits Content-Type.
            if result.headers.get('content-type') == "application/octet-stream":
                return result
            return Result.from_response(result)
        except requests.exceptions.ConnectionError as e:
            return Result.error(code=502, message= f"Bad Gateway: {e}")
        except Exception as e:
            return Result.error(message = str(e))
    return wrapper
@request_error_handler_decorator
def get(endpoint: str, timeout = None) -> requests.Response:
    """Send an authenticated GET to the gateway.

    Args:
        endpoint (str): path appended to CSST_DFS_GATEWAY.
        timeout: request timeout in seconds; defaults to the
            CSST_DFS_REQUEST_TIMEOUT env var (300 when unset).
    """
    if timeout is None:
        # Fixed: the original read the env var once at import time and,
        # when the var was set, passed a *string* timeout to requests.
        timeout = float(os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300))
    return requests.get(
        url = get_request_url(endpoint),
        headers = auth_header(),
        timeout = timeout
    )
@request_error_handler_decorator
def post(endpoint: str, data: dict, timeout = None) -> requests.Response:
    """Send an authenticated JSON POST to the gateway.

    Args:
        endpoint (str): path appended to CSST_DFS_GATEWAY.
        data (dict): JSON request body.
        timeout: request timeout in seconds; defaults to the
            CSST_DFS_REQUEST_TIMEOUT env var (300 when unset).
    """
    if timeout is None:
        # Fixed: resolve at call time and convert the env string to float.
        timeout = float(os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300))
    return requests.post(
        url = get_request_url(endpoint),
        headers = auth_header(),
        json = data,
        timeout = timeout
    )
@request_error_handler_decorator
def put(endpoint: str, data: Optional[dict] = None, timeout = None) -> requests.Response:
    """Send an authenticated JSON PUT to the gateway.

    Args:
        endpoint (str): path appended to CSST_DFS_GATEWAY.
        data (Optional[dict]): JSON request body, or None.
        timeout: request timeout in seconds; defaults to the
            CSST_DFS_REQUEST_TIMEOUT env var (300 when unset).
    """
    if timeout is None:
        # Fixed: resolve at call time and convert the env string to float.
        timeout = float(os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300))
    return requests.put(
        url = get_request_url(endpoint),
        headers = auth_header(),
        json = data,
        timeout = timeout
    )
@request_error_handler_decorator
def delete(endpoint: str, timeout = None) -> requests.Response:
    """Send an authenticated DELETE to the gateway.

    Args:
        endpoint (str): path appended to CSST_DFS_GATEWAY.
        timeout: request timeout in seconds; defaults to the
            CSST_DFS_REQUEST_TIMEOUT env var (300 when unset).
    """
    if timeout is None:
        # Fixed: resolve at call time and convert the env string to float.
        timeout = float(os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300))
    return requests.delete(
        url = get_request_url(endpoint),
        headers = auth_header(),
        timeout = timeout
    )
def post_file(endpoint: str, file_path: str, data: dict) -> requests.Response:
    """Upload a file from disk via multipart POST (delegates to post_bytesio).

    NOTE(review): the decorated post_bytesio actually returns a Result (or a
    raw Response for octet-stream bodies) -- confirm the annotation.
    """
    with open(file_path, "rb") as bytesio:
        return post_bytesio(endpoint, bytesio, data)
@request_error_handler_decorator
def post_bytesio(endpoint: str, file_bytes: IO, data: dict, timeout = None) -> requests.Response:
    """Upload an open binary stream via multipart POST.

    Args:
        endpoint (str): path appended to CSST_DFS_GATEWAY.
        file_bytes (IO): open binary stream sent as the 'file' part.
        data (dict): extra form fields.
        timeout: request timeout in seconds; defaults to the
            CSST_DFS_REQUEST_TIMEOUT env var (300 when unset).
    """
    if timeout is None:
        # Fixed: resolve at call time and convert the env string to float.
        timeout = float(os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300))
    return requests.post(
        url = get_request_url(endpoint),
        headers = auth_header(),
        files = {
            "file": file_bytes
        },
        data = data, timeout = timeout
    )
def download_file(endpoint: str, timeout = None) -> requests.Response:
    """Download the file behind *endpoint* (raw GET, no Result wrapping).

    Args:
        endpoint (str): path appended to CSST_DFS_GATEWAY.
        timeout: request timeout in seconds; defaults to the
            CSST_DFS_REQUEST_TIMEOUT env var (300 when unset).

    Returns:
        requests.Response: the HTTP response object.
    """
    if timeout is None:
        # Fixed: resolve at call time and convert the env string to float.
        timeout = float(os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300))
    return requests.get(
        url = get_request_url(endpoint),
        headers = auth_header(), timeout = timeout
    )
\ No newline at end of file
"""
框架异常类
"""
class AuthenticationError(Exception):
    """Raised when a request is unauthenticated.

    Args:
        detail (str): human-readable reason; defaults to "Unauthorized".
    """
    def __init__(self, detail: str = "Unauthorized"):
        # Fixed: forward detail to Exception so str(exc) and logging show it
        # (the original left Exception.args empty, making str(exc) == "").
        super().__init__(detail)
        self.detail = detail
class AuthorizationError(Exception):
    """Raised when a request is authenticated but not permitted.

    Args:
        detail (str): human-readable reason; defaults to "Forbidden".
    """
    def __init__(self, detail: str = "Forbidden"):
        # Fixed: forward detail to Exception so str(exc) carries the reason.
        super().__init__(detail)
        self.detail = detail
class AppError(Exception):
    """Generic application-level error raised by the client.

    Args:
        detail (str): human-readable reason; defaults to "error".
    """
    def __init__(self, detail: str = "error"):
        # Fixed: forward detail to Exception so str(exc) carries the reason.
        super().__init__(detail)
        self.detail = detail
\ No newline at end of file
import os
import json
from typing import Optional, IO, Tuple, Literal
from .common import request, Result
DateTimeTuple = Tuple[str, str]
def find(project_id: Optional[str] = None,
         obs_id: Optional[str] = None,
         module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
         detector_no: Optional[str] = None,
         file_type: Optional[str] = None,
         filter: Optional[str] = None,
         obs_time: Optional[DateTimeTuple] = None,
         create_time: Optional[DateTimeTuple] = None,
         qc0_status: Optional[int] = None,
         prc_status: Optional[int] = None,
         file_name: Optional[str] = None,
         ra_obj: Optional[int] = None,
         dec_obj: Optional[int] = None,
         radius: Optional[float] = None,
         object_name: Optional[str] = None,
         page: int = 1,
         limit: int = 0) -> Result:
    """Search level-0 data with the given filters.

    Args:
        project_id: project ID. Defaults to None (no filter).
        obs_id: observation ID. Defaults to None.
        module_id: module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        detector_no: detector number. Defaults to None.
        file_type: file type, e.g. 'SCI'. Defaults to None.
        filter: filter band. Defaults to None.
        obs_time: (start, end) observation-time range. Defaults to None.
        create_time: (start, end) creation-time range. Defaults to None.
        qc0_status: QC0 status. Defaults to None.
        prc_status: processing status. Defaults to None.
        file_name: file name. Defaults to None.
        ra_obj: target right ascension. Defaults to None.
        dec_obj: target declination. Defaults to None.
        radius: search radius. Defaults to None.
        object_name: target name. Defaults to None.
        page: page number. Defaults to 1.
        limit: rows per page; 0 means unlimited.

    Returns:
        Result: search result.
    """
    params = {
        'project_id': project_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
        'file_type': file_type,
        'filter': filter,
        'qc0_status': qc0_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_obj': ra_obj,
        'dec_obj': dec_obj,
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'page': page,
        'limit': limit,
    }
    # Unpack the optional (start, end) tuples into flat request fields.
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
    return request.post("/api/level0", params)
def get_by_id(_id: str) -> Result:
    """Fetch a single level-0 record by its internal ``_id``.

    Args:
        _id (str): internal record identifier.

    Returns:
        Result: query result.
    """
    return request.get(f"/api/level0/_id/{_id}")
def find_by_level0_id(level0_id: str) -> Result:
    """Query level-0 data by its level0 ID.

    Args:
        level0_id (str): level-0 data ID.

    Returns:
        Result: query result.
    """
    return request.get(f"/api/level0/{level0_id}")
def update_qc0_status(level0_id: str, file_type: str, qc0_status: int) -> Result:
    """Update the QC0 status of a level-0 record.

    Args:
        level0_id (str): level-0 data ID.
        file_type (str): file type.
        qc0_status (int): new QC0 status.

    Returns:
        Result: update result.
    """
    return request.put(f"/api/level0/qc0_status/{level0_id}", {'file_type': file_type, 'qc0_status': qc0_status})
def update_prc_status(level0_id: str, file_type: str, prc_status: int) -> Result:
    """Update the processing status of a level-0 record.

    Args:
        level0_id (str): level-0 data ID.
        file_type (str): file type.
        prc_status (int): new processing status.

    Returns:
        Result: operation result.
    """
    return request.put(f"/api/level0/prc_status/{level0_id}", {'file_type': file_type, 'prc_status': prc_status})
def write(local_file: str, **kwargs) -> Result:
    """Upload a local level-0 file into DFS.

    Args:
        local_file (str): local file path.
        **kwargs: extra fields forwarded to DFS.

    Returns:
        Result: on success, ``data`` holds the created record; on failure,
        ``message`` holds the reason.

    Raises:
        FileNotFoundError: if *local_file* does not exist.
    """
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file("/api/level0/file", local_file, kwargs)
def write_cat(local_file: str, **kwargs) -> Result:
    """Upload a local main-survey simulation catalog file into DFS.

    Args:
        local_file (str): local file path.
        **kwargs: extra fields forwarded to DFS.

    Returns:
        Result: on success, ``data`` holds the created record; on failure,
        ``message`` holds the reason.

    Raises:
        FileNotFoundError: if *local_file* does not exist.
    """
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file("/api/level0/cat/file", local_file, kwargs)
def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
                     obs_id: str,
                     detector_no: str,
                     pipeline_id: Optional[str] = '') -> Result:
    """Generate a pipeline processing message for one observation/detector.

    Args:
        module_id: module ID.
        obs_id (str): observation ID.
        detector_no (str): detector number.
        pipeline_id (str): pipeline ID; defaults to ''.

    Returns:
        Result: whether the message was generated, with error info on failure.
    """
    return request.put(f"/api/level0/prc/{module_id}/{obs_id}/{detector_no}", {'pipeline_id': pipeline_id})
def process_list(level0_id: str) -> Result:
    """List the processing history of a level-0 record.

    Args:
        level0_id (str): level-0 data ID.

    Returns:
        Result: on success, ``data`` is the record list; on failure,
        ``message`` holds the reason.
    """
    return request.get(f"/api/level0/prc/{level0_id}")
def add_process(level0_id: str,
                pipeline_id: str,
                run_id: str,
                prc_time: str,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """Record a processing step for a level-0 record.

    Args:
        level0_id (str): level-0 data ID.
        pipeline_id (str): pipeline ID.
        run_id (str): run ID.
        prc_time (str): processing time, "YYYY-MM-DD HH:MM:SS".
        prc_status (int): processing status; defaults to -1024.
        prc_module (str): processing module.
        message (str): processing message.

    Returns:
        Result: on success, ``data`` holds the written record; on failure,
        ``message`` holds the reason.
    """
    params = {
        'level0_id': level0_id,
        'pipeline_id': pipeline_id,
        'run_id': run_id,
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    return request.post("/api/level0/prc", params)
def new(data: dict) -> Result:
    """Create a level-0 record directly (used for simulation-data testing).

    Args:
        data (dict): dict representation of the level-0 record.

    Returns:
        Result: on success, ``data`` holds the written record; on failure,
        ``message`` holds the reason.
    """
    return request.post("/api/level0/new", data)
\ No newline at end of file
from typing import Optional, IO, Tuple, Literal
from .common import request, Result
import os
DateTimeTuple = Tuple[str, str]
def find(project_id: Optional[str] = None,
         obs_id: Optional[str] = None,
         level0_id: Optional[str] = None,
         module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
         detector_no: Optional[str] = None,
         file_type: Optional[str] = None,
         filter: Optional[str] = None,
         obs_time: Optional[DateTimeTuple] = None,
         create_time: Optional[DateTimeTuple] = None,
         qc1_status: Optional[int] = None,
         prc_status: Optional[int] = None,
         file_name: Optional[str] = None,
         ra_obj: Optional[int] = None,
         dec_obj: Optional[int] = None,
         radius: Optional[float] = None,
         object_name: Optional[str] = None,
         rss_id: Optional[str] = None,
         cube_id: Optional[str] = None,
         page: int = 1,
         limit: int = 0) -> Result:
    """Search level-1 data with the given filters.

    Args:
        project_id: project ID. Defaults to None (no filter).
        obs_id: observation ID. Defaults to None.
        level0_id: parent level-0 data ID. Defaults to None.
        module_id: module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        detector_no: detector number. Defaults to None.
        file_type: file type, e.g. 'SCI'. Defaults to None.
        filter: filter band. Defaults to None.
        obs_time: (start, end) observation-time range. Defaults to None.
        create_time: (start, end) creation-time range. Defaults to None.
        qc1_status: QC1 status. Defaults to None.
        prc_status: processing status. Defaults to None.
        file_name: file name. Defaults to None.
        ra_obj: target right ascension. Defaults to None.
        dec_obj: target declination. Defaults to None.
        radius: search radius. Defaults to None.
        object_name: object name. Defaults to None.
        rss_id: RSS ID (IFS). Defaults to None.
        cube_id: Cube ID (IFS). Defaults to None.
        page: page number. Defaults to 1.
        limit: rows per page; 0 means unlimited.

    Returns:
        Result: search result.
    """
    params = {
        'project_id': project_id,
        'level0_id': level0_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
        'file_type': file_type,
        'filter': filter,
        'qc1_status': qc1_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'ra_obj': ra_obj,
        'dec_obj': dec_obj,
        'radius': radius,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'rss_id': rss_id,
        'cube_id': cube_id,
        'page': page,
        'limit': limit,
    }
    # Unpack the optional (start, end) tuples into flat request fields.
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
    return request.post("/api/level1", params)
def find_by_level1_id(level1_id: str) -> Result:
    """Query level-1 data by its level1 ID.

    Args:
        level1_id (str): level-1 data ID.

    Returns:
        Result: query result.
    """
    return request.get(f"/api/level1/{level1_id}")
def find_by_brick_id(brick_id: int) -> Result:
    """Query level-1 data belonging to a sky brick.

    Args:
        brick_id (int): brick (sky region) ID.

    Returns:
        Result: query result.
    """
    return request.get(f"/api/level1/brick/{brick_id}")
def sls_find_by_qc1_status(qc1_status: int, limit: int = 1) -> Result:
    """Query SLS level-1 records by QC1 status.

    Args:
        qc1_status (int): QC1 status to match.
        limit (int, optional): maximum number of records; defaults to 1.

    Returns:
        Result: query result.
    """
    return request.post(f"/api/level1/sls/qc1_status/{qc1_status}", {'limit': limit})
def update_qc1_status(level1_id: str, file_type: str, qc1_status: int) -> Result:
    """Update the QC1 status of a level-1 record.

    (The original docstring said "QC0"; this endpoint sets qc1_status.)

    Args:
        level1_id (str): level-1 data ID.
        file_type (str): file type.
        qc1_status (int): new QC1 status.

    Returns:
        Result: update result.
    """
    return request.put(f"/api/level1/qc1_status/{level1_id}", {'file_type': file_type, 'qc1_status': qc1_status})
def update_prc_status(level1_id: str, file_type: str, prc_status: int) -> Result:
    """Update the processing status of a level-1 record.

    Args:
        level1_id (str): level-1 data ID.
        file_type (str): file type.
        prc_status (int): new processing status.

    Returns:
        Result: operation result.
    """
    return request.put(f"/api/level1/prc_status/{level1_id}", {'file_type': file_type, 'prc_status': prc_status})
def write(local_file: str,
          module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
          level0_id: Optional[str],
          level1_id: Optional[str],
          file_type: str,
          pipeline_id: str,
          pmapname: str,
          build: int,
          **extra_kwargs) -> Result:
    '''Upload a local level-1 file into DFS.

    Extra fields such as rss_id or cube_id can be passed via *extra_kwargs*.
    (Annotations simplified: ``Optional[str | None]`` is redundant.)

    Args:
        local_file (str): local file path.
        module_id: one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level0_id (Optional[str]): level-0 data ID, or None.
        level1_id (Optional[str]): level-1 data ID, or None.
        file_type (str): file type.
        pipeline_id (str): pipeline ID.
        pmapname (str): CCDS pmap name.
        build (int): build number.
        **extra_kwargs: extra fields forwarded to DFS.

    Returns:
        Result: on success, ``data`` holds the created record; on failure,
        ``message`` holds the reason.

    Raises:
        FileNotFoundError: if *local_file* does not exist.
    '''
    params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'file_type': file_type, 'pipeline_id': pipeline_id, 'pmapname': pmapname, 'build': build}
    params.update(extra_kwargs)
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file("/api/level1/file", local_file, params)
def generate_prc_msg(module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
                     level1_id: str,
                     pipeline_id: Optional[str] = '') -> Result:
    """Generate a pipeline processing message for a level-1 record.

    Args:
        module_id: module ID.
        level1_id (str): level-1 data ID.
        pipeline_id (str): pipeline ID; defaults to ''.

    Returns:
        Result: whether the message was generated, with error info on failure.
    """
    return request.put(f"/api/level1/prc/{module_id}/{level1_id}", {'pipeline_id': pipeline_id})
def process_list(level1_id: str) -> Result:
    """List the processing history of a level-1 record.

    Args:
        level1_id (str): level-1 data ID.

    Returns:
        Result: query result.
    """
    return request.get(f"/api/level1/prc/{level1_id}")
def add_process(level1_id: str,
                pipeline_id: str,
                run_id: str,
                prc_time: str,
                prc_status: int = -1024,
                prc_module: str = "",
                message: str = "") -> Result:
    """Record a processing step for a level-1 record.

    Args:
        level1_id (str): level-1 data ID.
        pipeline_id (str): pipeline ID.
        run_id (str): run ID.
        prc_time (str): processing time, "YYYY-MM-DD HH:MM:SS".
        prc_status (int): processing status; defaults to -1024.
        prc_module (str): processing module.
        message (str): processing message.

    Returns:
        Result: on success, ``data`` holds the written record; on failure,
        ``message`` holds the reason.
    """
    params = {
        'level1_id': level1_id,
        'pipeline_id': pipeline_id,
        'run_id': run_id,
        'prc_time': prc_time,
        'prc_status': prc_status,
        'prc_module': prc_module,
        'message': message,
    }
    return request.post("/api/level1/prc", params)
\ No newline at end of file
import os
from typing import Optional, IO, Tuple, Literal
from .common import request, Result
DateTimeTuple = Tuple[str, str]
def find(
        project_id: Optional[str] = None,
        obs_id: Optional[str] = None,
        module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'] = 'MSC',
        detector_no: Optional[str] = None,
        data_type: Optional[str] = None,
        filter: Optional[str] = None,
        obs_time: Optional[DateTimeTuple] = None,
        create_time: Optional[DateTimeTuple] = None,
        qc2_status: Optional[int] = None,
        prc_status: Optional[int] = None,
        file_name: Optional[str] = None,
        object_name: Optional[str] = None,
        page: int = 1,
        limit: int = 0) -> Result:
    """Search level-2 data with the given filters.

    Args:
        project_id: project ID. Defaults to None (no filter).
        obs_id: observation ID. Defaults to None.
        module_id: module ID, one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        detector_no: detector number. Defaults to None.
        data_type: data type, e.g. 'SCI'. Defaults to None.
        filter: filter band. Defaults to None.
        obs_time: (start, end) observation-time range. Defaults to None.
        create_time: (start, end) creation-time range. Defaults to None.
        qc2_status: QC2 status. Defaults to None.
        prc_status: processing status. Defaults to None.
        file_name: file name. Defaults to None.
        object_name: object name. Defaults to None.
        page: page number. Defaults to 1.
        limit: rows per page; 0 means unlimited.

    Returns:
        Result: search result.
    """
    params = {
        'project_id': project_id,
        'obs_id': obs_id,
        'module_id': module_id,
        'detector_no': detector_no,
        'data_type': data_type,
        'filter': filter,
        'qc2_status': qc2_status,
        'prc_status': prc_status,
        'file_name': file_name,
        'object_name': object_name,
        'obs_time_start': None,
        'obs_time_end': None,
        'create_time_start': None,
        'create_time_end': None,
        'page': page,
        'limit': limit,
    }
    # Unpack the optional (start, end) tuples into flat request fields.
    if obs_time is not None:
        params['obs_time_start'], params['obs_time_end'] = obs_time
    if create_time is not None:
        params['create_time_start'], params['create_time_end'] = create_time
    return request.post("/api/level2", params)
def find_by_level2_id(level2_id: str) -> Result:
    """Query level-2 data by its level2 ID.

    Args:
        level2_id (str): level-2 data ID.

    Returns:
        Result: query result.
    """
    return request.get(f"/api/level2/{level2_id}")
def update_qc2_status(level2_id: str, data_type: str, qc2_status: int) -> Result:
    """Update the QC2 status of a level-2 record.

    (The original docstring said "QC0"; this endpoint sets qc2_status.)

    Args:
        level2_id (str): level-2 data ID.
        data_type (str): data type.
        qc2_status (int): new QC2 status.

    Returns:
        Result: update result.
    """
    return request.put(f"/api/level2/qc2_status/{level2_id}", {'data_type': data_type, 'qc2_status': qc2_status})
def update_prc_status(level2_id: str, data_type: str, prc_status: int) -> Result:
    """Update the processing status of a level-2 record.

    Args:
        level2_id (str): level-2 data ID.
        data_type (str): data type.
        prc_status (int): new processing status.

    Returns:
        Result: operation result.
    """
    return request.put(f"/api/level2/prc_status/{level2_id}", {'data_type': data_type, 'prc_status': prc_status})
def update_qc2_status_by_file_name(file_name: str, qc2_status: int) -> Result:
    """Update the QC2 status of a level-2 record addressed by file name.

    Args:
        file_name (str): level-2 data file name.
        qc2_status (int): new QC2 status.

    Returns:
        Result: update result.
    """
    return request.put(f"/api/level2/qc2_status/file/{file_name}", {'qc2_status': qc2_status})
def update_prc_status_by_file_name(file_name: str, prc_status: int) -> Result:
    """Update the processing status of a level-2 record addressed by file name.

    Args:
        file_name (str): level-2 data file name.
        prc_status (int): new processing status.

    Returns:
        Result: operation result.
    """
    return request.put(f"/api/level2/prc_status/file/{file_name}", {'prc_status': prc_status})
def write(local_file: str,
          module_id: Literal['MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'],
          level0_id: Optional[str],
          level1_id: Optional[str],
          level2_id: Optional[str],
          brick_id: Optional[int],
          data_type: str,
          pipeline_id: str,
          build: int,
          version: str,
          **extra_kwargs) -> Result:
    """Upload a local level-2 file into DFS.

    (Annotations simplified: ``Optional[X | None]`` is redundant. The
    original docstring documented a nonexistent ``pmapname`` parameter;
    this function takes ``version`` instead.)

    Args:
        local_file (str): local file path.
        module_id: one of 'MSC', 'IFS', 'MCI', 'HSTDM', 'CPIC'.
        level0_id (Optional[str]): level-0 data ID, or None.
        level1_id (Optional[str]): level-1 data ID, or None.
        level2_id (Optional[str]): level-2 data ID, or None.
        brick_id (Optional[int]): sky-brick ID, or None.
        data_type (str): data file type.
        pipeline_id (str): pipeline ID.
        build (int): build number.
        version (str): data version.
        **extra_kwargs: extra fields forwarded to DFS.

    Returns:
        Result: on success, ``data`` holds the created record; on failure,
        ``message`` holds the reason.

    Raises:
        FileNotFoundError: if *local_file* does not exist.
    """
    params = {'module_id': module_id, 'level0_id': level0_id, 'level1_id': level1_id, 'level2_id': level2_id, 'brick_id': brick_id, 'data_type': data_type, 'pipeline_id': pipeline_id, 'build': build, 'version': version}
    params.update(extra_kwargs)
    if not os.path.exists(local_file):
        raise FileNotFoundError(local_file)
    return request.post_file("/api/level2/file", local_file, params)
def catalog_query(sql: str, limit: int = 0) -> Result:
    """Run a SQL query against the level-2 catalog.

    Args:
        sql (str): SQL statement, executed server-side.
        limit (int, optional): maximum row count; 0 means unlimited.

    Returns:
        Result: query result.
    """
    return request.post("/api/level2/catalog/query", {'sql': sql, 'limit': limit})
__version__ = "20241119"
\ No newline at end of file
# for deployment
bumpver==2023.1129
twine==5.1.1
\ No newline at end of file
requests==2.31.0
bumpver==2023.1129
# for testing
pytest==7.4.3
coverage==7.6.1
pytest-cov==5.0.0
pytest-html==4.1.1
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment