From 68697532eb9df325ce8ce792c812e94b45fd6a11 Mon Sep 17 00:00:00 2001 From: shoulinwei Date: Sat, 3 Dec 2022 22:06:21 +0800 Subject: [PATCH] un level0 --- csst_dfs_api_cluster/common/service.py | 3 +- csst_dfs_api_cluster/cpic/__init__.py | 2 + .../{ifs/level0.py => cpic/level1.py} | 135 ++++++------ .../{msc/level0prc.py => cpic/level1prc.py} | 32 +-- csst_dfs_api_cluster/facility/__init__.py | 5 +- .../{sls => facility}/calmerge.py | 4 +- .../{msc => facility}/level0.py | 45 ++-- .../{sls => facility}/level0prc.py | 4 +- csst_dfs_api_cluster/ifs/__init__.py | 3 - csst_dfs_api_cluster/ifs/calmerge.py | 195 ------------------ csst_dfs_api_cluster/mci/__init__.py | 2 + csst_dfs_api_cluster/mci/level1.py | 176 ++++++++++++++++ .../{ifs/level0prc.py => mci/level1prc.py} | 32 +-- csst_dfs_api_cluster/msc/__init__.py | 3 - csst_dfs_api_cluster/msc/calmerge.py | 195 ------------------ csst_dfs_api_cluster/sls/__init__.py | 3 - csst_dfs_api_cluster/sls/level0.py | 167 --------------- csst_dfs_api_cluster/sls/level1.py | 36 +++- 18 files changed, 349 insertions(+), 693 deletions(-) rename csst_dfs_api_cluster/{ifs/level0.py => cpic/level1.py} (55%) rename csst_dfs_api_cluster/{msc/level0prc.py => cpic/level1prc.py} (76%) rename csst_dfs_api_cluster/{sls => facility}/calmerge.py (98%) rename csst_dfs_api_cluster/{msc => facility}/level0.py (79%) rename csst_dfs_api_cluster/{sls => facility}/level0prc.py (96%) delete mode 100644 csst_dfs_api_cluster/ifs/calmerge.py create mode 100644 csst_dfs_api_cluster/mci/level1.py rename csst_dfs_api_cluster/{ifs/level0prc.py => mci/level1prc.py} (76%) delete mode 100644 csst_dfs_api_cluster/msc/calmerge.py delete mode 100644 csst_dfs_api_cluster/sls/level0.py diff --git a/csst_dfs_api_cluster/common/service.py b/csst_dfs_api_cluster/common/service.py index 2b05da3..11461b6 100644 --- a/csst_dfs_api_cluster/common/service.py +++ b/csst_dfs_api_cluster/common/service.py @@ -8,7 +8,8 @@ class ServiceProxy: def channel(self): options = (('grpc.max_send_message_length', 1000 * 1024 * 1024), ('grpc.max_receive_message_length', 1000 * 1024 * 1024)) - channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip) + # channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip) + channel = grpc.insecure_channel(self.gateway, options = options) try: grpc.channel_ready_future(channel).result(timeout=10) except grpc.FutureTimeoutError: diff --git a/csst_dfs_api_cluster/cpic/__init__.py b/csst_dfs_api_cluster/cpic/__init__.py index e69de29..d4f4fff 100644 --- a/csst_dfs_api_cluster/cpic/__init__.py +++ b/csst_dfs_api_cluster/cpic/__init__.py @@ -0,0 +1,2 @@ +from .level1 import Level1DataApi +from .level1prc import Level1PrcApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/ifs/level0.py b/csst_dfs_api_cluster/cpic/level1.py similarity index 55% rename from csst_dfs_api_cluster/ifs/level0.py rename to csst_dfs_api_cluster/cpic/level1.py index 84fe6d8..034c032 100644 --- a/csst_dfs_api_cluster/ifs/level0.py +++ b/csst_dfs_api_cluster/cpic/level1.py @@ -1,60 +1,52 @@ +import os import grpc +import datetime from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.ifs import Level0Record +from csst_dfs_commons.models.cpic import Level1Record from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE - -from csst_dfs_proto.ifs.level0 import level0_pb2, level0_pb2_grpc +from csst_dfs_proto.cpic.level1 
import level1_pb2, level1_pb2_grpc from ..common.service import ServiceProxy from ..common.utils import * -class Level0DataApi(object): +class Level1DataApi(object): + """ + Level1 Data Operation Class + """ def __init__(self): - self.stub = level0_pb2_grpc.Level0SrvStub(ServiceProxy().channel()) + self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel()) def find(self, **kwargs): - ''' retrieve level0 records from database + ''' retrieve level1 records from database parameter kwargs: - obs_id: [str], - detector_no: [str], - obs_type: [str], - object_name: [str], - obs_time : (start, end), - qc0_status : [int], + level0_id: [str] + data_type: [str] + create_time : (start, end), + qc1_status : [int], prc_status : [int], - file_name: [str], - version: [str], - ra: [float], - dec: [float], - radius: [float], + filename: [str] limit: limits returns the number of records,default 0:no-limit return: csst_dfs_common.models.Result ''' try: - resp, _ = self.stub.Find.with_call(level0_pb2.FindLevel0DataReq( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - obs_type = get_parameter(kwargs, "obs_type"), - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - qc0_status = get_parameter(kwargs, "qc0_status"), + resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req( + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], + create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], + qc1_status = get_parameter(kwargs, "qc1_status"), prc_status = get_parameter(kwargs, "prc_status"), - file_name = get_parameter(kwargs, "file_name"), - object_name = get_parameter(kwargs, "object_name"), - version = get_parameter(kwargs, "version"), - ra = get_parameter(kwargs, "ra"), - dec = get_parameter(kwargs, "dec"), - radius = get_parameter(kwargs, "radius"), + filename = get_parameter(kwargs, "filename"), limit = get_parameter(kwargs, "limit", 0), other_conditions = {"test":"cnlab.test"} ),metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data=from_proto_model_list(Level0Record, resp.records)).append("totalCount", resp.totalCount) + return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)).append("totalCount", resp.totalCount) else: return Result.error(message = str(resp.error.detail)) @@ -65,23 +57,21 @@ class Level0DataApi(object): ''' fetch a record from database parameter kwargs: - id : [int], - level0_id: [str] + id : [int] return csst_dfs_common.models.Result ''' try: - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") - resp, _ = self.stub.Get.with_call(level0_pb2.GetLevel0DataReq( - id = id, - level0_id = level0_id + resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req( + id = get_parameter(kwargs, "id"), + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type") ),metadata = get_auth_headers()) if resp.record is None or resp.record.id == 0: - return Result.error(message=f"not found") + return Result.error(message=f"data not found") - return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) + return Result.ok_data(data = Level1Record().from_proto_model(resp.record)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) @@ -91,20 +81,15 @@ class 
Level0DataApi(object): parameter kwargs: id : [int], - level0_id: [str], status : [int] return csst_dfs_common.models.Result ''' - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") + fits_id = get_parameter(kwargs, "id") status = get_parameter(kwargs, "status") try: resp,_ = self.stub.UpdateProcStatus.with_call( - level0_pb2.UpdateProcStatusReq( - id=id, - level0_id = level0_id, - status=status), + level1_pb2.UpdateProcStatusReq(id=fits_id, status=status), metadata = get_auth_headers() ) if resp.success: @@ -114,23 +99,18 @@ class Level0DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - def update_qc0_status(self, **kwargs): + def update_qc1_status(self, **kwargs): ''' update the status of QC0 parameter kwargs: id : [int], - level0_id: [str], status : [int] ''' - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") + fits_id = get_parameter(kwargs, "id") status = get_parameter(kwargs, "status") try: - resp,_ = self.stub.UpdateQc0Status.with_call( - level0_pb2.UpdateQc0StatusReq( - id=id, - level0_id = level0_id, - status=status), + resp,_ = self.stub.UpdateQc1Status.with_call( + level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status), metadata = get_auth_headers() ) if resp.success: @@ -141,24 +121,43 @@ class Level0DataApi(object): return Result.error(message="%s:%s" % (e.code().value, e.details())) def write(self, **kwargs): - ''' insert a level0 data record into database + ''' insert a level1 record into database parameter kwargs: - file_path = [str] - return: csst_dfs_common.models.Result - ''' - rec = level0_pb2.Level0Record( - file_path = get_parameter(kwargs, "file_path") - ) + level0_id : [str] + data_type : [str] + cor_sci_id : [int] + prc_params : [str] + filename : [str] + file_path : [str] + prc_status : [int] + prc_time : [str] + pipeline_id : [str] + refs: [dict] + return csst_dfs_common.models.Result + ''' + + rec = level1_pb2.Level1Record( + id = 0, + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + cor_sci_id = get_parameter(kwargs, "cor_sci_id"), + prc_params = get_parameter(kwargs, "prc_params"), + filename = get_parameter(kwargs, "filename", ""), + file_path = get_parameter(kwargs, "file_path", ""), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + refs = get_parameter(kwargs, "refs", {}) + ) def stream(rec): with open(rec.file_path, 'rb') as f: while True: data = f.read(UPLOAD_CHUNK_SIZE) if not data: break - yield level0_pb2.WriteLevel0Req(record = rec, data = data) - + yield level1_pb2.WriteLevel1Req(record = rec, data = data) try: if not rec.file_path: return Result.error(message="file_path is blank") @@ -169,10 +168,8 @@ class Level0DataApi(object): resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data=Level0Record().from_proto_model(resp.record)) + return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) else: return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - + return Result.error(message="%s:%s" % (e.code().value, e.details())) diff --git a/csst_dfs_api_cluster/msc/level0prc.py b/csst_dfs_api_cluster/cpic/level1prc.py similarity index 76% rename from 
csst_dfs_api_cluster/msc/level0prc.py rename to csst_dfs_api_cluster/cpic/level1prc.py index 4a87b70..49d1bdb 100644 --- a/csst_dfs_api_cluster/msc/level0prc.py +++ b/csst_dfs_api_cluster/cpic/level1prc.py @@ -2,22 +2,22 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.msc import Level0PrcRecord +from csst_dfs_commons.models.cpic import Level1PrcRecord -from csst_dfs_proto.msc.level0prc import level0prc_pb2, level0prc_pb2_grpc +from csst_dfs_proto.cpic.level1prc import level1prc_pb2, level1prc_pb2_grpc from ..common.service import ServiceProxy from ..common.utils import * -class Level0PrcApi(object): +class Level1PrcApi(object): def __init__(self): - self.stub = level0prc_pb2_grpc.Level0PrcSrvStub(ServiceProxy().channel()) + self.stub = level1prc_pb2_grpc.Level1PrcSrvStub(ServiceProxy().channel()) def find(self, **kwargs): - ''' retrieve level0 procedure records from database + ''' retrieve level1 procedure records from database parameter kwargs: - level0_id: [str] + level1_id: [str] pipeline_id: [str] prc_module: [str] prc_status : [int] @@ -25,8 +25,8 @@ class Level0PrcApi(object): return: csst_dfs_common.models.Result ''' try: - resp, _ = self.stub.Find.with_call(level0prc_pb2.FindLevel0PrcReq( - level0_id = get_parameter(kwargs, "level0_id"), + resp, _ = self.stub.Find.with_call(level1prc_pb2.FindLevel1PrcReq( + level1_id = get_parameter(kwargs, "level1_id"), pipeline_id = get_parameter(kwargs, "pipeline_id"), prc_module = get_parameter(kwargs, "prc_module"), prc_status = get_parameter(kwargs, "prc_status"), @@ -34,7 +34,7 @@ class Level0PrcApi(object): ),metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data = from_proto_model_list(Level0PrcRecord, resp.records)).append("totalCount", resp.totalCount) + return Result.ok_data(data = from_proto_model_list(Level1PrcRecord, resp.records)).append("totalCount", resp.totalCount) else: return Result.error(message = str(resp.error.detail)) @@ -55,7 +55,7 @@ class Level0PrcApi(object): try: resp,_ = self.stub.UpdateProcStatus.with_call( - level0prc_pb2.UpdateProcStatusReq(id=id, status=status), + level1prc_pb2.UpdateProcStatusReq(id=id, status=status), metadata = get_auth_headers() ) if resp.success: @@ -66,10 +66,10 @@ class Level0PrcApi(object): return Result.error(message="%s:%s" % (e.code().value, e.details())) def write(self, **kwargs): - ''' insert a level0 procedure record into database + ''' insert a level1 procedure record into database parameter kwargs: - level0_id : [str] + level1_id : [int] pipeline_id : [str] prc_module : [str] params_file_path : [str] @@ -79,9 +79,9 @@ class Level0PrcApi(object): return csst_dfs_common.models.Result ''' - rec = level0prc_pb2.Level0PrcRecord( + rec = level1prc_pb2.Level1PrcRecord( id = 0, - level0_id = get_parameter(kwargs, "level0_id"), + level1_id = get_parameter(kwargs, "level1_id"), pipeline_id = get_parameter(kwargs, "pipeline_id"), prc_module = get_parameter(kwargs, "prc_module"), params_file_path = get_parameter(kwargs, "params_file_path"), @@ -89,11 +89,11 @@ class Level0PrcApi(object): prc_time = get_parameter(kwargs, "prc_time"), result_file_path = get_parameter(kwargs, "result_file_path") ) - req = level0prc_pb2.WriteLevel0PrcReq(record = rec) + req = level1prc_pb2.WriteLevel1PrcReq(record = rec) try: resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data = Level0PrcRecord().from_proto_model(resp.record)) 
+ return Result.ok_data(data = Level1PrcRecord().from_proto_model(resp.record)) else: return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: diff --git a/csst_dfs_api_cluster/facility/__init__.py b/csst_dfs_api_cluster/facility/__init__.py index a7d4c7c..7cfc8b5 100644 --- a/csst_dfs_api_cluster/facility/__init__.py +++ b/csst_dfs_api_cluster/facility/__init__.py @@ -1,4 +1,7 @@ from .brick import BrickApi from .detector import DetectorApi from .level2producer import Level2ProducerApi -from .observation import ObservationApi \ No newline at end of file +from .observation import ObservationApi +from .calmerge import CalMergeApi +from .level0 import Level0DataApi +from .level0prc import Level0PrcApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/sls/calmerge.py b/csst_dfs_api_cluster/facility/calmerge.py similarity index 98% rename from csst_dfs_api_cluster/sls/calmerge.py rename to csst_dfs_api_cluster/facility/calmerge.py index ac8c948..be0f653 100644 --- a/csst_dfs_api_cluster/sls/calmerge.py +++ b/csst_dfs_api_cluster/facility/calmerge.py @@ -2,9 +2,9 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.sls import CalMergeRecord +from csst_dfs_commons.models.facility import CalMergeRecord -from csst_dfs_proto.sls.calmerge import calmerge_pb2, calmerge_pb2_grpc +from csst_dfs_proto.facility.calmerge import calmerge_pb2, calmerge_pb2_grpc from ..common.service import ServiceProxy from ..common.utils import * diff --git a/csst_dfs_api_cluster/msc/level0.py b/csst_dfs_api_cluster/facility/level0.py similarity index 79% rename from csst_dfs_api_cluster/msc/level0.py rename to csst_dfs_api_cluster/facility/level0.py index 9185fa8..c0b3daf 100644 --- a/csst_dfs_api_cluster/msc/level0.py +++ b/csst_dfs_api_cluster/facility/level0.py @@ -2,9 +2,9 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.msc import Level0Record +from csst_dfs_commons.models.facility import Level0Record -from csst_dfs_proto.msc.level0 import level0_pb2, level0_pb2_grpc +from csst_dfs_proto.facility.level0 import level0_pb2, level0_pb2_grpc from ..common.service import ServiceProxy from ..common.utils import * @@ -38,6 +38,11 @@ class Level0DataApi(object): qc0_status = get_parameter(kwargs, "qc0_status"), prc_status = get_parameter(kwargs, "prc_status"), file_name = get_parameter(kwargs, "file_name"), + ra_obj = get_parameter(kwargs, "ra_obj", None), + dec_obj = get_parameter(kwargs, "dec_obj", None), + radius = get_parameter(kwargs, "radius", 0), + object_name = get_parameter(kwargs, "object_name", None), + version = get_parameter(kwargs, "version", None), limit = get_parameter(kwargs, "limit", 0), other_conditions = {"test":"cnlab.test"} ),metadata = get_auth_headers()) @@ -55,16 +60,16 @@ class Level0DataApi(object): parameter kwargs: id : [int], - level0_id: [str] + level0_id: [str], + obs_type: [str] return csst_dfs_common.models.Result ''' try: - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") resp, _ = self.stub.Get.with_call(level0_pb2.GetLevel0DataReq( - id = id, - level0_id = level0_id + id = get_parameter(kwargs, "id"), + level0_id = get_parameter(kwargs, "level0_id"), + obs_type = get_parameter(kwargs, "obs_type") ),metadata = get_auth_headers()) if resp.record is None or resp.record.id == 0: @@ -81,19 +86,20 @@ class 
Level0DataApi(object): parameter kwargs: id : [int], level0_id: [str], + obs_type: [str], status : [int] return csst_dfs_common.models.Result ''' - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") status = get_parameter(kwargs, "status") try: resp,_ = self.stub.UpdateProcStatus.with_call( level0_pb2.UpdateProcStatusReq( - id=id, - level0_id = level0_id, - status=status), + id = get_parameter(kwargs, "id"), + level0_id = get_parameter(kwargs, "level0_id"), + obs_type = get_parameter(kwargs, "obs_type"), + status=get_parameter(kwargs, "status") + ), metadata = get_auth_headers() ) if resp.success: @@ -109,17 +115,18 @@ class Level0DataApi(object): parameter kwargs: id : [int], level0_id: [str], + obs_type: [str], status : [int] ''' - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") - status = get_parameter(kwargs, "status") + try: resp,_ = self.stub.UpdateQc0Status.with_call( level0_pb2.UpdateQc0StatusReq( - id=id, - level0_id = level0_id, - status=status), + id = get_parameter(kwargs, "id"), + level0_id = get_parameter(kwargs, "level0_id"), + obs_type = get_parameter(kwargs, "obs_type"), + status=get_parameter(kwargs, "status") + ), metadata = get_auth_headers() ) if resp.success: @@ -155,7 +162,7 @@ class Level0DataApi(object): ) req = level0_pb2.WriteLevel0DataReq(record = rec) try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) + resp,_ = self.stub.Write.with_call(req, metadata = get_auth_headers()) if resp.success: return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) else: diff --git a/csst_dfs_api_cluster/sls/level0prc.py b/csst_dfs_api_cluster/facility/level0prc.py similarity index 96% rename from csst_dfs_api_cluster/sls/level0prc.py rename to csst_dfs_api_cluster/facility/level0prc.py index 38d0a90..350cd3c 100644 --- a/csst_dfs_api_cluster/sls/level0prc.py +++ b/csst_dfs_api_cluster/facility/level0prc.py @@ -2,9 +2,9 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.sls import Level0PrcRecord +from csst_dfs_commons.models.facility import Level0PrcRecord -from csst_dfs_proto.sls.level0prc import level0prc_pb2, level0prc_pb2_grpc +from csst_dfs_proto.facility.level0prc import level0prc_pb2, level0prc_pb2_grpc from ..common.service import ServiceProxy from ..common.utils import * diff --git a/csst_dfs_api_cluster/ifs/__init__.py b/csst_dfs_api_cluster/ifs/__init__.py index a9eefd1..5e772d8 100644 --- a/csst_dfs_api_cluster/ifs/__init__.py +++ b/csst_dfs_api_cluster/ifs/__init__.py @@ -1,5 +1,2 @@ -from .calmerge import CalMergeApi -from .level0 import Level0DataApi -from .level0prc import Level0PrcApi from .level1 import Level1DataApi from .level1prc import Level1PrcApi diff --git a/csst_dfs_api_cluster/ifs/calmerge.py b/csst_dfs_api_cluster/ifs/calmerge.py deleted file mode 100644 index 7ed78fa..0000000 --- a/csst_dfs_api_cluster/ifs/calmerge.py +++ /dev/null @@ -1,195 +0,0 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.ifs import CalMergeRecord - -from csst_dfs_proto.ifs.calmerge import calmerge_pb2, calmerge_pb2_grpc - -from ..common.service import ServiceProxy -from ..common.utils import * - -class CalMergeApi(object): - def __init__(self): - self.stub = calmerge_pb2_grpc.CalMergeSrvStub(ServiceProxy().channel()) - - def get_latest_by_l0(self, **kwargs): - ''' 
retrieve calibration merge records from database by level0 data - - parameter kwargs: - level0_id: [str] - ref_type: [str] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.GetLatestByL0.with_call(calmerge_pb2.GetLatestByL0Req( - level0_id = get_parameter(kwargs, "level0_id"), - ref_type = get_parameter(kwargs, "ref_type")), - metadata = get_auth_headers()) - - if resp.record.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data=CalMergeRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def find(self, **kwargs): - ''' retrieve calibration merge records from database - - parameter kwargs: - detector_no: [str] - ref_type: [str] - obs_time: (start,end) - qc1_status : [int] - prc_status : [int] - file_name: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(calmerge_pb2.FindCalMergeReq( - detector_no = get_parameter(kwargs, "detector_no"), - ref_type = get_parameter(kwargs, "ref_type"), - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - qc1_status = get_parameter(kwargs, "qc1_status"), - prc_status = get_parameter(kwargs, "prc_status"), - file_name = get_parameter(kwargs, "file_name"), - limit = get_parameter(kwargs, "limit"), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(CalMergeRecord,resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def get(self, **kwargs): - ''' fetch a record from database - - :param kwargs: Parameter dictionary, key items support: - id : [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - resp, _ = self.stub.Get.with_call(calmerge_pb2.GetCalMergeReq( - id = id, - cal_id = cal_id - ),metadata = get_auth_headers()) - - if resp.record.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data=CalMergeRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_qc1_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - cal_id = cal_id, - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - status = get_parameter(kwargs, "status") - - try: - resp,_ = self.stub.UpdateQc1Status.with_call( - calmerge_pb2.UpdateQc1StatusReq( - id = id, - cal_id = cal_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - cal_id: [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - status = 
get_parameter(kwargs, "status") - - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - calmerge_pb2.UpdateProcStatusReq( - id = id, - cal_id = cal_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def write(self, **kwargs): - ''' insert a calibration merge record into database - - parameter kwargs: - cal_id : [str], - detector_no : [str] - ref_type : [str] - obs_time : [str] - exp_time : [float] - prc_status : [int] - prc_time : [str] - filename : [str] - file_path : [str] - level0_ids : [list] - return csst_dfs_common.models.Result - ''' - - rec = calmerge_pb2.CalMergeRecord( - id = 0, - cal_id = get_parameter(kwargs, "cal_id"), - detector_no = get_parameter(kwargs, "detector_no"), - ref_type = get_parameter(kwargs, "ref_type"), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path"), - prc_status = get_parameter(kwargs, "prc_status",-1), - prc_time = get_parameter(kwargs, "prc_time"), - level0_ids = get_parameter(kwargs, "level0_ids", []) - ) - req = calmerge_pb2.WriteCalMergeReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=CalMergeRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - diff --git a/csst_dfs_api_cluster/mci/__init__.py b/csst_dfs_api_cluster/mci/__init__.py index e69de29..d4f4fff 100644 --- a/csst_dfs_api_cluster/mci/__init__.py +++ b/csst_dfs_api_cluster/mci/__init__.py @@ -0,0 +1,2 @@ +from .level1 import Level1DataApi +from .level1prc import Level1PrcApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/mci/level1.py b/csst_dfs_api_cluster/mci/level1.py new file mode 100644 index 0000000..3794a34 --- /dev/null +++ b/csst_dfs_api_cluster/mci/level1.py @@ -0,0 +1,176 @@ +import os +import grpc +import datetime + +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.common import from_proto_model_list +from csst_dfs_commons.models.mci import Level1Record +from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE +from csst_dfs_proto.mci.level1 import level1_pb2, level1_pb2_grpc + +from ..common.service import ServiceProxy +from ..common.utils import * + +class Level1DataApi(object): + """ + Level1 Data Operation Class + """ + def __init__(self): + self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel()) + + def find(self, **kwargs): + ''' retrieve level1 records from database + + parameter kwargs: + level0_id: [str] + data_type: [str] + create_time : (start, end), + qc1_status : [int], + prc_status : [int], + filename: [str] + limit: limits returns the number of records,default 0:no-limit + + return: csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req( + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], + create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], + qc1_status = get_parameter(kwargs, "qc1_status"), + prc_status = 
get_parameter(kwargs, "prc_status"), + filename = get_parameter(kwargs, "filename"), + limit = get_parameter(kwargs, "limit", 0), + other_conditions = {"test":"cnlab.test"} + ),metadata = get_auth_headers()) + + if resp.success: + return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)).append("totalCount", resp.totalCount) + else: + return Result.error(message = str(resp.error.detail)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + + def get(self, **kwargs): + ''' fetch a record from database + + parameter kwargs: + id : [int] + + return csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req( + id = get_parameter(kwargs, "id"), + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type") + ),metadata = get_auth_headers()) + + if resp.record is None or resp.record.id == 0: + return Result.error(message=f"data not found") + + return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + resp,_ = self.stub.UpdateProcStatus.with_call( + level1_pb2.UpdateProcStatusReq(id=fits_id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + + def update_qc1_status(self, **kwargs): + ''' update the status of QC0 + + parameter kwargs: + id : [int], + status : [int] + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + resp,_ = self.stub.UpdateQc1Status.with_call( + level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + + def write(self, **kwargs): + ''' insert a level1 record into database + + parameter kwargs: + level0_id : [str] + data_type : [str] + cor_sci_id : [int] + prc_params : [str] + filename : [str] + file_path : [str] + prc_status : [int] + prc_time : [str] + pipeline_id : [str] + refs: [dict] + + return csst_dfs_common.models.Result + ''' + + rec = level1_pb2.Level1Record( + id = 0, + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + cor_sci_id = get_parameter(kwargs, "cor_sci_id"), + prc_params = get_parameter(kwargs, "prc_params"), + filename = get_parameter(kwargs, "filename"), + file_path = get_parameter(kwargs, "file_path"), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + refs = get_parameter(kwargs, "refs", {}) + ) + def stream(rec): + with open(rec.file_path, 'rb') as f: + while True: + data = f.read(UPLOAD_CHUNK_SIZE) + if not data: + break + yield level1_pb2.WriteLevel1Req(record = rec, data = data) + + try: + if not rec.file_path: + return 
Result.error(message="file_path is blank") + if not os.path.exists(rec.file_path): + return Result.error(message="the file [%s] not existed" % (rec.file_path, )) + if not rec.filename: + rec.filename = os.path.basename(rec.file_path) + + resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) + if resp.success: + return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) diff --git a/csst_dfs_api_cluster/ifs/level0prc.py b/csst_dfs_api_cluster/mci/level1prc.py similarity index 76% rename from csst_dfs_api_cluster/ifs/level0prc.py rename to csst_dfs_api_cluster/mci/level1prc.py index 451dee7..4ffc678 100644 --- a/csst_dfs_api_cluster/ifs/level0prc.py +++ b/csst_dfs_api_cluster/mci/level1prc.py @@ -2,22 +2,22 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.ifs import Level0PrcRecord +from csst_dfs_commons.models.ifs import Level1PrcRecord -from csst_dfs_proto.ifs.level0prc import level0prc_pb2, level0prc_pb2_grpc +from csst_dfs_proto.ifs.level1prc import level1prc_pb2, level1prc_pb2_grpc from ..common.service import ServiceProxy from ..common.utils import * -class Level0PrcApi(object): +class Level1PrcApi(object): def __init__(self): - self.stub = level0prc_pb2_grpc.Level0PrcSrvStub(ServiceProxy().channel()) + self.stub = level1prc_pb2_grpc.Level1PrcSrvStub(ServiceProxy().channel()) def find(self, **kwargs): - ''' retrieve level0 procedure records from database + ''' retrieve level1 procedure records from database parameter kwargs: - level0_id: [str] + level1_id: [str] pipeline_id: [str] prc_module: [str] prc_status : [int] @@ -25,8 +25,8 @@ class Level0PrcApi(object): return: csst_dfs_common.models.Result ''' try: - resp, _ = self.stub.Find.with_call(level0prc_pb2.FindLevel0PrcReq( - level0_id = get_parameter(kwargs, "level0_id"), + resp, _ = self.stub.Find.with_call(level1prc_pb2.FindLevel1PrcReq( + level1_id = get_parameter(kwargs, "level1_id"), pipeline_id = get_parameter(kwargs, "pipeline_id"), prc_module = get_parameter(kwargs, "prc_module"), prc_status = get_parameter(kwargs, "prc_status"), @@ -34,7 +34,7 @@ class Level0PrcApi(object): ),metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data = from_proto_model_list(Level0PrcRecord, resp.records)).append("totalCount", resp.totalCount) + return Result.ok_data(data = from_proto_model_list(Level1PrcRecord, resp.records)).append("totalCount", resp.totalCount) else: return Result.error(message = str(resp.error.detail)) @@ -55,7 +55,7 @@ class Level0PrcApi(object): try: resp,_ = self.stub.UpdateProcStatus.with_call( - level0prc_pb2.UpdateProcStatusReq(id=id, status=status), + level1prc_pb2.UpdateProcStatusReq(id=id, status=status), metadata = get_auth_headers() ) if resp.success: @@ -66,10 +66,10 @@ class Level0PrcApi(object): return Result.error(message="%s:%s" % (e.code().value, e.details())) def write(self, **kwargs): - ''' insert a level0 procedure record into database + ''' insert a level1 procedure record into database parameter kwargs: - level0_id : [str] + level1_id : [int] pipeline_id : [str] prc_module : [str] params_file_path : [str] @@ -79,9 +79,9 @@ class Level0PrcApi(object): return csst_dfs_common.models.Result ''' - rec = level0prc_pb2.Level0PrcRecord( + rec = level1prc_pb2.Level1PrcRecord( id = 0, - 
level0_id = get_parameter(kwargs, "level0_id"), + level1_id = get_parameter(kwargs, "level1_id"), pipeline_id = get_parameter(kwargs, "pipeline_id"), prc_module = get_parameter(kwargs, "prc_module"), params_file_path = get_parameter(kwargs, "params_file_path"), @@ -89,11 +89,11 @@ class Level0PrcApi(object): prc_time = get_parameter(kwargs, "prc_time"), result_file_path = get_parameter(kwargs, "result_file_path") ) - req = level0prc_pb2.WriteLevel0PrcReq(record = rec) + req = level1prc_pb2.WriteLevel1PrcReq(record = rec) try: resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data = Level0PrcRecord().from_proto_model(resp.record)) + return Result.ok_data(data = Level1PrcRecord().from_proto_model(resp.record)) else: return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: diff --git a/csst_dfs_api_cluster/msc/__init__.py b/csst_dfs_api_cluster/msc/__init__.py index 79badfb..4449958 100644 --- a/csst_dfs_api_cluster/msc/__init__.py +++ b/csst_dfs_api_cluster/msc/__init__.py @@ -1,6 +1,3 @@ -from .calmerge import CalMergeApi -from .level0 import Level0DataApi -from .level0prc import Level0PrcApi from .level1 import Level1DataApi from .level1prc import Level1PrcApi from .level2 import Level2DataApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/msc/calmerge.py b/csst_dfs_api_cluster/msc/calmerge.py deleted file mode 100644 index a38350d..0000000 --- a/csst_dfs_api_cluster/msc/calmerge.py +++ /dev/null @@ -1,195 +0,0 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.msc import CalMergeRecord - -from csst_dfs_proto.msc.calmerge import calmerge_pb2, calmerge_pb2_grpc - -from ..common.service import ServiceProxy -from ..common.utils import * - -class CalMergeApi(object): - def __init__(self): - self.stub = calmerge_pb2_grpc.CalMergeSrvStub(ServiceProxy().channel()) - - def get_latest_by_l0(self, **kwargs): - ''' retrieve calibration merge records from database by level0 data - - parameter kwargs: - level0_id: [str] - ref_type: [str] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.GetLatestByL0.with_call(calmerge_pb2.GetLatestByL0Req( - level0_id = get_parameter(kwargs, "level0_id"), - ref_type = get_parameter(kwargs, "ref_type")), - metadata = get_auth_headers()) - - if resp.record.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data=CalMergeRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def find(self, **kwargs): - ''' retrieve calibration merge records from database - - parameter kwargs: - detector_no: [str] - ref_type: [str] - obs_time: (start,end) - qc1_status : [int] - prc_status : [int] - file_name: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(calmerge_pb2.FindCalMergeReq( - detector_no = get_parameter(kwargs, "detector_no"), - ref_type = get_parameter(kwargs, "ref_type"), - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - qc1_status = get_parameter(kwargs, "qc1_status"), - prc_status = get_parameter(kwargs, "prc_status"), - file_name = get_parameter(kwargs, "file_name"), - limit = get_parameter(kwargs, "limit"), - 
other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(CalMergeRecord,resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def get(self, **kwargs): - ''' fetch a record from database - - :param kwargs: Parameter dictionary, key items support: - id : [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - resp, _ = self.stub.Get.with_call(calmerge_pb2.GetCalMergeReq( - id = id, - cal_id = cal_id - ),metadata = get_auth_headers()) - - if resp.record.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data=CalMergeRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_qc1_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - cal_id = cal_id, - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - status = get_parameter(kwargs, "status") - - try: - resp,_ = self.stub.UpdateQc1Status.with_call( - calmerge_pb2.UpdateQc1StatusReq( - id = id, - cal_id = cal_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - cal_id: [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - status = get_parameter(kwargs, "status") - - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - calmerge_pb2.UpdateProcStatusReq( - id = id, - cal_id = cal_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def write(self, **kwargs): - ''' insert a calibration merge record into database - - parameter kwargs: - cal_id : [str], - detector_no : [str] - ref_type : [str] - obs_time : [str] - exp_time : [float] - prc_status : [int] - prc_time : [str] - filename : [str] - file_path : [str] - level0_ids : [list] - return csst_dfs_common.models.Result - ''' - - rec = calmerge_pb2.CalMergeRecord( - id = 0, - cal_id = get_parameter(kwargs, "cal_id"), - detector_no = get_parameter(kwargs, "detector_no"), - ref_type = get_parameter(kwargs, "ref_type"), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path"), - prc_status = get_parameter(kwargs, "prc_status",-1), - prc_time = get_parameter(kwargs, "prc_time"), - level0_ids = get_parameter(kwargs, "level0_ids", []) - ) - req = calmerge_pb2.WriteCalMergeReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return 
Result.ok_data(data=CalMergeRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - diff --git a/csst_dfs_api_cluster/sls/__init__.py b/csst_dfs_api_cluster/sls/__init__.py index a9a2615..01dea78 100644 --- a/csst_dfs_api_cluster/sls/__init__.py +++ b/csst_dfs_api_cluster/sls/__init__.py @@ -1,6 +1,3 @@ -from .calmerge import CalMergeApi -from .level0 import Level0DataApi -from .level0prc import Level0PrcApi from .level1 import Level1DataApi from .level1prc import Level1PrcApi from .level2spectra import Level2SpectraApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/sls/level0.py b/csst_dfs_api_cluster/sls/level0.py deleted file mode 100644 index 1117e17..0000000 --- a/csst_dfs_api_cluster/sls/level0.py +++ /dev/null @@ -1,167 +0,0 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.sls import Level0Record - -from csst_dfs_proto.sls.level0 import level0_pb2, level0_pb2_grpc - -from ..common.service import ServiceProxy -from ..common.utils import * - -class Level0DataApi(object): - def __init__(self): - self.stub = level0_pb2_grpc.Level0SrvStub(ServiceProxy().channel()) - - def find(self, **kwargs): - ''' retrieve level0 records from database - - parameter kwargs: - obs_id: [str] - detector_no: [str] - obs_type: [str] - obs_time : (start, end), - qc0_status : [int], - prc_status : [int], - file_name: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level0_pb2.FindLevel0DataReq( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - obs_type = get_parameter(kwargs, "obs_type"), - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - qc0_status = get_parameter(kwargs, "qc0_status"), - prc_status = get_parameter(kwargs, "prc_status"), - file_name = get_parameter(kwargs, "file_name"), - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level0Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int], - level0_id: [str] - - return csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") - resp, _ = self.stub.Get.with_call(level0_pb2.GetLevel0DataReq( - id = id, - level0_id = level0_id - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - level0_id: [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, 
"id") - level0_id = get_parameter(kwargs, "level0_id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level0_pb2.UpdateProcStatusReq( - id=id, - level0_id = level0_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_qc0_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - level0_id: [str], - status : [int] - ''' - id = get_parameter(kwargs, "id") - level0_id = get_parameter(kwargs, "level0_id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc0Status.with_call( - level0_pb2.UpdateQc0StatusReq( - id=id, - level0_id = level0_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def write(self, **kwargs): - ''' insert a level0 data record into database - - parameter kwargs: - obs_id = [str] - detector_no = [str] - obs_type = [str] - obs_time = [str] - exp_time = [int] - detector_status_id = [int] - filename = [str] - file_path = [str] - return: csst_dfs_common.models.Result - ''' - rec = level0_pb2.Level0Record( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - obs_type = get_parameter(kwargs, "obs_type"), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - detector_status_id = get_parameter(kwargs, "detector_status_id"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path") - ) - req = level0_pb2.WriteLevel0DataReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - diff --git a/csst_dfs_api_cluster/sls/level1.py b/csst_dfs_api_cluster/sls/level1.py index 6583ef3..69ac08f 100644 --- a/csst_dfs_api_cluster/sls/level1.py +++ b/csst_dfs_api_cluster/sls/level1.py @@ -42,7 +42,41 @@ class Level1DataApi(object): prc_status = get_parameter(kwargs, "prc_status"), filename = get_parameter(kwargs, "filename"), limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} + other_conditions = {} + ),metadata = get_auth_headers()) + + if resp.success: + return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)).append("totalCount", resp.totalCount) + else: + return Result.error(message = str(resp.error.detail)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + + def find_by_prc_status(self, **kwargs): + ''' retrieve level1 records from database + + parameter kwargs: + level0_id: [str] + data_type: [str] + create_time : (start, end), + qc1_status : [int], + prc_status : [int], + filename: [str] + limit: limits returns the number of records,default 0:no-limit + + return: csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req( + level0_id = None, + data_type = 
None, + create_time_start = None, + create_time_end = None, + qc1_status = None, + prc_status = get_parameter(kwargs, "prc_status", -1), + limit = get_parameter(kwargs, "limit", 1), + other_conditions = {"orderBy":"create_time asc"} ),metadata = get_auth_headers()) if resp.success: -- GitLab
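
Usage note (reviewer addition, not part of the patch): the sketch below shows how the relocated facility-level Level0DataApi and the new cpic Level1DataApi would be called after this change. It is based only on the docstrings and __init__ exports in the diff; the gateway configuration behind ServiceProxy, the Result attributes (success / data / message), the record field names, and every concrete ID and file path are assumptions or placeholders, not values taken from the patch.

    # Usage sketch only -- all concrete values below are placeholders.
    from csst_dfs_api_cluster.facility import Level0DataApi   # moved from msc/sls/ifs by this patch
    from csst_dfs_api_cluster.cpic import Level1DataApi       # newly added by this patch

    level0_api = Level0DataApi()
    level1_api = Level1DataApi()

    # find() kwargs follow the docstring in facility/level0.py
    result = level0_api.find(obs_id="100000001", obs_type="sci", limit=10)
    if result.success:
        for rec in result.data:
            print(rec.level0_id, rec.file_path)

    # write() streams file_path to the server in UPLOAD_CHUNK_SIZE chunks, so the
    # file must exist locally; kwargs follow the docstring in cpic/level1.py
    result = level1_api.write(
        level0_id="100000001_01",
        data_type="sci",
        prc_params="--flat=on",
        file_path="/path/to/CSST_CPIC_L1_example.fits",
        prc_status=3,
        pipeline_id="P1",
        refs={"bias": "bias_001", "flat": "flat_001"},
    )
    print(result.success, result.data.id if result.success else result.message)

If this reading of the patch is right, existing callers of Level0DataApi, Level0PrcApi and CalMergeApi only need their import path updated (csst_dfs_api_cluster.msc / .sls / .ifs -> csst_dfs_api_cluster.facility), while the cpic and mci Level1 APIs are entirely new entry points.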