Commit e6920eeb authored by Wei Shoulin's avatar Wei Shoulin
Browse files

msc level2 catalog

parent 287d0d41
......@@ -20,11 +20,11 @@ python setup.py install
```bash
sh -c "$(curl -fsSL https://raw.fastgit.org/astronomical-data-processing/csst-dfs-api/master/tools/csst-dfs-api-install.sh)" - v0.0.4
```
## Configuration
enviroment variables
- CSST_DFS_API_MODE = local or cluster # default: local
- CSST_LOCAL_FILE_ROOT = [a local file directory] # required if DFS_API_MODE = local, default: /opt/temp/csst
- CSST_DFS_GATEWAY = [gateway server's address] # required if DFS_API_MODE = cluster,
import os
import logging
import numpy as np
from astropy.io import fits
from ..common.delegate import Delegate
from ..common.utils import *
from csst_dfs_commons.models import Result
log = logging.getLogger('csst_api')
class Level2CatalogApi(object):
"""
Level1 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "Level2CatalogApi")()
def find(self, **kwargs):
''' retrieve level2catalog records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
min_mag: [float]
max_mag: [float]
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def write(self, **kwargs):
''' insert a level2 catalog file into database
:param kwargs: Parameter dictionary, key items support:
file_path: str
:returns: csst_dfs_common.models.Result
'''
try:
file_path = get_parameter(kwargs, "file_path", '')
if not file_path:
return Result.error(message="file_path is blank")
if not os.path.exists(file_path):
return Result.error(message="the file [%s] not existed" % (file_path, ))
records = []
success_num, fail_num = 0, 0
hdul = fits.open(file_path)
header = hdul[0].header
binTable = hdul[1]
obs_id = header["OBSID"]
detector_no = header["DETECTOR"][3:5]
obs_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
batch_size = 500
for tr in binTable.data:
v_list = [f"'{obs_id}'",f"'{detector_no}'"]
for td in tr:
if type(td) == np.ndarray:
v_list.extend(td)
else:
v_list.append(td)
v_list.append(f"'{obs_time}'")
records.append(",".join(['null' if type(v) != str and np.isnan(v) else str(v) for v in v_list]))
if len(records) == batch_size:
resp = self.stub.write(records)
if resp.success:
success_num += len(records)
else:
log.error(f"{resp.message}")
fail_num += len(records)
records.clear()
records = []
if records:
resp = self.stub.write(records)
if resp.success:
success_num += len(records)
else:
log.error(f"{resp.message}")
fail_num += len(records)
return Result.ok_data({"success_num":success_num, "fail_num": fail_num})
except Exception as e:
return Result.error(str(e))
......@@ -9,40 +9,40 @@ class CalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
def test_find(self):
recs = self.api.find(detector_no='CCD01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
# def test_find(self):
# recs = self.api.find(detector_no='CCD01',
# ref_type = "bias",
# obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
# print('find:', recs)
def test_get_latest_by_l0(self):
rec = self.api.get_latest_by_l0(level0_id='000001102', ref_type = "bias")
print('get_latest_by_l0:', rec)
# def test_get_latest_by_l0(self):
# rec = self.api.get_latest_by_l0(level0_id='000001102', ref_type = "bias")
# print('get_latest_by_l0:', rec)
def test_get(self):
rec = self.api.get(id = 3)
rec = self.api.get(id = 2)
print('get by id:', rec)
rec = self.api.get(cal_id = '00002')
rec = self.api.get(cal_id = '2')
print('get by cal_id:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 3, status = 1)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 3, status = 2)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
cal_id = '00002',
detector_no = '01',
ref_type = "bias",
obs_time = "2021-06-04 11:12:13",
exp_time = 150,
filename = "/opt/dddasd1.params",
file_path = "/opt/dddasd1.fits",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
level0_ids = ['1','2','3','4'])
print('write:', rec)
\ No newline at end of file
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(id = 3, status = 1)
# print('update_proc_status:', rec)
# def test_update_qc1_status(self):
# rec = self.api.update_qc1_status(id = 3, status = 2)
# print('update_qc1_status:', rec)
# def test_write(self):
# rec = self.api.write(
# cal_id = '00002',
# detector_no = '01',
# ref_type = "bias",
# obs_time = "2021-06-04 11:12:13",
# exp_time = 150,
# filename = "/opt/dddasd1.params",
# file_path = "/opt/dddasd1.fits",
# prc_status = 3,
# prc_time = '2021-06-04 11:12:13',
# level0_ids = ['1','2','3','4'])
# print('write:', rec)
\ No newline at end of file
......@@ -17,7 +17,7 @@ class Level0DataTestCase(unittest.TestCase):
rec = self.api.get(id = 100)
print('get:', rec)
rec = self.api.get(level0_id = '000001102')
rec = self.api.get(level0_id = '1000000102')
print('get:', rec)
def test_update_proc_status(self):
......
import os
import unittest
from astropy.io import fits
from csst_dfs_api.msc.level2catalog import Level2CatalogApi
class MSCLevel2CatalogTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2CatalogApi()
def test_find(self):
recs = self.api.find(
obs_id='100000000',
obs_time = ("2021-05-24 11:12:13","2021-05-25 13:12:13"),
limit = 100)
print('find:', recs)
def test_write(self):
rec = self.api.write(
file_path = "/opt/temp/csst/MSC_MS_210525120000_100000000_20_cat.fits"
)
print('write:', rec)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment