Commit f0b9d121 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add dump operations

parent 50ccbd08
Pipeline #11738 passed with stage
...@@ -16,6 +16,7 @@ import sys ...@@ -16,6 +16,7 @@ import sys
import json import json
import subprocess import subprocess
import os import os
import copy
import shutil import shutil
import traceback import traceback
import warnings import warnings
...@@ -25,6 +26,7 @@ from astropy.io import fits ...@@ -25,6 +26,7 @@ from astropy.io import fits
import csst_dfs_client import csst_dfs_client
import csst_fs import csst_fs
from csst_fs import s3_fs
from .ccds import CCDS from .ccds import CCDS
from .utils import retry from .utils import retry
...@@ -33,6 +35,8 @@ from .logger import get_logger ...@@ -33,6 +35,8 @@ from .logger import get_logger
from .status import CsstStatus, CsstResult from .status import CsstStatus, CsstResult
from .fits import s3_options from .fits import s3_options
s3_options = csst_fs.s3_config.load_s3_options()
def print_directory_tree(directory="."): def print_directory_tree(directory="."):
for root, dirs, files in os.walk(directory): for root, dirs, files in os.walk(directory):
...@@ -180,22 +184,34 @@ class Pipeline: ...@@ -180,22 +184,34 @@ class Pipeline:
print("Failed to clean output directory!") print("Failed to clean output directory!")
print_directory_tree(d) print_directory_tree(d)
# file operations
@staticmethod @staticmethod
def move(file_src: str, file_dst: str) -> str: def move(file_src: str, file_dst: str) -> str:
"""Move file `file_src` to `file_dist`.""" """Move file `file_src` to `file_dist`."""
return shutil.move(file_src, file_dst) return shutil.move(file_src, file_dst)
@staticmethod @staticmethod
def copy(file_src: str, file_dst: str) -> str: def copy_nas_file(file_src: str, file_dst: str) -> str:
"""Move file `file_src` to `file_dist`.""" """Copy NAS file `file_src` to `file_dist`."""
return shutil.copy(file_src, file_dst) shutil.copy(file_src, file_dst)
assert os.path.exists(
file_dst
), f"Failed to copy NAS file {file_src} to {file_dst}!"
return file_dst
@property @staticmethod
def summarize(self): def dump_oss_file(rpath: str, lpath: str) -> str:
"""Summarize this run.""" """Copy OSS file `file_src` to `file_dist`."""
t_stop: Time = Time.now() s3_fs.get(rpath, lpath)
t_cost: float = (t_stop - self.t_start).value * 86400.0 assert os.path.exists(lpath), f"Failed to dump OSS file {rpath} to {lpath}!"
self.logger.info(f"Total cost: {t_cost:.1f} sec") return lpath
# @property
# def summarize(self):
# """Summarize this run."""
# t_stop: Time = Time.now()
# t_cost: float = (t_stop - self.t_start).value * 86400.0
# self.logger.info(f"Total cost: {t_cost:.1f} sec")
def clean_output(self): def clean_output(self):
"""Clean output directory.""" """Clean output directory."""
...@@ -209,40 +225,72 @@ class Pipeline: ...@@ -209,40 +225,72 @@ class Pipeline:
"""Create new file in output directory.""" """Create new file in output directory."""
return os.path.join(self.dir_output, file_name) return os.path.join(self.dir_output, file_name)
def download_oss_file(self, oss_file_path: str, dir_dst: str = None) -> str: @staticmethod
"""Download an OSS file from OSS to output directory.""" def dump_file(remote_file_path: str, local_file_path: str = None) -> str:
if dir_dst is None: """Copy file `remote_file_path` to `local_file_path`."""
dir_dst = self.dir_output is_oss = remote_file_path.__contains__(":")
local_file_path = os.path.join(dir_dst, os.path.basename(oss_file_path)) if is_oss:
csst_fs.s3_fs.get(oss_file_path, local_file_path, s3_options=s3_options) local_file_path = Pipeline.dump_oss_file(remote_file_path, local_file_path)
assert os.path.exists(
local_file_path
), f"Failed to download {oss_file_path} to {local_file_path}"
return local_file_path
def abspath(self, file_path: str) -> str:
"""Return absolute path of `file_path`."""
if file_path.__contains__(":"):
# it's an OSS file path
assert self.use_oss, "USE_OSS must be True to use OSS file path!"
# download OSS file to output directory
local_file_path = self.download_oss_file(file_path)
# return local file path
return local_file_path
else: else:
# it's a NAS file path local_file_path = Pipeline.copy_nas_file(remote_file_path, local_file_path)
if file_path.startswith("CSST"): return local_file_path
# DFS
return os.path.join(self.dfs_root, file_path)
else:
# CCDS
return os.path.join(self.ccds_root, file_path)
def download_dfs_file(self, file_path: str, dir_dst: str = None) -> str: # abspath
def convert_to_abspath_for_dfs_recs(self, dfs_recs: list[dict]) -> list[dict]:
"""Convert `file_path` to absolute path for DFS."""
dfs_recs_abs = copy.deepcopy(dfs_recs)
for rec in dfs_recs_abs:
rec["file_path"] = os.path.join(self.dfs_root, rec["file_path"])
return dfs_recs_abs
def convert_to_abspath_for_ccds_refs(self, ccds_refs: dict) -> dict:
"""Convert `file_path` to absolute path for CCDS."""
ccds_refs_abs = copy.deepcopy(ccds_refs)
for ref_name, ref_path in ccds_refs_abs.items():
ccds_refs_abs[ref_name] = os.path.join(self.ccds_root, ref_path)
return ccds_refs_abs
def dump_dfs_recs(
self, dfs_recs_abs: list[dict], dir_dump: str = None
) -> list[dict]:
"""Copy DFS files to output directory."""
# set default dir_dump to output directory
if dir_dump is None:
dir_dump = os.path.join(self.dir_output, "dfs")
self.mkdir(dir_dump)
# dump data to dir_dump
dfs_recs_dump = copy.deepcopy(dfs_recs_abs)
for rec in dfs_recs_dump:
remote_file_path = rec["file_path"]
local_file_path = os.path.join(dir_dump, os.path.basename(remote_file_path))
# copy DFS file to local_file_path
self.dump_file(remote_file_path, local_file_path)
rec["file_path"] = local_file_path
return dfs_recs_dump
def dump_ccds_refs(self, refs: dict, dir_dump: str = None) -> dict:
"""Copy raw file from CCDS to output directory."""
# set default dir_dump to output directory
if dir_dump is None:
dir_dump = os.path.join(self.dir_output, "ccds")
self.mkdir(dir_dump)
# dump data to dir_dump
ccds_refs_dump = copy.deepcopy(refs)
for ref_name, ref_path in ccds_refs_dump.items():
remote_file_path = ref_path
local_file_path = os.path.join(dir_dump, os.path.basename(remote_file_path))
# copy DFS file to local_file_path
self.dump_file(remote_file_path, local_file_path)
ccds_refs_dump[ref_name] = local_file_path
return ccds_refs_dump
def dump_dfs_file(self, file_path: str, dir_dst: str = None) -> str:
"""Copy DFS file to output directory.""" """Copy DFS file to output directory."""
# by default, dump file to output directory
if dir_dst is None: if dir_dst is None:
dir_dst = self.dir_output dir_dst = self.dir_output
# if use OSS,
if self.use_oss: if self.use_oss:
# download OSS file to dst directory # download OSS file to dst directory
return self.download_oss_file(file_path, dir_dst) return self.download_oss_file(file_path, dir_dst)
...@@ -252,7 +300,7 @@ class Pipeline: ...@@ -252,7 +300,7 @@ class Pipeline:
self.copy(self.abspath(file_path), local_file_path) self.copy(self.abspath(file_path), local_file_path)
return local_file_path return local_file_path
def download_ccds_refs(self, refs: dict, dir_dst: str = None) -> dict: def dump_ccds_refs(self, refs: dict, dir_dst: str = None) -> dict:
"""Copy raw file from CCDS to output directory.""" """Copy raw file from CCDS to output directory."""
if dir_dst is None: if dir_dst is None:
dir_dst = self.dir_output dir_dst = self.dir_output
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment