From 5202236f9693b71f7294b9784d5fdb78179f2ece Mon Sep 17 00:00:00 2001
From: shoulinwei
Date: Fri, 12 Jan 2024 21:18:44 +0800
Subject: [PATCH] grpc channel

---
 csst_dfs_api_cluster/common/catalog.py        |   9 +-
 csst_dfs_api_cluster/common/service.py        |  23 +-
 csst_dfs_api_cluster/common/utils.py          |  21 +-
 csst_dfs_api_cluster/facility/brick.py        |  18 +-
 csst_dfs_api_cluster/facility/detector.py     |  20 +-
 csst_dfs_api_cluster/facility/level0.py       |  13 +-
 csst_dfs_api_cluster/facility/level0prc.py    |   8 +-
 csst_dfs_api_cluster/facility/level1.py       |  15 +-
 csst_dfs_api_cluster/facility/level1prc.py    |  13 +-
 csst_dfs_api_cluster/facility/level2.py       |  18 +-
 .../facility/level2producer.py                |  37 ++-
 csst_dfs_api_cluster/facility/level2type.py   |  11 +-
 csst_dfs_api_cluster/facility/observation.py  |  18 +-
 csst_dfs_api_cluster/facility/otherdata.py    |   8 +-
 csst_dfs_api_cluster/hstdm/level2.py          |  11 +-
 csst_dfs_api_cluster/mbi/__init__.py          |   2 -
 csst_dfs_api_cluster/mbi/level2.py            | 287 ------------------
 csst_dfs_api_cluster/mbi/level2co.py          | 202 ------------
 csst_dfs_api_cluster/sls/level2spectra.py     |  10 +-
 19 files changed, 178 insertions(+), 566 deletions(-)
 delete mode 100644 csst_dfs_api_cluster/mbi/level2.py
 delete mode 100644 csst_dfs_api_cluster/mbi/level2co.py

diff --git a/csst_dfs_api_cluster/common/catalog.py b/csst_dfs_api_cluster/common/catalog.py
index b6fe62b..47b8e97 100644
--- a/csst_dfs_api_cluster/common/catalog.py
+++ b/csst_dfs_api_cluster/common/catalog.py
@@ -6,15 +6,16 @@ import io
 from csst_dfs_commons.models import Result
 from csst_dfs_proto.common.ephem import ephem_pb2, ephem_pb2_grpc
 
-from .service import ServiceProxy
-from .constants import *
+from .service import grpc_channel
 from .utils import get_auth_headers
 
 log = logging.getLogger('csst')
 
 class CatalogApi(object):
     def __init__(self):
-        self.stub = ephem_pb2_grpc.EphemSearchSrvStub(ServiceProxy().channel())
-
+        self.stub_class = ephem_pb2_grpc.EphemSearchSrvStub
+        self.stub = None
+
+    @grpc_channel
     def gaia3_query(self, ra: float, dec: float, radius: float, columns: tuple, min_mag: float, max_mag: float, obstime: int, limit: int):
         ''' retrieval GAIA DR 3
         args:
diff --git a/csst_dfs_api_cluster/common/service.py b/csst_dfs_api_cluster/common/service.py
index 3aae863..308ab05 100644
--- a/csst_dfs_api_cluster/common/service.py
+++ b/csst_dfs_api_cluster/common/service.py
@@ -1,6 +1,7 @@
 import os
 import grpc
 from csst_dfs_commons.models.errors import CSSTFatalException
+
 class ServiceProxy:
     def __init__(self):
         self.gateway = os.getenv("CSST_DFS_GATEWAY",'172.31.248.218:30880')
@@ -15,4 +16,24 @@ class ServiceProxy:
         except grpc.FutureTimeoutError:
             raise CSSTFatalException('Error connecting to server {}'.format(self.gateway))
         else:
-            return channel
\ No newline at end of file
+            return channel
+
+def grpc_channel(func):
+    def wrapper(*args, **kwargs):
+        with get_grpc_channel() as c:
+            args[0].stub = args[0].stub_class(c)
+            return func(*args, **kwargs)
+    return wrapper
+
+def get_grpc_channel():
+    options = (('grpc.max_send_message_length', 1024 * 1024 * 1024),
+               ('grpc.max_receive_message_length', 1024 * 1024 * 1024))
+    gateway = os.getenv("CSST_DFS_GATEWAY",'172.31.248.218:30880')
+    # channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip)
+    channel = grpc.insecure_channel(gateway, options = options)
+    try:
+        grpc.channel_ready_future(channel).result(timeout=10)
+    except grpc.FutureTimeoutError:
+        raise CSSTFatalException('Error connecting to server {}'.format(gateway))
+    else:
+        return channel
\ No
newline at end of file diff --git a/csst_dfs_api_cluster/common/utils.py b/csst_dfs_api_cluster/common/utils.py index 4309590..04ca8a5 100644 --- a/csst_dfs_api_cluster/common/utils.py +++ b/csst_dfs_api_cluster/common/utils.py @@ -5,7 +5,7 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_proto.common.misc import misc_pb2, misc_pb2_grpc -from .service import ServiceProxy +from .service import get_grpc_channel def format_datetime(dt): return dt.strftime('%Y-%m-%d %H:%M:%S') @@ -57,12 +57,13 @@ def get_auth_headers(): return (("csst_dfs_app",os.getenv("CSST_DFS_APP_ID")),("csst_dfs_token",os.getenv("CSST_DFS_APP_TOKEN")),) def get_nextId_by_prefix(prefix): - stub = misc_pb2_grpc.MiscSrvStub(ServiceProxy().channel()) - try: - resp,_ = stub.GetSeqId.with_call( - misc_pb2.GetSeqIdReq(prefix=prefix), - metadata = get_auth_headers() - ) - return Result.ok_data(data=resp.nextId) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) \ No newline at end of file + with get_grpc_channel() as c: + stub = misc_pb2_grpc.MiscSrvStub(c) + try: + resp,_ = stub.GetSeqId.with_call( + misc_pb2.GetSeqIdReq(prefix=prefix), + metadata = get_auth_headers() + ) + return Result.ok_data(data=resp.nextId) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) \ No newline at end of file diff --git a/csst_dfs_api_cluster/facility/brick.py b/csst_dfs_api_cluster/facility/brick.py index 9c1c84c..8a2637c 100644 --- a/csst_dfs_api_cluster/facility/brick.py +++ b/csst_dfs_api_cluster/facility/brick.py @@ -6,7 +6,7 @@ from csst_dfs_commons.models.facility import Brick, BrickObsStatus, BrickLevel1 from csst_dfs_proto.facility.brick import brick_pb2, brick_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * from ..common.constants import UPLOAD_CHUNK_SIZE @@ -15,8 +15,10 @@ class BrickApi(object): Brick Operation Class """ def __init__(self): - self.stub = brick_pb2_grpc.BrickSrvStub(ServiceProxy().channel()) + self.stub_class = brick_pb2_grpc.BrickSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' find brick records @@ -39,6 +41,7 @@ class BrickApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -57,10 +60,11 @@ class BrickApi(object): return Result.error(message=f"{brick_id} not found") return Result.ok_data(data=Brick().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a brickal record into database @@ -88,6 +92,7 @@ class BrickApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_obs_status(self, **kwargs): ''' find observation status of bricks @@ -112,13 +117,14 @@ class BrickApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - + + @grpc_channel def find_level1_data(self, **kwargs): ''' find level1 data :param kwargs: Parameter dictionary, support: - brick_id = [int]\n - level1_id = [int]\n + brick_id = [int] + level1_id = [int] module = [str], limit = [int] diff --git a/csst_dfs_api_cluster/facility/detector.py b/csst_dfs_api_cluster/facility/detector.py index 5d08b6f..a45ddd1 100644 --- 
a/csst_dfs_api_cluster/facility/detector.py +++ b/csst_dfs_api_cluster/facility/detector.py @@ -3,16 +3,17 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list from csst_dfs_commons.models.facility import Detector, DetectorStatus - from csst_dfs_proto.facility.detector import detector_pb2, detector_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class DetectorApi(object): def __init__(self): - self.stub = detector_pb2_grpc.DetectorSrvStub(ServiceProxy().channel()) + self.stub_class = detector_pb2_grpc.DetectorSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve detector records from database @@ -36,6 +37,7 @@ class DetectorApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -58,6 +60,7 @@ class DetectorApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update(self, **kwargs): ''' update a detector by no @@ -92,6 +95,7 @@ class DetectorApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def delete(self, **kwargs): ''' delete a detector by no @@ -114,9 +118,10 @@ class DetectorApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a detector record into database - + parameter kwargs: no : [str], detector_name : [str], @@ -141,6 +146,7 @@ class DetectorApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_status(self, **kwargs): ''' retrieve a detector status's from database @@ -167,6 +173,7 @@ class DetectorApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get_status(self, **kwargs): ''' fetch a record from database @@ -185,13 +192,14 @@ class DetectorApi(object): return Result.error(message=f"id:{id} not found") return Result.ok_data(data=DetectorStatus().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write_status(self, **kwargs): ''' insert a detector status into database - + parameter kwargs: detector_no : [str], status : [str], diff --git a/csst_dfs_api_cluster/facility/level0.py b/csst_dfs_api_cluster/facility/level0.py index cfa23a8..d3678f1 100644 --- a/csst_dfs_api_cluster/facility/level0.py +++ b/csst_dfs_api_cluster/facility/level0.py @@ -6,13 +6,15 @@ from csst_dfs_commons.models.facility import Level0Record from csst_dfs_proto.facility.level0 import level0_pb2, level0_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level0DataApi(object): def __init__(self): - self.stub = level0_pb2_grpc.Level0SrvStub(ServiceProxy().channel()) + self.stub_class = level0_pb2_grpc.Level0SrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level0 records from database @@ -64,6 +66,7 @@ class Level0DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_by_brick_ids(self, **kwargs): ''' retrieve 
level0 records by brick_ids like [1,2,3,4] @@ -85,6 +88,7 @@ class Level0DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -106,10 +110,11 @@ class Level0DataApi(object): return Result.error(message=f"not found") return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -139,6 +144,7 @@ class Level0DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_qc0_status(self, **kwargs): ''' update the status of QC0 @@ -166,6 +172,7 @@ class Level0DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a level0 data record into database diff --git a/csst_dfs_api_cluster/facility/level0prc.py b/csst_dfs_api_cluster/facility/level0prc.py index 350cd3c..3389ac7 100644 --- a/csst_dfs_api_cluster/facility/level0prc.py +++ b/csst_dfs_api_cluster/facility/level0prc.py @@ -6,13 +6,15 @@ from csst_dfs_commons.models.facility import Level0PrcRecord from csst_dfs_proto.facility.level0prc import level0prc_pb2, level0prc_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level0PrcApi(object): def __init__(self): - self.stub = level0prc_pb2_grpc.Level0PrcSrvStub(ServiceProxy().channel()) + self.stub_class = level0prc_pb2_grpc.Level0PrcSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level0 procedure records from database @@ -41,6 +43,7 @@ class Level0PrcApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -65,6 +68,7 @@ class Level0PrcApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a level0 procedure record into database diff --git a/csst_dfs_api_cluster/facility/level1.py b/csst_dfs_api_cluster/facility/level1.py index 301598f..df1a5d5 100644 --- a/csst_dfs_api_cluster/facility/level1.py +++ b/csst_dfs_api_cluster/facility/level1.py @@ -8,7 +8,7 @@ from csst_dfs_commons.models.facility import Level1Record from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE from csst_dfs_proto.facility.level1 import level1_pb2, level1_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level1DataApi(object): @@ -16,8 +16,10 @@ class Level1DataApi(object): Level1 Data Operation Class """ def __init__(self): - self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel()) + self.stub_class = level1_pb2_grpc.Level1SrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level1 records from database @@ -64,6 +66,7 @@ class Level1DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_by_brick_ids(self, **kwargs): ''' retrieve level1 records by brick_ids like [1,2,3,4] @@ -85,6 +88,7 @@ class Level1DataApi(object): except 
grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_by_ids(self, **kwargs): ''' retrieve level1 records by internal level1 ids like [1,2,3,4] @@ -105,7 +109,8 @@ class Level1DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - + + @grpc_channel def sls_find_by_qc1_status(self, **kwargs): ''' retrieve level1 records from database @@ -135,6 +140,7 @@ class Level1DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -158,6 +164,7 @@ class Level1DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -181,6 +188,7 @@ class Level1DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_qc1_status(self, **kwargs): ''' update the status of QC0 @@ -202,6 +210,7 @@ class Level1DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a level1 record into database diff --git a/csst_dfs_api_cluster/facility/level1prc.py b/csst_dfs_api_cluster/facility/level1prc.py index 4c08f4d..79afdf8 100644 --- a/csst_dfs_api_cluster/facility/level1prc.py +++ b/csst_dfs_api_cluster/facility/level1prc.py @@ -6,13 +6,15 @@ from csst_dfs_commons.models.facility import Level1PrcRecord from csst_dfs_proto.facility.level1prc import level1prc_pb2, level1prc_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level1PrcApi(object): def __init__(self): - self.stub = level1prc_pb2_grpc.Level1PrcSrvStub(ServiceProxy().channel()) + self.stub_class = level1prc_pb2_grpc.Level1PrcSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level1 procedure records from database @@ -41,6 +43,7 @@ class Level1PrcApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -65,9 +68,10 @@ class Level1PrcApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a level1 procedure record into database - + parameter kwargs: level1_id : [int] pipeline_id : [str] @@ -98,6 +102,3 @@ class Level1PrcApi(object): return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - - - diff --git a/csst_dfs_api_cluster/facility/level2.py b/csst_dfs_api_cluster/facility/level2.py index 4bff9b6..e39f350 100644 --- a/csst_dfs_api_cluster/facility/level2.py +++ b/csst_dfs_api_cluster/facility/level2.py @@ -11,7 +11,7 @@ from csst_dfs_commons.models.common import from_proto_model_list from csst_dfs_commons.models.level2 import Level2Record, filter_table_name from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE from csst_dfs_proto.facility.level2 import level2_pb2, level2_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level2DataApi(object): @@ 
-19,8 +19,10 @@ class Level2DataApi(object): Level2 Data Operation Class """ def __init__(self): - self.stub = level2_pb2_grpc.Level2SrvStub(ServiceProxy().channel()) + self.stub_class = level2_pb2_grpc.Level2SrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level2 records from database @@ -69,6 +71,7 @@ class Level2DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def catalog_columns(self, **kwargs): ''' retrieve columns data type @@ -87,6 +90,7 @@ class Level2DataApi(object): resp['totalCount'] = len(resp['data']) return resp + @grpc_channel def catalog_query(self, **kwargs): ''' retrieve level2catalog records from database @@ -117,6 +121,7 @@ class Level2DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def coord_cond_sql(self, **kwargs): ''' generate coordinate search condition sql @@ -144,6 +149,7 @@ class Level2DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_existed_brick_ids(self, **kwargs): ''' retrieve existed brick_ids in a single exposure catalog @@ -164,6 +170,7 @@ class Level2DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -185,6 +192,7 @@ class Level2DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -207,7 +215,8 @@ class Level2DataApi(object): return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - + + @grpc_channel def update_qc2_status(self, **kwargs): ''' update the status of QC0 @@ -228,7 +237,8 @@ class Level2DataApi(object): return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - + + @grpc_channel def write(self, **kwargs): ''' insert a level2 record into database diff --git a/csst_dfs_api_cluster/facility/level2producer.py b/csst_dfs_api_cluster/facility/level2producer.py index 0374b5a..11027dc 100644 --- a/csst_dfs_api_cluster/facility/level2producer.py +++ b/csst_dfs_api_cluster/facility/level2producer.py @@ -6,7 +6,7 @@ from csst_dfs_commons.models.facility import Level2Producer, Level2Job, Level2Pr from csst_dfs_proto.facility.level2producer import level2producer_pb2, level2producer_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * from ..common.constants import UPLOAD_CHUNK_SIZE @@ -15,8 +15,10 @@ class Level2ProducerApi(object): Level2Producer Operation Class """ def __init__(self): - self.stub = level2producer_pb2_grpc.Level2ProducerSrvStub(ServiceProxy().channel()) + self.stub_class = level2producer_pb2_grpc.Level2ProducerSrvStub + self.stub = None + @grpc_channel def register(self, **kwargs): ''' register a Level2Producer data record into database @@ -48,6 +50,7 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find(self, **kwargs): ''' retrieve Level2Producer records from database @@ -70,6 +73,7 @@ class 
Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -88,10 +92,11 @@ class Level2ProducerApi(object): return Result.error(message=f"{p_id} not found") return Result.ok_data(data=Level2Producer().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_nexts(self, **kwargs): ''' retrieve Level2Producer records from database @@ -113,6 +118,7 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_start(self, **kwargs): ''' retrieve Level2Producer records from database @@ -135,6 +141,7 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update(self, **kwargs): ''' update a Level2Producer @@ -168,9 +175,10 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def delete(self, **kwargs): ''' delete a Level2Producer data - + :param kwargs: Parameter dictionary, key items support: id = [int] @@ -189,9 +197,10 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def new_job(self, **kwargs): ''' new a Level2Producer Job - + :param kwargs: Parameter dictionary, key items support: dag = [str] @@ -213,6 +222,7 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get_job(self, **kwargs): ''' fetch a record from database @@ -231,13 +241,14 @@ class Level2ProducerApi(object): return Result.error(message=f"{p_id} not found") return Result.ok_data(data=Level2Job().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_job(self, **kwargs): ''' update a Level2Producer Job - + :param kwargs: Parameter dictionary, key items support: id = [int] dag = [str] @@ -262,9 +273,10 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def new_running(self, **kwargs): ''' insert a Level2ProducerRuningRecord data - + :param kwargs: Parameter dictionary, key items support: job_id = [int]\n producer_id = [int]\n @@ -296,6 +308,7 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get_running(self, **kwargs): ''' fetch a record from database @@ -314,13 +327,14 @@ class Level2ProducerApi(object): return Result.error(message=f"{p_id} not found") return Result.ok_data(data=Level2ProducerRuning().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_running(self, **kwargs): ''' udpate a Level2ProducerRuningRecord data - + :param kwargs: Parameter dictionary, key items support: id = [int]\n job_id = [int]\n @@ -354,9 +368,10 @@ class Level2ProducerApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def find_running(self, **kwargs): ''' find 
Level2ProducerRuningRecord data - + :param kwargs: Parameter dictionary, key items support: job_id = [int]\n producer_id = [int]\n diff --git a/csst_dfs_api_cluster/facility/level2type.py b/csst_dfs_api_cluster/facility/level2type.py index d031063..67af8b4 100644 --- a/csst_dfs_api_cluster/facility/level2type.py +++ b/csst_dfs_api_cluster/facility/level2type.py @@ -12,7 +12,7 @@ from csst_dfs_commons.models.level2 import Level2TypeRecord from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE from csst_dfs_proto.facility.level2type import level2type_pb2, level2type_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level2TypeApi(object): @@ -20,8 +20,10 @@ class Level2TypeApi(object): Level2Type Data Operation Class """ def __init__(self): - self.stub = level2type_pb2_grpc.Level2TypeSrvStub(ServiceProxy().channel()) + self.stub_class = level2type_pb2_grpc.Level2TypeSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level2type records from database @@ -54,7 +56,7 @@ class Level2TypeApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -76,6 +78,7 @@ class Level2TypeApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_import_status(self, **kwargs): ''' update the status of level2 type @@ -99,7 +102,7 @@ class Level2TypeApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) - + @grpc_channel def write(self, **kwargs): ''' insert a level2type record into database diff --git a/csst_dfs_api_cluster/facility/observation.py b/csst_dfs_api_cluster/facility/observation.py index 3fc6494..bba43a1 100644 --- a/csst_dfs_api_cluster/facility/observation.py +++ b/csst_dfs_api_cluster/facility/observation.py @@ -3,10 +3,9 @@ import grpc from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list from csst_dfs_commons.models.facility import Observation - from csst_dfs_proto.facility.observation import observation_pb2, observation_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * from ..common.constants import UPLOAD_CHUNK_SIZE @@ -15,8 +14,10 @@ class ObservationApi(object): Observation Operation Class """ def __init__(self): - self.stub = observation_pb2_grpc.ObservationSrvStub(ServiceProxy().channel()) + self.stub_class = observation_pb2_grpc.ObservationSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve exposure records from database @@ -50,6 +51,7 @@ class ObservationApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -71,10 +73,11 @@ class ObservationApi(object): return Result.error(message=f"not found") return Result.ok_data(data=Observation().from_proto_model(resp.observation)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -103,6 +106,7 @@ class ObservationApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def 
update_qc0_status(self, **kwargs):
         ''' update the status of QC0
@@ -129,9 +133,10 @@ class ObservationApi(object):
         except grpc.RpcError as e:
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
 
+    @grpc_channel
     def write(self, **kwargs):
         ''' insert a observational record into database
-
+
         parameter kwargs:
             id = [id]
             obs_id = [str]
@@ -164,6 +169,3 @@ class ObservationApi(object):
 
         except grpc.RpcError as e:
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
-
-
-
diff --git a/csst_dfs_api_cluster/facility/otherdata.py b/csst_dfs_api_cluster/facility/otherdata.py
index d9bf2b2..3396882 100644
--- a/csst_dfs_api_cluster/facility/otherdata.py
+++ b/csst_dfs_api_cluster/facility/otherdata.py
@@ -8,7 +8,7 @@ from csst_dfs_commons.models.facility import OtherDataRecord
 from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE
 from csst_dfs_proto.facility.otherdata import otherdata_pb2, otherdata_pb2_grpc
 
-from ..common.service import ServiceProxy
+from ..common.service import grpc_channel
 from ..common.utils import *
 
 class OtherDataApi(object):
@@ -16,8 +16,10 @@ class OtherDataApi(object):
     """
     OtherData Data Operation Class
     """
     def __init__(self):
-        self.stub = otherdata_pb2_grpc.OtherDataSrvStub(ServiceProxy().channel())
+        self.stub_class = otherdata_pb2_grpc.OtherDataSrvStub
+        self.stub = None
 
+    @grpc_channel
     def find(self, **kwargs):
         ''' retrieve otherdata records from database
@@ -55,6 +57,7 @@ class OtherDataApi(object):
 
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
 
+    @grpc_channel
     def get(self, **kwargs):
         ''' fetch a record from database
@@ -77,6 +80,7 @@ class OtherDataApi(object):
 
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
 
+    @grpc_channel
     def write(self, **kwargs):
         ''' insert a otherdata record into database
diff --git a/csst_dfs_api_cluster/hstdm/level2.py b/csst_dfs_api_cluster/hstdm/level2.py
index cffee2b..c6a1b5f 100644
--- a/csst_dfs_api_cluster/hstdm/level2.py
+++ b/csst_dfs_api_cluster/hstdm/level2.py
@@ -8,7 +8,7 @@ from csst_dfs_commons.models.hstdm import Level2Data
 from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE
 from csst_dfs_proto.hstdm.level2 import level2_pb2, level2_pb2_grpc
 
-from ..common.service import ServiceProxy
+from ..common.service import grpc_channel
 from ..common.utils import *
 
 class Level2DataApi(object):
@@ -16,8 +16,10 @@ class Level2DataApi(object):
     """
     Level2 Data Operation Class
     """
     def __init__(self):
-        self.stub = level2_pb2_grpc.Level2SrvStub(ServiceProxy().channel())
+        self.stub_class = level2_pb2_grpc.Level2SrvStub
+        self.stub = None
 
+    @grpc_channel
     def find(self, **kwargs):
         ''' retrieve level2 records from database
@@ -57,6 +59,7 @@ class Level2DataApi(object):
 
         except grpc.RpcError as e:
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
 
+    @grpc_channel
     def get(self, **kwargs):
         ''' fetch a record from database
@@ -79,6 +82,7 @@ class Level2DataApi(object):
 
         except grpc.RpcError as e:
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
 
+    @grpc_channel
     def update_proc_status(self, **kwargs):
         ''' update the status of reduction
@@ -101,7 +105,7 @@ class Level2DataApi(object):
                 return Result.error(message = str(resp.error.detail))
         except grpc.RpcError as e:
             return Result.error(message="%s:%s" % (e.code().value, e.details()))
-
+    @grpc_channel
     def update_qc2_status(self, **kwargs):
         ''' update the status of QC2
@@ -123,6 +127,7 @@ class Level2DataApi(object):
 
         except grpc.RpcError as e:
             return Result.error(message="%s:%s" %
(e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a level2 record into database diff --git a/csst_dfs_api_cluster/mbi/__init__.py b/csst_dfs_api_cluster/mbi/__init__.py index b216a14..e69de29 100644 --- a/csst_dfs_api_cluster/mbi/__init__.py +++ b/csst_dfs_api_cluster/mbi/__init__.py @@ -1,2 +0,0 @@ -from .level2 import Level2DataApi -from .level2co import Level2CoApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/mbi/level2.py b/csst_dfs_api_cluster/mbi/level2.py deleted file mode 100644 index 1cf2d97..0000000 --- a/csst_dfs_api_cluster/mbi/level2.py +++ /dev/null @@ -1,287 +0,0 @@ -import io -import os -import grpc -import datetime -import pickle - -from collections.abc import Iterable - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.msc import Level2Record -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.msc.level2 import level2_pb2, level2_pb2_grpc - -from ..common.service import ServiceProxy -from ..common.utils import * - -class Level2DataApi(object): - """ - Level2 Data Operation Class - """ - def __init__(self): - self.stub = level2_pb2_grpc.Level2SrvStub(ServiceProxy().channel()) - - def find(self, **kwargs): - ''' retrieve level2 records from database - - parameter kwargs: - level0_id: [str] - level1_id: [int] - data_type: [str] - create_time : (start, end), - qc2_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level2_pb2.FindLevel2Req( - level0_id = get_parameter(kwargs, "level0_id"), - level1_id = get_parameter(kwargs, "level1_id"), - data_type = get_parameter(kwargs, "data_type"), - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], - qc2_status = get_parameter(kwargs, "qc2_status", 1024), - prc_status = get_parameter(kwargs, "prc_status", 1024), - filename = get_parameter(kwargs, "filename"), - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def catalog_query(self, **kwargs): - ''' retrieve level2catalog records from database - - parameter kwargs: - brick_ids: list[int] - obs_id: [str] - detector_no: [str] - filter: [str] - ra: [float] in deg - dec: [float] in deg - radius: [float] in deg - obs_time: (start, end), - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - datas = io.BytesIO() - totalCount = 0 - - brick_ids = get_parameter(kwargs, "brick_ids", []) - if not isinstance(brick_ids,Iterable): - brick_ids = [brick_ids] - - resps = self.stub.FindCatalog(level2_pb2.FindLevel2CatalogReq( - brick_ids = ",".join([str(i) for i in brick_ids]), - obs_id = get_parameter(kwargs, "obs_id", None), - detector_no = get_parameter(kwargs, "detector_no", None), - filter = get_parameter(kwargs, "filter", None), - obs_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - obs_time_end = 
get_parameter(kwargs, "obs_time", [None, None])[1], - ra = get_parameter(kwargs, "ra"), - dec = get_parameter(kwargs, "dec"), - radius = get_parameter(kwargs, "radius"), - minMag = get_parameter(kwargs, "min_mag"), - maxMag = get_parameter(kwargs, "max_mag"), - limit = get_parameter(kwargs, "limit", 0), - columns = ",".join(get_parameter(kwargs, "columns", "*")) - ),metadata = get_auth_headers()) - - for resp in resps: - if resp.success: - datas.write(resp.records) - totalCount = resp.totalCount - else: - return Result.error(message = str(resp.error.detail)) - datas.flush() - records = pickle.loads(datas.getvalue()) - return Result.ok_data(data = records[0]).append("totalCount", totalCount).append("columns", records[1]) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def catalog_query_file(self, **kwargs): - ''' retrieve level2catalog records from database - - parameter kwargs: - brick_ids: list[int] - obs_id: [str] - detector_no: [str] - filter: [str] - ra: [float] in deg - dec: [float] in deg - radius: [float] in deg - obs_time: (start, end), - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - - brick_ids = get_parameter(kwargs, "brick_ids", []) - if not isinstance(brick_ids, Iterable): - brick_ids = [brick_ids] - - resp, _ = self.stub.FindCatalogFile.with_call(level2_pb2.FindLevel2CatalogReq( - brick_ids = ",".join([str(i) for i in brick_ids]), - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - filter = get_parameter(kwargs, "filter", None), - obs_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - obs_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - ra = get_parameter(kwargs, "ra"), - dec = get_parameter(kwargs, "dec"), - radius = get_parameter(kwargs, "radius"), - minMag = get_parameter(kwargs, "min_mag"), - maxMag = get_parameter(kwargs, "max_mag"), - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def find_existed_brick_ids(self, **kwargs): - ''' retrieve existed brick_ids in a single exposure catalog - - parameter kwargs: - return: csst_dfs_common.models.Result - ''' - try: - resp = self.stub.FindExistedBricks(level2_pb2.FindExistedBricksReq(),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = resp.brick_ids) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(level2_pb2.GetLevel2Req( - id = get_parameter(kwargs, "id") - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"data not found") - - return Result.ok_data(data = Level2Record().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - 
parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level2_pb2.UpdateProcStatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_qc2_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc2Status.with_call( - level2_pb2.UpdateQc2StatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def write(self, **kwargs): - ''' insert a level2 record into database - - parameter kwargs: - level1_id : [int] - data_type : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - - return csst_dfs_common.models.Result - ''' - - rec = level2_pb2.Level2Record( - id = 0, - level1_id = get_parameter(kwargs, "level1_id"), - data_type = get_parameter(kwargs, "data_type"), - filename = get_parameter(kwargs, "filename", ""), - file_path = get_parameter(kwargs, "file_path", ""), - qc2_status = get_parameter(kwargs, "qc2_status", 0), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id", "") - ) - def stream(rec): - with open(rec.file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield level2_pb2.WriteLevel2Req(record = rec, data = data) - try: - if not rec.file_path: - return Result.error(message="file_path is blank") - if not os.path.exists(rec.file_path): - return Result.error(message="the file [%s] not existed" % (rec.file_path, )) - if not rec.filename: - rec.filename = os.path.basename(rec.file_path) - - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2Record().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) diff --git a/csst_dfs_api_cluster/mbi/level2co.py b/csst_dfs_api_cluster/mbi/level2co.py deleted file mode 100644 index 3e7ef35..0000000 --- a/csst_dfs_api_cluster/mbi/level2co.py +++ /dev/null @@ -1,202 +0,0 @@ -import io -import os -import grpc -import datetime - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.msc import Level2CoRecord,Level2CoCatalogRecord -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.msc.level2co import level2co_pb2, level2co_pb2_grpc - -from ..common.service import ServiceProxy -from ..common.utils import * - -class Level2CoApi(object): - """ - Level2 Merge Catalog Operation Class - """ - def __init__(self): - self.stub = level2co_pb2_grpc.Level2CoSrvStub(ServiceProxy().channel()) - - def find(self, 
**kwargs): - ''' retrieve level2 records from database - - parameter kwargs: - data_type: [str] - create_time : (start, end), - qc2_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level2co_pb2.FindLevel2CoReq( - data_type = get_parameter(kwargs, "data_type"), - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], - qc2_status = get_parameter(kwargs, "qc2_status"), - prc_status = get_parameter(kwargs, "prc_status"), - filename = get_parameter(kwargs, "filename"), - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2CoRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def catalog_query(self, **kwargs): - ''' retrieve level2catalog records from database - - parameter kwargs: - obs_id: [str] - detector_no: [str] - ra: [float] in deg - dec: [float] in deg - radius: [float] in deg - min_mag: [float] - max_mag: [float] - obs_time: (start, end), - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindCatalog.with_call(level2co_pb2.FindLevel2CoCatalogReq( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - obs_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - obs_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - ra = get_parameter(kwargs, "ra"), - dec = get_parameter(kwargs, "dec"), - radius = get_parameter(kwargs, "radius"), - minMag = get_parameter(kwargs, "min_mag"), - maxMag = get_parameter(kwargs, "max_mag"), - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2CoCatalogRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(level2co_pb2.GetLevel2CoReq( - id = get_parameter(kwargs, "id") - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"data not found") - - return Result.ok_data(data = Level2CoRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level2co_pb2.UpdateProcStatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = 
str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def update_qc2_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc2Status.with_call( - level2co_pb2.UpdateQc2StatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - def write(self, **kwargs): - ''' insert a level2 record into database - - parameter kwargs: - data_type : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - - return csst_dfs_common.models.Result - ''' - - rec = level2co_pb2.Level2CoRecord( - id = 0, - data_type = get_parameter(kwargs, "data_type"), - filename = get_parameter(kwargs, "filename", ""), - file_path = get_parameter(kwargs, "file_path", ""), - qc2_status = get_parameter(kwargs, "qc2_status", 0), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id", "") - ) - def stream(rec): - with open(rec.file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield level2co_pb2.WriteLevel2CoReq(record = rec, data = data) - try: - if not rec.file_path: - return Result.error(message="file_path is blank") - if not os.path.exists(rec.file_path): - return Result.error(message="the file [%s] not existed" % (rec.file_path, )) - if not rec.filename: - rec.filename = os.path.basename(rec.file_path) - - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2CoRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) diff --git a/csst_dfs_api_cluster/sls/level2spectra.py b/csst_dfs_api_cluster/sls/level2spectra.py index 7a3bdcf..d85e1b4 100644 --- a/csst_dfs_api_cluster/sls/level2spectra.py +++ b/csst_dfs_api_cluster/sls/level2spectra.py @@ -8,7 +8,7 @@ from csst_dfs_commons.models.sls import Level2Spectra from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE from csst_dfs_proto.sls.level2spectra import level2spectra_pb2, level2spectra_pb2_grpc -from ..common.service import ServiceProxy +from ..common.service import grpc_channel from ..common.utils import * class Level2SpectraApi(object): @@ -16,8 +16,10 @@ class Level2SpectraApi(object): Level2spectra Data Operation Class """ def __init__(self): - self.stub = level2spectra_pb2_grpc.Level2spectraSrvStub(ServiceProxy().channel()) + self.stub_class = level2spectra_pb2_grpc.Level2spectraSrvStub + self.stub = None + @grpc_channel def find(self, **kwargs): ''' retrieve level2spectra records from database @@ -55,6 +57,7 @@ class Level2SpectraApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def get(self, **kwargs): ''' fetch a record from database @@ -77,6 +80,7 @@ class Level2SpectraApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel 
def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -100,6 +104,7 @@ class Level2SpectraApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def update_qc2_status(self, **kwargs): ''' update the status of QC2 @@ -121,6 +126,7 @@ class Level2SpectraApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + @grpc_channel def write(self, **kwargs): ''' insert a level2spectra record into database -- GitLab
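
Usage note: after this patch none of the API classes opens a gRPC connection in __init__; each public method is wrapped by @grpc_channel, which calls get_grpc_channel(), builds self.stub from self.stub_class, runs the wrapped method, and closes the channel when the with-block exits. The sketch below shows the intended call pattern against the patched code; the gateway address, credentials, and query values are placeholder assumptions rather than real configuration, and CatalogApi.gaia3_query is used only because its full signature is visible in this diff.

# Minimal usage sketch (assumptions: a reachable DFS gateway and valid app
# credentials; every literal value below is a placeholder).
import os

os.environ["CSST_DFS_GATEWAY"] = "127.0.0.1:30880"   # placeholder gateway host:port
os.environ["CSST_DFS_APP_ID"] = "my-app"             # placeholder credential
os.environ["CSST_DFS_APP_TOKEN"] = "my-token"        # placeholder credential

from csst_dfs_api_cluster.common.catalog import CatalogApi

api = CatalogApi()            # no channel is opened here any more; api.stub is still None
result = api.gaia3_query(     # @grpc_channel opens a channel, builds self.stub,
    ra=180.0, dec=0.0,        # runs the RPC, then closes the channel on exit
    radius=0.1,
    columns=("ra", "dec"),    # assumed column names, for illustration only
    min_mag=-1, max_mag=25,
    obstime=-1,
    limit=10)
print(result)

Because every decorated call now creates its own channel and waits up to 10 seconds for readiness, tight loops over these APIs pay that connection cost on each call; that is the trade-off of dropping the long-lived ServiceProxy channel.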