Commit e68f0ced authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

use click

parent 5559bac6
......@@ -27,7 +27,7 @@ RUN --mount=type=cache,id=apt,target=/var/cache/apt \
USER csst
# copy VCS roots into image
COPY --chown=csst:csst . /pipeline
RUN mv pkg/csst-dag/csst_dag/cli /pipeline/app && rm /pipeline/app/__init__.py
RUN mv pkg/csst-dag/csst_dag/docker/csst* /pipeline/app/
# install packages & requirements
RUN --mount=type=cache,id=pip,uid=9000,gid=9000,target=/home/csst/.cache \
pip install pkg/ccds \
......
#!/bin/bash
# 定义 Python CLI 的完整路径
# 如果当前文件夹下存在csst.py文件,就使用当前文件夹下的csst.py文件
if [[ -f "csst.py" ]]; then
PYTHON_CLI="./csst.py"
else
PYTHON_CLI="/pipeline/app/csst.py"
fi
# 检查 Python CLI 文件是否存在
if [[ ! -f "$PYTHON_CLI" ]]; then
echo "错误: 找不到 Python CLI 文件: $PYTHON_CLI"
exit 1
fi
# 将所有参数传递给 Python CLI
python "$PYTHON_CLI" "$@"
import click
import requests # For API calls
from typing import Optional
# Create main command group 'csst'
@click.group()
def csst():
"""CSST Pipeline Operation Tool"""
pass
# Create 'dag' subcommand group for DAG task management
@csst.group()
def dag():
"""DAG Operations"""
pass
# 1. Run task
@dag.command()
@click.option("--dags", required=True, help="Comma-separated list of DAG IDs to run")
@click.option("--conf", default="{}", help="Run configuration in JSON format")
@click.option("--run-id", help="Manually specify run ID (optional)")
def run(dags, conf, run_id):
"""Initiate a new DAG task"""
# Actual business logic here, e.g.:
# - Call Airflow API or your pipeline service
# - Validate parameters
# - Return run ID or result
click.echo(f"Submitting task: DAG_IDS={dags}, CONF={conf}, RUN_ID={run_id}")
# Simulate response
mock_run_id = run_id or "simulated_run_id_12345"
click.echo(f"Task submitted successfully. Run ID: {mock_run_id}")
# 2. Check task status
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", help="Specific run ID (query latest if not provided)")
@click.option("--verbose", "-v", is_flag=True, help="Show detailed status information")
def status(dag_id, run_id, verbose):
"""Check status of a DAG task"""
# Actual business logic here
status_info = "success" # Simulated status
click.echo(f"DAG {dag_id} (Run ID: {run_id}) status: {status_info}")
if verbose:
click.echo("Detailed status information: ...")
# 3. View task logs
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", required=True, help="Run ID")
@click.option("--task-id", help="View logs for specific task (optional)")
@click.option("--tail", type=int, help="Show only last N lines of logs")
def log(dag_id, run_id, task_id, tail):
"""View execution logs for a DAG task"""
# Actual business logic here
click.echo(f"Fetching logs: DAG_ID={dag_id}, RUN_ID={run_id}, TASK_ID={task_id}")
if tail:
click.echo(f"Showing last {tail} lines:")
# Simulate log output
click.echo("INFO - Task started...")
click.echo("INFO - Processing data...")
click.echo("INFO - Task completed successfully.")
# 4. Cancel task
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", required=True, help="Run ID to cancel")
@click.option("--force", is_flag=True, help="Force cancellation without confirmation")
def cancel(dag_id, run_id, force):
"""Cancel a running DAG task"""
# Actual business logic here
confirm = force or click.confirm(f"Confirm cancellation of task {dag_id}/{run_id}?")
if confirm:
click.echo(f"Canceling task: DAG_ID={dag_id}, RUN_ID={run_id}")
# Call cancellation logic
else:
click.echo("Operation canceled")
# 5. View raw data (Level 0)
@csst.command()
@click.option("--obs-id", required=True, help="Observation ID")
@click.option("--dataset", help="Dataset name")
@click.option(
"--output", "-o", type=click.Path(), help="Download data to local path (optional)"
)
def raw(obs_id, dataset, output):
"""View raw observation data (Level 0)"""
click.echo(f"Querying raw data: OBS_ID={obs_id}, DATASET={dataset}")
if output:
click.echo(f"Data will be downloaded to: {output}")
# Display data information or download data
# 6. View data products
@csst.command()
@click.option("--product-id", required=True, help="Data product ID")
@click.option("--version", default="latest", help="Product version")
@click.option(
"--format",
type=click.Choice(["fits", "hdf5", "ascii"]),
default="fits",
help="Data format",
)
def product(product_id, version, format):
"""View processed data products"""
click.echo(
f"Querying data product: PRODUCT_ID={product_id}, VERSION={version}, FORMAT={format}"
)
# Display data product information
if __name__ == "__main__":
csst()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment