......@@ -75,21 +75,31 @@ A successful response contains a task_id referring to the queued processing task
## Function: `start_ingestion_task`
```python
def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
    """
    Submit a list of file contents and file names for ingestion.

    Args:
        files (List[dict]): A list of dicts, each with:
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's content.

    Returns:
        dict: A dict containing a task_id referring to the queued processing task,
            as well as a field 'failed' listing the file names for which ingestion failed.
            Example:
            {
                "task_id": "5",
                "failed": ["<file_name>", ...]
            }

    Raises:
        RuntimeError: If committing the batch fails after retries.
    """
```
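For example, a single-file batch might be submitted as follows (the file name and content are illustrative):

```python
# Illustrative example: submit one file for ingestion.
with open("example.fits", "rb") as f:
    content = f.read()

result = start_ingestion_task([
    {"file_name": "example.fits", "file_content": content},
])
print("Queued task:", result["task_id"])
if result["failed"]:
    print("Ingestion failed for:", result["failed"])
```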
......@@ -161,14 +171,11 @@ filter = {
}
```
3) List of values
The queried data must match one of the values in the list; values may be strings or numbers.
```python
filter = {
"created_date": "2015-08-04T11:00:00",
"obs_date": {
"gt": "2015-06-01T10:00:00",
"lt": "2015-07-01T10:00:00",
},
"NAXIS": [0, 1]
}
```
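As a usage sketch, this list filter can be passed to `query_metadata` together with the pagination parameters (the import path below is assumed for illustration):

```python
from csst_fs.query import query_metadata  # hypothetical import path

# Match files whose primary HDU has NAXIS equal to 0 or 1,
# returning the "dataset" and "instrument" keys for each match.
results = query_metadata(
    filter={"NAXIS": [0, 1]},
    key=["dataset", "instrument"],
    hdu=0,
    offset=0,   # first page
    limit=20,   # 20 documents per page
)
for doc in results:
    print(doc["file_path"], doc["metadata"])
```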
......
import os
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings
......@@ -6,15 +5,19 @@ from ..s3_config import load_backend_settings
def query_metadata(
filter: Dict[str, Any],
key: List[str],
hdu: int = 0,
offset: int = 0,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""
Query for file info by metadata values. Results are paginated using offset & limit and ordered ascending by database insert timestamp.
Args:
filter: The filter dict described below.
key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The HDU the filter & key arguments refer to (e.g. 0 or 1). Defaults to 0.
offset: Optional offset for paginated results (default 0).
limit: Optional limit for paginated results (default 10).
Returns:
        A List[Dict] of matching documents, each containing a file_path value and, under 'metadata', the keys requested via the 'key' parameter.
E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
......@@ -44,7 +47,9 @@ def query_metadata(
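    # Forward the filter, requested keys, HDU index and the pagination window to the backend.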
payload = {
"filter": filter,
"key": key,
"hdu": hdu
"hdu": hdu,
"offset": offset,
"limit": limit,
}
try:
......
import os
import base64
import requests
import time
from typing import Dict, List, Any, TypedDict
from csst_fs import s3_fs
from ..s3_config import load_backend_settings
class IngestionFile(TypedDict):
file_name: str
file_content: bytes
def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
"""
    Submit a list of files for ingestion by:
    1. Requesting an OSS upload path for each file.
    2. Uploading each file to its returned OSS path.
    3. Reporting the finished uploads back to the ingestion API.

    Args:
        files (List[IngestionFile]): A list of dicts, each with:
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's content.

    Returns:
        dict: A dict containing a task_id referring to the queued processing task,
            as well as a field 'failed' listing the file names for which ingestion failed.
            Example:
            {
                "task_id": "5",
                "failed": ["<file_name>", ...]
            }

    Raises:
        RuntimeError: If committing the batch fails after retries.
"""
api_url = load_backend_settings()['backend_url']
if not api_url:
raise RuntimeError("CSST backend api url is not set")
request_upload_endpoint = f"{api_url}/datasync/level2/upload"
payload = {"fileName": file_name}
try:
response = requests.post(request_upload_endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to request OSS upload path: {e}")
    succeeded_files: List[Dict[str, Any]] = []
    failed_files: List[Dict[str, Any]] = []
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
oss_upload_bucket = data["result"].get("bucket")
oss_upload_key = data["result"].get("key")
if not oss_upload_bucket or not oss_upload_key:
raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
# Step 2: Upload file to OSS path
max_retries = 3
backoff_factor = 2
for attempt in range(1, max_retries + 1):
for file in files:
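        # Track each file's upload location so both successes and failures can be reported.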
file_info = {"file_name": file["file_name"], "bucket": None, "key": None}
try:
with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
f.write(file_content)
except e:
if attempt == max_retries:
raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
else:
wait_time = backoff_factor ** (attempt - 1)
print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
# Step 1: Request an OSS upload path
payload = {"fileName": file["file_name"]}
try:
response = requests.post(request_upload_endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to request OSS upload path: {e}")
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
oss_upload_bucket = data["result"].get("bucket")
oss_upload_key = data["result"].get("key")
if not oss_upload_bucket or not oss_upload_key:
raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
file_info["bucket"] = oss_upload_bucket
file_info["key"] = oss_upload_key
# Step 2: Upload file to OSS path
max_retries = 3
backoff_factor = 2
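            # Retry the upload with exponential backoff: failed attempts wait 1s, then 2s.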
for attempt in range(1, max_retries + 1):
try:
with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
f.write(file["file_content"])
succeeded_files.append(file_info)
break
                except Exception as e:
if attempt == max_retries:
raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
else:
wait_time = backoff_factor ** (attempt - 1)
print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
        except RuntimeError:
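            # Record the failure and continue with the remaining files.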
failed_files.append(file_info)
# Step 3: Report upload completion, get task id
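    # Both succeeded and failed uploads are included in the batch report.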
try:
report_upload_endpoint = f"{api_url}/datasync/level2/queue"
report_payload = {"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file_name}
report_upload_endpoint = f"{api_url}/datasync/level2/queue/batch"
report_payload = [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in succeeded_files]
report_payload += [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in failed_files]
report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
report_response.raise_for_status()
report_data = report_response.json()
......@@ -78,7 +103,7 @@ def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
raise RuntimeError(f"Failed to report completed upload: {e}")
if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
return {"task_id": report_data["result"]["task_id"]}
return {"task_id": report_data["result"]["task_id"], "failed": [file["file_name"] for file in failed_files]}
else:
raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
......
......@@ -11,7 +11,7 @@ default_s3_settings = {
}
default_backend_settings = {
'backend_url': 'https://astro-workbench-bts.lab.zverse.space:32443/api/csst',
}
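# These defaults may be overridden via environment variables (see load_from_env below).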
def load_from_env():
......