Commit 25d93953 authored by Matthias Weidenthaler

Change the API for triggering DAG runs to conform to the same formats as csst-dfs-client

parent 4d21bf68
@@ -349,32 +349,70 @@ filter = {
# 7. Trigger a Pipeline Run
Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful.
Trigger execution of a data processing pipeline for a list of DAG group runs.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns a PipelineResult containing a list of run_ids when successful.
The run_id can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.
## Function: `run_pipeline`
## Function: `new_dag_group_run`
```python
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
from csst_fs.pipeline import DagGroupRun, DagRun, PipelineResult
def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[DagRun]] = None) -> PipelineResult:
"""
Submitts a pipeline run for execution with the provided batch.
Trigger a pipeline run for the provided DAG group runs.
Retries up to 3 times on transient/network errors.
Args:
batch: A dictionary describing the batch to run.
dag_group_run: DagGroupRun TypedDict with keys:
{
"dag_group": str,
"dag": str,
"dag_group_run": str,
"batch_id": str,
"priority": int,
"created_time": str,
"queue_time": str,
}
dag_run_list: Optional list of DagRun TypedDicts. Each DagRun contains:
{
"dataset": str,
"instrument": str,
"obs_type": str,
"obs_group": str,
"obs_id": str,
"detector": str,
"filter": str,
"custom_id": str,
"batch_id": str,
"pmapname": str,
"ref_cat": str,
"dag_group": str,
"dag": str,
"dag_group_run": str,
"dag_run": str,
"priority": int,
"data_list": List[str],
"extra_kwargs": dict,
"created_time": str,
"rerun": int,
"status_code": int,
"n_file_expected": int,
"n_file_found": int,
"object": str,
"proposal_id": str,
}
Returns:
dict: A dictionary containing the run_id of the newly submitted pipeline run.
PipelineResult: A result object containing:
- code: int (200 for success)
- message: str (response message)
- data: dict containing the run_id
Example:
{
"run_id": "3"
"code": 200,
"message": "Success",
"data": {"run_id": ["3"]}
}
Raises:
@@ -383,19 +421,57 @@ def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
    """
# Example:
from csst_fs import run_pipeline
batch = {
    "dag_group": "dag_group_name",
    "dag": "dag_name",
    "batch_id": "batch_1"
}
result = run_pipeline(batch)
# result: {'run_id': '4'}
from csst_fs.pipeline import new_dag_group_run, DagGroupRun, DagRun
dag_group_run = {
    'dag_group': 'default',
    'dag_group_run': '107217c85a1bd3189c47c593e9553b22f6e9a880',
    'batch_id': 'default',
    'priority': 1,
    'created_time': '2025-12-12T01:26:36.080'
}
dag_run_list = [
    {
        'dataset': 'test-msc-c9-25sqdeg-v3',
        'instrument': 'MSC',
        'obs_type': 'WIDE',
        'obs_group': 'W5',
        'obs_id': '10100285453',
        'detector': '06',
        'filter': '',
        'custom_id': '',
        'batch_id': 'default',
        'pmapname': '',
        'ref_cat': '',
        'dag_group': 'default',
        'dag': 'csst-msc-l1-mbi',
        'dag_group_run': '107217c85a1bd3189c47c593e9553b22f6e9a880',
        'dag_run': '192be2b0e6c8b531b2ad70afab880b23af546f2d',
        'priority': 1,
        'data_list': ['692566f668bc9fe08221713d'],
        'extra_kwargs': {},
        'created_time': '2025-12-12T01:26:36.080',
        'rerun': -1,
        'status_code': -1024,
        'n_file_expected': 1,
        'n_file_found': 1,
        'object': '',
        'proposal_id': ''
    },
    {
        ...
    }
]
result = new_dag_group_run(dag_group_run, dag_run_list)
# result.success: True
# result.data: {'run_id': ['4']}
```
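If submission keeps failing, an error surfaces instead of a result: transient network errors are retried up to three times before giving up. A minimal error-handling sketch follows; the exact exception raised once retries are exhausted is elided above, and `RuntimeError` is assumed here:
```python
from csst_fs.pipeline import new_dag_group_run

try:
    result = new_dag_group_run(dag_group_run)  # dag_run_list is optional
    if result.success:  # True when result.code == 200
        run_ids = result.data["run_id"]
    else:
        print(f"Pipeline API returned {result.code}: {result.message}")
except RuntimeError as exc:  # assumed failure mode after exhausted retries
    print(f"Pipeline submission failed: {exc}")
```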
# 8. Query a Pipeline Run State
Query the state of a pipeline run given an id (obtained from [run_pipeline](#7-trigger-a-pipeline-run)).
Query the state of a pipeline run given an id (obtained from [new_dag_group_run](#7-trigger-a-pipeline-run)).
## Function: `query_run_state`
```python
......
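# Usage sketch (illustrative): the full signature is elided above. This
# assumes query_run_state accepts one run_id string, as returned inside
# PipelineResult.data by new_dag_group_run.
from csst_fs.pipeline import new_dag_group_run, query_run_state

result = new_dag_group_run(dag_group_run, dag_run_list)
for run_id in result.data['run_id']:
    state = query_run_state(run_id)
    print(run_id, state)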
csst_fs/__init__.py
@@ -7,5 +7,6 @@ from . import fs
from .catalog.metadata import query_metadata
from .catalog.star import query_catalog, query_catalog_with_metadata
from .ingestion.level2 import start_ingestion_task, query_task_state
from .pipeline.pipeline import run_pipeline, PipelineBatch, query_run_state
from .log_config import configure_logging, get_log_directory, get_log_file_path, LazyFileHandler
\ No newline at end of file
from .pipeline.pipeline import new_dag_group_run, PipelineBatch, query_run_state
from .log_config import configure_logging, get_log_directory, get_log_file_path, LazyFileHandler
from .pipeline.pipeline_result import PipelineResult
\ No newline at end of file
csst_fs/pipeline/dag_run.py
from typing import List, TypedDict


class DagGroupRun(TypedDict):
    """Submission record for one DAG group run."""
    dag_group: str
    dag_group_run: str
    batch_id: str
    priority: int
    created_time: str
    queue_time: str


class DagRun(TypedDict):
    """Submission record for a single DAG run within a DAG group run."""
    dataset: str
    instrument: str
    obs_type: str
    obs_group: str
    obs_id: str
    detector: str
    filter: str
    custom_id: str
    batch_id: str
    pmapname: str
    ref_cat: str
    dag_group: str
    dag: str
    dag_group_run: str
    dag_run: str
    priority: int
    data_list: List[str]
    extra_kwargs: dict
    created_time: str
    rerun: int
    status_code: int
    n_file_expected: int
    n_file_found: int
    object: str
    proposal_id: str
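# Usage sketch (illustrative values): TypedDicts are plain dicts at runtime,
# so a DagGroupRun is written as a dict literal; a static checker such as
# mypy validates the keys against the definition above.
example_group_run: DagGroupRun = {
    "dag_group": "default",
    "dag_group_run": "107217c85a1bd3189c47c593e9553b22f6e9a880",
    "batch_id": "default",
    "priority": 1,
    "created_time": "2025-12-12T01:26:36.080",
    "queue_time": "",  # declared required here, though the docs example omits it
}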
csst_fs/pipeline/pipeline.py
import time
import requests
from typing import Dict, List, Any, TypedDict
from typing import Dict, List, Any, TypedDict, Optional
from ..s3_config import load_backend_settings
from .pipeline_result import PipelineResult
from .dag_run import DagGroupRun, DagRun
class PipelineBatch(TypedDict):
    dag_group: str
    dag: str
    batch_id: str


def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
def new_dag_group_run(dag_group_run: DagGroupRun, dag_run_list: Optional[List[DagRun]] = None) -> PipelineResult:
    """
    Trigger a pipeline run for the provided DAG group runs.
@@ -34,7 +36,7 @@ def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
    headers = {"Content-Type": "application/json"}
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.post(pipeline_run_endpoint, json=batch, headers=headers, timeout=30)
            resp = requests.post(pipeline_run_endpoint, json={"dag_group_run": dag_group_run, "dag_run_list": dag_run_list}, headers=headers, timeout=30)
            resp.raise_for_status()
            try:
@@ -52,7 +54,11 @@ def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
            if not run_id:
                raise RuntimeError(f"Pipeline API did not return run_id: {data}")
            return {"run_id": str(run_id)}
            pipeline_result: PipelineResult = PipelineResult()
            pipeline_result["code"] = data.get("code")
            pipeline_result["message"] = data.get("message")
            pipeline_result["data"] = data.get("result")
            return pipeline_result
        except (requests.RequestException, RuntimeError) as exc:
            if attempt == max_retries:
......
csst_fs/pipeline/pipeline_result.py
from typing import Any


# Adapted from csst-dfs.csst-dfs-client.csst-dfs-client.common.__init__ (20251212)
class PipelineResult(dict):
    '''
    PipelineResult wraps an API response and exposes three fields:
        success (bool): whether the call succeeded (code == 200)
        message (str): response message
        data (Any): response payload
    '''
    __setattr__ = dict.__setitem__
    __getattr__ = dict.__getitem__

    def __init__(self):
        super().__init__()
        self["code"] = 200
        self["message"] = ""
        self["data"] = None

    @property
    def success(self) -> bool:
        return self["code"] == 200

    @property
    def data(self) -> Any:
        return self["data"]

    @property
    def message(self) -> str:
        return str(self["message"])
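# Usage sketch (illustrative): PipelineResult subclasses dict and aliases
# attribute access to item access, so fields can be read either way.
result = PipelineResult()
result["message"] = "Success"
result["data"] = {"run_id": ["4"]}
assert result.success                       # code defaults to 200
assert result.message == result["message"]  # attribute and item access agree
assert result.data == {"run_id": ["4"]}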