Commit 48da47e8 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

added parallel mode in example interface

parent acd0f3cb
Pipeline #217 passed with stages
in 13 seconds
...@@ -178,7 +178,7 @@ Source code ...@@ -178,7 +178,7 @@ Source code
.. literalinclude:: example_interface.py .. literalinclude:: example_interface.py
:caption: ``example_interface.py`` :caption: ``example_interface.py``
:emphasize-lines: 7-10,36-40,84,86-87,90-91,94-95,99,102,148,152-153,156-157,160-161,164-165 :emphasize-lines: 7-11,36-41,85,87-88,91-92,95-96,99-101,111-116,148,155-165,167-173,178-179,182-183,186-187,190-191
:linenos: :linenos:
:language: python :language: python
......
...@@ -4,6 +4,7 @@ from typing import Union ...@@ -4,6 +4,7 @@ from typing import Union
import numpy as np import numpy as np
from astropy.io import fits from astropy.io import fits
import joblib
from csst_common.data_manager import CsstMsDataManager from csst_common.data_manager import CsstMsDataManager
from csst_common.file_recorder import FileRecorder from csst_common.file_recorder import FileRecorder
from csst_common.logger import get_logger from csst_common.logger import get_logger
...@@ -32,7 +33,7 @@ def check_results(dm: CsstMsDataManager, logger: logging.Logger) -> bool: ...@@ -32,7 +33,7 @@ def check_results(dm: CsstMsDataManager, logger: logging.Logger) -> bool:
return False return False
# process a single image # process a single image (NOT RECOMMENDED!)
def process_single_image( def process_single_image(
filepath_input: str, filepath_input: str,
filepath_output: str, filepath_output: str,
...@@ -95,15 +96,28 @@ def process_single_image( ...@@ -95,15 +96,28 @@ def process_single_image(
return CsstStatus.ERROR, fr return CsstStatus.ERROR, fr
# process an exposure (MBI or SLS) # process multiple images in an exposure (RECOMMENDED, at least for MBI or SLS)
# define a single job
def one_job(dm, detector):
    """
    Process a single image; defined at module level so it can be pickled
    and dispatched by ``joblib.Parallel`` in multiprocessing mode.

    Parameters
    ----------
    dm : CsstMsDataManager
        The data manager of the specified exposure; used only to resolve
        the L0 input and L1 output file paths for *detector*.
    detector : int
        The detector number to process.

    Returns
    -------
    None
    """
    filepath_input = dm.l0_detector(detector=detector)
    filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
    data = read_image(filepath_input)
    data_processed = process_data(data)
    # NOTE(review): np.save appends ".npy" when the target name does not end
    # with it, so this actually writes "...L1_processed.fits.npy" while the
    # caller records "...L1_processed.fits" — confirm intended output format.
    np.save(filepath_output, data_processed)
# parallel processing jobs
def process_multiple_images( def process_multiple_images(
dm: CsstMsDataManager, dm: CsstMsDataManager,
logger: Union[None, logging.Logger] = None logger: Union[None, logging.Logger] = None,
n_jobs: int = -1,
) -> tuple[CsstStatus, FileRecorder]: ) -> tuple[CsstStatus, FileRecorder]:
""" """
Flip all images. Flip all images.
Flip all images in an exposure in a for-loop (not in parallel). Flip all images in an exposure in a for-loop (serial and parallel).
Parameters Parameters
---------- ----------
...@@ -111,6 +125,8 @@ def process_multiple_images( ...@@ -111,6 +125,8 @@ def process_multiple_images(
The data manager of the specified exposure. The data manager of the specified exposure.
logger : {None, logging.Logger} logger : {None, logging.Logger}
The logger. If None, use the default logger. The logger. If None, use the default logger.
n_jobs : int
The number of processes.
Returns Returns
------- -------
...@@ -134,8 +150,9 @@ def process_multiple_images( ...@@ -134,8 +150,9 @@ def process_multiple_images(
# process data # process data
try: try:
# dm.target_detectors is a list of detector number that should be processed # start processing (dm.target_detectors is a list of detector number that should be processed)
# start processing
# [1/2] single-thread mode
for detector in dm.target_detectors: for detector in dm.target_detectors:
# this will NOT be written into the log file # this will NOT be written into the log file
logger.debug("Processing for detector {}".format(detector)) logger.debug("Processing for detector {}".format(detector))
...@@ -146,6 +163,15 @@ def process_multiple_images( ...@@ -146,6 +163,15 @@ def process_multiple_images(
np.save(filepath_output, data_processed) np.save(filepath_output, data_processed)
# record file! # record file!
fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector)) fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector))
# [2/2] multi-processing mode
joblib.Parallel(n_jobs=n_jobs, backend="multiprocessing")(
joblib.delayed(one_job)(dm, detector) for detector in dm.target_detectors
)
for detector in dm.target_detectors:
filepath_output = dm.l1_detector(detector=detector, post="L1_processed.fits")
fr.add_record(filepath=filepath_output, db=True, comment="processed file for Detector {}".format(detector))
# check results # check results
if check_results(dm=dm, logger=logger): if check_results(dm=dm, logger=logger):
# this will be written into the log file # this will be written into the log file
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment