Newer
Older
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.common.misc import misc_pb2, misc_pb2_grpc
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def format_datetime(dt):
return dt.strftime('%Y-%m-%d %H:%M:%S')
def format_date(dt):
return dt.strftime('%Y-%m-%d')
def format_time_ms(float_time):
local_time = time.localtime(float_time)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (float_time - int(float_time)) * 1000
return "%s.%03d" % (data_head, data_secs)
def get_parameter(kwargs, key, default=None):
""" Get a specified named value for this (calling) function
The parameter is searched for in kwargs
:param kwargs: Parameter dictionary
:param key: Key e.g. 'max_workers'
:param default: Default value
:return: result
"""
if kwargs is None:
return default
value = default
if key in kwargs.keys():
value = kwargs[key]
return value
def to_int(s, default_value = 0):
try:
return int(s)
except:
return default_value
def singleton(cls):
_instance = {}
def inner():
if cls not in _instance:
_instance[cls] = cls()
return _instance[cls]
return (("csst_dfs_app",os.getenv("CSST_DFS_APP_ID")),("csst_dfs_token",os.getenv("CSST_DFS_APP_TOKEN")),)
def get_nextId_by_prefix(prefix):
with get_grpc_channel() as c:
stub = misc_pb2_grpc.MiscSrvStub(c)
try:
resp,_ = stub.GetSeqId.with_call(
misc_pb2.GetSeqIdReq(prefix=prefix),
metadata = get_auth_headers()
)
return Result.ok_data(data=resp.nextId)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details()))