Commit 6c10fbe3 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

tweaks

parent 21923da5
Pipeline #11660 passed with stage
...@@ -60,8 +60,9 @@ class Pipeline: ...@@ -60,8 +60,9 @@ class Pipeline:
""" """
Examples Examples
-------- --------
>>> p = Pipeline(msg="{'a':1}") >>> p = Pipeline()
>>> p.msg_dict >>> p.info()
>>> p.task
{'a': 1} {'a': 1}
""" """
...@@ -123,6 +124,18 @@ class Pipeline: ...@@ -123,6 +124,18 @@ class Pipeline:
self.task = {} self.task = {}
self.logger.info(f"task = {self.task}") self.logger.info(f"task = {self.task}")
# warning operations
@staticmethod
def filter_warnings(level: str = "ignore"):
# Suppress all warnings
warnings.filterwarnings(level)
@staticmethod
def reset_warnings(self):
"""Reset warning filters."""
warnings.resetwarnings()
# message operations
@staticmethod @staticmethod
def dict2json(d: dict): def dict2json(d: dict):
"""Convert `dict` to JSON format string.""" """Convert `dict` to JSON format string."""
...@@ -133,18 +146,12 @@ class Pipeline: ...@@ -133,18 +146,12 @@ class Pipeline:
"""Convert JSON format string to `dict`.""" """Convert JSON format string to `dict`."""
return json.loads(m) return json.loads(m)
@property # file operations
def summarize(self): @staticmethod
"""Summarize this run.""" def mkdir(d):
t_stop: Time = Time.now() """Create a directory if it does not exist."""
t_cost: float = (t_stop - self.t_start).value * 86400.0 if not os.path.exists(d):
self.logger.info(f"Total cost: {t_cost:.1f} sec") os.makedirs(d)
def clean_output(self):
"""Clean output directory."""
# self.clean_directory(self.dir_output)
# clean in run command
pass
@staticmethod @staticmethod
def clean_directory(d): def clean_directory(d):
...@@ -158,19 +165,26 @@ class Pipeline: ...@@ -158,19 +165,26 @@ class Pipeline:
print("Failed to clean output directory!") print("Failed to clean output directory!")
print_directory_tree(d) print_directory_tree(d)
def now(self):
"""Return ISOT format datetime using `astropy`."""
return Time.now().isot
@staticmethod @staticmethod
def filter_warnings(level: str = "ignore"): def move(file_src: str, file_dst: str) -> str:
# Suppress all warnings """Move file `file_src` to `file_dist`."""
warnings.filterwarnings(level) return shutil.move(file_src, file_dst)
@staticmethod @staticmethod
def reset_warnings(self): def copy(file_src: str, file_dst: str) -> str:
"""Reset warning filters.""" """Move file `file_src` to `file_dist`."""
warnings.resetwarnings() return shutil.copy(file_src, file_dst)
@property
def summarize(self):
"""Summarize this run."""
t_stop: Time = Time.now()
t_cost: float = (t_stop - self.t_start).value * 86400.0
self.logger.info(f"Total cost: {t_cost:.1f} sec")
def clean_output(self):
"""Clean output directory."""
self.clean_directory(self.dir_output)
def file(self, file_path): def file(self, file_path):
"""Initialize File object.""" """Initialize File object."""
...@@ -180,25 +194,66 @@ class Pipeline: ...@@ -180,25 +194,66 @@ class Pipeline:
"""Create new file in output directory.""" """Create new file in output directory."""
return os.path.join(self.dir_output, file_name) return os.path.join(self.dir_output, file_name)
@staticmethod def download_oss_file(self, oss_file_path: str, dir_dst: str = None) -> str:
def move(file_src: str, file_dst: str):
"""Move file `file_src` to `file_dist`."""
shutil.move(file_src, file_dst)
@staticmethod
def copy(file_src: str, file_dst: str):
"""Move file `file_src` to `file_dist`."""
shutil.copy(file_src, file_dst)
def download_oss_file(self, oss_file_path: str) -> str:
"""Download an OSS file from OSS to output directory.""" """Download an OSS file from OSS to output directory."""
local_file_path = os.path.join(self.dir_output, os.path.basename(oss_file_path)) if dir_dst is None:
dir_dst = self.dir_output
local_file_path = os.path.join(dir_dst, os.path.basename(oss_file_path))
csst_fs.s3_fs.get(oss_file_path, local_file_path, s3_options=s3_options) csst_fs.s3_fs.get(oss_file_path, local_file_path, s3_options=s3_options)
assert os.path.exists( assert os.path.exists(
local_file_path local_file_path
), f"Failed to download {oss_file_path} to {local_file_path}" ), f"Failed to download {oss_file_path} to {local_file_path}"
return local_file_path return local_file_path
def abspath(self, file_path: str) -> str:
"""Return absolute path of `file_path`."""
if file_path.__contains__(":"):
# it's an OSS file path
assert self.use_oss, "USE_OSS must be True to use OSS file path!"
# download OSS file to output directory
local_file_path = self.download_oss_file(file_path)
# return local file path
return local_file_path
else:
# it's a NAS file path
if file_path.startswith("CSST"):
# DFS
return os.path.join(self.dfs_root, file_path)
else:
# CCDS
return os.path.join(self.ccds_root, file_path)
def download_dfs_file(self, file_path: str, dir_dst: str = None):
"""Copy DFS file to output directory."""
if dir_dst is None:
dir_dst = self.dir_output
if self.use_oss:
# download OSS file to dst directory
return self.download_oss_file(file_path, dir_dst)
else:
# copy DFS file to dst directory
local_file_path = os.path.join(dir_dst, os.path.basename(file_path))
self.copy(self.abspath(file_path), local_file_path)
return local_file_path
def copy_ccds_refs(self, refs: dict, dir_dst: str = None) -> dict:
"""Copy raw file from CCDS to output directory."""
if dir_dst is None:
dir_dst = self.dir_output
local_refs = {}
for ref_name, ref_path in refs.items():
local_file_path = os.path.join(dir_dst, os.path.basename(ref_path))
local_refs[ref_name] = self.copy(self.abspath(ref_path), local_file_path)
return local_refs
# time operations
def now(self) -> str:
"""Return ISOT format datetime using `astropy`."""
return Time.now().isot
# call modules
def call(self, func: Callable, *args: Any, **kwargs: Any): def call(self, func: Callable, *args: Any, **kwargs: Any):
self.logger.info(f"=====================================================") self.logger.info(f"=====================================================")
t_start: Time = Time.now() t_start: Time = Time.now()
...@@ -249,6 +304,7 @@ class Pipeline: ...@@ -249,6 +304,7 @@ class Pipeline:
output=output, output=output,
) )
# retry operations
@staticmethod @staticmethod
def retry(*args, **kwargs): def retry(*args, **kwargs):
return retry(*args, **kwargs) return retry(*args, **kwargs)
...@@ -261,6 +317,7 @@ class Pipeline: ...@@ -261,6 +317,7 @@ class Pipeline:
else: else:
raise ValueError("CCDS client not initialized!") raise ValueError("CCDS client not initialized!")
# automatically log pipeline information
def info(self): def info(self):
"""Return pipeline information.""" """Return pipeline information."""
self.logger.info(f"DOCKER_IMAGE={self.docker_image}") self.logger.info(f"DOCKER_IMAGE={self.docker_image}")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment