Newer
Older
from csst_dfs_commons.models import Result, Record
from csst_dfs_proto.db import db_pb2, db_pb2_grpc
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def format_datetime(dt):
return dt.strftime('%Y-%m-%d %H:%M:%S')
def format_date(dt):
return dt.strftime('%Y-%m-%d')
def format_time_ms(float_time):
local_time = time.localtime(float_time)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (float_time - int(float_time)) * 1000
return "%s.%03d" % (data_head, data_secs)
def get_parameter(kwargs, key, default=None):
""" Get a specified named value for this (calling) function
The parameter is searched for in kwargs
:param kwargs: Parameter dictionary
:param key: Key e.g. 'max_workers'
:param default: Default value
:return: result
"""
if kwargs is None:
return default
value = default
if key in kwargs.keys():
value = kwargs[key]
return value
def to_int(s, default_value = 0):
try:
return int(s)
except:
return default_value
def singleton(cls):
_instance = {}
def inner():
if cls not in _instance:
_instance[cls] = cls()
return _instance[cls]
return (("csst_dfs_app",os.getenv("CSST_DFS_APP_ID")),("csst_dfs_token",os.getenv("CSST_DFS_APP_TOKEN")),)
resp,_ = stub.Get.with_call(
db_pb2.GetReq(
conditions = {
"__function":"MiscServicer.GetSeqId",
"prefix": prefix
}
),
record = pickle.loads(resp.record)
return Result.ok_data(data=record[0])
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
return Result.error(message="%s:%s" % (e.code().value, e.details()))
def update_kwargs(function, kwargs):
conditions = {
"__function": function,
}
conditions.update(kwargs)
conditions = { k:str(v) for k,v in conditions.items() }
return conditions
def find_req(function, kwargs):
conditions = update_kwargs(function, kwargs)
if "limit" not in conditions:
conditions["limit"] = "-1"
if "page" not in conditions:
conditions["page"] = "-1"
req = db_pb2.FindReq(conditions = conditions)
with get_grpc_channel() as c:
try:
datas = io.BytesIO()
totalCount = 0
resps = db_pb2_grpc.DBSrvStub(c).Find(req,
metadata = get_auth_headers())
for resp in resps:
if resp.success:
datas.write(resp.records)
totalCount = resp.totalCount
else:
return Result.error(message = str(resp.error.detail))
datas.flush()
dv = datas.getvalue()
if not dv:
return Result.ok_data(data = []).append("totalCount", totalCount)\
.append("columns", [])
else:
records = pickle.loads(datas.getvalue())
records, cols = records[0], records[1]
columns = []
for col in cols:
if col in columns:
columns.append("%s_1" % (col, ))
else:
columns.append(col)
data = Record.from_list(records, columns)
return Result.ok_data(data = data).append("totalCount", totalCount)\
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details()))
def get_req(function, kwargs):
req = db_pb2.GetReq(conditions = update_kwargs(function, kwargs))
with get_grpc_channel() as c:
stub = db_pb2_grpc.DBSrvStub(c)
try:
resp = stub.Get(req,
metadata = get_auth_headers()
)
if resp.record:
record = pickle.loads(resp.record)
cols = resp.columns
columns = []
for col in cols:
if col in columns:
columns.append("%s_1" % (col, ))
else:
columns.append(col)
data = Record.from_tuple(record, columns)
return Result.ok_data(data=data)
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
else:
return Result.error(message=f"not found")
else:
return Result.error(message=f"not found")
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details()))
def update_req(function, kwargs):
req = db_pb2.UpdateReq(conditions = update_kwargs(function, kwargs))
with get_grpc_channel() as c:
stub = db_pb2_grpc.DBSrvStub(c)
try:
resp = stub.Update(req,
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details()))
def write_req(function, kwargs):
req = db_pb2.WriteReq(conditions = update_kwargs(function, kwargs))
with get_grpc_channel() as c:
stub = db_pb2_grpc.DBSrvStub(c)
try:
resp = stub.Write(req,
metadata = get_auth_headers()
)
if resp.success:
if resp.record:
record = pickle.loads(resp.record)
cols = resp.columns
columns = []
for col in cols:
if col in columns:
columns.append("%s_1" % (col, ))
else:
columns.append(col)
if record:
data = Record.from_tuple(record, columns)
return Result.ok_data(data=data)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details()))
def write_stream_req(function, byte_stream, kwargs):
conditions = update_kwargs(function, kwargs)
def stream():
while True:
data = byte_stream.read(UPLOAD_CHUNK_SIZE)
if not data:
break
yield db_pb2.WriteStreamReq(conditions = conditions, data = data)
with get_grpc_channel() as c:
stub = db_pb2_grpc.DBSrvStub(c)
try:
resp = stub.WriteStream(stream(),
metadata = get_auth_headers()
)
if resp.success:
if resp.record:
record = pickle.loads(resp.record)
cols = resp.columns
columns = []
for col in cols:
if col in columns:
columns.append("%s_1" % (col, ))
else:
columns.append(col)
if record:
data = Record.from_tuple(record, columns)
return Result.ok_data(data=data)