Commit 8927556e authored by Matthias Weidenthaler

Merge branch 'search-and-commit-api' into main

parents e1d7b6e3 06548873
Pipeline #10423 passed
A successful response contains a task_id referring to the queued processing task.
## Function: `start_ingestion_task`
```python
def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
    """
    Submit a list of file contents and file names for ingestion.
    Args:
        files: A list of dicts, each containing:
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's content.
    Returns:
        dict: A dict containing a task_id referring to the queued processing task,
        as well as a field "failed" listing the file names for which ingestion failed.
        Example:
        {
            "task_id": "5",
            "failed": List[str]  # file names for which ingestion failed
        }
    Raises:
        RuntimeError: If committing the files for ingestion fails after retries.
    """
```
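For illustration, a minimal usage sketch of the batch signature documented above; the import path and file names are placeholders, not part of the documented API:
```python
from pathlib import Path

# Assumed import path; adjust to wherever start_ingestion_task is exposed in your installation.
from your_package.ingestion import start_ingestion_task

# Placeholder file names; any readable files work.
files = [
    {"file_name": p.name, "file_content": p.read_bytes()}
    for p in [Path("obs_001.fits"), Path("obs_002.fits")]
]

result = start_ingestion_task(files)
print("queued task:", result["task_id"])
if result["failed"]:
    print("ingestion failed for:", result["failed"])
```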
3) List of values
The queried data should match one of the values in the list. String or number values are possible.
```python
filter = {
    "NAXIS": [0, 1]
}
```
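A short usage sketch combining a list-of-values filter with `query_metadata`; the import path and metadata keys are illustrative only:
```python
# Assumed import path; adjust to the package layout of this repository.
from your_package.search import query_metadata

# Match files whose NAXIS is either 0 or 1 in the primary HDU and return two metadata keys.
results = query_metadata(
    filter={"NAXIS": [0, 1]},
    key=["dataset", "instrument"],
    hdu=0,
    offset=0,   # pagination start
    limit=10,   # page size
)

for doc in results:
    print(doc["file_path"], doc["metadata"])
```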
......
import requests
from typing import Dict, Any, List
from ..s3_config import load_backend_settings


def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0,
    offset: int = 0,
    limit: int = 10,
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values. Results are paginated using offset & limit
    and ordered ascending by database insert timestamp.
    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The HDU the filter & key arguments refer to, e.g. 0 or 1. Default is 0.
        offset: Optional offset for paginated results (default 0).
        limit: Optional limit for paginated results (default 10).
    Returns:
        A List[Dict] of matching documents, each containing a file_path value and, under 'metadata',
        the keys requested via the 'key' parameter.
        E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
......
    payload = {
        "filter": filter,
        "key": key,
        "hdu": hdu,
        "offset": offset,
        "limit": limit,
    }
    try:
......
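Because `offset` and `limit` are now part of the signature, callers can page through larger result sets. A minimal sketch, assuming a page shorter than `limit` marks the end of the results (the backend's exact pagination contract is not shown in this diff):
```python
# Hedged sketch: page through all matches 10 at a time using query_metadata defined above.
offset, limit = 0, 10
all_docs = []
while True:
    page = query_metadata(filter={"NAXIS": [0, 1]}, key=["dataset"], hdu=0,
                          offset=offset, limit=limit)
    all_docs.extend(page)
    if len(page) < limit:
        break
    offset += limit
```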
import requests
import time
from typing import Dict, List, Any, TypedDict
from csst_fs import s3_fs
from ..s3_config import load_backend_settings


class IngestionFile(TypedDict):
    file_name: str
    file_content: bytes


def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
    """
    Submit a list of files for ingestion by:
    1. Requesting an OSS upload path from the ingestion API for each file.
    2. Uploading each file to its returned OSS path.
    3. Reporting the finished uploads back to the ingestion API.
    Args:
        files: A list of dicts, each containing:
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's content.
    Returns:
        dict: A dict containing a task_id referring to the queued processing task,
        as well as a field "failed" listing the file names for which ingestion failed.
        Example:
        {
            "task_id": "5",
            "failed": List[str]  # file names for which ingestion failed
        }
    Raises:
        RuntimeError: If committing the files for ingestion fails after retries.
    """
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")

    request_upload_endpoint = f"{api_url}/datasync/level2/upload"

    succeeded_files: List[Dict[str, Any]] = []
    failed_files: List[Dict[str, Any]] = []

    for file in files:
        file_info = {"file_name": file["file_name"], "bucket": None, "key": None}
        try:
            # Step 1: Request an OSS upload path
            payload = {"fileName": file["file_name"]}
            try:
                response = requests.post(request_upload_endpoint, json=payload, timeout=30)
                response.raise_for_status()
                data = response.json()
            except requests.RequestException as e:
                raise RuntimeError(f"Failed to request OSS upload path: {e}")
            if not data.get("success") or "result" not in data:
                raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
            oss_upload_bucket = data["result"].get("bucket")
            oss_upload_key = data["result"].get("key")
            if not oss_upload_bucket or not oss_upload_key:
                raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
            file_info["bucket"] = oss_upload_bucket
            file_info["key"] = oss_upload_key

            # Step 2: Upload file to OSS path
            max_retries = 3
            backoff_factor = 2
            for attempt in range(1, max_retries + 1):
                try:
                    with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                        f.write(file["file_content"])
                    succeeded_files.append(file_info)
                    break
                except Exception as e:
                    if attempt == max_retries:
                        raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
                    else:
                        wait_time = backoff_factor ** (attempt - 1)
                        print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
                        time.sleep(wait_time)
        except RuntimeError:
            failed_files.append(file_info)

    # Step 3: Report upload completion, get task id
    try:
        report_upload_endpoint = f"{api_url}/datasync/level2/queue/batch"
        report_payload = [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in succeeded_files]
        report_payload += [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in failed_files]
        report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
        report_response.raise_for_status()
        report_data = report_response.json()
......
        raise RuntimeError(f"Failed to report completed upload: {e}")

    if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
        return {"task_id": report_data["result"]["task_id"], "failed": [file["file_name"] for file in failed_files]}
    else:
        raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
......
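For reference, the response shapes that `start_ingestion_task` relies on, reconstructed from the fields the code reads; the real backend responses may carry additional fields, and the bucket and key values below are placeholders:
```python
# Shapes inferred from the fields read above; illustrative only, not the full backend contract.

# POST {api_url}/datasync/level2/upload -> per-file OSS upload target
example_upload_response = {
    "success": True,
    "result": {
        "bucket": "csst-level2-ingest",    # placeholder bucket name
        "key": "incoming/obs_001.fits",    # placeholder object key
    },
}

# POST {api_url}/datasync/level2/queue/batch -> task id for the reported batch
example_queue_response = {
    "success": True,
    "result": {
        "task_id": "5",
    },
}
```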
}
default_backend_settings = {
    'backend_url': 'https://astro-workbench-bts.lab.zverse.space:32443/api/csst',
}

def load_from_env():
......
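The client modules above resolve this endpoint via `load_backend_settings()`; a minimal sketch of how the changed default is consumed, written as it would appear inside a sibling module of this package and assuming the loader falls back to `default_backend_settings` when no overrides are configured:
```python
# Illustrative only: how the changed backend default reaches the client code above.
from ..s3_config import load_backend_settings  # relative import, as used by the modules above

settings = load_backend_settings()   # assumed to fall back to default_backend_settings
api_url = settings['backend_url']    # the new astro-workbench-bts endpoint by default
request_upload_endpoint = f"{api_url}/datasync/level2/upload"
```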