Commit cc28864c authored by Wei Shoulin's avatar Wei Shoulin
Browse files

level2

parent 8a5b10cf
from ..common.delegate import Delegate
from astropy.table import Table
class Level2DataApi(object):
"""
Level2 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "facility")
self.stub = getattr(self.pymodule, "Level2DataApi")()
def find(self, **kwargs):
''' retrieve level2 records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
level1_id: [int]
module_id: [str]
brick_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
import_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def catalog_query(self, **kwargs):
''' retrieve level2 catalog
:param kwargs: Parameter dictionary, key items support:
sql: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.catalog_query(**kwargs)
def find_existed_brick_ids(self, **kwargs):
''' retrieve existed brick_ids in the catalog with data_type
:param kwargs:
data_type: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find_existed_brick_ids(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc2_status(self, **kwargs):
''' update the status of QC2
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc2_status(**kwargs)
def write(self, **kwargs):
''' insert a level2 record into database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
brick_id: [int]
module_id : [str]
object_name: [str]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
from astropy.table import Table
class Level2TypeApi(object):
"""
Level2Type Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "facility")
self.stub = getattr(self.pymodule, "Level2TypeApi")()
def find(self, **kwargs):
''' retrieve level2Type records from database
:param kwargs: Parameter dictionary, key items support:
module_id: [str]
data_type: [str]
import_status : [int],
page: [int]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
data_type: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_import_status(self, **kwargs):
''' update the status of import
:param kwargs: Parameter dictionary, key items support:
data_type: [str],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_import_status(**kwargs)
def write(self, **kwargs):
''' insert a level2Type record into database
:param kwargs: Parameter dictionary, key items support:
data_type : [str]
module_id : [str]
key_column : [str]
hdu_index : [int]
demo_filename : [str]
demo_file_path : [str]
ra_column : [str]
dec_column : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
......@@ -32,14 +32,14 @@ class Level0DataTestCase(unittest.TestCase):
def test_write(self):
rec = self.api.write(
level0_id = '000001101',
obs_id = '0000011',
level0_id = '10000010101',
obs_id = '100000101',
detector_no = "01",
filter = "u",
obs_type = "sci",
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
detector_status_id = 3,
filename = "MSC_00001234",
file_path = "/opt/MSC_00001234.fits")
filename = "CSST_MSC_MS_SCI_20270810142128_20270810142358_100000101_01_L0_1.fits",
file_path = "/opt/temp/csst/CSST_MSC_MS_SCI_20270810142128_20270810142358_100000101_01_L0_1.fits")
print('write:', rec)
\ No newline at end of file
......@@ -35,7 +35,7 @@ class Level1DataTestCase(unittest.TestCase):
def test_write(self):
rec = self.api.write(
level0_id = '1000001201',
level0_id = '10000010101',
data_type = "SCIE",
cor_sci_id = 0,
prc_params = "/opt/dddasd.params",
......
import os
import unittest
from astropy.io import fits
from csst_dfs_api.common.utils import to_table as to_fits_table
from csst_dfs_api.facility.level2 import Level2DataApi
class Level2DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2DataApi()
def test_find(self):
recs = self.api.find(
level1_id=1)
print('find:', recs)
def test_find_existed_brick_ids(self):
recs = self.api.find_existed_brick_ids(data_type = "csst-msc-l1-phot")
print('find_existed_brick_ids:', recs)
def test_catalog_query(self):
result = self.api.catalog_query(
sql = 'select x,y,A,B,PA,AB,E from csst_msc_l1_phot',
limit = 2)
print(result)
if result.success and result['totalCount'] > 0:
dt = to_fits_table(result)
dt.pprint()
# def test_get(self):
# rec = self.api.get(id = 1)
# print('get:', rec)
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(id = 1, status = 4)
# print('update_proc_status:', rec)
# def test_update_qc2_status(self):
# rec = self.api.update_qc2_status(id = 1, status = 7)
# print('update_qc2_status:', rec)
# def test_write(self):
# rec = self.api.write(
# level1_id= 1,
# module_id = 'MSC',
# data_type = "csst-msc-l1-phot",
# prc_status = 3,
# prc_time = '2021-10-22 11:12:13',
# filename = "CSST_MSC_MS_SCIE_20240821005429_20240821005659_10160000004_12_L1_V01_CAT.fits",
# file_path = "/opt/temp/csst/msc/L2/CSST_MSC_MS_SCIE_20240821005429_20240821005659_10160000004_12_L1_V01_CAT.fits",
# pipeline_id = "P1"
# )
# print('write:', rec)
# rec = self.api.write(
# level1_id= 1,
# module_id = 'MSC',
# data_type = "csst-msc-xcat",
# prc_status = 3,
# prc_time = '2021-10-22 11:12:13',
# filename = "MSC_MS_XCAT_BRICK_156958.fits",
# file_path = "/opt/temp/csst/fits_file/L2/L2_DEMO/MSC_MS_XCAT_BRICK_156958.fits",
# pipeline_id = "P1"
# )
# print('write:', rec)
\ No newline at end of file
import os
import unittest
from csst_dfs_api.facility.level2type import Level2TypeApi
class Level2TypeTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2TypeApi()
def test_find(self):
recs = self.api.find(
module_id='MSC',
data_type='csst-msc-l1-phot'
)
print('find:', recs)
def test_get(self):
rec = self.api.get(data_type = 'csst-msc-l1-phot')
print('get:', rec)
def test_update_import_status(self):
rec = self.api.update_import_status(data_type = 'csst-msc-l1-phot', status = 4)
print('update_import_status:', rec)
def test_write(self):
rec = self.api.write(
data_type = "csst-msc-l1-phot",
module_id = "MSC",
key_column = "ID",
hdu_index = 1,
demo_filename = "CSST_MSC_MS_SCIE_20240821005429_20240821005659_10160000004_12_L1_V01_CAT.fits",
demo_file_path = "/Users/wsl/temp/csst/msc/L2/CSST_MSC_MS_SCIE_20240821005429_20240821005659_10160000004_12_L1_V01_CAT.fits",
ra_column = "RA",
dec_column = "DEC")
rec = self.api.write(
data_type = "csst-msc-xcat",
module_id = "MSC",
key_column = "PhotoObjID",
hdu_index = 1,
demo_filename = "MSC_MS_XCAT_BRICK_156958.fits",
demo_file_path = "/Users/wsl/temp/csst/msc/L2/MSC_MS_XCAT_BRICK_156958.fits",
ra_column = "RA_r",
dec_column = "DEC_r")
print('write:', rec)
\ No newline at end of file
File mode changed from 100755 to 100644
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment