Commit 287920fd authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add Pipeline.download_oss_file

parent e27dacbb
Pipeline #11635 passed with stage
...@@ -9,6 +9,7 @@ Modified-History: ...@@ -9,6 +9,7 @@ Modified-History:
2023-07-11, Bo Zhang, add Pipeline class 2023-07-11, Bo Zhang, add Pipeline class
2023-12-10, Bo Zhang, update Pipeline 2023-12-10, Bo Zhang, update Pipeline
2023-12-15, Bo Zhang, add module header 2023-12-15, Bo Zhang, add module header
2025-12-27, Bo Zhang, rewrite Pipeline class
""" """
import json import json
...@@ -21,14 +22,15 @@ from typing import Callable, NamedTuple, Optional, Any, Union ...@@ -21,14 +22,15 @@ from typing import Callable, NamedTuple, Optional, Any, Union
from astropy.time import Time, TimeDelta from astropy.time import Time, TimeDelta
from astropy.io import fits from astropy.io import fits
import csst_dfs_client as dfs1 import csst_dfs_client
import csst_fs as dfs2 import csst_fs
from .ccds import CCDS from .ccds import CCDS
from .utils import retry from .utils import retry
from .file import File from .file import File
from .logger import get_logger from .logger import get_logger
from .status import CsstStatus, CsstResult from .status import CsstStatus, CsstResult
from .fits import s3_options
def print_directory_tree(directory="."): def print_directory_tree(directory="."):
...@@ -51,10 +53,16 @@ EXIT_CODES = { ...@@ -51,10 +53,16 @@ EXIT_CODES = {
"data_product_invalid": 30, "data_product_invalid": 30,
"data_product_ingestion_error": 31, "data_product_ingestion_error": 31,
} }
1
class Pipeline: class Pipeline:
"""
Examples
--------
>>> p = Pipeline(msg="{'a':1}")
>>> p.msg_dict
{'a': 1}
"""
def __init__(self, msg: str = "", **env_vars: Any): def __init__(self, msg: str = "", **env_vars: Any):
# record start time # record start time
...@@ -106,8 +114,8 @@ class Pipeline: ...@@ -106,8 +114,8 @@ class Pipeline:
self.filter_warnings("ignore") self.filter_warnings("ignore")
# DFS1, DFS2 & CCDS # DFS1, DFS2 & CCDS
self.dfs1 = dfs1 self.dfs1 = csst_dfs_client
self.dfs2 = dfs2 self.dfs2 = csst_fs
self.ccds = CCDS(ccds_root=self.ccds_root, ccds_cache=self.ccds_cache) self.ccds = CCDS(ccds_root=self.ccds_root, ccds_cache=self.ccds_cache)
# exit code # exit code
...@@ -180,15 +188,14 @@ class Pipeline: ...@@ -180,15 +188,14 @@ class Pipeline:
"""Move file `file_src` to `file_dist`.""" """Move file `file_src` to `file_dist`."""
shutil.copy(file_src, file_dst) shutil.copy(file_src, file_dst)
def copy_to_output(self, file_paths: list): def download_oss_file(self, oss_file_path: str) -> str:
for file_path in file_paths: """Download an OSS file from OSS to output directory."""
pass local_file_path = os.path.join(self.dir_output, os.path.basename(oss_file_path))
csst_fs.s3_fs.get(oss_file_path, local_file_path, s3_options=s3_options)
def download_oss_file(self, oss_file_path: str) -> None: assert os.path.exists(
"""Download an OSS file from OSS to local path.""" local_file_path
# TODO ), f"Failed to download {oss_file_path} to {local_file_path}"
# self.dfs.download_file(oss_file_path, local_path) return local_file_path
pass
def call(self, func: Callable, *args: Any, **kwargs: Any): def call(self, func: Callable, *args: Any, **kwargs: Any):
self.logger.info(f"=====================================================") self.logger.info(f"=====================================================")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment