import io import os from datetime import datetime import time import grpc import pickle from csst_dfs_commons.models import Result, Record from csst_dfs_proto.db import db_pb2, db_pb2_grpc from .service import get_grpc_channel from .constants import UPLOAD_CHUNK_SIZE def format_datetime(dt): return dt.strftime('%Y-%m-%d %H:%M:%S') def format_date(dt): return dt.strftime('%Y-%m-%d') def format_time_ms(float_time): local_time = time.localtime(float_time) data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time) data_secs = (float_time - int(float_time)) * 1000 return "%s.%03d" % (data_head, data_secs) def get_parameter(kwargs, key, default=None): """ Get a specified named value for this (calling) function The parameter is searched for in kwargs :param kwargs: Parameter dictionary :param key: Key e.g. 'max_workers' :param default: Default value :return: result """ if kwargs is None: return default value = default if key in kwargs.keys(): value = kwargs[key] return value def to_int(s, default_value = 0): try: return int(s) except: return default_value def singleton(cls): _instance = {} def inner(): if cls not in _instance: _instance[cls] = cls() return _instance[cls] return inner def get_auth_headers(): return (("csst_dfs_app",os.getenv("CSST_DFS_APP_ID")),("csst_dfs_token",os.getenv("CSST_DFS_APP_TOKEN")),) def get_next_id(prefix): with get_grpc_channel() as c: stub = db_pb2_grpc.DBSrvStub(c) try: resp,_ = stub.Get.with_call( db_pb2.GetReq( conditions = { "__function":"MiscServicer.GetSeqId", "prefix": prefix } ), metadata = get_auth_headers() ) record = pickle.loads(resp.record) return Result.ok_data(data=record[0]) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) def update_kwargs(function, kwargs): conditions = { "__function": function, } conditions.update(kwargs) conditions = { k:str(v) for k,v in conditions.items() } return conditions def find_req(function, kwargs): conditions = update_kwargs(function, kwargs) if "limit" not in conditions: conditions["limit"] = "-1" if "page" not in conditions: conditions["page"] = "-1" req = db_pb2.FindReq(conditions = conditions) with get_grpc_channel() as c: try: datas = io.BytesIO() totalCount = 0 resps = db_pb2_grpc.DBSrvStub(c).Find(req, metadata = get_auth_headers()) for resp in resps: if resp.success: datas.write(resp.records) totalCount = resp.totalCount else: return Result.error(message = str(resp.error.detail)) datas.flush() dv = datas.getvalue() if not dv: return Result.ok_data(data = []).append("totalCount", totalCount)\ .append("columns", []) else: records = pickle.loads(datas.getvalue()) records, cols = records[0], records[1] columns = [] for col in cols: if col in columns: columns.append("%s_1" % (col, )) else: columns.append(col) data = Record.from_list(records, columns) return Result.ok_data(data = data).append("totalCount", totalCount)\ .append("columns", columns) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) def get_req(function, kwargs): req = db_pb2.GetReq(conditions = update_kwargs(function, kwargs)) with get_grpc_channel() as c: stub = db_pb2_grpc.DBSrvStub(c) try: resp = stub.Get(req, metadata = get_auth_headers() ) if resp.record: record = pickle.loads(resp.record) cols = resp.columns columns = [] for col in cols: if col in columns: columns.append("%s_1" % (col, )) else: columns.append(col) if record: data = Record.from_tuple(record, columns) return Result.ok_data(data=data) else: return Result.error(message=f"not found") else: return Result.error(message=f"not found") except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) def update_req(function, kwargs): req = db_pb2.UpdateReq(conditions = update_kwargs(function, kwargs)) with get_grpc_channel() as c: stub = db_pb2_grpc.DBSrvStub(c) try: resp = stub.Update(req, metadata = get_auth_headers() ) if resp.success: return Result.ok_data() else: return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) def write_req(function, kwargs): req = db_pb2.WriteReq(conditions = update_kwargs(function, kwargs)) with get_grpc_channel() as c: stub = db_pb2_grpc.DBSrvStub(c) try: resp = stub.Write(req, metadata = get_auth_headers() ) if resp.success: if resp.record: record = pickle.loads(resp.record) cols = resp.columns columns = [] for col in cols: if col in columns: columns.append("%s_1" % (col, )) else: columns.append(col) if record: data = Record.from_tuple(record, columns) return Result.ok_data(data=data) else: return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details())) def write_stream_req(function, byte_stream, kwargs): conditions = update_kwargs(function, kwargs) def stream(): while True: data = byte_stream.read(UPLOAD_CHUNK_SIZE) if not data: break yield db_pb2.WriteStreamReq(conditions = conditions, data = data) with get_grpc_channel() as c: stub = db_pb2_grpc.DBSrvStub(c) try: resp = stub.WriteStream(stream(), metadata = get_auth_headers() ) if resp.success: if resp.record: record = pickle.loads(resp.record) cols = resp.columns columns = [] for col in cols: if col in columns: columns.append("%s_1" % (col, )) else: columns.append(col) if record: data = Record.from_tuple(record, columns) return Result.ok_data(data=data) else: return Result.error(message = str(resp.error.detail)) except grpc.RpcError as e: return Result.error(message="%s:%s" % (e.code().value, e.details()))