Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-dfs
csst-dfs-api-cluster
Commits
5202236f
Commit
5202236f
authored
Jan 12, 2024
by
Wei Shoulin
Browse files
grpc channel
parent
a87e09f1
Pipeline
#3078
passed with stage
Changes
19
Pipelines
1
Show whitespace changes
Inline
Side-by-side
csst_dfs_api_cluster/common/catalog.py
View file @
5202236f
...
...
@@ -6,15 +6,16 @@ import io
from
csst_dfs_commons.models
import
Result
from
csst_dfs_proto.common.ephem
import
ephem_pb2
,
ephem_pb2_grpc
from
.service
import
ServiceProxy
from
.constants
import
*
from
.service
import
grpc_channel
from
.utils
import
get_auth_headers
log
=
logging
.
getLogger
(
'csst'
)
class
CatalogApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
ephem_pb2_grpc
.
EphemSearchSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
ephem_pb2_grpc
.
EphemSearchSrvStub
self
.
stub
=
None
@
grpc_channel
def
gaia3_query
(
self
,
ra
:
float
,
dec
:
float
,
radius
:
float
,
columns
:
tuple
,
min_mag
:
float
,
max_mag
:
float
,
obstime
:
int
,
limit
:
int
):
''' retrieval GAIA DR 3
args:
...
...
csst_dfs_api_cluster/common/service.py
View file @
5202236f
import
os
import
grpc
from
csst_dfs_commons.models.errors
import
CSSTFatalException
class
ServiceProxy
:
def
__init__
(
self
):
self
.
gateway
=
os
.
getenv
(
"CSST_DFS_GATEWAY"
,
'172.31.248.218:30880'
)
...
...
@@ -16,3 +17,23 @@ class ServiceProxy:
raise
CSSTFatalException
(
'Error connecting to server {}'
.
format
(
self
.
gateway
))
else
:
return
channel
def
grpc_channel
(
func
):
def
wrapper
(
*
args
,
**
kwargs
):
with
get_grpc_channel
()
as
c
:
args
[
0
].
stub
=
args
[
0
].
stub_class
(
c
)
return
func
(
*
args
,
**
kwargs
)
return
wrapper
def
get_grpc_channel
():
options
=
((
'grpc.max_send_message_length'
,
1024
*
1024
*
1024
),
(
'grpc.max_receive_message_length'
,
1024
*
1024
*
1024
))
gateway
=
os
.
getenv
(
"CSST_DFS_GATEWAY"
,
'172.31.248.218:30880'
)
# channel = grpc.insecure_channel(self.gateway, options = options, compression = grpc.Compression.Gzip)
channel
=
grpc
.
insecure_channel
(
gateway
,
options
=
options
)
try
:
grpc
.
channel_ready_future
(
channel
).
result
(
timeout
=
10
)
except
grpc
.
FutureTimeoutError
:
raise
CSSTFatalException
(
'Error connecting to server {}'
.
format
(
gateway
))
else
:
return
channel
\ No newline at end of file
csst_dfs_api_cluster/common/utils.py
View file @
5202236f
...
...
@@ -5,7 +5,7 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_proto.common.misc
import
misc_pb2
,
misc_pb2_grpc
from
.service
import
ServiceProxy
from
.service
import
get_grpc_channel
def
format_datetime
(
dt
):
return
dt
.
strftime
(
'%Y-%m-%d %H:%M:%S'
)
...
...
@@ -57,7 +57,8 @@ def get_auth_headers():
return
((
"csst_dfs_app"
,
os
.
getenv
(
"CSST_DFS_APP_ID"
)),(
"csst_dfs_token"
,
os
.
getenv
(
"CSST_DFS_APP_TOKEN"
)),)
def
get_nextId_by_prefix
(
prefix
):
stub
=
misc_pb2_grpc
.
MiscSrvStub
(
ServiceProxy
().
channel
())
with
get_grpc_channel
()
as
c
:
stub
=
misc_pb2_grpc
.
MiscSrvStub
(
c
)
try
:
resp
,
_
=
stub
.
GetSeqId
.
with_call
(
misc_pb2
.
GetSeqIdReq
(
prefix
=
prefix
),
...
...
csst_dfs_api_cluster/facility/brick.py
View file @
5202236f
...
...
@@ -6,7 +6,7 @@ from csst_dfs_commons.models.facility import Brick, BrickObsStatus, BrickLevel1
from
csst_dfs_proto.facility.brick
import
brick_pb2
,
brick_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
UPLOAD_CHUNK_SIZE
...
...
@@ -15,8 +15,10 @@ class BrickApi(object):
Brick Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
brick_pb2_grpc
.
BrickSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
brick_pb2_grpc
.
BrickSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' find brick records
...
...
@@ -39,6 +41,7 @@ class BrickApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -61,6 +64,7 @@ class BrickApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a brickal record into database
...
...
@@ -88,6 +92,7 @@ class BrickApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_obs_status
(
self
,
**
kwargs
):
''' find observation status of bricks
...
...
@@ -113,12 +118,13 @@ class BrickApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_level1_data
(
self
,
**
kwargs
):
''' find level1 data
:param kwargs: Parameter dictionary, support:
brick_id = [int]
\n
level1_id = [int]
\n
brick_id = [int]
level1_id = [int]
module = [str],
limit = [int]
...
...
csst_dfs_api_cluster/facility/detector.py
View file @
5202236f
...
...
@@ -3,16 +3,17 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Detector
,
DetectorStatus
from
csst_dfs_proto.facility.detector
import
detector_pb2
,
detector_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
DetectorApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
detector_pb2_grpc
.
DetectorSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
detector_pb2_grpc
.
DetectorSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve detector records from database
...
...
@@ -36,6 +37,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -58,6 +60,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update
(
self
,
**
kwargs
):
''' update a detector by no
...
...
@@ -92,6 +95,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
delete
(
self
,
**
kwargs
):
''' delete a detector by no
...
...
@@ -114,6 +118,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a detector record into database
...
...
@@ -141,6 +146,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_status
(
self
,
**
kwargs
):
''' retrieve a detector status's from database
...
...
@@ -167,6 +173,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get_status
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -189,6 +196,7 @@ class DetectorApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write_status
(
self
,
**
kwargs
):
''' insert a detector status into database
...
...
csst_dfs_api_cluster/facility/level0.py
View file @
5202236f
...
...
@@ -6,13 +6,15 @@ from csst_dfs_commons.models.facility import Level0Record
from
csst_dfs_proto.facility.level0
import
level0_pb2
,
level0_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level0DataApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
level0_pb2_grpc
.
Level0SrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level0_pb2_grpc
.
Level0SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level0 records from database
...
...
@@ -64,6 +66,7 @@ class Level0DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_by_brick_ids
(
self
,
**
kwargs
):
''' retrieve level0 records by brick_ids like [1,2,3,4]
...
...
@@ -85,6 +88,7 @@ class Level0DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -110,6 +114,7 @@ class Level0DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -139,6 +144,7 @@ class Level0DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc0_status
(
self
,
**
kwargs
):
''' update the status of QC0
...
...
@@ -166,6 +172,7 @@ class Level0DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level0 data record into database
...
...
csst_dfs_api_cluster/facility/level0prc.py
View file @
5202236f
...
...
@@ -6,13 +6,15 @@ from csst_dfs_commons.models.facility import Level0PrcRecord
from
csst_dfs_proto.facility.level0prc
import
level0prc_pb2
,
level0prc_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level0PrcApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
level0prc_pb2_grpc
.
Level0PrcSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level0prc_pb2_grpc
.
Level0PrcSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level0 procedure records from database
...
...
@@ -41,6 +43,7 @@ class Level0PrcApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -65,6 +68,7 @@ class Level0PrcApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level0 procedure record into database
...
...
csst_dfs_api_cluster/facility/level1.py
View file @
5202236f
...
...
@@ -8,7 +8,7 @@ from csst_dfs_commons.models.facility import Level1Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level1
import
level1_pb2
,
level1_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level1DataApi
(
object
):
...
...
@@ -16,8 +16,10 @@ class Level1DataApi(object):
Level1 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level1_pb2_grpc
.
Level1SrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level1_pb2_grpc
.
Level1SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level1 records from database
...
...
@@ -64,6 +66,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_by_brick_ids
(
self
,
**
kwargs
):
''' retrieve level1 records by brick_ids like [1,2,3,4]
...
...
@@ -85,6 +88,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_by_ids
(
self
,
**
kwargs
):
''' retrieve level1 records by internal level1 ids like [1,2,3,4]
...
...
@@ -106,6 +110,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
sls_find_by_qc1_status
(
self
,
**
kwargs
):
''' retrieve level1 records from database
...
...
@@ -135,6 +140,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -158,6 +164,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -181,6 +188,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc1_status
(
self
,
**
kwargs
):
''' update the status of QC0
...
...
@@ -202,6 +210,7 @@ class Level1DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level1 record into database
...
...
csst_dfs_api_cluster/facility/level1prc.py
View file @
5202236f
...
...
@@ -6,13 +6,15 @@ from csst_dfs_commons.models.facility import Level1PrcRecord
from
csst_dfs_proto.facility.level1prc
import
level1prc_pb2
,
level1prc_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level1PrcApi
(
object
):
def
__init__
(
self
):
self
.
stub
=
level1prc_pb2_grpc
.
Level1PrcSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level1prc_pb2_grpc
.
Level1PrcSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level1 procedure records from database
...
...
@@ -41,6 +43,7 @@ class Level1PrcApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -65,6 +68,7 @@ class Level1PrcApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level1 procedure record into database
...
...
@@ -98,6 +102,3 @@ class Level1PrcApi(object):
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/facility/level2.py
View file @
5202236f
...
...
@@ -11,7 +11,7 @@ from csst_dfs_commons.models.common import from_proto_model_list
from
csst_dfs_commons.models.level2
import
Level2Record
,
filter_table_name
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level2
import
level2_pb2
,
level2_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level2DataApi
(
object
):
...
...
@@ -19,8 +19,10 @@ class Level2DataApi(object):
Level2 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2_pb2_grpc
.
Level2SrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level2_pb2_grpc
.
Level2SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
...
...
@@ -69,6 +71,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
catalog_columns
(
self
,
**
kwargs
):
''' retrieve columns data type
...
...
@@ -87,6 +90,7 @@ class Level2DataApi(object):
resp
[
'totalCount'
]
=
len
(
resp
[
'data'
])
return
resp
@
grpc_channel
def
catalog_query
(
self
,
**
kwargs
):
''' retrieve level2catalog records from database
...
...
@@ -117,6 +121,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
coord_cond_sql
(
self
,
**
kwargs
):
''' generate coordinate search condition sql
...
...
@@ -144,6 +149,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_existed_brick_ids
(
self
,
**
kwargs
):
''' retrieve existed brick_ids in a single exposure catalog
...
...
@@ -164,6 +170,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -185,6 +192,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -208,6 +216,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC0
...
...
@@ -229,6 +238,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
...
...
csst_dfs_api_cluster/facility/level2producer.py
View file @
5202236f
...
...
@@ -6,7 +6,7 @@ from csst_dfs_commons.models.facility import Level2Producer, Level2Job, Level2Pr
from
csst_dfs_proto.facility.level2producer
import
level2producer_pb2
,
level2producer_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
UPLOAD_CHUNK_SIZE
...
...
@@ -15,8 +15,10 @@ class Level2ProducerApi(object):
Level2Producer Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2producer_pb2_grpc
.
Level2ProducerSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level2producer_pb2_grpc
.
Level2ProducerSrvStub
self
.
stub
=
None
@
grpc_channel
def
register
(
self
,
**
kwargs
):
''' register a Level2Producer data record into database
...
...
@@ -48,6 +50,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve Level2Producer records from database
...
...
@@ -70,6 +73,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -92,6 +96,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_nexts
(
self
,
**
kwargs
):
''' retrieve Level2Producer records from database
...
...
@@ -113,6 +118,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_start
(
self
,
**
kwargs
):
''' retrieve Level2Producer records from database
...
...
@@ -135,6 +141,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update
(
self
,
**
kwargs
):
''' update a Level2Producer
...
...
@@ -168,6 +175,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
delete
(
self
,
**
kwargs
):
''' delete a Level2Producer data
...
...
@@ -189,6 +197,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
new_job
(
self
,
**
kwargs
):
''' new a Level2Producer Job
...
...
@@ -213,6 +222,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get_job
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -235,6 +245,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_job
(
self
,
**
kwargs
):
''' update a Level2Producer Job
...
...
@@ -262,6 +273,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
new_running
(
self
,
**
kwargs
):
''' insert a Level2ProducerRuningRecord data
...
...
@@ -296,6 +308,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get_running
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -318,6 +331,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_running
(
self
,
**
kwargs
):
''' udpate a Level2ProducerRuningRecord data
...
...
@@ -354,6 +368,7 @@ class Level2ProducerApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
find_running
(
self
,
**
kwargs
):
''' find Level2ProducerRuningRecord data
...
...
csst_dfs_api_cluster/facility/level2type.py
View file @
5202236f
...
...
@@ -12,7 +12,7 @@ from csst_dfs_commons.models.level2 import Level2TypeRecord
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level2type
import
level2type_pb2
,
level2type_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level2TypeApi
(
object
):
...
...
@@ -20,8 +20,10 @@ class Level2TypeApi(object):
Level2Type Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2type_pb2_grpc
.
Level2TypeSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level2type_pb2_grpc
.
Level2TypeSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2type records from database
...
...
@@ -54,7 +56,7 @@ class Level2TypeApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -76,6 +78,7 @@ class Level2TypeApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_import_status
(
self
,
**
kwargs
):
''' update the status of level2 type
...
...
@@ -99,7 +102,7 @@ class Level2TypeApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2type record into database
...
...
csst_dfs_api_cluster/facility/observation.py
View file @
5202236f
...
...
@@ -3,10 +3,9 @@ import grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.facility
import
Observation
from
csst_dfs_proto.facility.observation
import
observation_pb2
,
observation_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
from
..common.constants
import
UPLOAD_CHUNK_SIZE
...
...
@@ -15,8 +14,10 @@ class ObservationApi(object):
Observation Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
observation_pb2_grpc
.
ObservationSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
observation_pb2_grpc
.
ObservationSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve exposure records from database
...
...
@@ -50,6 +51,7 @@ class ObservationApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -75,6 +77,7 @@ class ObservationApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -103,6 +106,7 @@ class ObservationApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc0_status
(
self
,
**
kwargs
):
''' update the status of QC0
...
...
@@ -129,6 +133,7 @@ class ObservationApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a observational record into database
...
...
@@ -164,6 +169,3 @@ class ObservationApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/facility/otherdata.py
View file @
5202236f
...
...
@@ -8,7 +8,7 @@ from csst_dfs_commons.models.facility import OtherDataRecord
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.otherdata
import
otherdata_pb2
,
otherdata_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
OtherDataApi
(
object
):
...
...
@@ -16,8 +16,10 @@ class OtherDataApi(object):
OtherData Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
otherdata_pb2_grpc
.
OtherDataSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
otherdata_pb2_grpc
.
OtherDataSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve otherdata records from database
...
...
@@ -55,6 +57,7 @@ class OtherDataApi(object):
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -77,6 +80,7 @@ class OtherDataApi(object):
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a otherdata record into database
...
...
csst_dfs_api_cluster/hstdm/level2.py
View file @
5202236f
...
...
@@ -8,7 +8,7 @@ from csst_dfs_commons.models.hstdm import Level2Data
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.hstdm.level2
import
level2_pb2
,
level2_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level2DataApi
(
object
):
...
...
@@ -16,8 +16,10 @@ class Level2DataApi(object):
Level2 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2_pb2_grpc
.
Level2SrvStub
(
ServiceProxy
().
channel
())
self
.
stub
=
level2_pb2_grpc
.
Level2SrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
...
...
@@ -57,6 +59,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -79,6 +82,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -101,7 +105,7 @@ class Level2DataApi(object):
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC2
...
...
@@ -123,6 +127,7 @@ class Level2DataApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
...
...
csst_dfs_api_cluster/mbi/__init__.py
View file @
5202236f
from
.level2
import
Level2DataApi
from
.level2co
import
Level2CoApi
\ No newline at end of file
csst_dfs_api_cluster/mbi/level2.py
deleted
100644 → 0
View file @
a87e09f1
import
io
import
os
import
grpc
import
datetime
import
pickle
from
collections.abc
import
Iterable
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.msc
import
Level2Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.msc.level2
import
level2_pb2
,
level2_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level2DataApi
(
object
):
"""
Level2 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2_pb2_grpc
.
Level2SrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
parameter kwargs:
level0_id: [str]
level1_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2_pb2
.
FindLevel2Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
1024
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
1024
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
catalog_query
(
self
,
**
kwargs
):
''' retrieve level2catalog records from database
parameter kwargs:
brick_ids: list[int]
obs_id: [str]
detector_no: [str]
filter: [str]
ra: [float] in deg
dec: [float] in deg
radius: [float] in deg
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
datas
=
io
.
BytesIO
()
totalCount
=
0
brick_ids
=
get_parameter
(
kwargs
,
"brick_ids"
,
[])
if
not
isinstance
(
brick_ids
,
Iterable
):
brick_ids
=
[
brick_ids
]
resps
=
self
.
stub
.
FindCatalog
(
level2_pb2
.
FindLevel2CatalogReq
(
brick_ids
=
","
.
join
([
str
(
i
)
for
i
in
brick_ids
]),
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
,
None
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
,
None
),
filter
=
get_parameter
(
kwargs
,
"filter"
,
None
),
obs_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
obs_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
ra
=
get_parameter
(
kwargs
,
"ra"
),
dec
=
get_parameter
(
kwargs
,
"dec"
),
radius
=
get_parameter
(
kwargs
,
"radius"
),
minMag
=
get_parameter
(
kwargs
,
"min_mag"
),
maxMag
=
get_parameter
(
kwargs
,
"max_mag"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
columns
=
","
.
join
(
get_parameter
(
kwargs
,
"columns"
,
"*"
))
),
metadata
=
get_auth_headers
())
for
resp
in
resps
:
if
resp
.
success
:
datas
.
write
(
resp
.
records
)
totalCount
=
resp
.
totalCount
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
datas
.
flush
()
records
=
pickle
.
loads
(
datas
.
getvalue
())
return
Result
.
ok_data
(
data
=
records
[
0
]).
append
(
"totalCount"
,
totalCount
).
append
(
"columns"
,
records
[
1
])
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
catalog_query_file
(
self
,
**
kwargs
):
''' retrieve level2catalog records from database
parameter kwargs:
brick_ids: list[int]
obs_id: [str]
detector_no: [str]
filter: [str]
ra: [float] in deg
dec: [float] in deg
radius: [float] in deg
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
brick_ids
=
get_parameter
(
kwargs
,
"brick_ids"
,
[])
if
not
isinstance
(
brick_ids
,
Iterable
):
brick_ids
=
[
brick_ids
]
resp
,
_
=
self
.
stub
.
FindCatalogFile
.
with_call
(
level2_pb2
.
FindLevel2CatalogReq
(
brick_ids
=
","
.
join
([
str
(
i
)
for
i
in
brick_ids
]),
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
filter
=
get_parameter
(
kwargs
,
"filter"
,
None
),
obs_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
obs_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
ra
=
get_parameter
(
kwargs
,
"ra"
),
dec
=
get_parameter
(
kwargs
,
"dec"
),
radius
=
get_parameter
(
kwargs
,
"radius"
),
minMag
=
get_parameter
(
kwargs
,
"min_mag"
),
maxMag
=
get_parameter
(
kwargs
,
"max_mag"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
find_existed_brick_ids
(
self
,
**
kwargs
):
''' retrieve existed brick_ids in a single exposure catalog
parameter kwargs:
return: csst_dfs_common.models.Result
'''
try
:
resp
=
self
.
stub
.
FindExistedBricks
(
level2_pb2
.
FindExistedBricksReq
(),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
resp
.
brick_ids
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2_pb2
.
GetLevel2Req
(
id
=
get_parameter
(
kwargs
,
"id"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level2Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level2_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc2Status
.
with_call
(
level2_pb2
.
UpdateQc2StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
parameter kwargs:
level1_id : [int]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
return csst_dfs_common.models.Result
'''
rec
=
level2_pb2
.
Level2Record
(
id
=
0
,
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
)
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2_pb2
.
WriteLevel2Req
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/mbi/level2co.py
deleted
100644 → 0
View file @
a87e09f1
import
io
import
os
import
grpc
import
datetime
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.msc
import
Level2CoRecord
,
Level2CoCatalogRecord
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.msc.level2co
import
level2co_pb2
,
level2co_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level2CoApi
(
object
):
"""
Level2 Merge Catalog Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2co_pb2_grpc
.
Level2CoSrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
parameter kwargs:
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2co_pb2
.
FindLevel2CoReq
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2CoRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
catalog_query
(
self
,
**
kwargs
):
''' retrieve level2catalog records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
ra: [float] in deg
dec: [float] in deg
radius: [float] in deg
min_mag: [float]
max_mag: [float]
obs_time: (start, end),
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
FindCatalog
.
with_call
(
level2co_pb2
.
FindLevel2CoCatalogReq
(
obs_id
=
get_parameter
(
kwargs
,
"obs_id"
),
detector_no
=
get_parameter
(
kwargs
,
"detector_no"
),
obs_time_start
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
0
],
obs_time_end
=
get_parameter
(
kwargs
,
"obs_time"
,
[
None
,
None
])[
1
],
ra
=
get_parameter
(
kwargs
,
"ra"
),
dec
=
get_parameter
(
kwargs
,
"dec"
),
radius
=
get_parameter
(
kwargs
,
"radius"
),
minMag
=
get_parameter
(
kwargs
,
"min_mag"
),
maxMag
=
get_parameter
(
kwargs
,
"max_mag"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2CoCatalogRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2co_pb2
.
GetLevel2CoReq
(
id
=
get_parameter
(
kwargs
,
"id"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level2CoRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level2co_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc2Status
.
with_call
(
level2co_pb2
.
UpdateQc2StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
parameter kwargs:
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
return csst_dfs_common.models.Result
'''
rec
=
level2co_pb2
.
Level2CoRecord
(
id
=
0
,
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
)
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2co_pb2
.
WriteLevel2CoReq
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2CoRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/sls/level2spectra.py
View file @
5202236f
...
...
@@ -8,7 +8,7 @@ from csst_dfs_commons.models.sls import Level2Spectra
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.sls.level2spectra
import
level2spectra_pb2
,
level2spectra_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.service
import
grpc_channel
from
..common.utils
import
*
class
Level2SpectraApi
(
object
):
...
...
@@ -16,8 +16,10 @@ class Level2SpectraApi(object):
Level2spectra Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2spectra_pb2_grpc
.
Level2spectraSrvStub
(
ServiceProxy
().
channel
())
self
.
stub_class
=
level2spectra_pb2_grpc
.
Level2spectraSrvStub
self
.
stub
=
None
@
grpc_channel
def
find
(
self
,
**
kwargs
):
''' retrieve level2spectra records from database
...
...
@@ -55,6 +57,7 @@ class Level2SpectraApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
...
...
@@ -77,6 +80,7 @@ class Level2SpectraApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -100,6 +104,7 @@ class Level2SpectraApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC2
...
...
@@ -121,6 +126,7 @@ class Level2SpectraApi(object):
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
@
grpc_channel
def
write
(
self
,
**
kwargs
):
''' insert a level2spectra record into database
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment