import click from typing import Optional import os import glob import shutil # to test locally: # export PATH=${PWD}:$PATH # locate dot csst EXE_PATH = os.path.abspath(__file__) APP_DIR = os.path.dirname(EXE_PATH) DOT_CSST = os.path.join(APP_DIR, ".csst") assert os.path.exists(DOT_CSST), f"`{DOT_CSST}` does not exist!" # Create main command group 'csst' @click.group() def csst(): """CSST Pipeline Operation Tool""" pass @csst.group() def env(): """Environment file / variables""" pass @env.command() def list(): """List all available environment configurations""" # 使用 glob 查找所有 .env 文件 env_file_pattern = os.path.join(APP_DIR, "envs/*.env") env_files = glob.glob(env_file_pattern) if not env_files: click.echo( f"No predefined env files found with pattern: {env_file_pattern}", err=True, ) return click.echo("Available predefined env files:") for file_path in env_files: # 从文件路径中提取配置名称(去掉目录和扩展名) config_name = os.path.splitext(os.path.basename(file_path))[0] click.echo(f" - {config_name}") @env.command() @click.option( "--name", "-n", help="Use a predefined env file (e.g., csu, production, development)", required=True, ) def show(name): """Show environment variables using a predefined env file""" if not name: # 如果没有提供 --name 参数,执行默认的初始化操作 click.echo("Performing default env file initialization...") # 这里可以添加你的默认初始化逻辑 click.echo( "Use --name option to load a specific env file or use 'env list' to see available options." ) return # 使用 glob 获取所有可用的环境文件 env_file = os.path.join(APP_DIR, f"envs/{name}.env") # 读取并打印文件内容 try: with open(env_file, "r") as file: content = file.read() click.echo(content) except IOError as e: click.echo(f"Error reading file: {e}", err=True) @env.command() @click.option( "--name", "-n", help="Use a predefined env file (e.g., csu, zjlab, naoc)", required=True, type=click.Choice(["csu", "zjlab", "naoc"]), ) def set(name): """Set environment variables using a predefined env file""" if not name: # 如果没有提供 --name 参数,执行默认的初始化操作 click.echo("Performing default env file initialization...") # 这里可以添加你的默认初始化逻辑 click.echo( "Use --name option to load a specific env file or use 'env list' to see available options." ) return # 构建完整的文件路径 source_env_file_path = os.path.join(APP_DIR, f"envs/{name}.env") target_env_file_path = os.path.join(DOT_CSST, f".env") shutil.copy(source_env_file_path, target_env_file_path) click.echo(f"Environment file {name} set successfully.") return # Create 'dag' subcommand group for DAG task management @csst.group() def dag(): """DAG Operations""" pass # 1. Run task @dag.command() @click.option("--dags", required=True, help="Comma-separated list of DAG IDs to run") @click.option("--conf", default="{}", help="Run configuration in JSON format") @click.option("--run-id", help="Manually specify run ID (optional)") def run(dags, conf, run_id): """Initiate a new DAG task""" # Actual business logic here, e.g.: # - Call Airflow API or your pipeline service # - Validate parameters # - Return run ID or result click.echo(f"Submitting task: DAG_IDS={dags}, CONF={conf}, RUN_ID={run_id}") # Simulate response mock_run_id = run_id or "simulated_run_id_12345" click.echo(f"Task submitted successfully. Run ID: {mock_run_id}") # 2. Check task status @dag.command() @click.option("--dag-id", required=True, help="DAG ID") @click.option("--run-id", help="Specific run ID (query latest if not provided)") @click.option("--verbose", "-v", is_flag=True, help="Show detailed status information") def status(dag_id, run_id, verbose): """Check status of a DAG task""" # Actual business logic here status_info = "success" # Simulated status click.echo(f"DAG {dag_id} (Run ID: {run_id}) status: {status_info}") if verbose: click.echo("Detailed status information: ...") # 3. View task logs @dag.command() @click.option("--dag-id", required=True, help="DAG ID") @click.option("--run-id", required=True, help="Run ID") @click.option("--task-id", help="View logs for specific task (optional)") @click.option("--tail", type=int, help="Show only last N lines of logs") def log(dag_id, run_id, task_id, tail): """View execution logs for a DAG task""" # Actual business logic here click.echo(f"Fetching logs: DAG_ID={dag_id}, RUN_ID={run_id}, TASK_ID={task_id}") if tail: click.echo(f"Showing last {tail} lines:") # Simulate log output click.echo("INFO - Task started...") click.echo("INFO - Processing data...") click.echo("INFO - Task completed successfully.") # 4. Cancel task @dag.command() @click.option("--dag-id", required=True, help="DAG ID") @click.option("--run-id", required=True, help="Run ID to cancel") @click.option("--force", is_flag=True, help="Force cancellation without confirmation") def cancel(dag_id, run_id, force): """Cancel a running DAG task""" # Actual business logic here confirm = force or click.confirm(f"Confirm cancellation of task {dag_id}/{run_id}?") if confirm: click.echo(f"Canceling task: DAG_ID={dag_id}, RUN_ID={run_id}") # Call cancellation logic else: click.echo("Operation canceled") # 5. View raw data (Level 0) @csst.command() @click.option("--obs-id", required=True, help="Observation ID") @click.option("--dataset", help="Dataset name") @click.option( "--output", "-o", type=click.Path(), help="Download data to local path (optional)" ) def raw(obs_id, dataset, output): """View raw observation data (Level 0)""" click.echo(f"Querying raw data: OBS_ID={obs_id}, DATASET={dataset}") if output: click.echo(f"Data will be downloaded to: {output}") # Display data information or download data # 6. View data products @csst.command() @click.option("--product-id", required=True, help="Data product ID") @click.option("--version", default="latest", help="Product version") @click.option( "--format", type=click.Choice(["fits", "hdf5", "ascii"]), default="fits", help="Data format", ) def product(product_id, version, format): """View processed data products""" click.echo( f"Querying data product: PRODUCT_ID={product_id}, VERSION={version}, FORMAT={format}" ) # Display data product information if __name__ == "__main__": csst()