diff --git a/csst_common/data_manager.py b/csst_common/data_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..b98ac0c73140cbe7c7e2cc730281c0b46cb4a8d2 --- /dev/null +++ b/csst_common/data_manager.py @@ -0,0 +1,344 @@ +import os +import glob +import re + +from astropy.io import fits + +from .params import cp + + +class CsstMbiDataManager: + """ this class defines the file format of the input & output of CSST MSC L1 pipeline + + C3: + MSC_MS_210525220000_100000020_06_raw.fits + MSC_CRS_210525220000_100000020_06_raw.fits + MSC_210525120000_0000020_06.cat + + C5.1: + CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits + CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits + MSC_10000100_chip_06_filt_y.cat + MSC_10000100_chip_06_filt_y.log + + C5.2 + CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits + CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits + MSC_100000100_chip_06_filt_y.cat + MSC_100000100_chip_06_filt_y.log + + """ + + def __init__(self, ver_sim="C5.1", dir_l0="", dir_l1="", dir_pcref="", path_aux="", force_all_detectors=False): + """ initialize the multi-band imaging data manager + + Parameters + ---------- + ver_sim: str + version of simulation data, see csst_common.params.cp.sim.VERSIONS + dir_l0: str + L0 directory + dir_l1: str + L1 directory + dir_pcref: str + position calibration data directory + path_aux: str + aux data directory (bias, flat, dark) + force_all_detectors: bool + if True, assert data for all detectors are available + + Examples + -------- + >>> dm = CsstMbiDataManager(...) + >>> # access L0 directory + >>> dm.dir_l0 + >>> # access L1 directory + >>> dm.dir_l1 + >>> # access dir_pcref + >>> dm.dir_pcref + >>> # access path_aux + >>> dm.path_aux + >>> # access ver_sim + >>> dm.ver_sim + >>> # access target detectors + >>> dm.target_detectors + >>> # access available detectors + >>> dm.available_detectors + >>> # define an L1 file (detector-specified) + >>> dm.l1_detector(detector=6) + >>> # define an L1 file (non-detector-specified) + >>> dm.l1_file("flipped_image.fits") + """ + assert ver_sim in cp.sim.VERSIONS + + self.dir_l0 = dir_l0 + self.dir_l1 = dir_l1 + self.dir_pcref = dir_pcref + self.path_aux = path_aux + self.ver_sim = ver_sim + self.target_detectors = [] + + self.hardcode_history = [] + + fps_img = self.glob_image(dir_l0, ver_sim=ver_sim) + fps_cat = self.glob_cat(dir_l0, ver_sim=ver_sim) + + if force_all_detectors: + assert len(fps_img) == len(cp.mbi.DETECTORS) + else: + assert len(fps_img) > 0 + + if ver_sim == "C3": + # get info + # print(re.split(r"[_.]", fps[0])) + self._instrument, self._survey, \ + self._exp_start, self._exp_id, \ + _detector, self._l0_suffix, _ext = re.split(r"[_.]", fps_img[0]) + self._cat_id = re.split(r"[_.]", fps_cat[0])[1] + + self._exp_start = int(self._exp_start) + self._exp_id = int(self._exp_id) + + # available detectors + self.available_detectors = [int(re.split(r"[_.]", fp)[4]) for fp in fps_img] + self.available_detectors.sort() + + elif ver_sim in ["C5.1", "C5.2"]: + # get info + # print(re.split(r"[_.]", fps[0])) + self._telescope, self._instrument, self._survey, self._imagetype, \ + self._exp_start, self._exp_stop, self._exp_id, \ + _detector, self._l0_suffix, self._version, _ext = re.split(r"[_.]", fps_img[0]) + self._cat_id = re.split(r"[_.]", fps_cat[0])[1] + + self._exp_start = int(self._exp_start) + self._exp_stop = int(self._exp_stop) + self._exp_id = int(self._exp_id) + + # available detectors + self.available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img] + self.available_detectors.sort() + + @staticmethod + def glob_image(dir_l0, ver_sim="C5"): + """ glob files in L0 data directory """ + if ver_sim == "C3": + pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits") + else: + assert ver_sim in ["C5.1", "C5.2"] + pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits") + fps = glob.glob(pattern) + fps = [os.path.basename(fp) for fp in fps] + fps.sort() + + print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern)) + return fps + + @staticmethod + def glob_cat(dir_l0, ver_sim="C5"): + """ glob input catalogs in L0 data directory """ + if ver_sim == "C3": + pattern = os.path.join(dir_l0, "MSC_*.cat") + else: + assert ver_sim in ["C5.1", "C5.2"] + pattern = os.path.join(dir_l0, "MSC_*.cat") + fps = glob.glob(pattern) + fps = [os.path.basename(fp) for fp in fps] + fps.sort() + + print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern)) + return fps + + def l0_cat(self, detector=6): + """ the L0 cat file path""" + if self.ver_sim == "C3": + fn = "{}_{}_{:07d}_{:02d}.cat".format( + self._instrument, self._cat_id, self._exp_id - 100000000, detector) + elif self.ver_sim == "C5.1": + fn = "{}_{}_chip_{:02d}_filt_{}.cat".format( + self._instrument, self._exp_id - 90000000, detector, cp.mbi.DETECTOR2FILTER[detector]) + elif self.ver_sim == "C5.2": + fn = "{}_{}_chip_{:02d}_filt_{}.cat".format( + self._instrument, self._exp_id, detector, cp.mbi.DETECTOR2FILTER[detector]) + return os.path.join(self.dir_l0, fn) + + def l0_log(self, detector=6): + """ L0 log file path """ + if self.ver_sim == "C5.1": + fn = "{}_{}_chip_{:02d}_filt_{}.log".format( + self._instrument, self._exp_id - 90000000, detector, cp.mbi.DETECTOR2FILTER[detector]) + elif self.ver_sim == "C5.2": + fn = "{}_{}_chip_{:02d}_filt_{}.log".format( + self._instrument, self._exp_id, detector, cp.mbi.DETECTOR2FILTER[detector]) + return os.path.join(self.dir_l0, fn) + + def l0_detector(self, detector=6): + """ L0 detector-specific image file path """ + if self.ver_sim == "C3": + fn = "{}_{}_{}_{}_{:02d}_raw.fits".format( + self._instrument, self._survey, self._exp_start, self._exp_id, detector) + else: + assert self.ver_sim in ["C5.1", "C5.2"] + fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format( + self._telescope, self._instrument, self._survey, + self._exp_start, self._exp_stop, self._exp_id, detector) + return os.path.join(self.dir_l0, fn) + + def l0_crs(self, detector=6): + """ L0 cosmic ray file path """ + if self.ver_sim == "C3": + fn = "{}_CRS_{}_{}_{:02d}_raw.fits".format( + self._instrument, self._exp_start, self._exp_id, detector) + else: + assert self.ver_sim in ["C5.1", "C5.2"] + fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format( + self._telescope, self._instrument, self._survey, + self._exp_start, self._exp_stop, self._exp_id, detector) + return os.path.join(self.dir_l0, fn) + + def l1_detector(self, detector=6, post="img.fits"): + """ generate L1 file path + + Parameters + ---------- + detector: + detector ID + post: + postfix + {"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"} + + Returns + ------- + L1 file path + + """ + if self.ver_sim == "C3": + fn = "{}_{}_{}_{}_{:02d}_{}".format( + self._instrument, self._survey, + self._exp_start, self._exp_id, detector, post) + else: + assert self.ver_sim in ["C5.1", "C5.2"] + fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format( + self._telescope, self._instrument, self._survey, + self._exp_start, self._exp_stop, self._exp_id, detector, post) + return os.path.join(self.dir_l1, fn) + + def set_detectors(self, detectors=None): + """ set target detector """ + if detectors is None: + # default detectors + self.target_detectors = self.available_detectors + else: + try: + # assert detectors is a subset of available detectors + assert set(detectors).issubset(set(self.available_detectors)) + except AssertionError as ae: + print("@DM: available detector IDs are ", self.available_detectors) + print("@DM: target detector IDs are ", detectors) + # raise ae + self.target_detectors = detectors + print("final target detector IDs are ", self.target_detectors) + return + + def get_bias(self, detector=6): + fp = glob.glob(self.path_aux.format("CLB", detector))[0] + return fits.getdata(fp) + + def get_dark(self, detector=6): + fp = glob.glob(self.path_aux.format("CLD", detector))[0] + return fits.getdata(fp) + + def get_flat(self, detector=6): + fp = glob.glob(self.path_aux.format("CLF", detector))[0] + return fits.getdata(fp) + + def l1_file(self, name="", comment=""): + """ + + Parameters + ---------- + name: str + file name + comment: str + use the function name plz + + Returns + ------- + fp: str + the synthetic file path + + """ + fp = os.path.join(self.dir_l1, name) + # record hardcode history + self.hardcode_history.append(dict(hdcd=fp, comment=comment)) + return fp + + @staticmethod + def __testddl__(self): + dm = CsstMbiDataManager( + ver_sim="C5.2", + dir_l0="/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging" + "/NGP_AstrometryON_shearOFF/MSC_0000100", + dir_l1=".", + force_all_detectors=True, + ) + print("----- available detectors -----") + print(dm.available_detectors) + for detector in dm.available_detectors[:2]: + print("----- L0 images -----") + print(dm.l0_detector(detector=detector)) + print(os.path.exists(dm.l0_detector(detector=detector))) + print("----- L0 crs -----") + print(dm.l0_crs(detector=detector)) + print(os.path.exists(dm.l0_detector(detector=detector))) + print("----- L0 input cat -----") + print(dm.l0_cat(detector=detector)) + print(os.path.exists(dm.l0_cat(detector=detector))) + print("----- L0 input log -----") + print(dm.l0_log(detector=detector)) + print(os.path.exists(dm.l0_log(detector=detector))) + print("----- L1 images -----") + print(dm.l1_detector(detector, post="img.fits")) + return + + +if __name__ == "__main__": + # test C3 + import os + dm = CsstMbiDataManager( + ver_sim="C3", dir_l0="/data/L1Pipeline/msc/MSC_0000020", dir_l1="/data/L1Pipeline/msc/work") + print("----- L0 images -----") + print(dm.l0_detector(detector=6)) + print(os.path.exists(dm.l0_detector(detector=6))) + print("----- L0 crs -----") + print(dm.l0_crs(detector=6)) + print(os.path.exists(dm.l0_detector(detector=8))) + print("----- L0 input cat -----") + print(dm.l0_cat(8)) + print(os.path.exists(dm.l0_cat(detector=8))) + print("----- available detectors -----") + print(dm.available_detectors) + print("----- L1 images -----") + print(dm.l1_detector(25, "img", "fits")) + + # test C5.1 + import os + dm = CsstMbiDataManager( + ver_sim="C5.1", dir_l0="/data/sim_data/MSC_0000100", dir_l1="/home/user/L1Pipeline/msc/work") + print("----- available detectors -----") + print(dm.available_detectors) + for detector in dm.available_detectors[:2]: + print("----- L0 images -----") + print(dm.l0_detector(detector=detector)) + print(os.path.exists(dm.l0_detector(detector=detector))) + print("----- L0 crs -----") + print(dm.l0_crs(detector=detector)) + print(os.path.exists(dm.l0_detector(detector=detector))) + print("----- L0 input cat -----") + print(dm.l0_cat(detector=detector)) + print(os.path.exists(dm.l0_cat(detector=detector))) + print("----- L0 input log -----") + print(dm.l0_log(detector=detector)) + print(os.path.exists(dm.l0_log(detector=detector))) + print("----- L1 images -----") + print(dm.l1_detector(detector, post="img.fits")) diff --git a/tests/test_data_manager.py b/tests/test_data_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..605f1357812a15f5718a41ada3d1c543c8d1bad4 --- /dev/null +++ b/tests/test_data_manager.py @@ -0,0 +1,9 @@ +import unittest +from csst_common.data_manager import CsstMbiDataManager + +class TestParams(unittest.TestCase): + def test_params(self): + self.assertTrue(True) + + def __init__(self): + self.dm = CsstMbiDataManager() \ No newline at end of file diff --git a/tests/test_params.py b/tests/test_params.py new file mode 100644 index 0000000000000000000000000000000000000000..ac2abc96297a0f847639496ba8dcef6d09f388e3 --- /dev/null +++ b/tests/test_params.py @@ -0,0 +1,6 @@ +import unittest + + +class TestParams(unittest.TestCase): + def test_params(self): + self.assertTrue(True)