This repository provides the following functionalities:
1. [Read or Download a File From S3 Storage](#1-read-or-download-a-file-from-s3-storage)
2. [Commit For File Processing](#2-commit-for-file-processing)
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog)
# 1. Read or Download a File from S3 storage
Two distinct ways of reading from S3 storage are supported:
1) [Download to a local file](#download-from-s3-to-local)
2) [Use open() to get a file object](#open-for-read)
## Configuration
**astropy must be upgraded to version 5.3.**
**The legacy API supports both the local NAS and cloud S3: any read path starting with the s3:// protocol prefix is automatically recognized as an S3 path.**
When reading from or writing to S3, the S3 key, secret, endpoint, and related settings must be provided. There are two ways to do this.
### Option 1: Environment variables
Set the following three environment variables; every function described in this document will try to read its configuration from them.
```python
s3_options = {
    'key': os.getenv('S3_KEY'),
    'secret': os.getenv('S3_SECRET'),
    'endpoint_url': os.getenv('S3_ENDPOINT_URL')
}
```
### Option 2: Pass s3_options on each call
Specify s3_options as the first keyword argument. Example:
```json
s3_options = {
    "key": "minioadmin",
    "secret": "minioadmin",
    "endpoint_url": "http://localhost:9000"
}
```
## Upload and download between local storage and S3
### Upload
```python
from csst_fs import s3_fs
# single file, s3_options from env
s3_fs.put('requirements.txt', 's3://csst-prod/gaia/test/requirements.txt')
# single file, s3_options from function parameter
s3_fs.put('requirements.txt', 's3://csst-prod/gaia/test/requirements.txt', s3_options=s3_options)
# folder, uploaded to s3://csst-prod/common
s3_fs.put('./common', 's3://csst-prod/', recursive=True)
s3_fs.put('./common', 's3://csst-prod/', s3_options=s3_options, recursive=True)
```
### Download
```python
from csst_fs import s3_fs
# single file
s3_fs.get('s3://csst-prod/gaia/test/requirements.txt', 'requirements.txt')
s3_fs.get('s3://csst-prod/gaia/test/requirements.txt', 'requirements.txt', s3_options=s3_options)
# folder
s3_fs.get('s3://csst-prod/gaia/data', './', recursive=True)
s3_fs.get('s3://csst-prod/gaia/data', './', s3_options=s3_options, recursive=True)
# get file or folder info
s3_fs.info('s3://csst-prod/gaia/data')
s3_fs.info('s3://csst-prod/gaia/test/requirements.txt', s3_options=s3_options)
```
## Download from s3 to local
```python
def get_path(remote_path: str, local_path: str):
    """
    Download a file/folder from s3 to local storage.

    Args:
        remote_path: s3 key
        local_path: Local path that will be downloaded to.
    """

def info_path(remote_path: str):
    """
    Get information about a s3 file.

    Args:
        remote_path: s3 key
    """

# Example:
from csst_fs import s3_fs
# single file
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info
s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
```
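The `*_path` helpers take a bare key and resolve it against a bucket taken from the configuration (the `S3_BUCKET` environment variable, with a repository-wide default as fallback). A minimal sketch of that resolution logic; `resolve_bucket_and_key` is a hypothetical name used here for illustration, not part of the package API:

```python
# Sketch of how the *_path helpers prefix a bare s3 key with the configured
# bucket. DEFAULT_BUCKET mirrors the repo's default setting; your deployment
# may use a different bucket.
import os

DEFAULT_BUCKET = 'data-and-computing'  # fallback used when S3_BUCKET is unset

def resolve_bucket_and_key(remote_path: str) -> str:
    """Prefix a bare s3 key with the configured bucket, as get_path does."""
    bucket = os.getenv('S3_BUCKET', DEFAULT_BUCKET)
    return bucket + "/" + remote_path

print(resolve_bucket_and_key('projects/csst-pipeline/some_file.fits'))
```

Setting `S3_BUCKET` therefore changes where every `*_path` call resolves to, without touching call sites.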
## Open for read
```python
def open_path(remote_path: str, mode: str = 'r'):
    """
    Get a readonly file object for a file on s3. Use mode='rb' for binary files.

    Args:
        remote_path: s3 key
        mode: 'r' by default; use 'rb' for binary files.
    Returns:
        File object of the s3 file.
    """

# Example:
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    file.read()
```
### Open for read/write (legacy)
```python
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open('s3://csst-prod/gaia/data') as file:
    file.read()
with s3_fs.open('s3://csst-prod/gaia/test/requirements.txt', s3_options=s3_options, mode='w') as file:
    file.write("CSST")
```
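As noted in the configuration section, the legacy helpers accept both local and S3 paths and decide by the `s3://` prefix. A minimal sketch of that dispatch, with hypothetical names (`is_s3_path`, `open_any`) and only the local branch exercised; the real S3 branch goes through fsspec:

```python
# Sketch of the path dispatch described above: paths beginning with "s3://"
# are routed to the S3 filesystem, anything else to the local one.
# (Assumed behavior based on the README; names here are illustrative.)
import os
import tempfile

def is_s3_path(path: str) -> bool:
    return path.startswith("s3://")

def open_any(path: str, mode: str = 'r'):
    if is_s3_path(path):
        raise NotImplementedError("would open via fsspec's s3 filesystem")
    return open(path, mode)  # local fallback

# local round-trip
with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
    tmp.write("CSST")
    name = tmp.name
with open_any(name) as f:
    print(f.read())  # -> CSST
os.remove(name)
```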
### Check if the given file path exists
```python
from csst_fs import fs
# local or on s3, depending on the given path
fs.isfile('requirements.txt')
fs.isfile('s3://csst-prod/test.txt')
fs.isfile('s3://csst-prod/test.txt', s3_options=s3_options)
```
### Delete a file from local or s3
```python
from csst_fs import fs
# local or on s3, depending on the given path
fs.delete('requirements.txt') # uses os.remove
fs.delete('test', dir_fd=1)
fs.delete('s3://csst-prod/test.txt') # uses fsspec.delete
fs.delete('s3://csst-prod/test.txt', recursive=True, maxdepth=3)
fs.delete('s3://csst-prod/test.txt', s3_options=s3_options)
```
## Adapting astropy code that reads/writes S3 directly
### fits.open
#### Old style
```python
fits.open(path)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#open](https://docs.astropy.org/en/stable/io/fits/api/files.html#open)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.open("s3://csst-prod/gaia/xx.fits")
fsspec_fits.open("s3://csst-prod/gaia/xx.fits", s3_options=s3_options)
```
### fits.getheader
#### Old style
```python
fits.getheader(filename=in_image_path, ext=1)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#getheader](https://docs.astropy.org/en/stable/io/fits/api/files.html#getheader)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.getheader(filename=in_image_path, ext=1)
fsspec_fits.getheader(filename=in_image_path, ext=1, s3_options=s3_options)
```
### fits.getdata
#### Old style
```python
fits.getdata(in_ref_flat)
fits.getdata(in_ref_shutter, ext=1)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#getdata](https://docs.astropy.org/en/stable/io/fits/api/files.html#getdata)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.getdata(in_ref_flat)
fsspec_fits.getdata(in_ref_flat, s3_options=s3_options)
fsspec_fits.getdata(in_ref_shutter, ext=1)
fsspec_fits.getdata(in_ref_shutter, s3_options=s3_options, ext=1)
```
### fits.getval
#### Old style
```python
fits.getval(filename, keyword)
fits.getval(filename, keyword, ext=1)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/files.html#getval](https://docs.astropy.org/en/stable/io/fits/api/files.html#getval)
#### New style
```python
from csst_fs import fsspec_fits
fsspec_fits.getval(filename, keyword)
fsspec_fits.getval(filename, keyword, s3_options=s3_options)
fsspec_fits.getval(filename, keyword, ext=1)
fsspec_fits.getval(filename, keyword, s3_options=s3_options, ext=1)
```
### header.tofile
#### Old style
```python
header.tofile(out_head_path)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.tofile](https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.tofile)
#### New style
```python
from csst_fs import fsspec_header
fsspec_header.tofile(header, out_head_path)
fsspec_header.tofile(header, out_head_path, s3_options=s3_options)
```
### header.fromfile
#### Old style
```python
header.fromfile(filename)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.fromfile](https://docs.astropy.org/en/stable/io/fits/api/headers.html#astropy.io.fits.Header.fromfile)
#### New style
```python
from csst_fs import fsspec_header
fsspec_header.fromfile(filename)
fsspec_header.fromfile(filename, s3_options=s3_options)
```
### HDUList.writeto
#### Old style
```python
hdul_img.writeto(out_combined_img, overwrite=True)
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.writeto](https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.writeto)
#### New style
```python
from csst_fs import fsspec_HDUList
fsspec_HDUList.writeto(hdul_img, out_combined_img, overwrite=True)
fsspec_HDUList.writeto(hdul_img, out_combined_img, s3_options=s3_options, overwrite=True)
```
### HDUList.fromfile
#### Old style
```python
hdul_img = fits.HDUList.fromfile("hdulist.fits")
```
usage reference:
[https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.fromfile](https://docs.astropy.org/en/stable/io/fits/api/hdulists.html#astropy.io.fits.HDUList.fromfile)
#### New style
```python
from csst_fs import fsspec_HDUList
hdul_img = fsspec_HDUList.fromfile("hdulist.fits")
hdul_img = fsspec_HDUList.fromfile("hdulist.fits", cache=False, s3_options=s3_options)
```
### table.Table.read
#### Old style
```python
from astropy import table
table.Table.read(out_gaia_ldac, hdu=2)
```
usage reference:
[https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.read](https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.read)
#### New style
```python
from csst_fs import fsspec_table
fsspec_table.read(out_gaia_ldac, hdu=2)
fsspec_table.read(out_gaia_ldac, s3_options=s3_options, hdu=2)
```
### table.Table.write
#### Old style
```python
ps.write(ref, format='fits', overwrite=True)
```
usage reference:
[https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.write](https://docs.astropy.org/en/stable/api/astropy.table.Table.html#astropy.table.Table.write)
#### New style
```python
from csst_fs import fsspec_table
fsspec_table.write(ps, ref, format='fits', overwrite=True)
fsspec_table.write(ps, ref, format='fits', s3_options=s3_options, overwrite=True)
```
# 2. Commit For File Processing
Submit a file's content and file name to the ingestion API for further processing.
The function returns a successful response as soon as the file content is stored and queued for further processing; otherwise it raises an appropriate error.
A successful response contains a task_id referring to the queued processing task. It can be passed to [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) to query the processing task's current state.
## Configuration
The helper sends HTTP requests to an external API. The API URL is preset to a working default and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `start_ingestion_task`
```python
def start_ingestion_task(file_content: bytes, file_name: str) -> dict:
    """
    Submit a file's content and file name to the ingestion API.

    Args:
        file_content (bytes): The file's content
        file_name (str): The file name for storing the file after ingestion.
    Returns:
        dict: A dict containing a task_id, referring to the queued processing task's id.
        E.g.
        {
            "task_id": "5",
        }
    Raises:
        RuntimeError: If the ingestion API or data upload fails after retries.
    """
```
# 3. Query a List Of L1/L2 Fits-Files By Metadata Values
Query for file info by metadata values.
## Configuration
The helper sends HTTP requests to an external API. The API URL is preset to a working default and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `query_metadata`
```python
def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
        E.g. with key = ["CABEND", "qc_status"]
        then returns:
        [
            {
                "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                "metadata": {
                    "CABEND": "59785.82529",
                    "qc_status": "0.0"
                },
                "removed": false,
                "created": 1756284502817,
                "parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
                "name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                "lastModified": 1756284502817,
                "grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
                "platform": "s3",
                "tags": [
                    "L1"
                ]
            }
        ]
    """
```
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "obs_type": "WIDE",
}
```
2) Numeric equality and ranges
Supported inequality operators are:
lt/gt: less/greater than
lte/gte: less/greater than or equal
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "ra": {
        "gte": 250,
        "lte": 260
    },
    "qc_status": 0,
}
```
3) Timestamp equality and ranges
```python
filter = {
    "created_date": "2015-08-04T11:00:00",
    "obs_date": {
        "gt": "2015-06-01T10:00:00",
        "lt": "2015-07-01T10:00:00",
    },
}
```
# 4. Query a L2 Processing Tasks State
Query the processing state of a processing task given a L2 task id.
## Configuration
The helper sends HTTP requests to an external API. The API URL is preset to a working default and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `query_task_state`
```python
def query_task_state(
    task_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given a L2 task id.

    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding processing task.
        The following strings are valid state values: tbd
        E.g.
        {
            "state": "submission_pending",
        }
    """
```
# 5. Query a Star Catalog
Query a star catalog by column values given a ra, dec and radius preselection.
## Configuration
The helper sends HTTP requests to an external API. The API URL is preset to a working default and can be overridden via an environment variable if needed, e.g.
CSST_BACKEND_API_URL=http://10.200.60.199:9010
## Function: `query_star_catalog`
```python
def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Ra, dec values pinpoint a location; 'radius' defines a radius in [deg] around this point.
            Only star catalog objects within this area are considered for subsequent filtering.
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching star catalog objects, containing key-value pairs for the keys set as 'key' parameter.
        E.g. with key = ["x", "bulge_flux", "ab"]
        then returns:
        [
            {
                "x": 995.27,
                "bulge_flux": "3.2",
                "ab": 1.2,
            },
        ]
    """
```
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "detector": "06",
}
```
2) Numeric equality and ranges
Supported inequality operators are:
lt/gt: less/greater than
lte/gte: less/greater than or equal
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "x": {
        "gte": 996,
        "lte": 1000,
    },
    "ratio_disk": -9999,
}
```
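The required-scalar rule above (ra, dec, and radius must be present as plain numbers, never range dicts) can be checked client-side before issuing a request. A small sketch mirroring the validation in this repository's `query_star_catalog`; the helper name `validate_star_filter` is illustrative:

```python
# Client-side check of the star-catalog filter rules described above:
# ra/dec/radius must be present as scalars; range dicts are rejected for them.
from typing import Any, Dict

def validate_star_filter(filter: Dict[str, Any]) -> None:
    for required_key in ("ra", "dec", "radius"):
        if required_key not in filter or isinstance(filter[required_key], dict):
            raise ValueError(f"Filter must contain scalar '{required_key}'")

ok = {"ra": 40.3, "dec": 21.9, "radius": 0.2, "x": {"gte": 996, "lte": 1000}}
validate_star_filter(ok)  # passes; ranges on other columns are fine

bad = {"ra": {"gte": 40, "lte": 41}, "dec": 21.9, "radius": 0.2}
try:
    validate_star_filter(bad)
except ValueError as e:
    print(e)  # -> Filter must contain scalar 'ra'
```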
from .fits import fsspec_header
from .fits import fsspec_HDUList
from .table import fsspec_table
from . import s3_fs
from . import fs
from .catalog.metadata import query_metadata
from .catalog.star import query_star_catalog
from .ingestion.level2 import start_ingestion_task, query_task_state
import os
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings

def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
        E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
        then returns:
        [
            {
                "file_path": "CSST_L0/MSC/SCI/60310/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_03_L0_V01.fits",
                "metadata": {
                    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
                    "instrument": "MSC",
                    "obs_group": "W1",
                    "obs_id": "10200000000"
                },
            },
        ]
    """
    if not filter:
        raise ValueError("Filter cannot be empty")

    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")

    endpoint = f"{api_url}/datasync/metadata/query"
    payload = {
        "filter": filter,
        "key": key,
        "hdu": hdu
    }
    try:
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Metadata query failed: {e}")

    data = response.json()
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected API response: {data}")
    return data["result"]
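The success/result envelope handling at the end of `query_metadata` can be exercised without a live backend. A small sketch of just that step; `unwrap` is a hypothetical name and the sample payloads are illustrative, not real API output:

```python
# Sketch of the API-envelope handling used by query_metadata: a response is
# accepted only if it has success == True and a "result" key; anything else
# raises RuntimeError.
from typing import Any, Dict, List

def unwrap(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected API response: {data}")
    return data["result"]

print(unwrap({"success": True, "result": [{"file_path": "a.fits"}]}))
try:
    unwrap({"success": False, "error": "boom"})
except RuntimeError as e:
    print("rejected:", e)
```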
import os
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings

def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching star catalog objects.
    """
    for required_key in ("ra", "dec", "radius"):
        if required_key not in filter or isinstance(filter[required_key], dict):
            raise ValueError(f"Filter must contain scalar '{required_key}'")
    if not key:
        raise ValueError("Key list cannot be empty")

    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")

    endpoint = f"{api_url}/starcatalog/query"
    payload = {
        "catalogName": catalog_name,
        "filter": filter,
        "key": key,
    }
    try:
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Star catalog query failed: {e}")

    data = response.json()
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected API response: {data}")
    return data["result"]
import requests
import time
from typing import Dict, Any
from csst_fs import s3_fs
from ..s3_config import load_backend_settings

def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
    """
    Submit a file for ingestion by:
    1. Requesting an OSS upload path from the ingestion API.
    2. Uploading the file to the returned OSS path.
    3. Reporting the finished upload back to the ingestion API.

    Args:
        file_content (bytes): The file's content
        file_name (str): The file name for storing the file after ingestion.
    Returns:
        dict: A dict containing a task_id referring to the queued processing task's id.
        Example:
        {
            "task_id": "5",
        }
    Raises:
        RuntimeError: If the ingestion API or OSS upload fails after retries.
    """
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")

    # Step 1: Request an OSS upload path
    request_upload_endpoint = f"{api_url}/datasync/level2/upload"
    payload = {"fileName": file_name}
    try:
        response = requests.post(request_upload_endpoint, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to request OSS upload path: {e}")
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
    oss_upload_bucket = data["result"].get("bucket")
    oss_upload_key = data["result"].get("key")
    if not oss_upload_bucket or not oss_upload_key:
        raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")

    # Step 2: Upload file to OSS path, retrying with exponential backoff
    max_retries = 3
    backoff_factor = 2
    for attempt in range(1, max_retries + 1):
        try:
            with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                f.write(file_content)
            break  # upload succeeded, stop retrying
        except Exception as e:
            if attempt == max_retries:
                raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
            wait_time = backoff_factor ** (attempt - 1)
            print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)

    # Step 3: Report upload completion, get task id
    try:
        report_upload_endpoint = f"{api_url}/datasync/level2/queue"
        report_payload = {"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file_name}
        report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
        report_response.raise_for_status()
        report_data = report_response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to report completed upload: {e}")
    if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
        return {"task_id": report_data["result"]["task_id"]}
    raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")

def query_task_state(task_id: str) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given a L2 task id.

    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the format:
        {
            "state": "submission_pending",
        }
    """
    if not task_id:
        raise ValueError("task_id must be provided")
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")

    endpoint = f"{api_url}/datasync/level2/{task_id}"
    try:
        response = requests.get(endpoint, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to query task state: {e}")
    if not data.get("success"):
        raise RuntimeError(f"Unexpected API response: {data}")
    result = data.get("result")
    if not result:
        return {"state": "not_found"}
    sync_state = result.get("syncState")
    return {"state": sync_state if sync_state else "unknown"}
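The upload retry loop in Step 2 waits `backoff_factor ** (attempt - 1)` seconds after each failed attempt except the last, which raises instead. A quick sketch of the resulting schedule:

```python
# Backoff schedule used by the upload retry loop above:
# attempt 1 fails -> wait 1s, attempt 2 fails -> wait 2s,
# attempt 3 fails -> RuntimeError (no wait after the final attempt).
max_retries = 3
backoff_factor = 2

waits = [backoff_factor ** (attempt - 1) for attempt in range(1, max_retries)]
print(waits)  # -> [1, 2]
```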
import os

default_s3_options = {
    'key': 'HZv6qvhH1ZImspVK',
    'secret': 'wCDhQu63fD89rg5A05vcRIb6KfLAxS',
    'endpoint_url': 'http://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com',
}
default_s3_settings = {
    'bucket': 'data-and-computing'
}
default_backend_settings = {
    'backend_url': 'http://10.200.60.199:32000',
}

def load_from_env():
    s3_options = {
        'key': os.getenv('S3_KEY'),
        'secret': os.getenv('S3_SECRET'),
        'endpoint_url': os.getenv('S3_ENDPOINT_URL'),
    }
    return s3_options

def load_settings_from_env():
    s3_settings = {
        'bucket': os.getenv('S3_BUCKET'),
    }
    return s3_settings

def load_backend_settings_from_env():
    backend_settings = {
        'backend_url': os.getenv('CSST_BACKEND_API_URL'),
    }
    return backend_settings

def load_s3_options():
    if 'S3_KEY' in os.environ:
        return load_from_env()
    return default_s3_options

def load_s3_settings():
    if 'S3_BUCKET' in os.environ:
        return load_settings_from_env()
    return default_s3_settings

def load_backend_settings():
    if 'CSST_BACKEND_API_URL' in os.environ:
        return load_backend_settings_from_env()
    return default_backend_settings
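All three loaders above follow the same pattern: use the environment value when its marker variable is set, otherwise fall back to the module-level defaults. A self-contained sketch of that fallback, demonstrated with the backend URL:

```python
# Env-or-default fallback pattern used by load_s3_options / load_s3_settings /
# load_backend_settings, shown here for the backend URL only.
import os

default_backend_settings = {'backend_url': 'http://10.200.60.199:32000'}

def load_backend_settings():
    if 'CSST_BACKEND_API_URL' in os.environ:
        return {'backend_url': os.environ['CSST_BACKEND_API_URL']}
    return default_backend_settings

os.environ.pop('CSST_BACKEND_API_URL', None)
print(load_backend_settings()['backend_url'])  # -> http://10.200.60.199:32000

os.environ['CSST_BACKEND_API_URL'] = 'http://10.200.60.199:9010'
print(load_backend_settings()['backend_url'])  # -> http://10.200.60.199:9010
```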
import fsspec
from .s3_config import load_s3_options, load_s3_settings
from .fsspec_fileobj import open_fileobj

def put(lpath, rpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs):
    ...

def get(rpath, lpath, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs):
    s3_fs = fsspec.filesystem('s3', **s3_options)
    s3_fs.get(rpath, lpath, recursive, callback, maxdepth, **kwargs)

def get_path(remote_path, local_path, recursive=False, callback=fsspec.callbacks.DEFAULT_CALLBACK, maxdepth=None, s3_options=load_s3_options(), **kwargs):
    s3_fs = fsspec.filesystem('s3', **s3_options)
    bucketAndKey = load_s3_settings()['bucket'] + "/" + remote_path
    s3_fs.get(bucketAndKey, local_path, recursive, callback, maxdepth, **kwargs)

def info(path, bucket=None, key=None, refresh=False, version_id=None, s3_options=load_s3_options()) -> dict:
    s3_fs = fsspec.filesystem('s3', **s3_options)
    return s3_fs.info(path, bucket, key, refresh, version_id)

def info_path(remote_path, refresh=False, version_id=None, s3_options=load_s3_options()) -> dict:
    s3_fs = fsspec.filesystem('s3', **s3_options)
    bucketAndKey = load_s3_settings()['bucket'] + "/" + remote_path
    return s3_fs.info(bucketAndKey, None, None, refresh, version_id)

def open(file: str, mode: str='r', s3_options=load_s3_options()):
    return open_fileobj(path=file, s3_options=s3_options, mode=mode)

def open_path(path: str, mode: str='r', s3_options=load_s3_options()):
    bucketAndKey = "s3://" + load_s3_settings()['bucket'] + "/" + path
    return open_fileobj(path=bucketAndKey, s3_options=s3_options, mode=mode)

def isfile(path: str, s3_options: dict=load_s3_options()) -> bool:
    s3_fs = fsspec.filesystem('s3', **s3_options)
    return s3_fs.isfile(path)
setup(
    install_requires=[
        'astropy>=5.3',
        'fsspec>=2024.5.0',
        's3fs>=2024.5.0',
        'requests'
    ],
    python_requires='>=3.9',
    description='csst pipeline: handles files in the local file system and a remote s3 file system',
)