Commit 24b2f8de authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Changed L2 task ingestion logic to accept a list of files

parent e5f4f84f
...@@ -75,21 +75,31 @@ A successful response contains a task_id referring to the queued processing tas ...@@ -75,21 +75,31 @@ A successful response contains a task_id referring to the queued processing tas
## Function: `start_ingestion_task` ## Function: `start_ingestion_task`
```python ```python
def start_ingestion_task(file_content: str, file_name: str) -> dict: def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
""" """
Submit a file's content and file name to the ingestion API. Submit a list of file contents and file names for ingestion.
Args: Args:
file_content (str): The file's content as string representation [
file_name (str): The file name for storing the file after ingestion. {
file_name (str): The file name for storing the file after ingestion.
file_content (bytes): The file's content
},
{
...
}
]
Returns: Returns:
dict: A dict containing a task_id, referring to the queued processing task's id. dict: A dict containing a task_id referring to the queued processing task as well as a field failed, listing the file names for which ingestion failed.
E.g. Example:
{ {
"task_id": "5", "task_id": "5",
"failed": List[str] List of file names for which ingestion failed.
} }
Raises: Raises:
RuntimeError: If the ingestion API or data upload fails after retries. RuntimeError: If committing failed after retries.
""" """
``` ```
......
import os
import base64
import requests import requests
import time import time
from typing import Dict, Any from typing import Dict, List, Any, TypedDict
from csst_fs import s3_fs from csst_fs import s3_fs
from ..s3_config import load_backend_settings from ..s3_config import load_backend_settings
class IngestionFile(TypedDict):
    """One file queued for ingestion: its target name and raw content."""

    # Name under which the file is stored after ingestion.
    file_name: str
    # Raw bytes of the file to upload.
    file_content: bytes
def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
    """
    Submit a list of files for ingestion by, for each file:
      1. Requesting an OSS upload path from the ingestion API.
      2. Uploading the file content to the returned OSS path (with retries).
    Then reporting all successful uploads back to the ingestion API in one batch.

    Args:
        files: A list of dicts, each with:
            file_name (str): The file name for storing the file after ingestion.
            file_content (bytes): The file's raw content.

    Returns:
        dict: {"task_id": <queued processing task's id>,
               "failed": <list of file names for which ingestion failed>}

    Raises:
        RuntimeError: If the backend URL is unset, or if reporting the
            completed uploads to the ingestion API fails.
    """
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")

    request_upload_endpoint = f"{api_url}/datasync/level2/upload"
    max_retries = 3
    backoff_factor = 2

    failed_files: List[IngestionFile] = []
    # One entry per successfully uploaded file. Each file gets its OWN
    # bucket/key from the API, so they must be collected per file here —
    # reusing the variables left over from the last loop iteration would
    # report the wrong location for every other file.
    uploaded: List[Dict[str, str]] = []

    for item in files:
        try:
            # Step 1: Request an OSS upload path for this file.
            payload = {"fileName": item["file_name"]}
            try:
                response = requests.post(request_upload_endpoint, json=payload, timeout=30)
                response.raise_for_status()
                data = response.json()
            except requests.RequestException as e:
                raise RuntimeError(f"Failed to request OSS upload path: {e}")
            if not data.get("success") or "result" not in data:
                raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
            oss_upload_bucket = data["result"].get("bucket")
            oss_upload_key = data["result"].get("key")
            if not oss_upload_bucket or not oss_upload_key:
                raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")

            # Step 2: Upload the file to the OSS path with exponential
            # backoff. The `break` on success is required — without it the
            # file would be re-uploaded max_retries times.
            for attempt in range(1, max_retries + 1):
                try:
                    with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                        f.write(item["file_content"])
                    break
                except Exception as e:  # `except e:` is invalid Python; catch broadly, retry
                    if attempt == max_retries:
                        raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
                    wait_time = backoff_factor ** (attempt - 1)
                    print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
                    time.sleep(wait_time)

            uploaded.append({
                "bucket": oss_upload_bucket,
                "key": oss_upload_key,
                "fileName": item["file_name"],
            })
        except RuntimeError:
            # A per-file failure must not abort the whole batch; record it
            # so the caller sees it in the "failed" list.
            failed_files.append(item)

    # Step 3: Report upload completion for the successful files, get task id.
    try:
        report_upload_endpoint = f"{api_url}/datasync/level2/queue/batch"
        report_response = requests.post(report_upload_endpoint, json=uploaded, timeout=30)
        report_response.raise_for_status()
        report_data = report_response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to report completed upload: {e}")

    if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
        # TypedDict instances are plain dicts at runtime: subscript access,
        # not attribute access (`item.file_name` would raise AttributeError).
        return {
            "task_id": report_data["result"]["task_id"],
            "failed": [item["file_name"] for item in failed_files],
        }
    else:
        raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment