Newer
Older
"""
Identifier: KSC-SJ4-csst_common/data_manager.py
Name: data_manager.py
Description: file path generator
Author: Bo Zhang
Created: 2022-09-13
Modified-History:
2022-09-13, Bo Zhang, created
2022-09-29, Bo Zhang, favor CsstMsDataManager instead of CsstMbiDataManager
2022-10-26, Bo Zhang, reconstruct CsstMsDataManager, deprecate CsstMbiDataManager
2022-10-28, Bo Zhang, added CsstMsDataManager.query_rc(), dm.use_dfs, dm.node
from csst_dfs_api.facility.calmerge import CalMergeApi
from csst_dfs_api.facility.level0 import Level0DataApi
from csst_dfs_api.facility.level0prc import Level0PrcApi
from csst_dfs_api.common.catalog import CatalogApi
from .params import CSST_PARAMS as CP
"""
CSST MS data manager, including MBI and SLS.
``CsstMsDataManager`` provides an interface to switch between DFS and local file system.
To initialize ``CsstMsDataManager`` from local directory, use ``CsstMsDataManager.from_dir()``
To initialize ``CsstMsDataManager`` on ``dandelion`` or ``PM node``, ``CsstMsDataManager.quickstart()``.
To initialize ``CsstMsDataManager`` from DFS, use ``CsstMsDataManager.from_dfs()``.
To generate L0 and L1 file paths, use ``CsstMsDataManager.l0_detector()``, ``CsstMsDataManager.l1_detector()``, etc.
Here are some examples for simulation with different versions.
C3:
MSC_MS_210525220000_100000020_06_raw.fits
MSC_CRS_210525220000_100000020_06_raw.fits
MSC_210525120000_0000020_06.cat
C5.1:
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_10000100_chip_06_filt_y.cat
MSC_10000100_chip_06_filt_y.log
C5.2:
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_100000100_chip_06_filt_y.cat
MSC_100000100_chip_06_filt_y.log
The version of simulation data, see ``csst_common.params.CP``.
datatype : str
The options are {"mbi", "sls", "all"}.
The "all" option is used for QC in particular.
Note that in this case methods like ``get_bias`` are unavailable.
available_detectors : list
The list of available detector serial numbers of available images.
target_detectors : list
The list of target detector serial numbers of available images.
dir_l0 : str
The L0 directory.
dir_l1 : str
The L1 directory.
use_dfs : bool
If True, use DFS. In case some modules may have other options such as astroquery.
The environment in which the pipeline will run.
Use "pml" for Purple Mountain Lab cluster and "local" for others.
The exposure start time in ``yyyymmddhhmmss`` format.
_telescope : str
The telescope name. Defaults to ``CSST`` for C5.2 simulation.
_instrument : str
The instrument name. Defaults to ``MSC`` for C5.2 simulation.
_survey : str
The survey name. Defaults to ``MS`` for C5.2 simulation.
The image type signature for science images. Defaults to ``SCI`` for C5.2 simulation.
log_ppl : str
The pipeline log file name.
log_mod : str
The module log file name.
clear_dir : bool
If True, clear ``dm.dir_l1`` directory.
n_jobs : int
The number of jobs.
backend : str
The joblib backend.
>>> # access L0 directory
>>> dm_mbi.dir_l0
>>> # access L1 directory
>>> dm_mbi.dir_l1
>>> # access path_aux
>>> dm_mbi.path_aux
>>> # access ver_sim
>>> dm_mbi.ver_sim
>>> # access target detectors
>>> dm_mbi.target_detectors
>>> # access available detectors
>>> dm_mbi.available_detectors
>>> # define an L1 file (detector-specified)
>>> dm_mbi.l1_detector(detector=6)
>>> # define an L1 file (non-detector-specified)
>>> dm_mbi.l1_file("flipped_image.fits")
ver_sim: str = "C5.2",
datatype: str = "mbi",
available_detectors: Union[None, list] = None,
dir_l0: str = ".",
dir_l1: str = ".",
path_aux: str = "", # bias dark flat
obs_id: str = "100000100",
exp_start: int = "20270810081950",
exp_stop: int = "20270810082220",
_telescope: str = "CSST",
_instrument: str = "MSC",
_survey: str = "MS",
assert datatype in ["mbi", "sls", "all"]
if datatype == "mbi":
# MBI
self.valid_detectors = CP["mbi"]["detectors"]
self.detector2filter = CP["mbi"]["detector2filter"]
elif datatype == "sls":
# SLS
self.valid_detectors = CP["sls"]["detectors"]
self.detector2filter = CP["sls"]["detector2filter"]
self.valid_detectors = CP["all"]["detectors"]
self.detector2filter = CP["all"]["detector2filter"]
if verbose:
print("Data type is: ", self.datatype)
print("Valid detectors are: ", self.valid_detectors)
self.available_detectors = available_detectors if available_detectors is not None else list()
if verbose:
print("Available detectors are:", self.available_detectors)
self.target_detectors = target_detectors
if verbose:
print("Target detectors are: ", self._target_detectors)
self.obs_id = obs_id
self.exp_start = exp_start
self.exp_stop = exp_stop
# file name components
self._telescope = _telescope
self._instrument = _instrument
self._survey = _survey
self.dir_l0 = dir_l0
self.dir_l1 = dir_l1
self.path_aux = path_aux
# change to working directory
os.chdir(self.dir_l1)
# clear dir_l1
if clear_dir:
self.clear_dir(self.dir_l1)
self.logger_ppl = get_logger(name="CSST L1 Pipeline Logger", filename=os.path.join(dir_l1, log_ppl))
if verbose:
self.logger_ppl.info("logger_ppl initialized")
# module logger
self.logger_mod = get_logger(name="CSST L1 Module Logger", filename=os.path.join(dir_l1, log_mod))
if verbose:
self.logger_mod.info("logger_mod initialized")
self.custom_bias = None
self.custom_dark = None
self.custom_flat = None
# DFS APIs
@property
def dfs_L0DataApi(self):
    """A fresh DFS Level-0 data API client (constructed on every access)."""
    client = Level0DataApi()
    return client
@property
def dfs_L1DataApi(self):
    """A fresh DFS Level-1 data API client (constructed on every access)."""
    client = Level1DataApi()
    return client
@property
def dfs_L2DataApi(self):
    """A fresh DFS Level-2 data API client (constructed on every access)."""
    client = Level2DataApi()
    return client
@property
def dfs_L0PrcApi(self):
    """A fresh DFS Level-0 processing API client (constructed on every access)."""
    client = Level0PrcApi()
    return client
@property
def dfs_CalApi(self):
    """A fresh DFS calibration-merge API client (constructed on every access)."""
    client = CalMergeApi()
    return client
@property
def dfs_CatApi(self):
    """A fresh DFS catalog API client (constructed on every access)."""
    client = CatalogApi()
    return client
def set_env(self):
    """Export environment variables needed on the ``dandelion`` host (no-op elsewhere)."""
    hostname = os.uname()[1]
    if hostname == "dandelion":
        os.environ["LD_LIBRARY_PATH"] = "/home/csstpipeline/anaconda3/lib"
        os.environ["AXE_BINDIR"] = "/home/csstpipeline/PycharmProjects/axe/cextern/src"
@property
def target_detectors(self):
    """Detector serial numbers the pipeline will actually process."""
    return self._target_detectors

@target_detectors.setter
def target_detectors(self, detectors: Union[None, list, int] = None):
    """Set the target detectors.

    Parameters
    ----------
    detectors : None, list or int
        ``None`` selects every detector that is both available and valid;
        a list or a single int restricts the selection further.

    Raises
    ------
    TypeError
        If ``detectors`` is not None, a list, or an int.
    """
    # BUGFIX: validation used ``assert``, which disappears under ``python -O``;
    # raise an explicit TypeError instead.
    if detectors is not None and not isinstance(detectors, (list, int)):
        raise TypeError(f"Invalid detectors: {detectors}")
    # the base selection is always available ∩ valid
    selection = set(self.available_detectors) & set(self.valid_detectors)
    if detectors is None:
        self._target_detectors = list(selection)
    elif isinstance(detectors, list):
        self._target_detectors = list(selection & set(detectors))
    else:  # single int
        self._target_detectors = list(selection & {detectors})
