From f55e786a3533342df73486342c928154fc3eb47c Mon Sep 17 00:00:00 2001 From: shoulinwei Date: Sat, 10 Sep 2022 00:03:16 +0800 Subject: [PATCH] bugs --- .../facility/level2producer.py | 3 +- csst_dfs_api_cluster/ifs/level0.py | 59 +++++++++++-------- csst_dfs_api_cluster/msc/level2.py | 2 +- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/csst_dfs_api_cluster/facility/level2producer.py b/csst_dfs_api_cluster/facility/level2producer.py index ec6fba0..ec50721 100644 --- a/csst_dfs_api_cluster/facility/level2producer.py +++ b/csst_dfs_api_cluster/facility/level2producer.py @@ -199,6 +199,7 @@ class Level2ProducerApi(object): ''' rec = level2producer_pb2.Level2JobRecord( id = 0, + name = get_parameter(kwargs, "name", ""), dag = get_parameter(kwargs, "dag", "") ) req = level2producer_pb2.NewJobReq(record = rec) @@ -244,9 +245,9 @@ class Level2ProducerApi(object): :returns: csst_dfs_common.models.Result ''' - rec = level2producer_pb2.Level2JobRecord( id = get_parameter(kwargs, "id", 0), + name = get_parameter(kwargs, "name", ""), dag = get_parameter(kwargs, "dag", ""), status = get_parameter(kwargs, "status", -1) ) diff --git a/csst_dfs_api_cluster/ifs/level0.py b/csst_dfs_api_cluster/ifs/level0.py index f3bfa1a..9155683 100644 --- a/csst_dfs_api_cluster/ifs/level0.py +++ b/csst_dfs_api_cluster/ifs/level0.py @@ -3,6 +3,7 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list from csst_dfs_commons.models.ifs import Level0Record +from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE from csst_dfs_proto.ifs.level0 import level0_pb2, level0_pb2_grpc @@ -17,13 +18,18 @@ class Level0DataApi(object): ''' retrieve level0 records from database parameter kwargs: - obs_id: [str] - detector_no: [str] - obs_type: [str] + obs_id: [str], + detector_no: [str], + obs_type: [str], + object_name: [str], obs_time : (start, end), qc0_status : [int], prc_status : [int], - file_name: [str] + file_name: [str], + version: [str], + ra: [float], + dec: [float], + radius: [float], limit: limits returns the number of records,default 0:no-limit return: csst_dfs_common.models.Result @@ -38,6 +44,11 @@ class Level0DataApi(object): qc0_status = get_parameter(kwargs, "qc0_status"), prc_status = get_parameter(kwargs, "prc_status"), file_name = get_parameter(kwargs, "file_name"), + object_name = get_parameter(kwargs, "object_name"), + version = get_parameter(kwargs, "version"), + ra = get_parameter(kwargs, "ra"), + dec = get_parameter(kwargs, "dec"), + radius = get_parameter(kwargs, "radius"), limit = get_parameter(kwargs, "limit", 0), other_conditions = {"test":"cnlab.test"} ),metadata = get_auth_headers()) @@ -133,35 +144,35 @@ class Level0DataApi(object): ''' insert a level0 data record into database parameter kwargs: - obs_id = [str] - detector_no = [str] - obs_type = [str] - obs_time = [str] - exp_time = [int] - detector_status_id = [int] - filename = [str] file_path = [str] return: csst_dfs_common.models.Result ''' rec = level0_pb2.Level0Record( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - obs_type = get_parameter(kwargs, "obs_type"), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - detector_status_id = get_parameter(kwargs, "detector_status_id"), - filename = get_parameter(kwargs, "filename"), file_path = get_parameter(kwargs, "file_path") ) - req = level0_pb2.WriteLevel0DataReq(record = rec) + + def stream(rec): + with open(rec.file_path, 'rb') as f: + while True: + data = f.read(UPLOAD_CHUNK_SIZE) + if not data: + break + yield level0_pb2.WriteLevel0Req(record = rec, data = data) + try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) + if not rec.file_path: + return Result.error(message="file_path is blank") + if not os.path.exists(rec.file_path): + return Result.error(message="the file [%s] not existed" % (rec.file_path, )) + if not rec.filename: + rec.filename = os.path.basename(rec.file_path) + + resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) + return Result.ok_data(data=Level0Record().from_proto_model(resp.record)) else: return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details)) - + return Result.error(message="%s:%s" % (e.code().value, e.details)) + diff --git a/csst_dfs_api_cluster/msc/level2.py b/csst_dfs_api_cluster/msc/level2.py index e33a7a8..255670d 100644 --- a/csst_dfs_api_cluster/msc/level2.py +++ b/csst_dfs_api_cluster/msc/level2.py @@ -84,7 +84,7 @@ class Level2DataApi(object): ),metadata = get_auth_headers()) if resp.success: - return Result.ok_data(data=from_proto_model_list(MSCLevel2CatalogRecord, resp.records)).append("totalCount", resp.totalCount) + return Result.ok_data(data=from_proto_model_list(Level2CatalogRecord, resp.records)).append("totalCount", resp.totalCount) else: return Result.error(message = str(resp.error.detail)) -- GitLab