Commit 97625e52 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

mbi level2

parent ca36eae0
Pipeline #3077 passed with stage
from .level2 import Level2DataApi
from .level2co import Level2CoApi
\ No newline at end of file
from ..common.delegate import Delegate
from astropy.table import Table
class Level2DataApi(object):
"""
Level2 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mbi")
self.stub = getattr(self.pymodule, "Level2DataApi")()
def find(self, **kwargs):
''' retrieve level2 records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
level1_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def catalog_query(self, **kwargs):
''' retrieve level2 catalog
:param kwargs: Parameter dictionary, key items support:
brick_ids: tuple of int, like (1,2,3)
obs_id: [str]
detector_no: [str]
filter: [str]
ra: [float] in deg
dec: [float] in deg
radius: [float] in deg
obs_time: (start, end)
columns: tuple of str, like ('ra','dec','sky')
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.catalog_query(**kwargs)
def catalog_query_file(self, **kwargs):
''' retrieve level2 catalog, return Level2Record
:param kwargs: Parameter dictionary, key items support:
brick_ids: list[int]
obs_id: [str]
detector_no: [str]
filter: [str]
ra: [float] in deg
dec: [float] in deg
radius: [float] in deg
obs_time: (start, end)
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.catalog_query_file(**kwargs)
def find_existed_brick_ids(self, **kwargs):
''' retrieve existed brick_ids in a single exposure catalog
:param kwargs:
:returns: csst_dfs_common.models.Result
'''
return self.stub.find_existed_brick_ids(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc2_status(self, **kwargs):
''' update the status of QC2
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc2_status(**kwargs)
def write(self, **kwargs):
''' insert a level2 record into database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level2CoApi(object):
"""
Level2 Merge Catalog Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mbi")
self.stub = getattr(self.pymodule, "Level2CoApi")()
def find(self, **kwargs):
''' retrieve level2 Merge Catalog records from database
:param kwargs: Parameter dictionary, key items support:
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def catalog_query(self, **kwargs):
''' retrieve level2 Merge catalog
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
min_mag: [float]
max_mag: [float]
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.catalog_query(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc2_status(self, **kwargs):
''' update the status of QC2
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc2_status(**kwargs)
def write(self, **kwargs):
''' insert a level2 record after merge into database
:param kwargs: Parameter dictionary, key items support:
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
import os
import unittest
from astropy.io import fits
from csst_dfs_api.common.utils import to_table as to_fits_table
from csst_dfs_api.mbi.level2 import Level2DataApi
class MBILevel2DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2DataApi()
def test_find(self):
recs = self.api.find(
level1_id=1,
create_time = ("2022-06-15 11:12:13","2022-06-16 11:12:13"))
print('find:', recs)
def test_find_existed_brick_ids(self):
recs = self.api.find_existed_brick_ids()
print('find_existed_brick_ids:', recs)
def test_catalog_query(self):
result = self.api.catalog_query(
obs_id = '100000133',
detector_no = '13',
ra = 192,
dec = 26,
radius = 1,
limit = 2)
print(result)
if result.success and result['totalCount'] > 0:
dt = to_fits_table(result)
dt.pprint()
def test_catalog_query_file(self):
result = self.api.catalog_query_file(
ra = 192,
dec = 26,
radius = 1)
print(result)
def test_get(self):
rec = self.api.get(id = 1)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_update_qc2_status(self):
rec = self.api.update_qc2_status(id = 1, status = 7)
print('update_qc2_status:', rec)
def test_write(self):
rec = self.api.write(
level1_id= 1,
data_type = "sci",
prc_status = 3,
prc_time = '2021-10-22 11:12:13',
filename = "MSC_MS_210525120000_100000000_20_cat.fits",
file_path = "/opt/temp/csst/msc_data/MSC_MS_210525120000_100000000_20_cat.fits"
)
print('write:', rec)
import unittest
from csst_dfs_api.mbi import Level2CoApi
class MBILevel2CoDataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2CoApi()
def test_find(self):
recs = self.api.find(
data_type='CAT',
create_time = ("2022-06-15 11:12:13", "2024-06-16 11:12:13"))
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 4)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 4, status = 9)
print('update_proc_status:', rec)
def test_update_qc2_status(self):
rec = self.api.update_qc2_status(id = 4, status = 9)
print('update_qc2_status:', rec)
def test_write(self):
rec = self.api.write(
data_type = "CAT",
prc_status = 3,
prc_time = '2023-07-01 11:12:13',
filename = "MSC_MS_210525120000_100000000_20_cat.fits",
file_path = "/opt/temp/csst/msc_data/MSC_MS_210525120000_100000000_20_cat.fits",
pipeline_id = "COMBI-2"
)
print('write:', rec)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment