Commit abfa78d0 authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Add batching behavior in new_dag_group_run

parent 25d93953
...@@ -350,9 +350,8 @@ filter = { ...@@ -350,9 +350,8 @@ filter = {
# 7. Trigger a Pipeline Run # 7. Trigger a Pipeline Run
Trigger execution of a data processing pipeline for a list of DAG group runs. Trigger execution of a data processing pipeline for a list of DAG group runs.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns a PipelineResult containing a list of run_id when successful. This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns a PipelineResult.
If the list of DAG runs contains more than 512 entries, it will be submitted in batches of 512.
The run_id can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.
## Function: `new_dag_group_run` ## Function: `new_dag_group_run`
```python ```python
...@@ -407,7 +406,7 @@ def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[Da ...@@ -407,7 +406,7 @@ def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[Da
PipelineResult: A result object containing: PipelineResult: A result object containing:
- code: int (200 for success) - code: int (200 for success)
- message: str (response message) - message: str (response message)
- data: dict containing the run_id - data: dict
Example: Example:
{ {
"code": 200, "code": 200,
......
...@@ -5,6 +5,8 @@ from ..s3_config import load_backend_settings ...@@ -5,6 +5,8 @@ from ..s3_config import load_backend_settings
from .pipeline_result import PipelineResult from .pipeline_result import PipelineResult
from .dag_run import DagGroupRun, DagRun from .dag_run import DagGroupRun, DagRun
_DAG_RUN_BATCH_SIZE: int = 512
class PipelineBatch(TypedDict): class PipelineBatch(TypedDict):
dag_group: str dag_group: str
dag: str dag: str
...@@ -12,15 +14,18 @@ class PipelineBatch(TypedDict): ...@@ -12,15 +14,18 @@ class PipelineBatch(TypedDict):
def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[DagRun]] = None) -> PipelineResult: def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[DagRun]] = None) -> PipelineResult:
""" """
Trigger a pipeline run for the provided batch. Trigger a pipeline run for the provided DAG group run.
Retries up to 3 times on transient/network errors. Retries up to 3 times on transient/network errors for each batch.
If dag_run_list is provided and exceeds the batch size, it is split into batches of 512 runs.
Args: Args:
batch: PipelineBatch TypedDict with keys: dag_group, dag, batch_id dag_group_run: DagGroupRun TypedDict with keys: dag_group, dag_group_run, batch_id, priority, created_time, queue_time
dag_run_list: Optional list of DagRun TypedDicts. If None or empty, submits without runs.
Returns: Returns:
Dict containing the run_id, e.g. {"run_id": "3"} PipelineResult containing the run_id(s). If multiple batches are submitted, returns the last successful result.
If any batch fails, returns the failed result immediately.
Raises: Raises:
RuntimeError: on permanent failure or invalid response RuntimeError: on permanent failure or invalid response
...@@ -30,19 +35,59 @@ def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[Da ...@@ -30,19 +35,59 @@ def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[Da
raise RuntimeError("CSST backend api url is not set") raise RuntimeError("CSST backend api url is not set")
pipeline_run_endpoint = f"{api_url}/level2/pipeline/run" pipeline_run_endpoint = f"{api_url}/level2/pipeline/run"
# If dag_run_list is None or empty, submit once with empty list
if dag_run_list is None or len(dag_run_list) == 0:
return _submit_pipeline_batch(pipeline_run_endpoint, dag_group_run, [])
# Process dag_run_list in batches
results = []
for i in range(0, len(dag_run_list), _DAG_RUN_BATCH_SIZE):
batch = dag_run_list[i:i + _DAG_RUN_BATCH_SIZE]
result = _submit_pipeline_batch(pipeline_run_endpoint, dag_group_run, batch)
results.append(result)
if not result.success:
# If any batch fails, return the failed result immediately
return result
# If all batches succeed, return the last result
return results[-1]
def _submit_pipeline_batch(endpoint: str, dag_group_run: DagGroupRun, dag_run_batch: List[DagRun]) -> PipelineResult:
"""
Internal helper to submit a single batch of pipeline runs with retry logic.
Args:
endpoint: API endpoint URL
dag_group_run: DagGroupRun configuration
dag_run_batch: List of DagRun items (can be empty)
Returns:
PipelineResult with the API response
Raises:
RuntimeError: on permanent failure after all retries
"""
max_retries = 3 max_retries = 3
backoff_base = 1 backoff_base = 1
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
for attempt in range(1, max_retries + 1): for attempt in range(1, max_retries + 1):
try: try:
resp = requests.post(pipeline_run_endpoint, json={"dag_group_run": dag_group_run, "dag_run_list": dag_run_list}, headers=headers, timeout=30) resp = requests.post(
endpoint,
json={"dag_group_run": dag_group_run, "dag_run_list": dag_run_batch},
headers=headers,
timeout=30
)
resp.raise_for_status() resp.raise_for_status()
try: try:
data = resp.json() data = resp.json()
except ValueError as e: except ValueError as e:
raise RuntimeError(f"Invalid JSON response from pipeline API: {e}") raise RuntimeError(f"Invalid JSON response from pipeline API: {e}")
if not data.get("success", False): if not data.get("success", False):
raise RuntimeError(f"Pipeline API returned unsuccessful response: {data.get('code')} {data.get('message')}") raise RuntimeError(f"Pipeline API returned unsuccessful response: {data.get('code')} {data.get('message')}")
...@@ -51,7 +96,7 @@ def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[Da ...@@ -51,7 +96,7 @@ def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[Da
raise RuntimeError(f"Unexpected pipeline API result shape: {data}") raise RuntimeError(f"Unexpected pipeline API result shape: {data}")
run_id = result.get("run_id") run_id = result.get("run_id")
if not run_id: if run_id == None:
raise RuntimeError(f"Pipeline API did not return run_id: {data}") raise RuntimeError(f"Pipeline API did not return run_id: {data}")
pipeline_result: PipelineResult = PipelineResult() pipeline_result: PipelineResult = PipelineResult()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment