Commit df84a03f authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Added L2 pipeline run submission and querying

parent 06548873
......@@ -270,4 +270,80 @@ filter = {
},
"ratio_disk": -9999,
}
```
# 6. Trigger a Pipeline Run
Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful.
The run_id can be used to [query the current state](#7-query-a-pipeline-run-state) of the corresponding run.
## Function: `run_pipeline`
```python
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
"""
Submitts a pipeline run for execution with the provided batch.
Retries up to 3 times on transient/network errors.
Args:
batch: A dictionary describing the batch to run.
{
"dag_group": str,
"dag": str,
"batch_id": str,
}
Returns:
dict: A dictionary containing the run_id of the newly submitted pipeline run.
Example:
{
"run_id": "3"
}
Raises:
RuntimeError: If the pipeline API request fails or returns an invalid response
after all retry attempts.
"""
# Example:
from csst_fs import run_pipeline
batch = {
"dag_group": "dag_group_name",
"dag": "dag_name",
"batch_id": "batch_1"
}
result = run_pipeline(batch)
# result: {'run_id': '4'}
```
# 7. Query a Pipeline Run State
Query the state of a pipeline run given an id (obtained from [run_pipeline](#6-trigger-a-pipeline-run))
## Function: `query_run_state`
```python
def query_run_state(
run_id: str
) -> Dict[str, Any]
"""
Query the processing state of a pipeline run given an id.
Args:
run_id: Run id of the pipeline run.
Returns:
Dictionary of the following format, including information about the current state of the corresponding run.
Possible values are "running" and "completed"
E.g.
{
"state": "running",
}
"""
# Example
from csst_fs import query_run_state
result = query_run_state("4")
# result: {'state': 'completed'}
```
\ No newline at end of file
......@@ -6,4 +6,5 @@ from . import s3_fs
from . import fs
from .catalog.metadata import query_metadata
from .catalog.star import query_star_catalog
from .ingestion.level2 import start_ingestion_task, query_task_state
\ No newline at end of file
from .ingestion.level2 import start_ingestion_task, query_task_state
from .pipeline.pipeline import run_pipeline, PipelineBatch, query_run_state
\ No newline at end of file
import time
import requests
from typing import Dict, List, Any, TypedDict
from ..s3_config import load_backend_settings
class PipelineBatch(TypedDict):
dag_group: str
dag: str
batch_id: str
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
"""
Trigger a pipeline run for the provided batch.
Retries up to 3 times on transient/network errors.
Args:
batch: PipelineBatch TypedDict with keys: dag_group, dag, batch_id
Returns:
Dict containing the run_id, e.g. {"run_id": "3"}
Raises:
RuntimeError: on permanent failure or invalid response
"""
api_url = load_backend_settings()['backend_url']
if not api_url:
raise RuntimeError("CSST backend api url is not set")
pipeline_run_endpoint = f"{api_url}/level2/pipeline/run"
max_retries = 3
backoff_base = 1
headers = {"Content-Type": "application/json"}
for attempt in range(1, max_retries + 1):
try:
resp = requests.post(pipeline_run_endpoint, json=batch, headers=headers, timeout=30)
resp.raise_for_status()
try:
data = resp.json()
except ValueError as e:
raise RuntimeError(f"Invalid JSON response from pipeline API: {e}")
if not data.get("success", False):
raise RuntimeError(f"Pipeline API returned unsuccessful response: {data.get('code')} {data.get('message')}")
result = data.get("result")
if not isinstance(result, dict):
raise RuntimeError(f"Unexpected pipeline API result shape: {data}")
run_id = result.get("run_id")
if not run_id:
raise RuntimeError(f"Pipeline API did not return run_id: {data}")
return {"run_id": str(run_id)}
except (requests.RequestException, RuntimeError) as exc:
if attempt == max_retries:
raise RuntimeError(f"Failed to run pipeline after {max_retries} attempts: {exc}") from exc
wait = backoff_base * (2 ** (attempt - 1))
time.sleep(wait)
def query_run_state(run_id: str) -> Dict[str, Any]:
"""
Query the state of a pipeline run given an id.
Args:
run_id: Run id of the pipeline run
Returns:
Dictionary of the format:
{
"state": "submission_pending",
}
"""
if not run_id:
raise ValueError("run_id must be provided")
api_url = load_backend_settings()['backend_url']
if not api_url:
raise RuntimeError("CSST backend api url is not set")
endpoint = f"{api_url}/level2/pipeline/run/{run_id}"
try:
response = requests.get(endpoint, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as e:
raise RuntimeError(f"Failed to query run state: {e}")
if not data.get("success"):
raise RuntimeError(f"Unexpected API response: {data}")
result = data.get("result")
if not result:
return {"state": "not_found"}
state = result.get("status")
return {"state": state if state else "unknown"}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment