Skip to content
utils.py 7.83 KiB
Newer Older
Wei Shoulin's avatar
C9  
Wei Shoulin committed
import io
Wei Shoulin's avatar
Wei Shoulin committed
import os
Shoulin Wei's avatar
Shoulin Wei committed
from datetime import datetime
import time
Wei Shoulin's avatar
Wei Shoulin committed
import grpc
Wei Shoulin's avatar
C9  
Wei Shoulin committed
import pickle
Wei Shoulin's avatar
Wei Shoulin committed

Wei Shoulin's avatar
C9  
Wei Shoulin committed
from csst_dfs_commons.models import Result, Record
from csst_dfs_proto.db import db_pb2, db_pb2_grpc
Wei Shoulin's avatar
Wei Shoulin committed
from .service import get_grpc_channel
Wei Shoulin's avatar
C9  
Wei Shoulin committed
from .constants import UPLOAD_CHUNK_SIZE
Shoulin Wei's avatar
Shoulin Wei committed

def format_datetime(dt):
    """Render ``dt`` as a ``YYYY-MM-DD HH:MM:SS`` string.

    :param dt: object exposing ``strftime`` (e.g. ``datetime.datetime``)
    :return: the formatted timestamp string
    """
    pattern = '%Y-%m-%d %H:%M:%S'
    return dt.strftime(pattern)

def format_date(dt):
    """Render ``dt`` as a ``YYYY-MM-DD`` string.

    :param dt: object exposing ``strftime`` (e.g. ``datetime.date``)
    :return: the formatted date string
    """
    pattern = '%Y-%m-%d'
    return dt.strftime(pattern)

def format_time_ms(float_time):
    """Format an epoch timestamp as local time with millisecond precision.

    :param float_time: seconds since the epoch (int or float)
    :return: string of the form ``YYYY-MM-DD HH:MM:SS.mmm`` in local time
    """
    local_time = time.localtime(float_time)
    data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
    # Split off the fraction with floor division instead of int(): int()
    # truncates toward zero, which produced a negative millisecond field
    # (e.g. ".-500") for timestamps before the epoch.
    whole_secs = float_time // 1
    data_millis = int((float_time - whole_secs) * 1000)
    # A tiny negative input can make the fraction round up to 1.0; keep the
    # field within three digits.
    data_millis = min(data_millis, 999)
    return "%s.%03d" % (data_head, data_millis)

def get_parameter(kwargs, key, default=None):
    """Look up ``key`` in a keyword-argument dictionary.

    :param kwargs: Parameter dictionary, or ``None``
    :param key: Name to look up, e.g. 'max_workers'
    :param default: Value returned when ``kwargs`` is ``None`` or lacks ``key``
    :return: ``kwargs[key]`` when present, otherwise ``default``
    """
    if kwargs is not None and key in kwargs:
        return kwargs[key]
    return default
    
def to_int(s, default_value = 0):
    """Convert ``s`` to ``int``, falling back to ``default_value``.

    :param s: value to convert (string, number, ...)
    :param default_value: returned when ``s`` cannot be converted
    :return: ``int(s)`` or ``default_value``
    """
    try:
        return int(s)
    # Only conversion failures are swallowed; a bare ``except`` would also
    # hide SystemExit / KeyboardInterrupt.
    except (TypeError, ValueError, OverflowError):
        return default_value

def singleton(cls):
    """Class decorator that creates at most one instance of ``cls``.

    The decorated name becomes a zero-argument factory: the first call
    constructs ``cls()`` and every later call returns that same object.

    :param cls: class to instantiate lazily; it is constructed without
        arguments
    :return: factory function returning the shared instance
    """
    _instance = {}

    def inner():
        # Build the instance on first use only.
        if cls not in _instance:
            _instance[cls] = cls()
        return _instance[cls]

    return inner

def get_auth_headers():
    """Build the metadata pairs carrying the application credentials.

    Values are read from the ``CSST_DFS_APP_ID`` and ``CSST_DFS_APP_TOKEN``
    environment variables at call time; an unset variable yields ``None``.

    :return: tuple of ``(key, value)`` pairs
    """
    return (
        ("csst_dfs_app", os.getenv("CSST_DFS_APP_ID")),
        ("csst_dfs_token", os.getenv("CSST_DFS_APP_TOKEN")),
    )

Wei Shoulin's avatar
C9  
Wei Shoulin committed
def get_next_id(prefix):
    """Request the next sequence id for ``prefix`` from the DB service.

    Issues a ``Get`` call whose conditions set ``__function`` to
    ``MiscServicer.GetSeqId``.

    :param prefix: sequence prefix forwarded in the request conditions
    :return: ``Result.ok_data`` holding the first element of the unpickled
        record, or ``Result.error`` when the RPC fails
    """
    with get_grpc_channel() as c:
        stub = db_pb2_grpc.DBSrvStub(c)
        try:
            resp, _ = stub.Get.with_call(
                db_pb2.GetReq(
                    conditions={
                        "__function": "MiscServicer.GetSeqId",
                        "prefix": prefix,
                    }
                ),
                metadata=get_auth_headers(),
            )
            # NOTE(review): pickle.loads executes whatever the payload encodes;
            # this is only safe while the server is fully trusted.
            record = pickle.loads(resp.record)
            return Result.ok_data(data=record[0])
        except grpc.RpcError as e:
            return Result.error(message="%s:%s" % (e.code().value, e.details()))

def update_kwargs(function, kwargs):
    """Merge ``function`` and ``kwargs`` into a string-valued conditions dict.

    The ``__function`` entry is inserted first and ``kwargs`` is applied on
    top of it; every value is then converted with ``str``.

    :param function: value stored under the ``__function`` key
    :param kwargs: extra conditions to merge in
    :return: new dict mapping each key to ``str(value)``
    """
    merged = dict([("__function", function)])
    merged.update(kwargs)
    return {name: str(value) for name, value in merged.items()}
    
def find_req(function, kwargs):
    """Run a ``Find`` query on the DB service and collect the streamed reply.

    The pickled payload arrives in chunks (``resp.records``) which are
    concatenated before being unpickled into ``(records, columns)``.

    :param function: server-side function name, sent as ``__function``
    :param kwargs: query conditions; ``limit`` and ``page`` default to ``"-1"``
    :return: ``Result.ok_data`` carrying the records plus ``totalCount`` and
        ``columns`` entries, or ``Result.error`` when the server reports a
        failure or the RPC fails
    """
    conditions = update_kwargs(function, kwargs)
    # "-1" is sent when the caller gave no value — presumably meaning
    # "no paging"; confirm against the server.
    conditions.setdefault("limit", "-1")
    conditions.setdefault("page", "-1")

    req = db_pb2.FindReq(conditions=conditions)
    with get_grpc_channel() as c:
        try:
            datas = io.BytesIO()
            totalCount = 0
            resps = db_pb2_grpc.DBSrvStub(c).Find(req,
                                    metadata=get_auth_headers())
            for resp in resps:
                if not resp.success:
                    return Result.error(message=str(resp.error.detail))
                datas.write(resp.records)
                totalCount = resp.totalCount

            dv = datas.getvalue()
            if not dv:
                return Result.ok_data(data=[]).append("totalCount", totalCount)\
                            .append("columns", [])

            # NOTE(review): pickle.loads executes whatever the payload encodes;
            # this is only safe while the server is fully trusted.
            payload = pickle.loads(dv)
            records, cols = payload[0], payload[1]

            # Make repeated column names unique: the first repeat becomes
            # "<col>_1", later repeats get the next free suffix instead of
            # colliding on "<col>_1" again.
            columns = []
            for col in cols:
                name, suffix = col, 0
                while name in columns:
                    suffix += 1
                    name = "%s_%d" % (col, suffix)
                columns.append(name)

            data = Record.from_list(records, columns)
            return Result.ok_data(data=data).append("totalCount", totalCount)\
                        .append("columns", columns)

        except grpc.RpcError as e:
            return Result.error(message="%s:%s" % (e.code().value, e.details()))

def get_req(function, kwargs):
    """Fetch a single record from the DB service via ``Get``.

    :param function: server-side function name, sent as ``__function``
    :param kwargs: lookup conditions
    :return: ``Result.ok_data`` wrapping a ``Record`` built from the reply,
        ``Result.error("not found")`` when the reply carries no record, or
        ``Result.error`` when the RPC fails
    """
    req = db_pb2.GetReq(conditions=update_kwargs(function, kwargs))
    with get_grpc_channel() as c:
        stub = db_pb2_grpc.DBSrvStub(c)
        try:
            resp = stub.Get(req,
                metadata=get_auth_headers()
            )
            if not resp.record:
                return Result.error(message="not found")

            # NOTE(review): pickle.loads executes whatever the payload encodes;
            # this is only safe while the server is fully trusted.
            record = pickle.loads(resp.record)
            if not record:
                return Result.error(message="not found")

            # Make repeated column names unique: the first repeat becomes
            # "<col>_1", later repeats get the next free suffix instead of
            # colliding on "<col>_1" again.
            columns = []
            for col in resp.columns:
                name, suffix = col, 0
                while name in columns:
                    suffix += 1
                    name = "%s_%d" % (col, suffix)
                columns.append(name)

            data = Record.from_tuple(record, columns)
            return Result.ok_data(data=data)
        except grpc.RpcError as e:
            return Result.error(message="%s:%s" % (e.code().value, e.details()))

