Commit d9387ab4 authored by Wei Chengliang's avatar Wei Chengliang
Browse files

append unittest

parent a8a8036c
import unittest
from scipy import interpolate
import astropy.constants as cons
from astropy.table import Table
import h5py as h5
import sys,os,math
from itertools import islice
import numpy as np
import galsim
import yaml
import copy
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
from ObservationSim.MockObject._util import getObservedSED
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.PSF.PSFInterp import PSFInterp
from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG
def convert_sed(mag, sed, target_filt, norm_filt=None):
bandpass = target_filt.bandpass_full
if norm_filt is not None:
norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001
else:
norm_filt = Table(
np.array(np.array([bandpass.wave_list*10.0, bandpass.func(bandpass.wave_list)])).T, names=(['WAVELENGTH', 'SENSITIVITY'])
)
norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=mag,
spectrum=sed,
norm_thr=norm_filt,
sWave=np.floor(norm_filt[norm_thr_rang_ids][0][0]),
eWave=np.ceil(norm_filt[norm_thr_rang_ids][-1][0]))
sed_photon = copy.copy(sed)
sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T
sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest')
# Get magnitude
sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False)
interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=bandpass)
mag_csst = getABMAG(
interFlux=interFlux,
bandpass=bandpass
)
if target_filt.survey_type == "photometric":
return sed_photon, mag_csst, interFlux
elif target_filt.survey_type == "spectroscopic":
del sed_photon
return sed, mag_csst, interFlux
def _load_gals(file_path):
gals_cat = h5.File(file_path, 'r')['galaxies']
for ikeys in gals_cat.keys():
gals = gals_cat[ikeys]
param = {}
igals = 9
param['z'] = gals['redshift'][igals]
param['mag_use_normal'] = gals['mag_csst_g'][igals]
print(param['mag_use_normal'] )
param['e1'] = gals['ellipticity_true'][igals][0]
param['e2'] = gals['ellipticity_true'][igals][1]
param['e1_disk'] = param['e1']
param['e2_disk'] = param['e2']
param['e1_bulge'] = param['e1']
param['e2_bulge'] = param['e2']
# Masses
param['bulgemass'] = gals['bulgemass'][igals]
param['diskmass'] = gals['diskmass'][igals]
param['size'] = gals['size'][igals]
# Sersic index
param['disk_sersic_idx'] = 1.
param['bulge_sersic_idx'] = 4.
# Sizes
param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass'])
if param['bfrac'] >= 0.6:
param['hlr_bulge'] = param['size']
param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
else:
param['hlr_disk'] = param['size']
param['hlr_bulge'] = param['size'] * param['bfrac']
# SED coefficients
param['coeff'] = gals['coeff'][igals]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
pcs = h5.File("/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/sedlibs/"+"pcs.h5", "r")
lamb = h5.File("/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/sedlibs/"+"lamb.h5", "r")
lamb_gal = lamb['lamb'][()]
pcs = pcs['pcs'][()]
cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
factor = 10**(-.4 * cosmo.distmod(param['z']).value)
flux = np.matmul(pcs, param['coeff']) * factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux[flux < 0] = 0.
sedcat = np.vstack((lamb_gal, flux)).T
sed_data = getObservedSED(
sedCat=sedcat,
redshift=param['z'],
av=param["av"],
redden=param["redden"]
)
wave, flux = sed_data[0], sed_data[1]
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
param["sed"] = sed
del wave
del flux
return param
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
try:
config = yaml.safe_load(stream)
#for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
print(exc)
chip = Chip(chipID=iccd, config=config)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
focal_plane = FocalPlane(chip_list=[iccd])
chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
return chip
def defineFilt(chip):
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
filt = Filter(
filter_id=filter_id,
filter_type=filter_type,
filter_param=filter_param,
ccd_bandpass=chip.effCurve)
return filt
class imagingModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(imagingModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
self.iccd = 8
def test_imaging(self):
config_file = os.path.join(self.dataPath, 'config_test.yaml')
chip = defineCCD(self.iccd, config_file)
bandpass = defineFilt(chip)
filt = defineFilt(chip)
print(chip.chipID)
print(chip.cen_pix_x, chip.cen_pix_y)
obj = _load_gals("UNIT_TEST_DATA/galaxies_C6_bundle000287.h5")
sed_data = obj['sed']
norm_filt = None
obj_sed, obj_mag, obj_flux = convert_sed(
mag=obj["mag_use_normal"],
sed=sed_data,
target_filt=filt,
norm_filt=norm_filt,
)
pupil_area= np.pi * (0.5 * 2.)**2
exptime = 150.
nphotons_tot = obj_flux*pupil_area * exptime #getElectronFluxFilt(filt, tel, exptime)
full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full)
print(full, nphotons_tot, obj_mag)
for i in range(4):
sub = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_sub_list[i])
ratio = sub / full
nphotons = ratio * nphotons_tot
disk = galsim.Sersic(n=obj['disk_sersic_idx'], half_light_radius=obj['hlr_disk'], flux=1.0)
disk_shape = galsim.Shear(g1=obj['e1_disk'], g2=obj['e2_disk'])
disk = disk.shear(disk_shape)
gal_temp = disk
gal_temp = gal_temp.withFlux(nphotons)
psf = galsim.Gaussian(sigma=0.1, flux=1)
gal_temp = galsim.Convolve(psf, gal_temp)
if i == 0:
gal = gal_temp
else:
gal = gal + gal_temp
print(gal)
self.assertTrue( gal != None )
if __name__ == '__main__':
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment