Skip to content
example_interface.py 5.71 KiB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
import logging
import os
from typing import Union

import numpy as np
from astropy.io import fits
import joblib
BO ZHANG's avatar
BO ZHANG committed
from csst_common.data_manager import CsstMsDataManager
from csst_common.file_recorder import FileRecorder
BO ZHANG's avatar
BO ZHANG committed
from csst_common.logger import get_logger
from csst_common.status import CsstStatus


def read_image(filepath_input: str) -> np.ndarray:
BO ZHANG's avatar
BO ZHANG committed
    """ Read image. """
    return fits.getdata(filepath_input)
BO ZHANG's avatar
BO ZHANG committed


def process_data(data: np.ndarray) -> np.ndarray:
    """ Process data. """
    return np.fliplr(np.flipud(data))


# process a single image (NOT RECOMMENDED!)
BO ZHANG's avatar
BO ZHANG committed
def process_single_image(
        filepath_input: str,
        filepath_output: str,
BO ZHANG's avatar
BO ZHANG committed
        logger: Union[None, logging.Logger] = None
) -> tuple[CsstStatus, FileRecorder]:
BO ZHANG's avatar
BO ZHANG committed
    """
    Flip a single image.

    Flip a single image with ``numpy.fliplr`` and ``numpy.flipud``.

    Parameters
    ----------
    filepath_input : str
        The input filepath.
    filepath_output : str
        The output filepath.
BO ZHANG's avatar
BO ZHANG committed
    logger : logging.Logger
        The logger.

    Returns
    -------
    tuple[CsstStatus, FileRecorder]
BO ZHANG's avatar
BO ZHANG committed
        The final status.

    Examples
    --------
    >>> process_single_image(
    >>>     filepath_input="input_image.fits",
    >>>     filepath_output="output_image.fits",
BO ZHANG's avatar
BO ZHANG committed
    >>>     logger=None
    >>> )
    """
    # set default logger
    if logger is None:
        logger = get_logger()

    # get an empty file recorder
    fr = FileRecorder()

BO ZHANG's avatar
BO ZHANG committed
    # process data
BO ZHANG's avatar
BO ZHANG committed
    logger.info("Start processing image {}".format(filepath_input))
    # start processing
    data = read_image(filepath_input)
    data_processed = process_data(data)
    np.save(filepath_output, data_processed)
    # record file!
    fr.add_record(filepath=filepath_output, db=True, comment="the processed image")
    # this will be written into the log file
    logger.info("Finish processing, result saved to {}".format(filepath_output))

    # check result existence
    if os.path.exists(filepath_output):
BO ZHANG's avatar
BO ZHANG committed
        # file exists, check status and precision
        if fits.getheader(filepath_output)["STT_DIST"] == 0 and \
                fits.getheader(filepath_output)["PCS_DIST"] < 1e-5 and \
                fits.getheader(filepath_output)["NS_DIST"] >= 8:
BO ZHANG's avatar
BO ZHANG committed
            return CsstStatus.PERFECT, fr
BO ZHANG's avatar
BO ZHANG committed
        else:
BO ZHANG's avatar
BO ZHANG committed
            return CsstStatus.WARNING, fr
BO ZHANG's avatar
BO ZHANG committed
    else:
        # file doesn't exist, do your fallback solution
        fits.HDUList(fits.PrimaryHDU()).writeto(filepath_output)
        assert os.path.exists(filepath_output)
BO ZHANG's avatar
BO ZHANG committed
        return CsstStatus.WARNING, fr
BO ZHANG's avatar
BO ZHANG committed


# process multiple images in an exposure (RECOMMENDED, at least for MBI or SLS)
# define a single job
BO ZHANG's avatar
BO ZHANG committed
def one_job(dm: CsstMsDataManager, detector: int):
    """ Process a single image, defined for parallel processing. """
    filepath_input = dm.l0_detector(detector=detector)
    filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
BO ZHANG's avatar
BO ZHANG committed

    # data processing
    data = read_image(filepath_input)
    data_processed = process_data(data)
    np.save(filepath_output, data_processed)
BO ZHANG's avatar
BO ZHANG committed

    # check result existence
    if os.path.exists(filepath_output):
BO ZHANG's avatar
BO ZHANG committed
        # file exists, check status and precision
        if fits.getheader(filepath_output)["STT_DIST"] == 0 and \
                fits.getheader(filepath_output)["PCS_DIST"] < 1e-5 and \
                fits.getheader(filepath_output)["NS_DIST"] >= 8:
BO ZHANG's avatar
BO ZHANG committed
            return CsstStatus.PERFECT
        else:
            return CsstStatus.WARNING
    else:
        # file doesn't exist, do your fallback solution
        fits.HDUList(fits.PrimaryHDU()).writeto(filepath_output)
        assert os.path.exists(filepath_output)
        return CsstStatus.WARNING
BO ZHANG's avatar
BO ZHANG committed
# process in serial / parallel
BO ZHANG's avatar
BO ZHANG committed
def process_multiple_images(
        dm: CsstMsDataManager,
) -> tuple[CsstStatus, FileRecorder]:
BO ZHANG's avatar
BO ZHANG committed
    """
    Flip all images.

    Flip all images in an exposure in a for-loop (serial and parallel).
BO ZHANG's avatar
BO ZHANG committed

    Parameters
    ----------
    dm : CsstMsDataManager
        The data manager of the specified exposure.

    Returns
    -------
    tuple[CsstStatus, FileRecorder]
BO ZHANG's avatar
BO ZHANG committed
        The final status.

    Examples
    --------
    >>> dm = CsstMsDataManager.quickstart(
BO ZHANG's avatar
BO ZHANG committed
    >>>     ver_sim="C5.2", dir_l1="", datatype="sls", exposure_id=100)
BO ZHANG's avatar
BO ZHANG committed
    >>> process_multiple_images(dm)
BO ZHANG's avatar
BO ZHANG committed
    """

    # get an empty file recorder
    fr = FileRecorder()

BO ZHANG's avatar
BO ZHANG committed
    # process data
BO ZHANG's avatar
BO ZHANG committed
    # start processing (dm.target_detectors is a list of detector number that should be processed)

    # [1/2] single-thread mode
    for detector in dm.target_detectors:
        # this will NOT be written into the log file
        dm.logger_mod.info("Start data processing for detector {}".format(detector))
        filepath_input = dm.l0_detector(detector=detector)
        filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
        data = read_image(filepath_input)
        data_processed = process_data(data)
        np.save(filepath_output, data_processed)
        # record file!
        fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector))

    # [2/2] multi-processing mode
    dm.logger_mod.info("Starting data processing with multiprocessing ...")
    status_list = joblib.Parallel(n_jobs=dm.n_jobs, backend=dm.backend)(
        joblib.delayed(one_job)(dm, detector) for detector in dm.target_detectors
    )
    dm.logger_mod.info("Finished processing ...")
    for detector in dm.target_detectors:
        filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
        fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector))

    # check results
    assert fr.is_good()
    return CsstStatus.PERFECT if all([_ == CsstStatus.PERFECT for _ in status_list]) else CsstStatus.WARNING, fr