Commit ed18ff4b authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Added functions wrapping the original get/info/open ones, adding bucket information

parent 43c8eaa7
...@@ -15,48 +15,53 @@ Supported are two distinct ways of reading from s3 storage. ...@@ -15,48 +15,53 @@ Supported are two distinct ways of reading from s3 storage.
**老写法同时兼容本地nas和云上s3,只要读路径以s3:// 协议开头会自动识别** **老写法同时兼容本地nas和云上s3,只要读路径以s3:// 协议开头会自动识别**
如果需要读S3时,需要传入s3的密钥和endpoint等配置,有两种方法可选 如果需要读S3时,需要传入s3的密钥和endpoint等配置,有两种方法可选
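The S3 bucket is configured through the `S3_BUCKET` environment variable (default: `data-and-computing`). S3 credentials and options default to working values and can be overridden if needed. Note that once `S3_KEY` is set, `S3_SECRET` and `S3_ENDPOINT_URL` are also read from the environment, so set all three together, e.g.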
S3_KEY=dummy_key
S3_SECRET=dummy_secret
S3_ENDPOINT_URL=http://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com
S3_BUCKET=data-and-computing
## 从s3下载到本地 ## 从s3下载到本地
```python ```python
def get(key: str, local_path: str): def get_path(remote_path: str, local_path: str):
""" """
Download a file/folder from s3 to local storage. Download a file/folder from s3 to local storage.
Args: Args:
key: s3 key remote_path: s3 key
local_path: Local path that will be downloaded to. local_path: Local path that will be downloaded to.
""" """
def info(key: str): def info_path(remote_path: str):
""" """
Get information about s3 file. Get information about a s3 file.
Args: Args:
key: s3 key remote_path: s3 key
""" """
# Example: # Example:
from csst_fs import s3_fs from csst_fs import s3_fs
# single file # single file
s3_fs.get('gaia/test/requirements.txt', 'requirements.txt') s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder # folder
s3_fs.get('gaia/data', './', recursive=True) s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info # get file or folder info
s3_fs.info('gaia/data') s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
``` ```
## Open for read ## Open for read
```python ```python
def open(key: str): def open_path(remote_path: str, mode: str = 'r'):
""" """
Get a readonly file object from a file on s3. Get a readonly file object from a file on s3. Use mode = 'rb' for binary files.
Args: Args:
key: s3 key remote_path: s3 key
mode: File mode. Use 'rb' for binary files. Default: 'r'.
Returns: Returns:
File object of the s3 file. File object of the s3 file.
""" """
...@@ -64,7 +69,7 @@ def open(key: str): ...@@ -64,7 +69,7 @@ def open(key: str):
# Example: # Example:
from csst_fs import s3_fs from csst_fs import s3_fs
# open single file (s3 or local) # open single file (s3 or local)
with s3_fs.open('gaia/data') as file: with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
file.read() file.read()
``` ```
...@@ -76,7 +81,8 @@ The function will return a successfull response as soon as the file content is s ...@@ -76,7 +81,8 @@ The function will return a successfull response as soon as the file content is s
A successful response contains a task_id referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) for querying a processing task's current state. A successful response contains a task_id referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) for querying a processing task's current state.
## Configuration ## Configuration
The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly. The helper will send HTTP requests to an external API. The CSST_BACKEND_API_URL env variable should be set accordingly. E.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `start_ingestion_task` ## Function: `start_ingestion_task`
......
import os import os
import json
default_s3_options = { default_s3_options = {
'key': 'HZv6qvhH1ZImspVK', 'key': 'HZv6qvhH1ZImspVK',
'secret': 'wCDhQu63fD89rg5A05vcRIb6KfLAxS', 'secret': 'wCDhQu63fD89rg5A05vcRIb6KfLAxS',
'endpoint_url': 'http://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com' 'endpoint_url': 'http://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com',
}
default_s3_settings = {
'bucket': 'data-and-computing'
} }
def load_from_env(): def load_from_env():
s3_options = { s3_options = {
'key': os.getenv('S3_KEY'), 'key': os.getenv('S3_KEY'),
'secret': os.getenv('S3_SECRET'), 'secret': os.getenv('S3_SECRET'),
'endpoint_url': os.getenv('S3_ENDPOINT_URL') 'endpoint_url': os.getenv('S3_ENDPOINT_URL'),
} }
return s3_options return s3_options
def load_settings_from_env():
    """Build the S3 settings dict from environment variables.

    Returns:
        A dict with a single 'bucket' entry holding the value of the
        S3_BUCKET environment variable, or None when it is not set.
    """
    return dict(bucket=os.environ.get('S3_BUCKET'))
def load_s3_options(): def load_s3_options():
if 'S3_KEY' in os.environ: if 'S3_KEY' in os.environ:
return load_from_env() return load_from_env()
return default_s3_options return default_s3_options
\ No newline at end of file
def load_s3_settings():
    """Return the S3 settings (currently only the bucket name).

    The S3_BUCKET environment variable takes precedence. When it is unset
    or empty, the module defaults are used instead; an empty bucket name
    would otherwise produce a path without a bucket component.

    Returns:
        A new dict with a 'bucket' entry. A copy is returned for the
        defaults so callers cannot mutate the shared module-level dict.
    """
    bucket = os.environ.get('S3_BUCKET')
    if bucket:
        return {'bucket': bucket}
    return dict(default_s3_settings)
import fsspec import fsspec
from .s3_config import load_s3_options from .s3_config import load_s3_options, load_s3_settings
from .fsspec_fileobj import open_fileobj from .fsspec_fileobj import open_fileobj
def put(lpath, rpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs,): def put(lpath, rpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs,):
...@@ -10,13 +10,27 @@ def get(rpath, lpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBAC ...@@ -10,13 +10,27 @@ def get(rpath, lpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBAC
s3_fs = fsspec.filesystem('s3', **s3_options) s3_fs = fsspec.filesystem('s3', **s3_options)
s3_fs.get(rpath, lpath, recursive, callback, maxdepth, **kwargs) s3_fs.get(rpath, lpath, recursive, callback, maxdepth, **kwargs)
def get_path(remote_path, local_path, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=None, **kwargs):
    """Download a file or folder from the configured S3 bucket to local storage.

    Args:
        remote_path: S3 key inside the configured bucket. The bucket name
            from load_s3_settings() is prepended automatically.
        local_path: Local path that will be downloaded to.
        recursive: Forwarded to the filesystem's get().
        callback: Forwarded to the filesystem's get().
        maxdepth: Forwarded to the filesystem's get().
        s3_options: Keyword options for fsspec.filesystem('s3', ...).
            When None, load_s3_options() is called on every invocation so
            that environment changes made after import are honoured.
        **kwargs: Forwarded to the filesystem's get().
    """
    # Resolve credentials at call time, not at import time, so they follow
    # the same lifecycle as the bucket setting read below.
    if s3_options is None:
        s3_options = load_s3_options()
    s3_fs = fsspec.filesystem('s3', **s3_options)
    bucket_and_key = load_s3_settings()['bucket'] + "/" + remote_path
    s3_fs.get(
        bucket_and_key,
        local_path,
        recursive=recursive,
        callback=callback,
        maxdepth=maxdepth,
        **kwargs,
    )
def info(path, bucket=None, key=None, refresh=False, version_id=None, s3_options=load_s3_options()) -> dict: def info(path, bucket=None, key=None, refresh=False, version_id=None, s3_options=load_s3_options()) -> dict:
s3_fs = fsspec.filesystem('s3', **s3_options) s3_fs = fsspec.filesystem('s3', **s3_options)
return s3_fs.info(path, bucket, key, refresh, version_id) return s3_fs.info(path, bucket, key, refresh, version_id)
def info_path(remote_path, refresh=False, version_id=None, s3_options=None) -> dict:
    """Get information about a file or folder in the configured S3 bucket.

    Args:
        remote_path: S3 key inside the configured bucket. The bucket name
            from load_s3_settings() is prepended automatically.
        refresh: Forwarded to the filesystem's info().
        version_id: Forwarded to the filesystem's info().
        s3_options: Keyword options for fsspec.filesystem('s3', ...).
            When None, load_s3_options() is called on every invocation so
            that environment changes made after import are honoured.

    Returns:
        The result of the filesystem's info() call for the object.
    """
    # Resolve credentials at call time, not at import time, so they follow
    # the same lifecycle as the bucket setting read below.
    if s3_options is None:
        s3_options = load_s3_options()
    s3_fs = fsspec.filesystem('s3', **s3_options)
    bucket_and_key = load_s3_settings()['bucket'] + "/" + remote_path
    # bucket/key are left at None, exactly as before; only the full path is given.
    return s3_fs.info(bucket_and_key, None, None, refresh=refresh, version_id=version_id)
def open(file: str, mode: str='r', s3_options=load_s3_options()): def open(file: str, mode: str='r', s3_options=load_s3_options()):
return open_fileobj(path=file, s3_options=s3_options, mode=mode) return open_fileobj(path=file, s3_options=s3_options, mode=mode)
def open_path(path: str = None, mode: str = 'r', s3_options=None, *, remote_path: str = None):
    """Open a file from the configured S3 bucket and return a file object.

    Args:
        path: S3 key inside the configured bucket. "s3://<bucket>/" is
            prepended automatically.
        mode: File mode passed to open_fileobj(); use 'rb' for binary files.
        s3_options: S3 options passed to open_fileobj(). When None,
            load_s3_options() is called on every invocation so that
            environment changes made after import are honoured.
        remote_path: Keyword-only alias for ``path``, matching the parameter
            name used by get_path()/info_path() and by the README.

    Returns:
        The object returned by open_fileobj() for the S3 file.

    Raises:
        TypeError: If neither or both of ``path`` and ``remote_path`` are given.
    """
    if (path is None) == (remote_path is None):
        raise TypeError("open_path() requires exactly one of 'path' or 'remote_path'")
    key = path if path is not None else remote_path
    # Resolve credentials at call time, not at import time, so they follow
    # the same lifecycle as the bucket setting read below.
    if s3_options is None:
        s3_options = load_s3_options()
    bucket_and_key = "s3://" + load_s3_settings()['bucket'] + "/" + key
    return open_fileobj(path=bucket_and_key, s3_options=s3_options, mode=mode)
def isfile(path: str, s3_options: dict=load_s3_options()) -> bool: def isfile(path: str, s3_options: dict=load_s3_options()) -> bool:
s3_fs = fsspec.filesystem('s3', **s3_options) s3_fs = fsspec.filesystem('s3', **s3_options)
return s3_fs.isfile(path) return s3_fs.isfile(path)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment