Commit 476195e4 authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Merge branch 'search-and-commit-api' into main

parents 8927556e c6c642a7
Pipeline #11111 passed with stage
...@@ -4,6 +4,9 @@ This repository provides the following functionalities: ...@@ -4,6 +4,9 @@ This repository provides the following functionalities:
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values) 3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) 4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog) 5. [Query a Star Catalog](#5-query-a-star-catalog)
6. [Query a Star Catalog Corresponding to Metadata Entries](#6-query-a-star-catalog-corresponding-to-metadata-entries)
7. [Trigger a Pipeline Run](#7-trigger-a-pipeline-run)
8. [Query a Pipeline Run State](#8-query-a-pipeline-run-state)
# 1. Read or Download a File from S3 storage # 1. Read or Download a File from S3 storage
Supported are two distinct ways of reading from s3 storage. Supported are two distinct ways of reading from s3 storage.
...@@ -241,6 +244,59 @@ def query_star_catalog( ...@@ -241,6 +244,59 @@ def query_star_catalog(
] ]
""" """
``` ```
# 6. Query a Star Catalog Corresponding to Metadata Entries
First queries the metadata catalog, based on that subsequently queries the star catalog.
## Function `query_star_catalog_with_metadata`
```python
def query_star_catalog_with_metadata(
metadata: Dict[str, Any],
star_catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""
Queries the metadata catalog according to the provided filter criteria and HDU value.
Subsequently queries the star catalog entries corresponding to the metadata results and
the given additional filters.
Returns the catalog columns specified in the 'key' list.
Args:
metadata: {
filter: filter dict described below.
hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
},
star_catalog: {
        catalogName: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
filter: filter dict described below.
The following keys MUST be set:
{
"ra": 40.3,
"dec": 21.9,
"radius": 0.2,
}
Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
key: A list of string values, corresponding to the column names that should be present in the return value.
}
Example:
from csst_fs import *
query_star_catalog_with_metadata(
star_catalog={
"catalogName": "csst-msc-l1-mbi-catmix",
"key": ["data_uuid", "obsid", "ra"],
"filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
},
metadata={
"filter": {"priority": {"gte": 2}, "obs_id": 66},
"hdu": 0,
}
)
Returns:
A List[Dict] of matching star catalog objects.
"""
```
## Filter Syntax ## Filter Syntax
All filters are combined with logical AND (every clause must match). All filters are combined with logical AND (every clause must match).
1) String equality 1) String equality
...@@ -271,3 +327,79 @@ filter = { ...@@ -271,3 +327,79 @@ filter = {
"ratio_disk": -9999, "ratio_disk": -9999,
} }
``` ```
# 7. Trigger a Pipeline Run
Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful.
The run_id can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.
## Function: `run_pipeline`
```python
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
"""
    Submits a pipeline run for execution with the provided batch.
Retries up to 3 times on transient/network errors.
Args:
batch: A dictionary describing the batch to run.
{
"dag_group": str,
"dag": str,
"batch_id": str,
}
Returns:
dict: A dictionary containing the run_id of the newly submitted pipeline run.
Example:
{
"run_id": "3"
}
Raises:
RuntimeError: If the pipeline API request fails or returns an invalid response
after all retry attempts.
"""
# Example:
from csst_fs import run_pipeline
batch = {
"dag_group": "dag_group_name",
"dag": "dag_name",
"batch_id": "batch_1"
}
result = run_pipeline(batch)
# result: {'run_id': '4'}
```
# 8. Query a Pipeline Run State
Query the state of a pipeline run given an id (obtained from [run_pipeline](#7-trigger-a-pipeline-run))
## Function: `query_run_state`
```python
def query_run_state(
run_id: str
) -> Dict[str, Any]
"""
Query the processing state of a pipeline run given an id.
Args:
run_id: Run id of the pipeline run.
Returns:
Dictionary of the following format, including information about the current state of the corresponding run.
        Typical values are "running" and "completed"; "not_found" is returned when no run exists for the id, and "unknown" when the backend omits the state.
E.g.
{
"state": "running",
}
"""
# Example
from csst_fs import query_run_state
result = query_run_state("4")
# result: {'state': 'completed'}
```
\ No newline at end of file
...@@ -5,5 +5,6 @@ from .table import fsspec_table ...@@ -5,5 +5,6 @@ from .table import fsspec_table
from . import s3_fs from . import s3_fs
from . import fs from . import fs
from .catalog.metadata import query_metadata from .catalog.metadata import query_metadata
from .catalog.star import query_star_catalog from .catalog.star import query_star_catalog, query_star_catalog_with_metadata
from .ingestion.level2 import start_ingestion_task, query_task_state from .ingestion.level2 import start_ingestion_task, query_task_state
from .pipeline.pipeline import run_pipeline, PipelineBatch, query_run_state
\ No newline at end of file
import os import os
import requests import requests
from typing import Dict, Any, List from typing import Dict, Any, List
from pydantic import BaseModel, Field, validator
from ..s3_config import load_backend_settings from ..s3_config import load_backend_settings
class StarCatalogQuery(BaseModel):
    """Validated star-catalog part of a combined catalog/metadata query.

    Field names match the JSON payload expected by the backend API
    (camelCase ``catalogName``).
    """
    # Name of the star catalog to query, e.g. "csst-msc-l1-mbi-catmix".
    catalogName: str
    # Column names to include in each returned catalog row; must be non-empty.
    key: List[str]
    # Filter clauses (see the README filter syntax section); combined with AND.
    filter: Dict[str, Any]
    @validator("key")
    def key_not_empty(cls, v):
        """Reject an empty column list — the backend needs at least one column."""
        if not v:
            raise ValueError("key list cannot be empty")
        return v
class MetadataQuery(BaseModel):
    """Validated metadata part of a combined catalog/metadata query."""
    # Filter clauses (see the README filter syntax section); combined with AND.
    filter: Dict[str, Any]
    # HDU index the filter refers to; 0 is the primary HDU.
    hdu: int = Field(default=0)
class CombinedCatalogMetadataRequest(BaseModel):
    """Full request body for the /starcatalog/query/metadata endpoint."""
    # Star-catalog query section (camelCase key matches the API payload).
    starCatalog: StarCatalogQuery
    # Metadata query section used to pre-select matching observations.
    metadata: MetadataQuery
def query_star_catalog_with_metadata(
    metadata: Dict[str, Any],
    star_catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Queries the metadata catalog according to the provided filter criteria and HDU value.
    Subsequently queries the star catalog entries corresponding to the metadata results and
    the given additional filters.
    Returns the catalog columns specified in the 'key' list.
    Args:
        metadata: {
            filter: filter dict described in the README filter syntax section.
            hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
        },
        star_catalog: {
            catalogName: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
            filter: filter dict described in the README filter syntax section.
                The following keys MUST be set:
                {
                    "ra": 40.3,
                    "dec": 21.9,
                    "radius": 0.2,
                }
                Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
            key: A list of string values, corresponding to the column names that should be present in the return value.
        }
    Example:
        from csst_fs import *
        query_star_catalog_with_metadata(
        ...    star_catalog={
        ...        "catalogName": "csst-msc-l1-mbi-catmix",
        ...        "key": ["data_uuid", "obsid", "ra"],
        ...        "filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
        ...    },
        ...    metadata={
        ...        "filter": {"priority": {"gte": 2}, "obs_id": 66},
        ...        "hdu": 0,
        ...    }
        ... )
    Returns:
        A List[Dict] of matching star catalog objects.
    Raises:
        RuntimeError: If the backend url is not configured or the API response
            does not have the expected success/result shape.
        pydantic.ValidationError: If either input dict is missing required keys.
        requests.HTTPError: If the API responds with an error status code.
    """
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    endpoint = f"{api_url}/starcatalog/query/metadata"
    # Validate both sections up front so malformed input fails before any
    # network round-trip.
    star_catalog_model = StarCatalogQuery(**star_catalog)
    metadata_model = MetadataQuery(**metadata)
    payload = CombinedCatalogMetadataRequest(
        starCatalog=star_catalog_model,
        metadata=metadata_model,
    ).dict()
    response = requests.post(endpoint, json=payload, timeout=30)
    response.raise_for_status()
    data = response.json()
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected API response: {data}")
    return data["result"]
def query_star_catalog( def query_star_catalog(
catalog_name: str, catalog_name: str,
filter: Dict[str, Any], filter: Dict[str, Any],
......
import time
import requests
from typing import Dict, List, Any, TypedDict
from ..s3_config import load_backend_settings
class PipelineBatch(TypedDict):
    """Request payload describing the batch a pipeline run should process."""
    # Name of the DAG group the target DAG belongs to.
    dag_group: str
    # Name of the DAG to execute.
    dag: str
    # Identifier of the data batch to run the pipeline on.
    batch_id: str
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
    """
    Trigger a pipeline run for the provided batch.
    Retries up to 3 times (exponential backoff) on transient network errors.
    Application-level failures (unsuccessful API status, invalid JSON, malformed
    result) are raised immediately instead of being retried: re-POSTing a
    request the server may already have processed could submit the same batch
    twice.
    Args:
        batch: PipelineBatch TypedDict with keys: dag_group, dag, batch_id
    Returns:
        Dict containing the run_id, e.g. {"run_id": "3"}
    Raises:
        RuntimeError: on permanent failure or invalid response
    """
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    pipeline_run_endpoint = f"{api_url}/level2/pipeline/run"
    max_retries = 3
    backoff_base = 1
    headers = {"Content-Type": "application/json"}
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.post(pipeline_run_endpoint, json=batch, headers=headers, timeout=30)
            resp.raise_for_status()
        except requests.RequestException as exc:
            # Only network/HTTP-level errors count as transient.
            if attempt == max_retries:
                raise RuntimeError(f"Failed to run pipeline after {max_retries} attempts: {exc}") from exc
            # Exponential backoff: 1s, 2s, 4s, ...
            time.sleep(backoff_base * (2 ** (attempt - 1)))
            continue
        try:
            data = resp.json()
        except ValueError as e:
            raise RuntimeError(f"Invalid JSON response from pipeline API: {e}")
        if not data.get("success", False):
            raise RuntimeError(f"Pipeline API returned unsuccessful response: {data.get('code')} {data.get('message')}")
        result = data.get("result")
        if not isinstance(result, dict):
            raise RuntimeError(f"Unexpected pipeline API result shape: {data}")
        run_id = result.get("run_id")
        if not run_id:
            raise RuntimeError(f"Pipeline API did not return run_id: {data}")
        # run_id is normalized to str so callers get a stable type.
        return {"run_id": str(run_id)}
def query_run_state(run_id: str) -> Dict[str, Any]:
    """
    Query the state of a pipeline run given an id.
    Args:
        run_id: Run id of the pipeline run
    Returns:
        Dictionary of the format:
        {
            "state": "submission_pending",
        }
        "not_found" is returned when no run exists for the id and "unknown"
        when the backend omits the status field.
    Raises:
        ValueError: If run_id is empty.
        RuntimeError: If the backend url is not configured, the request fails,
            or the response is not valid/successful JSON.
    """
    if not run_id:
        raise ValueError("run_id must be provided")
    api_url = load_backend_settings()['backend_url']
    if not api_url:
        raise RuntimeError("CSST backend api url is not set")
    endpoint = f"{api_url}/level2/pipeline/run/{run_id}"
    try:
        response = requests.get(endpoint, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to query run state: {e}")
    except ValueError as e:
        # response.json() raises ValueError on non-JSON bodies; wrap it so
        # callers only have to handle RuntimeError (consistent with run_pipeline).
        raise RuntimeError(f"Invalid JSON response from pipeline API: {e}")
    if not data.get("success"):
        raise RuntimeError(f"Unexpected API response: {data}")
    result = data.get("result")
    if not result:
        # Backend returned success but no result: the run id is unknown.
        return {"state": "not_found"}
    state = result.get("status")
    return {"state": state if state else "unknown"}
\ No newline at end of file
...@@ -8,7 +8,8 @@ setup( ...@@ -8,7 +8,8 @@ setup(
'astropy>=5.3', 'astropy>=5.3',
'fsspec>=2024.5.0', 'fsspec>=2024.5.0',
's3fs>=2024.5.0', 's3fs>=2024.5.0',
'requests' 'requests',
'pydantic'
], ],
python_requires='>=3.9', python_requires='>=3.9',
description='csst pipeline handle file in local file system and remote s3 file system', description='csst pipeline handle file in local file system and remote s3 file system',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment