Commit 75d3742d authored by Matthias Weidenthaler

Added limit/offset to metadata query, fixed ingestion commit logic

parent 24b2f8de
-import os
 import requests
 from typing import Dict, Any, List
 from ..s3_config import load_backend_settings
@@ -6,15 +5,19 @@ from ..s3_config import load_backend_settings
 def query_metadata(
     filter: Dict[str, Any],
     key: List[str],
-    hdu: int = 0
+    hdu: int = 0,
+    offset: int = 0,
+    limit: int = 10,
 ) -> List[Dict[str, Any]]:
     """
-    Query for file info by metadata values.
+    Query for file info by metadata values. Results are paginated using offset & limit and ordered ascending by database insert timestamp.
     Args:
         filter: The filter dict described below.
         key: A list of string values, corresponding to metadata keys that should be included in the output.
         hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
+        offset: Optional offset for paginated results (default 0).
+        limit: Optional limit for paginated results (default 10).
     Returns:
         A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
         E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
@@ -44,7 +47,9 @@ def query_metadata(
     payload = {
         "filter": filter,
         "key": key,
-        "hdu": hdu
+        "hdu": hdu,
+        "offset": offset,
+        "limit": limit,
     }
     try:
...
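For context on the new pagination parameters, a minimal usage sketch follows. The helper name fetch_all_matches is hypothetical; it assumes query_metadata is importable as defined above, and the page-walking logic relies on the documented ascending insert-timestamp ordering.

from typing import Any, Dict, List

def fetch_all_matches(filter: Dict[str, Any], keys: List[str], hdu: int = 0) -> List[Dict[str, Any]]:
    # Hypothetical helper: walk pages of 10 results until a short page
    # signals that no further matches remain.
    results: List[Dict[str, Any]] = []
    offset, limit = 0, 10
    while True:
        page = query_metadata(filter=filter, key=keys, hdu=hdu, offset=offset, limit=limit)
        results.extend(page)
        if len(page) < limit:  # fewer than `limit` results: last page
            break
        offset += limit
    return results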
@@ -42,9 +42,11 @@ def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
         raise RuntimeError("CSST backend api url is not set")
     request_upload_endpoint = f"{api_url}/datasync/level2/upload"
+    succeeded_files: List[IngestionFile] = []
     failed_files: List[IngestionFile] = []
     for file in files:
+        file_info = {"file_name": file["file_name"], "bucket": None, "key": None}
         try:
             # Step 1: Request an OSS upload path
             payload = {"fileName": file["file_name"]}
@@ -65,6 +67,9 @@ def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
             if not oss_upload_bucket or not oss_upload_key:
                 raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")
+            file_info["bucket"] = oss_upload_bucket
+            file_info["key"] = oss_upload_key
             # Step 2: Upload file to OSS path
             max_retries = 3
             backoff_factor = 2
@@ -73,6 +78,8 @@ def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
                 try:
                     with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                         f.write(file["file_content"])
+                    succeeded_files.append(file_info)
+                    break
                 except Exception as e:
                     if attempt == max_retries:
                         raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
@@ -81,13 +88,14 @@ def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
                     print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
                     time.sleep(wait_time)
         except RuntimeError as e:
-            failed_files.append(file)
+            failed_files.append(file_info)
     # Step 3: Report upload completion, get task id
     try:
         report_upload_endpoint = f"{api_url}/datasync/level2/queue/batch"
-        report_payload = [{"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file["file_name"]} for file in files]
+        report_payload = [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in succeeded_files]
+        report_payload += [{"bucket": file["bucket"], "key": file["key"], "fileName": file["file_name"]} for file in failed_files]
         report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
         report_response.raise_for_status()
         report_data = report_response.json()
@@ -95,7 +103,7 @@ def start_ingestion_task(files: List[IngestionFile]) -> Dict[str, Any]:
         raise RuntimeError(f"Failed to report completed upload: {e}")
     if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
-        return {"task_id": report_data["result"]["task_id"], "failed": [file.file_name for file in failed_files]}
+        return {"task_id": report_data["result"]["task_id"], "failed": [file["file_name"] for file in failed_files]}
     else:
         raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
...
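For context on the fixed commit logic, a standalone sketch of the retry-with-backoff pattern from Step 2 follows. The helper upload_with_retry and its write_fn callable are hypothetical, and the wait_time formula (backoff_factor ** attempt) is an assumption about the collapsed lines between the hunks above.

import time
from typing import Callable

def upload_with_retry(write_fn: Callable[[], None], max_retries: int = 3, backoff_factor: int = 2) -> None:
    # Attempt the upload up to max_retries times; stop on the first
    # success (mirroring the new `break`), and raise after the final failure.
    for attempt in range(1, max_retries + 1):
        try:
            write_fn()
            return
        except Exception as e:
            if attempt == max_retries:
                raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}")
            wait_time = backoff_factor ** attempt  # assumed backoff formula
            print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)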