Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-dfs
csst-dfs-api-cluster
Commits
2d664214
Commit
2d664214
authored
Dec 08, 2023
by
Wei Shoulin
Browse files
fix deps and version
parent
e348e72b
Changes
9
Hide whitespace changes
Inline
Side-by-side
README.md
View file @
2d664214
...
...
@@ -9,15 +9,6 @@ This package provides APIs to access csst's files and databases.
This library can be installed with the following command:
```
bash
git clone https://github.com/astronomical-data-processing/csst-dfs-api-cluster.git
cd
csst-dfs-api-cluster
pip
install
-r
requirements.txt
python setup.py
install
git clone https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api-cluster.git
pip
install
./csst-dfs-api-cluster
```
## Configuration
set enviroment variables
*
CSST_DFS_GATEWAY = ip:port
*
CSST_DFS_APP =
*
CSST_DFS_TOKEN =
csst_dfs_api_cluster/facility/__init__.py
View file @
2d664214
...
...
@@ -6,4 +6,7 @@ from .level1 import Level1DataApi
from
.level0
import
Level0DataApi
from
.level0prc
import
Level0PrcApi
from
.level1prc
import
Level1PrcApi
from
.otherdata
import
OtherDataApi
\ No newline at end of file
from
.otherdata
import
OtherDataApi
from
.level2
import
Level2DataApi
from
.level2type
import
Level2TypeApi
\ No newline at end of file
csst_dfs_api_cluster/facility/level2.py
0 → 100644
View file @
2d664214
import
io
import
os
import
grpc
import
datetime
import
pickle
from
collections.abc
import
Iterable
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.level2
import
Level2Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level2
import
level2_pb2
,
level2_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level2DataApi
(
object
):
"""
Level2 Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2_pb2_grpc
.
Level2SrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level2 records from database
parameter kwargs:
level0_id: [str]
level1_id: [int]
module_id: [str]
brick_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
import_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2_pb2
.
FindLevel2Req
(
level0_id
=
get_parameter
(
kwargs
,
"level0_id"
),
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
create_time_start
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
0
],
create_time_end
=
get_parameter
(
kwargs
,
"create_time"
,
[
None
,
None
])[
1
],
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
1024
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
1024
),
import_status
=
get_parameter
(
kwargs
,
"import_status"
,
1024
),
filename
=
get_parameter
(
kwargs
,
"filename"
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
),
other_conditions
=
{
"test"
:
"cnlab.test"
}
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2Record
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
catalog_query
(
self
,
**
kwargs
):
''' retrieve level2catalog records from database
parameter kwargs:
sql: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try
:
datas
=
io
.
BytesIO
()
totalCount
=
0
resps
=
self
.
stub
.
FindCatalog
(
level2_pb2
.
FindLevel2CatalogReq
(
sql
=
get_parameter
(
kwargs
,
"sql"
,
None
),
limit
=
get_parameter
(
kwargs
,
"limit"
,
0
)
),
metadata
=
get_auth_headers
())
for
resp
in
resps
:
if
resp
.
success
:
datas
.
write
(
resp
.
records
)
totalCount
=
resp
.
totalCount
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
datas
.
flush
()
records
=
pickle
.
loads
(
datas
.
getvalue
())
return
Result
.
ok_data
(
data
=
records
[
0
]).
append
(
"totalCount"
,
totalCount
).
append
(
"columns"
,
records
[
1
])
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
find_existed_brick_ids
(
self
,
**
kwargs
):
''' retrieve existed brick_ids in a single exposure catalog
parameter kwargs:
data_type: [str]
return: csst_dfs_common.models.Result
'''
try
:
resp
=
self
.
stub
.
FindExistedBricks
(
level2_pb2
.
FindExistedBricksReq
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
resp
.
brick_ids
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
id : [int]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2_pb2
.
GetLevel2Req
(
id
=
get_parameter
(
kwargs
,
"id"
)
),
metadata
=
get_auth_headers
())
if
resp
.
record
is
None
or
resp
.
record
.
id
==
0
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level2Record
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_proc_status
(
self
,
**
kwargs
):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateProcStatus
.
with_call
(
level2_pb2
.
UpdateProcStatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_qc2_status
(
self
,
**
kwargs
):
''' update the status of QC0
parameter kwargs:
id : [int],
status : [int]
'''
fits_id
=
get_parameter
(
kwargs
,
"id"
)
status
=
get_parameter
(
kwargs
,
"status"
)
try
:
resp
,
_
=
self
.
stub
.
UpdateQc2Status
.
with_call
(
level2_pb2
.
UpdateQc2StatusReq
(
id
=
fits_id
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level2 record into database
parameter kwargs:
level1_id : [int]
brick_id : [int]
module_id : [str]
object_name: [str]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
return csst_dfs_common.models.Result
'''
rec
=
level2_pb2
.
Level2Record
(
id
=
0
,
level1_id
=
get_parameter
(
kwargs
,
"level1_id"
,
0
),
brick_id
=
get_parameter
(
kwargs
,
"brick_id"
,
0
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
""
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
),
object_name
=
get_parameter
(
kwargs
,
"object_name"
,
""
),
filename
=
get_parameter
(
kwargs
,
"filename"
,
""
),
file_path
=
get_parameter
(
kwargs
,
"file_path"
,
""
),
qc2_status
=
get_parameter
(
kwargs
,
"qc2_status"
,
0
),
prc_status
=
get_parameter
(
kwargs
,
"prc_status"
,
0
),
prc_time
=
get_parameter
(
kwargs
,
"prc_time"
,
format_datetime
(
datetime
.
now
())),
pipeline_id
=
get_parameter
(
kwargs
,
"pipeline_id"
,
""
)
)
def
stream
(
rec
):
with
open
(
rec
.
file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2_pb2
.
WriteLevel2Req
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
module_id
:
return
Result
.
error
(
message
=
"module_id is blank"
)
if
not
rec
.
data_type
:
return
Result
.
error
(
message
=
"data_type is blank"
)
if
not
rec
.
file_path
:
return
Result
.
error
(
message
=
"file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
file_path
,
))
if
not
rec
.
filename
:
rec
.
filename
=
os
.
path
.
basename
(
rec
.
file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2Record
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/facility/level2type.py
0 → 100644
View file @
2d664214
import
io
import
os
import
grpc
import
datetime
import
pickle
from
collections.abc
import
Iterable
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.level2
import
Level2TypeRecord
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.facility.level2type
import
level2type_pb2
,
level2type_pb2_grpc
from
..common.service
import
ServiceProxy
from
..common.utils
import
*
class
Level2TypeApi
(
object
):
"""
Level2Type Data Operation Class
"""
def
__init__
(
self
):
self
.
stub
=
level2type_pb2_grpc
.
Level2TypeSrvStub
(
ServiceProxy
().
channel
())
def
find
(
self
,
**
kwargs
):
''' retrieve level2type records from database
parameter kwargs:
module_id: [str]
data_type: [str]
import_status : [int],
page: [int]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
the_limit
=
get_parameter
(
kwargs
,
"limit"
,
100000
)
the_limit
=
the_limit
if
the_limit
>
0
else
100000
try
:
resp
,
_
=
self
.
stub
.
Find
.
with_call
(
level2type_pb2
.
FindLevel2TypeReq
(
module_id
=
get_parameter
(
kwargs
,
"module_id"
),
data_type
=
get_parameter
(
kwargs
,
"data_type"
),
import_status
=
get_parameter
(
kwargs
,
"import_status"
,
1024
),
limit
=
the_limit
,
page
=
get_parameter
(
kwargs
,
"page"
,
1
)
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
from_proto_model_list
(
Level2TypeRecord
,
resp
.
records
)).
append
(
"totalCount"
,
resp
.
totalCount
)
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
get
(
self
,
**
kwargs
):
''' fetch a record from database
parameter kwargs:
data_type: [str]
return csst_dfs_common.models.Result
'''
try
:
resp
,
_
=
self
.
stub
.
Get
.
with_call
(
level2type_pb2
.
GetLevel2TypeReq
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
),
metadata
=
get_auth_headers
())
if
not
resp
.
record
or
not
resp
.
record
.
data_type
:
return
Result
.
error
(
message
=
f
"data not found"
)
return
Result
.
ok_data
(
data
=
Level2TypeRecord
().
from_proto_model
(
resp
.
record
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
update_import_status
(
self
,
**
kwargs
):
''' update the status of level2 type
parameter kwargs:
data_type: [str]
status : [int]
return csst_dfs_common.models.Result
'''
data_type
=
get_parameter
(
kwargs
,
"data_type"
)
status
=
get_parameter
(
kwargs
,
"status"
,
0
)
try
:
resp
,
_
=
self
.
stub
.
UpdateImportStatus
.
with_call
(
level2type_pb2
.
UpdateImportStatusReq
(
data_type
=
data_type
,
status
=
status
),
metadata
=
get_auth_headers
()
)
if
resp
.
success
:
return
Result
.
ok_data
()
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
def
write
(
self
,
**
kwargs
):
''' insert a level2type record into database
parameter kwargs:
data_type : [str]
module_id : [str]
key_column : [str]
hdu_index : [int]
demo_filename : [str]
demo_file_path : [str]
ra_column : [str]
dec_column : [str]
return csst_dfs_common.models.Result
'''
rec
=
level2type_pb2
.
Level2TypeRecord
(
data_type
=
get_parameter
(
kwargs
,
"data_type"
,
""
),
module_id
=
get_parameter
(
kwargs
,
"module_id"
,
""
),
key_column
=
get_parameter
(
kwargs
,
"key_column"
,
""
),
hdu_index
=
get_parameter
(
kwargs
,
"hdu_index"
,
0
),
demo_filename
=
get_parameter
(
kwargs
,
"demo_filename"
,
""
),
demo_file_path
=
get_parameter
(
kwargs
,
"demo_file_path"
,
""
),
ra_column
=
get_parameter
(
kwargs
,
"ra_column"
,
""
),
dec_column
=
get_parameter
(
kwargs
,
"dec_column"
,
""
)
)
def
stream
(
rec
):
with
open
(
rec
.
demo_file_path
,
'rb'
)
as
f
:
while
True
:
data
=
f
.
read
(
UPLOAD_CHUNK_SIZE
)
if
not
data
:
break
yield
level2type_pb2
.
WriteLevel2TypeReq
(
record
=
rec
,
data
=
data
)
try
:
if
not
rec
.
data_type
:
return
Result
.
error
(
message
=
"data_type is blank"
)
if
not
rec
.
demo_file_path
:
return
Result
.
error
(
message
=
"demo_file_path is blank"
)
if
not
os
.
path
.
exists
(
rec
.
demo_file_path
):
return
Result
.
error
(
message
=
"the file [%s] not existed"
%
(
rec
.
demo_file_path
,
))
if
not
rec
.
demo_filename
:
rec
.
demo_filename
=
os
.
path
.
basename
(
rec
.
demo_file_path
)
resp
,
_
=
self
.
stub
.
Write
.
with_call
(
stream
(
rec
),
metadata
=
get_auth_headers
())
if
resp
.
success
:
return
Result
.
ok_data
(
data
=
Level2TypeRecord
().
from_proto_model
(
resp
.
record
))
else
:
return
Result
.
error
(
message
=
str
(
resp
.
error
.
detail
))
except
grpc
.
RpcError
as
e
:
return
Result
.
error
(
message
=
"%s:%s"
%
(
e
.
code
().
value
,
e
.
details
()))
csst_dfs_api_cluster/mbi/level2.py
View file @
2d664214
...
...
@@ -8,7 +8,7 @@ from collections.abc import Iterable
from
csst_dfs_commons.models
import
Result
from
csst_dfs_commons.models.common
import
from_proto_model_list
from
csst_dfs_commons.models.msc
import
Level2Record
,
Level2CatalogRecord
from
csst_dfs_commons.models.msc
import
Level2Record
from
csst_dfs_commons.models.constants
import
UPLOAD_CHUNK_SIZE
from
csst_dfs_proto.msc.level2
import
level2_pb2
,
level2_pb2_grpc
...
...
csst_dfs_api_cluster/version.py
0 → 100644
View file @
2d664214
__version__
=
"1.0.0"
\ No newline at end of file
requirements.txt
View file @
2d664214
astropy
>=4.0
grpcio
>=1.28.1
protobuf
==3.9.0
\ No newline at end of file
# base
astropy
==5.3.4
numpy
==1.26.1
# specific
grpcio
==1.53.0
grpcio_tools
==1.53.0
protobuf
==4.22.3
\ No newline at end of file
setup.cfg
View file @
2d664214
[metadata]
# replace with your username:
name = csst_dfs_api_cluster
author =CSST DFS Team.
author_email = weishoulin@astrolab.cn
...
...
@@ -7,16 +6,18 @@ description = CSST DFS Cluster APIs Library.
long_description = file: README.md
long_description_content_type = text/markdown
keywords = astronomy, astrophysics, cosmology, space, CSST
url = https://github.com/astronomical-data-processing/csst-dfs-api-cluster
project_urls =
Bug Tracker = https://github.com/astronomical-data-processing/csst-dfs-api-cluster/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
url = https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api-cluster
[options]
packages = find:
python_requires = >=3.7
zip_safe = False
setup_requires = setuptools_scm
[bumpver]
current_version = "1.0.0"
version_pattern = "MAJOR.MINOR.PATCH[PYTAGNUM]"
commit_message = "Release {new_version}"
commit = True
tag = True
push = True
[bumpver:file_patterns]
setup.cfg =
current_version = "{version}"
csst_dfs_api_cluster/version.py =
__version__ = "{version}"
\ No newline at end of file
setup.py
View file @
2d664214
# coding: utf-8
import
os
from
setuptools
import
setup
from
distutils.core
import
setup
setup
(
use_scm_version
=
{
'write_to'
:
os
.
path
.
join
(
'csst_dfs_api_cluster'
,
'_version.py'
)})
setup_pars
=
{
"packages"
:
[
'csst_dfs_api_cluster'
,
'csst_dfs_api_cluster.common'
,
'csst_dfs_api_cluster.cpic'
,
'csst_dfs_api_cluster.facility'
,
'csst_dfs_api_cluster.hstdm'
,
'csst_dfs_api_cluster.ifs'
,
'csst_dfs_api_cluster.mbi'
,
'csst_dfs_api_cluster.mci'
,
'csst_dfs_api_cluster.sls'
,
],
"package_dir"
:
{
'csst_dfs_api_cluster'
:
'csst_dfs_api_cluster'
,
'csst_dfs_api_cluster.common'
:
'csst_dfs_api_cluster/common'
,
'csst_dfs_api_cluster.cpic'
:
'csst_dfs_api_cluster/cpic'
,
'csst_dfs_api_cluster.facility'
:
'csst_dfs_api_cluster/facility'
,
'csst_dfs_api_cluster.hstdm'
:
'csst_dfs_api_cluster/hstdm'
,
'csst_dfs_api_cluster.ifs'
:
'csst_dfs_api_cluster/ifs'
,
'csst_dfs_api_cluster.mbi'
:
'csst_dfs_api_cluster/mbi'
,
'csst_dfs_api_cluster.mci'
:
'csst_dfs_api_cluster/mci'
,
'csst_dfs_api_cluster.sls'
:
'csst_dfs_api_cluster/sls'
,
},
}
def
requirements
():
with
open
(
"requirements.txt"
,
"r"
)
as
f
:
return
[
req
.
strip
()
for
req
in
f
.
readlines
()
if
not
req
.
startswith
(
"#"
)
and
req
.
__contains__
(
"=="
)
]
def
version
():
__version
=
{}
version_path
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
"csst_dfs_api_cluster"
,
"version.py"
)
with
open
(
version_path
,
"r"
)
as
file
:
exec
(
file
.
read
(),
__version
)
return
__version
[
"__version__"
]
setup
(
name
=
"csst_dfs_api_cluster"
,
version
=
version
(),
description
=
"API's to access CSST Data Flow System (DFS)"
,
long_description
=
open
(
'README.md'
).
read
(),
license
=
"MIT"
,
python_requires
=
">=3.7"
,
install_requires
=
requirements
(),
zip_safe
=
False
,
classifiers
=
[
"Development Status :: 4 - Beta"
,
"Intended Audience :: Science/Research"
,
"License :: OSI Approved :: MIT License"
,
"Operating System :: OS Independent"
,
"Programming Language :: Python :: 3"
,
"Topic :: Scientific/Engineering :: Physics"
,
"Topic :: Scientific/Engineering :: Astronomy"
,
],
include_package_data
=
False
,
project_urls
=
{
'Source'
:
'https://csst-tb.bao.ac.cn/code/csst-dfs/csst-dfs-api-cluster'
,
},
**
setup_pars
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment