Commit 500a78ec authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

reconstructed CsstMsDataManager

parent 35519108
......@@ -81,7 +81,7 @@ sim: # simulation
- C5.1
- C3
basic:
all:
detectors:
- 1
- 2
......
......@@ -8,12 +8,12 @@ Modified-History:
2022-09-13, Bo Zhang, created
2022-09-13, Bo Zhang, added CsstMbiDataManager
2022-09-29, Bo Zhang, favor CsstMsDataManager instead of CsstMbiDataManager
2022-10-26, Bo Zhang, reconstruct CsstMsDataManager, deprecate CsstMbiDataManager
"""
import os
import glob
import re
import numpy as np
from astropy.io import fits
from .params import CSST_PARAMS as CP
......@@ -53,8 +53,6 @@ class CsstMsDataManager:
Will be removed in the next version.
path_aux : str
The aux data directory (bias, flat, dark).
assert_all_detectors : bool
If True, assert data for all detectors are available.
datatype : str
The options are {"mbi", "sls", "all"}.
The "all" option is used for QC in particular.
......@@ -83,15 +81,32 @@ class CsstMsDataManager:
>>> dm_mbi.l1_file("flipped_image.fits")
"""
def __init__(self, ver_sim="C5.2", dir_l0="", dir_l1="", dir_pcref="", path_aux="", assert_all_detectors=False,
datatype="mbi"):
""" initialize the multi-band imaging data manager """
def __init__(self,
ver_sim="C5.2",
datatype="mbi",
available_detectors=None,
target_detectors=None,
dir_l0=".",
dir_l1=".",
path_aux="", # bias dark flat
dir_pcref="", # deprecated
_exp_id=100000100,
_exp_start=20270810081950,
_exp_stop=20270810082220,
_telescope="CSST",
_instrument="MSC",
_survey="MS",
_imagetype="SCI",
_l0_post="L0_1",
):
# version
assert ver_sim in CP["sim"]["versions"]
self.ver_sim = ver_sim
# datatype, valid_detectors, detector2filter
assert datatype in ["mbi", "sls", "all"]
self.datatype = datatype
if datatype == "mbi":
# MBI
self.valid_detectors = CP["mbi"]["detectors"]
......@@ -102,59 +117,81 @@ class CsstMsDataManager:
self.detector2filter = CP["sls"]["detector2filter"]
else:
# ALL
self.valid_detectors = CP["basic"]["detectors"]
self.detector2filter = CP["basic"]["detector2filter"]
self.valid_detectors = CP["all"]["detectors"]
self.detector2filter = CP["all"]["detector2filter"]
# available_detectors
self.available_detectors = available_detectors
# set all available detectors by default
self.target_detectors = []
self.set_detectors(target_detectors)
# exposure info
self._exp_id = _exp_id
self._exp_start = _exp_start
self._exp_stop = _exp_stop
# file name components
self._telescope = _telescope
self._instrument = _instrument
self._survey = _survey
self._imagetype = _imagetype
self._l0_post = _l0_post
# data directory
self.dir_l0 = dir_l0
self.dir_l1 = dir_l1
self.dir_pcref = dir_pcref
self.path_aux = path_aux
self.ver_sim = ver_sim
self.target_detectors = []
# record hard code names in history
self.hardcode_history = []
fps_img = self.glob_image(dir_l0, ver_sim=ver_sim)
fps_cat = self.glob_cat(dir_l0, ver_sim=ver_sim)
if assert_all_detectors:
assert len(fps_img) == len(self.valid_detectors)
else:
assert len(fps_img) > 0
@staticmethod
def from_dfs(ver_sim="C5.2", data_type="mbi", exp_id=10000100, dir_l1="."):
raise NotImplementedError("from_dfs is currently not available!")
if ver_sim == "C3":
# get info
# print(re.split(r"[_.]", fps[0]))
self._instrument, self._survey, \
self._exp_start, self._exp_id, \
_detector, self._l0_suffix, _ext = re.split(r"[_.]", fps_img[0])
self._cat_id = re.split(r"[_.]", fps_cat[0])[1]
self._exp_start = int(self._exp_start)
self._exp_id = int(self._exp_id)
# available detectors
self.available_detectors = [int(re.split(r"[_.]", fp)[4]) for fp in fps_img]
self.available_detectors.sort()
elif ver_sim in ["C5.1", "C5.2"]:
# get info
# print(re.split(r"[_.]", fps[0]))
self._telescope, self._instrument, self._survey, self._imagetype, \
self._exp_start, self._exp_stop, self._exp_id, \
_detector, self._l0_suffix, self._version, _ext = re.split(r"[_.]", fps_img[0])
self._cat_id = re.split(r"[_.]", fps_cat[0])[1]
self._exp_start = int(self._exp_start)
self._exp_stop = int(self._exp_stop)
self._exp_id = int(self._exp_id)
# available detectors
self.available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
self.available_detectors.sort()
@staticmethod
def from_dir(ver_sim="C5.2", datatype="mbi", dir_l0=".", dir_l1=".", dir_pcref="", path_aux=""):
""" initialize the multi-band imaging data manager """
# set all available detectors by default
self.set_detectors(None)
assert ver_sim in ["C5.2", ]
# glob files
fps_img = CsstMsDataManager.glob_image(dir_l0, ver_sim=ver_sim)
if len(fps_img) == 0:
raise FileNotFoundError(f"No file found in dir_l0: {dir_l0}")
# available detectors
available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
available_detectors.sort()
# parse info
_telescope, _instrument, _survey, _imagetype, \
_exp_start, _exp_stop, _exp_id, \
_detector, *_l0_post, _ext = re.split(r"[_.]", fps_img[0])
_exp_start = int(_exp_start)
_exp_stop = int(_exp_stop)
_exp_id = int(_exp_id)
return CsstMsDataManager(ver_sim=ver_sim,
datatype=datatype,
available_detectors=available_detectors,
target_detectors=None,
dir_l0=dir_l0,
dir_l1=dir_l1,
path_aux=path_aux, # bias dark flat
dir_pcref=dir_pcref, # deprecated
_exp_id=_exp_id,
_exp_start=_exp_start,
_exp_stop=_exp_stop,
_telescope=_telescope,
_instrument=_instrument,
_survey=_survey,
_imagetype=_imagetype,
_l0_post="_".join(_l0_post),
)
@staticmethod
def glob_image(dir_l0, ver_sim="C5"):
......@@ -174,11 +211,8 @@ class CsstMsDataManager:
@staticmethod
def glob_cat(dir_l0, ver_sim="C5"):
""" glob input catalogs in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_*.cat")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "MSC_*.cat")
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "MSC_*.cat")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
......@@ -188,49 +222,32 @@ class CsstMsDataManager:
def l0_cat(self, detector=6):
""" the L0 cat file path"""
if self.ver_sim == "C3":
fn = "{}_{}_{:07d}_{:02d}.cat".format(
self._instrument, self._cat_id, self._exp_id - 100000000, detector)
elif self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id - 90000000, detector, self.detector2filter[detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id, detector, self.detector2filter[detector])
assert self.ver_sim == "C5.2"
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id, detector, self.detector2filter[detector])
return os.path.join(self.dir_l0, fn)
def l0_log(self, detector=6):
""" L0 log file path """
if self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id - 90000000, detector, self.detector2filter[detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id, detector, self.detector2filter[detector])
assert self.ver_sim == "C5.2"
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id, detector, self.detector2filter[detector])
return os.path.join(self.dir_l0, fn)
def l0_detector(self, detector=6):
""" L0 detector-specific image file path """
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._survey, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l0_crs(self, detector=6):
""" L0 cosmic ray file path """
if self.ver_sim == "C3":
fn = "{}_CRS_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l1_detector(self, detector=6, post="img.fits"):
......@@ -249,15 +266,10 @@ class CsstMsDataManager:
L1 file path
"""
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_{}".format(
self._instrument, self._survey,
self._exp_start, self._exp_id, detector, post)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector, post)
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector, post)
return os.path.join(self.dir_l1, fn)
def set_detectors(self, detectors=None):
......@@ -309,9 +321,9 @@ class CsstMsDataManager:
Parameters
----------
name: str
name : str
file name
comment: str
comment : str
use the function name plz
Returns
......@@ -338,12 +350,12 @@ class CsstMsDataManager:
dir_l1 : str
output directory
exposure_id : int
21-154 for C5.2
The serial number of the exposure. 20-154 for C5.2.
Returns
-------
CsstMsDataManager
the MBI data manager instance
The Main Survey Data Manager instance.
"""
assert datatype in ["mbi", "sls"]
# auto identify node name
......@@ -377,31 +389,11 @@ class CsstMsDataManager:
else:
raise ValueError("@DM: invalid hostname {} or datatype {}!".format(hostname, datatype))
return CsstMsDataManager(
return CsstMsDataManager.from_dir(
ver_sim=ver_sim, datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1, dir_pcref=dir_pcref, path_aux=path_aux)
class CsstMbiDataManager:
""" this class defines the file format of the input & output of CSST MSC L1 pipeline
C3:
MSC_MS_210525220000_100000020_06_raw.fits
MSC_CRS_210525220000_100000020_06_raw.fits
MSC_210525120000_0000020_06.cat
C5.1:
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_10000100_chip_06_filt_y.cat
MSC_10000100_chip_06_filt_y.log
C5.2
CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
MSC_100000100_chip_06_filt_y.cat
MSC_100000100_chip_06_filt_y.log
"""
def __init__(self, ver_sim="C5.2", dir_l0="", dir_l1="", dir_pcref="", path_aux="", force_all_detectors=False):
""" initialize the multi-band imaging data manager
......@@ -443,249 +435,5 @@ class CsstMbiDataManager:
>>> # define an L1 file (non-detector-specified)
>>> dm_mbi.l1_file("flipped_image.fits")
"""
# raise DeprecationWarning(
# "CsstMbiDataManager will no longer be available in some days, please use CsstMsDataManager instead.")
assert ver_sim in CP["sim"]["versions"]
self.dir_l0 = dir_l0
self.dir_l1 = dir_l1
self.dir_pcref = dir_pcref
self.path_aux = path_aux
self.ver_sim = ver_sim
self.target_detectors = []
self.hardcode_history = []
fps_img = self.glob_image(dir_l0, ver_sim=ver_sim)
fps_cat = self.glob_cat(dir_l0, ver_sim=ver_sim)
if force_all_detectors:
assert len(fps_img) == len(CP["mbi"]["detectors"])
else:
assert len(fps_img) > 0
if ver_sim == "C3":
# get info
# print(re.split(r"[_.]", fps[0]))
self._instrument, self._survey, \
self._exp_start, self._exp_id, \
_detector, self._l0_suffix, _ext = re.split(r"[_.]", fps_img[0])
self._cat_id = re.split(r"[_.]", fps_cat[0])[1]
self._exp_start = int(self._exp_start)
self._exp_id = int(self._exp_id)
# available detectors
self.available_detectors = [int(re.split(r"[_.]", fp)[4]) for fp in fps_img]
self.available_detectors.sort()
elif ver_sim in ["C5.1", "C5.2"]:
# get info
# print(re.split(r"[_.]", fps[0]))
self._telescope, self._instrument, self._survey, self._imagetype, \
self._exp_start, self._exp_stop, self._exp_id, \
_detector, self._l0_suffix, self._version, _ext = re.split(r"[_.]", fps_img[0])
self._cat_id = re.split(r"[_.]", fps_cat[0])[1]
self._exp_start = int(self._exp_start)
self._exp_stop = int(self._exp_stop)
self._exp_id = int(self._exp_id)
# available detectors
self.available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
self.available_detectors.sort()
@staticmethod
def glob_image(dir_l0, ver_sim="C5"):
""" glob files in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
return fps
@staticmethod
def glob_cat(dir_l0, ver_sim="C5"):
""" glob input catalogs in L0 data directory """
if ver_sim == "C3":
pattern = os.path.join(dir_l0, "MSC_*.cat")
else:
assert ver_sim in ["C5.1", "C5.2"]
pattern = os.path.join(dir_l0, "MSC_*.cat")
fps = glob.glob(pattern)
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
return fps
def l0_cat(self, detector=6):
""" the L0 cat file path"""
if self.ver_sim == "C3":
fn = "{}_{}_{:07d}_{:02d}.cat".format(
self._instrument, self._cat_id, self._exp_id - 100000000, detector)
elif self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id - 90000000, detector, CP["mbi"]["detector2filter"][detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
self._instrument, self._exp_id, detector, CP["mbi"]["detector2filter"][detector])
return os.path.join(self.dir_l0, fn)
def l0_log(self, detector=6):
""" L0 log file path """
if self.ver_sim == "C5.1":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id - 90000000, detector, CP["mbi"]["detector2filter"][detector])
elif self.ver_sim == "C5.2":
fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
self._instrument, self._exp_id, detector, CP["mbi"]["detector2filter"][detector])
return os.path.join(self.dir_l0, fn)
def l0_detector(self, detector=6):
""" L0 detector-specific image file path """
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._survey, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l0_crs(self, detector=6):
""" L0 cosmic ray file path """
if self.ver_sim == "C3":
fn = "{}_CRS_{}_{}_{:02d}_raw.fits".format(
self._instrument, self._exp_start, self._exp_id, detector)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector)
return os.path.join(self.dir_l0, fn)
def l1_detector(self, detector=6, post="img.fits"):
""" generate L1 file path
Parameters
----------
detector:
detector ID
post:
postfix
{"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}
Returns
-------
L1 file path
"""
if self.ver_sim == "C3":
fn = "{}_{}_{}_{}_{:02d}_{}".format(
self._instrument, self._survey,
self._exp_start, self._exp_id, detector, post)
else:
assert self.ver_sim in ["C5.1", "C5.2"]
fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
self._telescope, self._instrument, self._survey,
self._exp_start, self._exp_stop, self._exp_id, detector, post)
return os.path.join(self.dir_l1, fn)
def set_detectors(self, detectors=None):
""" set target detector """
if detectors is None:
# default detectors
self.target_detectors = self.available_detectors
else:
try:
# assert detectors is a subset of available detectors
assert set(detectors).issubset(set(self.available_detectors))
self.target_detectors = list(detectors)
except AssertionError as ae:
print("@DM: available detector are ", self.available_detectors)
print("@DM: target detector are ", detectors)
print("@DM: final target detectors are ", set(detectors) & set(self.available_detectors))
# raise ae
self.target_detectors = set(detectors) & set(self.available_detectors)
print("final target detector IDs are ", self.target_detectors)
return
def get_bias(self, detector=6):
fp = glob.glob(self.path_aux.format("CLB", detector))[0]
return fits.getdata(fp)
def get_dark(self, detector=6):
fp = glob.glob(self.path_aux.format("CLD", detector))[0]
return fits.getdata(fp)
def get_flat(self, detector=6):
fp = glob.glob(self.path_aux.format("CLF", detector))[0]
return fits.getdata(fp)
def l1_file(self, name="", comment=""):
"""
Parameters
----------
name: str
file name
comment: str
use the function name plz
Returns
-------
fp: str
the synthetic file path
"""
fp = os.path.join(self.dir_l1, name)
# record hardcode history
self.hardcode_history.append(dict(hdcd=fp, comment=comment))
return fp
@staticmethod
def quickstart(ver_sim="C5.2", dir_l1=".", exposure_id=100):
""" quick dataset generator for tests on dandelion or PMO
Parameters
----------
ver_sim: str
{"C5.2"}
dir_l1: str
output directory
exposure_id: int
21-154 for C5.2
Returns
-------
CsstMbiDataManager
the MBI data manager instance
"""
# auto identify node name
hostname = os.uname()[1]
if hostname == "dandelion":
# dandelion node
dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/" \
"NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
path_aux = "/nfsdata/users/cham/L1Test/ref_C5.2/MSC_{}_*_{:02d}_combine.fits"
dir_pcref = "/nfsdata/users/csstpipeline/L1Pipeline/msc/gaia_dr3/"
elif hostname == "ubuntu":
# PMO node
dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
"NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
path_aux = "/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits"
dir_pcref = "/home/user/L1Pipeline/msc/gaia_dr3/"
else:
raise ValueError("@DM: invalid hostname {}!".format(hostname))
return CsstMbiDataManager(ver_sim=ver_sim, dir_l0=dir_l0, dir_l1=dir_l1, dir_pcref=dir_pcref, path_aux=path_aux)
raise DeprecationWarning(
"CsstMbiDataManager will no longer be available in some days, please use CsstMsDataManager instead.")
......@@ -14,21 +14,6 @@ from csst_common.data_manager import CsstMbiDataManager, CsstMsDataManager
from csst_common.params import CSST_PARAMS as CP
# deprecated in some days
class TestCsstMbiDataManager(unittest.TestCase):
def setUp(self) -> None:
self.dm = CsstMbiDataManager.quickstart(ver_sim="C5.2", dir_l1=".", exposure_id=100)
def test_l0data_existence(self):
self.assertTrue(self.dm.available_detectors == CP["mbi"]["detectors"])
self.assertTrue(os.path.exists(self.dm.l0_detector(6)))
self.assertTrue(os.path.exists(self.dm.l0_log(6)))
self.assertTrue(os.path.exists(self.dm.l0_cat(6)))
self.assertTrue(os.path.exists(self.dm.l0_crs(6)))
self.assertTrue(isinstance(self.dm.l1_detector(6, post="img.fits"), str))
self.assertTrue(isinstance(self.dm.l1_file(name="some_file.ext", comment="a demo file"), str))
class TestCsstMsDataManager(unittest.TestCase):
def setUp(self) -> None:
self.dm_mbi = CsstMsDataManager.quickstart(ver_sim="C5.2", datatype="mbi", dir_l1=".", exposure_id=100)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment