Commit bad9d9f6 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

mbi level2co

parent 12318e8d
......@@ -74,7 +74,7 @@ class Level2CoApi(object):
def write(self, **kwargs):
''' insert a level2 record after merge into database
:param kwargs: Parameter dictionary, key items support:
data_type : [str]
filename : [str]
......
import os
import unittest
from astropy.io import fits
from csst_dfs_api.mbi.level2co import Level2CoApi
......@@ -11,40 +9,29 @@ class MBILevel2CoDataTestCase(unittest.TestCase):
def test_find(self):
recs = self.api.find(
level1_id=1,
create_time = ("2022-06-15 11:12:13","2022-06-16 11:12:13"))
data_type='CAT',
create_time = ("2022-06-15 11:12:13", "2024-06-16 11:12:13"))
print('find:', recs)
def test_catalog_query(self):
result = self.api.catalog_query(
obs_id='100000000',
obs_time = ("2021-05-24 11:12:13","2021-05-25 13:12:13"),
limit = 2)
if result.success:
dt = self.api.to_table(result)
dt.pprint()
else:
print(result)
def test_get(self):
rec = self.api.get(id = 1)
rec = self.api.get(id = 4)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
rec = self.api.update_proc_status(id = 4, status = 9)
print('update_proc_status:', rec)
def test_update_qc2_status(self):
rec = self.api.update_qc2_status(id = 1, status = 7)
rec = self.api.update_qc2_status(id = 4, status = 9)
print('update_qc2_status:', rec)
def test_write(self):
rec = self.api.write(
level1_id= 1,
data_type = "sci",
data_type = "CAT",
prc_status = 3,
prc_time = '2021-10-22 11:12:13',
prc_time = '2023-07-01 11:12:13',
filename = "MSC_MS_210525120000_100000000_20_cat.fits",
file_path = "/opt/temp/csst/msc_data/MSC_MS_210525120000_100000000_20_cat.fits"
)
file_path = "/opt/temp/csst/msc_data/MSC_MS_210525120000_100000000_20_cat.fits",
pipeline_id = "COMBI-2"
)
print('write:', rec)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment