Commit 7579ac2e authored by Wei Shoulin's avatar Wei Shoulin
Browse files

level2

parent 7b14e022
......@@ -9,7 +9,7 @@ This package provides APIs to access csst's files and databases.
`csst-dfs-api` can be installed with the following command:
```bash
git clone https://github.com/astronomical-data-processing/csst-dfs-api.git
git clone https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api.git
cd csst-dfs-api
pip install -r requirements.txt
python setup.py install
......@@ -18,7 +18,7 @@ python setup.py install
`csst-dfs-api` and relevant packages could be installed by running one of the following commands in your terminal.
```bash
sh -c "$(curl -fsSL https://raw.fastgit.org/astronomical-data-processing/csst-dfs-api/master/tools/csst-dfs-api-install.sh)"
sh -c "$(curl -fsSL https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api/-/raw/master/tools/csst-dfs-api-install.sh?inline=false)"
```
## Configuration
......
from astropy.table import QTable, Table, Column
from astropy.table import Table
from .delegate import Delegate
from csst_dfs_commons.models import Result
......
......@@ -13,13 +13,14 @@ class Level0DataApi(object):
''' retrieve level0 records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_id: [str],
detector_no: [str],
obs_type: [str],
object_name: [str],
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
file_name: [str],
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
......
......@@ -3,4 +3,4 @@ from .level0 import Level0DataApi
from .level0prc import Level0PrcApi
from .level1 import Level1DataApi
from .level1prc import Level1PrcApi
from .level2catalog import Level2CatalogApi
\ No newline at end of file
from .level2 import Level2DataApi
\ No newline at end of file
from ..common.delegate import Delegate
from astropy.table import Table
class Level2DataApi(object):
"""
Level2 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "Level2DataApi")()
def find(self, **kwargs):
''' retrieve level2 records from database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def catalog_query(self, **kwargs):
''' retrieve level2 catalog
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
min_mag: [float]
max_mag: [float]
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.catalog_query(**kwargs)
def _fields_dtypes(self, rec):
fields = tuple(rec.__dataclass_fields__.keys())
dtypes = []
for _, f in rec.__dataclass_fields__.items():
if f.type == int:
dtypes.append('i8')
elif f.type == float:
dtypes.append('f8')
elif f.type == str:
dtypes.append('S2')
elif f.type == list:
dtypes.append('(12,)f8')
else:
dtypes.append('S2')
dtypes = tuple(dtypes)
return fields, dtypes
def to_table(self, query_result):
if not query_result.success or not query_result.data:
return Table()
fields, dtypes = self._fields_dtypes(query_result.data[0])
t = Table(names = fields, dtype = dtypes)
t.meta['comments'] = [str(query_result.data[0].__class__)]
t.meta['total'] = query_result['totalCount']
for rec in query_result.data:
t.add_row(tuple([rec.__getattribute__(k) for k in fields]))
return t
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc2_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc2_status(**kwargs)
def write(self, **kwargs):
''' insert a level2 record into database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
import os
import logging
import numpy as np
from astropy.io import fits
from ..common.delegate import Delegate
from ..common.utils import *
from csst_dfs_commons.models import Result
log = logging.getLogger('csst_api')
class Level2CatalogApi(object):
"""
Level1 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "Level2CatalogApi")()
def find(self, **kwargs):
''' retrieve level2catalog records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
min_mag: [float]
max_mag: [float]
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def write(self, **kwargs):
''' insert a level2 catalog file into database
:param kwargs: Parameter dictionary, key items support:
file_path: str
:returns: csst_dfs_common.models.Result
'''
try:
file_path = get_parameter(kwargs, "file_path", '')
if not file_path:
return Result.error(message="file_path is blank")
if not os.path.exists(file_path):
return Result.error(message="the file [%s] not existed" % (file_path, ))
records = []
success_num, fail_num = 0, 0
hdul = fits.open(file_path)
header = hdul[0].header
binTable = hdul[1]
obs_id = header["OBSID"]
detector_no = header["DETECTOR"][3:5]
obs_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
batch_size = 500
for tr in binTable.data:
v_list = [f"'{obs_id}'",f"'{detector_no}'"]
for td in tr:
if type(td) == np.ndarray:
v_list.extend(td)
else:
v_list.append(td)
v_list.append(f"'{obs_time}'")
records.append(",".join(['null' if type(v) != str and np.isnan(v) else str(v) for v in v_list]))
if len(records) == batch_size:
resp = self.stub.write(records)
if resp.success:
success_num += len(records)
else:
log.error(f"{resp.message}")
fail_num += len(records)
records.clear()
records = []
if records:
resp = self.stub.write(records)
if resp.success:
success_num += len(records)
else:
log.error(f"{resp.message}")
fail_num += len(records)
return Result.ok_data({"success_num":success_num, "fail_num": fail_num})
except Exception as e:
return Result.error(str(e))
import os
import unittest
from astropy.io import fits
from csst_dfs_api.msc.level2 import Level2DataApi
class MSCLevel2DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2DataApi()
def test_find(self):
recs = self.api.find(
level0_id='000001201',
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_catalog_query(self):
result = self.api.catalog_query(
obs_id='100000000',
obs_time = ("2021-05-24 11:12:13","2021-05-25 13:12:13"),
limit = 2)
dt = self.api.to_table(result)
print()
dt.pprint()
def test_get(self):
rec = self.api.get(id = 2)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 2, status = 4)
print('update_proc_status:', rec)
def test_update_qc2_status(self):
rec = self.api.update_qc2_status(id = 2, status = 7)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
level1_id= 1,
data_type = "sci",
prc_status = 3,
prc_time = '2021-10-22 11:12:13',
filename = "MSC_MS_210525120000_100000000_20_cat.fits",
file_path = "/opt/temp/csst/msc_data/MSC_MS_210525120000_100000000_20_cat.fits"
)
print('write:', rec)
import os
import unittest
from astropy.io import fits
from csst_dfs_api.msc.level2catalog import Level2CatalogApi
class MSCLevel2CatalogTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2CatalogApi()
def test_find(self):
recs = self.api.find(
obs_id='100000000',
obs_time = ("2021-05-24 11:12:13","2021-05-25 13:12:13"),
limit = 100)
print('find:', recs)
def test_write(self):
rec = self.api.write(
file_path = "/opt/temp/csst/MSC_MS_210525120000_100000000_20_cat.fits"
)
print('write:', rec)
\ No newline at end of file
......@@ -13,14 +13,14 @@ pip uninstall csst-dfs-api -y
echo "Installing CSST DFS API with Version:$version"
echo "➡==============================================="
echo " 🛰 1️⃣/5️⃣"
pip install$user git+https://hub.fastgit.xyz/astronomical-data-processing/csst-dfs-commons.git$version
pip install$user git+https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-commons.git$version
echo " 🛰 2️⃣/5️⃣"
pip install$user git+https://hub.fastgit.xyz/astronomical-data-processing/csst-dfs-proto-py.git$version
pip install$user git+https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-proto-py.git$version
echo " 🛰 3️⃣/5️⃣"
pip install$user git+https://hub.fastgit.xyz/astronomical-data-processing/csst-dfs-api-local.git$version
pip install$user git+https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api-local.git$version
echo " 🛰 4️⃣/5️⃣"
pip install$user git+https://hub.fastgit.xyz/astronomical-data-processing/csst-dfs-api-cluster.git$version
pip install$user git+https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api-cluster.git$version
echo " 🛰 5️⃣/5️⃣"
pip install$user git+https://hub.fastgit.xyz/astronomical-data-processing/csst-dfs-api.git$version
pip install$user git+https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api.git$version
echo "➡==============================================="
echo "🇨🇳🇨🇳🇨🇳🚀🚀🚀Done!"
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment