Commit 73e8aa7d authored by Fang Yuedong's avatar Fang Yuedong
Browse files

add IO unittest (test for CatalogBase.py) to test_imaging.py

parent 04019c9a
Pipeline #4262 passed with stage
in 0 seconds
......@@ -4,7 +4,9 @@ from scipy import interpolate
import astropy.constants as cons
from astropy.table import Table
import h5py as h5
import sys,os,math
import sys
import os
import math
from itertools import islice
import numpy as np
import galsim
......@@ -14,136 +16,130 @@ from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
from ObservationSim.MockObject._util import getObservedSED
from ObservationSim.MockObject import CatalogBase, Galaxy
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.PSF.PSFInterp import PSFInterp
from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG
def convert_sed(mag, sed, target_filt, norm_filt=None):
bandpass = target_filt.bandpass_full
if norm_filt is not None:
norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001
else:
norm_filt = Table(
np.array(np.array([bandpass.wave_list*10.0, bandpass.func(bandpass.wave_list)])).T, names=(['WAVELENGTH', 'SENSITIVITY'])
class Catalog(CatalogBase):
def __init__(self):
super().__init__()
self.rotation = 0.
def load_norm_filt(self, obj):
return None
def load_sed(self, obj, **kward):
pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'),
'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r")
lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'),
'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r")
lamb_gal = lamb['lamb'][()]
pcs = pcs['pcs'][()]
cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
factor = 10**(-.4 * cosmo.distmod(obj.param['z']).value)
flux = np.matmul(pcs, obj.param['coeff']) * factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux[flux < 0] = 0.
sedcat = np.vstack((lamb_gal, flux)).T
sed_data = getObservedSED(
sedCat=sedcat,
redshift=obj.param['z'],
av=obj.param["av"],
redden=obj.param["redden"]
)
norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=mag,
spectrum=sed,
norm_thr=norm_filt,
sWave=np.floor(norm_filt[norm_thr_rang_ids][0][0]),
eWave=np.ceil(norm_filt[norm_thr_rang_ids][-1][0]))
sed_photon = copy.copy(sed)
sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T
sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest')
# Get magnitude
sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False)
interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=bandpass)
mag_csst = getABMAG(
interFlux=interFlux,
bandpass=bandpass
)
if target_filt.survey_type == "photometric":
return sed_photon, mag_csst, interFlux
elif target_filt.survey_type == "spectroscopic":
del sed_photon
return sed, mag_csst, interFlux
def _load_gals(file_path):
gals_cat = h5.File(file_path, 'r')['galaxies']
for ikeys in gals_cat.keys():
gals = gals_cat[ikeys]
param = {}
igals = 9
param['z'] = gals['redshift'][igals]
param['mag_use_normal'] = gals['mag_csst_g'][igals]
print(param['mag_use_normal'] )
param['e1'] = gals['ellipticity_true'][igals][0]
param['e2'] = gals['ellipticity_true'][igals][1]
param['e1_disk'] = param['e1']
param['e2_disk'] = param['e2']
param['e1_bulge'] = param['e1']
param['e2_bulge'] = param['e2']
# Masses
param['bulgemass'] = gals['bulgemass'][igals]
param['diskmass'] = gals['diskmass'][igals]
param['size'] = gals['size'][igals]
# Sersic index
param['disk_sersic_idx'] = 1.
param['bulge_sersic_idx'] = 4.
# Sizes
param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass'])
if param['bfrac'] >= 0.6:
param['hlr_bulge'] = param['size']
param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
else:
param['hlr_disk'] = param['size']
param['hlr_bulge'] = param['size'] * param['bfrac']
# SED coefficients
param['coeff'] = gals['coeff'][igals]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r")
lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r")
lamb_gal = lamb['lamb'][()]
pcs = pcs['pcs'][()]
cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
factor = 10**(-.4 * cosmo.distmod(param['z']).value)
flux = np.matmul(pcs, param['coeff']) * factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux[flux < 0] = 0.
sedcat = np.vstack((lamb_gal, flux)).T
sed_data = getObservedSED(
sedCat=sedcat,
redshift=param['z'],
av=param["av"],
redden=param["redden"]
)
wave, flux = sed_data[0], sed_data[1]
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
param["sed"] = sed
del wave
del flux
return param
wave, flux = sed_data[0], sed_data[1]
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
# obj.param["sed"] = sed
del wave
del flux
return sed
def _load(self, file_path):
gals_cat = h5.File(file_path, 'r')['galaxies']
for ikeys in gals_cat.keys():
gals = gals_cat[ikeys]
param = self.initialize_param()
igals = 9
param['z'] = gals['redshift'][igals]
param['mag_use_normal'] = gals['mag_csst_g'][igals]
print(param['mag_use_normal'])
param['e1'] = gals['ellipticity_true'][igals][0]
param['e2'] = gals['ellipticity_true'][igals][1]
# For shape calculation
param['e1'], param['e2'], param['ell_total'] = self.rotate_ellipticity(
e1=gals['ellipticity_true'][igals][0],
e2=gals['ellipticity_true'][igals][1],
rotation=self.rotation,
unit='radians')
param['e1_disk'] = param['e1']
param['e2_disk'] = param['e2']
param['e1_bulge'] = param['e1']
param['e2_bulge'] = param['e2']
# Masses
param['bulgemass'] = gals['bulgemass'][igals]
param['diskmass'] = gals['diskmass'][igals]
param['size'] = gals['size'][igals]
# Sersic index
param['disk_sersic_idx'] = 1.
param['bulge_sersic_idx'] = 4.
# Sizes
param['bfrac'] = param['bulgemass'] / \
(param['bulgemass'] + param['diskmass'])
if param['bfrac'] >= 0.6:
param['hlr_bulge'] = param['size']
param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
else:
param['hlr_disk'] = param['size']
param['hlr_bulge'] = param['size'] * param['bfrac']
# SED coefficients
param['coeff'] = gals['coeff'][igals]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
obj = Galaxy(param)
return obj
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
try:
config = yaml.safe_load(stream)
#for key, value in config.items():
# for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
print(exc)
chip = Chip(chipID=iccd, config=config)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
focal_plane = FocalPlane(chip_list=[iccd])
chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
chip.img.wcs = focal_plane.getTanWCS(
192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
return chip
def defineFilt(chip):
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
......@@ -158,7 +154,8 @@ def defineFilt(chip):
class imagingModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(imagingModule_coverage, self).__init__(methodName)
self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc')
self.dataPath = os.path.join(
os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc')
self.iccd = 8
def test_imaging(self):
......@@ -168,31 +165,37 @@ class imagingModule_coverage(unittest.TestCase):
filt = defineFilt(chip)
print(chip.chipID)
print(chip.cen_pix_x, chip.cen_pix_y)
obj = _load_gals(os.path.join(self.dataPath, 'galaxies_C6_bundle000287.h5')) #("UNIT_TEST_DATA/galaxies_C6_bundle000287.h5")
sed_data = obj['sed']
norm_filt = None
obj_sed, obj_mag, obj_flux = convert_sed(
mag=obj["mag_use_normal"],
catalog = Catalog()
obj = catalog._load(os.path.join(
self.dataPath, 'galaxies_C6_bundle000287.h5'))
sed_data = catalog.load_sed(obj)
norm_filt = catalog.load_norm_filt(obj)
obj_sed, obj_mag, obj_flux = catalog.convert_sed(
mag=obj.param["mag_use_normal"],
sed=sed_data,
target_filt=filt,
norm_filt=norm_filt,
)
pupil_area= np.pi * (0.5 * 2.)**2
pupil_area = np.pi * (0.5 * 2.)**2
exptime = 150.
nphotons_tot = obj_flux*pupil_area * exptime #getElectronFluxFilt(filt, tel, exptime)
# getElectronFluxFilt(filt, tel, exptime)
nphotons_tot = obj_flux*pupil_area * exptime
full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full)
print(full, nphotons_tot, obj_mag)
for i in range(4):
sub = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_sub_list[i])
sub = integrate_sed_bandpass(
sed=obj_sed, bandpass=filt.bandpass_sub_list[i])
ratio = sub / full
nphotons = ratio * nphotons_tot
disk = galsim.Sersic(n=obj['disk_sersic_idx'], half_light_radius=obj['hlr_disk'], flux=1.0)
disk_shape = galsim.Shear(g1=obj['e1_disk'], g2=obj['e2_disk'])
disk = galsim.Sersic(
n=obj.param['disk_sersic_idx'], half_light_radius=obj.param['hlr_disk'], flux=1.0)
disk_shape = galsim.Shear(
g1=obj.param['e1_disk'], g2=obj.param['e2_disk'])
disk = disk.shear(disk_shape)
gal_temp = disk
gal_temp = gal_temp.withFlux(nphotons)
......@@ -205,11 +208,9 @@ class imagingModule_coverage(unittest.TestCase):
else:
gal = gal + gal_temp
print(gal)
self.assertTrue( gal != None )
self.assertTrue(gal != None)
if __name__ == '__main__':
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment