diff --git a/csst_dfs_api_cluster/common/catalog.py b/csst_dfs_api_cluster/common/catalog.py index 47b8e97022fb6b0a81d6383a3d2978951b552f86..139fa31b6578d6ee37dbc29e3992c5fdfd5516d4 100644 --- a/csst_dfs_api_cluster/common/catalog.py +++ b/csst_dfs_api_cluster/common/catalog.py @@ -16,31 +16,12 @@ class CatalogApi(object): self.stub = None @grpc_channel - def gaia3_query(self, ra: float, dec: float, radius: float, columns: tuple, min_mag: float, max_mag: float, obstime: int, limit: int): - ''' retrieval GAIA DR 3 - args: - ra: in deg - dec: in deg - radius: in deg - tuple of str, like ('ra','dec','phot_g_mean_mag') - min_mag: minimal magnitude - max_mag: maximal magnitude - obstime: seconds - limit: limits returns the number of records - return: csst_dfs_common.models.Result - ''' + def catalog_query(self, **kwargs): try: datas = io.BytesIO() totalCount = 0 - resps = self.stub.Gaia3Search(ephem_pb2.EphemSearchRequest( - ra = ra, - dec = dec, - radius = radius, - columns = ",".join(columns), - minMag = min_mag, - maxMag = max_mag, - obstime = obstime, - limit = limit + resps = self.stub.Search(ephem_pb2.SearchRequest( + conditions = { k:str(v) for k,v in kwargs.items() } ),metadata = get_auth_headers()) for resp in resps: if resp.success: @@ -51,6 +32,6 @@ class CatalogApi(object): datas.flush() records = pickle.loads(datas.getvalue()) - return Result.ok_data(data = records).append("totalCount", totalCount).append("columns", columns) + return Result.ok_data(data = records).append("totalCount", totalCount).append("columns", kwargs['columns']) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) diff --git a/csst_dfs_api_cluster/common/service.py b/csst_dfs_api_cluster/common/service.py index 308ab05b0b314f25a8b9633e9108b933a698122b..028b4766e93183eeb09b01a7cdcee8c9b1d19ebd 100644 --- a/csst_dfs_api_cluster/common/service.py +++ b/csst_dfs_api_cluster/common/service.py @@ -1,27 +1,12 @@ import os import grpc +from csst_dfs_proto.db import db_pb2, db_pb2_grpc from csst_dfs_commons.models.errors import CSSTFatalException -class ServiceProxy: - def __init__(self): - self.gateway = os.getenv("CSST_DFS_GATEWAY",'172.31.248.218:30880') - - def channel(self): - options = (('grpc.max_send_message_length', 1024 * 1024 * 1024), - ('grpc.max_receive_message_length', 1024 * 1024 * 1024)) - # channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip) - channel = grpc.insecure_channel(self.gateway, options = options) - try: - grpc.channel_ready_future(channel).result(timeout=10) - except grpc.FutureTimeoutError: - raise CSSTFatalException('Error connecting to server {}'.format(self.gateway)) - else: - return channel - def grpc_channel(func): def wrapper(*args, **kwargs): with get_grpc_channel() as c: - args[0].stub = args[0].stub_class(c) + args[0].stub = db_pb2_grpc.DBSrvStub(c) return func(*args, **kwargs) return wrapper @@ -36,4 +21,4 @@ def get_grpc_channel(): except grpc.FutureTimeoutError: raise CSSTFatalException('Error connecting to server {}'.format(gateway)) else: - return channel \ No newline at end of file + return channel diff --git a/csst_dfs_api_cluster/common/utils.py b/csst_dfs_api_cluster/common/utils.py index 04ca8a5bcd4553c67d570238fd0726245ea43926..691cf6a843db9f8258d2dbf63dbee4cfb87bc557 100644 --- a/csst_dfs_api_cluster/common/utils.py +++ b/csst_dfs_api_cluster/common/utils.py @@ -1,11 +1,14 @@ +import io import os from datetime import datetime import time import grpc +import pickle -from csst_dfs_commons.models import Result -from csst_dfs_proto.common.misc import misc_pb2, misc_pb2_grpc +from csst_dfs_commons.models import Result, Record +from csst_dfs_proto.db import db_pb2, db_pb2_grpc from .service import get_grpc_channel +from .constants import UPLOAD_CHUNK_SIZE def format_datetime(dt): return dt.strftime('%Y-%m-%d %H:%M:%S') @@ -56,14 +59,139 @@ def singleton(cls): def get_auth_headers(): return (("csst_dfs_app",os.getenv("CSST_DFS_APP_ID")),("csst_dfs_token",os.getenv("CSST_DFS_APP_TOKEN")),) -def get_nextId_by_prefix(prefix): +def get_next_id(prefix): with get_grpc_channel() as c: - stub = misc_pb2_grpc.MiscSrvStub(c) + stub = db_pb2_grpc.DBSrvStub(c) try: - resp,_ = stub.GetSeqId.with_call( - misc_pb2.GetSeqIdReq(prefix=prefix), + resp,_ = stub.Get.with_call( + db_pb2.GetReq( + conditions = { + "__function":"MiscServicer.GetSeqId", + "prefix": prefix + } + ), metadata = get_auth_headers() ) - return Result.ok_data(data=resp.nextId) + record = pickle.loads(resp.record) + return Result.ok_data(data=record[0]) except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) \ No newline at end of file + return Result.error(message="%s:%s" % (e.code().value, e.details())) + +def update_kwargs(function, kwargs): + conditions = { + "__function": function, + } + conditions.update(kwargs) + conditions = { k:str(v) for k,v in conditions.items() } + return conditions + +def find_req(function, kwargs): + conditions = update_kwargs(function, kwargs) + if "limit" not in conditions: + conditions["limit"] = "-1" + if "page" not in conditions: + conditions["page"] = "-1" + + req = db_pb2.FindReq(conditions = conditions) + with get_grpc_channel() as c: + try: + datas = io.BytesIO() + totalCount = 0 + resps = db_pb2_grpc.DBSrvStub(c).Find(req, + metadata = get_auth_headers()) + for resp in resps: + if resp.success: + datas.write(resp.records) + totalCount = resp.totalCount + else: + return Result.error(message = str(resp.error.detail)) + + datas.flush() + dv = datas.getvalue() + if not dv: + return Result.ok_data(data = []).append("totalCount", totalCount)\ + .append("columns", []) + else: + records = pickle.loads(datas.getvalue()) + records, cols = records[0], records[1] + return Result.ok_data(data = records).append("totalCount", totalCount)\ + .append("columns", cols) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + +def get_req(function, kwargs): + req = db_pb2.GetReq(conditions = update_kwargs(function, kwargs)) + with get_grpc_channel() as c: + stub = db_pb2_grpc.DBSrvStub(c) + try: + resp = stub.Get(req, + metadata = get_auth_headers() + ) + if resp.record: + record = pickle.loads(resp.record) + if record: + data = Record.from_tuple(record, resp.columns) + return Result.ok_data(data=data).append("columns", resp.columns) + else: + return Result.error(message=f"not found") + else: + return Result.error(message=f"not found") + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + +def update_req(function, kwargs): + req = db_pb2.UpdateReq(conditions = update_kwargs(function, kwargs)) + with get_grpc_channel() as c: + stub = db_pb2_grpc.DBSrvStub(c) + try: + resp = stub.Update(req, + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + +def write_req(function, kwargs): + req = db_pb2.WriteReq(conditions = update_kwargs(function, kwargs)) + with get_grpc_channel() as c: + stub = db_pb2_grpc.DBSrvStub(c) + try: + resp = stub.Write(req, + metadata = get_auth_headers() + ) + if resp.success: + if resp.record: + record = pickle.loads(resp.record) + return Result.ok_data(data=record).append("columns", resp.columns) + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + +def write_stream_req(function, byte_stream, kwargs): + conditions = update_kwargs(function, kwargs) + def stream(): + while True: + data = byte_stream.read(UPLOAD_CHUNK_SIZE) + if not data: + break + yield db_pb2.WriteStreamReq(conditions = conditions, data = data) + + with get_grpc_channel() as c: + stub = db_pb2_grpc.DBSrvStub(c) + try: + resp = stub.WriteStream(stream(), + metadata = get_auth_headers() + ) + if resp.success: + if resp.record: + record = pickle.loads(resp.record) + return Result.ok_data(data=record).append("columns", resp.columns) + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) \ No newline at end of file diff --git a/csst_dfs_api_cluster/facility/__init__.py b/csst_dfs_api_cluster/facility/__init__.py index d438b573bbbde02e73dc1caa9f925364cc49b071..d7f4f95749d574901d9702000cd267f92fd38442 100644 --- a/csst_dfs_api_cluster/facility/__init__.py +++ b/csst_dfs_api_cluster/facility/__init__.py @@ -1,6 +1,5 @@ from .brick import BrickApi from .detector import DetectorApi -from .level2producer import Level2ProducerApi from .observation import ObservationApi from .level1 import Level1DataApi from .level0 import Level0DataApi diff --git a/csst_dfs_api_cluster/facility/brick.py b/csst_dfs_api_cluster/facility/brick.py index 8a2637c4dc8d1a69a4b22c534f79f5933bbd4e92..1aaecbd7a3db90f997b40217ddb7bad1b5630fa4 100644 --- a/csst_dfs_api_cluster/facility/brick.py +++ b/csst_dfs_api_cluster/facility/brick.py @@ -1,147 +1,29 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Brick, BrickObsStatus, BrickLevel1 - -from csst_dfs_proto.facility.brick import brick_pb2, brick_pb2_grpc - from ..common.service import grpc_channel from ..common.utils import * -from ..common.constants import UPLOAD_CHUNK_SIZE class BrickApi(object): """ Brick Operation Class """ def __init__(self): - self.stub_class = brick_pb2_grpc.BrickSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' find brick records - - :param kwargs: - limit: limits returns the number of records,default 0:no-limit - - :returns: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(brick_pb2.FindBrickReq( - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Brick, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("BrickServicer.Find", kwargs) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - :param kwargs: - id : [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - brick_id = get_parameter(kwargs, "id", -1) - resp, _ = self.stub.Get.with_call(brick_pb2.GetBrickReq( - id = brick_id - ),metadata = get_auth_headers()) - - if resp.record is None or (resp.record.id == 0 and resp.record.ra == 0.0 and resp.record.dec == 0.0): - return Result.error(message=f"{brick_id} not found") - - return Result.ok_data(data=Brick().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("BrickServicer.Get", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a brickal record into database - - :param kwargs: Parameter dictionary, key items support: - ra = [float], - dec = [float], - boundingbox = [str] - - :returns: csst_dfs_common.models.Result - ''' - rec = brick_pb2.BrickRecord( - id = get_parameter(kwargs, "id", -1), - ra = get_parameter(kwargs, "ra", 0.0), - dec = get_parameter(kwargs, "dec", 0.0), - boundingbox = get_parameter(kwargs, "boundingbox", "") - ) - req = brick_pb2.WriteBrickReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Brick().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return write_req("BrickServicer.Write", kwargs) @grpc_channel def find_obs_status(self, **kwargs): - ''' find observation status of bricks - - :param kwargs: - brick_id = [int], - band = [string], - limit = [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindObsStatus.with_call(brick_pb2.FindObsStatusReq( - brick_id = get_parameter(kwargs, "brick_id", -1), - band = get_parameter(kwargs, "band", ""), - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(BrickObsStatus, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("BrickServicer.FindObsStatus", kwargs) @grpc_channel - def find_level1_data(self, **kwargs): - ''' find level1 data - - :param kwargs: Parameter dictionary, support: - brick_id = [int] - level1_id = [int] - module = [str], - limit = [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindLevel1.with_call(brick_pb2.FindLevel1Req( - brick_id = get_parameter(kwargs, "brick_id", -1), - level1_id = get_parameter(kwargs, "level1_id", 0), - module = get_parameter(kwargs, "limit", ""), - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(BrickLevel1, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + def find_level1_ids(self, **kwargs): + return find_req("BrickServicer.FindLevel1Data", kwargs) diff --git a/csst_dfs_api_cluster/facility/detector.py b/csst_dfs_api_cluster/facility/detector.py index a45ddd124fa7b206183552c15e38f0d9e865bc5a..e3b4a5ce77583c3d737c877625c0f189595f6d9e 100644 --- a/csst_dfs_api_cluster/facility/detector.py +++ b/csst_dfs_api_cluster/facility/detector.py @@ -1,224 +1,38 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Detector, DetectorStatus -from csst_dfs_proto.facility.detector import detector_pb2, detector_pb2_grpc - from ..common.service import grpc_channel from ..common.utils import * class DetectorApi(object): def __init__(self): - self.stub_class = detector_pb2_grpc.DetectorSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve detector records from database - - parameter kwargs: - module_id: [str] - key: [str] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(detector_pb2.FindDetectorReq( - module_id = get_parameter(kwargs, "module_id"), - key = get_parameter(kwargs, "key") - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Detector,resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("DetectorServicer.Find", kwargs) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - no : [str] - - return csst_dfs_common.models.Result - ''' - try: - no = get_parameter(kwargs, "no") - resp, _ = self.stub.Get.with_call(detector_pb2.GetDetectorReq( - no = no - ),metadata = get_auth_headers()) - - if not resp.record.no: - return Result.error(message=f"no:{no} not found") - - return Result.ok_data(data=Detector().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("DetectorServicer.Get", kwargs) @grpc_channel def update(self, **kwargs): - ''' update a detector by no - - parameter kwargs: - no : [str], - detector_name : [str], - module_id : [str], - filter_id : [str] - - return csst_dfs_common.models.Result - ''' - try: - no = get_parameter(kwargs, "no") - result_get = self.get(no=no) - if not result_get.success: - return result_get - - record = detector_pb2.Detector( - no = no, - detector_name = get_parameter(kwargs, "detector_name", result_get.data.detector_name), - module_id = get_parameter(kwargs, "module_id", result_get.data.module_id), - filter_id = get_parameter(kwargs, "filter_id", result_get.data.filter_id) - ) - resp,_ = self.stub.Update.with_call( - detector_pb2.UpdateDetectorReq(record=record), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("DetectorServicer.Update", kwargs) @grpc_channel def delete(self, **kwargs): - ''' delete a detector by no - - parameter kwargs: - no : [str] - - return csst_dfs_common.models.Result - ''' - no = get_parameter(kwargs, "no") - - try: - resp,_ = self.stub.Delete.with_call( - detector_pb2.DeleteDetectorReq(no=no), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("DetectorServicer.Delete", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a detector record into database - - parameter kwargs: - no : [str], - detector_name : [str], - module_id : [str], - filter_id : [str] - return csst_dfs_common.models.Result - ''' - - rec = detector_pb2.Detector( - no = get_parameter(kwargs, "no"), - detector_name = get_parameter(kwargs, "detector_name"), - module_id = get_parameter(kwargs, "module_id"), - filter_id = get_parameter(kwargs, "filter_id") - ) - req = detector_pb2.WriteDetectorReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Detector().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return write_req("DetectorServicer.Write", kwargs) @grpc_channel def find_status(self, **kwargs): - ''' retrieve a detector status's from database - - parameter kwargs: - detector_no: [str] - status_occur_time: (begin,end) - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindStatus.with_call(detector_pb2.FindStatusReq( - detector_no = get_parameter(kwargs, "detector_no"), - status_begin_time = get_parameter(kwargs, "status_occur_time", [None, None])[0], - status_end_time = get_parameter(kwargs, "status_occur_time", [None, None])[1], - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(DetectorStatus,resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("DetectorServicer.FindStatus", kwargs) @grpc_channel def get_status(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id") - resp, _ = self.stub.GetStatus.with_call(detector_pb2.GetStatusReq( - id = id - ),metadata = get_auth_headers()) - - if resp.record == 0: - return Result.error(message=f"id:{id} not found") - - return Result.ok_data(data=DetectorStatus().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("DetectorServicer.GetStatus", kwargs) @grpc_channel def write_status(self, **kwargs): - ''' insert a detector status into database - - parameter kwargs: - detector_no : [str], - status : [str], - status_time : [str] - return csst_dfs_common.models.Result - ''' - - rec = detector_pb2.DetectorStatus( - id = 0, - detector_no = get_parameter(kwargs, "detector_no"), - status = get_parameter(kwargs, "status"), - status_time = get_parameter(kwargs, "status_time") - ) - req = detector_pb2.WriteStatusReq(record = rec) - try: - resp,_ = self.stub.WriteStatus.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=DetectorStatus().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) \ No newline at end of file + return write_req("DetectorServicer.WriteStatus", kwargs) \ No newline at end of file diff --git a/csst_dfs_api_cluster/facility/level0.py b/csst_dfs_api_cluster/facility/level0.py index d3678f17f400555bcf1349d1e5f385732029a645..f0c7c54cd093b9f5a0661b48f9314e8aaaa809c4 100644 --- a/csst_dfs_api_cluster/facility/level0.py +++ b/csst_dfs_api_cluster/facility/level0.py @@ -1,212 +1,30 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Level0Record - -from csst_dfs_proto.facility.level0 import level0_pb2, level0_pb2_grpc from ..common.service import grpc_channel from ..common.utils import * +from csst_dfs_commons.models import Record class Level0DataApi(object): def __init__(self): - self.stub_class = level0_pb2_grpc.Level0SrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve level0 records from database - - parameter kwargs: - obs_id: [str], - module_id: [str] - detector_no: [str], - obs_type: [str], - filter: [str], - obs_time : (start, end), - qc0_status : [int], - prc_status : [int], - file_name: [str], - ra_obj: [float], - dec_obj: [float], - radius: [float], - object_name: [str], - version: [str], - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level0_pb2.FindLevel0DataReq( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - module_id = get_parameter(kwargs, "module_id"), - obs_type = get_parameter(kwargs, "obs_type"), - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - qc0_status = get_parameter(kwargs, "qc0_status", 1024), - prc_status = get_parameter(kwargs, "prc_status", 1024), - file_name = get_parameter(kwargs, "file_name"), - ra_obj = get_parameter(kwargs, "ra_obj", None), - dec_obj = get_parameter(kwargs, "dec_obj", None), - radius = get_parameter(kwargs, "radius", 0), - object_name = get_parameter(kwargs, "object_name", None), - version = get_parameter(kwargs, "version", None), - filter = get_parameter(kwargs, "filter", None), - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level0Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def find_by_brick_ids(self, **kwargs): - ''' retrieve level0 records by brick_ids like [1,2,3,4] - - :param kwargs: Parameter dictionary, key items support: - brick_ids: [list] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindByBrickIds.with_call(level0_pb2.FindByBrickIdsReq( - brick_ids = get_parameter(kwargs, "brick_ids", []) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level0Record, resp.records)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - + result = find_req("Level0Servicer.Find", kwargs) + data = Record.from_list(result["data"], result["columns"]) + result["data"] = data + return result @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int], - level0_id: [str], - obs_type: [str] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(level0_pb2.GetLevel0DataReq( - id = get_parameter(kwargs, "id"), - level0_id = get_parameter(kwargs, "level0_id"), - obs_type = get_parameter(kwargs, "obs_type") - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("Level0Servicer.Get", kwargs) @grpc_channel def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - level0_id: [str], - obs_type: [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level0_pb2.UpdateProcStatusReq( - id = get_parameter(kwargs, "id"), - level0_id = get_parameter(kwargs, "level0_id"), - obs_type = get_parameter(kwargs, "obs_type"), - status=get_parameter(kwargs, "status") - ), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level0Servicer.UpdateProcStatus", kwargs) @grpc_channel def update_qc0_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - level0_id: [str], - obs_type: [str], - status : [int] - ''' - - try: - resp,_ = self.stub.UpdateQc0Status.with_call( - level0_pb2.UpdateQc0StatusReq( - id = get_parameter(kwargs, "id"), - level0_id = get_parameter(kwargs, "level0_id"), - obs_type = get_parameter(kwargs, "obs_type"), - status=get_parameter(kwargs, "status") - ), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level0Servicer.UpdateQc0Status", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a level0 data record into database - - parameter kwargs: - obs_id = [str] - detector_no = [str] - obs_type = [str] - obs_time = [str] - exp_time = [int] - detector_status_id = [int] - filename = [str] - file_path = [str] - return: csst_dfs_common.models.Result - ''' - rec = level0_pb2.Level0Record( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no"), - obs_type = get_parameter(kwargs, "obs_type"), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - qc0_status = get_parameter(kwargs, "qc0_status", 0), - detector_status_id = get_parameter(kwargs, "detector_status_id"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path") - ) - req = level0_pb2.WriteLevel0DataReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data = Level0Record().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - + return write_req("Level0Servicer.Write", kwargs) diff --git a/csst_dfs_api_cluster/facility/level0prc.py b/csst_dfs_api_cluster/facility/level0prc.py index 3389ac7b550febf5e834c75b2635f7d1f2352ac0..0fd5b7f8355e92c32e50bd7c5bd9fa3e1a92709f 100644 --- a/csst_dfs_api_cluster/facility/level0prc.py +++ b/csst_dfs_api_cluster/facility/level0prc.py @@ -1,107 +1,19 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Level0PrcRecord - -from csst_dfs_proto.facility.level0prc import level0prc_pb2, level0prc_pb2_grpc from ..common.service import grpc_channel from ..common.utils import * class Level0PrcApi(object): def __init__(self): - self.stub_class = level0prc_pb2_grpc.Level0PrcSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve level0 procedure records from database - - parameter kwargs: - level0_id: [str] - pipeline_id: [str] - prc_module: [str] - prc_status : [int] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level0prc_pb2.FindLevel0PrcReq( - level0_id = get_parameter(kwargs, "level0_id"), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - prc_module = get_parameter(kwargs, "prc_module"), - prc_status = get_parameter(kwargs, "prc_status"), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Level0PrcRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("Level0PrcServicer.Find", kwargs) @grpc_channel def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level0prc_pb2.UpdateProcStatusReq(id=id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level0PrcServicer.UpdateProcStatus", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a level0 procedure record into database - - parameter kwargs: - level0_id : [str] - pipeline_id : [str] - prc_module : [str] - params_file_path : [str] - prc_status : [int] - prc_time : [str] - result_file_path : [str] - return csst_dfs_common.models.Result - ''' - - rec = level0prc_pb2.Level0PrcRecord( - id = 0, - level0_id = get_parameter(kwargs, "level0_id"), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - prc_module = get_parameter(kwargs, "prc_module"), - params_file_path = get_parameter(kwargs, "params_file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time"), - result_file_path = get_parameter(kwargs, "result_file_path") - ) - req = level0prc_pb2.WriteLevel0PrcReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data = Level0PrcRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - - + return write_req("Level0PrcServicer.Write", kwargs) diff --git a/csst_dfs_api_cluster/facility/level1.py b/csst_dfs_api_cluster/facility/level1.py index df1a5d56b007c638bcc05c9c9e894c5ecfe23d89..238f6580ee54a0320ba1c75f188f070c69adb7bb 100644 --- a/csst_dfs_api_cluster/facility/level1.py +++ b/csst_dfs_api_cluster/facility/level1.py @@ -1,271 +1,72 @@ -import os -import grpc -import datetime - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Level1Record -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.facility.level1 import level1_pb2, level1_pb2_grpc +import datetime from ..common.service import grpc_channel from ..common.utils import * - +from ..common.constants import * class Level1DataApi(object): - """ - Level1 Data Operation Class - """ def __init__(self): - self.stub_class = level1_pb2_grpc.Level1SrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve level1 records from database - - parameter kwargs: - level0_id: [str] - module_id: [str] - data_type: [str] - create_time : (start, end), - qc1_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req( - obs_id = get_parameter(kwargs, "obs_id"), - level0_id = get_parameter(kwargs, "level0_id"), - module_id = get_parameter(kwargs, "module_id"), - data_type = get_parameter(kwargs, "data_type"), - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], - qc1_status = get_parameter(kwargs, "qc1_status", 1024), - prc_status = get_parameter(kwargs, "prc_status", 1024), - filename = get_parameter(kwargs, "filename"), - limit = get_parameter(kwargs, "limit", 0), - pipeline_id = get_parameter(kwargs, "pipeline_id", ""), - detector_no = get_parameter(kwargs, "detector_no", ""), - filter = get_parameter(kwargs, "filter", ""), - object_name = get_parameter(kwargs, "object_name", ""), - other_conditions = { - "ra_cen": str(get_parameter(kwargs, "ra_cen", '')), - "dec_cen": str(get_parameter(kwargs, "dec_cen", '')), - "radius_cen": str(get_parameter(kwargs, "radius_cen", '')) - } - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("Level1Servicer.Find", kwargs) @grpc_channel def find_by_brick_ids(self, **kwargs): - ''' retrieve level1 records by brick_ids like [1,2,3,4] - - :param kwargs: Parameter dictionary, key items support: - brick_ids: [list] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindByBrickIds.with_call(level1_pb2.FindByBrickIdsReq( - brick_ids = get_parameter(kwargs, "brick_ids", []) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + if not isinstance(get_parameter(kwargs, "brick_ids"), list): + return Result.error(message="brick_ids is not a list") + return find_req("Level1Servicer.FindByBrickIds", kwargs) @grpc_channel def find_by_ids(self, **kwargs): - ''' retrieve level1 records by internal level1 ids like [1,2,3,4] - - :param kwargs: Parameter dictionary, key items support: - ids: [list] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindByIds.with_call(level1_pb2.FindByIdsReq( - ids = get_parameter(kwargs, "ids", []) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + if not isinstance(get_parameter(kwargs, "ids"), list): + return Result.error(message="ids is not a list") + return find_req("Level1Servicer.FindByIds", kwargs) @grpc_channel def sls_find_by_qc1_status(self, **kwargs): - ''' retrieve level1 records from database - - parameter kwargs: - qc1_status: [str] - limit: limits returns the number of records,default 1 - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindByQc1Status.with_call(level1_pb2.FindLevel1Req( - level0_id = None, - data_type = None, - create_time_start = None, - create_time_end = None, - qc1_status = get_parameter(kwargs, "qc1_status", -1), - prc_status = None, - limit = get_parameter(kwargs, "limit", 1), - other_conditions = {"orderBy":"create_time asc", "module_id": 'SLS'} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + conditions = {"limit": 1} + conditions.update(kwargs) + return find_req("Level1Servicer.FindByQc1Status", conditions) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req( - id = get_parameter(kwargs, "id"), - level0_id = get_parameter(kwargs, "level0_id"), - data_type = get_parameter(kwargs, "data_type") - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"data not found") - - return Result.ok_data(data = Level1Record().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("Level1Servicer.Get", kwargs) @grpc_channel def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level1_pb2.UpdateProcStatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level1Servicer.UpdateProcStatus", kwargs) @grpc_channel def update_qc1_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc1Status.with_call( - level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level1Servicer.UpdateQc1Status", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a level1 record into database - - parameter kwargs: - level0_id : [str] - data_type : [str] - cor_sci_id : [int] - prc_params : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - pipeline_id : [str] - pmapname : [str] - build : [int] - - return csst_dfs_common.models.Result - ''' - - rec = level1_pb2.Level1Record( - id = 0, - level0_id = get_parameter(kwargs, "level0_id", ""), - module_id = get_parameter(kwargs, "module_id", ""), - data_type = get_parameter(kwargs, "data_type", ""), - cor_sci_id = get_parameter(kwargs, "cor_sci_id", 0), - prc_params = get_parameter(kwargs, "prc_params", ""), - filename = get_parameter(kwargs, "filename", ""), - file_path = get_parameter(kwargs, "file_path", ""), - qc1_status = get_parameter(kwargs, "qc1_status", 0), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id", ""), - build = get_parameter(kwargs, "build", 0), - pmapname = get_parameter(kwargs, "pmapname", ""), - refs = get_parameter(kwargs, "refs", {}) - ) - def stream(rec): - with open(rec.file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield level1_pb2.WriteLevel1Req(record = rec, data = data) try: - if not rec.file_path: + conditions = {} + conditions.update(kwargs) + file_path = get_parameter(kwargs, "file_path", "") + filename = get_parameter(kwargs, "filename", "") + if not file_path: return Result.error(message="file_path is blank") - if not os.path.exists(rec.file_path): - return Result.error(message="the file [%s] not existed" % (rec.file_path, )) - if not rec.filename: - rec.filename = os.path.basename(rec.file_path) - - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + if not os.path.exists(file_path): + return Result.error(message="the file [%s] not existed" % (file_path, )) + if not filename: + filename = os.path.basename(file_path) + conditions["filename"] = filename + if not conditions.get("qc1_status", ''): + conditions["qc1_status"] = "0" + if not conditions.get("prc_status", ''): + conditions["prc_status"] = "-1024" + if not conditions.get("prc_time", ''): + time_now = datetime.datetime.now() + conditions["prc_time"] = time_now.strftime('%Y-%m-%d %H:%M:%S') + + if not conditions.get("build_id", ''): + conditions["build_id"] = "0" + with open(file_path, 'rb') as f: + byte_stream = io.BytesIO(f.read()) + return write_stream_req("Level1Servicer.Write", byte_stream, conditions) + except Exception as e: + return Result.error(message="%s" % (e,)) diff --git a/csst_dfs_api_cluster/facility/level1prc.py b/csst_dfs_api_cluster/facility/level1prc.py index 79afdf8d5e17014fd10193cc24b2eda2ee7337d5..987af0a80660cc8b9576efec5c2bca077a9b9d37 100644 --- a/csst_dfs_api_cluster/facility/level1prc.py +++ b/csst_dfs_api_cluster/facility/level1prc.py @@ -1,104 +1,19 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Level1PrcRecord - -from csst_dfs_proto.facility.level1prc import level1prc_pb2, level1prc_pb2_grpc from ..common.service import grpc_channel from ..common.utils import * class Level1PrcApi(object): def __init__(self): - self.stub_class = level1prc_pb2_grpc.Level1PrcSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve level1 procedure records from database - - parameter kwargs: - level1_id: [str] - pipeline_id: [str] - prc_module: [str] - prc_status : [int] - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level1prc_pb2.FindLevel1PrcReq( - level1_id = get_parameter(kwargs, "level1_id"), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - prc_module = get_parameter(kwargs, "prc_module"), - prc_status = get_parameter(kwargs, "prc_status"), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Level1PrcRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("Level1PrcServicer.Find", kwargs) @grpc_channel def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level1prc_pb2.UpdateProcStatusReq(id=id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level1PrcServicer.UpdateProcStatus", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a level1 procedure record into database - - parameter kwargs: - level1_id : [int] - pipeline_id : [str] - prc_module : [str] - params_file_path : [str] - prc_status : [int] - prc_time : [str] - result_file_path : [str] - return csst_dfs_common.models.Result - ''' - - rec = level1prc_pb2.Level1PrcRecord( - id = 0, - level1_id = get_parameter(kwargs, "level1_id"), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - prc_module = get_parameter(kwargs, "prc_module"), - params_file_path = get_parameter(kwargs, "params_file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time"), - result_file_path = get_parameter(kwargs, "result_file_path") - ) - req = level1prc_pb2.WriteLevel1PrcReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data = Level1PrcRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return write_req("Level1PrcServicer.Write", kwargs) diff --git a/csst_dfs_api_cluster/facility/level2.py b/csst_dfs_api_cluster/facility/level2.py index e39f350305d559cc0571774c7239373e0a6ed092..adcd7de3b5bdf00a423ecc83e4fa68d8c3712c58 100644 --- a/csst_dfs_api_cluster/facility/level2.py +++ b/csst_dfs_api_cluster/facility/level2.py @@ -9,69 +9,22 @@ from collections.abc import Iterable from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list from csst_dfs_commons.models.level2 import Level2Record, filter_table_name -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.facility.level2 import level2_pb2, level2_pb2_grpc + from ..common.service import grpc_channel from ..common.utils import * +from ..common.constants import * class Level2DataApi(object): """ Level2 Data Operation Class """ def __init__(self): - self.stub_class = level2_pb2_grpc.Level2SrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve level2 records from database - - parameter kwargs: - level0_id: [str] - level1_id: [int] - module_id: [str] - brick_id: [int] - data_type: [str] - create_time : (start, end), - qc2_status : [int], - prc_status : [int], - import_status : [int], - filename: [str], - build : [int], - pipeline_id: [str], - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level2_pb2.FindLevel2Req( - level0_id = get_parameter(kwargs, "level0_id"), - level1_id = get_parameter(kwargs, "level1_id"), - module_id = get_parameter(kwargs, "module_id"), - brick_id = get_parameter(kwargs, "brick_id"), - data_type = get_parameter(kwargs, "data_type"), - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], - qc2_status = get_parameter(kwargs, "qc2_status", 1024), - prc_status = get_parameter(kwargs, "prc_status", 1024), - import_status = get_parameter(kwargs, "import_status", 1024), - filename = get_parameter(kwargs, "filename"), - object_name = get_parameter(kwargs, "object_name"), - limit = get_parameter(kwargs, "limit", 0), - pipeline_id = get_parameter(kwargs, "pipeline_id",""), - build = get_parameter(kwargs, "build", -1024), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) + return find_req("Level2Servicer.Find", kwargs) - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2Record, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel def catalog_columns(self, **kwargs): ''' retrieve columns data type @@ -92,211 +45,62 @@ class Level2DataApi(object): @grpc_channel def catalog_query(self, **kwargs): - ''' retrieve level2catalog records from database - - parameter kwargs: - sql: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - datas = io.BytesIO() - totalCount = 0 - - resps = self.stub.FindCatalog(level2_pb2.FindLevel2CatalogReq( - sql = get_parameter(kwargs, "sql", None), - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - for resp in resps: - if resp.success: - datas.write(resp.records) - totalCount = resp.totalCount - else: - return Result.error(message = str(resp.error.detail)) - datas.flush() - records = pickle.loads(datas.getvalue()) - return Result.ok_data(data = records[0]).append("totalCount", totalCount).append("columns", records[1]) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("Level2Servicer.FindCatalog", kwargs) @grpc_channel def coord_cond_sql(self, **kwargs): - ''' generate coordinate search condition sql - - :param kwargs: Parameter dictionary, key items support: - data_type: [str] - ra: [float] - dec: [float] - radius: [float] - - :returns: csst_dfs_common.models.Result - ''' - try: - resp = self.stub.CoordCond(level2_pb2.CoordCondReq( - data_type = get_parameter(kwargs, "data_type"), - ra = get_parameter(kwargs, "ra"), - dec = get_parameter(kwargs, "dec"), - radius = get_parameter(kwargs, "radius") - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = resp.condition) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("Level2Servicer.CoordCond", kwargs) @grpc_channel def find_existed_brick_ids(self, **kwargs): - ''' retrieve existed brick_ids in a single exposure catalog - - parameter kwargs: - data_type: [str] - return: csst_dfs_common.models.Result - ''' - try: - resp = self.stub.FindExistedBricks(level2_pb2.FindExistedBricksReq( - data_type = get_parameter(kwargs, "data_type") - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = resp.brick_ids) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("Level2Servicer.FindExistedBricks", kwargs) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(level2_pb2.GetLevel2Req( - id = get_parameter(kwargs, "id") - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"data not found") - - return Result.ok_data(data = Level2Record().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("Level2Servicer.Get", kwargs) @grpc_channel def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level2_pb2.UpdateProcStatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level2Servicer.UpdateProcStatus", kwargs) @grpc_channel def update_qc2_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc2Status.with_call( - level2_pb2.UpdateQc2StatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level2Servicer.UpdateQc2Status", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a level2 record into database - - parameter kwargs: - level1_id : [int] - brick_id : [int] - module_id : [str] - object_name: [str] - data_type : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - pipeline_id : [str] - build : [int] - - return csst_dfs_common.models.Result - ''' - - rec = level2_pb2.Level2Record( - id = 0, - level0_id = get_parameter(kwargs, "level0_id", ""), - level1_id = get_parameter(kwargs, "level1_id", 0), - brick_id = get_parameter(kwargs, "brick_id", 0), - module_id = get_parameter(kwargs, "module_id", ""), - data_type = get_parameter(kwargs, "data_type", ""), - object_name = get_parameter(kwargs, "object_name", ""), - filename = get_parameter(kwargs, "filename", ""), - file_path = get_parameter(kwargs, "file_path", ""), - qc2_status = get_parameter(kwargs, "qc2_status", 0), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - build = get_parameter(kwargs, "build", 0), - pipeline_id = get_parameter(kwargs, "pipeline_id", "") - ) - def stream(rec): - with open(rec.file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield level2_pb2.WriteLevel2Req(record = rec, data = data) try: - if not rec.module_id: + conditions = {} + conditions.update(kwargs) + + module_id = get_parameter(kwargs, "module_id", "") + data_type = get_parameter(kwargs, "data_type", "") + file_path = get_parameter(kwargs, "file_path", "") + filename = get_parameter(kwargs, "filename", "") + if not module_id: return Result.error(message="module_id is blank") - if not rec.data_type: + if not data_type: return Result.error(message="data_type is blank") - if not rec.file_path: + if not file_path: return Result.error(message="file_path is blank") - if not os.path.exists(rec.file_path): - return Result.error(message="the file [%s] not existed" % (rec.file_path, )) - if not rec.filename: - rec.filename = os.path.basename(rec.file_path) + if not os.path.exists(file_path): + return Result.error(message="the file [%s] not existed" % (file_path, )) + if not filename: + filename = os.path.basename(file_path) + conditions["filename"] = filename - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2Record().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + if not conditions.get("qc2_status", ''): + conditions["qc2_status"] = "0" + + if not conditions.get("prc_status", ''): + conditions["prc_status"] = "-1024" + + if not conditions.get("prc_time", ''): + time_now = datetime.datetime.now() + conditions["prc_time"] = time_now.strftime('%Y-%m-%d %H:%M:%S') + + with open(file_path, 'rb') as f: + byte_stream = io.BytesIO(f.read()) + return write_stream_req("Level2Servicer.Write", byte_stream, conditions) + except Exception as e: + return Result.error(message="%s" % (e,)) \ No newline at end of file diff --git a/csst_dfs_api_cluster/facility/level2producer.py b/csst_dfs_api_cluster/facility/level2producer.py deleted file mode 100644 index 11027dc783ffbaf149152c061f2b8cc14e6f537e..0000000000000000000000000000000000000000 --- a/csst_dfs_api_cluster/facility/level2producer.py +++ /dev/null @@ -1,402 +0,0 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Level2Producer, Level2Job, Level2ProducerRuning - -from csst_dfs_proto.facility.level2producer import level2producer_pb2, level2producer_pb2_grpc - -from ..common.service import grpc_channel -from ..common.utils import * -from ..common.constants import UPLOAD_CHUNK_SIZE - -class Level2ProducerApi(object): - """ - Level2Producer Operation Class - """ - def __init__(self): - self.stub_class = level2producer_pb2_grpc.Level2ProducerSrvStub - self.stub = None - - @grpc_channel - def register(self, **kwargs): - ''' register a Level2Producer data record into database - - :param kwargs: Parameter dictionary, key items support: - name = [str]\n - gitlink = [str]\n - paramfiles = [str]\n - priority = [int]\n - pre_producers = list[int] - - :returns: csst_dfs_common.models.Result - ''' - rec = level2producer_pb2.Level2ProducerRecord( - id = get_parameter(kwargs, "id", 0), - name = get_parameter(kwargs, "name", ""), - gitlink = get_parameter(kwargs, "gitlink"), - paramfiles = get_parameter(kwargs, "paramfiles"), - priority = get_parameter(kwargs, "priority", 0), - pre_producers = get_parameter(kwargs, "pre_producers",[]), - ) - req = level2producer_pb2.RegisterReq(record = rec) - try: - resp,_ = self.stub.Register.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2Producer().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def find(self, **kwargs): - ''' retrieve Level2Producer records from database - - :param kwargs: Parameter dictionary, key items support: - key: [str] - limit: limits returns the number of records,default 0:no-limit - - :returns: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level2producer_pb2.FindReq( - key = get_parameter(kwargs, "key", "") - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Level2Producer, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - p_id = get_parameter(kwargs, "id", 0) - resp, _ = self.stub.Get.with_call(level2producer_pb2.GetReq( - id = p_id - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"{p_id} not found") - - return Result.ok_data(data=Level2Producer().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def find_nexts(self, **kwargs): - ''' retrieve Level2Producer records from database - - :param kwargs: Parameter dictionary, key items support: - id : [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.FindNexts.with_call(level2producer_pb2.FindNextsReq( - id = get_parameter(kwargs, "id", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Level2Producer, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def find_start(self, **kwargs): - ''' retrieve Level2Producer records from database - - :param kwargs: Parameter dictionary, key items support: - key : [str] - - :returns: csst_dfs_common.models.Result - ''' - try: - - resp, _ = self.stub.FindStart.with_call(level2producer_pb2.FindStartReq( - key = get_parameter(kwargs, "key", "") - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Level2Producer, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def update(self, **kwargs): - ''' update a Level2Producer - - :param kwargs: Parameter dictionary, key items support: - id : [int]\n - name = [str]\n - gitlink = [str]\n - paramfiles = [str]\n - priority = [int]\n - pre_producers = list[int] - - :returns: csst_dfs_common.models.Result - ''' - try: - rec = level2producer_pb2.Level2ProducerRecord( - id = get_parameter(kwargs, "id", 0), - name = get_parameter(kwargs, "name", ""), - gitlink = get_parameter(kwargs, "gitlink", ""), - paramfiles = get_parameter(kwargs, "paramfiles", ""), - priority = get_parameter(kwargs, "priority", 0), - pre_producers = get_parameter(kwargs, "pre_producers",[]) - ) - resp,_ = self.stub.Update.with_call( - level2producer_pb2.UpdateReq(record = rec), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def delete(self, **kwargs): - ''' delete a Level2Producer data - - :param kwargs: Parameter dictionary, key items support: - id = [int] - - :returns: csst_dfs_common.models.Result - ''' - try: - resp,_ = self.stub.Delete.with_call( - level2producer_pb2.DeleteReq( - id = get_parameter(kwargs, "id", 0)), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def new_job(self, **kwargs): - ''' new a Level2Producer Job - - :param kwargs: Parameter dictionary, key items support: - dag = [str] - - :returns: csst_dfs_common.models.Result - ''' - rec = level2producer_pb2.Level2JobRecord( - id = 0, - name = get_parameter(kwargs, "name", ""), - dag = get_parameter(kwargs, "dag", "") - ) - req = level2producer_pb2.NewJobReq(record = rec) - try: - resp,_ = self.stub.NewJob.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2Job().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def get_job(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - p_id = get_parameter(kwargs, "id", 0) - resp, _ = self.stub.GetJob.with_call(level2producer_pb2.GetJobReq( - id = p_id - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"{p_id} not found") - - return Result.ok_data(data=Level2Job().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def update_job(self, **kwargs): - ''' update a Level2Producer Job - - :param kwargs: Parameter dictionary, key items support: - id = [int] - dag = [str] - status = [int] - - :returns: csst_dfs_common.models.Result - ''' - rec = level2producer_pb2.Level2JobRecord( - id = get_parameter(kwargs, "id", 0), - name = get_parameter(kwargs, "name", ""), - dag = get_parameter(kwargs, "dag", ""), - status = get_parameter(kwargs, "status", -1) - ) - req = level2producer_pb2.UpdateJobReq(record = rec) - try: - resp,_ = self.stub.UpdateJob.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def new_running(self, **kwargs): - ''' insert a Level2ProducerRuningRecord data - - :param kwargs: Parameter dictionary, key items support: - job_id = [int]\n - producer_id = [int]\n - brick_id = [int]\n - start_time = [str]\n - end_time = [str]\n - prc_status = [int]\n - prc_result = [str] - - :returns: csst_dfs_common.models.Result - ''' - rec = level2producer_pb2.Level2ProducerRuningRecord( - id = 0, - job_id = get_parameter(kwargs, "job_id", 0), - producer_id = get_parameter(kwargs, "producer_id", 0), - brick_id = get_parameter(kwargs, "brick_id", 0), - start_time = get_parameter(kwargs, "start_time", ""), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_result = get_parameter(kwargs, "prc_result", "") - ) - req = level2producer_pb2.WriteRunningReq(record = rec) - try: - resp,_ = self.stub.WriteRunning.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2ProducerRuning().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def get_running(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - p_id = get_parameter(kwargs, "id", 0) - resp, _ = self.stub.GetRunning.with_call(level2producer_pb2.GetRunningReq( - id = p_id - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"{p_id} not found") - - return Result.ok_data(data=Level2ProducerRuning().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def update_running(self, **kwargs): - ''' udpate a Level2ProducerRuningRecord data - - :param kwargs: Parameter dictionary, key items support: - id = [int]\n - job_id = [int]\n - producer_id = [int]\n - brick_id = [int]\n - start_time = [str]\n - end_time = [str]\n - prc_status = [int]\n - prc_result = [str] - - :returns: csst_dfs_common.models.Result - ''' - rec = level2producer_pb2.Level2ProducerRuningRecord( - id = get_parameter(kwargs, "id", 0), - job_id = get_parameter(kwargs, "job_id", 0), - producer_id = get_parameter(kwargs, "producer_id", 0), - brick_id = get_parameter(kwargs, "brick_id", 0), - start_time = get_parameter(kwargs, "start_time", ""), - end_time = get_parameter(kwargs, "end_time", ""), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_result = get_parameter(kwargs, "prc_result", "") - ) - req = level2producer_pb2.UpdateRunningReq(record = rec) - try: - resp,_ = self.stub.UpdateRunning.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def find_running(self, **kwargs): - ''' find Level2ProducerRuningRecord data - - :param kwargs: Parameter dictionary, key items support: - job_id = [int]\n - producer_id = [int]\n - brick_id = [int]\n - prc_status = [int]\n - create_time : (start, end)\n - limit = [int] - - :returns: csst_dfs_common.models.Result - ''' - req = level2producer_pb2.FindRunningReq( - job_id = get_parameter(kwargs, "job_id", 0), - producer_id = get_parameter(kwargs, "producer_id", 0), - brick_id = get_parameter(kwargs, "brick_id", 0), - prc_status = get_parameter(kwargs, "prc_status", 0), - start_time = get_parameter(kwargs, "create_time", [None, None])[0], - end_time = get_parameter(kwargs, "create_time", [None, None])[1], - limit = get_parameter(kwargs, "limit", 0) - ) - try: - resp,_ = self.stub.FindRunning.with_call(req, metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data = from_proto_model_list(Level2ProducerRuning, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) \ No newline at end of file diff --git a/csst_dfs_api_cluster/facility/level2type.py b/csst_dfs_api_cluster/facility/level2type.py index 67af8b4b4614f5b6389819ff0b1e98833c923298..0f4418158d1e61b15a10a9fa44db7c264dd3b149 100644 --- a/csst_dfs_api_cluster/facility/level2type.py +++ b/csst_dfs_api_cluster/facility/level2type.py @@ -1,155 +1,43 @@ -import io import os -import grpc -import datetime -import pickle - -from collections.abc import Iterable - from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.level2 import Level2TypeRecord -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.facility.level2type import level2type_pb2, level2type_pb2_grpc - from ..common.service import grpc_channel from ..common.utils import * class Level2TypeApi(object): - """ - Level2Type Data Operation Class - """ def __init__(self): - self.stub_class = level2type_pb2_grpc.Level2TypeSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve level2type records from database - - parameter kwargs: - module_id: [str] - data_type: [str] - import_status : [int], - page: [int] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - the_limit = get_parameter(kwargs, "limit", 100000) - the_limit = the_limit if the_limit > 0 else 100000 - - try: - resp, _ = self.stub.Find.with_call(level2type_pb2.FindLevel2TypeReq( - module_id = get_parameter(kwargs, "module_id"), - data_type = get_parameter(kwargs, "data_type"), - import_status = get_parameter(kwargs, "import_status", 1024), - limit = the_limit, - page = get_parameter(kwargs, "page", 1) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2TypeRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("Level2TypeServicer.Find", kwargs) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - data_type: [str] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(level2type_pb2.GetLevel2TypeReq( - data_type = get_parameter(kwargs, "data_type") - ),metadata = get_auth_headers()) - - if not resp.record or not resp.record.data_type: - return Result.error(message=f"data not found") - - return Result.ok_data(data = Level2TypeRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("Level2TypeServicer.Get", kwargs) @grpc_channel def update_import_status(self, **kwargs): - ''' update the status of level2 type - - parameter kwargs: - data_type: [str] - status : [int] - - return csst_dfs_common.models.Result - ''' - data_type = get_parameter(kwargs, "data_type") - status = get_parameter(kwargs, "status", 0) - try: - resp,_ = self.stub.UpdateImportStatus.with_call( - level2type_pb2.UpdateImportStatusReq(data_type=data_type, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("Level2TypeServicer.UpdateImportStatus", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a level2type record into database - - parameter kwargs: - data_type : [str] - module_id : [str] - key_column : [str] - hdu_index : [int] - demo_filename : [str] - demo_file_path : [str] - ra_column : [str] - dec_column : [str] - - return csst_dfs_common.models.Result - ''' - - rec = level2type_pb2.Level2TypeRecord( - data_type = get_parameter(kwargs, "data_type", ""), - module_id = get_parameter(kwargs, "module_id", ""), - key_column = get_parameter(kwargs, "key_column", ""), - hdu_index = get_parameter(kwargs, "hdu_index", 0), - demo_filename = get_parameter(kwargs, "demo_filename", ""), - demo_file_path = get_parameter(kwargs, "demo_file_path", ""), - ra_column = get_parameter(kwargs, "ra_column", ""), - dec_column = get_parameter(kwargs, "dec_column", "") - ) - def stream(rec): - with open(rec.demo_file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield level2type_pb2.WriteLevel2TypeReq(record = rec, data = data) try: - if not rec.data_type: - return Result.error(message="data_type is blank") - if not rec.demo_file_path: + data_type = get_parameter(kwargs, "data_type", "") + if not data_type: + return Result.error(message="the data_type is blank") + file_path = get_parameter(kwargs, "demo_file_path", "") + filename = get_parameter(kwargs, "demo_filename", "") + if not file_path: return Result.error(message="demo_file_path is blank") - if not os.path.exists(rec.demo_file_path): - return Result.error(message="the file [%s] not existed" % (rec.demo_file_path, )) - if not rec.demo_filename: - rec.demo_filename = os.path.basename(rec.demo_file_path) - - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2TypeRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + if not os.path.exists(file_path): + return Result.error(message="the file [%s] not existed" % (file_path, )) + if not filename: + filename = os.path.basename(file_path) + conditions = {} + conditions.update(kwargs) + conditions["demo_filename"] = filename + with open(file_path, 'rb') as f: + byte_stream = io.BytesIO(f.read()) + return write_stream_req("Level2TypeServicer.Write", byte_stream, kwargs) + except Exception as e: + return Result.error(message="%s" % (e,)) diff --git a/csst_dfs_api_cluster/facility/observation.py b/csst_dfs_api_cluster/facility/observation.py index bba43a10a02da1c9a67d3fa6b2d4305aed57007f..baa6573e1255842b8f50819a89e62446a2c369ec 100644 --- a/csst_dfs_api_cluster/facility/observation.py +++ b/csst_dfs_api_cluster/facility/observation.py @@ -1,171 +1,26 @@ -import grpc - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import Observation -from csst_dfs_proto.facility.observation import observation_pb2, observation_pb2_grpc - from ..common.service import grpc_channel from ..common.utils import * -from ..common.constants import UPLOAD_CHUNK_SIZE class ObservationApi(object): - """ - Observation Operation Class - """ def __init__(self): - self.stub_class = observation_pb2_grpc.ObservationSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve exposure records from database - - parameter kwargs: - module_id: [str] - obs_type: [str] - obs_time : (start, end), - qc0_status : [int], - prc_status : [int], - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(observation_pb2.FindObservationReq( - module_id = get_parameter(kwargs, "module_id"), - obs_type = get_parameter(kwargs, "obs_type"), - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0], - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1], - qc0_status = get_parameter(kwargs, "qc0_status"), - prc_status = get_parameter(kwargs, "prc_status"), - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data = from_proto_model_list(Observation, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return find_req("ObservationServicer.Find", kwargs) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int], - obs_id = [str] - - return csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id") - obs_id = get_parameter(kwargs, "obs_id") - resp, _ = self.stub.Get.with_call(observation_pb2.GetObservationReq( - id = id, - obs_id = obs_id - ),metadata = get_auth_headers()) - - if resp.observation is None or resp.observation.id == 0: - return Result.error(message=f"not found") - - return Result.ok_data(data=Observation().from_proto_model(resp.observation)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return get_req("ObservationServicer.Get", kwargs) @grpc_channel def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - obs_id = [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id") - obs_id = get_parameter(kwargs, "obs_id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - observation_pb2.UpdateProcStatusReq( - id = id, - obs_id = obs_id, - status = status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("ObservationServicer.UpdateProcStatus", kwargs) @grpc_channel def update_qc0_status(self, **kwargs): - ''' update the status of QC0 - - parameter kwargs: - id : [int], - obs_id = [str], - status : [int] - ''' - id = get_parameter(kwargs, "id") - obs_id = get_parameter(kwargs, "obs_id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc0Status.with_call( - observation_pb2.UpdateQc0StatusReq( - id = id, - obs_id = obs_id, - status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return update_req("ObservationServicer.UpdateQc0Status", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a observational record into database - - parameter kwargs: - id = [id] - obs_id = [str] - obs_time = [str] - exp_time = [int] - module_id = [str] - obs_type = [str] - facility_status_id = [int] - module_status_id = [int] - return: csst_dfs_common.models.Result - ''' - - rec = observation_pb2.Observation( - id = get_parameter(kwargs, "id", 0), - obs_id = get_parameter(kwargs, "obs_id", ""), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - module_id = get_parameter(kwargs, "module_id"), - obs_type = get_parameter(kwargs, "obs_type"), - facility_status_id = get_parameter(kwargs, "facility_status_id"), - module_status_id = get_parameter(kwargs, "module_status_id") - ) - req = observation_pb2.WriteObservationReq(record = rec) - try: - resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Observation().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + return write_req("ObservationServicer.Write", kwargs) diff --git a/csst_dfs_api_cluster/facility/otherdata.py b/csst_dfs_api_cluster/facility/otherdata.py index 3396882a0ba199b30426864ec288c208e8f9333a..a945c3f240441fd55961f6a554339315e1068bcf 100644 --- a/csst_dfs_api_cluster/facility/otherdata.py +++ b/csst_dfs_api_cluster/facility/otherdata.py @@ -1,130 +1,35 @@ -import os -import grpc -import datetime - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.facility import OtherDataRecord -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.facility.otherdata import otherdata_pb2, otherdata_pb2_grpc from ..common.service import grpc_channel from ..common.utils import * - +from ..common.constants import * class OtherDataApi(object): - """ - OtherData Data Operation Class - """ def __init__(self): - self.stub_class = otherdata_pb2_grpc.OtherDataSrvStub self.stub = None @grpc_channel def find(self, **kwargs): - ''' retrieve otherdata records from database - - parameter kwargs: - obs_id: [str] - detector_no: [str] - module_id: [str] - file_type: [str] - filename: [str] - create_time : (start, end) - pipeline_id : [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(otherdata_pb2.FindOtherDataReq( - obs_id = get_parameter(kwargs, "obs_id"), - detector_no = get_parameter(kwargs, "detector_no", ""), - module_id = get_parameter(kwargs, "module_id"), - file_type = get_parameter(kwargs, "file_type"), - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], - pipeline_id = get_parameter(kwargs, "pipeline_id", ""), - filename = get_parameter(kwargs, "filename"), - limit = get_parameter(kwargs, "limit", 0) - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(OtherDataRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - + return find_req("OtherDataServicer.Find", kwargs) @grpc_channel def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Get.with_call(otherdata_pb2.GetOtherDataReq( - id = get_parameter(kwargs, "id") - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"data not found") - - return Result.ok_data(data = OtherDataRecord().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - + return get_req("OtherDataServicer.Get", kwargs) @grpc_channel def write(self, **kwargs): - ''' insert a otherdata record into database - - parameter kwargs: - obs_id: [str] - detector_no : [str] - module_id : [str] - file_type : [str] - filename : [str] - file_path : [str] - pipeline_id : [str] - - return csst_dfs_common.models.Result - ''' - - rec = otherdata_pb2.OtherDataRecord( - id = 0, - obs_id = get_parameter(kwargs, "obs_id"), - module_id = get_parameter(kwargs, "module_id", ''), - file_type = get_parameter(kwargs, "file_type"), - detector_no = get_parameter(kwargs, "detector_no"), - filename = get_parameter(kwargs, "filename", ""), - file_path = get_parameter(kwargs, "file_path", ""), - pipeline_id = get_parameter(kwargs, "pipeline_id") - ) - def stream(rec): - with open(rec.file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield otherdata_pb2.WriteOtherDataReq(record = rec, data = data) try: - if not rec.file_path: + file_path = get_parameter(kwargs, "file_path", "") + filename = get_parameter(kwargs, "filename", "") + if not file_path: return Result.error(message="file_path is blank") - if not os.path.exists(rec.file_path): - return Result.error(message="the file [%s] not existed" % (rec.file_path, )) - if not rec.filename: - rec.filename = os.path.basename(rec.file_path) - - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=OtherDataRecord().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) + if not os.path.exists(file_path): + return Result.error(message="the file [%s] not existed" % (file_path, )) + if not filename: + filename = os.path.basename(file_path) + conditions = {} + conditions.update(kwargs) + conditions["filename"] = filename + with open(file_path, 'rb') as f: + byte_stream = io.BytesIO(f.read()) + return write_stream_req("OtherDataServicer.Write", byte_stream, kwargs) + except Exception as e: + return Result.error(message="%s" % (e,)) diff --git a/csst_dfs_api_cluster/hstdm/level2.py b/csst_dfs_api_cluster/hstdm/level2.py deleted file mode 100644 index c6a1b5f9e513da13f027173d76f1910dd603fb85..0000000000000000000000000000000000000000 --- a/csst_dfs_api_cluster/hstdm/level2.py +++ /dev/null @@ -1,183 +0,0 @@ -import os -import grpc -import datetime - -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.common import from_proto_model_list -from csst_dfs_commons.models.hstdm import Level2Data -from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE -from csst_dfs_proto.hstdm.level2 import level2_pb2, level2_pb2_grpc - -from ..common.service import grpc_channel -from ..common.utils import * - -class Level2DataApi(object): - """ - Level2 Data Operation Class - """ - def __init__(self): - self.stub = level2_pb2_grpc.Level2SrvStub - self.stub = None - - @grpc_channel - def find(self, **kwargs): - ''' retrieve level2 records from database - - :param kwargs: Parameter dictionary, key items support: - level0_id: [str] - level1_id: [int] - project_id: [int] - file_type: [str] - create_time : (start, end), - qc2_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - :returns: csst_dfs_common.models.Result - ''' - try: - resp, _ = self.stub.Find.with_call(level2_pb2.FindLevel2Req( - level0_id = get_parameter(kwargs, "level0_id",None), - level1_id = get_parameter(kwargs, "level1_id", 0), - project_id = get_parameter(kwargs, "project_id", 0), - file_type = get_parameter(kwargs, "file_type"), - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], - qc2_status = get_parameter(kwargs, "qc2_status", 1024), - prc_status = get_parameter(kwargs, "prc_status", 1024), - filename = get_parameter(kwargs, "filename"), - limit = get_parameter(kwargs, "limit", 0), - other_conditions = {"test":"cnlab.test"} - ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2Data, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int] - - return csst_dfs_common.models.Result - ''' - try: - fits_id = get_parameter(kwargs, "id") - resp, _ = self.stub.Get.with_call(level2_pb2.GetLevel2Req( - id = fits_id - ),metadata = get_auth_headers()) - - if resp.record is None or resp.record.id == 0: - return Result.error(message=f"id:{fits_id} not found") - - return Result.ok_data(data = Level2Data().from_proto_model(resp.record)) - - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateProcStatus.with_call( - level2_pb2.UpdateProcStatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - @grpc_channel - def update_qc2_status(self, **kwargs): - ''' update the status of QC2 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - resp,_ = self.stub.UpdateQc2Status.with_call( - level2_pb2.UpdateQc2StatusReq(id=fits_id, status=status), - metadata = get_auth_headers() - ) - if resp.success: - return Result.ok_data() - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details())) - - @grpc_channel - def write(self, **kwargs): - ''' insert a level2 record into database - - parameter kwargs: - level0_id: [str] - level1_id: [int] - project_id: [int] - file_type : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - pipeline_id : [str] - refs: [dict] - - return csst_dfs_common.models.Result - ''' - - rec = level2_pb2.Level2Record( - id = 0, - level0_id = get_parameter(kwargs, "level0_id", None), - level1_id = get_parameter(kwargs, "level1_id", 0), - project_id = get_parameter(kwargs, "project_id", 0), - file_type = get_parameter(kwargs, "file_type"), - filename = get_parameter(kwargs, "filename", ""), - file_path = get_parameter(kwargs, "file_path", ""), - qc2_status = get_parameter(kwargs, "qc2_status", 0), - prc_status = get_parameter(kwargs, "prc_status", 0), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id") - ) - def stream(rec): - with open(rec.file_path, 'rb') as f: - while True: - data = f.read(UPLOAD_CHUNK_SIZE) - if not data: - break - yield level2_pb2.WriteLevel2Req(record = rec, data = data) - try: - if not rec.file_path: - return Result.error(message="file_path is blank") - if not os.path.exists(rec.file_path): - return Result.error(message="the file [%s] not existed" % (rec.file_path, )) - if not rec.filename: - rec.filename = os.path.basename(rec.file_path) - - resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) - if resp.success: - return Result.ok_data(data=Level2Data().from_proto_model(resp.record)) - else: - return Result.error(message = str(resp.error.detail)) - except grpc.RpcError as e: - return Result.error(message="%s:%s" % (e.code().value, e.details()))