Newer
Older
import unittest
from scipy import interpolate
import astropy.constants as cons
from astropy.table import Table
import h5py as h5
import sys
import os
import math
from itertools import islice
import numpy as np
import galsim
import yaml
import copy
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
from ObservationSim.MockObject._util import getObservedSED
from ObservationSim.MockObject import CatalogBase, Galaxy
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.PSF.PSFInterp import PSFInterp
from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG
class Catalog(CatalogBase):
def __init__(self):
super().__init__()
self.rotation = 0.
def load_norm_filt(self, obj):
return None
def load_sed(self, obj, **kward):
pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'),
'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r")
lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'),
'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r")
lamb_gal = lamb['lamb'][()]
pcs = pcs['pcs'][()]
cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
factor = 10**(-.4 * cosmo.distmod(obj.param['z']).value)
flux = np.matmul(pcs, obj.param['coeff']) * factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux[flux < 0] = 0.
sedcat = np.vstack((lamb_gal, flux)).T
sed_data = getObservedSED(
sedCat=sedcat,
redshift=obj.param['z'],
av=obj.param["av"],
redden=obj.param["redden"]
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
wave, flux = sed_data[0], sed_data[1]
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
# obj.param["sed"] = sed
del wave
del flux
return sed
def _load(self, file_path):
gals_cat = h5.File(file_path, 'r')['galaxies']
for ikeys in gals_cat.keys():
gals = gals_cat[ikeys]
param = self.initialize_param()
igals = 9
param['z'] = gals['redshift'][igals]
param['mag_use_normal'] = gals['mag_csst_g'][igals]
print(param['mag_use_normal'])
param['e1'] = gals['ellipticity_true'][igals][0]
param['e2'] = gals['ellipticity_true'][igals][1]
# For shape calculation
param['e1'], param['e2'], param['ell_total'] = self.rotate_ellipticity(
e1=gals['ellipticity_true'][igals][0],
e2=gals['ellipticity_true'][igals][1],
rotation=self.rotation,
unit='radians')
param['e1_disk'] = param['e1']
param['e2_disk'] = param['e2']
param['e1_bulge'] = param['e1']
param['e2_bulge'] = param['e2']
# Masses
param['bulgemass'] = gals['bulgemass'][igals]
param['diskmass'] = gals['diskmass'][igals]
param['size'] = gals['size'][igals]
# Sersic index
param['disk_sersic_idx'] = 1.
param['bulge_sersic_idx'] = 4.
# Sizes
param['bfrac'] = param['bulgemass'] / \
(param['bulgemass'] + param['diskmass'])
if param['bfrac'] >= 0.6:
param['hlr_bulge'] = param['size']
param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
else:
param['hlr_disk'] = param['size']
param['hlr_bulge'] = param['size'] * param['bfrac']
# SED coefficients
param['coeff'] = gals['coeff'][igals]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
obj = Galaxy(param)
return obj
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
try:
config = yaml.safe_load(stream)
# for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
print(exc)
chip = Chip(chipID=iccd, config=config)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
focal_plane = FocalPlane(chip_list=[iccd])
chip.img.wcs = focal_plane.getTanWCS(
192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
def defineFilt(chip):
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
filt = Filter(
filter_id=filter_id,
filter_type=filter_type,
filter_param=filter_param,
ccd_bandpass=chip.effCurve)
return filt
class imagingModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(imagingModule_coverage, self).__init__(methodName)
self.dataPath = os.path.join(
os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc')
self.iccd = 8
def test_imaging(self):
config_file = os.path.join(self.dataPath, 'config_test.yaml')
chip = defineCCD(self.iccd, config_file)
bandpass = defineFilt(chip)
filt = defineFilt(chip)
print(chip.chipID)
print(chip.cen_pix_x, chip.cen_pix_y)
catalog = Catalog()
obj = catalog._load(os.path.join(
self.dataPath, 'galaxies_C6_bundle000287.h5'))
sed_data = catalog.load_sed(obj)
norm_filt = catalog.load_norm_filt(obj)
obj_sed, obj_mag, obj_flux = catalog.convert_sed(
mag=obj.param["mag_use_normal"],
sed=sed_data,
target_filt=filt,
norm_filt=norm_filt,
)
pupil_area = np.pi * (0.5 * 2.)**2
# getElectronFluxFilt(filt, tel, exptime)
nphotons_tot = obj_flux*pupil_area * exptime
full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full)
print(full, nphotons_tot, obj_mag)
for i in range(4):
sub = integrate_sed_bandpass(
sed=obj_sed, bandpass=filt.bandpass_sub_list[i])
disk = galsim.Sersic(
n=obj.param['disk_sersic_idx'], half_light_radius=obj.param['hlr_disk'], flux=1.0)
disk_shape = galsim.Shear(
g1=obj.param['e1_disk'], g2=obj.param['e2_disk'])
disk = disk.shear(disk_shape)
gal_temp = disk
gal_temp = gal_temp.withFlux(nphotons)
psf = galsim.Gaussian(sigma=0.1, flux=1)
gal_temp = galsim.Convolve(psf, gal_temp)
if i == 0:
gal = gal_temp
else:
gal = gal + gal_temp
print(gal)
self.assertTrue(gal != None)