Commit 21bc8a4a authored by Wei Shoulin's avatar Wei Shoulin
Browse files

cpic and bug

parent 135b2eb4
......@@ -28,3 +28,4 @@ enviroment variables
- CSST_DFS_API_MODE = local or cluster # default: local
- CSST_LOCAL_FILE_ROOT = [a local file directory] # required if DFS_API_MODE = local, default: /opt/temp/csst
- CSST_DFS_GATEWAY = [gateway server's address] # required if DFS_API_MODE = cluster
wsl123
\ No newline at end of file
from .calmerge import CalMergeApi
from .level0 import Level0DataApi
from .level0prc import Level0PrcApi
from .level1 import Level1DataApi
\ No newline at end of file
from ..common.delegate import Delegate
class CalMergeApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "cpic")
self.stub = getattr(self.pymodule, "CalMergeApi")()
def find(self, **kwargs):
''' retrieve calibration merge records from database
:param kwargs: Parameter dictionary, key items support:
detector_no: [str],
ref_type: [str],
obs_time: (start,end),
qc1_status : [int],
prc_status : [int],
file_name: [str],
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get_latest_by_l0(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc1_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc1_status(**kwargs)
def write(self, **kwargs):
''' insert a calibration merge record into database
:param kwargs: Parameter dictionary, key items support:
cal_id : [str],
detector_no : [str],
ref_type : [str],
obs_time : [str],
exp_time : [float],
prc_status : [int],
prc_time : [str],
filename : [str],
file_path : [str],
level0_ids : [list],
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0DataApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "cpic")
self.stub = getattr(self.pymodule, "Level0DataApi")()
def find(self, **kwargs):
''' retrieve level0 records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc0_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc0_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 data record into database
:param kwargs: Parameter dictionary, key items support:
obs_id = [str],
detector_no = [str],
obs_type = [str],
obs_time = [str],
exp_time = [int],
detector_status_id = [int],
filename = [str],
file_path = [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "cpic")
self.stub = getattr(self.pymodule, "Level0PrcApi")()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level1DataApi(object):
"""
Level1 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "cpic")
self.stub = getattr(self.pymodule, "Level1DataApi")()
def find(self, **kwargs):
''' retrieve level1 records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
data_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc1_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc1_status(**kwargs)
def write(self, **kwargs):
''' insert a level1 record into database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
refs: [dict]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level1PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "cpic")
self.stub = getattr(self.pymodule, "Level1PrcApi")()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level1 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
import random
from ..common.delegate import Delegate
from csst_dfs_commons.models.errors import CSSTGenericException
......@@ -94,6 +94,7 @@ class Level2ProducerApi(object):
''' new a Level2Producer Job
:param kwargs: Parameter dictionary, key items support:
name = [str]
dag = [str]
:returns: csst_dfs_common.models.Result
......@@ -115,6 +116,7 @@ class Level2ProducerApi(object):
:param kwargs: Parameter dictionary, key items support:
id = [int]
name = [str]
dag = [str]
status = [int]
......@@ -191,7 +193,8 @@ class Level2ProducerApi(object):
graph_id_edges = [(pre_node.id, n.id) for n in the_nodes.data]
pos = {pre_node.name: (node_level_x,node_level_y)}
for idx, node in enumerate(the_nodes.data):
sub_id_edges, sub_name_edges, sub_pos = get_next(node, node_level_x+1, idx-len(the_nodes.data)/2)
sub_id_edges, sub_name_edges, sub_pos = get_next(node, node_level_x+1, random.randint(-3,3))
graph_id_edges.extend(sub_id_edges)
graph_name_edges.extend(sub_name_edges)
pos.update(sub_pos)
......@@ -206,15 +209,15 @@ class Level2ProducerApi(object):
g1.add_nodes_from(vertex_list)
g1.add_edges_from(graph_name_edges)
plt.xlim(-1, 8)
plt.ylim(-3, 3)
plt.ylim(-4, 4)
plt.tight_layout()
nx.draw(
g1,
pos = pos,
node_color = 'orange',
edge_color = 'black',
font_size =10,
node_size =300,
font_size =12,
node_size =360,
with_labels=True
)
......
import os
import unittest
from astropy.io import fits
from csst_dfs_api.cpic.calmerge import CalMergeApi
class SLSCalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
def test_find(self):
recs = self.api.find(detector_no='CCD01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get_latest_by_l0(self):
rec = self.api.get_latest_by_l0(level0_id='000001102', ref_type = "bias")
print('get_latest_by_l0:', rec)
def test_get(self):
rec = self.api.get(id = 2)
print('get by id:', rec)
rec = self.api.get(cal_id = '2')
print('get by cal_id:', rec)
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(id = 3, status = 1)
# print('update_proc_status:', rec)
# def test_update_qc1_status(self):
# rec = self.api.update_qc1_status(id = 3, status = 2)
# print('update_qc1_status:', rec)
# def test_write(self):
# rec = self.api.write(
# cal_id = '00002',
# detector_no = '01',
# ref_type = "bias",
# obs_time = "2021-06-04 11:12:13",
# exp_time = 150,
# filename = "/opt/dddasd1.params",
# file_path = "/opt/dddasd1.fits",
# prc_status = 3,
# prc_time = '2021-06-04 11:12:13',
# level0_ids = ['1','2','3','4'])
# print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api.cpic.level0 import Level0DataApi
class CPICLevel0DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0DataApi()
def test_find(self):
recs = self.api.find(obs_id = '000009', obs_type = 'sci', limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 100)
print('get:', rec)
rec = self.api.get(level0_id = '1000000102')
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(level0_id = '000001102', status = 6)
print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(level0_id = '000001102', status = 7)
print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
level0_id = '000001101',
obs_id = '0000011',
detector_no = "01",
obs_type = "sci",
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
detector_status_id = 3,
filename = "MSC_00001234",
file_path = "/opt/MSC_00001234.fits")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api.cpic.level0prc import Level0PrcApi
class CPICLevel0PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0PrcApi()
def test_find(self):
recs = self.api.find(level0_id='134')
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 8, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(
level0_id='134',
pipeline_id = "P1",
prc_module = "QC0",
params_file_path = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
result_file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
import unittest
from csst_dfs_api.cpic import Level1DataApi
class CPICResult1TestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
def test_find(self):
recs = self.api.find(
level0_id='0000223',
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 1)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 1, status = 7)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
level0_id='0000223',
data_type = "sci",
prc_params = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-10-22 11:12:13',
filename = "MSC_MS_210525121500_100000001_09_raw",
file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits",
pipeline_id = "P2",
refs = {'dark': 1, 'bias': 2, 'flat': 3 })
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api.cpic.level1prc import Level1PrcApi
class CPICLevel1PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1PrcApi()
def test_find(self):
recs = self.api.find(level1_id=1)
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(level1_id=1,
pipeline_id = "P1",
prc_module = "QC0",
params_file_path = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
result_file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
......@@ -11,6 +11,7 @@ class FacilityLevel2ProducerTestCase(unittest.TestCase):
rec = self.api.register(name='Test2',
gitlink='http://github.com/xxx/xxx',
paramfiles='/opt/csst/param1.ini',
image='centos:7',
priority = 3,
pre_producers=[1,2] )
print('register:', rec)
......@@ -36,6 +37,7 @@ class FacilityLevel2ProducerTestCase(unittest.TestCase):
id=2,
name = "start2",
gitlink = "http://github.com/xxx/xxx",
image = "centos:8",
paramfiles='/opt/csst/param1.ini',
priority = 3,
pre_producers=[1,3]
......@@ -50,7 +52,8 @@ class FacilityLevel2ProducerTestCase(unittest.TestCase):
def test_new_job(self):
recs = self.api.new_job(
dag = "start2-1-1-"
name = "start2-1-1-",
dag = "{'start':'1'}"
)
print('new_job:', recs)
......@@ -108,6 +111,6 @@ class FacilityLevel2ProducerTestCase(unittest.TestCase):
def test_make_graph(self):
graph_id_edges = self.api.make_graph(
start_producer_id = 2,
fig_path = 'graph.png'
fig_path = 'graph11.png'
)
print('graph_id_edges:', graph_id_edges)
\ No newline at end of file
......@@ -10,25 +10,25 @@ class MCILevel0DataTestCase(unittest.TestCase):
self.api = Level0DataApi()
def test_find(self):
recs = self.api.find(obs_id = '000009', obs_type = 'sci', limit = 0)
recs = self.api.find(limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 100)
rec = self.api.get(id = 1)
print('get:', rec)
rec = self.api.get(level0_id = '1000000102')
rec = self.api.get(level0_id = '200000006-')
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(level0_id = '000001102', status = 6)
print('update_proc_status:', rec)
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(level0_id = '000001102', status = 6)
# print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(level0_id = '000001102', status = 7)
print('update_qc0_status:', rec)
# def test_update_qc0_status(self):
# rec = self.api.update_qc0_status(level0_id = '000001102', status = 7)
# print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
file_path = "/opt/MSC_00001234.fits")
print('write:', rec)
\ No newline at end of file
# def test_write(self):
# rec = self.api.write(
# file_path = "/opt/MSC_00001234.fits")
# print('write:', rec)
\ No newline at end of file
......@@ -9,15 +9,15 @@ class MSCCalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
# def test_find(self):
# recs = self.api.find(detector_no='CCD01',
# ref_type = "bias",
# obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
# print('find:', recs)
def test_find(self):
recs = self.api.find(detector_no='CCD01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
# def test_get_latest_by_l0(self):
# rec = self.api.get_latest_by_l0(level0_id='000001102', ref_type = "bias")
# print('get_latest_by_l0:', rec)
def test_get_latest_by_l0(self):
rec = self.api.get_latest_by_l0(level0_id='000001102', ref_type = "bias")
print('get_latest_by_l0:', rec)
def test_get(self):
rec = self.api.get(id = 2)
......@@ -25,24 +25,24 @@ class MSCCalMergeApiTestCase(unittest.TestCase):
rec = self.api.get(cal_id = '2')
print('get by cal_id:', rec)
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(id = 3, status = 1)
# print('update_proc_status:', rec)
# def test_update_qc1_status(self):
# rec = self.api.update_qc1_status(id = 3, status = 2)
# print('update_qc1_status:', rec)
# def test_write(self):
# rec = self.api.write(
# cal_id = '00002',
# detector_no = '01',
# ref_type = "bias",
# obs_time = "2021-06-04 11:12:13",
# exp_time = 150,
# filename = "/opt/dddasd1.params",
# file_path = "/opt/dddasd1.fits",
# prc_status = 3,
# prc_time = '2021-06-04 11:12:13',
# level0_ids = ['1','2','3','4'])
# print('write:', rec)
\ No newline at end of file
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 3, status = 1)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 3, status = 2)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
cal_id = '00002',
detector_no = '01',
ref_type = "bias",
obs_time = "2021-06-04 11:12:13",
exp_time = 150,
filename = "/opt/dddasd1.params",
file_path = "/opt/dddasd1.fits",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
level0_ids = ['1','2','3','4'])
print('write:', rec)
\ No newline at end of file
......@@ -10,14 +10,14 @@ class MSCLevel0DataTestCase(unittest.TestCase):
self.api = Level0DataApi()
def test_find(self):
recs = self.api.find(obs_id = '000009', obs_type = 'sci', limit = 0)
recs = self.api.find(obs_id = '100000154', obs_type = 'sci', limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 100)
rec = self.api.get(id = 2)
print('get:', rec)
rec = self.api.get(level0_id = '1000000102')
rec = self.api.get(level0_id = '10000000123')
print('get:', rec)
def test_update_proc_status(self):
......
......@@ -9,32 +9,34 @@ class MSCLevel2DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level2DataApi()
# def test_find(self):
# recs = self.api.find(
# level1_id=1,
# create_time = ("2022-06-15 11:12:13","2022-06-16 11:12:13"))
# print('find:', recs)
# def test_catalog_query(self):
# result = self.api.catalog_query(
# obs_id='100000000',
# obs_time = ("2021-05-24 11:12:13","2021-05-25 13:12:13"),
# limit = 2)
# dt = self.api.to_table(result)
# print(result)
# dt.pprint()
# def test_get(self):
# rec = self.api.get(id = 1)
# print('get:', rec)
# def test_update_proc_status(self):
# rec = self.api.update_proc_status(id = 1, status = 4)
# print('update_proc_status:', rec)
# def test_update_qc2_status(self):
# rec = self.api.update_qc2_status(id = 1, status = 7)
# print('update_qc2_status:', rec)
def test_find(self):
recs = self.api.find(
level1_id=1,
create_time = ("2022-06-15 11:12:13","2022-06-16 11:12:13"))
print('find:', recs)
def test_catalog_query(self):
result = self.api.catalog_query(
obs_id='100000000',
obs_time = ("2021-05-24 11:12:13","2021-05-25 13:12:13"),
limit = 2)
if result.success:
dt = self.api.to_table(result)
dt.pprint()
else:
print(result)
def test_get(self):
rec = self.api.get(id = 1)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_update_qc2_status(self):
rec = self.api.update_qc2_status(id = 1, status = 7)
print('update_qc2_status:', rec)
def test_write(self):
rec = self.api.write(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment