Commit 9a499f72 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add some useful tools for pipeline integration

parent d8221d3f
Pipeline #2421 failed with stage
import os
import re
from typing import Optional
class File:
def __init__(self, file_path: str = "/path/to/file.fits", new_dir=None):
self.file_path = file_path
# get dir name
self.dirname = os.path.dirname(self.file_path)
# get file name
self.file_name = os.path.basename(self.file_path)
self.prefix, self.ext = os.path.splitext(self.file_name)
# default new dir
self.new_dir = new_dir if new_dir is not None else self.dirname
# match file name
pattern = re.compile(
r"(?P<telescope>[A-Z]{4})_"
r"(?P<instrument>[A-Z]+)_"
r"(?P<project>[A-Z]+)_"
r"(?P<obs_type>[A-Z]+)_"
r"(?P<exp_start>[0-9]{14})_"
r"(?P<exp_stop>[0-9]{14})_"
r"(?P<obs_id>[0-9]{11,14})_"
r"(?P<detector>[0-9]+)_"
r"L(?P<level>[0-9]{1})_"
r"V(?P<version>[0-9]{2})"
r"(?P<ext>[a-z.]+)"
)
self.mo = re.fullmatch(pattern, self.file_name)
assert self.mo is not None, f"Pattern does not match for file: {self.file_name}"
# set attributes
for k, v in self.mo.groupdict().items():
self.__setattr__(k, v)
def derive(
self,
new_dir=None,
telescope=None,
instrument=None,
project=None,
obs_type=None,
exp_start=None,
exp_stop=None,
obs_id=None,
detector=None,
level=None,
version=None,
ext=None,
):
return os.path.join(
new_dir if new_dir is not None else self.new_dir,
f"{self.telescope if telescope is None else str(telescope)}_"
f"{self.instrument if instrument is None else str(instrument)}_"
f"{self.project if project is None else str(project)}_"
f"{self.obs_type if obs_type is None else str(obs_type)}_"
f"{self.exp_start if exp_start is None else str(exp_start)}_"
f"{self.exp_stop if exp_stop is None else str(exp_stop)}_"
f"{self.obs_id if obs_id is None else str(obs_id)}_"
f"{self.detector if detector is None else str(detector)}_"
f"L{self.level if level is None else str(level)}_"
f"V{self.version if version is None else str(version)}"
f"{self.ext if ext is None else str(ext)}",
)
def derive0(self, *args, **kwargs):
return self.derive(*args, **kwargs, level=0)
def derive1(self, *args, **kwargs):
return self.derive(*args, **kwargs, level=1)
def derive2(self, *args, **kwargs):
return self.derive(*args, **kwargs, level=2)
def replace_ext(
self, new_ext: Optional[str] = "img.fits", new_dir: Optional[str] = None
) -> str:
......@@ -22,3 +84,6 @@ class File:
return os.path.join(new_dir, self.prefix + new_ext)
else:
return os.path.join(new_dir, self.prefix + "_" + new_ext)
def __repr__(self):
return f"<File {self.file_path}>"
import warnings
from copy import deepcopy
from astropy.io import fits
def check_file(file_path="test.fits") -> bool:
pass
def verify_checksum(file_path) -> bool:
"""
Verify a .fits file via checksum.
Return True if checksum is good.
Parameters
----------
file_path : str
File path.
References
----------
https://docs.astropy.org/en/stable/io/fits/usage/verification.html#verification-using-the-fits-checksum-keyword-convention
"""
with warnings.catch_warnings(record=True) as warning_list:
# file_path = fits.util.get_testdata_filepath('checksum_false.fits')
with fits.open(file_path, checksum=True):
pass
print(warning_list)
return len(warning_list) == 0
def append_header(
h1: fits.Header, h2: fits.Header, duplicates: str = "delete"
) -> fits.Header:
"""
Append h2 to h1.
Append fits headers, taken into aacount duplicated keywords.
Parameters
----------
h1 : fits.Header
Original fits header.
h2 : fits.Header
Extended fits header.
duplicates : str
The operation for processing duplicates.
"delete" to delete duplicated keywords in h1 and keep them in h2.
"update" to update duplicated keywords in h1 and remove them from h2.
Returns
-------
fits.Header
The combined header.
def _check_file_fits() -> bool:
"""Validate checksum for .fits files."""
return True
References
----------
https://docs.astropy.org/en/stable/io/fits/usage/headers.html#comment-history-and-blank-keywords
"""
# copy data
original_h = deepcopy(h1)
extended_h = deepcopy(h2)
original_keys = original_h.keys()
ignored_keys = ["", "COMMENT", "HISTORY"]
assert duplicates in ("delete", "update")
if duplicates == "update":
for card in extended_h.cards:
if card.keyword not in ignored_keys and card.keyword in original_keys:
print(f"Update existing key *{card.keyword} in original fits.Header*")
original_h.set(card.keyword, card.value, card.comment)
extended_h.remove(card.keyword)
elif duplicates == "delete":
for card in extended_h.cards:
if card.keyword not in ignored_keys and card.keyword in original_keys:
print(f"Delete existing key *{card.keyword} in original fits.Header*")
original_h.remove(card.keyword)
original_h.extend(extended_h, bottom=True)
return original_h
......@@ -57,7 +57,7 @@ class Pipeline:
# Frequently used files
self.msg = MessageWriter(os.path.join(self.dir_output, "message.txt"))
self.tsr = TimeStampRecorder(os.path.join(self.dir_output, "timestamp.txt"))
self.tsr = TimestampRecorder(os.path.join(self.dir_output, "timestamp.txt"))
# self.exit_code = ExitCode(os.path.join(self.dir_output, "exit_code"))
# self.error_trace = ErrorTrace(os.path.join(self.dir_output, "error_trace"))
......@@ -169,12 +169,12 @@ class MessageWriter:
# print(f"Exit with code {code} (written to '{self.file_path}')")
class TimeStampRecorder:
class TimestampRecorder:
def __init__(self, file_path: str = "tsr.txt"):
"""
TimeStampRecorder Class.
TimestampRecorder Class.
Initialize a TimeStampRecorder object anc connect it to `file_path`.
Initialize a TimestampRecorder object anc connect it to `file_path`.
Parameters
----------
......@@ -184,7 +184,7 @@ class TimeStampRecorder:
self.file_path = file_path
def __repr__(self):
return f"< TimeStampRecorder [{self.file_path}] >"
return f"< TimestampRecorder [{self.file_path}] >"
def empty(self):
"""Clean time stamp file."""
......
import unittest
from csst_common import File
class TestFile(unittest.TestCase):
def test_parameterized_module_decorator(self):
f = File("/path/to/file.fits")
self.assertEqual(f.replace_ext(".cat"), "/path/to/file.cat")
self.assertEqual(f.replace_ext("img.cat"), "/path/to/file_img.cat")
file_path = (
"/dfs_root/L0/MSC/SCI/61605/10160000072/MS/"
"CSST_MSC_MS_SCIE_20270719043315_20270719043545_10160000072_07_L0_V01.fits"
)
file = File(file_path, new_dir="/pipeline/output")
self.assertTrue(file.mo.groupdict() is not None)
import unittest
from csst_common.io import append_header
from astropy.io import fits
class TestFile(unittest.TestCase):
def test_append_header(self):
h1 = fits.Header()
h2 = fits.Header()
h1.set("A", 1, "comment")
h1.set("B", 2, "comment")
h1.add_comment("=" * 72, before="A")
h1.add_comment("one", before="A")
h1.add_comment("=" * 72, before="A")
h2.set("B", 3, "comment")
h2.set("C", 4, "comment")
h2.add_comment("=" * 72, before="B")
h2.add_comment("another", before="B")
h2.add_comment("=" * 72, before="B")
self.assertEqual(
tuple(append_header(h1, h2, duplicates="update").keys()),
(
"COMMENT",
"COMMENT",
"COMMENT",
"A",
"B",
"COMMENT",
"COMMENT",
"COMMENT",
"C",
),
"update mode failed",
)
self.assertEqual(
tuple(append_header(h1, h2, duplicates="delete").keys()),
(
"COMMENT",
"COMMENT",
"COMMENT",
"A",
"COMMENT",
"COMMENT",
"COMMENT",
"B",
"C",
),
"delete mode failed",
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment