Commit f83bc722 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

added CsstMsDataManager

parent 1f0e1a52
......@@ -6,7 +6,7 @@ Author: Bo Zhang
Created: 2022-09-13
Modified-History:
2022-09-13, Bo Zhang, created
2022-09-13, Bo Zhang, fixed a bug
2022-09-29, Bo Zhang, favor CsstMsDataManager instead of CsstMbiDataManager
"""
import os
import glob
......@@ -17,8 +17,30 @@ from astropy.io import fits
from .params import CSST_PARAMS as CP
class CsstSlsDataManager:
def __init__(self, ver_sim="C5.2", dir_l0="", dir_l1="", dir_pcref="", path_aux="", force_all_detectors=False):
class CsstMsDataManager:
""" CSST MS data manager, including MBI and SLS
C3:
MSC_MS_210525220000_100000020_06_raw.fits
MSC_CRS_210525220000_100000020_06_raw.fits
MSC_210525120000_0000020_06.cat
C5.1:
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_10000100_chip_06_filt_y.cat
MSC_10000100_chip_06_filt_y.log
C5.2
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_100000100_chip_06_filt_y.cat
MSC_100000100_chip_06_filt_y.log
"""
def __init__(self, ver_sim="C5.2", dir_l0="", dir_l1="", dir_pcref="", path_aux="", force_all_detectors=False,
datatype="mbi"):
""" initialize the multi-band imaging data manager
Parameters
......@@ -35,31 +57,43 @@ class CsstSlsDataManager:
aux data directory (bias, flat, dark)
force_all_detectors: bool
if True, assert data for all detectors are available
datatype: str
{"mbi", "sls"}
Examples
--------
>>> dm = CsstSlsDataManager(...)
>>> dm_mbi = CsstMbiDataManager(...)
>>> # access L0 directory
>>> dm.dir_l0
>>> dm_mbi.dir_l0
>>> # access L1 directory
>>> dm.dir_l1
>>> dm_mbi.dir_l1
>>> # access dir_pcref
>>> dm.dir_pcref
>>> dm_mbi.dir_pcref
>>> # access path_aux
>>> dm.path_aux
>>> dm_mbi.path_aux
>>> # access ver_sim
>>> dm.ver_sim
>>> dm_mbi.ver_sim
>>> # access target detectors
>>> dm.target_detectors
>>> dm_mbi.target_detectors
>>> # access available detectors
>>> dm.available_detectors
>>> dm_mbi.available_detectors
>>> # define an L1 file (detector-specified)
>>> dm.l1_detector(detector=6)
>>> dm_mbi.l1_detector(detector=6)
>>> # define an L1 file (non-detector-specified)
>>> dm.l1_file("flipped_image.fits")
>>> dm_mbi.l1_file("flipped_image.fits")
"""
assert ver_sim in CP["sim"]["versions"]
assert datatype in ["mbi", "sls"]
if datatype == "mbi":
# MBI
self.valid_detectors = CP["mbi"]["detectors"]
self.detector2filter = CP["mbi"]["detector2filter"]
else:
# SLS
self.valid_detectors = CP["sls"]["detectors"]
self.detector2filter = CP["sls"]["detector2filter"]
self.dir_l0 = dir_l0
self.dir_l1 = dir_l1
self.dir_pcref = dir_pcref
......@@ -73,7 +107,7 @@ class CsstSlsDataManager:
fps_cat = self.glob_cat(dir_l0, ver_sim=ver_sim)
if force_all_detectors:
assert len(fps_img) == len(CP["mbi"]["detectors"])
assert len(fps_img) == len(self.valid_detectors)
else:
assert len(fps_img) > 0
......@@ -108,6 +142,217 @@ class CsstSlsDataManager:
self.available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
self.available_detectors.sort()
@staticmethod
def glob_image(dir_l0, ver_sim="C5"):
""" glob files in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
return fps
@staticmethod
def glob_cat(dir_l0, ver_sim="C5"):
""" glob input catalogs in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_*.cat")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "MSC_*.cat")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
return fps
def l0_cat(self, detector=6):
""" the L0 cat file path"""
if self.ver_sim == "C3":
fn = "{}_{}_{:07d}_{:02d}.cat".format(
self._instrument, self._cat_id, self._exp_id - 100000000, detector)
elif self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id - 90000000, detector, self.detector2filter[detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id, detector, self.detector2filter[detector])
return os.path.join(self.dir_l0, fn)
def l0_log(self, detector=6):
""" L0 log file path """
if self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id - 90000000, detector, self.detector2filter[detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id, detector, self.detector2filter[detector])
return os.path.join(self.dir_l0, fn)
def l0_detector(self, detector=6):
""" L0 detector-specific image file path """
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._survey, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l0_crs(self, detector=6):
""" L0 cosmic ray file path """
if self.ver_sim == "C3":
fn = "{}_CRS_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l1_detector(self, detector=6, post="img.fits"):
""" generate L1 file path
Parameters
----------
detector:
detector ID
post:
postfix
e.g, {"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}
Returns
-------
L1 file path
"""
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_{}".format(
self._instrument, self._survey,
self._exp_start, self._exp_id, detector, post)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector, post)
return os.path.join(self.dir_l1, fn)
def set_detectors(self, detectors=None):
""" set target detector """
if detectors is None:
# default detectors
self.target_detectors = self.available_detectors
else:
try:
# assert detectors is a subset of available detectors
assert set(detectors).issubset(set(self.available_detectors))
self.target_detectors = list(detectors)
except AssertionError as ae:
print("@DM: available detector are ", self.available_detectors)
print("@DM: target detector are ", detectors)
print("@DM: final target detectors are ", set(detectors) & set(self.available_detectors))
# raise ae
self.target_detectors = set(detectors) & set(self.available_detectors)
print("final target detector IDs are ", self.target_detectors)
return
def get_bias(self, detector=6):
fp = glob.glob(self.path_aux.format("CLB", detector))[0]
return fits.getdata(fp)
def get_dark(self, detector=6):
fp = glob.glob(self.path_aux.format("CLD", detector))[0]
return fits.getdata(fp)
def get_flat(self, detector=6):
fp = glob.glob(self.path_aux.format("CLF", detector))[0]
return fits.getdata(fp)
def l1_file(self, name="", comment=""):
""" L1 file path
Parameters
----------
name: str
file name
comment: str
use the function name plz
Returns
-------
fp: str
the synthetic file path
"""
fp = os.path.join(self.dir_l1, name)
# record hardcode history
self.hardcode_history.append(dict(hdcd=fp, comment=comment))
return fp
@staticmethod
def quickstart(ver_sim="C5.2", datatype="mbi", dir_l1=".", exposure_id=100):
""" quick dataset generator for tests on dandelion or PMO
Parameters
----------
ver_sim:
{"C5.2"}
datatype: str
{"mbi", "sls"}
dir_l1:
output directory
exposure_id:
21-154 for C5.2
Returns
-------
CsstMbiDataManager
the MBI data manager instance
"""
assert datatype in ["mbi", "sls"]
# auto identify node name
hostname = os.uname()[1]
assert hostname in ["dandelion", "ubuntu"]
# dandelion
if hostname == "dandelion" and datatype == "mbi":
dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/" \
"NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
path_aux = "/nfsdata/users/cham/L1Test/ref_C5.2/MSC_{}_*_{:02d}_combine.fits"
dir_pcref = "/nfsdata/users/csstpipeline/L1Pipeline/msc/gaia_dr3/"
elif hostname == "dandelion" and datatype == "sls":
dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/" \
"NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
path_aux = ""
dir_pcref = ""
# PMO
elif hostname == "ubuntu" and datatype == "mbi":
dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
"NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
path_aux = "/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits"
dir_pcref = "/home/user/L1Pipeline/msc/gaia_dr3/"
elif hostname == "ubuntu" and datatype == "sls":
dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
"NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
path_aux = ""
dir_pcref = ""
else:
raise ValueError("@DM: invalid hostname {}!".format(hostname))
return CsstMsDataManager(ver_sim=ver_sim, dir_l0=dir_l0, dir_l1=dir_l1, dir_pcref=dir_pcref, path_aux=path_aux)
class CsstMbiDataManager:
""" this class defines the file format of the input & output of CSST MSC L1 pipeline
......@@ -151,26 +396,28 @@ class CsstMbiDataManager:
Examples
--------
>>> dm = CsstMbiDataManager(...)
>>> dm_mbi = CsstMbiDataManager(...)
>>> # access L0 directory
>>> dm.dir_l0
>>> dm_mbi.dir_l0
>>> # access L1 directory
>>> dm.dir_l1
>>> dm_mbi.dir_l1
>>> # access dir_pcref
>>> dm.dir_pcref
>>> dm_mbi.dir_pcref
>>> # access path_aux
>>> dm.path_aux
>>> dm_mbi.path_aux
>>> # access ver_sim
>>> dm.ver_sim
>>> dm_mbi.ver_sim
>>> # access target detectors
>>> dm.target_detectors
>>> dm_mbi.target_detectors
>>> # access available detectors
>>> dm.available_detectors
>>> dm_mbi.available_detectors
>>> # define an L1 file (detector-specified)
>>> dm.l1_detector(detector=6)
>>> dm_mbi.l1_detector(detector=6)
>>> # define an L1 file (non-detector-specified)
>>> dm.l1_file("flipped_image.fits")
>>> dm_mbi.l1_file("flipped_image.fits")
"""
# raise DeprecationWarning(
# "CsstMbiDataManager will no longer be available in some days, please use CsstMsDataManager instead.")
assert ver_sim in CP["sim"]["versions"]
self.dir_l0 = dir_l0
......@@ -378,34 +625,6 @@ class CsstMbiDataManager:
self.hardcode_history.append(dict(hdcd=fp, comment=comment))
return fp
@staticmethod
def __testddl__():
""" test function """
dm = CsstMbiDataManager(
ver_sim="C5.2",
dir_l0="/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging"
"/NGP_AstrometryON_shearOFF/MSC_0000100",
dir_l1=".",
force_all_detectors=True,
)
print("----- available detectors -----")
print(dm.available_detectors)
for detector in dm.available_detectors[:2]:
print("----- L0 images -----")
print(dm.l0_detector(detector=detector))
print(os.path.exists(dm.l0_detector(detector=detector)))
print("----- L0 crs -----")
print(dm.l0_crs(detector=detector))
print(os.path.exists(dm.l0_detector(detector=detector)))
print("----- L0 input cat -----")
print(dm.l0_cat(detector=detector))
print(os.path.exists(dm.l0_cat(detector=detector)))
print("----- L0 input log -----")
print(dm.l0_log(detector=detector))
print(os.path.exists(dm.l0_log(detector=detector)))
print("----- L1 images -----")
print(dm.l1_detector(detector, post="img.fits"))
return dm
@staticmethod
def quickstart(ver_sim="C5.2", dir_l1=".", exposure_id=100):
......@@ -446,12 +665,12 @@ class CsstMbiDataManager:
return CsstMbiDataManager(ver_sim=ver_sim, dir_l0=dir_l0, dir_l1=dir_l1, dir_pcref=dir_pcref, path_aux=path_aux)
DIR_SIM = {
("C5.2", "dandelion"): "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/"
"NGP_AstrometryON_shearOFF/MSC_{:07d}/",
("C5.2", "ubuntu"): "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/"
"NGP_AstrometryON_shearOFF/MSC_{:07d}/",
}
# DIR_SIM = {
# ("C5.2", "dandelion"): "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/"
# "NGP_AstrometryON_shearOFF/MSC_{:07d}/",
# ("C5.2", "ubuntu"): "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/"
# "NGP_AstrometryON_shearOFF/MSC_{:07d}/",
# }
if __name__ == "__main__":
# test C3
......
......@@ -10,16 +10,17 @@ Modified-History:
"""
import os
import unittest
from csst_common.data_manager import CsstMbiDataManager
from csst_common.data_manager import CsstMbiDataManager, CsstMsDataManager
from csst_common.params import CSST_PARAMS as CP
class TestDataManager(unittest.TestCase):
# deprecated in some days
class TestCsstMbiDataManager(unittest.TestCase):
def setUp(self) -> None:
self.dm = CsstMbiDataManager.quickstart(ver_sim="C5.2", dir_l1=".", exposure_id=100)
def test_l0data_existence(self):
self.assertTrue(len(self.dm.available_detectors) == len(CP["mbi"]["detectors"]))
self.assertTrue(self.dm.available_detectors == CP["mbi"]["detectors"])
self.assertTrue(os.path.exists(self.dm.l0_detector(6)))
self.assertTrue(os.path.exists(self.dm.l0_log(6)))
self.assertTrue(os.path.exists(self.dm.l0_cat(6)))
......@@ -28,6 +29,30 @@ class TestDataManager(unittest.TestCase):
self.assertTrue(isinstance(self.dm.l1_file(name="some_file.ext", comment="a demo file"), str))
class TestCsstMsDataManager(unittest.TestCase):
def setUp(self) -> None:
self.dm_mbi = CsstMsDataManager.quickstart(ver_sim="C5.2", datatype="mbi", dir_l1=".", exposure_id=100)
self.dm_sls = CsstMsDataManager.quickstart(ver_sim="C5.2", datatype="sls", dir_l1=".", exposure_id=100)
def test_mbi_data_existence(self):
self.assertTrue(self.dm_mbi.available_detectors == CP["mbi"]["detectors"])
self.assertTrue(os.path.exists(self.dm_mbi.l0_detector(6)))
self.assertTrue(os.path.exists(self.dm_mbi.l0_log(6)))
self.assertTrue(os.path.exists(self.dm_mbi.l0_cat(6)))
self.assertTrue(os.path.exists(self.dm_mbi.l0_crs(6)))
self.assertTrue(isinstance(self.dm_mbi.l1_detector(6, post="img.fits"), str))
self.assertTrue(isinstance(self.dm_mbi.l1_file(name="some_file.ext", comment="a demo file"), str))
def test_sls_data_existence(self):
self.assertTrue(self.dm_sls.available_detectors == CP["sls"]["detectors"])
self.assertTrue(os.path.exists(self.dm_sls.l0_detector(6)))
self.assertTrue(os.path.exists(self.dm_sls.l0_log(6)))
self.assertTrue(os.path.exists(self.dm_sls.l0_cat(6)))
self.assertTrue(os.path.exists(self.dm_sls.l0_crs(6)))
self.assertTrue(isinstance(self.dm_sls.l1_detector(6, post="img.fits"), str))
self.assertTrue(isinstance(self.dm_sls.l1_file(name="some_file.ext", comment="a demo file"), str))
# class TestParams(unittest.TestCase):
# def test_params(self):
# self.assertTrue(True)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment