import datetime
import io
import os
import pickle
from collections.abc import Iterable

import grpc
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_proto_model_list
from csst_dfs_commons.models.level2 import Level2Record, filter_table_name

from ..common.service import grpc_channel
from ..common.utils import *
from ..common.constants import *


class Level2DataApi(object):
    """
    Level2 Data Operation Class

    Every public method takes its arguments as keyword parameters and
    forwards them to a ``Level2Servicer.*`` endpoint through the request
    helpers pulled in from ``..common.utils`` (``find_req``, ``get_req``,
    ``update_req``, ``write_stream_req``).
    NOTE(review): the exact keys each endpoint accepts and the shape of the
    returned result are defined by those helpers / the server, not here.
    """

    def __init__(self):
        # Populated elsewhere; presumably by the ``grpc_channel`` decorator
        # -- TODO confirm against ..common.service.
        self.stub = None

    @grpc_channel
    def find(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.Find`` and return its result."""
        return find_req("Level2Servicer.Find", kwargs)

    def catalog_columns(self, **kwargs):
        '''
        retrieve columns data type

        Runs ``describe <table>`` through :meth:`catalog_query` and, when the
        query succeeds, rewrites the response in place so that ``data`` is a
        list of ``(field, type)`` tuples, ``columns`` is ``['Field', 'Type']``
        and ``totalCount`` is the number of tuples kept.

        :param kwargs: Parameter dictionary, key items support:
            data_type: [str]
            columns: [list], list of str; a single str, a tuple or a set is
                also accepted. Empty (or None) means "all columns". Matching
                is case-insensitive.

        :returns: the response object from :meth:`catalog_query`; it is
            returned untouched when ``success`` is falsy.
        '''
        data_type = get_parameter(kwargs, "data_type", "")
        columns = get_parameter(kwargs, "columns", [])

        # Normalise ``columns`` to a list. ``type(x) != list`` would wrap a
        # tuple/set/list-subclass into a one-element list and later fail on
        # ``.lower()``, so real collections are expanded instead.
        if columns is None:
            columns = []
        elif isinstance(columns, (list, tuple, set, frozenset)):
            columns = list(columns)
        else:
            columns = [columns]

        resp = self.catalog_query(sql=f"describe {filter_table_name(data_type)}")
        if resp.success:
            # Build the lookup once instead of once per record.
            wanted = {col.lower() for col in columns}
            records = resp.data or []
            resp['data'] = [
                (rec[0], rec[1])
                for rec in records
                if not wanted or rec[0].lower() in wanted
            ]
            resp['columns'] = ['Field', 'Type']
            resp['totalCount'] = len(resp['data'])
        return resp

    @grpc_channel
    def catalog_query(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.FindCatalog`` and return its result."""
        return find_req("Level2Servicer.FindCatalog", kwargs)

    @grpc_channel
    def coord_cond_sql(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.CoordCond`` and return its result."""
        return get_req("Level2Servicer.CoordCond", kwargs)

    @grpc_channel
    def find_existed_brick_ids(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.FindExistedBricks`` and return its result."""
        return find_req("Level2Servicer.FindExistedBricks", kwargs)

    @grpc_channel
    def get(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.Get`` and return its result."""
        return get_req("Level2Servicer.Get", kwargs)

    @grpc_channel
    def update_proc_status(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.UpdateProcStatus`` and return its result."""
        return update_req("Level2Servicer.UpdateProcStatus", kwargs)

    @grpc_channel
    def update_qc2_status(self, **kwargs):
        """Forward ``kwargs`` to ``Level2Servicer.UpdateQc2Status`` and return its result."""
        return update_req("Level2Servicer.UpdateQc2Status", kwargs)

    @grpc_channel
    def write(self, **kwargs):
        """
        Upload a local file through ``Level2Servicer.Write``.

        :param kwargs: Parameter dictionary, key items support:
            module_id: [str], required
            data_type: [str], required
            file_path: [str], required, must exist on the local file system
            filename: [str], optional, defaults to the base name of file_path
            qc2_status: optional, defaults to "0" when missing or falsy
            prc_status: optional, defaults to "-1024" when missing or falsy
            prc_time: optional, defaults to the current local time formatted
                as ``%Y-%m-%d %H:%M:%S`` when missing or falsy
            Any other key is forwarded unchanged.

        :returns: ``Result.error(...)`` when validation fails or any
            exception is raised; otherwise whatever ``write_stream_req``
            returns.
        """
        try:
            # Work on a copy so the caller's kwargs are never mutated.
            conditions = dict(kwargs)

            module_id = get_parameter(kwargs, "module_id", "")
            data_type = get_parameter(kwargs, "data_type", "")
            file_path = get_parameter(kwargs, "file_path", "")
            filename = get_parameter(kwargs, "filename", "")

            if not module_id:
                return Result.error(message="module_id is blank")
            if not data_type:
                return Result.error(message="data_type is blank")
            if not file_path:
                return Result.error(message="file_path is blank")
            if not os.path.exists(file_path):
                return Result.error(message="the file [%s] not existed" % (file_path, ))

            if not filename:
                filename = os.path.basename(file_path)
            conditions["filename"] = filename

            # Fill in defaults only for values that are missing or falsy.
            for key, fallback in (("qc2_status", "0"), ("prc_status", "-1024")):
                if not conditions.get(key, ''):
                    conditions[key] = fallback
            if not conditions.get("prc_time", ''):
                time_now = datetime.datetime.now()
                conditions["prc_time"] = time_now.strftime('%Y-%m-%d %H:%M:%S')

            # Read the whole file, then close it before the request is sent.
            with open(file_path, 'rb') as f:
                byte_stream = io.BytesIO(f.read())
            return write_stream_req("Level2Servicer.Write", byte_stream, conditions)
        except Exception as e:
            # API boundary: report any failure as an error Result rather
            # than raising to the caller.
            return Result.error(message="%s" % (e,))