import logging
import os
from typing import Union

import joblib
import numpy as np
from astropy.io import fits

from csst_common.data_manager import CsstMsDataManager
from csst_common.file_recorder import FileRecorder
from csst_common.logger import get_logger
from csst_common.status import CsstStatus

# Header keyword inspected by the precision check, and the value below which
# a result is rated PERFECT.
_TOL_KEYWORD = "TOL"
_TOL_THRESHOLD = 1e-5


def read_image(filepath_input: str) -> np.ndarray:
    """
    Read image.

    Parameters
    ----------
    filepath_input : str
        The path of the FITS file to read.

    Returns
    -------
    np.ndarray
        The data returned by ``astropy.io.fits.getdata`` for this file.
    """
    return fits.getdata(filepath_input)


def process_data(data: np.ndarray) -> np.ndarray:
    """
    Process data.

    Flip the array with ``numpy.flipud`` and then ``numpy.fliplr``.

    Parameters
    ----------
    data : np.ndarray
        The image data; ``numpy.fliplr`` requires at least two dimensions.

    Returns
    -------
    np.ndarray
        The flipped data.
    """
    return np.fliplr(np.flipud(data))


def _write_flipped_image(filepath_input: str, filepath_output: str) -> None:
    """Read one image, flip it and write the result to ``filepath_output`` as FITS."""
    data_processed = process_data(read_image(filepath_input))
    # ``np.save`` writes NumPy's .npy format and appends ".npy" to any name
    # that lacks it, so the result would never appear at ``filepath_output``.
    # Write FITS to the requested path instead, replacing an earlier result.
    fits.writeto(filepath_output, data_processed, overwrite=True)


def _check_output(filepath_output: str) -> CsstStatus:
    """Rate the output file: PERFECT if its TOL keyword is small enough, else WARNING."""
    if not os.path.exists(filepath_output):
        # file doesn't exist, do your fallback solution
        fits.HDUList(fits.PrimaryHDU()).writeto(filepath_output)
        assert os.path.exists(filepath_output)
        return CsstStatus.WARNING

    # file exists, check precision
    # A header without the keyword cannot be verified, so it is rated WARNING
    # instead of raising KeyError.
    tol = fits.getheader(filepath_output).get(_TOL_KEYWORD)
    if tol is not None and tol < _TOL_THRESHOLD:
        return CsstStatus.PERFECT
    return CsstStatus.WARNING


# process a single image (NOT RECOMMENDED!)
def process_single_image(
    filepath_input: str,
    filepath_output: str,
    logger: Union[None, logging.Logger] = None,
) -> tuple[CsstStatus, FileRecorder]:
    """
    Flip a single image.

    Flip a single image with ``numpy.fliplr`` and ``numpy.flipud`` and write
    the result to ``filepath_output`` as a FITS file.

    Parameters
    ----------
    filepath_input : str
        The input filepath.
    filepath_output : str
        The output filepath.
    logger : logging.Logger
        The logger. If None, the logger returned by ``get_logger()`` is used.

    Returns
    -------
    tuple[CsstStatus, FileRecorder]
        The final status and the file recorder holding the output record.

    Examples
    --------
    >>> process_single_image(
    ...     filepath_input="input_image.fits",
    ...     filepath_output="output_image.fits",
    ...     logger=None
    ... )
    """
    # set default logger
    if logger is None:
        logger = get_logger()

    # get an empty file recorder
    fr = FileRecorder()

    # process data
    logger.info(f"Start processing image {filepath_input}")
    _write_flipped_image(filepath_input, filepath_output)

    # record file!
    fr.add_record(filepath=filepath_output, db=True, comment="the processed image")

    # NOTE(review): the original author states this message is written into
    # the log file -- confirm against the logger configuration.
    logger.info(f"Finish processing, result saved to {filepath_output}")

    # check result existence and precision
    return _check_output(filepath_output), fr


# process multiple images in an exposure (RECOMMENDED, at least for MBI or SLS)
# define a single job
def one_job(dm: CsstMsDataManager, detector: int) -> CsstStatus:
    """
    Process a single image, defined for parallel processing.

    Parameters
    ----------
    dm : CsstMsDataManager
        The data manager that provides the input and output paths.
    detector : int
        The detector number to process.

    Returns
    -------
    CsstStatus
        The status of this detector's output file.
    """
    filepath_input = dm.l0_detector(detector=detector)
    filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")

    # data processing
    _write_flipped_image(filepath_input, filepath_output)

    # check result existence and precision
    return _check_output(filepath_output)


# process in serial / parallel
def process_multiple_images(
    dm: CsstMsDataManager,
) -> tuple[CsstStatus, FileRecorder]:
    """
    Flip all images.

    Flip all images in an exposure in a for-loop (serial and parallel).

    Parameters
    ----------
    dm : CsstMsDataManager
        The data manager of the specified exposure.

    Returns
    -------
    tuple[CsstStatus, FileRecorder]
        The final status and the file recorder. The status is PERFECT only
        if every job of the parallel pass returned PERFECT.

    Examples
    --------
    >>> dm = CsstMsDataManager.quickstart(
    ...     ver_sim="C5.2", dir_l1="", datatype="sls", exposure_id=100)
    >>> process_multiple_images(dm)
    """
    # get an empty file recorder
    fr = FileRecorder()

    # process data
    # start processing (dm.target_detectors is a list of detector numbers that should be processed)
    # NOTE(review): both modes below run, so every detector is processed and
    # recorded twice. Kept as-is because the docstring describes it; keep only
    # one mode if this is not meant as a demonstration.

    # [1/2] single-thread mode
    for detector in dm.target_detectors:
        # NOTE(review): the original author states this message is NOT
        # written into the log file -- confirm against the logger configuration.
        dm.logger_mod.info(f"Start data processing for detector {detector}")

        filepath_input = dm.l0_detector(detector=detector)
        filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
        _write_flipped_image(filepath_input, filepath_output)

        # record file!
        fr.add_record(
            filepath=filepath_output,
            db=True,
            comment=f"processed file for Detector {detector}",
        )

    # [2/2] multi-processing mode
    dm.logger_mod.info("Starting data processing with multiprocessing ...")
    status_list = joblib.Parallel(n_jobs=dm.n_jobs, backend=dm.backend)(
        joblib.delayed(one_job)(dm, detector) for detector in dm.target_detectors
    )
    dm.logger_mod.info("Finished processing ...")

    for detector in dm.target_detectors:
        filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
        fr.add_record(
            filepath=filepath_output,
            db=True,
            comment=f"processed file for Detector {detector}",
        )

    # check results
    # presumably validates the recorded files -- see FileRecorder.is_good
    assert fr.is_good()

    all_perfect = all(status == CsstStatus.PERFECT for status in status_list)
    return (CsstStatus.PERFECT if all_perfect else CsstStatus.WARNING), fr