Commit 559791e7 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

move decorator to pipeline.call

parent 5ad5dfe7
Pipeline #2585 failed with stage
in 0 seconds
......@@ -14,15 +14,17 @@ import json
import os
import shutil
import subprocess
import traceback
import warnings
from typing import Any
from typing import Callable, NamedTuple, Optional, Any
from astropy import time
from astropy.time import Time, TimeDelta
from .ccds import CCDS
from .dfs import DFS
from .file import File
from .logger import get_logger
from .status import CsstStatus, CsstResult
# TODO: unit test in need
......@@ -120,6 +122,7 @@ class Pipeline:
self.filter_warnings()
def inspect(self):
"""List environment variables such as `PIPELINE_ID`, etc."""
print(f"PIPELINE_ID={self.pipeline_id}")
print(f"BUILD={self.build}")
print(f"CREATED={self.created}")
......@@ -130,13 +133,15 @@ class Pipeline:
@staticmethod
def clean_directory(d):
"""Clean a directory."""
print(f"Clean output directory '{d}'...")
r = subprocess.run(f"rm -rf {d}/*", shell=True, capture_output=True)
print("> ", r)
r.check_returncode()
def now(self):
return time.Time.now().isot
"""Return ISOT format datetime using `astropy`."""
return Time.now().isot
@staticmethod
def filter_warnings():
......@@ -166,6 +171,58 @@ class Pipeline:
"""Move file `file_src` to `file_dist`."""
shutil.copy(file_src, file_dst)
def call(self, func: Callable, *args: Any, **kwargs: Any):
self.pipeline_logger.info(
f"====================================================="
)
t_start: Time = Time.now()
self.pipeline_logger.info(f"Starting Module: **{func.__name__}**")
# logger.info(f"Additional arguments: {args} {kwargs}")
try:
# if the module works well
res: CsstResult = func(*args, **kwargs)
assert isinstance(res, CsstResult)
# define results
status = res.status
files = res.files
output = res.output
except Exception as e:
# if the module raises error
exc_info = traceback.format_exc() # traceback info
self.pipeline_logger.error(f"Error occurs! \n{exc_info}")
# define results
status = CsstStatus.ERROR # default status if exceptions occur
files = None
output = {"exc_info": exc_info} # default output if exceptions occur
finally:
t_stop: Time = Time.now()
t_cost: float = (t_stop - t_start).value
if isinstance(status, CsstStatus):
# status is
self.pipeline_logger.info(
f"Module finished: status={status} | cost={t_cost:.1f} sec"
)
else:
# invalid status
self.pipeline_logger.error(
f"Invalid status: {status} is not a CsstResult object!"
)
# record exception traceback info
self.pipeline_logger.info(
f"ModuleResult: \n"
f" - name: {func.__name__}\n"
f" - status: {status}\n"
f" - files: {files}\n"
f" - output: {output}\n"
)
return ModuleResult(
module=func.__name__,
cost=t_cost,
status=status,
files=files,
output=output,
)
# class ErrorTrace:
# """Write error trace to file."""
......@@ -249,4 +306,12 @@ class Timestamp:
def touch(self):
"""Write a time stamp."""
with open(self.file_path, "a+") as f:
f.write(f"{time.Time.now().isot}+00:00\n")
f.write(f"{Time.now().isot}+00:00\n")
class ModuleResult(NamedTuple):
module: str
cost: float
status: CsstStatus
files: Optional[list]
output: dict
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment