From 559791e79b0a440489583bc84d342f858e507dc3 Mon Sep 17 00:00:00 2001 From: BO ZHANG Date: Wed, 20 Dec 2023 11:57:55 +0800 Subject: [PATCH] move decorator to pipeline.call --- csst_common/pipeline.py | 73 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/csst_common/pipeline.py b/csst_common/pipeline.py index 659d91b..b2efb8a 100644 --- a/csst_common/pipeline.py +++ b/csst_common/pipeline.py @@ -14,15 +14,17 @@ import json import os import shutil import subprocess +import traceback import warnings -from typing import Any +from typing import Callable, NamedTuple, Optional, Any -from astropy import time +from astropy.time import Time, TimeDelta from .ccds import CCDS from .dfs import DFS from .file import File from .logger import get_logger +from .status import CsstStatus, CsstResult # TODO: unit test in need @@ -120,6 +122,7 @@ class Pipeline: self.filter_warnings() def inspect(self): + """List environment variables such as `PIPELINE_ID`, etc.""" print(f"PIPELINE_ID={self.pipeline_id}") print(f"BUILD={self.build}") print(f"CREATED={self.created}") @@ -130,13 +133,15 @@ class Pipeline: @staticmethod def clean_directory(d): + """Clean a directory.""" print(f"Clean output directory '{d}'...") r = subprocess.run(f"rm -rf {d}/*", shell=True, capture_output=True) print("> ", r) r.check_returncode() def now(self): - return time.Time.now().isot + """Return ISOT format datetime using `astropy`.""" + return Time.now().isot @staticmethod def filter_warnings(): @@ -166,6 +171,58 @@ class Pipeline: """Move file `file_src` to `file_dist`.""" shutil.copy(file_src, file_dst) + def call(self, func: Callable, *args: Any, **kwargs: Any): + self.pipeline_logger.info( + f"=====================================================" + ) + t_start: Time = Time.now() + self.pipeline_logger.info(f"Starting Module: **{func.__name__}**") + # logger.info(f"Additional arguments: {args} {kwargs}") + try: + # if the module works well + res: CsstResult = func(*args, **kwargs) + assert isinstance(res, CsstResult) + # define results + status = res.status + files = res.files + output = res.output + except Exception as e: + # if the module raises error + exc_info = traceback.format_exc() # traceback info + self.pipeline_logger.error(f"Error occurs! \n{exc_info}") + # define results + status = CsstStatus.ERROR # default status if exceptions occur + files = None + output = {"exc_info": exc_info} # default output if exceptions occur + finally: + t_stop: Time = Time.now() + t_cost: float = (t_stop - t_start).value + if isinstance(status, CsstStatus): + # status is + self.pipeline_logger.info( + f"Module finished: status={status} | cost={t_cost:.1f} sec" + ) + else: + # invalid status + self.pipeline_logger.error( + f"Invalid status: {status} is not a CsstResult object!" + ) + # record exception traceback info + self.pipeline_logger.info( + f"ModuleResult: \n" + f" - name: {func.__name__}\n" + f" - status: {status}\n" + f" - files: {files}\n" + f" - output: {output}\n" + ) + return ModuleResult( + module=func.__name__, + cost=t_cost, + status=status, + files=files, + output=output, + ) + # class ErrorTrace: # """Write error trace to file.""" @@ -249,4 +306,12 @@ class Timestamp: def touch(self): """Write a time stamp.""" with open(self.file_path, "a+") as f: - f.write(f"{time.Time.now().isot}+00:00\n") + f.write(f"{Time.now().isot}+00:00\n") + + +class ModuleResult(NamedTuple): + module: str + cost: float + status: CsstStatus + files: Optional[list] + output: dict -- GitLab