import unittest from scipy import interpolate import astropy.constants as cons from astropy.table import Table import h5py as h5 import sys import os import math from itertools import islice import numpy as np import galsim import yaml import copy from astropy.cosmology import FlatLambdaCDM from astropy import constants from astropy import units as U from observation_sim.mock_objects._util import getObservedSED from observation_sim.mock_objects import CatalogBase, Galaxy from observation_sim.instruments import Chip, Filter, FilterParam, FocalPlane from observation_sim.psf.PSFInterp import PSFInterp from observation_sim.mock_objects._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG class Catalog(CatalogBase): def __init__(self): super().__init__() self.rotation = 0. def load_norm_filt(self, obj): return None def load_sed(self, obj, **kward): pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r") lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r") lamb_gal = lamb['lamb'][()] pcs = pcs['pcs'][()] cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) factor = 10**(-.4 * cosmo.distmod(obj.param['z']).value) flux = np.matmul(pcs, obj.param['coeff']) * factor # if np.any(flux < 0): # raise ValueError("Glaxy %s: negative SED fluxes"%obj.id) flux[flux < 0] = 0. sedcat = np.vstack((lamb_gal, flux)).T sed_data = getObservedSED( sedCat=sedcat, redshift=obj.param['z'], av=obj.param["av"], redden=obj.param["redden"] ) wave, flux = sed_data[0], sed_data[1] speci = interpolate.interp1d(wave, flux) lamb = np.arange(2000, 11001+0.5, 0.5) y = speci(lamb) # erg/s/cm2/A --> photon/s/m2/A all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13 sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX')) # obj.param["sed"] = sed del wave del flux return sed def _load(self, file_path): gals_cat = h5.File(file_path, 'r')['galaxies'] for ikeys in gals_cat.keys(): gals = gals_cat[ikeys] param = self.initialize_param() igals = 9 param['z'] = gals['redshift'][igals] param['mag_use_normal'] = gals['mag_csst_g'][igals] print(param['mag_use_normal']) param['e1'] = gals['ellipticity_true'][igals][0] param['e2'] = gals['ellipticity_true'][igals][1] # For shape calculation param['e1'], param['e2'], param['ell_total'] = self.rotate_ellipticity( e1=gals['ellipticity_true'][igals][0], e2=gals['ellipticity_true'][igals][1], rotation=self.rotation, unit='radians') param['e1_disk'] = param['e1'] param['e2_disk'] = param['e2'] param['e1_bulge'] = param['e1'] param['e2_bulge'] = param['e2'] # Masses param['bulgemass'] = gals['bulgemass'][igals] param['diskmass'] = gals['diskmass'][igals] param['size'] = gals['size'][igals] # Sersic index param['disk_sersic_idx'] = 1. param['bulge_sersic_idx'] = 4. # Sizes param['bfrac'] = param['bulgemass'] / \ (param['bulgemass'] + param['diskmass']) if param['bfrac'] >= 0.6: param['hlr_bulge'] = param['size'] param['hlr_disk'] = param['size'] * (1. - param['bfrac']) else: param['hlr_disk'] = param['size'] param['hlr_bulge'] = param['size'] * param['bfrac'] # SED coefficients param['coeff'] = gals['coeff'][igals] # TEST no redening and no extinction param['av'] = 0.0 param['redden'] = 0 obj = Galaxy(param) return obj def defineCCD(iccd, config_file): with open(config_file, "r") as stream: try: config = yaml.safe_load(stream) # for key, value in config.items(): # print (key + " : " + str(value)) except yaml.YAMLError as exc: print(exc) chip = Chip(chipID=iccd, config=config) chip.img = galsim.ImageF(chip.npix_x, chip.npix_y) focal_plane = FocalPlane(chip_list=[iccd]) chip.img.wcs = focal_plane.getTanWCS( 192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale) return chip def defineFilt(chip): filter_param = FilterParam() filter_id, filter_type = chip.getChipFilter() filt = Filter( filter_id=filter_id, filter_type=filter_type, filter_param=filter_param, ccd_bandpass=chip.effCurve) return filt class imagingModule_coverage(unittest.TestCase): def __init__(self, methodName='runTest'): super(imagingModule_coverage, self).__init__(methodName) self.dataPath = os.path.join( os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc') self.iccd = 8 def test_imaging(self): config_file = os.path.join(self.dataPath, 'config_test.yaml') chip = defineCCD(self.iccd, config_file) bandpass = defineFilt(chip) filt = defineFilt(chip) print(chip.chipID) print(chip.cen_pix_x, chip.cen_pix_y) catalog = Catalog() obj = catalog._load(os.path.join( self.dataPath, 'galaxies_C6_bundle000287.h5')) sed_data = catalog.load_sed(obj) norm_filt = catalog.load_norm_filt(obj) obj_sed, obj_mag, obj_flux = catalog.convert_sed( mag=obj.param["mag_use_normal"], sed=sed_data, target_filt=filt, norm_filt=norm_filt, ) pupil_area = np.pi * (0.5 * 2.)**2 exptime = 150. # getElectronFluxFilt(filt, tel, exptime) nphotons_tot = obj_flux*pupil_area * exptime full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full) print(full, nphotons_tot, obj_mag) for i in range(4): sub = integrate_sed_bandpass( sed=obj_sed, bandpass=filt.bandpass_sub_list[i]) ratio = sub / full nphotons = ratio * nphotons_tot disk = galsim.Sersic( n=obj.param['disk_sersic_idx'], half_light_radius=obj.param['hlr_disk'], flux=1.0) disk_shape = galsim.Shear( g1=obj.param['e1_disk'], g2=obj.param['e2_disk']) disk = disk.shear(disk_shape) gal_temp = disk gal_temp = gal_temp.withFlux(nphotons) psf = galsim.Gaussian(sigma=0.1, flux=1) gal_temp = galsim.Convolve(psf, gal_temp) if i == 0: gal = gal_temp else: gal = gal + gal_temp print(gal) self.assertTrue(gal != None) if __name__ == '__main__': unittest.main()