diff --git a/tests/test_imaging.py b/tests/test_imaging.py index 7d1e5226c3b07b9d6534a45e349bf3df473d48d4..e7175d62074da17dbff4627444f18fd2a4ea306e 100644 --- a/tests/test_imaging.py +++ b/tests/test_imaging.py @@ -4,7 +4,9 @@ from scipy import interpolate import astropy.constants as cons from astropy.table import Table import h5py as h5 -import sys,os,math +import sys +import os +import math from itertools import islice import numpy as np import galsim @@ -14,136 +16,130 @@ from astropy.cosmology import FlatLambdaCDM from astropy import constants from astropy import units as U from ObservationSim.MockObject._util import getObservedSED +from ObservationSim.MockObject import CatalogBase, Galaxy from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane from ObservationSim.PSF.PSFInterp import PSFInterp from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG -def convert_sed(mag, sed, target_filt, norm_filt=None): - bandpass = target_filt.bandpass_full - - if norm_filt is not None: - norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001 - else: - norm_filt = Table( - np.array(np.array([bandpass.wave_list*10.0, bandpass.func(bandpass.wave_list)])).T, names=(['WAVELENGTH', 'SENSITIVITY']) +class Catalog(CatalogBase): + def __init__(self): + super().__init__() + self.rotation = 0. + + def load_norm_filt(self, obj): + return None + + def load_sed(self, obj, **kward): + pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), + 'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r") + lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), + 'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r") + lamb_gal = lamb['lamb'][()] + pcs = pcs['pcs'][()] + + cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) + factor = 10**(-.4 * cosmo.distmod(obj.param['z']).value) + flux = np.matmul(pcs, obj.param['coeff']) * factor + # if np.any(flux < 0): + # raise ValueError("Glaxy %s: negative SED fluxes"%obj.id) + flux[flux < 0] = 0. + sedcat = np.vstack((lamb_gal, flux)).T + sed_data = getObservedSED( + sedCat=sedcat, + redshift=obj.param['z'], + av=obj.param["av"], + redden=obj.param["redden"] ) - norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001 - - sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=mag, - spectrum=sed, - norm_thr=norm_filt, - sWave=np.floor(norm_filt[norm_thr_rang_ids][0][0]), - eWave=np.ceil(norm_filt[norm_thr_rang_ids][-1][0])) - sed_photon = copy.copy(sed) - sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T - sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest') - # Get magnitude - sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False) - interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=bandpass) - mag_csst = getABMAG( - interFlux=interFlux, - bandpass=bandpass - ) - if target_filt.survey_type == "photometric": - return sed_photon, mag_csst, interFlux - elif target_filt.survey_type == "spectroscopic": - del sed_photon - return sed, mag_csst, interFlux - -def _load_gals(file_path): - gals_cat = h5.File(file_path, 'r')['galaxies'] - for ikeys in gals_cat.keys(): - gals = gals_cat[ikeys] - - param = {} - igals = 9 - param['z'] = gals['redshift'][igals] - param['mag_use_normal'] = gals['mag_csst_g'][igals] - print(param['mag_use_normal'] ) - - param['e1'] = gals['ellipticity_true'][igals][0] - param['e2'] = gals['ellipticity_true'][igals][1] - - param['e1_disk'] = param['e1'] - param['e2_disk'] = param['e2'] - param['e1_bulge'] = param['e1'] - param['e2_bulge'] = param['e2'] - - # Masses - param['bulgemass'] = gals['bulgemass'][igals] - param['diskmass'] = gals['diskmass'][igals] - - param['size'] = gals['size'][igals] - - # Sersic index - param['disk_sersic_idx'] = 1. - param['bulge_sersic_idx'] = 4. - - # Sizes - param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass']) - if param['bfrac'] >= 0.6: - param['hlr_bulge'] = param['size'] - param['hlr_disk'] = param['size'] * (1. - param['bfrac']) - else: - param['hlr_disk'] = param['size'] - param['hlr_bulge'] = param['size'] * param['bfrac'] - - # SED coefficients - param['coeff'] = gals['coeff'][igals] - - # TEST no redening and no extinction - param['av'] = 0.0 - param['redden'] = 0 - - pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r") - lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r") - lamb_gal = lamb['lamb'][()] - pcs = pcs['pcs'][()] - - cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) - factor = 10**(-.4 * cosmo.distmod(param['z']).value) - flux = np.matmul(pcs, param['coeff']) * factor - # if np.any(flux < 0): - # raise ValueError("Glaxy %s: negative SED fluxes"%obj.id) - flux[flux < 0] = 0. - sedcat = np.vstack((lamb_gal, flux)).T - sed_data = getObservedSED( - sedCat=sedcat, - redshift=param['z'], - av=param["av"], - redden=param["redden"] - ) - wave, flux = sed_data[0], sed_data[1] - - speci = interpolate.interp1d(wave, flux) - lamb = np.arange(2000, 11001+0.5, 0.5) - y = speci(lamb) - # erg/s/cm2/A --> photon/s/m2/A - all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13 - sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX')) - - param["sed"] = sed - del wave - del flux - return param + wave, flux = sed_data[0], sed_data[1] + + speci = interpolate.interp1d(wave, flux) + lamb = np.arange(2000, 11001+0.5, 0.5) + y = speci(lamb) + # erg/s/cm2/A --> photon/s/m2/A + all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13 + sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX')) + + # obj.param["sed"] = sed + del wave + del flux + return sed + + def _load(self, file_path): + gals_cat = h5.File(file_path, 'r')['galaxies'] + for ikeys in gals_cat.keys(): + gals = gals_cat[ikeys] + + param = self.initialize_param() + igals = 9 + param['z'] = gals['redshift'][igals] + param['mag_use_normal'] = gals['mag_csst_g'][igals] + print(param['mag_use_normal']) + + param['e1'] = gals['ellipticity_true'][igals][0] + param['e2'] = gals['ellipticity_true'][igals][1] + + # For shape calculation + param['e1'], param['e2'], param['ell_total'] = self.rotate_ellipticity( + e1=gals['ellipticity_true'][igals][0], + e2=gals['ellipticity_true'][igals][1], + rotation=self.rotation, + unit='radians') + + param['e1_disk'] = param['e1'] + param['e2_disk'] = param['e2'] + param['e1_bulge'] = param['e1'] + param['e2_bulge'] = param['e2'] + + # Masses + param['bulgemass'] = gals['bulgemass'][igals] + param['diskmass'] = gals['diskmass'][igals] + + param['size'] = gals['size'][igals] + + # Sersic index + param['disk_sersic_idx'] = 1. + param['bulge_sersic_idx'] = 4. + + # Sizes + param['bfrac'] = param['bulgemass'] / \ + (param['bulgemass'] + param['diskmass']) + if param['bfrac'] >= 0.6: + param['hlr_bulge'] = param['size'] + param['hlr_disk'] = param['size'] * (1. - param['bfrac']) + else: + param['hlr_disk'] = param['size'] + param['hlr_bulge'] = param['size'] * param['bfrac'] + + # SED coefficients + param['coeff'] = gals['coeff'][igals] + + # TEST no redening and no extinction + param['av'] = 0.0 + param['redden'] = 0 + + obj = Galaxy(param) + + return obj def defineCCD(iccd, config_file): with open(config_file, "r") as stream: try: config = yaml.safe_load(stream) - #for key, value in config.items(): + # for key, value in config.items(): # print (key + " : " + str(value)) except yaml.YAMLError as exc: print(exc) chip = Chip(chipID=iccd, config=config) chip.img = galsim.ImageF(chip.npix_x, chip.npix_y) focal_plane = FocalPlane(chip_list=[iccd]) - chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale) + chip.img.wcs = focal_plane.getTanWCS( + 192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale) return chip + def defineFilt(chip): filter_param = FilterParam() filter_id, filter_type = chip.getChipFilter() @@ -158,7 +154,8 @@ def defineFilt(chip): class imagingModule_coverage(unittest.TestCase): def __init__(self, methodName='runTest'): super(imagingModule_coverage, self).__init__(methodName) - self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc') + self.dataPath = os.path.join( + os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc') self.iccd = 8 def test_imaging(self): @@ -168,31 +165,37 @@ class imagingModule_coverage(unittest.TestCase): filt = defineFilt(chip) print(chip.chipID) print(chip.cen_pix_x, chip.cen_pix_y) - - - obj = _load_gals(os.path.join(self.dataPath, 'galaxies_C6_bundle000287.h5')) #("UNIT_TEST_DATA/galaxies_C6_bundle000287.h5") - - sed_data = obj['sed'] - norm_filt = None - obj_sed, obj_mag, obj_flux = convert_sed( - mag=obj["mag_use_normal"], + + catalog = Catalog() + + obj = catalog._load(os.path.join( + self.dataPath, 'galaxies_C6_bundle000287.h5')) + sed_data = catalog.load_sed(obj) + norm_filt = catalog.load_norm_filt(obj) + + obj_sed, obj_mag, obj_flux = catalog.convert_sed( + mag=obj.param["mag_use_normal"], sed=sed_data, target_filt=filt, norm_filt=norm_filt, ) - pupil_area= np.pi * (0.5 * 2.)**2 + pupil_area = np.pi * (0.5 * 2.)**2 exptime = 150. - nphotons_tot = obj_flux*pupil_area * exptime #getElectronFluxFilt(filt, tel, exptime) + # getElectronFluxFilt(filt, tel, exptime) + nphotons_tot = obj_flux*pupil_area * exptime full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full) print(full, nphotons_tot, obj_mag) for i in range(4): - sub = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_sub_list[i]) - + sub = integrate_sed_bandpass( + sed=obj_sed, bandpass=filt.bandpass_sub_list[i]) + ratio = sub / full nphotons = ratio * nphotons_tot - disk = galsim.Sersic(n=obj['disk_sersic_idx'], half_light_radius=obj['hlr_disk'], flux=1.0) - disk_shape = galsim.Shear(g1=obj['e1_disk'], g2=obj['e2_disk']) + disk = galsim.Sersic( + n=obj.param['disk_sersic_idx'], half_light_radius=obj.param['hlr_disk'], flux=1.0) + disk_shape = galsim.Shear( + g1=obj.param['e1_disk'], g2=obj.param['e2_disk']) disk = disk.shear(disk_shape) gal_temp = disk gal_temp = gal_temp.withFlux(nphotons) @@ -205,11 +208,9 @@ class imagingModule_coverage(unittest.TestCase): else: gal = gal + gal_temp print(gal) - - self.assertTrue( gal != None ) + self.assertTrue(gal != None) if __name__ == '__main__': unittest.main() -