Commit ff5b057c authored by Wei Shoulin's avatar Wei Shoulin
Browse files

split level0data

parent e6920eeb
Pipeline #3 canceled with stages
from .calmerge import CalMergeApi
from ..ifs.calmerge import CalMergeApi
from .detector import DetectorApi
from .level0 import Level0DataApi
from .level0prc import Level0PrcApi
......
......@@ -6,7 +6,7 @@ class Level0DataApi(object):
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "facility")
self.pymodule = Delegate().load(sub_module = "ifs")
self.stub = getattr(self.pymodule, "Level0DataApi")()
def find(self, **kwargs):
......
......@@ -6,7 +6,7 @@ class Level0PrcApi(object):
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "facility")
self.pymodule = Delegate().load(sub_module = "ifs")
self.stub = getattr(self.pymodule, "Level0PrcApi")()
def find(self, **kwargs):
......
......@@ -6,7 +6,7 @@ class Level1PrcApi(object):
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "facility")
self.pymodule = Delegate().load(sub_module = "ifs")
self.stub = getattr(self.pymodule, "Level1PrcApi")()
def find(self, **kwargs):
......
from ..common.delegate import Delegate
class CalMergeApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mci")
self.stub = getattr(self.pymodule, "CalMergeApi")()
def find(self, **kwargs):
''' retrieve calibration merge records from database
:param kwargs: Parameter dictionary, key items support:
detector_no: [str],
ref_type: [str],
obs_time: (start,end),
qc1_status : [int],
prc_status : [int],
file_name: [str],
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get_latest_by_l0(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc1_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc1_status(**kwargs)
def write(self, **kwargs):
''' insert a calibration merge record into database
:param kwargs: Parameter dictionary, key items support:
cal_id : [str],
detector_no : [str],
ref_type : [str],
obs_time : [str],
exp_time : [float],
prc_status : [int],
prc_time : [str],
filename : [str],
file_path : [str],
level0_ids : [list],
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0DataApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mci")
self.stub = getattr(self.pymodule, "Level0DataApi")()
def find(self, **kwargs):
''' retrieve level0 records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc0_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc0_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 data record into database
:param kwargs: Parameter dictionary, key items support:
obs_id = [str],
detector_no = [str],
obs_type = [str],
obs_time = [str],
exp_time = [int],
detector_status_id = [int],
filename = [str],
file_path = [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mci")
self.stub = getattr(self.pymodule, "Level0PrcApi")()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level1DataApi(object):
"""
IFS Level1 Data Operation Class
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mci")
self.stub = getattr(self.pymodule, "Level1DataApi")()
def find(self, **kwargs):
''' retrieve level1 records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
data_type: [str],
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc1_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id = [int],
status = [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc1_status(**kwargs)
def write(self, **kwargs):
''' insert a level1 record into database
:param kwargs: Parameter dictionary, key items support:
level0_id : [str]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
refs: [dict]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level1PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "mci")
self.stub = getattr(self.pymodule, "Level1PrcApi")()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level1 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class CalMergeApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "CalMergeApi")()
def find(self, **kwargs):
''' retrieve calibration merge records from database
:param kwargs: Parameter dictionary, key items support:
detector_no: [str],
ref_type: [str],
obs_time: (start,end),
qc1_status : [int],
prc_status : [int],
file_name: [str],
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get_latest_by_l0(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc1_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc1_status(**kwargs)
def write(self, **kwargs):
''' insert a calibration merge record into database
:param kwargs: Parameter dictionary, key items support:
cal_id : [str],
detector_no : [str],
ref_type : [str],
obs_time : [str],
exp_time : [float],
prc_status : [int],
prc_time : [str],
filename : [str],
file_path : [str],
level0_ids : [list],
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0DataApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "Level0DataApi")()
def find(self, **kwargs):
''' retrieve level0 records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc0_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc0_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 data record into database
:param kwargs: Parameter dictionary, key items support:
obs_id = [str],
detector_no = [str],
obs_type = [str],
obs_time = [str],
exp_time = [int],
detector_status_id = [int],
filename = [str],
file_path = [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "Level0PrcApi")()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level1PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "msc")
self.stub = getattr(self.pymodule, "Level1PrcApi")()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level1 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class CalMergeApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "sls")
self.stub = getattr(self.pymodule, "CalMergeApi")()
def find(self, **kwargs):
''' retrieve calibration merge records from database
:param kwargs: Parameter dictionary, key items support:
detector_no: [str],
ref_type: [str],
obs_time: (start,end),
qc1_status : [int],
prc_status : [int],
file_name: [str],
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get_latest_by_l0(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc1_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
cal_id : [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc1_status(**kwargs)
def write(self, **kwargs):
''' insert a calibration merge record into database
:param kwargs: Parameter dictionary, key items support:
cal_id : [str],
detector_no : [str],
ref_type : [str],
obs_time : [str],
exp_time : [float],
prc_status : [int],
prc_time : [str],
filename : [str],
file_path : [str],
level0_ids : [list],
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0DataApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "sls")
self.stub = getattr(self.pymodule, "Level0DataApi")()
def find(self, **kwargs):
''' retrieve level0 records from database
:param kwargs: Parameter dictionary, key items support:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def get(self, **kwargs):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.get(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def update_qc0_status(self, **kwargs):
''' update the status of QC0
:param kwargs: Parameter dictionary, key items support:
id : [int],
level0_id: [str],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_qc0_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 data record into database
:param kwargs: Parameter dictionary, key items support:
obs_id = [str],
detector_no = [str],
obs_type = [str],
obs_time = [str],
exp_time = [int],
detector_status_id = [int],
filename = [str],
file_path = [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level0PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "sls")
self.stub = getattr(self.pymodule, "Level0PrcApi")()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level0 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
from ..common.delegate import Delegate
class Level1PrcApi(object):
"""
Level 0 Data Operation API
"""
def __init__(self):
self.pymodule = Delegate().load(sub_module = "sls")
self.stub = getattr(self.pymodule, "Level1PrcApi")()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.find(**kwargs)
def update_proc_status(self, **kwargs):
''' update the status of reduction
:param kwargs: Parameter dictionary, key items support:
id : [int],
status : [int]
:returns: csst_dfs_common.models.Result
'''
return self.stub.update_proc_status(**kwargs)
def write(self, **kwargs):
''' insert a level1 procedure record into database
:param kwargs: Parameter dictionary, key items support:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
:returns: csst_dfs_common.models.Result
'''
return self.stub.write(**kwargs)
......@@ -2,9 +2,9 @@ import os
import unittest
from astropy.io import fits
from csst_dfs_api.facility.calmerge import CalMergeApi
from csst_dfs_api.ifs.calmerge import CalMergeApi
class CalMergeApiTestCase(unittest.TestCase):
class IFSCalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
......
......@@ -2,9 +2,9 @@ import os
import unittest
from astropy.io import fits
from csst_dfs_api.facility.level0 import Level0DataApi
from csst_dfs_api.ifs.level0 import Level0DataApi
class Level0DataTestCase(unittest.TestCase):
class IFSLevel0DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0DataApi()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment