Commit d986550f authored by Wei Shoulin's avatar Wei Shoulin
Browse files

level1 and sls

parent 1f4f6132
...@@ -2,4 +2,5 @@ from .calmerge import CalMergeApi ...@@ -2,4 +2,5 @@ from .calmerge import CalMergeApi
from .detector import DetectorApi from .detector import DetectorApi
from .level0 import Level0DataApi from .level0 import Level0DataApi
from .level0prc import Level0PrcApi from .level0prc import Level0PrcApi
from .level1prc import Level1PrcApi
from .observation import ObservationApi from .observation import ObservationApi
\ No newline at end of file
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_proto_model_list
from csst_dfs_commons.models.facility import Level1PrcRecord
from csst_dfs_proto.facility.level1prc import level1prc_pb2, level1prc_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level1PrcApi(object):
    """Client API for level1 pipeline-procedure records on the facility gRPC service."""

    def __init__(self):
        # Endpoint/channel resolution is delegated to ServiceProxy.
        self.stub = level1prc_pb2_grpc.Level1PrcSrvStub(ServiceProxy().channel())

    def find(self, **kwargs):
        ''' retrieve level1 procedure records from database

        parameter kwargs:
            level1_id: [str]
            pipeline_id: [str]
            prc_module: [str]
            prc_status : [int]

        return: csst_dfs_common.models.Result
        '''
        try:
            resp, _ = self.stub.Find.with_call(level1prc_pb2.FindLevel1PrcReq(
                level1_id = get_parameter(kwargs, "level1_id"),
                pipeline_id = get_parameter(kwargs, "pipeline_id"),
                prc_module = get_parameter(kwargs, "prc_module"),
                prc_status = get_parameter(kwargs, "prc_status"),
                # NOTE(review): test-only condition hard-coded in the request; confirm before release
                other_conditions = {"test":"cnlab.test"}
            ), metadata = get_auth_headers())

            if resp.success:
                return Result.ok_data(
                    data = from_proto_model_list(Level1PrcRecord, resp.records)
                ).append("totalCount", resp.totalCount)
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # RpcError.details is a method; it must be CALLED to get the text
            # (the original formatted the bound method object).
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        rec_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp, _ = self.stub.UpdateProcStatus.with_call(
                level1prc_pb2.UpdateProcStatusReq(id = rec_id, status = status),
                metadata = get_auth_headers()
            )
            if resp.success:
                return Result.ok_data()
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def write(self, **kwargs):
        ''' insert a level1 procedure record into database

        parameter kwargs:
            level1_id : [int]
            pipeline_id : [str]
            prc_module : [str]
            params_file_path : [str]
            prc_status : [int]
            prc_time : [str]
            result_file_path : [str]

        return csst_dfs_common.models.Result
        '''
        rec = level1prc_pb2.Level1PrcRecord(
            id = 0,  # 0 lets the server assign the primary key
            level1_id = get_parameter(kwargs, "level1_id"),
            pipeline_id = get_parameter(kwargs, "pipeline_id"),
            prc_module = get_parameter(kwargs, "prc_module"),
            params_file_path = get_parameter(kwargs, "params_file_path"),
            prc_status = get_parameter(kwargs, "prc_status", -1),
            prc_time = get_parameter(kwargs, "prc_time"),
            result_file_path = get_parameter(kwargs, "result_file_path")
        )
        req = level1prc_pb2.WriteLevel1PrcReq(record = rec)
        try:
            resp, _ = self.stub.Write.with_call(req, metadata = get_auth_headers())
            if resp.success:
                return Result.ok_data(data = Level1PrcRecord().from_proto_model(resp.record))
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))
...@@ -24,7 +24,6 @@ class Level1DataApi(object): ...@@ -24,7 +24,6 @@ class Level1DataApi(object):
parameter kwargs: parameter kwargs:
level0_id: [str] level0_id: [str]
data_type: [str] data_type: [str]
obs_type: [str]
create_time : (start, end), create_time : (start, end),
qc1_status : [int], qc1_status : [int],
prc_status : [int], prc_status : [int],
...@@ -69,7 +68,7 @@ class Level1DataApi(object): ...@@ -69,7 +68,7 @@ class Level1DataApi(object):
),metadata = get_auth_headers()) ),metadata = get_auth_headers())
if resp.record is None or resp.record.id == 0: if resp.record is None or resp.record.id == 0:
return Result.error(message=f"id:{id} not found") return Result.error(message=f"id:{fits_id} not found")
return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) return Result.ok_data(data=Level1Record().from_proto_model(resp.record))
...@@ -128,17 +127,12 @@ class Level1DataApi(object): ...@@ -128,17 +127,12 @@ class Level1DataApi(object):
data_type : [str] data_type : [str]
cor_sci_id : [int] cor_sci_id : [int]
prc_params : [str] prc_params : [str]
flat_id : [int]
dark_id : [int]
bias_id : [int]
lamp_id : [int]
arc_id : [int]
sky_id : [int]
filename : [str] filename : [str]
file_path : [str] file_path : [str]
prc_status : [int] prc_status : [int]
prc_time : [str] prc_time : [str]
pipeline_id : [str] pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result return csst_dfs_common.models.Result
''' '''
...@@ -149,17 +143,12 @@ class Level1DataApi(object): ...@@ -149,17 +143,12 @@ class Level1DataApi(object):
data_type = get_parameter(kwargs, "data_type"), data_type = get_parameter(kwargs, "data_type"),
cor_sci_id = get_parameter(kwargs, "cor_sci_id"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
prc_params = get_parameter(kwargs, "prc_params"), prc_params = get_parameter(kwargs, "prc_params"),
flat_id = get_parameter(kwargs, "flat_id"),
dark_id = get_parameter(kwargs, "dark_id"),
bias_id = get_parameter(kwargs, "bias_id"),
lamp_id = get_parameter(kwargs, "lamp_id"),
arc_id = get_parameter(kwargs, "arc_id"),
sky_id = get_parameter(kwargs, "sky_id"),
filename = get_parameter(kwargs, "filename"), filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"), file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1), prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id") pipeline_id = get_parameter(kwargs, "pipeline_id"),
refs = get_parameter(kwargs, "refs", {})
) )
def stream(rec): def stream(rec):
with open(rec.file_path, 'rb') as f: with open(rec.file_path, 'rb') as f:
......
...@@ -24,7 +24,6 @@ class Level1DataApi(object): ...@@ -24,7 +24,6 @@ class Level1DataApi(object):
parameter kwargs: parameter kwargs:
level0_id: [str] level0_id: [str]
data_type: [str] data_type: [str]
obs_type: [str]
create_time : (start, end), create_time : (start, end),
qc1_status : [int], qc1_status : [int],
prc_status : [int], prc_status : [int],
...@@ -69,7 +68,7 @@ class Level1DataApi(object): ...@@ -69,7 +68,7 @@ class Level1DataApi(object):
),metadata = get_auth_headers()) ),metadata = get_auth_headers())
if resp.record is None or resp.record.id == 0: if resp.record is None or resp.record.id == 0:
return Result.error(message=f"id:{id} not found") return Result.error(message=f"id:{fits_id} not found")
return Result.ok_data(data = Level1Record().from_proto_model(resp.record)) return Result.ok_data(data = Level1Record().from_proto_model(resp.record))
...@@ -128,14 +127,12 @@ class Level1DataApi(object): ...@@ -128,14 +127,12 @@ class Level1DataApi(object):
data_type : [str] data_type : [str]
cor_sci_id : [int] cor_sci_id : [int]
prc_params : [str] prc_params : [str]
flat_id : [int]
dark_id : [int]
bias_id : [int]
filename : [str] filename : [str]
file_path : [str] file_path : [str]
prc_status : [int] prc_status : [int]
prc_time : [str] prc_time : [str]
pipeline_id : [str] pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result return csst_dfs_common.models.Result
''' '''
...@@ -146,14 +143,12 @@ class Level1DataApi(object): ...@@ -146,14 +143,12 @@ class Level1DataApi(object):
data_type = get_parameter(kwargs, "data_type"), data_type = get_parameter(kwargs, "data_type"),
cor_sci_id = get_parameter(kwargs, "cor_sci_id"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
prc_params = get_parameter(kwargs, "prc_params"), prc_params = get_parameter(kwargs, "prc_params"),
flat_id = get_parameter(kwargs, "flat_id"),
dark_id = get_parameter(kwargs, "dark_id"),
bias_id = get_parameter(kwargs, "bias_id"),
filename = get_parameter(kwargs, "filename", ""), filename = get_parameter(kwargs, "filename", ""),
file_path = get_parameter(kwargs, "file_path", ""), file_path = get_parameter(kwargs, "file_path", ""),
prc_status = get_parameter(kwargs, "prc_status", -1), prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id") pipeline_id = get_parameter(kwargs, "pipeline_id"),
refs = get_parameter(kwargs, "refs", {})
) )
def stream(rec): def stream(rec):
with open(rec.file_path, 'rb') as f: with open(rec.file_path, 'rb') as f:
......
from .level1 import Level1DataApi
from .level2spectra import Level2SpectraApi
\ No newline at end of file
import os
import grpc
import datetime
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_proto_model_list
from csst_dfs_commons.models.sls import Level1Record
from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE
from csst_dfs_proto.sls.level1 import level1_pb2, level1_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level1DataApi(object):
    """
    Level1 Data Operation Class (SLS gRPC service client).
    """

    def __init__(self):
        # Endpoint/channel resolution is delegated to ServiceProxy.
        self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel())

    def find(self, **kwargs):
        ''' retrieve level1 records from database

        parameter kwargs:
            level0_id: [str]
            data_type: [str]
            create_time : (start, end),
            qc1_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            time_range = get_parameter(kwargs, "create_time", [None, None])
            resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req(
                level0_id = get_parameter(kwargs, "level0_id"),
                data_type = get_parameter(kwargs, "data_type"),
                create_time_start = time_range[0],
                create_time_end = time_range[1],
                qc1_status = get_parameter(kwargs, "qc1_status"),
                prc_status = get_parameter(kwargs, "prc_status"),
                filename = get_parameter(kwargs, "filename"),
                limit = get_parameter(kwargs, "limit", 0),
                # NOTE(review): test-only condition hard-coded in the request; confirm before release
                other_conditions = {"test":"cnlab.test"}
            ), metadata = get_auth_headers())

            if resp.success:
                return Result.ok_data(
                    data = from_proto_model_list(Level1Record, resp.records)
                ).append("totalCount", resp.totalCount)
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # RpcError.details is a method; it must be CALLED to get the text
            # (the original formatted the bound method object).
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def get(self, **kwargs):
        ''' fetch a record from database

        parameter kwargs:
            id : [int]

        return csst_dfs_common.models.Result
        '''
        try:
            fits_id = get_parameter(kwargs, "id")
            resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req(
                id = fits_id
            ), metadata = get_auth_headers())

            # A zero id in the response means "not found" on the server side.
            if resp.record is None or resp.record.id == 0:
                return Result.error(message = f"id:{fits_id} not found")

            return Result.ok_data(data = Level1Record().from_proto_model(resp.record))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp, _ = self.stub.UpdateProcStatus.with_call(
                level1_pb2.UpdateProcStatusReq(id = fits_id, status = status),
                metadata = get_auth_headers()
            )
            if resp.success:
                return Result.ok_data()
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def update_qc1_status(self, **kwargs):
        ''' update the QC1 status

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp, _ = self.stub.UpdateQc1Status.with_call(
                level1_pb2.UpdateQc1StatusReq(id = fits_id, status = status),
                metadata = get_auth_headers()
            )
            if resp.success:
                return Result.ok_data()
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def write(self, **kwargs):
        ''' insert a level1 record into database

        parameter kwargs:
            level0_id : [str]
            data_type : [str]
            prc_params : [str]
            filename : [str]
            file_path : [str]
            prc_status : [int]
            prc_time : [str]
            pipeline_id : [str]
            refs: [dict]

        return csst_dfs_common.models.Result
        '''
        rec = level1_pb2.Level1Record(
            id = 0,  # 0 lets the server assign the primary key
            level0_id = get_parameter(kwargs, "level0_id"),
            data_type = get_parameter(kwargs, "data_type"),
            prc_params = get_parameter(kwargs, "prc_params"),
            filename = get_parameter(kwargs, "filename", ""),
            file_path = get_parameter(kwargs, "file_path", ""),
            prc_status = get_parameter(kwargs, "prc_status", -1),
            # NOTE(review): the file does `import datetime` (module), so
            # `datetime.now()` only works if a wildcard import (common.utils)
            # re-binds `datetime` to the class — TODO confirm.
            prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
            pipeline_id = get_parameter(kwargs, "pipeline_id"),
            refs = get_parameter(kwargs, "refs", {})
        )

        def stream(rec):
            # Lazily chunk the file so arbitrarily large products can be uploaded.
            with open(rec.file_path, 'rb') as f:
                while True:
                    data = f.read(UPLOAD_CHUNK_SIZE)
                    if not data:
                        break
                    yield level1_pb2.WriteLevel1Req(record = rec, data = data)

        try:
            if not rec.file_path:
                return Result.error(message = "file_path is blank")
            if not os.path.exists(rec.file_path):
                return Result.error(message = "the file [%s] not existed" % (rec.file_path, ))
            if not rec.filename:
                rec.filename = os.path.basename(rec.file_path)

            resp, _ = self.stub.Write.with_call(stream(rec), metadata = get_auth_headers())
            if resp.success:
                return Result.ok_data(data = Level1Record().from_proto_model(resp.record))
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))
import os
import grpc
import datetime
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_proto_model_list
from csst_dfs_commons.models.sls import Level2Spectra
from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE
from csst_dfs_proto.sls.level2spectra import level2spectra_pb2, level2spectra_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level2SpectraApi(object):
    """
    Level2 Spectra Data Operation Class (SLS gRPC service client).
    """

    def __init__(self):
        # Endpoint/channel resolution is delegated to ServiceProxy.
        self.stub = level2spectra_pb2_grpc.Level2spectraSrvStub(ServiceProxy().channel())

    def find(self, **kwargs):
        ''' retrieve level2spectra records from database

        :param kwargs: Parameter dictionary, key items support:
            level1_id: [int]
            spectra_id: [str]
            create_time : (start, end),
            qc1_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        :returns: csst_dfs_common.models.Result
        '''
        try:
            time_range = get_parameter(kwargs, "create_time", [None, None])
            resp, _ = self.stub.Find.with_call(level2spectra_pb2.FindLevel2spectraReq(
                level1_id = get_parameter(kwargs, "level1_id", 0),
                spectra_id = get_parameter(kwargs, "spectra_id"),
                create_time_start = time_range[0],
                create_time_end = time_range[1],
                qc1_status = get_parameter(kwargs, "qc1_status"),
                prc_status = get_parameter(kwargs, "prc_status"),
                filename = get_parameter(kwargs, "filename"),
                limit = get_parameter(kwargs, "limit", 0),
                # NOTE(review): test-only condition hard-coded in the request; confirm before release
                other_conditions = {"test":"cnlab.test"}
            ), metadata = get_auth_headers())

            if resp.success:
                return Result.ok_data(
                    data = from_proto_model_list(Level2Spectra, resp.records)
                ).append("totalCount", resp.totalCount)
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # RpcError.details is a method; it must be CALLED to get the text
            # (the original formatted the bound method object).
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def get(self, **kwargs):
        ''' fetch a record from database

        parameter kwargs:
            id : [int]

        return csst_dfs_common.models.Result
        '''
        try:
            fits_id = get_parameter(kwargs, "id")
            resp, _ = self.stub.Get.with_call(level2spectra_pb2.GetLevel2spectraReq(
                id = fits_id
            ), metadata = get_auth_headers())

            # A zero id in the response means "not found" on the server side.
            if resp.record is None or resp.record.id == 0:
                return Result.error(message = f"id:{fits_id} not found")

            return Result.ok_data(data = Level2Spectra().from_proto_model(resp.record))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp, _ = self.stub.UpdateProcStatus.with_call(
                level2spectra_pb2.UpdateProcStatusReq(id = fits_id, status = status),
                metadata = get_auth_headers()
            )
            if resp.success:
                return Result.ok_data()
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def update_qc1_status(self, **kwargs):
        ''' update the QC1 status

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp, _ = self.stub.UpdateQc1Status.with_call(
                level2spectra_pb2.UpdateQc1StatusReq(id = fits_id, status = status),
                metadata = get_auth_headers()
            )
            if resp.success:
                return Result.ok_data()
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))

    def write(self, **kwargs):
        ''' insert a level2spectra record into database

        parameter kwargs:
            level1_id: [int]
            spectra_id : [str]
            region : [str]
            filename : [str]
            file_path : [str]
            prc_status : [int]
            prc_time : [str]
            pipeline_id : [str]

        return csst_dfs_common.models.Result
        '''
        # NOTE(review): a `refs` kwarg was documented here but is NOT forwarded
        # to the record (unlike Level1DataApi.write); it is silently ignored —
        # confirm whether the Level2spectraRecord proto carries a refs field.
        rec = level2spectra_pb2.Level2spectraRecord(
            id = 0,  # 0 lets the server assign the primary key
            level1_id = get_parameter(kwargs, "level1_id", 0),
            spectra_id = get_parameter(kwargs, "spectra_id"),
            region = get_parameter(kwargs, "region"),
            filename = get_parameter(kwargs, "filename", ""),
            file_path = get_parameter(kwargs, "file_path", ""),
            prc_status = get_parameter(kwargs, "prc_status", -1),
            # NOTE(review): the file does `import datetime` (module), so
            # `datetime.now()` only works if a wildcard import (common.utils)
            # re-binds `datetime` to the class — TODO confirm.
            prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
            pipeline_id = get_parameter(kwargs, "pipeline_id")
        )

        def stream(rec):
            # Lazily chunk the file so arbitrarily large products can be uploaded.
            with open(rec.file_path, 'rb') as f:
                while True:
                    data = f.read(UPLOAD_CHUNK_SIZE)
                    if not data:
                        break
                    yield level2spectra_pb2.WriteLevel2spectraReq(record = rec, data = data)

        try:
            if not rec.file_path:
                return Result.error(message = "file_path is blank")
            if not os.path.exists(rec.file_path):
                return Result.error(message = "the file [%s] not existed" % (rec.file_path, ))
            if not rec.filename:
                rec.filename = os.path.basename(rec.file_path)

            resp, _ = self.stub.Write.with_call(stream(rec), metadata = get_auth_headers())
            if resp.success:
                return Result.ok_data(data = Level2Spectra().from_proto_model(resp.record))
            return Result.error(message = str(resp.error.detail))
        except grpc.RpcError as e:
            # Fixed: e.details() (method call), not e.details.
            return Result.error(message = "%s:%s" % (e.code().value, e.details()))
...@@ -15,30 +15,30 @@ class MSCLevel1DataTestCase(unittest.TestCase): ...@@ -15,30 +15,30 @@ class MSCLevel1DataTestCase(unittest.TestCase):
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")) create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs) print('find:', recs)
def test_get(self): # def test_get(self):
rec = self.api.get(id = 2) # rec = self.api.get(id = 2)
print('get:', rec) # print('get:', rec)
def test_update_proc_status(self): # def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 2, status = 4) # rec = self.api.update_proc_status(id = 2, status = 4)
print('update_proc_status:', rec) # print('update_proc_status:', rec)
def test_update_qc1_status(self): # def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 2, status = 7) # rec = self.api.update_qc1_status(id = 2, status = 7)
print('update_qc1_status:', rec) # print('update_qc1_status:', rec)
def test_write(self): # def test_write(self):
rec = self.api.write( # rec = self.api.write(
level0_id='1', # level0_id='1',
data_type = "sci", # data_type = "sci",
cor_sci_id = 1, # cor_sci_id = 1,
prc_params = "/opt/dddasd.params", # prc_params = "/opt/dddasd.params",
flat_id = 1, # flat_id = 1,
dark_id = 2, # dark_id = 2,
bias_id = 3, # bias_id = 3,
prc_status = 3, # prc_status = 3,
prc_time = '2021-06-04 11:12:13', # prc_time = '2021-06-04 11:12:13',
filename = "MSC_MS_210525121500_100000001_09_raw", # filename = "MSC_MS_210525121500_100000001_09_raw",
file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits", # file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits",
pipeline_id = "P1") # pipeline_id = "P1")
print('write:', rec) # print('write:', rec)
\ No newline at end of file \ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment