Commit dfa8873a authored by Wei Shoulin's avatar Wei Shoulin
Browse files

c3

parent 6ebbd274
......@@ -36,7 +36,7 @@ class CatalogApi(object):
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = resp.message)
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
......
from .calmerge import CalMergeApi
from .detector import DetectorApi
from .level0 import Level0DataApi
from .level0prc import Level0PrcApi
from .observation import ObservationApi
\ No newline at end of file
import os
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.ifs.fits import fits_pb2, fits_pb2_grpc
from csst_dfs_proto.facility.calmerge import calmerge_pb2, calmerge_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
from ..common.constants import UPLOAD_CHUNK_SIZE
class FitsApi(object):
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
self.stub = fits_pb2_grpc.FitsSrvStub(ServiceProxy().channel())
class CalMergeApi(object):
def __init__(self):
self.stub = calmerge_pb2_grpc.CalMergeSrvStub(ServiceProxy().channel())
def find(self, **kwargs):
'''
''' retrieve calibration merge records from database
parameter kwargs:
obs_time = [int],
file_name = [str],
exp_time = (start, end),
ccd_num = [int],
qc0_status = [int],
prc_status = [int],
limit = [int]
return list of raw records
detector_no: [str]
ref_type: [str]
obs_time: (start,end)
qc1_status : [int]
prc_status : [int]
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(fits_pb2.FindRawFitsReq(
obs_time = get_parameter(kwargs, "obs_time"),
file_name = get_parameter(kwargs, "file_name"),
exp_time_start = get_parameter(kwargs, "exp_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "exp_time", [None, None])[1],
ccd_num = get_parameter(kwargs, "ccd_num"),
qc0_status = get_parameter(kwargs, "qc0_status"),
resp, _ = self.stub.Find.with_call(calmerge_pb2.FindCalMergeReq(
detector_no = get_parameter(kwargs, "detector_no"),
ref_type = get_parameter(kwargs, "ref_type"),
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1],
qc1_status = get_parameter(kwargs, "qc1_status"),
prc_status = get_parameter(kwargs, "prc_status"),
file_name = get_parameter(kwargs, "file_name"),
limit = get_parameter(kwargs, "limit"),
other_conditions = {"test":"cnlab.test"}
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.rawFits).append("totalCount", resp.totalCount)
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = resp.message)
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get(self, **kwargs):
''' query database, return a record as dict
''' fetch a record from database
parameter kwargs:
fits_id = [int]
id : [int]
return dict or None
return csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Get.with_call(fits_pb2.GetRawFitsReq(
fits_id = get_parameter(kwargs, "fits_id")
id = get_parameter(kwargs, "id")
resp, _ = self.stub.Get.with_call(calmerge_pb2.GetCalMergeReq(
id = id
),metadata = get_auth_headers())
return Result.ok_data(data=resp)
if resp.record is None:
return Result.error(message=f"id:{id} not found")
return Result.ok_data(data=resp.record)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
return Result.error(message="%s:%s" % (e.code().value, e.details))
def read(self, **kwargs):
''' yield bytes of fits file
def update_qc1_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
fits_id = [int],
file_path = [str],
chunk_size = [int] default 20480
id : [int],
status : [int]
yield bytes of fits file
return csst_dfs_common.models.Result
'''
try:
streams = self.stub.Read.with_call(fits_pb2.ReadRawFitsReq(
fits_id = get_parameter(kwargs, "fits_id"),
file_path = get_parameter(kwargs, "file_path"),
chunk_size = get_parameter(kwargs, "chunk_size", 20480)
),metadata = get_auth_headers())
for stream in streams:
yield stream.data
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateQc1Status.with_call(
calmerge_pb2.UpdateQc1StatusReq(id=id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
......@@ -92,66 +95,62 @@ class FitsApi(object):
''' update the status of reduction
parameter kwargs:
fits_id = [int],
status = [int]
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status"),
try:
resp,_ = self.stub.update_qc0_status.with_call(
fits_pb2.UpdateProcStatusReq(fits_id=fits_id, status=status),
metadata = get_auth_headers()
)
return Result.ok_data(data=resp)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
def update_qc0_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
fits_id = [int],
status = [int]
'''
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status"),
try:
resp,_ = self.stub.update_qc0_status.with_call(
fits_pb2.UpdateQc0StatusReq(fits_id=fits_id, status=status),
resp,_ = self.stub.UpdateProcStatus.with_call(
calmerge_pb2.UpdateProcStatusReq(id=id, status=status),
metadata = get_auth_headers()
)
return Result.ok_data(data=resp)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' copy a local file to file storage, then reduce the header of fits file and insert a record into database
''' insert a calibration merge record into database
parameter kwargs:
file_path = [str]
id : [int]
detector_no : [str]
ref_type : [str]
obs_time : [str]
exp_time : [float]
prc_status : [int]
prc_time : [str]
filename : [str]
file_path : [str]
level0_ids : [list]
return csst_dfs_common.models.Result
'''
file_path = get_parameter(kwargs, "file_path")
if os.path.exists(file_path):
raise Exception("%s file not found" % (file_path))
def stream(v_file_path):
v_file_name = os.path.basename(v_file_path)
with open(v_file_path, 'rb') as f:
while True:
data = f.read(UPLOAD_CHUNK_SIZE)
if not data:
break
yield fits_pb2.WriteRawFitsReq(
file_name=v_file_name,
fitsData=data)
rec = calmerge_pb2.CalMergeRecord(
id = 0,
detector_no = get_parameter(kwargs, "detector_no"),
ref_type = get_parameter(kwargs, "ref_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status"),
prc_time = get_parameter(kwargs, "prc_time"),
level0_ids = get_parameter(kwargs, "level0_ids", [])
)
req = calmerge_pb2.WriteCalMergeReq(record = rec)
try:
resp,_ = self.stub.Write.with_call(stream(file_path),
metadata = get_auth_headers())
return Result.ok_data(data=resp)
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.facility.detector import detector_pb2, detector_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class DetectorApi(object):
def __init__(self):
self.stub = detector_pb2_grpc.DetectorSrvStub(ServiceProxy().channel())
def find(self, **kwargs):
''' retrieve detector records from database
parameter kwargs:
module_id: [str]
key: [str]
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(detector_pb2.FindDetectorReq(
module_id = get_parameter(kwargs, "module_id"),
key = get_parameter(kwargs, "key")
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
no : [str]
return csst_dfs_common.models.Result
'''
try:
no = get_parameter(kwargs, "no")
resp, _ = self.stub.Get.with_call(detector_pb2.GetDetectorReq(
no = no
),metadata = get_auth_headers())
if not resp.record.no:
return Result.error(message=f"no:{no} not found")
return Result.ok_data(data=resp.record)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update(self, **kwargs):
''' update a detector by no
parameter kwargs:
no : [str],
detector_name : [str],
module_id : [str],
filter_id : [str]
return csst_dfs_common.models.Result
'''
try:
no = get_parameter(kwargs, "no")
result_get = self.get(no=no)
if not result_get.success:
return result_get
record = detector_pb2.Detector(
no = no,
detector_name = get_parameter(kwargs, "detector_name", result_get.data.detector_name),
module_id = get_parameter(kwargs, "module_id", result_get.data.module_id),
filter_id = get_parameter(kwargs, "filter_id", result_get.data.filter_id)
)
resp,_ = self.stub.Update.with_call(
detector_pb2.UpdateDetectorReq(record=record),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def delete(self, **kwargs):
''' delete a detector by no
parameter kwargs:
no : [str]
return csst_dfs_common.models.Result
'''
no = get_parameter(kwargs, "no")
try:
resp,_ = self.stub.Delete.with_call(
detector_pb2.DeleteDetectorReq(no=no),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' insert a detector record into database
parameter kwargs:
no : [str],
detector_name : [str],
module_id : [str],
filter_id : [str]
return csst_dfs_common.models.Result
'''
rec = detector_pb2.Detector(
no = get_parameter(kwargs, "no"),
detector_name = get_parameter(kwargs, "detector_name"),
module_id = get_parameter(kwargs, "module_id"),
filter_id = get_parameter(kwargs, "filter_id")
)
req = detector_pb2.WriteDetectorReq(record = rec)
try:
resp,_ = self.stub.Write.with_call(req, metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def find_status(self, **kwargs):
''' retrieve a detector status's from database
parameter kwargs:
detector_no: [str]
status_occur_time: (begin,end)
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.FindStatus.with_call(detector_pb2.FindStatusReq(
detector_no = get_parameter(kwargs, "detector_no"),
status_begin_time = get_parameter(kwargs, "status_occur_time", [None, None])[0],
status_end_time = get_parameter(kwargs, "status_occur_time", [None, None])[1],
limit = get_parameter(kwargs, "limit", 0)
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get_status(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try:
id = get_parameter(kwargs, "id")
resp, _ = self.stub.GetStatus.with_call(detector_pb2.GetStatusReq(
id = id
),metadata = get_auth_headers())
if resp.record is None:
return Result.error(message=f"id:{id} not found")
return Result.ok_data(data=resp.record)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write_status(self, **kwargs):
''' insert a detector status into database
parameter kwargs:
detector_no : [str],
status : [str],
status_time : [str]
return csst_dfs_common.models.Result
'''
rec = detector_pb2.DetectorStatus(
id = 0,
detector_no = get_parameter(kwargs, "detector_no"),
status = get_parameter(kwargs, "status"),
status_time = get_parameter(kwargs, "status_time")
)
req = detector_pb2.WriteStatusReq(record = rec)
try:
resp,_ = self.stub.WriteStatus.with_call(req, metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
\ No newline at end of file
......@@ -17,7 +17,7 @@ class Level0DataApi(object):
obs_id: [int]
detector_no: [str]
obs_type: [str]
exp_time : (start, end),
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
......@@ -30,8 +30,8 @@ class Level0DataApi(object):
obs_id = get_parameter(kwargs, "obs_id"),
detector_no = get_parameter(kwargs, "detector_no"),
obs_type = get_parameter(kwargs, "obs_type"),
exp_time_start = get_parameter(kwargs, "exp_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "exp_time", [None, None])[1],
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1],
qc0_status = get_parameter(kwargs, "qc0_status"),
prc_status = get_parameter(kwargs, "prc_status"),
file_name = get_parameter(kwargs, "file_name"),
......@@ -42,7 +42,7 @@ class Level0DataApi(object):
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = resp.message)
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
......@@ -69,29 +69,6 @@ class Level0DataApi(object):
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def read(self, **kwargs):
''' yield bytes of fits file
parameter kwargs:
fits_id = [int],
file_path = [str],
chunk_size = [int] default 20480
yield bytes of fits file
'''
try:
streams = self.stub.Read.with_call(level0_pb2.ReadLevel0DatasReq(
id = get_parameter(kwargs, "fits_id"),
file_path = get_parameter(kwargs, "file_path"),
chunk_size = get_parameter(kwargs, "chunk_size", 20480)
),metadata = get_auth_headers())
for stream in streams:
yield stream.data
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_proc_status(self, **kwargs):
''' update the status of reduction
......@@ -136,5 +113,39 @@ class Level0DataApi(object):
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' insert a level0 data record into database
parameter kwargs:
obs_id = [int]
detector_no = [str]
obs_type = [str]
obs_time = [str]
exp_time = [int]
detector_status_id = [int]
filename = [str]
file_path = [str]
return: csst_dfs_common.models.Result
'''
rec = level0_pb2.Level0Record(
obs_id = get_parameter(kwargs, "obs_id"),
detector_no = get_parameter(kwargs, "detector_no"),
obs_type = get_parameter(kwargs, "obs_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
detector_status_id = get_parameter(kwargs, "detector_status_id"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path")
)
req = level0_pb2.WriteLevel0DataReq(record = rec)
try:
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
......@@ -33,7 +33,7 @@ class Level0PrcApi(object):
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = resp.message)
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
......@@ -42,7 +42,7 @@ class Level0PrcApi(object):
''' update the status of reduction
parameter kwargs:
obs_id : [int],
id : [int],
status : [int]
return csst_dfs_common.models.Result
......
......@@ -8,6 +8,9 @@ from ..common.utils import *
from ..common.constants import UPLOAD_CHUNK_SIZE
class ObservationApi(object):
"""
Observation Operation Class
"""
def __init__(self):
self.stub = observation_pb2_grpc.ObservationSrvStub(ServiceProxy().channel())
......@@ -17,7 +20,7 @@ class ObservationApi(object):
parameter kwargs:
module_id: [str]
obs_type: [str]
exp_time : (start, end),
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
limit: limits returns the number of records,default 0:no-limit
......@@ -28,8 +31,8 @@ class ObservationApi(object):
resp, _ = self.stub.Find.with_call(observation_pb2.FindObservationReq(
module_id = get_parameter(kwargs, "module_id"),
obs_type = get_parameter(kwargs, "obs_type"),
exp_time_start = get_parameter(kwargs, "exp_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "exp_time", [None, None])[1],
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1],
qc0_status = get_parameter(kwargs, "qc0_status"),
prc_status = get_parameter(kwargs, "prc_status"),
limit = get_parameter(kwargs, "limit", 0),
......@@ -114,22 +117,24 @@ class ObservationApi(object):
''' insert a observational record into database
parameter kwargs:
file_path : [str]
obs_time = [str]
exp_time = [int]
module_id = [str]
obs_type = [str]
facility_status_id = [int]
module_status_id = [int]
return: csst_dfs_common.models.Result
'''
exp_begin_time = get_parameter(kwargs, "exp_begin_time")
exp_end_time = get_parameter(kwargs, "exp_end_time")
module_id = get_parameter(kwargs, "module_id")
obs_type = get_parameter(kwargs, "obs_type")
facility_status_id = get_parameter(kwargs, "facility_status_id")
module_status_id = get_parameter(kwargs, "module_status_id")
req = observation_pb2.WriteObservationReq(
exp_begin_time=exp_begin_time,
exp_end_time=exp_end_time,
module_id=module_id,
obs_type=obs_type,
facility_status_id=facility_status_id,
module_status_id=module_status_id)
rec = observation_pb2.Observation(
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
module_id = get_parameter(kwargs, "module_id"),
obs_type = get_parameter(kwargs, "obs_type"),
facility_status_id = get_parameter(kwargs, "facility_status_id"),
module_status_id = get_parameter(kwargs, "module_status_id")
)
req = observation_pb2.WriteObservationReq(record = rec)
try:
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
......
from .fits import FitsApi
from .reffits import RefFitsApi
from .result0 import Result0Api
from .result1 import Result1Api
\ No newline at end of file
from .level1 import Level1DataApi
\ No newline at end of file
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.ifs.level1 import level1_pb2, level1_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level1DataApi(object):
"""
Level1 Data Operation Class
"""
def __init__(self):
self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel())
def find(self, **kwargs):
''' retrieve level1 records from database
parameter kwargs:
raw_id: [int]
data_type: [str]
obs_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req(
raw_id = get_parameter(kwargs, "raw_id"),
data_type = get_parameter(kwargs, "data_type"),
create_time_start = get_parameter(kwargs, "create_time", [None, None])[0],
create_time_end = get_parameter(kwargs, "create_time", [None, None])[1],
qc1_status = get_parameter(kwargs, "qc1_status"),
prc_status = get_parameter(kwargs, "prc_status"),
filename = get_parameter(kwargs, "filename"),
limit = get_parameter(kwargs, "limit", 0),
other_conditions = {"test":"cnlab.test"}
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try:
fits_id = get_parameter(kwargs, "id")
resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req(
id = fits_id
),metadata = get_auth_headers())
if resp.record is None:
return Result.error(message=f"id:{id} not found")
return Result.ok_data(data=resp.record)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateProcStatus.with_call(
level1_pb2.UpdateProcStatusReq(id=fits_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_qc1_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateQc1Status.with_call(
level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' insert a level1 record into database
parameter kwargs:
raw_id : [int]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
flat_id : [int]
dark_id : [int]
bias_id : [int]
lamp_id : [int]
arc_id : [int]
sky_id : [int]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
return csst_dfs_common.models.Result
'''
rec = level1_pb2.Level1Record(
id = 0,
raw_id = get_parameter(kwargs, "raw_id"),
data_type = get_parameter(kwargs, "data_type"),
cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
prc_params = get_parameter(kwargs, "prc_params"),
flat_id = get_parameter(kwargs, "flat_id"),
dark_id = get_parameter(kwargs, "dark_id"),
bias_id = get_parameter(kwargs, "bias_id"),
lamp_id = get_parameter(kwargs, "lamp_id"),
arc_id = get_parameter(kwargs, "arc_id"),
sky_id = get_parameter(kwargs, "sky_id"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status"),
prc_time = get_parameter(kwargs, "prc_time"),
pipeline_id = get_parameter(kwargs, "pipeline_id")
)
req = level1_pb2.WriteLevel1Req(record = rec)
try:
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
import os
import grpc
from ..common.service import ServiceProxy
from ..common.utils import *
from ..common.constants import *
import grpc
class RefFitsApi(object):
def __init__(self, sub_system):
self.sub_system = sub_system
import os
import grpc
from ..common.service import ServiceProxy
from ..common.utils import *
from ..common.constants import *
import grpc
class Result0Api(object):
def __init__(self, sub_system):
self.sub_system = sub_system
import os
import grpc
from ..common.service import ServiceProxy
from ..common.utils import *
from ..common.constants import *
import grpc
class Result1Api(object):
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
from .level1 import Level1DataApi
\ No newline at end of file
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.msc.level1 import level1_pb2, level1_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level1DataApi(object):
"""
Level1 Data Operation Class
"""
def __init__(self):
self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel())
def find(self, **kwargs):
''' retrieve level1 records from database
parameter kwargs:
raw_id: [int]
data_type: [str]
obs_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req(
raw_id = get_parameter(kwargs, "raw_id"),
data_type = get_parameter(kwargs, "data_type"),
create_time_start = get_parameter(kwargs, "create_time", [None, None])[0],
create_time_end = get_parameter(kwargs, "create_time", [None, None])[1],
qc1_status = get_parameter(kwargs, "qc1_status"),
prc_status = get_parameter(kwargs, "prc_status"),
filename = get_parameter(kwargs, "filename"),
limit = get_parameter(kwargs, "limit", 0),
other_conditions = {"test":"cnlab.test"}
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try:
fits_id = get_parameter(kwargs, "id")
resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req(
id = fits_id
),metadata = get_auth_headers())
if resp.record is None:
return Result.error(message=f"id:{id} not found")
return Result.ok_data(data=resp.record)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateProcStatus.with_call(
level1_pb2.UpdateProcStatusReq(id=fits_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_qc1_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateQc1Status.with_call(
level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' insert a level1 record into database
parameter kwargs:
raw_id : [int]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
flat_id : [int]
dark_id : [int]
bias_id : [int]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
return csst_dfs_common.models.Result
'''
rec = level1_pb2.Level1Record(
id = 0,
raw_id = get_parameter(kwargs, "raw_id"),
data_type = get_parameter(kwargs, "data_type"),
cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
prc_params = get_parameter(kwargs, "prc_params"),
flat_id = get_parameter(kwargs, "flat_id"),
dark_id = get_parameter(kwargs, "dark_id"),
bias_id = get_parameter(kwargs, "bias_id"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status"),
prc_time = get_parameter(kwargs, "prc_time"),
pipeline_id = get_parameter(kwargs, "pipeline_id")
)
req = level1_pb2.WriteLevel1Req(record = rec)
try:
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.calmerge import CalMergeApi
class CalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
def test_find(self):
recs = self.api.find(detector_no='CCD01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 3)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 3, status = 1)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 3, status = 2)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(detector_no='CCD01',
ref_type = "bias",
obs_time = "2021-06-04 11:12:13",
exp_time = 150,
filename = "/opt/dddasd.params",
file_path = "/opt/dddasd.fits",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
level0_ids = [1,2,3,4])
print('write:', rec)
\ No newline at end of file
import unittest
from csst_dfs_api_cluster.facility.detector import DetectorApi
class DetectorApiTestCase(unittest.TestCase):
def setUp(self):
self.api = DetectorApi()
def test_find(self):
recs = self.api.find(module_id = 'MSC', key = 'CCD')
print('find:', recs)
def test_get(self):
rec = self.api.get(no = 'CCD01')
print('get:', rec)
def test_write(self):
rec = self.api.write(no = 'CCD02',
detector_name = 'CCD02',
module_id = 'MSC',
filter_id='f2')
print('write:', rec)
def test_update(self):
rec = self.api.update(no = 'CCD01', filter_id = 'f1')
print('update:', rec)
def test_delete(self):
rec = self.api.delete(no = 'CCD01')
print('delete:', rec)
def test_find_status(self):
recs = self.api.find_status(detector_no = 'CCD01',
status_occur_time = ('2021-06-02','2021-06-08'),
limit = 0)
print('find status:', recs)
def test_get_status(self):
rec = self.api.get_status(id = 2)
print('get status:', rec)
def test_write_status(self):
rec = self.api.write_status(detector_no = 'CCD01', status = '{........}',status_time='2021-06-05 12:12:13')
print('write status:', rec)
\ No newline at end of file
......@@ -23,4 +23,16 @@ class Level0DataTestCase(unittest.TestCase):
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(fits_id = 100, status = 7)
print('update_qc0_status:', rec)
\ No newline at end of file
print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
obs_id = 13,
detector_no = "CCD01",
obs_type = "sci",
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
detector_status_id = 3,
filename = "MSC_00001234",
file_path = "/opt/MSC_00001234.fits")
print('write:', rec)
\ No newline at end of file
......@@ -23,4 +23,14 @@ class FacilityObservationTestCase(unittest.TestCase):
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(obs_id = 9, status = 3, )
print('update_qc0_status:', rec)
\ No newline at end of file
print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
module_id = "MSC",
obs_type = "sci",
facility_status_id = 3,
module_status_id = 3)
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.ifs import FitsApi
class IFSFitsTestCase(unittest.TestCase):
def setUp(self):
self.api = FitsApi()
# def test_find(self):
# recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
# print('find:', recs)
# assert len(recs) == 1
# recs = self.api.find()
# print('find:', recs)
# assert len(recs) > 1
def test_get(self):
rec = self.api.get(fits_id=0)
print('get:', rec)
# def test_read(self):
# recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
# print("The full path: ", os.path.join(self.api.root_dir, recs[0]['file_path']))
# file_segments = self.api.read(file_path=recs[0]['file_path'])
# file_bytes = b''.join(file_segments)
# hdul = fits.HDUList.fromstring(file_bytes)
# print(hdul.info())
# hdr = hdul[0].header
# print(repr(hdr))
# def test_update_proc_status(self):
# recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
# self.api.update_proc_status(fits_id=recs[0]['id'],status=1)
# rec = self.api.get(fits_id=recs[0]['id'])
# assert rec['prc_status'] == 1
# def test_update_qc0_status(self):
# recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
# self.api.update_qc0_status(fits_id=recs[0]['id'],status=1)
# rec = self.api.get(fits_id=recs[0]['id'])
# assert rec['qc0_status'] == 1
# def test_write(self):
# recs = self.api.write(file_path='/opt/temp/csst_ifs/CCD2_ObsTime_1200_ObsNum_40.fits')
# recs = self.api.find(file_name='CCD2_ObsTime_1200_ObsNum_40.fits')
# rec = self.api.get(fits_id=recs[0]['id'])
# print(rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.ifs.level1 import Level1DataApi
class IFSLevel1DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
def test_find(self):
recs = self.api.find(raw_id=1,
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 2)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 2, status = 4)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 2, status = 7)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(raw_id=11,
data_type = "sci",
cor_sci_id = 2,
prc_params = "/opt/dddasd.params",
flat_id = 1,
dark_id = 2,
bias_id = 3,
lamp_id = 4,
arc_id = 5,
sky_id = 6,
prc_status = 3,
prc_time = '2021-06-05 11:12:13',
filename = "dddasd",
file_path = "/opt/dddasd.fits",
pipeline_id = "P2")
print('write:', rec)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment