Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-dfs
csst-dfs-api-cluster
Commits
68697532
Commit
68697532
authored
Dec 03, 2022
by
Wei Shoulin
Browse files
un level0
parent
97fce90b
Changes
17
Hide whitespace changes
Inline
Side-by-side
csst_dfs_api_cluster/common/service.py
View file @
68697532
...
...
@@ -8,7 +8,8 @@ class ServiceProxy:
def
channel
(
self
):
options
=
((
'grpc.max_send_message_length'
,
1000
*
1024
*
1024
),
(
'grpc.max_receive_message_length'
,
1000
*
1024
*
1024
))
channel
=
grpc
.
insecure_channel
(
self
.
gateway
,
options
=
options
,
compression
=
grpc
.
Compression
.
Gzip
)
# channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip)
channel
=
grpc
.
insecure_channel
(
self
.
gateway
,
options
=
options
)
try
:
grpc
.
channel_ready_future
(
channel
).
result
(
timeout
=
10
)
except
grpc
.
FutureTimeoutError
:
...
...
csst_dfs_api_cluster/cpic/__init__.py
View file @
68697532
from
.level1
import
Level1DataApi
from
.level1prc
import
Level1PrcApi
\ No newline at end of file
csst_dfs_api_cluster/
ifs
/level
0
.py
→
csst_dfs_api_cluster/
cpic
/level
1
.py
View file @
68697532
import
os
import
grpc
import
datetime
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.
ifs
import
Level
0
Record
from
csst_dfs_commons.models.
cpic
import
Level
1
Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.ifs.level0
import
level0_pb2
,
level0_pb2_grpc
from
csst_dfs_proto.cpic.level1
import
level1_pb2
,
level1_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level0DataApi
(
object
):
class
Level1DataApi
(
object
):
"""
Level1 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level
0
_pb2_grpc
.
Level
0
SrvStub
(
ServiceProxy
().
channel
())
self
.
stub
=
level
1
_pb2_grpc
.
Level
1
SrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level
0
records from database
''' retrieve level
1
records from database
parameter kwargs:
obs_id: [str],
detector_no: [str],
obs_type: [str],
object_name: [str],
obs_time : (start, end),
qc0_status : [int],
level0_id: [str]
data_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
file_name: [str],
version: [str],
ra: [float],
dec: [float],
radius: [float],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level0_pb2
.
FindLevel0DataReq
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
exp_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
),
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level1_pb2
.
FindLevel1Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc1_status
=
get_parameter
(
kwargs
,
"qc1_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
),
version
=
get_parameter
(
kwargs
,
"version"
),
ra
=
get_parameter
(
kwargs
,
"ra"
),
dec
=
get_parameter
(
kwargs
,
"dec"
),
radius
=
get_parameter
(
kwargs
,
"radius"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level
0
Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level
1
Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
...
...
@@ -65,23 +57,21 @@ class Level0DataApi(object):
''' fetch a record from database
parameter kwargs:
id : [int],
level0_id: [str]
id : [int]
return csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level0_pb2
.
GetLevel0DataReq
(
id
=
id
,
level0_id
=
level0_id
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level1_pb2
.
GetLevel1Req
(
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
error
(
message
=
f
"
data
not found"
)
return
Result
.
ok_data
(
data
=
Level
0
Record
().
from_proto_model
(
resp
.
record
))
return
Result
.
ok_data
(
data
=
Level
1
Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
...
...
@@ -91,20 +81,15 @@ class Level0DataApi(object):
parameter kwargs:
id : [int],
level0_id: [str],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level0_pb2
.
UpdateProcStatusReq
(
id
=
id
,
level0_id
=
level0_id
,
status
=
status
),
level1_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -114,23 +99,18 @@ class Level0DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_qc
0
_status
(
self
,
**
kwargs
):
def
update_qc
1
_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
level0_id: [str],
status : [int]
'''
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc0Status
.
with_call
(
level0_pb2
.
UpdateQc0StatusReq
(
id
=
id
,
level0_id
=
level0_id
,
status
=
status
),
resp
,
_
=
self
.
stub
.
UpdateQc1Status
.
with_call
(
level1_pb2
.
UpdateQc1StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -141,24 +121,43 @@ class Level0DataApi(object):
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level
0 data
record into database
''' insert a level
1
record into database
parameter kwargs:
file_path = [str]
return: csst_dfs_common.models.Result
'''
rec
=
level0_pb2
.
Level0Record
(
file_path
=
get_parameter
(
kwargs
,
"file_path"
)
)
level0_id : [str]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result
'''
rec
=
level1_pb2
.
Level1Record
(
id
=
0
,
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
cor_sci_id
=
get_parameter
(
kwargs
,
"cor_sci_id"
),
prc_params
=
get_parameter
(
kwargs
,
"prc_params"
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
refs
=
get_parameter
(
kwargs
,
"refs"
,
{})
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level0_pb2
.
WriteLevel0Req
(
record
=
rec
,
data
=
data
)
yield
level1_pb2
.
WriteLevel1Req
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
...
...
@@ -169,10 +168,8 @@ class Level0DataApi(object):
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level
0
Record
().
from_proto_model
(
resp
.
record
))
return
Result
.
ok_data
(
data
=
Level
1
Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/
ms
c/level
0
prc.py
→
csst_dfs_api_cluster/
cpi
c/level
1
prc.py
View file @
68697532
...
...
@@ -2,22 +2,22 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.
ms
c
import
Level
0
PrcRecord
from
csst_dfs_commons.models.
cpi
c
import
Level
1
PrcRecord
from
csst_dfs_proto.
ms
c.level
0
prc
import
level
0
prc_pb2
,
level
0
prc_pb2_grpc
from
csst_dfs_proto.
cpi
c.level
1
prc
import
level
1
prc_pb2
,
level
1
prc_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level
0
PrcApi
(
object
):
class
Level
1
PrcApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
level
0
prc_pb2_grpc
.
Level
0
PrcSrvStub
(
ServiceProxy
().
channel
())
self
.
stub
=
level
1
prc_pb2_grpc
.
Level
1
PrcSrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level
0
procedure records from database
''' retrieve level
1
procedure records from database
parameter kwargs:
level
0
_id: [str]
level
1
_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
...
...
@@ -25,8 +25,8 @@ class Level0PrcApi(object):
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level
0
prc_pb2
.
FindLevel
0
PrcReq
(
level
0
_id
=
get_parameter
(
kwargs
,
"level
0
_id"
),
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level
1
prc_pb2
.
FindLevel
1
PrcReq
(
level
1
_id
=
get_parameter
(
kwargs
,
"level
1
_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
...
...
@@ -34,7 +34,7 @@ class Level0PrcApi(object):
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level
0
PrcRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level
1
PrcRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
...
...
@@ -55,7 +55,7 @@ class Level0PrcApi(object):
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level
0
prc_pb2
.
UpdateProcStatusReq
(
id
=
id
,
status
=
status
),
level
1
prc_pb2
.
UpdateProcStatusReq
(
id
=
id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -66,10 +66,10 @@ class Level0PrcApi(object):
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level
0
procedure record into database
''' insert a level
1
procedure record into database
parameter kwargs:
level
0
_id : [
str
]
level
1
_id : [
int
]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
...
...
@@ -79,9 +79,9 @@ class Level0PrcApi(object):
return csst_dfs_common.models.Result
'''
rec
=
level
0
prc_pb2
.
Level
0
PrcRecord
(
rec
=
level
1
prc_pb2
.
Level
1
PrcRecord
(
id
=
0
,
level
0
_id
=
get_parameter
(
kwargs
,
"level
0
_id"
),
level
1
_id
=
get_parameter
(
kwargs
,
"level
1
_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
params_file_path
=
get_parameter
(
kwargs
,
"params_file_path"
),
...
...
@@ -89,11 +89,11 @@ class Level0PrcApi(object):
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
),
result_file_path
=
get_parameter
(
kwargs
,
"result_file_path"
)
)
req
=
level
0
prc_pb2
.
WriteLevel
0
PrcReq
(
record
=
rec
)
req
=
level
1
prc_pb2
.
WriteLevel
1
PrcReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level
0
PrcRecord
().
from_proto_model
(
resp
.
record
))
return
Result
.
ok_data
(
data
=
Level
1
PrcRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
...
...
csst_dfs_api_cluster/facility/__init__.py
View file @
68697532
from
.brick
import
BrickApi
from
.detector
import
DetectorApi
from
.level2producer
import
Level2ProducerApi
from
.observation
import
ObservationApi
\ No newline at end of file
from
.observation
import
ObservationApi
from
.calmerge
import
CalMergeApi
from
.level0
import
Level0DataApi
from
.level0prc
import
Level0PrcApi
\ No newline at end of file
csst_dfs_api_cluster/
sls
/calmerge.py
→
csst_dfs_api_cluster/
facility
/calmerge.py
View file @
68697532
...
...
@@ -2,9 +2,9 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.
sls
import
CalMergeRecord
from
csst_dfs_commons.models.
facility
import
CalMergeRecord
from
csst_dfs_proto.
sls
.calmerge
import
calmerge_pb2
,
calmerge_pb2_grpc
from
csst_dfs_proto.
facility
.calmerge
import
calmerge_pb2
,
calmerge_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
...
...
csst_dfs_api_cluster/
msc
/level0.py
→
csst_dfs_api_cluster/
facility
/level0.py
View file @
68697532
...
...
@@ -2,9 +2,9 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.
msc
import
Level0Record
from
csst_dfs_commons.models.
facility
import
Level0Record
from
csst_dfs_proto.
msc
.level0
import
level0_pb2
,
level0_pb2_grpc
from
csst_dfs_proto.
facility
.level0
import
level0_pb2
,
level0_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
...
...
@@ -38,6 +38,11 @@ class Level0DataApi(object):
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
ra_obj
=
get_parameter
(
kwargs
,
"ra_obj"
,
None
),
dec_obj
=
get_parameter
(
kwargs
,
"dec_obj"
,
None
),
radius
=
get_parameter
(
kwargs
,
"radius"
,
0
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
,
None
),
version
=
get_parameter
(
kwargs
,
"version"
,
None
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
...
...
@@ -55,16 +60,16 @@ class Level0DataApi(object):
parameter kwargs:
id : [int],
level0_id: [str]
level0_id: [str],
obs_type: [str]
return csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level0_pb2
.
GetLevel0DataReq
(
id
=
id
,
level0_id
=
level0_id
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
...
...
@@ -81,19 +86,20 @@ class Level0DataApi(object):
parameter kwargs:
id : [int],
level0_id: [str],
obs_type: [str],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level0_pb2
.
UpdateProcStatusReq
(
id
=
id
,
level0_id
=
level0_id
,
status
=
status
),
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
status
=
get_parameter
(
kwargs
,
"status"
)
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -109,17 +115,18 @@ class Level0DataApi(object):
parameter kwargs:
id : [int],
level0_id: [str],
obs_type: [str],
status : [int]
'''
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc0Status
.
with_call
(
level0_pb2
.
UpdateQc0StatusReq
(
id
=
id
,
level0_id
=
level0_id
,
status
=
status
),
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
status
=
get_parameter
(
kwargs
,
"status"
)
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -155,7 +162,7 @@ class Level0DataApi(object):
)
req
=
level0_pb2
.
WriteLevel0DataReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level0Record
().
from_proto_model
(
resp
.
record
))
else
:
...
...
csst_dfs_api_cluster/
sls
/level0prc.py
→
csst_dfs_api_cluster/
facility
/level0prc.py
View file @
68697532
...
...
@@ -2,9 +2,9 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.
sls
import
Level0PrcRecord
from
csst_dfs_commons.models.
facility
import
Level0PrcRecord
from
csst_dfs_proto.
sls
.level0prc
import
level0prc_pb2
,
level0prc_pb2_grpc
from
csst_dfs_proto.
facility
.level0prc
import
level0prc_pb2
,
level0prc_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
...
...
csst_dfs_api_cluster/ifs/__init__.py
View file @
68697532
from
.calmerge
import
CalMergeApi
from
.level0
import
Level0DataApi
from
.level0prc
import
Level0PrcApi
from
.level1
import
Level1DataApi
from
.level1prc
import
Level1PrcApi
csst_dfs_api_cluster/mci/__init__.py
View file @
68697532
from
.level1
import
Level1DataApi
from
.level1prc
import
Level1PrcApi
\ No newline at end of file
csst_dfs_api_cluster/
ifs/calmerge
.py
→
csst_dfs_api_cluster/
mci/level1
.py
View file @
68697532
import
os
import
grpc
import
datetime
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.
ifs
import
CalMerge
Record
from
csst_dfs_proto.
ifs.calmerge
import
calmerge_pb2
,
calmerge
_pb2_grpc
from
csst_dfs_commons.models.
mci
import
Level1
Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.
mci.level1
import
level1_pb2
,
level1
_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
CalMergeApi
(
object
):
class
Level1DataApi
(
object
):
"""
Level1 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
calmerge_pb2_grpc
.
CalMergeSrvStub
(
ServiceProxy
().
channel
())
def
get_latest_by_l0
(
self
,
**
kwargs
):
''' retrieve calibration merge records from database by level0 data
parameter kwargs:
level0_id: [str]
ref_type: [str]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
GetLatestByL0
.
with_call
(
calmerge_pb2
.
GetLatestByL0Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
ref_type
=
get_parameter
(
kwargs
,
"ref_type"
)),
metadata
=
get_auth_headers
())
if
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
ok_data
(
data
=
CalMergeRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
self
.
stub
=
level1_pb2_grpc
.
Level1SrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve
calibration merge
records from database
''' retrieve
level1
records from database
parameter kwargs:
detector_no
: [str]
ref
_type: [str]
obs
_time: (start,end)
qc1_status : [int]
prc_status : [int]
file
_
name: [str]
level0_id
: [str]
data
_type: [str]
create
_time
: (start,
end)
,
qc1_status : [int]
,
prc_status : [int]
,
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
calmerge
_pb2
.
Find
CalMerge
Req
(
detector_no
=
get_parameter
(
kwargs
,
"
detector_no
"
),
ref
_type
=
get_parameter
(
kwargs
,
"
ref
_type"
),
exp
_time_start
=
get_parameter
(
kwargs
,
"
obs
_time"
,
[
None
,
None
])[
0
],
exp
_time_end
=
get_parameter
(
kwargs
,
"
obs
_time"
,
[
None
,
None
])[
1
],
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level1
_pb2
.
Find
Level1
Req
(
level0_id
=
get_parameter
(
kwargs
,
"
level0_id
"
),
data
_type
=
get_parameter
(
kwargs
,
"
data
_type"
),
create
_time_start
=
get_parameter
(
kwargs
,
"
create
_time"
,
[
None
,
None
])[
0
],
create
_time_end
=
get_parameter
(
kwargs
,
"
create
_time"
,
[
None
,
None
])[
1
],
qc1_status
=
get_parameter
(
kwargs
,
"qc1_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
file
_
name
=
get_parameter
(
kwargs
,
"file
_
name"
),
limit
=
get_parameter
(
kwargs
,
"limit"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
CalMerge
Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1
Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
...
...
@@ -74,47 +56,40 @@ class CalMergeApi(object):
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
:
param kwargs
: Parameter dictionary, key items support
:
id : [int]
:
return
s:
csst_dfs_common.models.Result
param
eter
kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
,
""
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
calmerge_pb2
.
GetCalMergeReq
(
id
=
id
,
cal_id
=
cal_id
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level1_pb2
.
GetLevel1Req
(
id
=
get_parameter
(
kwargs
,
"id"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"
data
not found"
)
return
Result
.
ok_data
(
data
=
CalMerge
Record
().
from_proto_model
(
resp
.
record
))
return
Result
.
ok_data
(
data
=
Level1
Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_
qc1
_status
(
self
,
**
kwargs
):
def
update_
proc
_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id = cal_id,
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
,
""
)
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc1Status
.
with_call
(
calmerge_pb2
.
UpdateQc1StatusReq
(
id
=
id
,
cal_id
=
cal_id
,
status
=
status
),
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level1_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -124,26 +99,18 @@ class CalMergeApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_
proc
_status
(
self
,
**
kwargs
):
''' update the status of
reduction
def
update_
qc1
_status
(
self
,
**
kwargs
):
''' update the status of
QC0
parameter kwargs:
id : [int],
cal_id: [str],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
,
""
)
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
calmerge_pb2
.
UpdateProcStatusReq
(
id
=
id
,
cal_id
=
cal_id
,
status
=
status
),
resp
,
_
=
self
.
stub
.
UpdateQc1Status
.
with_call
(
level1_pb2
.
UpdateQc1StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -154,42 +121,56 @@ class CalMergeApi(object):
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a
calibration merge
record into database
''' insert a
level1
record into database
parameter kwargs:
cal_id : [str],
detector_no : [str]
ref_type : [str]
obs_time : [str]
exp_time : [float]
level0_id : [str]
data_type : [str]
cor_sci_id : [int]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
filename
: [str]
file_path : [str
]
level0_ids : [list]
pipeline_id
: [str]
refs: [dict
]
return csst_dfs_common.models.Result
'''
rec
=
calmerge_pb2
.
CalMerge
Record
(
rec
=
level1_pb2
.
Level1
Record
(
id
=
0
,
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
ref_type
=
get_parameter
(
kwargs
,
"ref_type"
),
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
exp_time
=
get_parameter
(
kwargs
,
"exp_time"
),
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
cor_sci_id
=
get_parameter
(
kwargs
,
"cor_sci_id"
),
prc_params
=
get_parameter
(
kwargs
,
"prc_params"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
),
level0_ids
=
get_parameter
(
kwargs
,
"level0_ids"
,
[])
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
refs
=
get_parameter
(
kwargs
,
"refs"
,
{})
)
req
=
calmerge_pb2
.
WriteCalMergeReq
(
record
=
rec
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level1_pb2
.
WriteLevel1Req
(
record
=
rec
,
data
=
data
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
not
rec
.
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
CalMerge
Record
().
from_proto_model
(
resp
.
record
))
return
Result
.
ok_data
(
data
=
Level1
Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/
ifs
/level
0
prc.py
→
csst_dfs_api_cluster/
mci
/level
1
prc.py
View file @
68697532
...
...
@@ -2,22 +2,22 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.ifs
import
Level
0
PrcRecord
from
csst_dfs_commons.models.ifs
import
Level
1
PrcRecord
from
csst_dfs_proto.ifs.level
0
prc
import
level
0
prc_pb2
,
level
0
prc_pb2_grpc
from
csst_dfs_proto.ifs.level
1
prc
import
level
1
prc_pb2
,
level
1
prc_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level
0
PrcApi
(
object
):
class
Level
1
PrcApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
level
0
prc_pb2_grpc
.
Level
0
PrcSrvStub
(
ServiceProxy
().
channel
())
self
.
stub
=
level
1
prc_pb2_grpc
.
Level
1
PrcSrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level
0
procedure records from database
''' retrieve level
1
procedure records from database
parameter kwargs:
level
0
_id: [str]
level
1
_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
...
...
@@ -25,8 +25,8 @@ class Level0PrcApi(object):
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level
0
prc_pb2
.
FindLevel
0
PrcReq
(
level
0
_id
=
get_parameter
(
kwargs
,
"level
0
_id"
),
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level
1
prc_pb2
.
FindLevel
1
PrcReq
(
level
1
_id
=
get_parameter
(
kwargs
,
"level
1
_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
...
...
@@ -34,7 +34,7 @@ class Level0PrcApi(object):
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level
0
PrcRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level
1
PrcRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
...
...
@@ -55,7 +55,7 @@ class Level0PrcApi(object):
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level
0
prc_pb2
.
UpdateProcStatusReq
(
id
=
id
,
status
=
status
),
level
1
prc_pb2
.
UpdateProcStatusReq
(
id
=
id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
...
...
@@ -66,10 +66,10 @@ class Level0PrcApi(object):
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level
0
procedure record into database
''' insert a level
1
procedure record into database
parameter kwargs:
level
0
_id : [
str
]
level
1
_id : [
int
]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
...
...
@@ -79,9 +79,9 @@ class Level0PrcApi(object):
return csst_dfs_common.models.Result
'''
rec
=
level
0
prc_pb2
.
Level
0
PrcRecord
(
rec
=
level
1
prc_pb2
.
Level
1
PrcRecord
(
id
=
0
,
level
0
_id
=
get_parameter
(
kwargs
,
"level
0
_id"
),
level
1
_id
=
get_parameter
(
kwargs
,
"level
1
_id"
),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
),
prc_module
=
get_parameter
(
kwargs
,
"prc_module"
),
params_file_path
=
get_parameter
(
kwargs
,
"params_file_path"
),
...
...
@@ -89,11 +89,11 @@ class Level0PrcApi(object):
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
),
result_file_path
=
get_parameter
(
kwargs
,
"result_file_path"
)
)
req
=
level
0
prc_pb2
.
WriteLevel
0
PrcReq
(
record
=
rec
)
req
=
level
1
prc_pb2
.
WriteLevel
1
PrcReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level
0
PrcRecord
().
from_proto_model
(
resp
.
record
))
return
Result
.
ok_data
(
data
=
Level
1
PrcRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
...
...
csst_dfs_api_cluster/msc/__init__.py
View file @
68697532
from
.calmerge
import
CalMergeApi
from
.level0
import
Level0DataApi
from
.level0prc
import
Level0PrcApi
from
.level1
import
Level1DataApi
from
.level1prc
import
Level1PrcApi
from
.level2
import
Level2DataApi
\ No newline at end of file
csst_dfs_api_cluster/msc/calmerge.py
deleted
100644 → 0
View file @
97fce90b
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.msc
import
CalMergeRecord
from
csst_dfs_proto.msc.calmerge
import
calmerge_pb2
,
calmerge_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
CalMergeApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
calmerge_pb2_grpc
.
CalMergeSrvStub
(
ServiceProxy
().
channel
())
def
get_latest_by_l0
(
self
,
**
kwargs
):
''' retrieve calibration merge records from database by level0 data
parameter kwargs:
level0_id: [str]
ref_type: [str]
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
GetLatestByL0
.
with_call
(
calmerge_pb2
.
GetLatestByL0Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
ref_type
=
get_parameter
(
kwargs
,
"ref_type"
)),
metadata
=
get_auth_headers
())
if
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
ok_data
(
data
=
CalMergeRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
find
(
self
,
**
kwargs
):
''' retrieve calibration merge records from database
parameter kwargs:
detector_no: [str]
ref_type: [str]
obs_time: (start,end)
qc1_status : [int]
prc_status : [int]
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
calmerge_pb2
.
FindCalMergeReq
(
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
ref_type
=
get_parameter
(
kwargs
,
"ref_type"
),
exp_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
qc1_status
=
get_parameter
(
kwargs
,
"qc1_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
limit
=
get_parameter
(
kwargs
,
"limit"
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
CalMergeRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
:param kwargs: Parameter dictionary, key items support:
id : [int]
:returns: csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
,
""
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
calmerge_pb2
.
GetCalMergeReq
(
id
=
id
,
cal_id
=
cal_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
ok_data
(
data
=
CalMergeRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_qc1_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id = cal_id,
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
,
""
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc1Status
.
with_call
(
calmerge_pb2
.
UpdateQc1StatusReq
(
id
=
id
,
cal_id
=
cal_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id: [str],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
,
0
)
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
,
""
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
calmerge_pb2
.
UpdateProcStatusReq
(
id
=
id
,
cal_id
=
cal_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a calibration merge record into database
parameter kwargs:
cal_id : [str],
detector_no : [str]
ref_type : [str]
obs_time : [str]
exp_time : [float]
prc_status : [int]
prc_time : [str]
filename : [str]
file_path : [str]
level0_ids : [list]
return csst_dfs_common.models.Result
'''
rec
=
calmerge_pb2
.
CalMergeRecord
(
id
=
0
,
cal_id
=
get_parameter
(
kwargs
,
"cal_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
ref_type
=
get_parameter
(
kwargs
,
"ref_type"
),
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
exp_time
=
get_parameter
(
kwargs
,
"exp_time"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
),
level0_ids
=
get_parameter
(
kwargs
,
"level0_ids"
,
[])
)
req
=
calmerge_pb2
.
WriteCalMergeReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
CalMergeRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/sls/__init__.py
View file @
68697532
from
.calmerge
import
CalMergeApi
from
.level0
import
Level0DataApi
from
.level0prc
import
Level0PrcApi
from
.level1
import
Level1DataApi
from
.level1prc
import
Level1PrcApi
from
.level2spectra
import
Level2SpectraApi
\ No newline at end of file
csst_dfs_api_cluster/sls/level0.py
deleted
100644 → 0
View file @
97fce90b
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.sls
import
Level0Record
from
csst_dfs_proto.sls.level0
import
level0_pb2
,
level0_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level0DataApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
level0_pb2_grpc
.
Level0SrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level0 records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level0_pb2
.
FindLevel0DataReq
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
exp_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level0Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int],
level0_id: [str]
return csst_dfs_common.models.Result
'''
try
:
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level0_pb2
.
GetLevel0DataReq
(
id
=
id
,
level0_id
=
level0_id
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"not found"
)
return
Result
.
ok_data
(
data
=
Level0Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
level0_id: [str],
status : [int]
return csst_dfs_common.models.Result
'''
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level0_pb2
.
UpdateProcStatusReq
(
id
=
id
,
level0_id
=
level0_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_qc0_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
level0_id: [str],
status : [int]
'''
id
=
get_parameter
(
kwargs
,
"id"
)
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc0Status
.
with_call
(
level0_pb2
.
UpdateQc0StatusReq
(
id
=
id
,
level0_id
=
level0_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level0 data record into database
parameter kwargs:
obs_id = [str]
detector_no = [str]
obs_type = [str]
obs_time = [str]
exp_time = [int]
detector_status_id = [int]
filename = [str]
file_path = [str]
return: csst_dfs_common.models.Result
'''
rec
=
level0_pb2
.
Level0Record
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
obs_type
=
get_parameter
(
kwargs
,
"obs_type"
),
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
exp_time
=
get_parameter
(
kwargs
,
"exp_time"
),
detector_status_id
=
get_parameter
(
kwargs
,
"detector_status_id"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
)
)
req
=
level0_pb2
.
WriteLevel0DataReq
(
record
=
rec
)
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
req
,
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level0Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/sls/level1.py
View file @
68697532
...
...
@@ -42,7 +42,41 @@ class Level1DataApi(object):
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
other_conditions
=
{}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level1Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
find_by_prc_status
(
self
,
**
kwargs
):
''' retrieve level1 records from database
parameter kwargs:
level0_id: [str]
data_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level1_pb2
.
FindLevel1Req
(
level0_id
=
None
,
data_type
=
None
,
create_time_start
=
None
,
create_time_end
=
None
,
qc1_status
=
None
,
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
-
1
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
1
),
other_conditions
=
{
"orderBy"
:
"create_time asc"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment