Commit eef6321b authored by Wei Shoulin's avatar Wei Shoulin
Browse files

brick

parent 254f81bd
import unittest
from csst_dfs_api_cluster.facility.detector import DetectorApi
class DetectorApiTestCase(unittest.TestCase):
def setUp(self):
self.api = DetectorApi()
def test_find(self):
recs = self.api.find(module_id = 'MSC', key = 'CCD')
print('find:', recs)
def test_get(self):
rec = self.api.get(no = '02')
print('get:', rec)
def test_write(self):
rec = self.api.write(no = '05',
detector_name = 'CCD05',
module_id = 'MSC',
filter_id='f5')
print('write:', rec)
def test_update(self):
rec = self.api.update(no = '01', filter_id = 'f1')
print('update:', rec)
def test_delete(self):
rec = self.api.delete(no = '01')
print('delete:', rec)
def test_find_status(self):
recs = self.api.find_status(detector_no = '01',
status_occur_time = ('2021-06-02','2021-06-08'),
limit = 0)
print('find status:', recs)
def test_get_status(self):
rec = self.api.get_status(id = 2)
print('get status:', rec)
def test_write_status(self):
rec = self.api.write_status(detector_no = '01', status = '{........}',status_time='2021-06-05 12:12:13')
print('write status:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.level0 import Level0DataApi
class Level0DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0DataApi()
def test_find(self):
recs = self.api.find(obs_id = '9', obs_type = 'sci', limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 100)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 100, status = 6)
print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(id = 100, status = 7)
print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
obs_id = '00013',
detector_no = "01",
obs_type = "sci",
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
detector_status_id = 3,
filename = "MSC_00001234",
file_path = "/opt/MSC_00001234.fits")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.level0prc import Level0PrcApi
class Level0PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0PrcApi()
def test_find(self):
recs = self.api.find(level0_id='134')
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 8, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(level0_id='134',
pipeline_id = "P1",
prc_module = "QC0",
params_file_path = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
result_file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.observation import ObservationApi
class FacilityObservationTestCase(unittest.TestCase):
def setUp(self):
self.api = ObservationApi()
def test_find(self):
recs = self.api.find(module_id="MSC",limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(obs_id='00009')
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(obs_id='00009', status = 3, )
print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(obs_id='00009', status = 3, )
print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
obs_id='00009',
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
module_id = "MSC",
obs_type = "sci",
facility_status_id = 3,
module_status_id = 3)
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.ifs.level1 import Level1DataApi
class IFSLevel1DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
def test_find(self):
recs = self.api.find(
level0_id='1',
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 2)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 2, status = 4)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 2, status = 7)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
level0_id='11',
data_type = "sci",
cor_sci_id = 2,
prc_params = "/opt/dddasd.params",
flat_id = 1,
dark_id = 2,
bias_id = 3,
lamp_id = 4,
arc_id = 5,
sky_id = 6,
prc_status = 3,
prc_time = '2021-06-05 11:12:13',
filename = "MSC_MS_210525121500_100000001_09_IFS",
file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits",
pipeline_id = "P2")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.msc.level1 import Level1DataApi
class MSCLevel1DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
def test_find(self):
recs = self.api.find(
level0_id='1',
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
# def test_get(self):
# rec = self.api.get(id = 2)
# print('get:', rec)
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(id = 2, status = 4)
# print('update_proc_status:', rec)
# def test_update_qc1_status(self):
# rec = self.api.update_qc1_status(id = 2, status = 7)
# print('update_qc1_status:', rec)
# def test_write(self):
# rec = self.api.write(
# level0_id='1',
# data_type = "sci",
# cor_sci_id = 1,
# prc_params = "/opt/dddasd.params",
# flat_id = 1,
# dark_id = 2,
# bias_id = 3,
# prc_status = 3,
# prc_time = '2021-06-04 11:12:13',
# filename = "MSC_MS_210525121500_100000001_09_raw",
# file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits",
# pipeline_id = "P1")
# print('write:', rec)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment