Commit 99fe6700 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

added tests in data manager class

parent 6dbfe879
import os
import glob
import re
from astropy.io import fits
from .params import cp
class CsstMbiDataManager:
""" this class defines the file format of the input & output of CSST MSC L1 pipeline
C3:
MSC_MS_210525220000_100000020_06_raw.fits
MSC_CRS_210525220000_100000020_06_raw.fits
MSC_210525120000_0000020_06.cat
C5.1:
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_10000100_chip_06_filt_y.cat
MSC_10000100_chip_06_filt_y.log
C5.2
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_100000100_chip_06_filt_y.cat
MSC_100000100_chip_06_filt_y.log
"""
def __init__(self, ver_sim="C5.1", dir_l0="", dir_l1="", dir_pcref="", path_aux="", force_all_detectors=False):
""" initialize the multi-band imaging data manager
Parameters
----------
ver_sim: str
version of simulation data, see csst_common.params.cp.sim.VERSIONS
dir_l0: str
L0 directory
dir_l1: str
L1 directory
dir_pcref: str
position calibration data directory
path_aux: str
aux data directory (bias, flat, dark)
force_all_detectors: bool
if True, assert data for all detectors are available
Examples
--------
>>> dm = CsstMbiDataManager(...)
>>> # access L0 directory
>>> dm.dir_l0
>>> # access L1 directory
>>> dm.dir_l1
>>> # access dir_pcref
>>> dm.dir_pcref
>>> # access path_aux
>>> dm.path_aux
>>> # access ver_sim
>>> dm.ver_sim
>>> # access target detectors
>>> dm.target_detectors
>>> # access available detectors
>>> dm.available_detectors
>>> # define an L1 file (detector-specified)
>>> dm.l1_detector(detector=6)
>>> # define an L1 file (non-detector-specified)
>>> dm.l1_file("flipped_image.fits")
"""
assert ver_sim in cp.sim.VERSIONS
self.dir_l0 = dir_l0
self.dir_l1 = dir_l1
self.dir_pcref = dir_pcref
self.path_aux = path_aux
self.ver_sim = ver_sim
self.target_detectors = []
self.hardcode_history = []
fps_img = self.glob_image(dir_l0, ver_sim=ver_sim)
fps_cat = self.glob_cat(dir_l0, ver_sim=ver_sim)
if force_all_detectors:
assert len(fps_img) == len(cp.mbi.DETECTORS)
else:
assert len(fps_img) > 0
if ver_sim == "C3":
# get info
# print(re.split(r"[_.]", fps[0]))
self._instrument, self._survey, \
self._exp_start, self._exp_id, \
_detector, self._l0_suffix, _ext = re.split(r"[_.]", fps_img[0])
self._cat_id = re.split(r"[_.]", fps_cat[0])[1]
self._exp_start = int(self._exp_start)
self._exp_id = int(self._exp_id)
# available detectors
self.available_detectors = [int(re.split(r"[_.]", fp)[4]) for fp in fps_img]
self.available_detectors.sort()
elif ver_sim in ["C5.1", "C5.2"]:
# get info
# print(re.split(r"[_.]", fps[0]))
self._telescope, self._instrument, self._survey, self._imagetype, \
self._exp_start, self._exp_stop, self._exp_id, \
_detector, self._l0_suffix, self._version, _ext = re.split(r"[_.]", fps_img[0])
self._cat_id = re.split(r"[_.]", fps_cat[0])[1]
self._exp_start = int(self._exp_start)
self._exp_stop = int(self._exp_stop)
self._exp_id = int(self._exp_id)
# available detectors
self.available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
self.available_detectors.sort()
@staticmethod
def glob_image(dir_l0, ver_sim="C5"):
""" glob files in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
return fps
@staticmethod
def glob_cat(dir_l0, ver_sim="C5"):
""" glob input catalogs in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_*.cat")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "MSC_*.cat")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
return fps
def l0_cat(self, detector=6):
""" the L0 cat file path"""
if self.ver_sim == "C3":
fn = "{}_{}_{:07d}_{:02d}.cat".format(
self._instrument, self._cat_id, self._exp_id - 100000000, detector)
elif self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id - 90000000, detector, cp.mbi.DETECTOR2FILTER[detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id, detector, cp.mbi.DETECTOR2FILTER[detector])
return os.path.join(self.dir_l0, fn)
def l0_log(self, detector=6):
""" L0 log file path """
if self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id - 90000000, detector, cp.mbi.DETECTOR2FILTER[detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id, detector, cp.mbi.DETECTOR2FILTER[detector])
return os.path.join(self.dir_l0, fn)
def l0_detector(self, detector=6):
""" L0 detector-specific image file path """
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._survey, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l0_crs(self, detector=6):
""" L0 cosmic ray file path """
if self.ver_sim == "C3":
fn = "{}_CRS_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l1_detector(self, detector=6, post="img.fits"):
""" generate L1 file path
Parameters
----------
detector:
detector ID
post:
postfix
{"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}
Returns
-------
L1 file path
"""
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_{}".format(
self._instrument, self._survey,
self._exp_start, self._exp_id, detector, post)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector, post)
return os.path.join(self.dir_l1, fn)
def set_detectors(self, detectors=None):
""" set target detector """
if detectors is None:
# default detectors
self.target_detectors = self.available_detectors
else:
try:
# assert detectors is a subset of available detectors
assert set(detectors).issubset(set(self.available_detectors))
except AssertionError as ae:
print("@DM: available detector IDs are ", self.available_detectors)
print("@DM: target detector IDs are ", detectors)
# raise ae
self.target_detectors = detectors
print("final target detector IDs are ", self.target_detectors)
return
def get_bias(self, detector=6):
fp = glob.glob(self.path_aux.format("CLB", detector))[0]
return fits.getdata(fp)
def get_dark(self, detector=6):
fp = glob.glob(self.path_aux.format("CLD", detector))[0]
return fits.getdata(fp)
def get_flat(self, detector=6):
fp = glob.glob(self.path_aux.format("CLF", detector))[0]
return fits.getdata(fp)
def l1_file(self, name="", comment=""):
"""
Parameters
----------
name: str
file name
comment: str
use the function name plz
Returns
-------
fp: str
the synthetic file path
"""
fp = os.path.join(self.dir_l1, name)
# record hardcode history
self.hardcode_history.append(dict(hdcd=fp, comment=comment))
return fp
@staticmethod
def __testddl__(self):
dm = CsstMbiDataManager(
ver_sim="C5.2",
dir_l0="/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging"
"/NGP_AstrometryON_shearOFF/MSC_0000100",
dir_l1=".",
force_all_detectors=True,
)
print("----- available detectors -----")
print(dm.available_detectors)
for detector in dm.available_detectors[:2]:
print("----- L0 images -----")
print(dm.l0_detector(detector=detector))
print(os.path.exists(dm.l0_detector(detector=detector)))
print("----- L0 crs -----")
print(dm.l0_crs(detector=detector))
print(os.path.exists(dm.l0_detector(detector=detector)))
print("----- L0 input cat -----")
print(dm.l0_cat(detector=detector))
print(os.path.exists(dm.l0_cat(detector=detector)))
print("----- L0 input log -----")
print(dm.l0_log(detector=detector))
print(os.path.exists(dm.l0_log(detector=detector)))
print("----- L1 images -----")
print(dm.l1_detector(detector, post="img.fits"))
return
if __name__ == "__main__":
# test C3
import os
dm = CsstMbiDataManager(
ver_sim="C3", dir_l0="/data/L1Pipeline/msc/MSC_0000020", dir_l1="/data/L1Pipeline/msc/work")
print("----- L0 images -----")
print(dm.l0_detector(detector=6))
print(os.path.exists(dm.l0_detector(detector=6)))
print("----- L0 crs -----")
print(dm.l0_crs(detector=6))
print(os.path.exists(dm.l0_detector(detector=8)))
print("----- L0 input cat -----")
print(dm.l0_cat(8))
print(os.path.exists(dm.l0_cat(detector=8)))
print("----- available detectors -----")
print(dm.available_detectors)
print("----- L1 images -----")
print(dm.l1_detector(25, "img", "fits"))
# test C5.1
import os
dm = CsstMbiDataManager(
ver_sim="C5.1", dir_l0="/data/sim_data/MSC_0000100", dir_l1="/home/user/L1Pipeline/msc/work")
print("----- available detectors -----")
print(dm.available_detectors)
for detector in dm.available_detectors[:2]:
print("----- L0 images -----")
print(dm.l0_detector(detector=detector))
print(os.path.exists(dm.l0_detector(detector=detector)))
print("----- L0 crs -----")
print(dm.l0_crs(detector=detector))
print(os.path.exists(dm.l0_detector(detector=detector)))
print("----- L0 input cat -----")
print(dm.l0_cat(detector=detector))
print(os.path.exists(dm.l0_cat(detector=detector)))
print("----- L0 input log -----")
print(dm.l0_log(detector=detector))
print(os.path.exists(dm.l0_log(detector=detector)))
print("----- L1 images -----")
print(dm.l1_detector(detector, post="img.fits"))
import unittest
from csst_common.data_manager import CsstMbiDataManager
class TestParams(unittest.TestCase):
def test_params(self):
self.assertTrue(True)
def __init__(self):
self.dm = CsstMbiDataManager()
\ No newline at end of file
import unittest
class TestParams(unittest.TestCase):
def test_params(self):
self.assertTrue(True)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment