Newer
Older
import logging
import os
from typing import Union
import numpy as np
from astropy.io import fits
from csst_common.file_recorder import FileRecorder
from csst_common.logger import get_logger
from csst_common.status import CsstStatus
def read_image(filepath_input: str) -> np.ndarray:
def process_data(data: np.ndarray) -> np.ndarray:
""" Process data. """
return np.fliplr(np.flipud(data))
# process a single image (NOT RECOMMENDED!)
filepath_input: str,
filepath_output: str,
"""
Flip a single image.
Flip a single image with ``numpy.fliplr`` and ``numpy.flipud``.
Parameters
----------
filepath_input : str
The input filepath.
filepath_output : str
The output filepath.
The final status.
Examples
--------
>>> process_single_image(
>>> filepath_input="input_image.fits",
>>> filepath_output="output_image.fits",
>>> logger=None
>>> )
"""
# set default logger
if logger is None:
logger = get_logger()
# get an empty file recorder
fr = FileRecorder()
logger.info("Start processing image {}".format(filepath_input))
# start processing
data = read_image(filepath_input)
data_processed = process_data(data)
np.save(filepath_output, data_processed)
# record file!
fr.add_record(filepath=filepath_output, db=True, comment="the processed image")
# this will be written into the log file
logger.info("Finish processing, result saved to {}".format(filepath_output))
# check result existence
if os.path.exists(filepath_output):
# file exists, check status and precision
if fits.getheader(filepath_output)["STT_DIST"] == 0 and \
fits.getheader(filepath_output)["PCS_DIST"] < 1e-5 and \
fits.getheader(filepath_output)["NS_DIST"] >= 8:
else:
# file doesn't exist, do your fallback solution
fits.HDUList(fits.PrimaryHDU()).writeto(filepath_output)
assert os.path.exists(filepath_output)
# process multiple images in an exposure (RECOMMENDED, at least for MBI or SLS)
# define a single job
""" Process a single image, defined for parallel processing. """
filepath_input = dm.l0_detector(detector=detector)
filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
data = read_image(filepath_input)
data_processed = process_data(data)
np.save(filepath_output, data_processed)
# check result existence
if os.path.exists(filepath_output):
# file exists, check status and precision
if fits.getheader(filepath_output)["STT_DIST"] == 0 and \
fits.getheader(filepath_output)["PCS_DIST"] < 1e-5 and \
fits.getheader(filepath_output)["NS_DIST"] >= 8:
return CsstStatus.PERFECT
else:
return CsstStatus.WARNING
else:
# file doesn't exist, do your fallback solution
fits.HDUList(fits.PrimaryHDU()).writeto(filepath_output)
assert os.path.exists(filepath_output)
return CsstStatus.WARNING
Flip all images in an exposure in a for-loop (serial and parallel).
Parameters
----------
dm : CsstMsDataManager
The data manager of the specified exposure.
Returns
-------
The final status.
Examples
--------
>>> dm = CsstMsDataManager.quickstart(
>>> ver_sim="C5.2", dir_l1="", datatype="sls", exposure_id=100)
# get an empty file recorder
fr = FileRecorder()
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# start processing (dm.target_detectors is a list of detector number that should be processed)
# [1/2] single-thread mode
for detector in dm.target_detectors:
# this will NOT be written into the log file
dm.logger_mod.info("Start data processing for detector {}".format(detector))
filepath_input = dm.l0_detector(detector=detector)
filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
data = read_image(filepath_input)
data_processed = process_data(data)
np.save(filepath_output, data_processed)
# record file!
fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector))
# [2/2] multi-processing mode
dm.logger_mod.info("Starting data processing with multiprocessing ...")
status_list = joblib.Parallel(n_jobs=dm.n_jobs, backend=dm.backend)(
joblib.delayed(one_job)(dm, detector) for detector in dm.target_detectors
)
dm.logger_mod.info("Finished processing ...")
for detector in dm.target_detectors:
filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector))
# check results
assert fr.is_good()
return CsstStatus.PERFECT if all([_ == CsstStatus.PERFECT for _ in status_list]) else CsstStatus.WARNING, fr