diff --git a/csst_dfs_api_cluster/mbi/level2.py b/csst_dfs_api_cluster/mbi/level2.py index 898e5881f6df376e0cb95d2429d663992320abb3..52dc4a4f08ca20b78273664155a541c3f78d125d 100644 --- a/csst_dfs_api_cluster/mbi/level2.py +++ b/csst_dfs_api_cluster/mbi/level2.py @@ -1,6 +1,8 @@ +import io import os import grpc import datetime +import pickle from csst_dfs_commons.models import Result from csst_dfs_commons.models.common import from_proto_model_list @@ -59,6 +61,7 @@ class Level2DataApi(object): ''' retrieve level2catalog records from database parameter kwargs: + brick_ids: list[int] obs_id: [str] detector_no: [str] filter: [str] @@ -71,7 +74,10 @@ class Level2DataApi(object): return: csst_dfs_common.models.Result ''' try: - resp, _ = self.stub.FindCatalog.with_call(level2_pb2.FindLevel2CatalogReq( + datas = io.BytesIO() + totalCount = 0 + resps = self.stub.FindCatalog(level2_pb2.FindLevel2CatalogReq( + brick_ids = ",".join(get_parameter(kwargs, "brick_ids", [])), obs_id = get_parameter(kwargs, "obs_id", None), detector_no = get_parameter(kwargs, "detector_no", None), filter = get_parameter(kwargs, "filter", None), @@ -82,14 +88,19 @@ class Level2DataApi(object): radius = get_parameter(kwargs, "radius"), minMag = get_parameter(kwargs, "min_mag"), maxMag = get_parameter(kwargs, "max_mag"), - limit = get_parameter(kwargs, "limit", 0) + limit = get_parameter(kwargs, "limit", 0), + columns = ",".join(get_parameter(kwargs, "columns", "*")) ),metadata = get_auth_headers()) - - if resp.success: - return Result.ok_data(data=from_proto_model_list(Level2CatalogRecord, resp.records)).append("totalCount", resp.totalCount) - else: - return Result.error(message = str(resp.error.detail)) - + + for resp in resps: + if resp.success: + datas.write(resp.records) + totalCount = resp.totalCount + else: + return Result.error(message = str(resp.error.detail)) + datas.flush() + records = pickle.loads(datas.getvalue()) + return Result.ok_data(data = records).append("totalCount", totalCount) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) @@ -97,6 +108,7 @@ class Level2DataApi(object): ''' retrieve level2catalog records from database parameter kwargs: + brick_ids: list[int] obs_id: [str] detector_no: [str] filter: [str] @@ -110,6 +122,7 @@ class Level2DataApi(object): ''' try: resp, _ = self.stub.FindCatalogFile.with_call(level2_pb2.FindLevel2CatalogReq( + brick_ids = get_parameter(kwargs, "brick_ids", []), obs_id = get_parameter(kwargs, "obs_id"), detector_no = get_parameter(kwargs, "detector_no"), filter = get_parameter(kwargs, "filter", None), @@ -131,6 +144,23 @@ class Level2DataApi(object): except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) + def find_existed_brick_ids(self, **kwargs): + ''' retrieve existed brick_ids in a single exposure catalog + + parameter kwargs: + return: csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.FindExistedBricks.with_call(level2_pb2.FindExistedBricksReq(),metadata = get_auth_headers()) + + if resp.success: + return Result.ok_data(data = resp.brick_ids) + else: + return Result.error(message = str(resp.error.detail)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details())) + def get(self, **kwargs): ''' fetch a record from database @@ -148,7 +178,7 @@ class Level2DataApi(object): return Result.error(message=f"data not found") return Result.ok_data(data = Level2Record().from_proto_model(resp.record)) - + except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) diff --git a/csst_dfs_api_cluster/mbi/level2co.py b/csst_dfs_api_cluster/mbi/level2co.py index 8d70391e729547ddf0251c9d9fceb8856fe33780..0cd212691eeaf576b5d13126669479b0ea857107 100644 --- a/csst_dfs_api_cluster/mbi/level2co.py +++ b/csst_dfs_api_cluster/mbi/level2co.py @@ -1,3 +1,4 @@ +import io import os import grpc import datetime