Commit 984c6ce2 authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Added star catalog search API

parent e8a4c67a
import os
import requests
from typing import Dict, Any, List
def query_star_catalog(
catalog_name: str,
filter: Dict[str, Any],
key: List[str],
) -> List[Dict[str, Any]]:
"""
Query a star catalog by column values given a ra, dec and radius preselection.
Args:
catalog_name: Name of the star catalog (e.g. msc_l1_mbi_catmix)
filter: The filter dict described below.
The following keys MUST be set:
{
"ra": 40.3,
"dec": 21.9,
"radius": 0.2,
}
Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
key: A list of string values, corresponding to the column names that should be present in the return value.
Returns:
A List[Dict] of matching star catalog objects.
"""
for required_key in ("ra", "dec", "radius"):
if required_key not in filter or isinstance(filter[required_key], dict):
raise ValueError(f"Filter must contain scalar '{required_key}'")
if not key:
raise ValueError("Key list cannot be empty")
api_url = os.getenv("STAR_CATALOG_SEARCH_API_URL")
if not api_url:
raise RuntimeError("STAR_CATALOG_SEARCH_API_URL environment variable is not set")
endpoint = f"{api_url}/starcatalog/query"
payload = {
"catalogName": catalog_name,
"filter": filter,
"key": key,
}
try:
response = requests.post(endpoint, json=payload, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
raise RuntimeError(f"Star catalog query failed: {e}")
data = response.json()
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected API response: {data}")
return data["result"]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment