This repository provides the following functionalities:
1. [Read or Download a File From S3 Storage](#1-read-or-download-a-file-from-s3-storage)
2. [Commit For File Processing](#2-commit-for-file-processing)
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Task's State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog)
# 1. Read or Download a File from S3 storage
Two distinct ways of reading from S3 storage are supported:
1) [Download to a local file](#download-from-s3-to-local)
2) [Use open() to get a file object](#open-for-readwrite)
## Configuration
**astropy must be upgraded to 5.3 or later**
**The legacy call style works with both local NAS and S3 in the cloud: any read path beginning with the s3:// protocol is detected automatically.**
To read from or write to S3, the S3 access key, secret, endpoint, and related settings must be provided. Two methods are available:
### Method 1: Environment variables
Set the following three environment variables; all functions described in this document will attempt to read them to obtain their configuration:
```python
s3_options = {
    'key': os.getenv('S3_KEY'),
    'secret': os.getenv('S3_SECRET'),
    'endpoint_url': os.getenv('S3_ENDPOINT_URL')
}
```
### Method 2: Pass s3_options on each call
Pass `s3_options` as a keyword argument on each call. Example `s3_options`:
```python
s3_options = {
    "key": "minioadmin",
    "secret": "minioadmin",
    "endpoint_url": "http://localhost:9000"
}
```
## Download from S3 to local
### Upload
```python
from csst_fs import s3_fs
# single file, s3_options from env
s3_fs.put('requirements.txt', 's3://csst-prod/gaia/test/requirements.txt')
# single file, s3_options passed as a parameter
s3_fs.put('requirements.txt', 's3://csst-prod/gaia/test/requirements.txt', s3_options=s3_options)
# folder, uploaded to s3://csst-prod/common
s3_fs.put('./common', 's3://csst-prod/', recursive=True)
s3_fs.put('./common', 's3://csst-prod/', s3_options=s3_options, recursive=True)
```
### Download
```python
def get_path(remote_path: str, local_path: str):
    """
    Download a file/folder from s3 to local storage.
    Args:
        remote_path: s3 key
        local_path: Local path that will be downloaded to.
    """

def info_path(remote_path: str):
    """
    Get information about a s3 file.
    Args:
        remote_path: s3 key
    """

# Example:
from csst_fs import s3_fs
# single file
s3_fs.get('s3://csst-prod/gaia/test/requirements.txt', 'requirements.txt')
s3_fs.get('s3://csst-prod/gaia/test/requirements.txt', 'requirements.txt', s3_options=s3_options)
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder
s3_fs.get('s3://csst-prod/gaia/data', './', recursive=True)
s3_fs.get('s3://csst-prod/gaia/data', './', s3_options=s3_options, recursive=True)
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info
s3_fs.info('s3://csst-prod/gaia/data')
s3_fs.info('s3://csst-prod/gaia/test/requirements.txt', s3_options=s3_options)
s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
```
## Open for read/write
```python
def open_path(remote_path: str, mode: str = 'r'):
    """
    Get a readonly file object from a file on s3. Use mode = 'rb' for binary files.
    Args:
        remote_path: s3 key
        mode: 'r' by default; use 'rb' for binary files
    Returns:
        File object of the s3 file.
    """

# Example:
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open('s3://csst-prod/gaia/data') as file:
    file.read()
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    file.read()
with s3_fs.open('s3://csst-prod/gaia/test/requirements.txt', s3_options=s3_options, mode='w') as file:
    file.write("CSST")
```
### Check if the given file path exists
```python
from csst_fs import fs
# local or on s3, depending on the given path
fs.isfile('requirements.txt')
fs.isfile('s3://csst-prod/test.txt')
fs.isfile('s3://csst-prod/test.txt', s3_options=s3_options)
```
### Delete a file from local or s3
```python
from csst_fs import fs
# local or on s3, depending on the given path
fs.delete('requirements.txt') # uses os.remove
fs.delete('test', dir_fd=1)
fs.delete('s3://csst-prod/test.txt') # uses fsspec.delete
fs.delete('s3://csst-prod/test.txt', recursive=True, maxdepth=3)
fs.delete('s3://csst-prod/test.txt', s3_options=s3_options)
```
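All of the helpers above accept either a local path or an `s3://` URL and choose the backend from the path prefix, as noted in the configuration section. A minimal sketch of that dispatch rule; `route_path` is a hypothetical name used only for illustration, not part of the package:

```python
def route_path(path: str) -> str:
    """Pick a storage backend from the path prefix: 's3' for s3:// URLs, else 'local'."""
    return 's3' if path.startswith('s3://') else 'local'

# Examples:
print(route_path('requirements.txt'))         # local
print(route_path('s3://csst-prod/test.txt'))  # s3
```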
# 2. Commit For File Processing
## Adapting astropy code to read/write S3 directly
### fits.open
#### Old style
```python
fits.open(path)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#open](https://docs.astropy.org/en/stable/io/fits/api/files.html#open)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.open("s3://csst-prod/gaia/xx.fits")
fsspec_fits.open("s3://csst-prod/gaia/xx.fits", s3_options=s3_options)
```
Submit a file's content and file name to the ingestion API for further processing.
The function returns a successful response as soon as the file content has been stored and queued for further processing; otherwise it raises an appropriate error.
A successful response contains a task_id referring to the queued processing task. It can be passed to [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) to query the task's current state.
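Under the hood (see the implementation later in this document) ingestion is a three-step flow: request an upload location from the API, upload the bytes, then report completion to obtain the task_id. The steps can be sketched with the transport calls stubbed out; every name below is an illustrative placeholder, not part of the package API:

```python
from typing import Callable, Dict

def ingest(file_content: bytes, file_name: str,
           request_upload: Callable[[str], Dict],
           upload: Callable[[str, str, bytes], None],
           report_done: Callable[[str, str, str], Dict]) -> Dict[str, str]:
    # Step 1: ask the backend where to upload
    loc = request_upload(file_name)
    bucket, key = loc["bucket"], loc["key"]
    # Step 2: upload the content to the returned location
    upload(bucket, key, file_content)
    # Step 3: report the finished upload and receive the task id
    result = report_done(bucket, key, file_name)
    return {"task_id": result["task_id"]}

# Stubbed-out example run:
task = ingest(
    b"FITS...",
    "x.fits",
    request_upload=lambda name: {"bucket": "b", "key": f"in/{name}"},
    upload=lambda bucket, key, data: None,
    report_done=lambda bucket, key, name: {"task_id": "5"},
)
print(task)  # {'task_id': '5'}
```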
### fits.getheader
#### Old style
```python
fits.getheader(filename=in_image_path, ext=1)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#getheader](https://docs.astropy.org/en/stable/io/fits/api/files.html#getheader)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.getheader(filename=in_image_path, ext=1)
fsspec_fits.getheader(filename=in_image_path, ext=1, s3_options=s3_options)
```
## Configuration
The helper sends HTTP requests to an external API. The API URL has a working default value and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `start_ingestion_task`
```python
def start_ingestion_task(file_content: bytes, file_name: str) -> dict:
    """
    Submit a file's content and file name to the ingestion API.
    Args:
        file_content (bytes): The file's content
        file_name (str): The file name for storing the file after ingestion.
    Returns:
        dict: A dict containing a task_id, referring to the queued processing task's id.
        E.g.
        {
            "task_id": "5",
        }
    Raises:
        RuntimeError: If the ingestion API or data upload fails after retries.
    """
```
### fits.getdata
#### Old style
```python
fits.getdata(in_ref_flat)
fits.getdata(in_ref_shutter, ext=1)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#getdata](https://docs.astropy.org/en/stable/io/fits/api/files.html#getdata)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.getdata(in_ref_flat)
fsspec_fits.getdata(in_ref_flat, s3_options=s3_options)
fsspec_fits.getdata(in_ref_shutter, ext=1)
fsspec_fits.getdata(in_ref_shutter, s3_options=s3_options, ext=1)
```
# 3. Query a List Of L1/L2 Fits-Files By Metadata Values
Query for file info by metadata values.
### fits.getval
#### Old style
```python
fits.getval(filename, keyword)
fits.getval(filename, keyword, ext=1)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#getval](https://docs.astropy.org/en/stable/io/fits/api/files.html#getval)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.getval(filename, keyword)
fsspec_fits.getval(filename, keyword, s3_options=s3_options)
fsspec_fits.getval(filename, keyword, ext=1)
fsspec_fits.getval(filename, keyword, s3_options=s3_options, ext=1)
```
## Configuration
The helper sends HTTP requests to an external API. The API URL has a working default value and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `query_metadata`
```python
def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.
    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as the 'key' parameter under 'metadata'.
        E.g. with key = ["CABEND", "qc_status"]
        returns:
        [
            {
                "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                "metadata": {
                    "CABEND": "59785.82529",
                    "qc_status": "0.0"
                },
                "removed": false,
                "created": 1756284502817,
                "parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
                "name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                "lastModified": 1756284502817,
                "grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
                "platform": "s3",
                "tags": [
                    "L1"
                ]
            }
        ]
    """
```
### header.tofile
#### Old style
```python
header.tofile(out_head_path)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.tofile](https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.tofile)
#### New style
```python
from csst_fs import fsspec_header
fsspec_header.tofile(header, out_head_path)
fsspec_header.tofile(header, out_head_path, s3_options=s3_options)
```
### header.fromfile
#### Old style
```python
header.fromfile(filename)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.fromfile](https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.fromfile)
#### New style
```python
from csst_fs import fsspec_header
fsspec_header.fromfile(filename)
fsspec_header.fromfile(filename, s3_options=s3_options)
```
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "obs_type": "WIDE",
}
```
2) Numeric equality and ranges
Supported inequality operators are:
- lt/gt: less/greater than
- lte/gte: less/greater than or equal
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "ra": {
        "gte": 250,
        "lte": 260
    },
    "qc_status": 0,
}
```
3) Timestamp equality and ranges
```python
filter = {
    "created_date": "2015-08-04T11:00:00",
    "obs_date": {
        "gt": "2015-06-01T10:00:00",
        "lt": "2015-07-01T10:00:00",
    },
}
```
# 4. Query a L2 Processing Task's State
Query the processing state of a processing task given an L2 task id.
### HDUList.writeto
#### Old style
```python
hdul_img.writeto(out_combined_img, overwrite=True)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.writeto](https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.writeto)
#### New style
```python
from csst_fs import fsspec_HDUList
fsspec_HDUList.writeto(hdul_img, out_combined_img, overwrite=True)
fsspec_HDUList.writeto(hdul_img, out_combined_img, s3_options=s3_options, overwrite=True)
```
## Configuration
The helper sends HTTP requests to an external API. The API URL has a working default value and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
### HDUList.fromfile
#### Old style
```python
hdul_img = fits.HDUList.fromfile("hdulist.fits")
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.fromfile](https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.fromfile)
#### New style
```python
from csst_fs import fsspec_HDUList
hdul_img = fsspec_HDUList.fromfile("hdulist.fits")
hdul_img = fsspec_HDUList.fromfile("hdulist.fits", cache=False, s3_options=s3_options)
```
## Function: `query_task_state`
```python
def query_task_state(
    task_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given an L2 task id.
    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding processing task.
        The following strings are valid state values: tbd
        E.g.
        {
            "state": "submission_pending",
        }
    """
```
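Because processing is asynchronous, a caller will typically poll `query_task_state` until the task leaves the pending state. A sketch with a stubbed backend; state names other than "submission_pending" are assumptions here, since the full list of valid values is still marked "tbd" above:

```python
import time

def wait_for_task(task_id, query, poll_interval=5, timeout=60, sleep=time.sleep):
    """Poll query(task_id) until the state leaves 'submission_pending' or timeout expires."""
    waited = 0
    while waited <= timeout:
        state = query(task_id)["state"]
        if state != "submission_pending":
            return state
        sleep(poll_interval)
        waited += poll_interval
    raise TimeoutError(f"task {task_id} still pending after {timeout}s")

# Example with a stubbed backend that finishes on the third poll
states = iter(["submission_pending", "submission_pending", "done"])
print(wait_for_task("5", lambda tid: {"state": next(states)}, sleep=lambda s: None))  # done
```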
# 5. Query a Star Catalog
Query a star catalog by column values given an ra, dec and radius preselection.
### table.Table.read
#### Old style
```python
from astropy import table
table.Table.read(out_gaia_ldac, hdu=2)
```
usage reference:
[https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.read](https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.read)
## Configuration
The helper sends HTTP requests to an external API. The API URL has a working default value and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
#### New style
```python
from csst_fs import fsspec_table
fsspec_table.read(out_gaia_ldac, hdu=2)
fsspec_table.read(out_gaia_ldac, s3_options=s3_options, hdu=2)
```
## Function: `query_star_catalog`
```python
def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given an ra, dec and radius preselection.
    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            The ra, dec values pinpoint a location; 'radius' defines a radius in [deg] around this point.
            Only star catalog objects within this area are considered for subsequent filtering.
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching star catalog objects, containing key-value pairs for the keys set as the 'key' parameter.
        E.g. with key = ["x", "bulge_flux", "ab"]
        returns:
        [
            {
                "x": 995.27,
                "bulge_flux": "3.2",
                "ab": 1.2,
            },
        ]
    """
```
### table.Table.write
#### Old style
```python
ps.write(ref, format='fits', overwrite=True)
```
usage reference:
[https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.write](https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.write)
#### New style
```python
from csst_fs import fsspec_table
fsspec_table.write(ps, ref, format='fits', overwrite=True)
fsspec_table.write(ps, ref, format='fits', s3_options=s3_options, overwrite=True)
```
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "detector": "06",
}
```
2) Numeric equality and ranges
Supported inequality operators are:
- lt/gt: less/greater than
- lte/gte: less/greater than or equal
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "x": {
        "gte": 996,
        "lte": 1000,
    },
    "ratio_disk": -9999,
}
```
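The mandatory ra/dec/radius keys describe a cone on the sky, with 'radius' in degrees. A sketch of the selection test using the spherical law of cosines; the server performs this preselection itself, so the helper below is purely illustrative:

```python
import math

def within_radius(ra1, dec1, ra2, dec2, radius_deg):
    """True if the angular separation between (ra1, dec1) and (ra2, dec2) is <= radius_deg (all in degrees)."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    # spherical law of cosines, clamped against floating-point drift
    cos_sep = (math.sin(dec1) * math.sin(dec2)
               + math.cos(dec1) * math.cos(dec2) * math.cos(ra1 - ra2))
    sep = math.degrees(math.acos(min(1.0, max(-1.0, cos_sep))))
    return sep <= radius_deg

print(within_radius(40.3, 21.9, 40.3, 21.9, 0.2))  # True (same point)
print(within_radius(40.3, 21.9, 41.3, 21.9, 0.2))  # False (about 0.93 deg away)
```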
from .fits import fsspec_header
from .fits import fsspec_HDUList
from .table import fsspec_table
from . import s3_fs
from . import fs
from .catalog.metadata import query_metadata
from .catalog.star import query_star_catalog
from .ingestion.level2 import start_ingestion_task, query_task_state
import os
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings

def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.
    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as the 'key' parameter under 'metadata'.
        E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
        returns:
        [
            {
                "file_path": "CSST_L0/MSC/SCI/60310/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_03_L0_V01.fits",
                "metadata": {
                    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
                    "instrument": "MSC",
                    "obs_group": "W1",
                    "obs_id": "10200000000"
                },
            },
        ]
    """
    if not filter:
        raise ValueError("Filter cannot be empty")
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    endpoint = f"{api_url}/datasync/metadata/query"
    payload = {
        "filter": filter,
        "key": key,
        "hdu": hdu
    }
    try:
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Metadata query failed: {e}")
    data = response.json()
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected API response: {data}")
    return data["result"]
import os
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings

def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given an ra, dec and radius preselection.
    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching star catalog objects.
    """
    for required_key in ("ra", "dec", "radius"):
        if required_key not in filter or isinstance(filter[required_key], dict):
            raise ValueError(f"Filter must contain scalar '{required_key}'")
    if not key:
        raise ValueError("Key list cannot be empty")
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    endpoint = f"{api_url}/starcatalog/query"
    payload = {
        "catalogName": catalog_name,
        "filter": filter,
        "key": key,
    }
    try:
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Star catalog query failed: {e}")
    data = response.json()
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected API response: {data}")
    return data["result"]
import os
import base64
import requests
import time
from typing import Dict, Any
from csst_fs import s3_fs
from ..s3_config import load_backend_settings

def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
    """
    Submit a file for ingestion by:
    1. Requesting an OSS upload path from the ingestion API.
    2. Uploading the file to the returned OSS path.
    3. Reporting the finished upload back to the ingestion API.
    Args:
        file_content (bytes): The file's content
        file_name (str): The file name for storing the file after ingestion.
    Returns:
        dict: A dict containing a task_id referring to the queued processing task's id.
        Example:
        {
            "task_id": "5",
        }
    Raises:
        RuntimeError: If the ingestion API or OSS upload fails after retries.
    """
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    # Step 1: Request an OSS upload path
    request_upload_endpoint = f"{api_url}/datasync/level2/upload"
    payload = {"fileName": file_name}
    try:
        response = requests.post(request_upload_endpoint, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to request OSS upload path: {e}")
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
    oss_upload_bucket = data["result"].get("bucket")
    oss_upload_key = data["result"].get("key")
    if not oss_upload_bucket or not oss_upload_key:
        raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
    # Step 2: Upload file to OSS path, retrying with exponential backoff
    max_retries = 3
    backoff_factor = 2
    for attempt in range(1, max_retries + 1):
        try:
            with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                f.write(file_content)
            break
        except Exception as e:
            if attempt == max_retries:
                raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
            wait_time = backoff_factor ** (attempt - 1)
            print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
    # Step 3: Report upload completion, get task id
    try:
        report_upload_endpoint = f"{api_url}/datasync/level2/queue"
        report_payload = {"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file_name}
        report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
        report_response.raise_for_status()
        report_data = report_response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to report completed upload: {e}")
    if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
        return {"task_id": report_data["result"]["task_id"]}
    raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
def query_task_state(task_id: str) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given an L2 task id.
    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the format:
        {
            "state": "submission_pending",
        }
    """
    if not task_id:
        raise ValueError("task_id must be provided")
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    endpoint = f"{api_url}/datasync/level2/{task_id}"
    try:
        response = requests.get(endpoint, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to query task state: {e}")
    if not data.get("success"):
        raise RuntimeError(f"Unexpected API response: {data}")
    result = data.get("result")
    if not result:
        return {"state": "not_found"}
    sync_state = result.get("syncState")
    return {"state": sync_state if sync_state else "unknown"}
import os
import json

default_s3_options = {
    'key': 'HZv6qvhH1ZImspVK',
    'secret': 'wCDhQu63fD89rg5A05vcRIb6KfLAxS',
    'endpoint_url': 'http://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com',
}
default_s3_settings = {
    'bucket': 'data-and-computing'
}
default_backend_settings = {
    'backend_url': 'http://10.200.60.199:32000',
}

def load_from_env():
    s3_options = {
        'key': os.getenv('S3_KEY'),
        'secret': os.getenv('S3_SECRET'),
        'endpoint_url': os.getenv('S3_ENDPOINT_URL'),
    }
    return s3_options

def load_settings_from_env():
    s3_settings = {
        'bucket': os.getenv('S3_BUCKET'),
    }
    return s3_settings

def load_backend_settings_from_env():
    backend_settings = {
        'backend_url': os.getenv('CSST_BACKEND_API_URL'),
    }
    return backend_settings

def load_s3_options():
    if 'S3_KEY' in os.environ:
        return load_from_env()
    return default_s3_options

def load_s3_settings():
    if 'S3_BUCKET' in os.environ:
        return load_settings_from_env()
    return default_s3_settings

def load_backend_settings():
    if 'CSST_BACKEND_API_URL' in os.environ:
        return load_backend_settings_from_env()
    return default_backend_settings
import fsspec
from .s3_config import load_s3_options, load_s3_settings
from .fsspec_fileobj import open_fileobj

def put(lpath, rpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs,):
    # body elided in the diff; reconstructed by symmetry with get() below
    s3_fs = fsspec.filesystem('s3', **s3_options)
    s3_fs.put(lpath, rpath, recursive, callback, maxdepth, **kwargs)

def get(rpath, lpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs,):
    s3_fs = fsspec.filesystem('s3', **s3_options)
    s3_fs.get(rpath, lpath, recursive, callback, maxdepth, **kwargs)

def get_path(remote_path, local_path, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs,):
    s3_fs = fsspec.filesystem('s3', **s3_options)
    bucketAndKey = load_s3_settings()['bucket'] + "/" + remote_path
    s3_fs.get(bucketAndKey, local_path, recursive, callback, maxdepth, **kwargs)

def info(path, bucket=None, key=None, refresh=False, version_id=None, s3_options=load_s3_options()) -> dict:
    s3_fs = fsspec.filesystem('s3', **s3_options)
    return s3_fs.info(path, bucket, key, refresh, version_id)

def info_path(remote_path, refresh=False, version_id=None, s3_options=load_s3_options()) -> dict:
    s3_fs = fsspec.filesystem('s3', **s3_options)
    bucketAndKey = load_s3_settings()['bucket'] + "/" + remote_path
    return s3_fs.info(bucketAndKey, None, None, refresh, version_id)

def open(file: str, mode: str='r', s3_options=load_s3_options()):
    return open_fileobj(path=file, s3_options=s3_options, mode=mode)

def open_path(path: str, mode: str='r', s3_options=load_s3_options()):
    bucketAndKey = "s3://" + load_s3_settings()['bucket'] + "/" + path
    return open_fileobj(path=bucketAndKey, s3_options=s3_options, mode=mode)

def isfile(path: str, s3_options: dict=load_s3_options()) -> bool:
    s3_fs = fsspec.filesystem('s3', **s3_options)
    return s3_fs.isfile(path)
setup(
    install_requires=[
        'astropy>=5.3',
        'fsspec>=2024.5.0',
        's3fs>=2024.5.0',
        'requests'
    ],
    python_requires='>=3.9',
    description='csst pipeline handle file in local file system and remote s3 file system',
)