def update_req(function, kwargs):
    """Send an ``Update`` request to the DB service.

    :param function: server-side function name, sent as ``__function``
    :param kwargs: update conditions
    :return: ``Result.ok_data()`` when the server reports success, otherwise
        ``Result.error`` with the server's error detail or the RPC status
    """
    request = db_pb2.UpdateReq(conditions = update_kwargs(function, kwargs))
    with get_grpc_channel() as channel:
        client = db_pb2_grpc.DBSrvStub(channel)
        try:
            reply = client.Update(request, metadata = get_auth_headers())
            if not reply.success:
                return Result.error(message = str(reply.error.detail))
            return Result.ok_data()
        except grpc.RpcError as err:
            status, detail = err.code().value, err.details()
            return Result.error(message="%s:%s" % (status, detail))

def write_req(function, kwargs):
    """Send a ``Write`` request to the DB service.

    :param function: server-side function name, sent as ``__function``
    :param kwargs: values to write
    :return: ``Result.ok_data`` wrapping a ``Record`` when the server returns
        one, a bare ``Result.ok_data()`` when it succeeds without a record,
        or ``Result.error`` on a server-reported or RPC failure
    """
    req = db_pb2.WriteReq(conditions=update_kwargs(function, kwargs))
    with get_grpc_channel() as c:
        stub = db_pb2_grpc.DBSrvStub(c)
        try:
            resp = stub.Write(req,
                metadata=get_auth_headers()
            )
            if not resp.success:
                return Result.error(message=str(resp.error.detail))

            # NOTE(review): pickle.loads executes whatever the payload encodes;
            # this is only safe while the server is fully trusted.
            record = pickle.loads(resp.record) if resp.record else None
            if not record:
                # Success without a usable record: previously this path either
                # returned None implicitly or hit an unbound ``data``.
                return Result.ok_data()

            # Make repeated column names unique: the first repeat becomes
            # "<col>_1", later repeats get the next free suffix instead of
            # colliding on "<col>_1" again.
            columns = []
            for col in resp.columns:
                name, suffix = col, 0
                while name in columns:
                    suffix += 1
                    name = "%s_%d" % (col, suffix)
                columns.append(name)

            data = Record.from_tuple(record, columns)
            return Result.ok_data(data=data)
        except grpc.RpcError as e:
            return Result.error(message="%s:%s" % (e.code().value, e.details()))

def write_stream_req(function, byte_stream, kwargs):
    """Upload ``byte_stream`` to the DB service via ``WriteStream``.

    The stream is read in ``UPLOAD_CHUNK_SIZE`` pieces; every request message
    carries the same conditions plus one chunk. If ``byte_stream`` yields no
    data at all, no request message is sent.

    :param function: server-side function name, sent as ``__function``
    :param byte_stream: object with a ``read(size)`` method returning bytes
    :param kwargs: values accompanying the upload
    :return: ``Result.ok_data`` wrapping a ``Record`` when the server returns
        one, a bare ``Result.ok_data()`` when it succeeds without a record,
        or ``Result.error`` on a server-reported or RPC failure
    """
    conditions = update_kwargs(function, kwargs)

    def stream():
        while True:
            data = byte_stream.read(UPLOAD_CHUNK_SIZE)
            if not data:
                break
            yield db_pb2.WriteStreamReq(conditions=conditions, data=data)

    with get_grpc_channel() as c:
        stub = db_pb2_grpc.DBSrvStub(c)
        try:
            resp = stub.WriteStream(stream(),
                metadata=get_auth_headers()
            )
            if not resp.success:
                return Result.error(message=str(resp.error.detail))

            # NOTE(review): pickle.loads executes whatever the payload encodes;
            # this is only safe while the server is fully trusted.
            record = pickle.loads(resp.record) if resp.record else None
            if not record:
                # Success without a usable record: previously this path either
                # returned None implicitly or hit an unbound ``data``.
                return Result.ok_data()

            # Make repeated column names unique: the first repeat becomes
            # "<col>_1", later repeats get the next free suffix instead of
            # colliding on "<col>_1" again.
            columns = []
            for col in resp.columns:
                name, suffix = col, 0
                while name in columns:
                    suffix += 1
                    name = "%s_%d" % (col, suffix)
                columns.append(name)

            data = Record.from_tuple(record, columns)
            return Result.ok_data(data=data)
        except grpc.RpcError as e:
            return Result.error(message="%s:%s" % (e.code().value, e.details()))