def set_detectors(self, detectors=None):
    """Deprecated: assign ``dm.target_detectors`` directly instead."""
    # fail loudly so stale callers migrate to the property
    raise DeprecationWarning("This method is deprecated, please directly use dm.target_detectors = detectors!")
# NOTE(review): presumably decorated with ``@staticmethod`` in the full file
# (no ``self`` parameter); the decorator line is not visible in this view.
def from_dir(
ver_sim="C5.2",
datatype="mbi",
dir_l0=".",
dir_l1=".",
path_aux="",
use_dfs=False,
dfs_node="kmust",
n_jobs=18,
backend="multiprocessing"
):
"""Initialize a ``CsstMsDataManager`` from a local L0 directory.

Globs the L0 science images under ``dir_l0``, derives the available
detectors and the file-name components from the first image, and builds
the data manager.

Parameters
----------
ver_sim : str
    Simulation version; only "C5.2" is accepted here.
datatype : str
    "mbi", "sls" or "all".
dir_l0, dir_l1 : str
    L0 (input) and L1 (working) directories.
path_aux : str
    Template path for bias/dark/flat reference files.
use_dfs : bool
    If True, use DFS.
dfs_node : str
    DFS node name, e.g. "kmust" or "pml".
n_jobs : int
    Number of parallel jobs.
backend : str
    joblib backend.

Returns
-------
CsstMsDataManager

Raises
------
FileNotFoundError
    If no L0 image is found in ``dir_l0``.
"""
assert ver_sim in ["C5.2", ]
# glob files
fps_img = CsstMsDataManager.glob_image(dir_l0, ver_sim=ver_sim)
if len(fps_img) == 0:
raise FileNotFoundError(f"No file found in dir_l0: {dir_l0}")
# available detectors
# field 7 of TEL_INST_SURVEY_TYPE_START_STOP_OBSID_DET_... is the detector number
available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
available_detectors.sort()
# parse info
(_telescope, _instrument, _survey, obs_type,
exp_start, exp_stop, obs_id,
_detector, *l0_post, _ext) = re.split(r"[_.]", fps_img[0])
# exp_start = int(exp_start)
# exp_stop = int(exp_stop)
# obs_id = int(obs_id)
return CsstMsDataManager(
ver_sim=ver_sim,
datatype=datatype,
available_detectors=available_detectors,
target_detectors=None,
dir_l0=dir_l0,
dir_l1=dir_l1,
path_aux=path_aux, # bias dark flat
use_dfs=use_dfs,
dfs_node=dfs_node,
obs_id=obs_id,
exp_start=exp_start,
exp_stop=exp_stop,
_telescope=_telescope,
_instrument=_instrument,
_survey=_survey,
obs_type=obs_type,
l0_post="_".join(l0_post),
n_jobs=n_jobs,
backend=backend
)
""" glob files in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("{} files found with pattern: {}".format(len(fps), pattern))
return fps
@staticmethod
def glob_cat(dir_l0, ver_sim="C5.2"):
    """Glob input catalogs (``MSC_*.cat``) in the L0 data directory.

    Parameters
    ----------
    dir_l0 : str
        The L0 directory.
    ver_sim : str
        Simulation version, one of {"C5.1", "C5.2"}.

    Returns
    -------
    list of str
        Sorted basenames of the matched catalog files.
    """
    # BUGFIX: the default used to be "C5", which always failed the assert below;
    # "C5.2" makes the default usable and is backward-compatible for explicit callers.
    assert ver_sim in ["C5.1", "C5.2"]
    pattern = os.path.join(dir_l0, "MSC_*.cat")
    fps = glob.glob(pattern)
    fps = [os.path.basename(fp) for fp in fps]
    fps.sort()
    print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
    return fps
def l0_id(self, detector=6):
    """Level0 ID (``obs_id`` + zero-padded detector), consistent with DFS."""
    return "{}{:02d}".format(self.obs_id, detector)
def l0_cat(self, detector=6):
    """Return the L0 input catalog file path for *detector* (C5.2 only)."""
    assert self.ver_sim == "C5.2"
    filt = self.detector2filter[detector]
    fn = f"{self._instrument}_{self.obs_id}_chip_{detector:02d}_filt_{filt}.cat"
    return os.path.join(self.dir_l0, fn)
def l0_log(self, detector=6):
    """Return the L0 log file path for *detector* (C5.2 only)."""
    assert self.ver_sim == "C5.2"
    filt = self.detector2filter[detector]
    fn = f"{self._instrument}_{self.obs_id}_chip_{detector:02d}_filt_{filt}.log"
    return os.path.join(self.dir_l0, fn)
def l0_detector(self, detector=6):
    """Return the L0 detector-specific science image file path."""
    assert self.ver_sim in ["C5.1", "C5.2"]
    fn = (f"{self._telescope}_{self._instrument}_{self._survey}_SCI_"
          f"{self.exp_start}_{self.exp_stop}_{self.obs_id}_{detector:02d}_L0_1.fits")
    return os.path.join(self.dir_l0, fn)
def l0_crs(self, detector=6):
    """Return the L0 cosmic-ray image file path for *detector*."""
    assert self.ver_sim in ["C5.1", "C5.2"]
    fn = (f"{self._telescope}_{self._instrument}_{self._survey}_CRS_"
          f"{self.exp_start}_{self.exp_stop}_{self.obs_id}_{detector:02d}_L0_1.fits")
    return os.path.join(self.dir_l0, fn)
def l1_detector(self, detector=6, post="img.fits"):
""" generate L1 file path
Parameters
----------
detector:
detector ID
post:
postfix
e.g, {"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}
Returns
-------
L1 file path
"""
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
self._telescope, self._instrument, self._survey,
self.exp_start, self.exp_stop, self.obs_id, detector, post)
def get_bias(self, detector=6):
    """Return the bias reference file path for *detector*.

    Uses ``self.custom_bias`` (a callable detector -> path) when set;
    otherwise resolves from ``self.path_aux`` — a glob template for MBI,
    a format template for SLS.
    """
    # NOTE(review): the ``def`` line was missing in this view; name/signature
    # reconstructed from ``self.custom_bias(detector)`` and the class docstring.
    if self.custom_bias is None:
        if self.datatype == "mbi":
            return glob.glob(self.path_aux.format("CLB", detector))[0]
        else:
            return self.path_aux.format(detector, "bias")
    return self.custom_bias(detector)
def get_dark(self, detector=6):
    """Return the dark reference file path for *detector*.

    Uses ``self.custom_dark`` (a callable detector -> path) when set;
    otherwise resolves from ``self.path_aux`` — a glob template for MBI,
    a format template for SLS.
    """
    # NOTE(review): the ``def`` line was missing in this view; name/signature
    # reconstructed from ``self.custom_dark(detector)`` and the class docstring.
    if self.custom_dark is None:
        if self.datatype == "mbi":
            return glob.glob(self.path_aux.format("CLD", detector))[0]
        else:
            return self.path_aux.format(detector, "dark")
    return self.custom_dark(detector)
def get_flat(self, detector=6):
    """Return the flat reference file path for *detector*.

    Uses ``self.custom_flat`` (a callable detector -> path) when set;
    otherwise resolves from ``self.path_aux`` — a glob template for MBI,
    a format template for SLS.
    """
    # NOTE(review): the ``def`` line was missing in this view; name/signature
    # reconstructed from ``self.custom_flat(detector)`` and the class docstring.
    if self.custom_flat is None:
        if self.datatype == "mbi":
            return glob.glob(self.path_aux.format("CLF", detector))[0]
        else:
            return self.path_aux.format(detector, "flat")
    return self.custom_flat(detector)
def get_axeconf(self):
    """Return the aXe configuration directory for the current host."""
    hostname = os.uname()[1]
    if hostname == "dandelion":
        return "/home/csstpipeline/L1Pipeline/aux/axeconf"
    # any other host is assumed to be the docker environment
    return "/L1Pipeline/aux/axeconf"
def l1_file(self, name="", comment=""):
""" L1 file path
Parameters
----------
use the function name plz
Returns
-------
fp: str
the synthetic file path
"""
fp = os.path.join(self.dir_l1, name)
# record hardcode history
self.hardcode_history.append(dict(hdcd=fp, comment=comment))
return fp
def get_sls_info(self):
    """Read and return the header (ext 1) of the single target SLS image.

    Returns
    -------
    astropy.io.fits.Header
        The header of extension 1 of the target detector's L0 image.
    """
    # SLS processing expects exactly one target detector
    assert len(self.target_detectors) == 1
    header = fits.getheader(self.l0_detector(self.target_detectors[0]), ext=1)
    # NOTE(review): the return statement was missing in this truncated view
    return header
def get_mbi_info(self):
    """Read the MBI L1 info table from the shared NFS path and return it.

    Returns
    -------
    astropy.table.Table
        The MBI image header info table.
    """
    info = Table.read("/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/t_mbi_l1.fits")
    # NOTE(review): the return statement was missing in this truncated view
    return info
def quickstart(ver_sim="C5.2", datatype="mbi", dir_l1=".", exposure_id=100,
use_dfs=False, dfs_node="kmust", clear_dir=False):
"""
Quick dataset generator for tests on dandelion or PML
The serial number of the exposure. 20-154 for C5.2.
dfs_node : str
The DFS node. Defaults to "kmust", could be "pml".
clear_dir : bool
If True, clear dir_l1.
"""
assert datatype in ["mbi", "sls"]
# auto identify node name
hostname = os.uname()[1]
assert hostname in ["dandelion", "ubuntu"]
# dandelion
if hostname == "dandelion" and datatype == "mbi":
dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/" \
"NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
path_aux = "/nfsdata/users/cham/L1Test/ref_C5.2/MSC_{}_*_{:02d}_combine.fits"
elif hostname == "dandelion" and datatype == "sls":
dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/" \
"NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
path_aux = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/csst_{:02d}{}.fits"
# PMO
elif hostname == "ubuntu" and datatype == "mbi":
dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
"NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
path_aux = "/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits"
elif hostname == "ubuntu" and datatype == "sls":
dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
"NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
path_aux = ""
else:
raise ValueError("@DM: invalid hostname {} or datatype {}!".format(hostname, datatype))
ver_sim=ver_sim, datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1, path_aux=path_aux,
def __repr__(self):
    """Human-readable summary of the data manager state."""
    lines = ""
    lines += "<CsstMsDataManager>\n"
    lines += f"- Data type = {self.datatype}\n"
    lines += f"- Valid detectors = {self.valid_detectors}\n"
    lines += f"- Available detectors = {self.available_detectors}\n"
    lines += f"- Target detectors = {self.target_detectors}\n"
    lines += f"- dir_l0 = {self.dir_l0}\n"
    lines += f"- dir_l1 = {self.dir_l1}\n"
    lines += f"- dfs_node = {self.dfs_node}\n"
    # BUGFIX: os.getenv returns None when the variable is unset, which made the
    # concatenation below raise TypeError; default to "".
    lines += f"- CSST_DFS_GATEWAY = " + os.getenv("CSST_DFS_GATEWAY", "") + "\n"
    # NOTE(review): the return statement was missing in this truncated view
    return lines
def remove_files(self, fmt="*.fits"):
os.system(f"rm -rf {os.path.join(self.dir_l1, fmt)}")
def remove_dir(self, dir_name):
    """Remove an L1 (sub-)directory via shell ``rm -rf``."""
    target = os.path.join(self.dir_l1, dir_name)
    os.system(f"rm -rf {target}")
@staticmethod
def clear_dir(dir_path):
    """Empty *dir_path* by running shell ``rm -rf dir_path/*``."""
    command = "rm -rf {}/*".format(dir_path)
    os.system(command)
# DFS interfaces
@property
def dfs_node(self):
    """The DFS node name (defaults to "local", could be "pml")."""
    return self._dfs_node

@dfs_node.setter
def dfs_node(self, dfs_node):
    """Select the DFS node and export its configuration to the environment."""
    assert dfs_node in DFS_CONF.keys()
    self._dfs_node = dfs_node
    # export every configuration entry of the chosen node
    for key, value in DFS_CONF[dfs_node].items():
        os.environ[key] = value
@staticmethod
def dfs_is_available():
    """Check whether DFS is reachable by issuing a tiny catalog query.

    Returns
    -------
    bool
        True if the probe query returns at least one row, False on any failure.
    """
    # NOTE(review): the ``try:`` line was missing in this truncated view and has
    # been restored around the query.
    try:
        tbl = CatalogApi().catalog_query(
            catalog_name="gaia3", ra=180, dec=0, radius=.1, min_mag=0, max_mag=30, obstime=-1, limit=-1
        )
        return len(tbl) > 0
    # BUGFIX: narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
    # are not swallowed.
    except Exception:
        return False
def dfs_rc_query(self, ra=180, dec=0, radius=2, min_mag=0, max_mag=30, obstime=-1, limit=-1):
    """Query the Reference Catalog (RC, catalog "gaia3") from DFS.

    Returns
    -------
    astropy.table.Table or None
        The query result as a table, or None if the query failed.
    """
    try:
        cat = self.dfs_CatApi.catalog_query(
            catalog_name="gaia3", ra=ra, dec=dec, radius=radius, min_mag=min_mag, max_mag=max_mag,
            obstime=obstime, limit=limit
        )
        tbl = self.dfs_CatApi.to_table(cat)
        return tbl
    # BUGFIX: narrowed from a bare ``except:`` so Ctrl-C / SystemExit are not swallowed
    except Exception:
        print("Error occurred during the query!")
        return None
def dfs_l1_push(self):
    """Push MBI/SLS L1 data to DFS.

    TODO: not implemented yet (expected to use the L1 data API's ``write``).
    """
    pass
def dfs_l2_push(self):
    """Push SLS spectra to DFS.

    TODO: not implemented yet.
    """
    pass
@staticmethod
def from_l1id(
        l1_id="1000000001",
        datatype="sls",
        dir_l0="/L1Pipeline/L0",
        dir_l1="/L1Pipeline/L1",
        use_dfs=True,
        dfs_node="pml",
        clear_l1=False,
        dfs_root="/share/dfs"
):
    """Initialize a data manager from an L1 ID via DFS.

    TODO: not implemented yet; currently a stub that returns None.
    """
    pass
def from_dfs(
obs_id="100000100",
datatype="mbi",
dir_l0="/L1Pipeline/L0",
dir_l1="/L1Pipeline/L1",
dfs_root="/share/dfs",
n_jobs=18,
backend="multiprocessing"
# NOTE(review): the signature is truncated — the closing ``):`` is missing, and
# the body below references names (``dfs_node``, ``use_dfs``, ``i_rec``) whose
# defining lines are not visible in this view.
# (clear and) make directories
if os.path.exists(dir_l0):
os.system(f"rm -rf {dir_l0}/*")
else:
os.mkdir(dir_l0)
# if os.path.exists(dir_l1):
# os.system(f"rm -rf {dir_l1}/*")
# else:
# os.mkdir(dir_l1)
# os.chdir(dir_l1)
if not os.path.exists(dir_l1):
os.mkdir(dir_l1)
# query DFS for the L0 records of this exposure
records = CsstMsDataManager(dfs_node=dfs_node, verbose=False).dfs_L0DataApi.find(obs_id=obs_id)
tbl = Table([_.__dict__ for _ in records["data"]])
tbl.sort(["detector_no", "obs_type"])
# NOTE(review): the per-record copy call enclosing the two keyword-argument
# lines below (copying each DFS file into dir_l0) is missing in this view.
src=os.path.join(dfs_root, tbl["file_path"][i_rec]),
dst=os.path.join(dir_l0, os.path.basename(tbl["file_path"][i_rec])),
# as from_dfs only works in docker mode
# NOTE(review): the if/elif/else on ``datatype`` that selects one of the two
# ``path_aux`` templates (and reaches the ValueError) appears truncated here.
path_aux = "/L1Pipeline/aux/C5.2_ref_mbi/MSC_{}_*_{:02d}_combine.fits"
path_aux = "/L1Pipeline/aux/C5.2_ref_sls/csst_{:02d}{}.fits"
raise ValueError(f"Bad datatype: {datatype}")
# initialize dm
dm = CsstMsDataManager.from_dir(
ver_sim="C5.2", datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1,
path_aux=path_aux, use_dfs=use_dfs, dfs_node=dfs_node,
n_jobs=n_jobs, backend=backend
def dfs_l0_query(self, obs_id="100000100"):
    """Query DFS for all L0 records of one exposure and return them as a table.

    Parameters
    ----------
    obs_id : str
        The observation ID.

    Returns
    -------
    astropy.table.Table
        The records, sorted by (detector_no, obs_type).

    Raises
    ------
    ValueError
        If DFS returns a non-zero status code.
    """
    # NOTE(review): the ``def`` line and the API call were missing in this view;
    # reconstructed from the call site ``self.dfs_l0_query(obs_id=...)`` and the
    # ``dfs_L0DataApi.find(obs_id=...)`` usage elsewhere in the file — confirm.
    result = self.dfs_L0DataApi.find(obs_id=obs_id)
    print(f"{result['totalCount']} records returned from DFS.")
    if not result["code"] == 0:
        raise ValueError(f"DFS returns non-zero code! ({result['code']})")
    tbl = Table([_.__dict__ for _ in result["data"]])
    tbl.sort(["detector_no", "obs_type"])
    # Check if all 30 detectors are available
    for detector in CP["all"]["detectors"]:
        for obs_type in ["sci", "cosmic_ray"]:
            if np.sum((tbl["detector_no"] == f"{detector:02d}") & (tbl["obs_type"] == obs_type)) == 0:
                self.logger_ppl.warning(f"Record not found for detector {detector:02d} and obs_type {obs_type}")
    return tbl
def dfs_l0_check_all(self):
    """Check that every C5.2 exposure (obs_id 100000020-100000154) has all 60 L0 records in DFS.

    Returns
    -------
    bool
        True if every queried exposure returned exactly 60 records.
    """
    is_good = True
    for obs_id in range(100000020, 100000155):
        tbl = self.dfs_l0_query(obs_id=f"{obs_id}")
        n_rec = len(tbl)
        if n_rec == 60:
            self.logger_ppl.info(f"DFS returns {n_rec} records for obs_id={obs_id}")
        else:
            # keep scanning the remaining exposures, but remember the failure
            is_good = False
            self.logger_ppl.warning(f"DFS returns {n_rec} records for obs_id={obs_id}")
    return is_good
def dfs_l1_query(self, obs_id, detector):