From d9387ab4ef9378310c35ed441e4fe59792c4337f Mon Sep 17 00:00:00 2001 From: chengliang Date: Wed, 17 Apr 2024 13:59:57 +0800 Subject: [PATCH] append unittest --- tests/test_imaging.py | 215 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 tests/test_imaging.py diff --git a/tests/test_imaging.py b/tests/test_imaging.py new file mode 100644 index 0000000..4e41563 --- /dev/null +++ b/tests/test_imaging.py @@ -0,0 +1,215 @@ +import unittest + +from scipy import interpolate +import astropy.constants as cons +from astropy.table import Table +import h5py as h5 +import sys,os,math +from itertools import islice +import numpy as np +import galsim +import yaml +import copy +from astropy.cosmology import FlatLambdaCDM +from astropy import constants +from astropy import units as U +from ObservationSim.MockObject._util import getObservedSED + +from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane +from ObservationSim.PSF.PSFInterp import PSFInterp +from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG + + +def convert_sed(mag, sed, target_filt, norm_filt=None): + bandpass = target_filt.bandpass_full + + if norm_filt is not None: + norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001 + else: + norm_filt = Table( + np.array(np.array([bandpass.wave_list*10.0, bandpass.func(bandpass.wave_list)])).T, names=(['WAVELENGTH', 'SENSITIVITY']) + ) + norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001 + + sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=mag, + spectrum=sed, + norm_thr=norm_filt, + sWave=np.floor(norm_filt[norm_thr_rang_ids][0][0]), + eWave=np.ceil(norm_filt[norm_thr_rang_ids][-1][0])) + sed_photon = copy.copy(sed) + sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T + sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest') + # Get magnitude + sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False) + interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=bandpass) + mag_csst = getABMAG( + interFlux=interFlux, + bandpass=bandpass + ) + if target_filt.survey_type == "photometric": + return sed_photon, mag_csst, interFlux + elif target_filt.survey_type == "spectroscopic": + del sed_photon + return sed, mag_csst, interFlux + +def _load_gals(file_path): + gals_cat = h5.File(file_path, 'r')['galaxies'] + for ikeys in gals_cat.keys(): + gals = gals_cat[ikeys] + + param = {} + igals = 9 + param['z'] = gals['redshift'][igals] + param['mag_use_normal'] = gals['mag_csst_g'][igals] + print(param['mag_use_normal'] ) + + param['e1'] = gals['ellipticity_true'][igals][0] + param['e2'] = gals['ellipticity_true'][igals][1] + + param['e1_disk'] = param['e1'] + param['e2_disk'] = param['e2'] + param['e1_bulge'] = param['e1'] + param['e2_bulge'] = param['e2'] + + # Masses + param['bulgemass'] = gals['bulgemass'][igals] + param['diskmass'] = gals['diskmass'][igals] + + param['size'] = gals['size'][igals] + + # Sersic index + param['disk_sersic_idx'] = 1. + param['bulge_sersic_idx'] = 4. + + # Sizes + param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass']) + if param['bfrac'] >= 0.6: + param['hlr_bulge'] = param['size'] + param['hlr_disk'] = param['size'] * (1. - param['bfrac']) + else: + param['hlr_disk'] = param['size'] + param['hlr_bulge'] = param['size'] * param['bfrac'] + + # SED coefficients + param['coeff'] = gals['coeff'][igals] + + # TEST no redening and no extinction + param['av'] = 0.0 + param['redden'] = 0 + + pcs = h5.File("/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/sedlibs/"+"pcs.h5", "r") + lamb = h5.File("/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/sedlibs/"+"lamb.h5", "r") + lamb_gal = lamb['lamb'][()] + pcs = pcs['pcs'][()] + + cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) + factor = 10**(-.4 * cosmo.distmod(param['z']).value) + flux = np.matmul(pcs, param['coeff']) * factor + # if np.any(flux < 0): + # raise ValueError("Glaxy %s: negative SED fluxes"%obj.id) + flux[flux < 0] = 0. + sedcat = np.vstack((lamb_gal, flux)).T + sed_data = getObservedSED( + sedCat=sedcat, + redshift=param['z'], + av=param["av"], + redden=param["redden"] + ) + wave, flux = sed_data[0], sed_data[1] + + speci = interpolate.interp1d(wave, flux) + lamb = np.arange(2000, 11001+0.5, 0.5) + y = speci(lamb) + # erg/s/cm2/A --> photon/s/m2/A + all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13 + sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX')) + + param["sed"] = sed + del wave + del flux + return param + + +def defineCCD(iccd, config_file): + with open(config_file, "r") as stream: + try: + config = yaml.safe_load(stream) + #for key, value in config.items(): + # print (key + " : " + str(value)) + except yaml.YAMLError as exc: + print(exc) + chip = Chip(chipID=iccd, config=config) + chip.img = galsim.ImageF(chip.npix_x, chip.npix_y) + focal_plane = FocalPlane(chip_list=[iccd]) + chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale) + return chip + +def defineFilt(chip): + filter_param = FilterParam() + filter_id, filter_type = chip.getChipFilter() + filt = Filter( + filter_id=filter_id, + filter_type=filter_type, + filter_param=filter_param, + ccd_bandpass=chip.effCurve) + return filt + + +class imagingModule_coverage(unittest.TestCase): + def __init__(self, methodName='runTest'): + super(imagingModule_coverage, self).__init__(methodName) + self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1') + self.iccd = 8 + + def test_imaging(self): + config_file = os.path.join(self.dataPath, 'config_test.yaml') + chip = defineCCD(self.iccd, config_file) + bandpass = defineFilt(chip) + filt = defineFilt(chip) + print(chip.chipID) + print(chip.cen_pix_x, chip.cen_pix_y) + + + obj = _load_gals("UNIT_TEST_DATA/galaxies_C6_bundle000287.h5") + + sed_data = obj['sed'] + norm_filt = None + obj_sed, obj_mag, obj_flux = convert_sed( + mag=obj["mag_use_normal"], + sed=sed_data, + target_filt=filt, + norm_filt=norm_filt, + ) + + pupil_area= np.pi * (0.5 * 2.)**2 + exptime = 150. + nphotons_tot = obj_flux*pupil_area * exptime #getElectronFluxFilt(filt, tel, exptime) + full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full) + print(full, nphotons_tot, obj_mag) + for i in range(4): + sub = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_sub_list[i]) + + ratio = sub / full + nphotons = ratio * nphotons_tot + disk = galsim.Sersic(n=obj['disk_sersic_idx'], half_light_radius=obj['hlr_disk'], flux=1.0) + disk_shape = galsim.Shear(g1=obj['e1_disk'], g2=obj['e2_disk']) + disk = disk.shear(disk_shape) + gal_temp = disk + gal_temp = gal_temp.withFlux(nphotons) + + psf = galsim.Gaussian(sigma=0.1, flux=1) + gal_temp = galsim.Convolve(psf, gal_temp) + + if i == 0: + gal = gal_temp + else: + gal = gal + gal_temp + print(gal) + + + self.assertTrue( gal != None ) + + +if __name__ == '__main__': + unittest.main() + -- GitLab