diff --git a/csst_common/file.py b/csst_common/file.py index 009e1f451da8920cf014725b634da61262702e91..df0414be41c2935a21049e18013965f175949c4b 100644 --- a/csst_common/file.py +++ b/csst_common/file.py @@ -1,15 +1,77 @@ import os +import re from typing import Optional class File: def __init__(self, file_path: str = "/path/to/file.fits", new_dir=None): self.file_path = file_path + # get dir name self.dirname = os.path.dirname(self.file_path) + # get file name self.file_name = os.path.basename(self.file_path) - self.prefix, self.ext = os.path.splitext(self.file_name) + # default new dir self.new_dir = new_dir if new_dir is not None else self.dirname + # match file name + pattern = re.compile( + r"(?P[A-Z]{4})_" + r"(?P[A-Z]+)_" + r"(?P[A-Z]+)_" + r"(?P[A-Z]+)_" + r"(?P[0-9]{14})_" + r"(?P[0-9]{14})_" + r"(?P[0-9]{11,14})_" + r"(?P[0-9]+)_" + r"L(?P[0-9]{1})_" + r"V(?P[0-9]{2})" + r"(?P[a-z.]+)" + ) + self.mo = re.fullmatch(pattern, self.file_name) + assert self.mo is not None, f"Pattern does not match for file: {self.file_name}" + # set attributes + for k, v in self.mo.groupdict().items(): + self.__setattr__(k, v) + + def derive( + self, + new_dir=None, + telescope=None, + instrument=None, + project=None, + obs_type=None, + exp_start=None, + exp_stop=None, + obs_id=None, + detector=None, + level=None, + version=None, + ext=None, + ): + return os.path.join( + new_dir if new_dir is not None else self.new_dir, + f"{self.telescope if telescope is None else str(telescope)}_" + f"{self.instrument if instrument is None else str(instrument)}_" + f"{self.project if project is None else str(project)}_" + f"{self.obs_type if obs_type is None else str(obs_type)}_" + f"{self.exp_start if exp_start is None else str(exp_start)}_" + f"{self.exp_stop if exp_stop is None else str(exp_stop)}_" + f"{self.obs_id if obs_id is None else str(obs_id)}_" + f"{self.detector if detector is None else str(detector)}_" + f"L{self.level if level is None else str(level)}_" + f"V{self.version if version is None else str(version)}" + f"{self.ext if ext is None else str(ext)}", + ) + + def derive0(self, *args, **kwargs): + return self.derive(*args, **kwargs, level=0) + + def derive1(self, *args, **kwargs): + return self.derive(*args, **kwargs, level=1) + + def derive2(self, *args, **kwargs): + return self.derive(*args, **kwargs, level=2) + def replace_ext( self, new_ext: Optional[str] = "img.fits", new_dir: Optional[str] = None ) -> str: @@ -22,3 +84,6 @@ class File: return os.path.join(new_dir, self.prefix + new_ext) else: return os.path.join(new_dir, self.prefix + "_" + new_ext) + + def __repr__(self): + return f"" diff --git a/csst_common/io.py b/csst_common/io.py index e51eaa24911bf348211c525a4553c3cd29587861..230a9f4c3f8bbc48cdae85bd3087505092684815 100644 --- a/csst_common/io.py +++ b/csst_common/io.py @@ -1,10 +1,76 @@ +import warnings +from copy import deepcopy + from astropy.io import fits -def check_file(file_path="test.fits") -> bool: - pass +def verify_checksum(file_path) -> bool: + """ + Verify a .fits file via checksum. + + Return True if checksum is good. + + Parameters + ---------- + file_path : str + File path. + + References + ---------- + https://docs.astropy.org/en/stable/io/fits/usage/verification.html#verification-using-the-fits-checksum-keyword-convention + """ + with warnings.catch_warnings(record=True) as warning_list: + # file_path = fits.util.get_testdata_filepath('checksum_false.fits') + with fits.open(file_path, checksum=True): + pass + print(warning_list) + return len(warning_list) == 0 + + +def append_header( + h1: fits.Header, h2: fits.Header, duplicates: str = "delete" +) -> fits.Header: + """ + Append h2 to h1. + + Append fits headers, taken into aacount duplicated keywords. + + Parameters + ---------- + h1 : fits.Header + Original fits header. + h2 : fits.Header + Extended fits header. + duplicates : str + The operation for processing duplicates. + "delete" to delete duplicated keywords in h1 and keep them in h2. + "update" to update duplicated keywords in h1 and remove them from h2. + Returns + ------- + fits.Header + The combined header. -def _check_file_fits() -> bool: - """Validate checksum for .fits files.""" - return True + References + ---------- + https://docs.astropy.org/en/stable/io/fits/usage/headers.html#comment-history-and-blank-keywords + """ + # copy data + original_h = deepcopy(h1) + extended_h = deepcopy(h2) + original_keys = original_h.keys() + ignored_keys = ["", "COMMENT", "HISTORY"] + assert duplicates in ("delete", "update") + if duplicates == "update": + for card in extended_h.cards: + if card.keyword not in ignored_keys and card.keyword in original_keys: + print(f"Update existing key *{card.keyword} in original fits.Header*") + original_h.set(card.keyword, card.value, card.comment) + extended_h.remove(card.keyword) + elif duplicates == "delete": + for card in extended_h.cards: + if card.keyword not in ignored_keys and card.keyword in original_keys: + print(f"Delete existing key *{card.keyword} in original fits.Header*") + original_h.remove(card.keyword) + original_h.extend(extended_h, bottom=True) + return original_h diff --git a/csst_common/pipeline.py b/csst_common/pipeline.py index a8fc8c6c6b7fdb2b6acf37d1d0607289d2a4ae48..16d2fe5f02da88848d1ff64909f9758a3d57805d 100644 --- a/csst_common/pipeline.py +++ b/csst_common/pipeline.py @@ -57,7 +57,7 @@ class Pipeline: # Frequently used files self.msg = MessageWriter(os.path.join(self.dir_output, "message.txt")) - self.tsr = TimeStampRecorder(os.path.join(self.dir_output, "timestamp.txt")) + self.tsr = TimestampRecorder(os.path.join(self.dir_output, "timestamp.txt")) # self.exit_code = ExitCode(os.path.join(self.dir_output, "exit_code")) # self.error_trace = ErrorTrace(os.path.join(self.dir_output, "error_trace")) @@ -169,12 +169,12 @@ class MessageWriter: # print(f"Exit with code {code} (written to '{self.file_path}')") -class TimeStampRecorder: +class TimestampRecorder: def __init__(self, file_path: str = "tsr.txt"): """ - TimeStampRecorder Class. + TimestampRecorder Class. - Initialize a TimeStampRecorder object anc connect it to `file_path`. + Initialize a TimestampRecorder object anc connect it to `file_path`. Parameters ---------- @@ -184,7 +184,7 @@ class TimeStampRecorder: self.file_path = file_path def __repr__(self): - return f"< TimeStampRecorder [{self.file_path}] >" + return f"< TimestampRecorder [{self.file_path}] >" def empty(self): """Clean time stamp file.""" diff --git a/tests/test_file.py b/tests/test_file.py index bc45a2d27e5c9364414f4e43c44105aa39c425ac..260f3cdb1ff7ca7a8233e70e00d142a88072c24e 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -1,9 +1,13 @@ import unittest + from csst_common import File class TestFile(unittest.TestCase): def test_parameterized_module_decorator(self): - f = File("/path/to/file.fits") - self.assertEqual(f.replace_ext(".cat"), "/path/to/file.cat") - self.assertEqual(f.replace_ext("img.cat"), "/path/to/file_img.cat") + file_path = ( + "/dfs_root/L0/MSC/SCI/61605/10160000072/MS/" + "CSST_MSC_MS_SCIE_20270719043315_20270719043545_10160000072_07_L0_V01.fits" + ) + file = File(file_path, new_dir="/pipeline/output") + self.assertTrue(file.mo.groupdict() is not None) diff --git a/tests/test_fits_header_ops.py b/tests/test_fits_header_ops.py new file mode 100644 index 0000000000000000000000000000000000000000..8f298f08f5688f0a2b479a118917d0f22f5b1801 --- /dev/null +++ b/tests/test_fits_header_ops.py @@ -0,0 +1,52 @@ +import unittest +from csst_common.io import append_header +from astropy.io import fits + + +class TestFile(unittest.TestCase): + def test_append_header(self): + h1 = fits.Header() + h2 = fits.Header() + + h1.set("A", 1, "comment") + h1.set("B", 2, "comment") + h1.add_comment("=" * 72, before="A") + h1.add_comment("one", before="A") + h1.add_comment("=" * 72, before="A") + + h2.set("B", 3, "comment") + h2.set("C", 4, "comment") + h2.add_comment("=" * 72, before="B") + h2.add_comment("another", before="B") + h2.add_comment("=" * 72, before="B") + + self.assertEqual( + tuple(append_header(h1, h2, duplicates="update").keys()), + ( + "COMMENT", + "COMMENT", + "COMMENT", + "A", + "B", + "COMMENT", + "COMMENT", + "COMMENT", + "C", + ), + "update mode failed", + ) + self.assertEqual( + tuple(append_header(h1, h2, duplicates="delete").keys()), + ( + "COMMENT", + "COMMENT", + "COMMENT", + "A", + "COMMENT", + "COMMENT", + "COMMENT", + "B", + "C", + ), + "delete mode failed", + )