"""
Identifier:     KSC-SJ4-csst_common/data_manager.py
Name:           data_manager.py
Description:    file path generator and DFS interface for CSST Main Survey (MBI/SLS) data
Author:         Bo Zhang
Created:        2022-09-13
Modified-History:
    2022-09-13, Bo Zhang, created
    2022-09-13, Bo Zhang, added CsstMbiDataManager
    2022-09-29, Bo Zhang, favor CsstMsDataManager instead of CsstMbiDataManager
    2022-10-26, Bo Zhang, reconstruct CsstMsDataManager, deprecate CsstMbiDataManager
    2022-10-28, Bo Zhang, added CsstMsDataManager.query_rc(), dm.use_dfs, dm.node
    2022-11-06, Bo Zhang, deleted CsstMbiDataManager
    2022-11-20, Bo Zhang, added DFS APIs
"""
import glob
import os
import re
import shutil
from typing import Union

import numpy as np
from astropy.io import fits
from astropy.table import Table
from csst_dfs_api.common.catalog import CatalogApi
from csst_dfs_api.facility.calmerge import CalMergeApi
from csst_dfs_api.facility.level0 import Level0DataApi
from csst_dfs_api.facility.level0prc import Level0PrcApi
from csst_dfs_api.msc.level1 import Level1DataApi
from csst_dfs_api.msc.level2 import Level2DataApi

from .logger import get_logger
from .params import CSST_PARAMS as CP
from .params import DFS_CONF


class CsstMsDataManager:
    """
    CSST MS data manager, including MBI and SLS.

    ``CsstMsDataManager`` provides an interface to switch between DFS and local file system.
    To initialize ``CsstMsDataManager`` from local directory, use ``CsstMsDataManager.from_dir()``.
    To initialize ``CsstMsDataManager`` on ``dandelion`` or ``PM node``, use ``CsstMsDataManager.quickstart()``.
    To initialize ``CsstMsDataManager`` from DFS, use ``CsstMsDataManager.from_dfs()``.
    To generate L0 and L1 file paths, use ``CsstMsDataManager.l0_detector()``, ``CsstMsDataManager.l1_detector()``, etc.

    Here are some examples for simulation with different versions.
    C3:
        MSC_MS_210525220000_100000020_06_raw.fits
        MSC_CRS_210525220000_100000020_06_raw.fits
        MSC_210525120000_0000020_06.cat
    C5.1:
        CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
        CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
        MSC_10000100_chip_06_filt_y.cat
        MSC_10000100_chip_06_filt_y.log
    C5.2:
        CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
        CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
        MSC_100000100_chip_06_filt_y.cat
        MSC_100000100_chip_06_filt_y.log

    Parameters
    ----------
    ver_sim : str
        The version of simulation data, see ``csst_common.params.CP``.
    datatype : str
        The options are {"mbi", "sls", "all"}.
        The "all" option is used for QC in particular.
        Note that in this case methods like ``get_bias`` are unavailable.
    available_detectors : list
        The list of available detector serial numbers of available images.
    target_detectors : list
        The list of target detector serial numbers of available images.
    dir_l0 : str
        The L0 directory.
    dir_l1 : str
        The L1 directory.
    path_aux : str
        The aux data path template (bias, flat, dark).
    use_dfs : bool
        If True, use DFS. In case some modules may have other options such as astroquery.
    dfs_node : str
        The environment in which the pipeline will run, must be a key of ``DFS_CONF``.
        E.g., "pml" for Purple Mountain Lab cluster.
    dfs_root : str
        The DFS root path.
    obs_id : str
        The exposure ID.
    exp_start : str
        The exposure start time in ``yyyymmddhhmmss`` format.
    exp_stop : str
        The exposure stop time in ``yyyymmddhhmmss`` format.
    _telescope : str
        The telescope name. Defaults to ``CSST`` for C5.2 simulation.
    _instrument : str
        The instrument name. Defaults to ``MSC`` for C5.2 simulation.
    _survey : str
        The survey name. Defaults to ``MS`` for C5.2 simulation.
    obs_type : str
        The image type signature for science images. Defaults to ``SCI`` for C5.2 simulation.
    l0_post : str
        The L0 postfix. Defaults to ``L0_1`` for C5.2 simulation.
    log_ppl : str
        The pipeline log file name.
    log_mod : str
        The module log file name.
    clear_dir : bool
        If True, clear ``dm.dir_l1`` directory.
    verbose : bool
        If True, print verbose info.
    n_jobs : int
        The number of jobs.
    backend : str
        The joblib backend.

    Examples
    --------
    >>> dm_mbi = CsstMsDataManager(...)
    >>> # access L0 directory
    >>> dm_mbi.dir_l0
    >>> # access L1 directory
    >>> dm_mbi.dir_l1
    >>> # access path_aux
    >>> dm_mbi.path_aux
    >>> # access ver_sim
    >>> dm_mbi.ver_sim
    >>> # access target detectors
    >>> dm_mbi.target_detectors
    >>> # access available detectors
    >>> dm_mbi.available_detectors
    >>> # define an L1 file (detector-specified)
    >>> dm_mbi.l1_detector(detector=6)
    >>> # define an L1 file (non-detector-specified)
    >>> dm_mbi.l1_file("flipped_image.fits")
    """

    def __init__(self,
                 ver_sim: str = "C5.2",
                 datatype: str = "mbi",
                 available_detectors: Union[None, list] = None,
                 target_detectors: Union[None, list, int] = None,
                 dir_l0: str = ".",
                 dir_l1: str = ".",
                 path_aux: str = "",  # bias dark flat
                 use_dfs: bool = False,
                 dfs_node: str = "kmust",
                 dfs_root: str = "/share/dfs",
                 obs_id: str = "100000100",
                 exp_start: str = "20270810081950",
                 exp_stop: str = "20270810082220",
                 _telescope: str = "CSST",
                 _instrument: str = "MSC",
                 _survey: str = "MS",
                 obs_type: str = "SCI",
                 l0_post: str = "L0_1",
                 log_ppl: str = "csst-l1ppl.log",
                 log_mod: str = "csst-l1mod.log",
                 clear_dir: bool = False,
                 verbose: bool = True,
                 n_jobs: int = 18,
                 backend: str = "multiprocessing"):
        assert ver_sim in CP["sim"]["versions"]
        self.ver_sim = ver_sim

        # datatype, valid_detectors, detector2filter
        assert datatype in ["mbi", "sls", "all"]
        self.datatype = datatype
        # each datatype has its own entry in CP; "all" must NOT override "sls"
        if datatype == "mbi":
            self.valid_detectors = CP["mbi"]["detectors"]
            self.detector2filter = CP["mbi"]["detector2filter"]
        elif datatype == "sls":
            self.valid_detectors = CP["sls"]["detectors"]
            self.detector2filter = CP["sls"]["detector2filter"]
        else:
            self.valid_detectors = CP["all"]["detectors"]
            self.detector2filter = CP["all"]["detector2filter"]
        if verbose:
            print("Data type is: ", self.datatype)
            print("Valid detectors are: ", self.valid_detectors)

        # available_detectors
        self.available_detectors = available_detectors if available_detectors is not None else list()
        if verbose:
            print("Available detectors are:", self.available_detectors)
        # set all available detectors by default (the setter intersects with valid/available)
        self.target_detectors = target_detectors
        if verbose:
            print("Target detectors are: ", self._target_detectors)

        # exposure info
        self.obs_id = obs_id
        self.exp_start = exp_start
        self.exp_stop = exp_stop

        # file name components
        self._telescope = _telescope
        self._instrument = _instrument
        self._survey = _survey
        self.obs_type = obs_type
        self.l0_post = l0_post

        # DFS configuration (the dfs_node setter also exports DFS env variables)
        self.use_dfs = use_dfs
        self.dfs_node = dfs_node
        self.dfs_root = dfs_root

        # data directory
        self.dir_l0 = dir_l0
        self.dir_l1 = dir_l1
        self.path_aux = path_aux

        # record hard code names in history
        self.hardcode_history = []

        self.n_jobs = n_jobs
        self.backend = backend

        # aXe
        self.set_env()
        # change to working directory
        os.chdir(self.dir_l1)
        # clear dir_l1 (``self.clear_dir`` is the static method, not the flag)
        if clear_dir:
            self.clear_dir(self.dir_l1)
        # pipeline logger
        self.logger_ppl = get_logger(name="CSST L1 Pipeline Logger", filename=os.path.join(dir_l1, log_ppl))
        if verbose:
            self.logger_ppl.info("logger_ppl initialized")
        # module logger
        self.logger_mod = get_logger(name="CSST L1 Module Logger", filename=os.path.join(dir_l1, log_mod))
        if verbose:
            self.logger_mod.info("logger_mod initialized")

        # optional callables ``f(detector) -> path`` overriding the default aux lookup
        self.custom_bias = None
        self.custom_dark = None
        self.custom_flat = None

    # DFS APIs
    @property
    def dfs_L0DataApi(self):
        return Level0DataApi()

    @property
    def dfs_L1DataApi(self):
        return Level1DataApi()

    @property
    def dfs_L2DataApi(self):
        return Level2DataApi()

    @property
    def dfs_L0PrcApi(self):
        return Level0PrcApi()

    @property
    def dfs_CalApi(self):
        return CalMergeApi()

    @property
    def dfs_CatApi(self):
        return CatalogApi()

    def set_env(self):
        """ Set environment variables required by aXe (host-dependent). """
        if os.uname()[1] == "dandelion":
            os.environ["LD_LIBRARY_PATH"] = "/home/csstpipeline/anaconda3/lib"
            os.environ["AXE_BINDIR"] = "/home/csstpipeline/PycharmProjects/axe/cextern/src"
        else:
            os.environ["LD_LIBRARY_PATH"] = ""
            os.environ["AXE_BINDIR"] = ""

    @property
    def target_detectors(self):
        """ The sorted list of detectors to be processed. """
        return self._target_detectors

    @target_detectors.setter
    def target_detectors(self, detectors: Union[None, list, int] = None):
        # only detectors that are both available and valid can be targeted
        assert detectors is None or type(detectors) in [list, int]
        candidates = set(self.available_detectors) & set(self.valid_detectors)
        if isinstance(detectors, list):
            candidates &= set(detectors)
        elif isinstance(detectors, int):
            candidates &= {detectors}
        # sort so that the processing order is deterministic
        self._target_detectors = sorted(candidates)

    def set_detectors(self, detectors=None):
        raise DeprecationWarning("This method is deprecated, please directly use dm.target_detectors = detectors!")

    @staticmethod
    def from_dir(
            ver_sim="C5.2",
            datatype="mbi",
            dir_l0=".",
            dir_l1=".",
            path_aux="",
            use_dfs=False,
            dfs_node="kmust",
            n_jobs=18,
            backend="multiprocessing",
            clear_dir=False,
    ):
        """ Initialize the data manager from the L0 files found in ``dir_l0``.

        File name components (telescope, instrument, survey, obs_type, exposure
        times, obs_id, L0 postfix) are parsed from the first science image.

        Raises
        ------
        FileNotFoundError
            If no science image is found in ``dir_l0``.
        """
        assert ver_sim in ["C5.2", ]

        # glob files
        fps_img = CsstMsDataManager.glob_image(dir_l0, ver_sim=ver_sim)
        if len(fps_img) == 0:
            raise FileNotFoundError(f"No file found in dir_l0: {dir_l0}")

        # e.g. CSST_MSC_MS_SCI_<start>_<stop>_<obsid>_<det>_L0_1.fits -> 8th token is the detector
        split_names = [re.split(r"[_.]", fp) for fp in fps_img]
        available_detectors = sorted(int(tokens[7]) for tokens in split_names)

        # parse info
        (_telescope, _instrument, _survey, obs_type,
         exp_start, exp_stop, obs_id,
         _detector, *l0_post, _ext) = split_names[0]

        return CsstMsDataManager(
            ver_sim=ver_sim,
            datatype=datatype,
            available_detectors=available_detectors,
            target_detectors=None,
            dir_l0=dir_l0,
            dir_l1=dir_l1,
            path_aux=path_aux,  # bias dark flat
            use_dfs=use_dfs,
            dfs_node=dfs_node,
            obs_id=obs_id,
            exp_start=exp_start,
            exp_stop=exp_stop,
            _telescope=_telescope,
            _instrument=_instrument,
            _survey=_survey,
            obs_type=obs_type,
            l0_post="_".join(l0_post),
            clear_dir=clear_dir,
            n_jobs=n_jobs,
            backend=backend
        )

    @staticmethod
    def glob_image(dir_l0, ver_sim="C5.2"):
        """ Glob science images in the L0 data directory, return sorted base names. """
        if ver_sim == "C3":
            pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits")
        else:
            assert ver_sim in ["C5.1", "C5.2"]
            pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits")
        fps = sorted(os.path.basename(fp) for fp in glob.glob(pattern))

        print("{} files found with pattern: {}".format(len(fps), pattern))
        return fps

    @staticmethod
    def glob_cat(dir_l0, ver_sim="C5.2"):
        """ Glob input catalogs in the L0 data directory, return sorted base names. """
        assert ver_sim in ["C5.1", "C5.2"]
        pattern = os.path.join(dir_l0, "MSC_*.cat")
        fps = sorted(os.path.basename(fp) for fp in glob.glob(pattern))

        print("@DM.glob_cat: {} files found with pattern: {}".format(len(fps), pattern))
        return fps

    def l0_id(self, detector=6):
        """ Level0 ID, consistent with DFS. """
        return f"{self.obs_id}{detector:02d}"

    def l0_cat(self, detector=6):
        """ The L0 cat file path. """
        assert self.ver_sim == "C5.2"
        fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
            self._instrument, self.obs_id, detector, self.detector2filter[detector])
        return os.path.join(self.dir_l0, fn)

    def l0_log(self, detector=6):
        """ The L0 log file path. """
        assert self.ver_sim == "C5.2"
        fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
            self._instrument, self.obs_id, detector, self.detector2filter[detector])
        return os.path.join(self.dir_l0, fn)

    def l0_detector(self, detector=6):
        """ L0 detector-specific science image file path. """
        assert self.ver_sim in ["C5.1", "C5.2"]
        fn = "{}_{}_{}_{}_{}_{}_{}_{:02d}_{}.fits".format(
            self._telescope, self._instrument, self._survey, self.obs_type,
            self.exp_start, self.exp_stop, self.obs_id, detector, self.l0_post)
        return os.path.join(self.dir_l0, fn)

    def l0_crs(self, detector=6):
        """ L0 cosmic ray file path. """
        assert self.ver_sim in ["C5.1", "C5.2"]
        fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_{}.fits".format(
            self._telescope, self._instrument, self._survey,
            self.exp_start, self.exp_stop, self.obs_id, detector, self.l0_post)
        return os.path.join(self.dir_l0, fn)

    def l1_detector(self, detector=6, post="img.fits"):
        """ Generate L1 file path.

        Parameters
        ----------
        detector : int
            detector ID
        post : str
            postfix
            e.g, {"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}

        Returns
        -------
        str
            L1 file path
        """
        assert self.ver_sim in ["C5.1", "C5.2"]
        fn = "{}_{}_{}_{}_{}_{}_{}_{:02d}_{}".format(
            self._telescope, self._instrument, self._survey, self.obs_type,
            self.exp_start, self.exp_stop, self.obs_id, detector, post)
        return os.path.join(self.dir_l1, fn)

    def _get_aux(self, detector, custom, mbi_tag, sls_tag):
        """ Resolve an aux (bias/dark/flat) file path for ``detector``.

        ``custom`` (if not None) is a callable overriding the lookup. For MBI,
        ``path_aux`` is a glob template formatted with ``(mbi_tag, detector)``;
        otherwise it is formatted with ``(detector, sls_tag)``.
        """
        if custom is not None:
            return custom(detector)
        if self.datatype == "mbi":
            pattern = self.path_aux.format(mbi_tag, detector)
            fps = sorted(glob.glob(pattern))
            if not fps:
                raise FileNotFoundError(f"No {sls_tag} file found with pattern: {pattern}")
            return fps[0]
        return self.path_aux.format(detector, sls_tag)

    def get_bias(self, detector=6):
        """ Get bias file path. """
        return self._get_aux(detector, self.custom_bias, "CLB", "bias")

    def get_dark(self, detector=6):
        """ Get dark file path. """
        return self._get_aux(detector, self.custom_dark, "CLD", "dark")

    def get_flat(self, detector=6):
        """ Get flat file path. """
        return self._get_aux(detector, self.custom_flat, "CLF", "flat")

    def get_axeconf(self):
        """ aXe configuration directory (host-dependent). """
        if os.uname()[1] == "dandelion":
            return "/home/csstpipeline/L1Pipeline/aux/axeconf"
        else:
            # in docker
            return "/L1Pipeline/aux/axeconf"

    def l1_file(self, name="", comment=""):
        """ L1 file path

        Parameters
        ----------
        name : str
            file name
        comment : str
            use the function name plz

        Returns
        -------
        fp: str
            the synthetic file path
        """
        fp = os.path.join(self.dir_l1, name)
        # record hardcode history
        self.hardcode_history.append(dict(hdcd=fp, comment=comment))
        return fp

    def get_sls_info(self):
        """ Get the target SLS image header (ext=1) and return it.

        Requires exactly one target detector.
        """
        assert len(self.target_detectors) == 1
        header = fits.getheader(self.l0_detector(self.target_detectors[0]), ext=1)
        return header

    def get_mbi_info(self):
        """ Get all MBI image header info and return as a table. """
        info = Table.read("/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/t_mbi_l1.fits")
        return info

    @staticmethod
    def quickstart(ver_sim="C5.2", datatype="mbi", dir_l1=".", exposure_id=100,
                   use_dfs=False, dfs_node="kmust", clear_dir=False):
        """
        Quick dataset generator for tests on dandelion or PML

        Parameters
        ----------
        ver_sim : str
            {"C5.2"}
        datatype : str
            {"mbi", "sls"}
        dir_l1 : str
            output directory
        exposure_id : int
            The serial number of the exposure. 20-154 for C5.2.
        use_dfs : bool
            If True, use DFS.
        dfs_node : str
            The DFS node. Defaults to "kmust", could be "pml".
        clear_dir : bool
            If True, clear dir_l1.

        Returns
        -------
        CsstMsDataManager
            The Main Survey Data Manager instance.
        """
        assert datatype in ["mbi", "sls"]
        # auto identify node name
        hostname = os.uname()[1]
        assert hostname in ["dandelion", "ubuntu"]

        # dandelion
        if hostname == "dandelion" and datatype == "mbi":
            dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/" \
                     "NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
            path_aux = "/nfsdata/users/cham/L1Test/ref_C5.2/MSC_{}_*_{:02d}_combine.fits"
        elif hostname == "dandelion" and datatype == "sls":
            dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/" \
                     "NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
            path_aux = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/csst_{:02d}{}.fits"

        # PMO
        elif hostname == "ubuntu" and datatype == "mbi":
            dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
                     "NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
            path_aux = "/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits"
        elif hostname == "ubuntu" and datatype == "sls":
            dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
                     "NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
            path_aux = ""
        else:
            raise ValueError("@DM: invalid hostname {} or datatype {}!".format(hostname, datatype))

        return CsstMsDataManager.from_dir(
            ver_sim=ver_sim, datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1, path_aux=path_aux,
            use_dfs=use_dfs, dfs_node=dfs_node, clear_dir=clear_dir)

    def __repr__(self):
        lines = [
            "<CsstMsDataManager>",
            f"- Data type = {self.datatype}",
            f"- Valid detectors = {self.valid_detectors}",
            f"- Available detectors = {self.available_detectors}",
            f"- Target detectors = {self.target_detectors}",
            f"- dir_l0 = {self.dir_l0}",
            f"- dir_l1 = {self.dir_l1}",
            f"- use_dfs = {self.use_dfs}",
            f"- dfs_node = {self.dfs_node}",
            # os.getenv may return None; format it instead of concatenating
            f"- CSST_DFS_GATEWAY = {os.getenv('CSST_DFS_GATEWAY')}",
        ]
        return "\n".join(lines) + "\n"

    @staticmethod
    def _remove_path(path):
        """ Remove a file, symlink (even dangling) or directory tree, like ``rm -rf``. """
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    def remove_files(self, fmt="*.fits"):
        """ Remove L1 files conforming the glob pattern ``fmt``. """
        # escape dir_l1 so only ``fmt`` is treated as a pattern; no shell involved
        for fp in glob.glob(os.path.join(glob.escape(self.dir_l1), fmt)):
            self._remove_path(fp)

    def remove_dir(self, dir_name):
        """ Remove L1 (sub-)directory. """
        path = os.path.join(self.dir_l1, dir_name)
        if os.path.lexists(path):
            self._remove_path(path)

    @staticmethod
    def clear_dir(dir_path):
        """ Remove all non-hidden entries in ``dir_path`` (equivalent to ``rm -rf dir_path/*``). """
        for fp in glob.glob(os.path.join(glob.escape(dir_path), "*")):
            CsstMsDataManager._remove_path(fp)

    # DFS interfaces
    @property
    def dfs_node(self):
        return self._dfs_node

    @dfs_node.setter
    def dfs_node(self, dfs_node):
        # for DFS configuration; must be a key of DFS_CONF, whose items are exported to os.environ
        assert dfs_node in DFS_CONF.keys()
        self._dfs_node = dfs_node
        for k, v in DFS_CONF[dfs_node].items():
            os.environ[k] = v

    @staticmethod
    def dfs_is_available():
        """ Test if DFS works by running a small Gaia catalog query. """
        try:
            tbl = CatalogApi().catalog_query(
                catalog_name="gaia3", ra=180, dec=0, radius=.1, min_mag=0, max_mag=30, obstime=-1, limit=-1
            )
            return len(tbl) > 0
        except Exception:
            return False

    def dfs_rc_query(self, ra=180, dec=0, radius=2, min_mag=0, max_mag=30, obstime=-1, limit=-1):
        """ Query Reference Catalog (RC) from DFS. Returns None if the query fails. """
        try:
            cat = self.dfs_CatApi.catalog_query(
                catalog_name="gaia3", ra=ra, dec=dec, radius=radius, min_mag=min_mag, max_mag=max_mag,
                obstime=obstime, limit=limit
            )
            tbl = self.dfs_CatApi.to_table(cat)
            return tbl
        except Exception as err:
            print("Error occurred during the query!")
            print(repr(err))
            return None

    def dfs_l1_push(self):
        """ Push MBI/SLS L1 data to DFS. (Not implemented yet.) """
        pass

    def dfs_l2_push(self):
        """ Push SLS spectra to DFS. (Not implemented yet.) """
        pass

    @staticmethod
    def from_l1id(
            l1_id="1000000001",
            datatype="sls",
            dir_l0="/L1Pipeline/L0",
            dir_l1="/L1Pipeline/L1",
            use_dfs=True,
            dfs_node="pml",
            clear_l1=False,
            dfs_root="/share/dfs"
    ):
        """ Initialize CsstMsDataManager from an L1 ID. (Not implemented yet; returns None.) """
        pass

    @staticmethod
    def from_dfs(
            obs_id="100000100",
            datatype="mbi",
            dir_l0="/L1Pipeline/L0",
            dir_l1="/L1Pipeline/L1",
            use_dfs=True,
            dfs_node="pml",
            clear_l1=False,
            dfs_root="/share/dfs",
            n_jobs=18,
            backend="multiprocessing"
    ):
        """ Initialize CsstMsDataManager from DFS.

        ``dir_l0`` is cleared (or created) and filled with symbolic links to the
        L0 files of ``obs_id`` under ``dfs_root``; ``dir_l1`` is created if
        missing and cleared if ``clear_l1`` is True.

        Raises
        ------
        ValueError
            If ``datatype`` is not "mbi" or "sls".
        """
        # aux paths: from_dfs only works in docker mode.
        # Validate datatype before touching the file system.
        if datatype == "mbi":
            path_aux = "/L1Pipeline/aux/C5.2_ref_mbi/MSC_{}_*_{:02d}_combine.fits"
        elif datatype == "sls":
            path_aux = "/L1Pipeline/aux/C5.2_ref_sls/csst_{:02d}{}.fits"
        else:
            raise ValueError(f"Bad datatype: {datatype}")

        # (clear and) make directories
        if os.path.exists(dir_l0):
            CsstMsDataManager.clear_dir(dir_l0)
        else:
            os.makedirs(dir_l0)
        if not os.path.exists(dir_l1):
            os.makedirs(dir_l1)
        elif clear_l1:
            CsstMsDataManager.clear_dir(dir_l1)

        print(f"Query obs_id={obs_id} ...", end="")
        records = CsstMsDataManager(dfs_node=dfs_node, verbose=False).dfs_L0DataApi.find(obs_id=obs_id)
        print(f"{records['totalCount']} records obtained!")
        tbl = Table([_.__dict__ for _ in records["data"]])
        tbl.sort(["detector_no", "obs_type"])

        print("Making symbolic links ...")
        for file_path in tbl["file_path"]:
            os.symlink(
                src=os.path.join(dfs_root, file_path),
                dst=os.path.join(dir_l0, os.path.basename(file_path)),
            )

        # initialize dm
        dm = CsstMsDataManager.from_dir(
            ver_sim="C5.2", datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1,
            path_aux=path_aux, use_dfs=use_dfs, dfs_node=dfs_node,
            n_jobs=n_jobs, backend=backend
        )
        # obs_id is parsed from file names as a string
        assert dm.obs_id == str(obs_id)

        return dm

    def dfs_l0_query(self, obs_id: str = "100000100"):
        """ Query L0 data from DFS and return the records as a table.

        Logs a warning for every (detector, obs_type) pair without a record.

        Raises
        ------
        ValueError
            If DFS returns a non-zero code.
        """
        result = self.dfs_L0DataApi.find(obs_id=str(obs_id))
        print(f"{result['totalCount']} records returned from DFS.")
        if not result["code"] == 0:
            raise ValueError(f"DFS returns non-zero code! ({result['code']})")
        tbl = Table([_.__dict__ for _ in result["data"]])
        tbl.sort(["detector_no", "obs_type"])
        # Check if all 30 detectors are available
        for detector in CP["all"]["detectors"]:
            for obs_type in ["sci", "cosmic_ray"]:
                if np.sum((tbl["detector_no"] == f"{detector:02d}") & (tbl["obs_type"] == obs_type)) == 0:
                    self.logger_ppl.warning(f"Record not found for detector {detector:02d} and obs_type {obs_type}")
        return tbl

    def dfs_l0_check_all(self):
        """ Check all C5.2 L0 data is available in DFS (60 records per obs_id). """
        is_good = True
        for obs_id in range(100000020, 100000155):
            tbl = self.dfs_l0_query(obs_id=f"{obs_id}")
            if len(tbl) == 60:
                self.logger_ppl.info(f"DFS returns {len(tbl)} records for obs_id={obs_id}")
            else:
                is_good = False
                self.logger_ppl.warning(f"DFS returns {len(tbl)} records for obs_id={obs_id}")
        return is_good

    def dfs_l1_query(self, obs_id, detector):
        """ Query L1 data from DFS. (Not implemented yet.) """
        pass


# Temporary backward-compatible alias so that code still importing the old
# name ``CsstMbiDataManager`` keeps working; it is the very same class object.
CsstMbiDataManager = CsstMsDataManager