Commit 8093cb3b authored by Matthias Weidenthaler

Merge branch 'search-and-commit-api' into main

parents 4e62d111 79093e2f
Pipeline #11394 passed
import requests
import time
from typing import Dict, List, Any, TypedDict
import json
import logging
from csst_fs import s3_fs
from ..s3_config import load_backend_settings
logger = logging.getLogger(__name__)
if not logger.handlers:
    logger.setLevel(logging.DEBUG)

    # File handler for debug and above (debug, info, warnings, errors)
    fh = logging.FileHandler('csst_fs_ingestion.log')
    fh.setLevel(logging.DEBUG)

    # Console handler for warnings and errors only
    ch = logging.StreamHandler()
    ch.setLevel(logging.WARNING)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)
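# Note: the handler guard above attaches the file/console handlers at most
# once per process, so the setup does not duplicate log lines if this module's
# initialization code runs again (e.g. on an importlib.reload()).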
class IngestionFile(TypedDict):
    file_name: str
    file_content: bytes
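# Illustrative example: at runtime an IngestionFile is a plain dict, e.g.
#     {"file_name": "frame_0001.fits", "file_content": b"<raw bytes>"}  # hypothetical name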

def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
    """Upload the given files to OSS and queue them for ingestion.

    Raises:
        RuntimeError: If committing failed after retries.
    """
logger.info(f"Starting ingestion task for {len(files)} files")
api_url = load_backend_settings()['backend_url']
if not api_url:
raise RuntimeError("CSST backend api url is not set")
error_msg = "CSST backend api url is not set"
logger.error(error_msg)
raise RuntimeError(error_msg)
request_upload_endpoint = f"{api_url}/datasync/level2/upload"
logger.info(f"Using API endpoint: {request_upload_endpoint}")
succeeded_files: List[IngestionFile] = []
failed_files: List[IngestionFile] = []
    for file in files:
        file_info = {"file_name": file["file_name"], "bucket": None, "key": None}
        logger.info(f"Processing file: {file['file_name']}")
        try:
            # Step 1: Request an OSS upload path from the backend, with retries
            payload = {"fileName": file["file_name"]}
            get_oss_key_max_retries = 3
            get_oss_key_backoff_factor = 2
            for get_oss_key_attempt in range(1, get_oss_key_max_retries + 1):
                try:
                    logger.debug(f"Requesting OSS upload path for {file['file_name']} (attempt {get_oss_key_attempt}/{get_oss_key_max_retries})")
                    response = requests.post(request_upload_endpoint, json=payload, timeout=30)
                    response.raise_for_status()
                    data = response.json()
                    logger.info(f"Successfully obtained OSS upload path for {file['file_name']}")
                    break  # success: stop retrying
                except requests.RequestException as e:
                    if get_oss_key_attempt == get_oss_key_max_retries:
                        error_msg = f"Failed to request OSS upload path for {file['file_name']}: {e}"
                        logger.error(error_msg)
                        raise RuntimeError(error_msg)
                    else:
                        # Exponential backoff: wait 1s, then 2s between attempts
                        get_oss_key_wait_time = get_oss_key_backoff_factor ** (get_oss_key_attempt - 1)
                        logger.warning(f"OSS upload path request failed for {file['file_name']} (attempt {get_oss_key_attempt}), retrying in {get_oss_key_wait_time}s: {e}")
                        time.sleep(get_oss_key_wait_time)

            if not data.get("success") or "result" not in data:
                error_msg = f"Unexpected response while requesting upload path for {file['file_name']}: {data}"
                logger.error(error_msg)
                raise RuntimeError(error_msg)

            oss_upload_bucket = data["result"].get("bucket")
            oss_upload_key = data["result"].get("key")
            if not oss_upload_bucket or not oss_upload_key:
                error_msg = f"No OSS upload bucket/key returned from API for {file['file_name']}: {data}"
                logger.error(error_msg)
                raise RuntimeError(error_msg)

            file_info["bucket"] = oss_upload_bucket
            file_info["key"] = oss_upload_key
            logger.debug(f"OSS bucket: {oss_upload_bucket}, key: {oss_upload_key}")
            # Step 2: Upload file to OSS path
            max_retries = 3
            backoff_factor = 2  # restored from the elided diff hunk; assumed to be 2, mirroring get_oss_key_backoff_factor
            for attempt in range(1, max_retries + 1):
                try:
                    logger.debug(f"Uploading {file['file_name']} to OSS (attempt {attempt}/{max_retries})")
                    with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                        f.write(file["file_content"])
                    succeeded_files.append(file_info)
                    logger.info(f"Successfully uploaded {file['file_name']} to OSS")
                    break
                except Exception as e:
                    if attempt == max_retries:
                        error_msg = f"OSS upload failed after {max_retries} attempts for {file['file_name']}: {e}"
                        logger.error(error_msg)
                        raise RuntimeError(error_msg)
                    else:
                        wait_time = backoff_factor ** (attempt - 1)
                        logger.warning(f"OSS upload attempt {attempt} failed for {file['file_name']}, retrying in {wait_time}s: {e}")
                        time.sleep(wait_time)
        except RuntimeError as e:
            logger.error(f"Failed to process file {file['file_name']}: {e}")
            failed_files.append(file_info)

    # Step 3: Report upload completion, get task id
    logger.info(f"Reporting upload completion: {len(succeeded_files)} succeeded, {len(failed_files)} failed")
    try:
        report_upload_endpoint = f"{api_url}/datasync/level2/queue/batch"
        report_payload = [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in succeeded_files]
        report_payload += [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in failed_files]
        logger.debug(f"Report payload: {json.dumps(report_payload)}")
        report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
        report_response.raise_for_status()
        report_data = report_response.json()
        logger.info("Successfully reported upload completion to API")
    except requests.RequestException as e:
        error_msg = f"Failed to report completed upload: {e}"
        logger.error(error_msg)
        raise RuntimeError(error_msg)
    if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
        task_id = report_data["result"]["task_id"]
        failed_file_names = [file["file_name"] for file in failed_files]
        logger.info(f"Ingestion task completed successfully. Task ID: {task_id}, Failed files: {failed_file_names}")
        if failed_file_names:
            logger.warning(f"Some files failed to ingest: {failed_file_names}")
        return {"task_id": task_id, "failed": failed_file_names}
    else:
        error_msg = f"Unexpected response while reporting upload: {report_data}"
        logger.error(error_msg)
        raise RuntimeError(error_msg)

def query_task_state(task_id: str) -> Dict[str, Any]:
    ...  # body not shown in this commit view
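# --- Usage sketch (illustrative) ---
# A minimal, hedged example of driving the two functions above. The file name
# and the "state" key read from query_task_state() are assumptions made for
# illustration; query_task_state's body is elided in this view.
#
#     files = [{"file_name": "csst_l2_example.fits", "file_content": b"..."}]  # hypothetical file
#     result = start_ingestion_task(files)
#     print("task:", result["task_id"], "failed:", result["failed"])
#     state = query_task_state(result["task_id"])  # assumed to include a "state" key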