"""Command-line interface for the CSST pipeline operation tool.

Layout of the command tree defined here:

    csst dag run | status | log | cancel
    csst raw
    csst product

Every command currently prints a simulated response; the places where the
real pipeline calls belong are marked with comments.

NOTE: click uses each command function's docstring as its ``--help`` text,
so those docstrings are deliberately kept short and user-facing.
Implementation notes live in ``#`` comments instead.
"""

import json
from typing import List, Optional

import click
import requests  # For API calls (not referenced by the simulated commands below)


def _split_dag_ids(dags: str) -> List[str]:
    """Return the non-empty, whitespace-stripped IDs of a ``--dags`` value.

    Raises:
        click.BadParameter: if the value contains no DAG ID at all
            (for example ``""`` or ``" , "``).
    """
    dag_ids = [part.strip() for part in dags.split(",")]
    dag_ids = [dag_id for dag_id in dag_ids if dag_id]
    if not dag_ids:
        raise click.BadParameter(
            "expected at least one DAG ID", param_hint="--dags"
        )
    return dag_ids


def _check_json(conf: str) -> None:
    """Ensure a ``--conf`` value is syntactically valid JSON.

    Raises:
        click.BadParameter: if ``conf`` cannot be decoded as JSON.
    """
    try:
        json.loads(conf)
    except json.JSONDecodeError as err:
        raise click.BadParameter(
            f"not valid JSON ({err})", param_hint="--conf"
        ) from err


# Create main command group 'csst'
@click.group()
def csst() -> None:
    """CSST Pipeline Operation Tool"""


# Create 'dag' subcommand group for DAG task management
@csst.group()
def dag() -> None:
    """DAG Operations"""


# 1. Run task
@dag.command()
@click.option("--dags", required=True, help="Comma-separated list of DAG IDs to run")
@click.option("--conf", default="{}", help="Run configuration in JSON format")
@click.option("--run-id", help="Manually specify run ID (optional)")
def run(dags: str, conf: str, run_id: Optional[str]) -> None:
    """Initiate a new DAG task"""
    # Validate parameters up front so a malformed request is reported as a
    # usage error instead of being "submitted". The raw option values are
    # still what gets echoed below.
    _split_dag_ids(dags)
    _check_json(conf)

    # Actual business logic here, e.g.:
    # - Call Airflow API or your pipeline service
    # - Return run ID or result
    click.echo(f"Submitting task: DAG_IDS={dags}, CONF={conf}, RUN_ID={run_id}")

    # Simulate response: echo the caller's run ID back, or a placeholder one.
    mock_run_id = run_id or "simulated_run_id_12345"
    click.echo(f"Task submitted successfully. Run ID: {mock_run_id}")


# 2. Check task status
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", help="Specific run ID (query latest if not provided)")
@click.option("--verbose", "-v", is_flag=True, help="Show detailed status information")
def status(dag_id: str, run_id: Optional[str], verbose: bool) -> None:
    """Check status of a DAG task"""
    # Actual business logic here
    status_info = "success"  # Simulated status

    # Without --run-id the help text promises the latest run, so say so
    # rather than printing the literal "None".
    shown_run_id = "latest" if run_id is None else run_id
    click.echo(f"DAG {dag_id} (Run ID: {shown_run_id}) status: {status_info}")
    if verbose:
        click.echo("Detailed status information: ...")


# 3. View task logs
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", required=True, help="Run ID")
@click.option("--task-id", help="View logs for specific task (optional)")
@click.option(
    "--tail",
    # A line count below 1 is meaningless; let click reject it.
    type=click.IntRange(min=1),
    help="Show only last N lines of logs",
)
def log(dag_id: str, run_id: str, task_id: Optional[str], tail: Optional[int]) -> None:
    """View execution logs for a DAG task"""
    # Actual business logic here
    click.echo(f"Fetching logs: DAG_ID={dag_id}, RUN_ID={run_id}, TASK_ID={task_id}")
    if tail is not None:
        click.echo(f"Showing last {tail} lines:")

    # Simulate log output
    click.echo("INFO - Task started...")
    click.echo("INFO - Processing data...")
    click.echo("INFO - Task completed successfully.")


# 4. Cancel task
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", required=True, help="Run ID to cancel")
@click.option("--force", is_flag=True, help="Force cancellation without confirmation")
def cancel(dag_id: str, run_id: str, force: bool) -> None:
    """Cancel a running DAG task"""
    # Actual business logic here
    # --force short-circuits the `or`, so the interactive prompt is skipped.
    confirm = force or click.confirm(f"Confirm cancellation of task {dag_id}/{run_id}?")
    if confirm:
        click.echo(f"Canceling task: DAG_ID={dag_id}, RUN_ID={run_id}")
        # Call cancellation logic
    else:
        click.echo("Operation canceled")


# 5. View raw data (Level 0)
@csst.command()
@click.option("--obs-id", required=True, help="Observation ID")
@click.option("--dataset", help="Dataset name")
@click.option(
    "--output", "-o", type=click.Path(), help="Download data to local path (optional)"
)
def raw(obs_id: str, dataset: Optional[str], output: Optional[str]) -> None:
    """View raw observation data (Level 0)"""
    click.echo(f"Querying raw data: OBS_ID={obs_id}, DATASET={dataset}")
    if output:
        click.echo(f"Data will be downloaded to: {output}")
    # Display data information or download data


# 6. View data products
@csst.command()
@click.option("--product-id", required=True, help="Data product ID")
@click.option("--version", default="latest", help="Product version")
@click.option(
    "--format",
    type=click.Choice(["fits", "hdf5", "ascii"]),
    default="fits",
    help="Data format",
)
def product(product_id: str, version: str, format: str) -> None:
    """View processed data products"""
    # NOTE: the `format` parameter shadows the builtin of the same name; it is
    # kept because click derives the keyword from the option name.
    click.echo(
        f"Querying data product: PRODUCT_ID={product_id}, VERSION={version}, FORMAT={format}"
    )
    # Display data product information


if __name__ == "__main__":
    csst()