Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-dfs
csst-dfs-api-cluster
Commits
407b231e
Commit
407b231e
authored
1 year ago
by
Wei Shoulin
Browse files
Options
Download
Email Patches
Plain Diff
C9
parent
5202236f
Changes
16
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
csst_dfs_api_cluster/common/catalog.py
+4
-23
csst_dfs_api_cluster/common/catalog.py
csst_dfs_api_cluster/common/service.py
+3
-18
csst_dfs_api_cluster/common/service.py
csst_dfs_api_cluster/common/utils.py
+136
-8
csst_dfs_api_cluster/common/utils.py
csst_dfs_api_cluster/facility/__init__.py
+0
-1
csst_dfs_api_cluster/facility/__init__.py
csst_dfs_api_cluster/facility/brick.py
+6
-124
csst_dfs_api_cluster/facility/brick.py
csst_dfs_api_cluster/facility/detector.py
+8
-194
csst_dfs_api_cluster/facility/detector.py
csst_dfs_api_cluster/facility/level0.py
+9
-191
csst_dfs_api_cluster/facility/level0.py
csst_dfs_api_cluster/facility/level0prc.py
+3
-91
csst_dfs_api_cluster/facility/level0prc.py
csst_dfs_api_cluster/facility/level1.py
+40
-239
csst_dfs_api_cluster/facility/level1.py
csst_dfs_api_cluster/facility/level1prc.py
+3
-88
csst_dfs_api_cluster/facility/level1prc.py
csst_dfs_api_cluster/facility/level2.py
+39
-235
csst_dfs_api_cluster/facility/level2.py
csst_dfs_api_cluster/facility/level2producer.py
+0
-402
csst_dfs_api_cluster/facility/level2producer.py
csst_dfs_api_cluster/facility/level2type.py
+21
-133
csst_dfs_api_cluster/facility/level2type.py
csst_dfs_api_cluster/facility/observation.py
+5
-150
csst_dfs_api_cluster/facility/observation.py
csst_dfs_api_cluster/facility/otherdata.py
+18
-113
csst_dfs_api_cluster/facility/otherdata.py
csst_dfs_api_cluster/hstdm/level2.py
+0
-183
csst_dfs_api_cluster/hstdm/level2.py
with
295 additions
and
2193 deletions
+295
-2193
csst_dfs_api_cluster/common/catalog.py
+
4
-
23
View file @
407b231e
...
...
@@ -16,31 +16,12 @@ class CatalogApi(object):
self
.
stub
=
None
@
grpc_channel
def
gaia3_query
(
self
,
ra
:
float
,
dec
:
float
,
radius
:
float
,
columns
:
tuple
,
min_mag
:
float
,
max_mag
:
float
,
obstime
:
int
,
limit
:
int
):
''' retrieval GAIA DR 3
args:
ra: in deg
dec: in deg
radius: in deg
tuple of str, like ('ra','dec','phot_g_mean_mag')
min_mag: minimal magnitude
max_mag: maximal magnitude
obstime: seconds
limit: limits returns the number of records
return: csst_dfs_common.models.Result
'''
def
catalog_query
(
self
,
**
kwargs
):
try
:
datas
=
io
.
BytesIO
()
totalCount
=
0
resps
=
self
.
stub
.
Gaia3Search
(
ephem_pb2
.
EphemSearchRequest
(
ra
=
ra
,
dec
=
dec
,
radius
=
radius
,
columns
=
","
.
join
(
columns
),
minMag
=
min_mag
,
maxMag
=
max_mag
,
obstime
=
obstime
,
limit
=
limit
resps
=
self
.
stub
.
Search
(
ephem_pb2
.
SearchRequest
(
conditions
=
{
k
:
str
(
v
)
for
k
,
v
in
kwargs
.
items
()
}
),
metadata
=
get_auth_headers
())
for
resp
in
resps
:
if
resp
.
success
:
...
...
@@ -51,6 +32,6 @@ class CatalogApi(object):
datas
.
flush
()
records
=
pickle
.
loads
(
datas
.
getvalue
())
return
Result
.
ok_data
(
data
=
records
).
append
(
"totalCount"
,
totalCount
).
append
(
"columns"
,
columns
)
return
Result
.
ok_data
(
data
=
records
).
append
(
"totalCount"
,
totalCount
).
append
(
"columns"
,
kwargs
[
'
columns
'
]
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/common/service.py
+
3
-
18
View file @
407b231e
import
os
import
grpc
from
csst_dfs_proto.db
import
db_pb2
,
db_pb2_grpc
from
csst_dfs_commons.models.errors
import
CSSTFatalException
class
ServiceProxy
:
def
__init__
(
self
):
self
.
gateway
=
os
.
getenv
(
"CSST_DFS_GATEWAY"
,
'172.31.248.218:30880'
)
def
channel
(
self
):
options
=
((
'grpc.max_send_message_length'
,
1024
*
1024
*
1024
),
(
'grpc.max_receive_message_length'
,
1024
*
1024
*
1024
))
# channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip)
channel
=
grpc
.
insecure_channel
(
self
.
gateway
,
options
=
options
)
try
:
grpc
.
channel_ready_future
(
channel
).
result
(
timeout
=
10
)
except
grpc
.
FutureTimeoutError
:
raise
CSSTFatalException
(
'Error connecting to server {}'
.
format
(
self
.
gateway
))
else
:
return
channel
def
grpc_channel
(
func
):
def
wrapper
(
*
args
,
**
kwargs
):
with
get_grpc_channel
()
as
c
:
args
[
0
].
stub
=
args
[
0
].
stub_class
(
c
)
args
[
0
].
stub
=
db_pb2_grpc
.
DBSrvStub
(
c
)
return
func
(
*
args
,
**
kwargs
)
return
wrapper
...
...
@@ -36,4 +21,4 @@ def get_grpc_channel():
except
grpc
.
FutureTimeoutError
:
raise
CSSTFatalException
(
'Error connecting to server {}'
.
format
(
gateway
))
else
:
return
channel
\ No newline at end of file
return
channel
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/common/utils.py
+
136
-
8
View file @
407b231e
import
io
import
os
from
datetime
import
datetime
import
time
import
grpc
import
pickle
from
csst_dfs_commons.models
import
Result
from
csst_dfs_proto.
common.misc
import
misc
_pb2
,
misc
_pb2_grpc
from
csst_dfs_commons.models
import
Result
,
Record
from
csst_dfs_proto.
db
import
db
_pb2
,
db
_pb2_grpc
from
.service
import
get_grpc_channel
from
.constants
import
UPLOAD_CHUNK_SIZE
def
format_datetime
(
dt
):
return
dt
.
strftime
(
'%Y-%m-%d %H:%M:%S'
)
...
...
@@ -56,14 +59,139 @@ def singleton(cls):
def
get_auth_headers
():
return
((
"csst_dfs_app"
,
os
.
getenv
(
"CSST_DFS_APP_ID"
)),(
"csst_dfs_token"
,
os
.
getenv
(
"CSST_DFS_APP_TOKEN"
)),)
def
get_next
Id_by_prefix
(
prefix
):
def
get_next
_id
(
prefix
):
with
get_grpc_channel
()
as
c
:
stub
=
misc
_pb2_grpc
.
Misc
SrvStub
(
c
)
stub
=
db
_pb2_grpc
.
DB
SrvStub
(
c
)
try
:
resp
,
_
=
stub
.
GetSeqId
.
with_call
(
misc_pb2
.
GetSeqIdReq
(
prefix
=
prefix
),
resp
,
_
=
stub
.
Get
.
with_call
(
db_pb2
.
GetReq
(
conditions
=
{
"__function"
:
"MiscServicer.GetSeqId"
,
"prefix"
:
prefix
}
),
metadata
=
get_auth_headers
()
)
return
Result
.
ok_data
(
data
=
resp
.
nextId
)
record
=
pickle
.
loads
(
resp
.
record
)
return
Result
.
ok_data
(
data
=
record
[
0
])
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
\ No newline at end of file
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_kwargs
(
function
,
kwargs
):
conditions
=
{
"__function"
:
function
,
}
conditions
.
update
(
kwargs
)
conditions
=
{
k
:
str
(
v
)
for
k
,
v
in
conditions
.
items
()
}
return
conditions
def
find_req
(
function
,
kwargs
):
conditions
=
update_kwargs
(
function
,
kwargs
)
if
"limit"
not
in
conditions
:
conditions
[
"limit"
]
=
"-1"
if
"page"
not
in
conditions
:
conditions
[
"page"
]
=
"-1"
req
=
db_pb2
.
FindReq
(
conditions
=
conditions
)
with
get_grpc_channel
()
as
c
:
try
:
datas
=
io
.
BytesIO
()
totalCount
=
0
resps
=
db_pb2_grpc
.
DBSrvStub
(
c
).
Find
(
req
,
metadata
=
get_auth_headers
())
for
resp
in
resps
:
if
resp
.
success
:
datas
.
write
(
resp
.
records
)
totalCount
=
resp
.
totalCount
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
datas
.
flush
()
dv
=
datas
.
getvalue
()
if
not
dv
:
return
Result
.
ok_data
(
data
=
[]).
append
(
"totalCount"
,
totalCount
)
\
.
append
(
"columns"
,
[])
else
:
records
=
pickle
.
loads
(
datas
.
getvalue
())
records
,
cols
=
records
[
0
],
records
[
1
]
return
Result
.
ok_data
(
data
=
records
).
append
(
"totalCount"
,
totalCount
)
\
.
append
(
"columns"
,
cols
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get_req
(
function
,
kwargs
):
req
=
db_pb2
.
GetReq
(
conditions
=
update_kwargs
(
function
,
kwargs
))
with
get_grpc_channel
()
as
c
:
stub
=
db_pb2_grpc
.
DBSrvStub
(
c
)
try
:
resp
=
stub
.
Get
(
req
,
metadata
=
get_auth_headers
()
)
if
resp
.
record
:
record
=
pickle
.
loads
(
resp
.
record
)
if
record
:
data
=
Record
.
from_tuple
(
record
,
resp
.
columns
)
return
Result
.
ok_data
(
data
=
data
).
append
(
"columns"
,
resp
.
columns
)
else
:
return
Result
.
error
(
message
=
f
"not found"
)
else
:
return
Result
.
error
(
message
=
f
"not found"
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_req
(
function
,
kwargs
):
req
=
db_pb2
.
UpdateReq
(
conditions
=
update_kwargs
(
function
,
kwargs
))
with
get_grpc_channel
()
as
c
:
stub
=
db_pb2_grpc
.
DBSrvStub
(
c
)
try
:
resp
=
stub
.
Update
(
req
,
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write_req
(
function
,
kwargs
):
req
=
db_pb2
.
WriteReq
(
conditions
=
update_kwargs
(
function
,
kwargs
))
with
get_grpc_channel
()
as
c
:
stub
=
db_pb2_grpc
.
DBSrvStub
(
c
)
try
:
resp
=
stub
.
Write
(
req
,
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
if
resp
.
record
:
record
=
pickle
.
loads
(
resp
.
record
)
return
Result
.
ok_data
(
data
=
record
).
append
(
"columns"
,
resp
.
columns
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write_stream_req
(
function
,
byte_stream
,
kwargs
):
conditions
=
update_kwargs
(
function
,
kwargs
)
def
stream
():
while
True
:
data
=
byte_stream
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
db_pb2
.
WriteStreamReq
(
conditions
=
conditions
,
data
=
data
)
with
get_grpc_channel
()
as
c
:
stub
=
db_pb2_grpc
.
DBSrvStub
(
c
)
try
:
resp
=
stub
.
WriteStream
(
stream
(),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
if
resp
.
record
:
record
=
pickle
.
loads
(
resp
.
record
)
return
Result
.
ok_data
(
data
=
record
).
append
(
"columns"
,
resp
.
columns
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
\ No newline at end of file
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/__init__.py
+
0
-
1
View file @
407b231e
from
.brick
import
BrickApi
from
.detector
import
DetectorApi
from
.level2producer
import
Level2ProducerApi
from
.observation
import
ObservationApi
from
.level1
import
Level1DataApi
from
.level0
import
Level0DataApi
...
...
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/brick.py
+
6
-
124
View file @
407b231e
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Brick
,
BrickObsStatus
,
BrickLevel1
from
csst_dfs_proto.facility.brick
import
brick_pb2
,
brick_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
UPLOAD_CHUNK_SIZE
class
BrickApi
(
object
):
"""
Brick Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
brick_pb2_grpc
.
BrickSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' find brick records
:param kwargs:
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
brick_pb2
.
FindBrickReq
(
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Brick
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"BrickServicer.Find"
,
kwargs
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
:param kwargs:
id : [int]
:returns: csst_dfs_common.models.Result
'''
try
:
brick_id
=
get_parameter
(
kwargs
,
"id"
,
-
1
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
brick_pb2
.
GetBrickReq
(
id
=
brick_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
(
resp
.
record
.
id
==
0
and
resp
.
record
.
ra
==
0.0
and
resp
.
record
.
dec
==
0.0
):
return
Result
.
error
(
message
=
f
"
{
brick_id
}
not found"
)
return
Result
.
ok_data
(
data
=
Brick
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"BrickServicer.Get"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a brickal record into database
:param kwargs: Parameter dictionary, key items support:
ra = [float],
dec = [float],
boundingbox = [str]
:returns: csst_dfs_common.models.Result
'''
rec
=
brick_pb2
.
BrickRecord
(
id
=
get_parameter
(
kwargs
,
"id"
,
-
1
),
ra
=
get_parameter
(
kwargs
,
"ra"
,
0.0
),
dec
=
get_parameter
(
kwargs
,
"dec"
,
0.0
),
boundingbox
=
get_parameter
(
kwargs
,
"boundingbox"
,
""
)
)
req
=
brick_pb2
.
WriteBrickReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Brick
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
write_req
(
"BrickServicer.Write"
,
kwargs
)
@
grpc_channel
def
find_obs_status
(
self
,
**
kwargs
):
''' find observation status of bricks
:param kwargs:
brick_id = [int],
band = [string],
limit = [int]
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindObsStatus
.
with_call
(
brick_pb2
.
FindObsStatusReq
(
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
-
1
),
band
=
get_parameter
(
kwargs
,
"band"
,
""
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
BrickObsStatus
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"BrickServicer.FindObsStatus"
,
kwargs
)
@
grpc_channel
def
find_level1_data
(
self
,
**
kwargs
):
''' find level1 data
:param kwargs: Parameter dictionary, support:
brick_id = [int]
level1_id = [int]
module = [str],
limit = [int]
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindLevel1
.
with_call
(
brick_pb2
.
FindLevel1Req
(
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
-
1
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
,
0
),
module
=
get_parameter
(
kwargs
,
"limit"
,
""
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
BrickLevel1
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
find_level1_ids
(
self
,
**
kwargs
):
return
find_req
(
"BrickServicer.FindLevel1Data"
,
kwargs
)
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/detector.py
+
8
-
194
View file @
407b231e
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Detector
,
DetectorStatus
from
csst_dfs_proto.facility.detector
import
detector_pb2
,
detector_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
DetectorApi
(
object
):
def
__init__
(
self
):
self
.
stub_class
=
detector_pb2_grpc
.
DetectorSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve detector records from database
parameter kwargs:
module_id: [str]
key: [str]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
detector_pb2
.
FindDetectorReq
(
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
key
=
get_parameter
(
kwargs
,
"key"
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Detector
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"DetectorServicer.Find"
,
kwargs
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
no : [str]
return csst_dfs_common.models.Result
'''
try
:
no
=
get_parameter
(
kwargs
,
"no"
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
detector_pb2
.
GetDetectorReq
(
no
=
no
),
metadata
=
get_auth_headers
())
if
not
resp
.
record
.
no
:
return
Result
.
error
(
message
=
f
"no:
{
no
}
not found"
)
return
Result
.
ok_data
(
data
=
Detector
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"DetectorServicer.Get"
,
kwargs
)
@
grpc_channel
def
update
(
self
,
**
kwargs
):
''' update a detector by no
parameter kwargs:
no : [str],
detector_name : [str],
module_id : [str],
filter_id : [str]
return csst_dfs_common.models.Result
'''
try
:
no
=
get_parameter
(
kwargs
,
"no"
)
result_get
=
self
.
get
(
no
=
no
)
if
not
result_get
.
success
:
return
result_get
record
=
detector_pb2
.
Detector
(
no
=
no
,
detector_name
=
get_parameter
(
kwargs
,
"detector_name"
,
result_get
.
data
.
detector_name
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
result_get
.
data
.
module_id
),
filter_id
=
get_parameter
(
kwargs
,
"filter_id"
,
result_get
.
data
.
filter_id
)
)
resp
,
_
=
self
.
stub
.
Update
.
with_call
(
detector_pb2
.
UpdateDetectorReq
(
record
=
record
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"DetectorServicer.Update"
,
kwargs
)
@
grpc_channel
def
delete
(
self
,
**
kwargs
):
''' delete a detector by no
parameter kwargs:
no : [str]
return csst_dfs_common.models.Result
'''
no
=
get_parameter
(
kwargs
,
"no"
)
try
:
resp
,
_
=
self
.
stub
.
Delete
.
with_call
(
detector_pb2
.
DeleteDetectorReq
(
no
=
no
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"DetectorServicer.Delete"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a detector record into database
parameter kwargs:
no : [str],
detector_name : [str],
module_id : [str],
filter_id : [str]
return csst_dfs_common.models.Result
'''
rec
=
detector_pb2
.
Detector
(
no
=
get_parameter
(
kwargs
,
"no"
),
detector_name
=
get_parameter
(
kwargs
,
"detector_name"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
filter_id
=
get_parameter
(
kwargs
,
"filter_id"
)
)
req
=
detector_pb2
.
WriteDetectorReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Detector
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
write_req
(
"DetectorServicer.Write"
,
kwargs
)
@
grpc_channel
def
find_status
(
self
,
**
kwargs
):
''' retrieve a detector status's from database
parameter kwargs:
detector_no: [str]
status_occur_time: (begin,end)
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindStatus
.
with_call
(
detector_pb2
.
FindStatusReq
(
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
status_begin_time
=
get_parameter
(
kwargs
,
"status_occur_time"
,
[
None
,
None
])[
0
],
status_end_time
=
get_parameter
(
kwargs
,
"status_occur_time"
,
[
None
,
None
])[
1
],
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
DetectorStatus
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"DetectorServicer.FindStatus"
,
kwargs
)
@
grpc_channel
def
get_status
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
)
resp
,
_
=
self
.
stub
.
GetStatus
.
with_call
(
detector_pb2
.
GetStatusReq
(
id
=
id
),
metadata
=
get_auth_headers
())
if
resp
.
record
==
0
:
return
Result
.
error
(
message
=
f
"id:
{
id
}
not found"
)
return
Result
.
ok_data
(
data
=
DetectorStatus
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"DetectorServicer.GetStatus"
,
kwargs
)
@
grpc_channel
def
write_status
(
self
,
**
kwargs
):
''' insert a detector status into database
parameter kwargs:
detector_no : [str],
status : [str],
status_time : [str]
return csst_dfs_common.models.Result
'''
rec
=
detector_pb2
.
DetectorStatus
(
id
=
0
,
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
status
=
get_parameter
(
kwargs
,
"status"
),
status_time
=
get_parameter
(
kwargs
,
"status_time"
)
)
req
=
detector_pb2
.
WriteStatusReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
WriteStatus
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
DetectorStatus
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
\ No newline at end of file
return
write_req
(
"DetectorServicer.WriteStatus"
,
kwargs
)
\ No newline at end of file
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level0.py
+
9
-
191
View file @
407b231e
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Level0Record
from
csst_dfs_proto.facility.level0
import
level0_pb2
,
level0_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
csst_dfs_commons.models
import
Record
class
Level0DataApi
(
object
):
def
__init__
(
self
):
self
.
stub_class
=
level0_pb2_grpc
.
Level0SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level0 records from database
parameter kwargs:
obs_id: [str],
module_id: [str]
detector_no: [str],
obs_type: [str],
filter: [str],
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str],
ra_obj: [float],
dec_obj: [float],
radius: [float],
object_name: [str],
version: [str],
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level0_pb2
.
FindLevel0DataReq
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
exp_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
,
1024
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
1024
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
ra_obj
=
get_parameter
(
kwargs
,
"ra_obj"
,
None
),
dec_obj
=
get_parameter
(
kwargs
,
"dec_obj"
,
None
),
radius
=
get_parameter
(
kwargs
,
"radius"
,
0
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
,
None
),
version
=
get_parameter
(
kwargs
,
"version"
,
None
),
filter
=
get_parameter
(
kwargs
,
"filter"
,
None
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level0Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_by_brick_ids
(
self
,
**
kwargs
):
''' retrieve level0 records by brick_ids like [1,2,3,4]
:param kwargs: Parameter dictionary, key items support:
brick_ids: [list]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindByBrickIds
.
with_call
(
level0_pb2
.
FindByBrickIdsReq
(
brick_ids
=
get_parameter
(
kwargs
,
"brick_ids"
,
[])
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level0Record
,
resp
.
records
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
result
=
find_req
(
"Level0Servicer.Find"
,
kwargs
)
data
=
Record
.
from_list
(
result
[
"data"
],
result
[
"columns"
])
result
[
"data"
]
=
data
return
result
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int],
level0_id: [str],
obs_type: [str]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level0_pb2
.
GetLevel0DataReq
(
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
ok_data
(
data
=
Level0Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"Level0Servicer.Get"
,
kwargs
)
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
level0_id: [str],
obs_type: [str],
status : [int]
return csst_dfs_common.models.Result
'''
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level0_pb2
.
UpdateProcStatusReq
(
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
status
=
get_parameter
(
kwargs
,
"status"
)
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level0Servicer.UpdateProcStatus"
,
kwargs
)
@
grpc_channel
def
update_qc0_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
level0_id: [str],
obs_type: [str],
status : [int]
'''
try
:
resp
,
_
=
self
.
stub
.
UpdateQc0Status
.
with_call
(
level0_pb2
.
UpdateQc0StatusReq
(
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
status
=
get_parameter
(
kwargs
,
"status"
)
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level0Servicer.UpdateQc0Status"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level0 data record into database
parameter kwargs:
obs_id = [str]
detector_no = [str]
obs_type = [str]
obs_time = [str]
exp_time = [int]
detector_status_id = [int]
filename = [str]
file_path = [str]
return: csst_dfs_common.models.Result
'''
rec
=
level0_pb2
.
Level0Record
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
exp_time
=
get_parameter
(
kwargs
,
"exp_time"
),
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
,
0
),
detector_status_id
=
get_parameter
(
kwargs
,
"detector_status_id"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
)
)
req
=
level0_pb2
.
WriteLevel0DataReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level0Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
write_req
(
"Level0Servicer.Write"
,
kwargs
)
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level0prc.py
+
3
-
91
View file @
407b231e
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Level0PrcRecord
from
csst_dfs_proto.facility.level0prc
import
level0prc_pb2
,
level0prc_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level0PrcApi
(
object
):
def
__init__
(
self
):
self
.
stub_class
=
level0prc_pb2_grpc
.
Level0PrcSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level0 procedure records from database
parameter kwargs:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level0prc_pb2
.
FindLevel0PrcReq
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level0PrcRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"Level0PrcServicer.Find"
,
kwargs
)
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level0prc_pb2
.
UpdateProcStatusReq
(
id
=
id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level0PrcServicer.UpdateProcStatus"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level0 procedure record into database
parameter kwargs:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
return csst_dfs_common.models.Result
'''
rec
=
level0prc_pb2
.
Level0PrcRecord
(
id
=
0
,
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
params_file_path
=
get_parameter
(
kwargs
,
"params_file_path"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
),
result_file_path
=
get_parameter
(
kwargs
,
"result_file_path"
)
)
req
=
level0prc_pb2
.
WriteLevel0PrcReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level0PrcRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
write_req
(
"Level0PrcServicer.Write"
,
kwargs
)
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level1.py
+
40
-
239
View file @
407b231e
import
os
import
grpc
import
datetime
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Level1Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level1
import
level1_pb2
,
level1_pb2_grpc
import
datetime
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
*
class
Level1DataApi
(
object
):
"""
Level1 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
level1_pb2_grpc
.
Level1SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level1 records from database
parameter kwargs:
level0_id: [str]
module_id: [str]
data_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level1_pb2
.
FindLevel1Req
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc1_status
=
get_parameter
(
kwargs
,
"qc1_status"
,
1024
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
1024
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
,
""
),
filter
=
get_parameter
(
kwargs
,
"filter"
,
""
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
,
""
),
other_conditions
=
{
"ra_cen"
:
str
(
get_parameter
(
kwargs
,
"ra_cen"
,
''
)),
"dec_cen"
:
str
(
get_parameter
(
kwargs
,
"dec_cen"
,
''
)),
"radius_cen"
:
str
(
get_parameter
(
kwargs
,
"radius_cen"
,
''
))
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"Level1Servicer.Find"
,
kwargs
)
@
grpc_channel
def
find_by_brick_ids
(
self
,
**
kwargs
):
''' retrieve level1 records by brick_ids like [1,2,3,4]
:param kwargs: Parameter dictionary, key items support:
brick_ids: [list]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindByBrickIds
.
with_call
(
level1_pb2
.
FindByBrickIdsReq
(
brick_ids
=
get_parameter
(
kwargs
,
"brick_ids"
,
[])
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1Record
,
resp
.
records
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
if
not
isinstance
(
get_parameter
(
kwargs
,
"brick_ids"
),
list
):
return
Result
.
error
(
message
=
"brick_ids is not a list"
)
return
find_req
(
"Level1Servicer.FindByBrickIds"
,
kwargs
)
@
grpc_channel
def
find_by_ids
(
self
,
**
kwargs
):
''' retrieve level1 records by internal level1 ids like [1,2,3,4]
:param kwargs: Parameter dictionary, key items support:
ids: [list]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindByIds
.
with_call
(
level1_pb2
.
FindByIdsReq
(
ids
=
get_parameter
(
kwargs
,
"ids"
,
[])
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1Record
,
resp
.
records
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
if
not
isinstance
(
get_parameter
(
kwargs
,
"ids"
),
list
):
return
Result
.
error
(
message
=
"ids is not a list"
)
return
find_req
(
"Level1Servicer.FindByIds"
,
kwargs
)
@
grpc_channel
def
sls_find_by_qc1_status
(
self
,
**
kwargs
):
''' retrieve level1 records from database
parameter kwargs:
qc1_status: [str]
limit: limits returns the number of records,default 1
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindByQc1Status
.
with_call
(
level1_pb2
.
FindLevel1Req
(
level0_id
=
None
,
data_type
=
None
,
create_time_start
=
None
,
create_time_end
=
None
,
qc1_status
=
get_parameter
(
kwargs
,
"qc1_status"
,
-
1
),
prc_status
=
None
,
limit
=
get_parameter
(
kwargs
,
"limit"
,
1
),
other_conditions
=
{
"orderBy"
:
"create_time asc"
,
"module_id"
:
'SLS'
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
conditions
=
{
"limit"
:
1
}
conditions
.
update
(
kwargs
)
return
find_req
(
"Level1Servicer.FindByQc1Status"
,
conditions
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level1_pb2
.
GetLevel1Req
(
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level1Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"Level1Servicer.Get"
,
kwargs
)
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level1_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level1Servicer.UpdateProcStatus"
,
kwargs
)
@
grpc_channel
def
update_qc1_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc1Status
.
with_call
(
level1_pb2
.
UpdateQc1StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level1Servicer.UpdateQc1Status"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level1 record into database
parameter kwargs:
level0_id : [str]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
pmapname : [str]
build : [int]
return csst_dfs_common.models.Result
'''
rec
=
level1_pb2
.
Level1Record
(
id
=
0
,
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
,
""
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
""
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
),
cor_sci_id
=
get_parameter
(
kwargs
,
"cor_sci_id"
,
0
),
prc_params
=
get_parameter
(
kwargs
,
"prc_params"
,
""
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
qc1_status
=
get_parameter
(
kwargs
,
"qc1_status"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
),
build
=
get_parameter
(
kwargs
,
"build"
,
0
),
pmapname
=
get_parameter
(
kwargs
,
"pmapname"
,
""
),
refs
=
get_parameter
(
kwargs
,
"refs"
,
{})
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level1_pb2
.
WriteLevel1Req
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
file_path
:
conditions
=
{}
conditions
.
update
(
kwargs
)
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
)
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
)
if
not
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level1Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
if
not
os
.
path
.
exists
(
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
file_path
,
))
if
not
filename
:
filename
=
os
.
path
.
basename
(
file_path
)
conditions
[
"filename"
]
=
filename
if
not
conditions
.
get
(
"qc1_status"
,
''
):
conditions
[
"qc1_status"
]
=
"0"
if
not
conditions
.
get
(
"prc_status"
,
''
):
conditions
[
"prc_status"
]
=
"-1024"
if
not
conditions
.
get
(
"prc_time"
,
''
):
time_now
=
datetime
.
datetime
.
now
()
conditions
[
"prc_time"
]
=
time_now
.
strftime
(
'%Y-%m-%d %H:%M:%S'
)
if
not
conditions
.
get
(
"build_id"
,
''
):
conditions
[
"build_id"
]
=
"0"
with
open
(
file_path
,
'rb'
)
as
f
:
byte_stream
=
io
.
BytesIO
(
f
.
read
())
return
write_stream_req
(
"Level1Servicer.Write"
,
byte_stream
,
conditions
)
except
Exception
as
e
:
return
Result
.
error
(
message
=
"%s"
%
(
e
,))
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level1prc.py
+
3
-
88
View file @
407b231e
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Level1PrcRecord
from
csst_dfs_proto.facility.level1prc
import
level1prc_pb2
,
level1prc_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level1PrcApi
(
object
):
def
__init__
(
self
):
self
.
stub_class
=
level1prc_pb2_grpc
.
Level1PrcSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level1 procedure records from database
parameter kwargs:
level1_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level1prc_pb2
.
FindLevel1PrcReq
(
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1PrcRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"Level1PrcServicer.Find"
,
kwargs
)
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level1prc_pb2
.
UpdateProcStatusReq
(
id
=
id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level1PrcServicer.UpdateProcStatus"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level1 procedure record into database
parameter kwargs:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
return csst_dfs_common.models.Result
'''
rec
=
level1prc_pb2
.
Level1PrcRecord
(
id
=
0
,
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
params_file_path
=
get_parameter
(
kwargs
,
"params_file_path"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
),
result_file_path
=
get_parameter
(
kwargs
,
"result_file_path"
)
)
req
=
level1prc_pb2
.
WriteLevel1PrcReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level1PrcRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
write_req
(
"Level1PrcServicer.Write"
,
kwargs
)
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level2.py
+
39
-
235
View file @
407b231e
...
...
@@ -9,69 +9,22 @@ from collections.abc import Iterable
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.level2
import
Level2Record
,
filter_table_name
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level2
import
level2_pb2
,
level2_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
*
class
Level2DataApi
(
object
):
"""
Level2 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
level2_pb2_grpc
.
Level2SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
parameter kwargs:
level0_id: [str]
level1_id: [int]
module_id: [str]
brick_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
import_status : [int],
filename: [str],
build : [int],
pipeline_id: [str],
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2_pb2
.
FindLevel2Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
1024
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
1024
),
import_status
=
get_parameter
(
kwargs
,
"import_status"
,
1024
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
),
build
=
get_parameter
(
kwargs
,
"build"
,
-
1024
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
return
find_req
(
"Level2Servicer.Find"
,
kwargs
)
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
catalog_columns
(
self
,
**
kwargs
):
''' retrieve columns data type
...
...
@@ -92,211 +45,62 @@ class Level2DataApi(object):
@
grpc_channel
def
catalog_query
(
self
,
**
kwargs
):
''' retrieve level2catalog records from database
parameter kwargs:
sql: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
datas
=
io
.
BytesIO
()
totalCount
=
0
resps
=
self
.
stub
.
FindCatalog
(
level2_pb2
.
FindLevel2CatalogReq
(
sql
=
get_parameter
(
kwargs
,
"sql"
,
None
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
for
resp
in
resps
:
if
resp
.
success
:
datas
.
write
(
resp
.
records
)
totalCount
=
resp
.
totalCount
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
datas
.
flush
()
records
=
pickle
.
loads
(
datas
.
getvalue
())
return
Result
.
ok_data
(
data
=
records
[
0
]).
append
(
"totalCount"
,
totalCount
).
append
(
"columns"
,
records
[
1
])
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"Level2Servicer.FindCatalog"
,
kwargs
)
@
grpc_channel
def
coord_cond_sql
(
self
,
**
kwargs
):
''' generate coordinate search condition sql
:param kwargs: Parameter dictionary, key items support:
data_type: [str]
ra: [float]
dec: [float]
radius: [float]
:returns: csst_dfs_common.models.Result
'''
try
:
resp
=
self
.
stub
.
CoordCond
(
level2_pb2
.
CoordCondReq
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
ra
=
get_parameter
(
kwargs
,
"ra"
),
dec
=
get_parameter
(
kwargs
,
"dec"
),
radius
=
get_parameter
(
kwargs
,
"radius"
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
resp
.
condition
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"Level2Servicer.CoordCond"
,
kwargs
)
@
grpc_channel
def
find_existed_brick_ids
(
self
,
**
kwargs
):
''' retrieve existed brick_ids in a single exposure catalog
parameter kwargs:
data_type: [str]
return: csst_dfs_common.models.Result
'''
try
:
resp
=
self
.
stub
.
FindExistedBricks
(
level2_pb2
.
FindExistedBricksReq
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
resp
.
brick_ids
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"Level2Servicer.FindExistedBricks"
,
kwargs
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2_pb2
.
GetLevel2Req
(
id
=
get_parameter
(
kwargs
,
"id"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level2Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"Level2Servicer.Get"
,
kwargs
)
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level2_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level2Servicer.UpdateProcStatus"
,
kwargs
)
@
grpc_channel
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc2Status
.
with_call
(
level2_pb2
.
UpdateQc2StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level2Servicer.UpdateQc2Status"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
parameter kwargs:
level1_id : [int]
brick_id : [int]
module_id : [str]
object_name: [str]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
build : [int]
return csst_dfs_common.models.Result
'''
rec
=
level2_pb2
.
Level2Record
(
id
=
0
,
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
,
""
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
,
0
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
0
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
""
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
,
""
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
build
=
get_parameter
(
kwargs
,
"build"
,
0
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
)
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2_pb2
.
WriteLevel2Req
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
module_id
:
conditions
=
{}
conditions
.
update
(
kwargs
)
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
""
)
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
)
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
)
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
)
if
not
module_id
:
return
Result
.
error
(
message
=
"module_id is blank"
)
if
not
rec
.
data_type
:
if
not
data_type
:
return
Result
.
error
(
message
=
"data_type is blank"
)
if
not
rec
.
file_path
:
if
not
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
if
not
os
.
path
.
exists
(
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
file_path
,
))
if
not
filename
:
filename
=
os
.
path
.
basename
(
file_path
)
conditions
[
"filename"
]
=
filename
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
if
not
conditions
.
get
(
"qc2_status"
,
''
):
conditions
[
"qc2_status"
]
=
"0"
if
not
conditions
.
get
(
"prc_status"
,
''
):
conditions
[
"prc_status"
]
=
"-1024"
if
not
conditions
.
get
(
"prc_time"
,
''
):
time_now
=
datetime
.
datetime
.
now
()
conditions
[
"prc_time"
]
=
time_now
.
strftime
(
'%Y-%m-%d %H:%M:%S'
)
with
open
(
file_path
,
'rb'
)
as
f
:
byte_stream
=
io
.
BytesIO
(
f
.
read
())
return
write_stream_req
(
"Level2Servicer.Write"
,
byte_stream
,
conditions
)
except
Exception
as
e
:
return
Result
.
error
(
message
=
"%s"
%
(
e
,))
\ No newline at end of file
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level2producer.py
deleted
100644 → 0
+
0
-
402
View file @
5202236f
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Level2Producer
,
Level2Job
,
Level2ProducerRuning
from
csst_dfs_proto.facility.level2producer
import
level2producer_pb2
,
level2producer_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
UPLOAD_CHUNK_SIZE
class
Level2ProducerApi
(
object
):
"""
Level2Producer Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
level2producer_pb2_grpc
.
Level2ProducerSrvStub
self
.
stub
=
None
@
grpc_channel
def
register
(
self
,
**
kwargs
):
''' register a Level2Producer data record into database
:param kwargs: Parameter dictionary, key items support:
name = [str]
\n
gitlink = [str]
\n
paramfiles = [str]
\n
priority = [int]
\n
pre_producers = list[int]
:returns: csst_dfs_common.models.Result
'''
rec
=
level2producer_pb2
.
Level2ProducerRecord
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
),
name
=
get_parameter
(
kwargs
,
"name"
,
""
),
gitlink
=
get_parameter
(
kwargs
,
"gitlink"
),
paramfiles
=
get_parameter
(
kwargs
,
"paramfiles"
),
priority
=
get_parameter
(
kwargs
,
"priority"
,
0
),
pre_producers
=
get_parameter
(
kwargs
,
"pre_producers"
,[]),
)
req
=
level2producer_pb2
.
RegisterReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Register
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2Producer
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve Level2Producer records from database
:param kwargs: Parameter dictionary, key items support:
key: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2producer_pb2
.
FindReq
(
key
=
get_parameter
(
kwargs
,
"key"
,
""
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Producer
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
p_id
=
get_parameter
(
kwargs
,
"id"
,
0
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2producer_pb2
.
GetReq
(
id
=
p_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"
{
p_id
}
not found"
)
return
Result
.
ok_data
(
data
=
Level2Producer
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_nexts
(
self
,
**
kwargs
):
''' retrieve Level2Producer records from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindNexts
.
with_call
(
level2producer_pb2
.
FindNextsReq
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Producer
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_start
(
self
,
**
kwargs
):
''' retrieve Level2Producer records from database
:param kwargs: Parameter dictionary, key items support:
key : [str]
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindStart
.
with_call
(
level2producer_pb2
.
FindStartReq
(
key
=
get_parameter
(
kwargs
,
"key"
,
""
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Producer
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update
(
self
,
**
kwargs
):
''' update a Level2Producer
:param kwargs: Parameter dictionary, key items support:
id : [int]
\n
name = [str]
\n
gitlink = [str]
\n
paramfiles = [str]
\n
priority = [int]
\n
pre_producers = list[int]
:returns: csst_dfs_common.models.Result
'''
try
:
rec
=
level2producer_pb2
.
Level2ProducerRecord
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
),
name
=
get_parameter
(
kwargs
,
"name"
,
""
),
gitlink
=
get_parameter
(
kwargs
,
"gitlink"
,
""
),
paramfiles
=
get_parameter
(
kwargs
,
"paramfiles"
,
""
),
priority
=
get_parameter
(
kwargs
,
"priority"
,
0
),
pre_producers
=
get_parameter
(
kwargs
,
"pre_producers"
,[])
)
resp
,
_
=
self
.
stub
.
Update
.
with_call
(
level2producer_pb2
.
UpdateReq
(
record
=
rec
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
delete
(
self
,
**
kwargs
):
''' delete a Level2Producer data
:param kwargs: Parameter dictionary, key items support:
id = [int]
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Delete
.
with_call
(
level2producer_pb2
.
DeleteReq
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
)),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
new_job
(
self
,
**
kwargs
):
''' new a Level2Producer Job
:param kwargs: Parameter dictionary, key items support:
dag = [str]
:returns: csst_dfs_common.models.Result
'''
rec
=
level2producer_pb2
.
Level2JobRecord
(
id
=
0
,
name
=
get_parameter
(
kwargs
,
"name"
,
""
),
dag
=
get_parameter
(
kwargs
,
"dag"
,
""
)
)
req
=
level2producer_pb2
.
NewJobReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
NewJob
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2Job
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get_job
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
p_id
=
get_parameter
(
kwargs
,
"id"
,
0
)
resp
,
_
=
self
.
stub
.
GetJob
.
with_call
(
level2producer_pb2
.
GetJobReq
(
id
=
p_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"
{
p_id
}
not found"
)
return
Result
.
ok_data
(
data
=
Level2Job
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_job
(
self
,
**
kwargs
):
''' update a Level2Producer Job
:param kwargs: Parameter dictionary, key items support:
id = [int]
dag = [str]
status = [int]
:returns: csst_dfs_common.models.Result
'''
rec
=
level2producer_pb2
.
Level2JobRecord
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
),
name
=
get_parameter
(
kwargs
,
"name"
,
""
),
dag
=
get_parameter
(
kwargs
,
"dag"
,
""
),
status
=
get_parameter
(
kwargs
,
"status"
,
-
1
)
)
req
=
level2producer_pb2
.
UpdateJobReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
UpdateJob
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
new_running
(
self
,
**
kwargs
):
''' insert a Level2ProducerRuningRecord data
:param kwargs: Parameter dictionary, key items support:
job_id = [int]
\n
producer_id = [int]
\n
brick_id = [int]
\n
start_time = [str]
\n
end_time = [str]
\n
prc_status = [int]
\n
prc_result = [str]
:returns: csst_dfs_common.models.Result
'''
rec
=
level2producer_pb2
.
Level2ProducerRuningRecord
(
id
=
0
,
job_id
=
get_parameter
(
kwargs
,
"job_id"
,
0
),
producer_id
=
get_parameter
(
kwargs
,
"producer_id"
,
0
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
0
),
start_time
=
get_parameter
(
kwargs
,
"start_time"
,
""
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_result
=
get_parameter
(
kwargs
,
"prc_result"
,
""
)
)
req
=
level2producer_pb2
.
WriteRunningReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
WriteRunning
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2ProducerRuning
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get_running
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
p_id
=
get_parameter
(
kwargs
,
"id"
,
0
)
resp
,
_
=
self
.
stub
.
GetRunning
.
with_call
(
level2producer_pb2
.
GetRunningReq
(
id
=
p_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"
{
p_id
}
not found"
)
return
Result
.
ok_data
(
data
=
Level2ProducerRuning
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_running
(
self
,
**
kwargs
):
''' udpate a Level2ProducerRuningRecord data
:param kwargs: Parameter dictionary, key items support:
id = [int]
\n
job_id = [int]
\n
producer_id = [int]
\n
brick_id = [int]
\n
start_time = [str]
\n
end_time = [str]
\n
prc_status = [int]
\n
prc_result = [str]
:returns: csst_dfs_common.models.Result
'''
rec
=
level2producer_pb2
.
Level2ProducerRuningRecord
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
),
job_id
=
get_parameter
(
kwargs
,
"job_id"
,
0
),
producer_id
=
get_parameter
(
kwargs
,
"producer_id"
,
0
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
0
),
start_time
=
get_parameter
(
kwargs
,
"start_time"
,
""
),
end_time
=
get_parameter
(
kwargs
,
"end_time"
,
""
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_result
=
get_parameter
(
kwargs
,
"prc_result"
,
""
)
)
req
=
level2producer_pb2
.
UpdateRunningReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
UpdateRunning
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_running
(
self
,
**
kwargs
):
''' find Level2ProducerRuningRecord data
:param kwargs: Parameter dictionary, key items support:
job_id = [int]
\n
producer_id = [int]
\n
brick_id = [int]
\n
prc_status = [int]
\n
create_time : (start, end)
\n
limit = [int]
:returns: csst_dfs_common.models.Result
'''
req
=
level2producer_pb2
.
FindRunningReq
(
job_id
=
get_parameter
(
kwargs
,
"job_id"
,
0
),
producer_id
=
get_parameter
(
kwargs
,
"producer_id"
,
0
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
start_time
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
end_time
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
)
try
:
resp
,
_
=
self
.
stub
.
FindRunning
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2ProducerRuning
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
\ No newline at end of file
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/level2type.py
+
21
-
133
View file @
407b231e
import
io
import
os
import
grpc
import
datetime
import
pickle
from
collections.abc
import
Iterable
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.level2
import
Level2TypeRecord
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level2type
import
level2type_pb2
,
level2type_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level2TypeApi
(
object
):
"""
Level2Type Data Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
level2type_pb2_grpc
.
Level2TypeSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2type records from database
parameter kwargs:
module_id: [str]
data_type: [str]
import_status : [int],
page: [int]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
the_limit
=
get_parameter
(
kwargs
,
"limit"
,
100000
)
the_limit
=
the_limit
if
the_limit
>
0
else
100000
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2type_pb2
.
FindLevel2TypeReq
(
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
import_status
=
get_parameter
(
kwargs
,
"import_status"
,
1024
),
limit
=
the_limit
,
page
=
get_parameter
(
kwargs
,
"page"
,
1
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2TypeRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"Level2TypeServicer.Find"
,
kwargs
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
data_type: [str]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2type_pb2
.
GetLevel2TypeReq
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
not
resp
.
record
or
not
resp
.
record
.
data_type
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level2TypeRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"Level2TypeServicer.Get"
,
kwargs
)
@
grpc_channel
def
update_import_status
(
self
,
**
kwargs
):
''' update the status of level2 type
parameter kwargs:
data_type: [str]
status : [int]
return csst_dfs_common.models.Result
'''
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
status
=
get_parameter
(
kwargs
,
"status"
,
0
)
try
:
resp
,
_
=
self
.
stub
.
UpdateImportStatus
.
with_call
(
level2type_pb2
.
UpdateImportStatusReq
(
data_type
=
data_type
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"Level2TypeServicer.UpdateImportStatus"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2type record into database
parameter kwargs:
data_type : [str]
module_id : [str]
key_column : [str]
hdu_index : [int]
demo_filename : [str]
demo_file_path : [str]
ra_column : [str]
dec_column : [str]
return csst_dfs_common.models.Result
'''
rec
=
level2type_pb2
.
Level2TypeRecord
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
""
),
key_column
=
get_parameter
(
kwargs
,
"key_column"
,
""
),
hdu_index
=
get_parameter
(
kwargs
,
"hdu_index"
,
0
),
demo_filename
=
get_parameter
(
kwargs
,
"demo_filename"
,
""
),
demo_file_path
=
get_parameter
(
kwargs
,
"demo_file_path"
,
""
),
ra_column
=
get_parameter
(
kwargs
,
"ra_column"
,
""
),
dec_column
=
get_parameter
(
kwargs
,
"dec_column"
,
""
)
)
def
stream
(
rec
):
with
open
(
rec
.
demo_file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2type_pb2
.
WriteLevel2TypeReq
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
data_type
:
return
Result
.
error
(
message
=
"data_type is blank"
)
if
not
rec
.
demo_file_path
:
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
)
if
not
data_type
:
return
Result
.
error
(
message
=
"the data_type is blank"
)
file_path
=
get_parameter
(
kwargs
,
"demo_file_path"
,
""
)
filename
=
get_parameter
(
kwargs
,
"demo_filename"
,
""
)
if
not
file_path
:
return
Result
.
error
(
message
=
"demo_file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
demo_
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
demo_
file_path
,
))
if
not
rec
.
demo_
filename
:
rec
.
demo_
filename
=
os
.
path
.
basename
(
rec
.
demo_
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2TypeRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
)
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s
:%s
"
%
(
e
.
code
().
value
,
e
.
details
()
))
if
not
os
.
path
.
exists
(
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
file_path
,
))
if
not
filename
:
filename
=
os
.
path
.
basename
(
file_path
)
conditions
=
{}
conditions
.
update
(
kwargs
)
conditions
[
"demo_filename"
]
=
filename
with
open
(
file_path
,
'rb'
)
as
f
:
byte_stream
=
io
.
BytesIO
(
f
.
read
())
return
write_stream_req
(
"Level2TypeServicer.Write"
,
byte_stream
,
kwargs
)
except
Exception
as
e
:
return
Result
.
error
(
message
=
"%s"
%
(
e
,
))
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/observation.py
+
5
-
150
View file @
407b231e
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Observation
from
csst_dfs_proto.facility.observation
import
observation_pb2
,
observation_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
UPLOAD_CHUNK_SIZE
class
ObservationApi
(
object
):
"""
Observation Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
observation_pb2_grpc
.
ObservationSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve exposure records from database
parameter kwargs:
module_id: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
observation_pb2
.
FindObservationReq
(
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
exp_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Observation
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"ObservationServicer.Find"
,
kwargs
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int],
obs_id = [str]
return csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
)
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
observation_pb2
.
GetObservationReq
(
id
=
id
,
obs_id
=
obs_id
),
metadata
=
get_auth_headers
())
if
resp
.
observation
is
None
or
resp
.
observation
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
ok_data
(
data
=
Observation
().
from_proto_model
(
resp
.
observation
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"ObservationServicer.Get"
,
kwargs
)
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
obs_id = [str],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
)
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
observation_pb2
.
UpdateProcStatusReq
(
id
=
id
,
obs_id
=
obs_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"ObservationServicer.UpdateProcStatus"
,
kwargs
)
@
grpc_channel
def
update_qc0_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
obs_id = [str],
status : [int]
'''
id
=
get_parameter
(
kwargs
,
"id"
)
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc0Status
.
with_call
(
observation_pb2
.
UpdateQc0StatusReq
(
id
=
id
,
obs_id
=
obs_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
update_req
(
"ObservationServicer.UpdateQc0Status"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a observational record into database
parameter kwargs:
id = [id]
obs_id = [str]
obs_time = [str]
exp_time = [int]
module_id = [str]
obs_type = [str]
facility_status_id = [int]
module_status_id = [int]
return: csst_dfs_common.models.Result
'''
rec
=
observation_pb2
.
Observation
(
id
=
get_parameter
(
kwargs
,
"id"
,
0
),
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
,
""
),
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
exp_time
=
get_parameter
(
kwargs
,
"exp_time"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
facility_status_id
=
get_parameter
(
kwargs
,
"facility_status_id"
),
module_status_id
=
get_parameter
(
kwargs
,
"module_status_id"
)
)
req
=
observation_pb2
.
WriteObservationReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Observation
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
write_req
(
"ObservationServicer.Write"
,
kwargs
)
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/facility/otherdata.py
+
18
-
113
View file @
407b231e
import
os
import
grpc
import
datetime
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
OtherDataRecord
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.otherdata
import
otherdata_pb2
,
otherdata_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
*
class
OtherDataApi
(
object
):
"""
OtherData Data Operation Class
"""
def
__init__
(
self
):
self
.
stub_class
=
otherdata_pb2_grpc
.
OtherDataSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve otherdata records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
module_id: [str]
file_type: [str]
filename: [str]
create_time : (start, end)
pipeline_id : [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
otherdata_pb2
.
FindOtherDataReq
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
,
""
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
file_type
=
get_parameter
(
kwargs
,
"file_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
OtherDataRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
find_req
(
"OtherDataServicer.Find"
,
kwargs
)
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
otherdata_pb2
.
GetOtherDataReq
(
id
=
get_parameter
(
kwargs
,
"id"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
OtherDataRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
get_req
(
"OtherDataServicer.Get"
,
kwargs
)
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a otherdata record into database
parameter kwargs:
obs_id: [str]
detector_no : [str]
module_id : [str]
file_type : [str]
filename : [str]
file_path : [str]
pipeline_id : [str]
return csst_dfs_common.models.Result
'''
rec
=
otherdata_pb2
.
OtherDataRecord
(
id
=
0
,
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
''
),
file_type
=
get_parameter
(
kwargs
,
"file_type"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
)
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
otherdata_pb2
.
WriteOtherDataReq
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
file_path
:
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
)
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
)
if
not
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
OtherDataRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
)
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s
:%s
"
%
(
e
.
code
().
value
,
e
.
details
()
))
if
not
os
.
path
.
exists
(
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
file_path
,
))
if
not
filename
:
filename
=
os
.
path
.
basename
(
file_path
)
conditions
=
{}
conditions
.
update
(
kwargs
)
conditions
[
"filename"
]
=
filename
with
open
(
file_path
,
'rb'
)
as
f
:
byte_stream
=
io
.
BytesIO
(
f
.
read
())
return
write_stream_req
(
"OtherDataServicer.Write"
,
byte_stream
,
kwargs
)
except
Exception
as
e
:
return
Result
.
error
(
message
=
"%s"
%
(
e
,
))
This diff is collapsed.
Click to expand it.
csst_dfs_api_cluster/hstdm/level2.py
deleted
100644 → 0
+
0
-
183
View file @
5202236f
import
os
import
grpc
import
datetime
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.hstdm
import
Level2Data
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.hstdm.level2
import
level2_pb2
,
level2_pb2_grpc
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level2DataApi
(
object
):
"""
Level2 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2_pb2_grpc
.
Level2SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
:param kwargs: Parameter dictionary, key items support:
level0_id: [str]
level1_id: [int]
project_id: [int]
file_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2_pb2
.
FindLevel2Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
,
None
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
,
0
),
project_id
=
get_parameter
(
kwargs
,
"project_id"
,
0
),
file_type
=
get_parameter
(
kwargs
,
"file_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
1024
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
1024
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Data
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
fits_id
=
get_parameter
(
kwargs
,
"id"
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2_pb2
.
GetLevel2Req
(
id
=
fits_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"id:
{
fits_id
}
not found"
)
return
Result
.
ok_data
(
data
=
Level2Data
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level2_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC2
parameter kwargs:
id : [int],
status : [int]
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc2Status
.
with_call
(
level2_pb2
.
UpdateQc2StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
parameter kwargs:
level0_id: [str]
level1_id: [int]
project_id: [int]
file_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result
'''
rec
=
level2_pb2
.
Level2Record
(
id
=
0
,
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
,
None
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
,
0
),
project_id
=
get_parameter
(
kwargs
,
"project_id"
,
0
),
file_type
=
get_parameter
(
kwargs
,
"file_type"
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
)
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2_pb2
.
WriteLevel2Req
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2Data
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment