import unittest from scipy import interpolate import astropy.constants as cons from astropy.table import Table import h5py as h5 import sys,os,math from itertools import islice import numpy as np import galsim import yaml import copy from astropy.cosmology import FlatLambdaCDM from astropy import constants from astropy import units as U from ObservationSim.MockObject._util import getObservedSED from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane from ObservationSim.PSF.PSFInterp import PSFInterp from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG def convert_sed(mag, sed, target_filt, norm_filt=None): bandpass = target_filt.bandpass_full if norm_filt is not None: norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001 else: norm_filt = Table( np.array(np.array([bandpass.wave_list*10.0, bandpass.func(bandpass.wave_list)])).T, names=(['WAVELENGTH', 'SENSITIVITY']) ) norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001 sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=mag, spectrum=sed, norm_thr=norm_filt, sWave=np.floor(norm_filt[norm_thr_rang_ids][0][0]), eWave=np.ceil(norm_filt[norm_thr_rang_ids][-1][0])) sed_photon = copy.copy(sed) sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest') # Get magnitude sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False) interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=bandpass) mag_csst = getABMAG( interFlux=interFlux, bandpass=bandpass ) if target_filt.survey_type == "photometric": return sed_photon, mag_csst, interFlux elif target_filt.survey_type == "spectroscopic": del sed_photon return sed, mag_csst, interFlux def _load_gals(file_path): gals_cat = h5.File(file_path, 'r')['galaxies'] for ikeys in gals_cat.keys(): gals = gals_cat[ikeys] param = {} igals = 9 param['z'] = gals['redshift'][igals] param['mag_use_normal'] = gals['mag_csst_g'][igals] print(param['mag_use_normal'] ) param['e1'] = gals['ellipticity_true'][igals][0] param['e2'] = gals['ellipticity_true'][igals][1] param['e1_disk'] = param['e1'] param['e2_disk'] = param['e2'] param['e1_bulge'] = param['e1'] param['e2_bulge'] = param['e2'] # Masses param['bulgemass'] = gals['bulgemass'][igals] param['diskmass'] = gals['diskmass'][igals] param['size'] = gals['size'][igals] # Sersic index param['disk_sersic_idx'] = 1. param['bulge_sersic_idx'] = 4. # Sizes param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass']) if param['bfrac'] >= 0.6: param['hlr_bulge'] = param['size'] param['hlr_disk'] = param['size'] * (1. - param['bfrac']) else: param['hlr_disk'] = param['size'] param['hlr_bulge'] = param['size'] * param['bfrac'] # SED coefficients param['coeff'] = gals['coeff'][igals] # TEST no redening and no extinction param['av'] = 0.0 param['redden'] = 0 pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r") lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r") lamb_gal = lamb['lamb'][()] pcs = pcs['pcs'][()] cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) factor = 10**(-.4 * cosmo.distmod(param['z']).value) flux = np.matmul(pcs, param['coeff']) * factor # if np.any(flux < 0): # raise ValueError("Glaxy %s: negative SED fluxes"%obj.id) flux[flux < 0] = 0. sedcat = np.vstack((lamb_gal, flux)).T sed_data = getObservedSED( sedCat=sedcat, redshift=param['z'], av=param["av"], redden=param["redden"] ) wave, flux = sed_data[0], sed_data[1] speci = interpolate.interp1d(wave, flux) lamb = np.arange(2000, 11001+0.5, 0.5) y = speci(lamb) # erg/s/cm2/A --> photon/s/m2/A all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13 sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX')) param["sed"] = sed del wave del flux return param def defineCCD(iccd, config_file): with open(config_file, "r") as stream: try: config = yaml.safe_load(stream) #for key, value in config.items(): # print (key + " : " + str(value)) except yaml.YAMLError as exc: print(exc) chip = Chip(chipID=iccd, config=config) chip.img = galsim.ImageF(chip.npix_x, chip.npix_y) focal_plane = FocalPlane(chip_list=[iccd]) chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale) return chip def defineFilt(chip): filter_param = FilterParam() filter_id, filter_type = chip.getChipFilter() filt = Filter( filter_id=filter_id, filter_type=filter_type, filter_param=filter_param, ccd_bandpass=chip.effCurve) return filt class imagingModule_coverage(unittest.TestCase): def __init__(self, methodName='runTest'): super(imagingModule_coverage, self).__init__(methodName) self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc') self.iccd = 8 def test_imaging(self): config_file = os.path.join(self.dataPath, 'config_test.yaml') chip = defineCCD(self.iccd, config_file) bandpass = defineFilt(chip) filt = defineFilt(chip) print(chip.chipID) print(chip.cen_pix_x, chip.cen_pix_y) obj = _load_gals(os.path.join(self.dataPath, 'galaxies_C6_bundle000287.h5')) #("UNIT_TEST_DATA/galaxies_C6_bundle000287.h5") sed_data = obj['sed'] norm_filt = None obj_sed, obj_mag, obj_flux = convert_sed( mag=obj["mag_use_normal"], sed=sed_data, target_filt=filt, norm_filt=norm_filt, ) pupil_area= np.pi * (0.5 * 2.)**2 exptime = 150. nphotons_tot = obj_flux*pupil_area * exptime #getElectronFluxFilt(filt, tel, exptime) full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full) print(full, nphotons_tot, obj_mag) for i in range(4): sub = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_sub_list[i]) ratio = sub / full nphotons = ratio * nphotons_tot disk = galsim.Sersic(n=obj['disk_sersic_idx'], half_light_radius=obj['hlr_disk'], flux=1.0) disk_shape = galsim.Shear(g1=obj['e1_disk'], g2=obj['e2_disk']) disk = disk.shear(disk_shape) gal_temp = disk gal_temp = gal_temp.withFlux(nphotons) psf = galsim.Gaussian(sigma=0.1, flux=1) gal_temp = galsim.Convolve(psf, gal_temp) if i == 0: gal = gal_temp else: gal = gal + gal_temp print(gal) self.assertTrue( gal != None ) if __name__ == '__main__': unittest.main()