Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-dfs
csst-dfs-api-cluster
Commits
a9ef62c1
Commit
a9ef62c1
authored
Apr 15, 2021
by
Wei Shoulin
Browse files
ephem
parent
0d2a7a78
Changes
6
Hide whitespace changes
Inline
Side-by-side
README.md
View file @
a9ef62c1
...
...
@@ -16,6 +16,8 @@ python setup.py install
```
## Configuration
set enviroment variable
set enviroment variable
s
CSST_DFS_GATEWAY = ip:port
*
CSST_DFS_GATEWAY = ip:port
*
CSST_DFS_APP =
*
CSST_DFS_TOKEN =
csst_dfs_api_cluster/common/catalog.py
View file @
a9ef62c1
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_proto.common.ephem
import
ephem_pb2
,
ephem_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.constants
import
*
from
.service
import
ServiceProxy
from
.constants
import
*
from
.utils
import
get_auth_headers
class
CatalogApi
(
object
):
def
__init__
(
self
):
self
.
proxy
=
ServiceProxy
()
self
.
stub
=
self
.
proxy
.
insecure
(
ephem_pb2_grpc
.
EphemSearchSrvStub
)
self
.
stub
=
ephem_pb2_grpc
.
EphemSearchSrvStub
(
ServiceProxy
().
channel
())
def
gaia3_query
(
self
,
ra
:
float
,
dec
:
float
,
radius
:
float
,
min_mag
:
float
,
max_mag
:
float
,
obstime
:
int
,
limit
:
int
):
''' retrieval GAIA DR 3
...
...
@@ -19,10 +19,11 @@ class CatalogApi(object):
max_mag: maximal magnitude
obstime: seconds
limit: limits returns the number of records
return:
a dict as {success: true, totalCount: 100, records:[.....]}
return:
csst_dfs_common.models.Result
'''
try
:
resp
=
self
.
stub
.
Gaia3Search
(
ephem_pb2
.
EphemSearchRequest
(
resp
,
_
=
self
.
stub
.
Gaia3Search
.
with_call
(
ephem_pb2
.
EphemSearchRequest
(
ra
=
ra
,
dec
=
dec
,
radius
=
radius
,
...
...
@@ -30,7 +31,7 @@ class CatalogApi(object):
maxMag
=
max_mag
,
obstime
=
obstime
,
limit
=
limit
))
),
metadata
=
get_auth_headers
(
))
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
resp
.
records
).
append
(
"totalCount"
,
resp
.
totalCount
)
...
...
csst_dfs_api_cluster/common/service.py
View file @
a9ef62c1
import
os
import
grpc
from
csst_dfs_commons.models.errors
import
CSSTFatalException
class
ServiceProxy
:
def
__init__
(
self
):
self
.
gateway
=
os
.
getenv
(
"CSST_DFS_GATEWAY"
,
'172.31.248.218:30880'
)
def
insecure
(
self
,
stubCls
):
def
channel
(
self
):
options
=
[(
'grpc.max_send_message_length'
,
1000
*
1024
*
1024
),
(
'grpc.max_receive_message_length'
,
1000
*
1024
*
1024
),
(
'grpc.enable_retries'
,
1
),
(
'grpc.service_config'
,
'{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }'
)]
channel
=
grpc
.
insecure_channel
(
self
.
gateway
,
options
=
options
)
return
stubCls
(
channel
)
\ No newline at end of file
try
:
grpc
.
channel_ready_future
(
channel
).
result
(
timeout
=
10
)
except
grpc
.
FutureTimeoutError
:
raise
CSSTFatalException
(
'Error connecting to server {}'
.
format
(
self
.
gateway
))
else
:
return
channel
\ No newline at end of file
csst_dfs_api_cluster/common/utils.py
View file @
a9ef62c1
import
os
from
datetime
import
datetime
import
time
...
...
@@ -47,4 +48,7 @@ def singleton(cls):
if
cls
not
in
_instance
:
_instance
[
cls
]
=
cls
()
return
_instance
[
cls
]
return
inner
\ No newline at end of file
return
inner
def
get_auth_headers
():
return
((
"csst_dfs_app"
,
os
.
getenv
(
"CSST_DFS_APP_ID"
)),(
"csst_dfs_token"
,
os
.
getenv
(
"CSST_DFS_APP_TOKEN"
)),)
\ No newline at end of file
csst_dfs_api_cluster/ifs/fits.py
View file @
a9ef62c1
import
os
import
grpc
from
csst_dfs_commons.models
import
Result
from
csst_dfs_proto.ifs.fits
import
fits_pb2
,
fits_pb2_grpc
from
..common.service
import
ServiceProxy
...
...
@@ -10,8 +11,7 @@ from ..common.constants import UPLOAD_CHUNK_SIZE
class
FitsApi
(
object
):
def
__init__
(
self
,
sub_system
=
"ifs"
):
self
.
sub_system
=
sub_system
self
.
proxy
=
ServiceProxy
()
self
.
stub
=
self
.
proxy
.
insecure
(
fits_pb2_grpc
.
FitsSrvStub
)
self
.
stub
=
fits_pb2_grpc
.
FitsSrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
'''
...
...
@@ -21,20 +21,30 @@ class FitsApi(object):
exp_time = (start, end),
ccd_num = [int],
qc0_status = [int],
prc_status = [int]
prc_status = [int],
limit = [int]
return list of raw records
'''
return
self
.
stub
.
Find
(
fits_pb2
.
FindRawFitsReq
(
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
exp_time_start
=
get_parameter
(
kwargs
,
"exp_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"exp_time"
,
[
None
,
None
])[
1
],
ccd_num
=
get_parameter
(
kwargs
,
"ccd_num"
),
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
))
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
fits_pb2
.
FindRawFitsReq
(
obs_time
=
get_parameter
(
kwargs
,
"obs_time"
),
file_name
=
get_parameter
(
kwargs
,
"file_name"
),
exp_time_start
=
get_parameter
(
kwargs
,
"exp_time"
,
[
None
,
None
])[
0
],
exp_time_end
=
get_parameter
(
kwargs
,
"exp_time"
,
[
None
,
None
])[
1
],
ccd_num
=
get_parameter
(
kwargs
,
"ccd_num"
),
qc0_status
=
get_parameter
(
kwargs
,
"qc0_status"
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
resp
.
rawFits
).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
resp
.
message
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
))
def
get
(
self
,
**
kwargs
):
''' query database, return a record as dict
...
...
@@ -44,9 +54,16 @@ class FitsApi(object):
return dict or None
'''
return
self
.
stub
.
Get
(
fits_pb2
.
GetRawFitsReq
(
fits_id
=
get_parameter
(
kwargs
,
"fits_id"
)
))
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
fits_pb2
.
GetRawFitsReq
(
fits_id
=
get_parameter
(
kwargs
,
"fits_id"
)
),
metadata
=
get_auth_headers
())
return
Result
.
ok_data
(
data
=
resp
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
))
def
read
(
self
,
**
kwargs
):
''' yield bytes of fits file
...
...
@@ -59,17 +76,17 @@ class FitsApi(object):
yield bytes of fits file
'''
try
:
streams
=
self
.
stub
.
Read
(
fits_pb2
.
ReadRawFitsReq
(
streams
=
self
.
stub
.
Read
.
with_call
(
fits_pb2
.
ReadRawFitsReq
(
fits_id
=
get_parameter
(
kwargs
,
"fits_id"
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
),
chunk_size
=
get_parameter
(
kwargs
,
"chunk_size"
,
20480
)
))
),
metadata
=
get_auth_headers
(
))
for
stream
in
streams
:
yield
stream
.
data
except
Exception
as
e
:
print
(
e
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
)
)
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -81,10 +98,13 @@ class FitsApi(object):
fits_id
=
get_parameter
(
kwargs
,
"fits_id"
)
status
=
get_parameter
(
kwargs
,
"status"
),
try
:
return
self
.
stub
.
update_qc0_status
((
fits_pb2
.
UpdateProcStatusReq
(
fits_id
=
fits_id
,
status
=
status
)))
except
grpc
.
RpcError
as
identifier
:
print
(
identifier
)
resp
,
_
=
self
.
stub
.
update_qc0_status
.
with_call
(
fits_pb2
.
UpdateProcStatusReq
(
fits_id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
return
Result
.
ok_data
(
data
=
resp
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
))
def
update_qc0_status
(
self
,
**
kwargs
):
''' update the status of reduction
...
...
@@ -96,11 +116,13 @@ class FitsApi(object):
fits_id
=
get_parameter
(
kwargs
,
"fits_id"
)
status
=
get_parameter
(
kwargs
,
"status"
),
try
:
return
self
.
stub
.
update_qc0_status
((
fits_pb2
.
UpdateQc0StatusReq
(
fits_id
=
fits_id
,
status
=
status
)))
except
grpc
.
RpcError
as
identifier
:
print
(
identifier
)
resp
,
_
=
self
.
stub
.
update_qc0_status
.
with_call
(
fits_pb2
.
UpdateQc0StatusReq
(
fits_id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
return
Result
.
ok_data
(
data
=
resp
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
))
def
write
(
self
,
**
kwargs
):
''' copy a local file to file storage, then reduce the header of fits file and insert a record into database
...
...
@@ -123,9 +145,13 @@ class FitsApi(object):
yield
fits_pb2
.
WriteRawFitsReq
(
file_name
=
v_file_name
,
fitsData
=
data
)
return
self
.
stub
.
Write
(
stream
(
file_path
))
try
:
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
file_path
),
metadata
=
get_auth_headers
())
return
Result
.
ok_data
(
data
=
resp
)
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
))
tests/test_common_catalog.py
View file @
a9ef62c1
...
...
@@ -10,5 +10,5 @@ class CommonEphemTestCase(unittest.TestCase):
self
.
api
=
CatalogApi
()
def
test_gaia3_query
(
self
):
recs
=
self
.
api
.
gaia3_query
(
ra
=
56.234039029108935
,
dec
=
14.466534827703473
,
radius
=
0.
1
,
min_mag
=
4
,
max_mag
=
1
2
,
obstime
=
-
1
,
limit
=
2
)
recs
=
self
.
api
.
gaia3_query
(
ra
=
38.444
,
dec
=
55
,
radius
=
1
,
min_mag
=
-
1
,
max_mag
=
-
1
,
obstime
=
-
1
,
limit
=
2
)
print
(
'find:'
,
recs
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment