Commit b6e90872 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

tweaks

parent 85d69654
dagtest/
csst_dag.egg-info/
csst-msc-c9-50sqdeg-v3-plan/
build/
batch/
\ No newline at end of file
...@@ -9,4 +9,7 @@ install: ...@@ -9,4 +9,7 @@ install:
pip install . --no-deps --force-reinstall pip install . --no-deps --force-reinstall
pull: pull:
git pull git pull
\ No newline at end of file
compile:
uv pip compile pyproject.toml -o requirements.txt
\ No newline at end of file
# csst-dag # `csst-dag`
更新: # Change log
- [2025-09-15] 旧版csst-dag命令行工具安装方式 - [2025-09-15] 旧版csst-dag命令行工具安装方式
- `pip install git+https://csst-tb.bao.ac.cn/code/csst-cicd/csst-dag.git@7a0108f3 --force-reinstall` - `pip install git+https://csst-tb.bao.ac.cn/code/csst-cicd/csst-dag.git@7a0108f3 --force-reinstall`
- [2025-09-09] csst-dag将被封装为Docker镜像,原有的基于`python -m csst_dag.cli`的命令行调用方式将被放弃 - [2025-09-09] csst-dag将被封装为Docker镜像,原有的基于`python -m csst_dag.cli`的命令行调用方式将被放弃
...@@ -48,22 +48,24 @@ docker pull csu-harbor.csst.nao:10443/csst/csst-dag:latest ...@@ -48,22 +48,24 @@ docker pull csu-harbor.csst.nao:10443/csst/csst-dag:latest
主要有以下几个命令组 主要有以下几个命令组
- `csst env`:预设环境变量 - `csst env`: 预设环境变量
- `csst plan`:编排数据查询 - `csst plan`: 编排数据查询
- `csst plan --obs-group=111` - `csst plan --obs-group=111`
- `csst plan --dataset=111 --stats=obs_group` - `csst plan --dataset=111 --stats=obs_group --to-json`
- `csst dag`:DAG任务操作 - `csst dag`: DAG任务操作
- `csst dag run --dataset=xxx --batch-id=xxx` () - `csst dag start --dataset=xxx --batch-id=xxx`
- `csst dag candel --dataset=xxx --batch-id=xxx` (需要Scalebox、DFS支持) - `csst dag cancel --dataset=xxx --batch-id=xxx` (需要Scalebox、DFS支持)
- `csst data`:原始数据查询 - `csst dag status --dataset=xxx --batch-id=xxx` (需要Scalebox、DFS支持)
- `csst dag logs --dataset=xxx --batch-id=xxx --status=0` (需要Scalebox、DFS支持)
- `csst file`: 原始数据查询
- `csst data --data-model=raw --obs-id=10100000001 --stats=obs_group` - `csst data --data-model=raw --obs-id=10100000001 --stats=obs_group`
- 查询原始数据,并按照obs_group进行统计 - 查询原始数据,并按照obs_group进行统计
- `csst data --data-model=csst-msc-l1-mbi --obs-id=10100000001 --stats=obs_group` - `csst data --data-model=csst-msc-l1-mbi --obs-id=10100000001 --stats=obs_group` --output=json
- 查询数据产品,并按照obs_group进行统计 - 查询数据产品,并按照obs_group进行统计
- `csst catalog` DFS星表查询 - `csst catalog` DFS星表查询
- `csst catalog list` 列出可用的星表 - `csst catalog list` 列出可用的星表
- `csst catalog show --name=trilegal` 列出指定星表的具体信息 - `csst catalog show --name=trilegal` 列出指定星表的具体信息
- `csst ccds` CCDS - `csst ccds`: CCDS
- `csst ccds pmap?` - `csst ccds pmap?`
## 1. `csst env` 环境变量 ## 1. `csst env` 环境变量
......
import setuptools
# 读取README.md作为长描述
with open("README.md", "r") as f:
long_description = f.read()
# 读取依赖列表requirements.txt
# 忽略#开头或者版本号不明确指定的条目
with open("requirements.txt", "r") as f:
requirements = [
req.strip()
for req in f.readlines()
if not req.startswith("#") and req.__contains__("==")
]
# 配置、安装
setuptools.setup(
name="csst_dag", # 包名
version="0.0.1", # 版本号
author="Bo Zhang", # 作者
author_email="bozhang@nao.cas.cn", # 邮箱
description="CSST DAG", # 短描述
long_description=long_description, # 长描述
long_description_content_type="text/markdown", # 长描述类型
url="https://csst-tb.bao.ac.cn/code/csst-cicd/csst-dag", # 主页
packages=setuptools.find_packages(
where="."
), # 用setuptools工具自动发现带有__init__.py的包
license="MIT", # 证书类型
classifiers=[ # 程序分类, 参考 https://pypi.org/classifiers/
# How mature is this project?
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
"Development Status :: 3 - Alpha",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering :: Physics",
"Topic :: Scientific/Engineering :: Astronomy",
],
include_package_data=True, # 设置包含随包数据
package_data={ # 具体随包数据路径
"csst_dag": [
"constants/*",
"dag/*",
"config/*",
],
},
# 请注意检查,防止临时文件或其他不必要的文件被提交到仓库,否则会一同安装
python_requires=">=3.11", # Python版本要求
install_requires=requirements,
)
# DFS1
CSST_DFS_GATEWAY="192.168.25.89:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
# DFS2
# ---
\ No newline at end of file
#!/bin/bash
# 定义 Python CLI 的完整路径
# 如果当前文件夹下存在csst.py文件,就使用当前文件夹下的csst.py文件
if [[ -f "csst.py" ]]; then
PYTHON_CLI="./csst.py"
source .csst/.env
else
PYTHON_CLI="/pipeline/app/csst.py"
source /pipeline/app/.csst/.env
fi
# 检查 Python CLI 文件是否存在
if [[ ! -f "$PYTHON_CLI" ]]; then
echo "错误: 找不到 Python CLI 文件: $PYTHON_CLI"
exit 1
fi
# 打印环境变量
printenv | grep CSST
# 将所有参数传递给 Python CLI
python "$PYTHON_CLI" "$@"
import click
from typing import Optional
import os
import glob
import shutil
# to test locally:
# export PATH=${PWD}:$PATH
# locate dot csst
EXE_PATH = os.path.abspath(__file__)
APP_DIR = os.path.dirname(EXE_PATH)
DOT_CSST = os.path.join(APP_DIR, ".csst")
assert os.path.exists(DOT_CSST), f"`{DOT_CSST}` does not exist!"
# Create main command group 'csst'
@click.group()
@click.version_option(
version=os.environ.get("BUILD", "BUILD not found"),
prog_name="CSST Pipeline Operation Tool",
)
def csst():
"""CSST Pipeline Operation Tool"""
pass
@csst.group()
def env():
"""Environment file / variables"""
pass
@env.command()
def list():
"""List all available environment configurations"""
# 使用 glob 查找所有 .env 文件
env_file_pattern = os.path.join(APP_DIR, "envs/*.env")
env_files = glob.glob(env_file_pattern)
if not env_files:
click.echo(
f"No predefined env files found with pattern: {env_file_pattern}",
err=True,
)
return
click.echo("Available predefined env files:")
for file_path in env_files:
# 从文件路径中提取配置名称(去掉目录和扩展名)
config_name = os.path.splitext(os.path.basename(file_path))[0]
click.echo(f" - {config_name}")
@env.command()
@click.option(
"--name",
"-n",
help="Use a predefined env file (e.g., csu, production, development)",
required=True,
)
def show(name):
"""Show environment variables using a predefined env file"""
if not name:
# 如果没有提供 --name 参数,执行默认的初始化操作
click.echo("Performing default env file initialization...")
# 这里可以添加你的默认初始化逻辑
click.echo(
"Use --name option to load a specific env file or use 'env list' to see available options."
)
return
# 使用 glob 获取所有可用的环境文件
env_file = os.path.join(APP_DIR, f"envs/{name}.env")
# 读取并打印文件内容
try:
with open(env_file, "r") as file:
content = file.read()
click.echo(content)
except IOError as e:
click.echo(f"Error reading file: {e}", err=True)
@env.command()
@click.option(
"--name",
"-n",
help="Use a predefined env file (e.g., csu, zjlab, naoc)",
required=True,
type=click.Choice(["csu", "zjlab", "naoc"]),
)
def set(name):
"""Set environment variables using a predefined env file"""
if not name:
# 如果没有提供 --name 参数,执行默认的初始化操作
click.echo("Performing default env file initialization...")
# 这里可以添加你的默认初始化逻辑
click.echo(
"Use --name option to load a specific env file or use 'env list' to see available options."
)
return
# 构建完整的文件路径
source_env_file_path = os.path.join(APP_DIR, f"envs/{name}.env")
target_env_file_path = os.path.join(DOT_CSST, f".env")
shutil.copy(source_env_file_path, target_env_file_path)
click.echo(f"Environment file {name} set successfully.")
return
# Create 'dag' subcommand group for DAG task management
@csst.group()
def dag():
"""DAG Operations"""
pass
# 1. Run task
@dag.command()
@click.option("--dags", required=True, help="Comma-separated list of DAG IDs to run")
@click.option("--conf", default="{}", help="Run configuration in JSON format")
@click.option("--run-id", help="Manually specify run ID (optional)")
def run(dags, conf, run_id):
"""Initiate a new DAG task"""
# Actual business logic here, e.g.:
# - Call Airflow API or your pipeline service
# - Validate parameters
# - Return run ID or result
click.echo(f"Submitting task: DAG_IDS={dags}, CONF={conf}, RUN_ID={run_id}")
# Simulate response
mock_run_id = run_id or "simulated_run_id_12345"
click.echo(f"Task submitted successfully. Run ID: {mock_run_id}")
# 2. Check task status
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", help="Specific run ID (query latest if not provided)")
@click.option("--verbose", "-v", is_flag=True, help="Show detailed status information")
def status(dag_id, run_id, verbose):
"""Check status of a DAG task"""
# Actual business logic here
status_info = "success" # Simulated status
click.echo(f"DAG {dag_id} (Run ID: {run_id}) status: {status_info}")
if verbose:
click.echo("Detailed status information: ...")
# 3. View task logs
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", required=True, help="Run ID")
@click.option("--task-id", help="View logs for specific task (optional)")
@click.option("--tail", type=int, help="Show only last N lines of logs")
def log(dag_id, run_id, task_id, tail):
"""View execution logs for a DAG task"""
# Actual business logic here
click.echo(f"Fetching logs: DAG_ID={dag_id}, RUN_ID={run_id}, TASK_ID={task_id}")
if tail:
click.echo(f"Showing last {tail} lines:")
# Simulate log output
click.echo("INFO - Task started...")
click.echo("INFO - Processing data...")
click.echo("INFO - Task completed successfully.")
# 4. Cancel task
@dag.command()
@click.option("--dag-id", required=True, help="DAG ID")
@click.option("--run-id", required=True, help="Run ID to cancel")
@click.option("--force", is_flag=True, help="Force cancellation without confirmation")
def cancel(dag_id, run_id, force):
"""Cancel a running DAG task"""
# Actual business logic here
confirm = force or click.confirm(f"Confirm cancellation of task {dag_id}/{run_id}?")
if confirm:
click.echo(f"Canceling task: DAG_ID={dag_id}, RUN_ID={run_id}")
# Call cancellation logic
else:
click.echo("Operation canceled")
# 5. View raw data (Level 0)
@csst.command()
@click.option("--obs-id", required=True, help="Observation ID")
@click.option("--dataset", help="Dataset name")
@click.option(
"--output", "-o", type=click.Path(), help="Download data to local path (optional)"
)
def raw(obs_id, dataset, output):
"""View raw observation data (Level 0)"""
click.echo(f"Querying raw data: OBS_ID={obs_id}, DATASET={dataset}")
if output:
click.echo(f"Data will be downloaded to: {output}")
# Display data information or download data
# 6. View data products
@csst.command()
@click.option("--product-id", required=True, help="Data product ID")
@click.option("--version", default="latest", help="Product version")
@click.option(
"--format",
type=click.Choice(["fits", "hdf5", "ascii"]),
default="fits",
help="Data format",
)
def product(product_id, version, format):
"""View processed data products"""
click.echo(
f"Querying data product: PRODUCT_ID={product_id}, VERSION={version}, FORMAT={format}"
)
# Display data product information
if __name__ == "__main__":
csst()
# DFS1
CSST_DFS_GATEWAY="192.168.25.89:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
# DFS2
# ---
\ No newline at end of file
# DFS1
CSST_DFS_GATEWAY="10.80.1.22:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
# DFS2
# ---
\ No newline at end of file
# DFS1
CSST_DFS_GATEWAY="10.200.60.246:28000"
CSST_DFS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ4ODU0NTA2NjQsInN1YiI6InN5c3RlbSJ9.POsuUABytu8-WMtZiYehiYEa5BnlgqNTXT6X3OTyix0"
# DFS2
CSST_BACKEND_API_URL=http://10.200.60.199:9010
\ No newline at end of file
# base image
ARG HARBOR
FROM ${HARBOR}/csst/miniconda3-py312:latest
# declare variables
ARG PIPELINE_ID
ARG BUILD
ARG CREATED
# set environment variables
ENV PIPELINE_ID=${PIPELINE_ID} \
BUILD=${BUILD} \
CREATED=${CREATED}
# label info
LABEL MAINTAINER="Bo Zhang <bozhang@nao.cas.cn>"
LABEL PIPELINE_ID=${PIPELINE_ID}
LABEL BUILD=${BUILD}
LABEL CREATED=${CREATED}
# run as root
USER root
# install system softwares
RUN --mount=type=cache,id=apt,target=/var/cache/apt \
apt-get update && apt-get install -y \
git \
&& rm -rf /var/lib/apt/lists/*
# run as csst
USER csst
# copy VCS roots into image
COPY --chown=csst:csst . /pipeline
# make csst executable
RUN chmod +x /pipeline/app/csst
# add csst to PATH
ENV PATH="/pipeline/app:${PATH}"
# install packages & requirements
RUN --mount=type=cache,id=pip,uid=9000,gid=9000,target=/home/csst/.cache \
pip install pkg/ccds \
&& pip install pkg/csst-dfs-client \
&& pip install pkg/csst_common \
&& pip install pkg/csst_dadel \
&& pip install pkg/csst-dag \
&& pip install -r requirements.txt \
&& rm -rf pkg
# change workdir
WORKDIR /pipeline/output
RUN touch /pipeline/output/pipeline.log
# 设置容器启动时默认执行的命令(这里以提供一个帮助信息为例)
# 实际使用时,你可能会通过docker run覆盖这个命令来调用具体的工具
CMD ["csst", "--help"]
astropy
numpy
toml
pyyaml
joblib
requests
\ No newline at end of file
version: '3'
services:
test-csst-dag: # each service should have its own name
image: ${IMAGE}
container_name: test-csst-dag # each container should have its own name
user: csst
command: > # change it as necessary
bash -c "csst --help > /pipeline/output/pipeline.log"
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment