@@ -72,28 +72,34 @@ Submit a file's content and file name to the ingestion API for further processing
The function returns a successful response as soon as the file content is successfully stored and queued for further processing; otherwise, errors are handled appropriately.
A successful response contains a task_id referring to the queued processing task. It can be used in [4. Query an L2 Processing Task's State](#4-query-an-l2-processing-tasks-state) to query the processing task's current state.
## Configuration
The helper sends HTTP requests to an external API. The external API URL is set to a working default value and can be overridden if needed via an environment variable, e.g.
`CSST_BACKEND_API_URL=http://10.200.60.199:9010`
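
For illustration, a minimal sketch of how such an override is typically resolved (the actual loading logic lives in the package's `s3_config` helper; the constant name here is an assumption):

```python
import os

# Environment variable wins over the built-in default (illustrative constant name).
DEFAULT_BACKEND_URL = "http://10.200.60.199:9010"
backend_url = os.environ.get("CSST_BACKEND_API_URL", DEFAULT_BACKEND_URL)
```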
## Function: `start_ingestion_task`
```python
def start_ingestion_task(file_content: str, file_name: str) -> dict:
def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
"""
Submit a file's content and file name to the ingestion API.
Submit a list of file contents and file names for ingestion.
Args:
file_content (str): The file's content as string representation
file_name (str): The file name for storing the file after ingestion.
files (List[dict]): A list of dicts of the form:
    [
        {
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's content.
        },
        {
            ...
        }
    ]
Returns:
dict: A dict containing a task_id, referring to the queued processing task's id.
E.g.
dict: A dict containing a task_id referring to the queued processing task, as well as a field `failed` listing the file names for which ingestion failed.
Example:
{
"task_id": "5",
"failed": List[str] List of file names for which ingestion failed.
}
Raises:
RuntimeError: If the ingestion API or data upload fails after retries.
RuntimeError: If committing the uploads fails after retries.
"""
```
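
A minimal usage sketch of the batch variant; the file names and contents are illustrative:

```python
# Illustrative call: ingest two files in one batch.
result = start_ingestion_task([
    {"file_name": "obs_001.fits", "file_content": b"<raw FITS bytes>"},
    {"file_name": "obs_002.fits", "file_content": b"<raw FITS bytes>"},
])
print(result["task_id"])  # id of the queued processing task
print(result["failed"])   # file names whose ingestion failed; empty on full success
```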
@@ -101,10 +107,6 @@ def start_ingestion_task(file_content: str, file_name: str) -> dict:
# 3. Query a List of L1/L2 FITS Files by Metadata Values
Query for file info by metadata values.
## Configuration
The helper sends HTTP requests to an external API. The external API URL is set to a working default value and can be overridden if needed via an environment variable, e.g.
`CSST_BACKEND_API_URL=http://10.200.60.199:9010`
## Function: `query_metadata`
```python
def query_metadata(
@@ -169,24 +171,17 @@ filter = {
}
```
3) Timestamp equality and ranges
3) List of values
The queried value must match one of the values in the list. String or numeric values are possible.
```python
filter = {
"created_date": "2015-08-04T11:00:00",
"obs_date": {
"gt": "2015-06-01T10:00:00",
"lt": "2015-07-01T10:00:00",
},
"NAXIS": [0, 1]
}
```
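
Combining the filter styles above, a minimal usage sketch with `query_metadata` from this section; the keys and values are illustrative:

```python
# Illustrative filter combining exact-match and list-of-values criteria.
filter = {
    "instrument": "MSC",  # exact string match (illustrative value)
    "NAXIS": [0, 1],      # matches if NAXIS is 0 or 1
}
for doc in query_metadata(filter=filter, key=["dataset", "obs_id"], hdu=0):
    print(doc["file_path"], doc["metadata"])
```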
# 4. Query an L2 Processing Task's State
Query the current state of a processing task, given an L2 task id.
## Configuration
The helper sends HTTP requests to an external API. The external API URL is set to a working default value and can be overridden if needed via an environment variable, e.g.
`CSST_BACKEND_API_URL=http://10.200.60.199:9010`
## Function: `query_task_state`
```python
def query_task_state(
@@ -210,10 +205,6 @@
# 5. Query a Star Catalog
Query a star catalog by column values, given a preselection by ra, dec, and radius.
## Configuration
The helper sends HTTP requests to an external API. The external API URL is set to a working default value and can be overridden if needed via an environment variable, e.g.
`CSST_BACKEND_API_URL=http://10.200.60.199:9010`
## Function: `query_star_catalog`
```python
def query_star_catalog(
......
import os
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings
@@ -6,15 +5,19 @@ from ..s3_config import load_backend_settings
def query_metadata(
filter: Dict[str, Any],
key: List[str],
hdu: int = 0
hdu: int = 0,
offset: int = 0,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""
Query for file info by metadata values.
Query for file info by metadata values. Results are paginated using offset and limit, and ordered ascending by database insert timestamp.
Args:
filter: The filter dict described below.
key: A list of string values, corresponding to metadata keys that should be included in the output.
hdu: The HDU that the filter and key arguments refer to, e.g. 0 or 1. Default is 0.
offset: Optional offset for paginated results (default 0).
limit: Optional limit for paginated results (default 10).
Returns:
A List[Dict] of matching documents, each containing a file_path value and, under 'metadata', the keys requested via the 'key' parameter.
E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
@@ -44,7 +47,9 @@ def query_metadata(
payload = {
"filter": filter,
"key": key,
"hdu": hdu
"hdu": hdu,
"offset": offset,
"limit": limit,
}
try:
......
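
Since results are ordered by insert timestamp and paginated, a caller can walk all matches page by page. A minimal sketch, assuming `query_metadata` as defined above; the filter and keys are illustrative:

```python
# Page through all matches, 10 at a time.
offset, limit = 0, 10
while True:
    page = query_metadata(filter={"NAXIS": [0, 1]}, key=["obs_id"], hdu=0,
                          offset=offset, limit=limit)
    for doc in page:
        print(doc["file_path"], doc["metadata"]["obs_id"])
    if len(page) < limit:  # a short page signals the last one
        break
    offset += limit
```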
import os
import base64
import requests
import time
from typing import Dict, Any
from typing import Dict, List, Any, TypedDict
from csst_fs import s3_fs
from ..s3_config import load_backend_settings
def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
class IngestionFile(TypedDict):
file_name: str
file_content: bytes
def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
"""
Submit a file for ingestion by:
Submit a list of files for ingestion by:
1. Requesting an OSS upload path from the ingestion API.
2. Uploading the file to the returned OSS path.
3. Reporting the finished upload back to the ingestion API.
Args:
file_content (bytes): The file's content
file_name (str): The file name for storing the file after ingestion.
files (List[IngestionFile]): A list of dicts of the form:
    [
        {
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's content.
        },
        {
            ...
        }
    ]
Returns:
dict: A dict containing a task_id referring to the queued processing task's id.
dict: A dict containing a task_id referring to the queued processing task, as well as a field `failed` listing the file names for which ingestion failed.
Example:
{
"task_id": "5",
"failed": List[str] List of file names for which ingestion failed.
}
Raises:
RuntimeError: If the ingestion API or OSS upload fails after retries.
RuntimeError: If committing the uploads fails after retries.
"""
api_url = load_backend_settings()['backend_url']
if not api_url:
raise RuntimeError("CSST backend api url is not set")
# Step 1: Request an OSS upload path
request_upload_endpoint = f"{api_url}/datasync/level2/upload"
payload = {"fileName": file_name}
try:
response = requests.post(request_upload_endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to request OSS upload path: {e}")
succeeded_files: List[IngestionFile] = []
failed_files: List[IngestionFile] = []
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
oss_upload_bucket = data["result"].get("bucket")
oss_upload_key = data["result"].get("key")
if not oss_upload_bucket or not oss_upload_key:
raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
# Step 2: Upload file to OSS path
max_retries = 3
backoff_factor = 2
for attempt in range(1, max_retries + 1):
for file in files:
file_info = {"file_name": file["file_name"], "bucket": None, "key": None}
try:
with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
f.write(file_content)
except Exception as e:
if attempt == max_retries:
raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
else:
wait_time = backoff_factor ** (attempt - 1)
print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
# Step 1: Request an OSS upload path
payload = {"fileName": file["file_name"]}
try:
response = requests.post(request_upload_endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to request OSS upload path: {e}")
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
oss_upload_bucket = data["result"].get("bucket")
oss_upload_key = data["result"].get("key")
if not oss_upload_bucket or not oss_upload_key:
raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
file_info["bucket"] = oss_upload_bucket
file_info["key"] = oss_upload_key
# Step 2: Upload file to OSS path
max_retries = 3
backoff_factor = 2
for attempt in range(1, max_retries + 1):
try:
with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
f.write(file["file_content"])
succeeded_files.append(file_info)
break
except Exception as e:
if attempt == max_retries:
raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
else:
wait_time = backoff_factor ** (attempt - 1)
print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
except RuntimeError as e:
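# Any per-file RuntimeError (failed upload-path request or exhausted upload
# retries) is caught here so one bad file does not abort the whole batch.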
failed_files.append(file_info)
# Step 3: Report upload completion, get task id
try:
report_upload_endpoint = f"{api_url}/datasync/level2/queue"
report_payload = {"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file_name}
report_upload_endpoint = f"{api_url}/datasync/level2/queue/batch"
report_payload = [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in succeeded_files]
report_payload += [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in failed_files]
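# Both succeeded and failed files are included in the batch report;
# failed entries may carry a None bucket/key.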
report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
report_response.raise_for_status()
report_data = report_response.json()
@@ -78,7 +103,7 @@ def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
raise RuntimeError(f"Failed to report completed upload: {e}")
if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
return {"task_id": report_data["result"]["task_id"]}
return {"task_id": report_data["result"]["task_id"], "failed": [file["file_name"] for file in failed_files]}
else:
raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
......
@@ -11,7 +11,7 @@ default_s3_settings = {
}
default_backend_settings = {
'backend_url': 'http://10.200.60.199:32000',
'backend_url': 'https://astro-workbench-bts.lab.zverse.space:32443/api/csst',
}
def load_from_env():
......