Commit 9c35f0bb authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Changed level2 submission logic

parent b974c287
...@@ -93,6 +93,8 @@ def submit_file_for_ingestion(file_content: str, file_name: str) -> dict: ...@@ -93,6 +93,8 @@ def submit_file_for_ingestion(file_content: str, file_name: str) -> dict:
{ {
"task_id": "5", "task_id": "5",
} }
Raises:
RuntimeError: If the ingestion API or data upload fails after retries.
""" """
``` ```
......
...@@ -3,13 +3,17 @@ import base64 ...@@ -3,13 +3,17 @@ import base64
import requests import requests
import time import time
from typing import Dict, Any from typing import Dict, Any
from csst_fs import s3_fs
def submit_file_for_ingestion(file_content: bytes, file_name: str) -> dict: def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str, str]:
""" """
Submit a file's content and file name to the ingestion API with retry logic. Submit a file for ingestion by:
1. Requesting an OSS upload path from the ingestion API.
2. Uploading the file to the returned OSS path.
3. Reporting the finished upload back to the ingestion API.
Args: Args:
file_content (bytes): The file's content as bytes file_content (bytes): The file's content
file_name (str): The file name for storing the file after ingestion. file_name (str): The file name for storing the file after ingestion.
Returns: Returns:
...@@ -20,43 +24,63 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> dict: ...@@ -20,43 +24,63 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> dict:
} }
Raises: Raises:
RuntimeError: If the ingestion API returns an error after retries or request fails. RuntimeError: If the ingestion API or OSS upload fails after retries.
""" """
api_url = os.getenv("INGESTION_API_URL") api_url = os.getenv("INGESTION_API_URL")
if not api_url: if not api_url:
raise RuntimeError("INGESTION_API_URL environment variable is not set") raise RuntimeError("INGESTION_API_URL environment variable is not set")
endpoint = f"{api_url}/datasync/level2" # Step 1: Request an OSS upload path
encoded_content = base64.b64encode(file_content).decode("utf-8") request_upload_endpoint = f"{api_url}/datasync/level2/upload"
payload = { payload = {"fileName": file_name}
"fileContent": encoded_content,
"fileName": file_name try:
} response = requests.post(request_upload_endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to request OSS upload path: {e}")
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
oss_upload_bucket = data["result"].get("bucket")
oss_upload_key = data["result"].get("key")
if not oss_upload_bucket or not oss_upload_key:
raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
# Step 2: Upload file to OSS path
max_retries = 3 max_retries = 3
backoff_factor = 2 backoff_factor = 2
for attempt in range(1, max_retries + 1): for attempt in range(1, max_retries + 1):
try: try:
response = requests.post(endpoint, json=payload, timeout=30) with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
response.raise_for_status() f.write(file_content)
data = response.json() except e:
if data.get("success") and "result" in data and "task_id" in data["result"]:
return {"task_id": data["result"]["task_id"]}
else:
raise RuntimeError(f"Ingestion API returned an error: {data}")
except (requests.RequestException, RuntimeError) as e:
print(e)
if attempt == max_retries: if attempt == max_retries:
raise RuntimeError(f"Failed after {max_retries} attempts: {e}") raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
else: else:
wait_time = backoff_factor ** (attempt - 1) wait_time = backoff_factor ** (attempt - 1)
print(f"Attempt {attempt} failed, retrying in {wait_time}s...") print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
time.sleep(wait_time) time.sleep(wait_time)
# Step 3: Report upload completion
try:
report_upload_endpoint = f"{api_url}/datasync/level2/queue"
report_payload = {"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file_name}
report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
report_response.raise_for_status()
report_data = report_response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to report completed upload: {e}")
if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
return {"task_id": report_data["result"]["task_id"]}
else:
raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
def query_processing_task_state(task_id: str) -> Dict[str, Any]: def query_processing_task_state(task_id: str) -> Dict[str, Any]:
""" """
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment