Commit 623bb5f8 authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Added L2 state query API

parent 984c6ce2
......@@ -4,4 +4,5 @@ from .fits import fsspec_HDUList
from .table import fsspec_table
from . import s3_fs
from . import fs
from .commit import level2
\ No newline at end of file
from .commit import level2
from .star_catalog import search
\ No newline at end of file
......@@ -2,6 +2,7 @@ import os
import base64
import requests
import time
from typing import Dict, Any
def submit_file_for_ingestion(file_content: bytes, file_name: str) -> dict:
"""
......@@ -54,4 +55,44 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> dict:
else:
wait_time = backoff_factor ** (attempt - 1)
print(f"Attempt {attempt} failed, retrying in {wait_time}s...")
time.sleep(wait_time)
\ No newline at end of file
time.sleep(wait_time)
def query_processing_task_state(task_id: str) -> Dict[str, Any]:
"""
Query the processing state of a processing task given a L2 task id.
Args:
task_id: Task id of the L2 processing task
Returns:
Dictionary of the format:
{
"state": "submission_pending",
}
"""
if not task_id:
raise ValueError("task_id must be provided")
api_url = os.getenv("QUERY_TASK_STATE_API_URL")
if not api_url:
raise RuntimeError("QUERY_TASK_STATE_API_URL environment variable is not set")
endpoint = f"{api_url}/datasync/level2/{task_id}"
try:
response = requests.get(endpoint, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to query task state: {e}")
if not data.get("success"):
raise RuntimeError(f"Unexpected API response: {data}")
result = data.get("result")
if not result:
return {"state": "not_found"}
sync_state = result.get("syncState")
return {"state": sync_state if sync_state else "unknown"}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment