Commit 29ac5666 authored by Fang Yuedong's avatar Fang Yuedong
Browse files

Baseline

parents
import numpy as np
import os
vc_A = 2.99792458e+18 # speed of light: A/s
vc_m = 2.99792458e+8 # speed of light: m/s
h_Plank = 6.626196e-27 # Plank constant: erg s
def photonEnergy(lambd):
""" The energy of photon at a given wavelength
Parameter:
lambd: the wavelength in unit of Angstrom
Return:
eph: energy of photon in unit of erg
"""
nu = vc_A / lambd
eph = h_Plank * nu
return eph
\ No newline at end of file
import os
import numpy as np
import random
import galsim
import h5py as h5
import healpy as hp
from .Star import Star
from .Galaxy import Galaxy
from .Quasar import Quasar
from ._util import seds, sed_assign, extAv
from astropy.table import Table
from astropy.coordinates import spherical_to_cartesian
nside = 128
class Catalog(object):
def __init__(self, config, chip, cat_dir, sed_dir, pRa, pDec, rotation=None, template_dir=None):
self.cat_dir = cat_dir
self.sed_dir = sed_dir
self.chip = chip
self.template_dir = template_dir
self.rng_sedGal = random.Random()
self.rng_sedStar = random.Random()
self.rng_sedGal.seed(config["seed_gal"])
self.rng_sedStar.seed(config["seed_star"])
self.seed_Av = config["seed_Av"]
self.ud = galsim.UniformDeviate(config["seed_mock"])
self.pRa = float('{:8.4f}'.format(pRa))
self.pDec = float('{:8.4f}'.format(pDec))
# star_file = 'stars_ccd' + chip.getChipLabel(chip.chipID) + '_p_RA'+str(self.pRa) + '_DE' + str(self.pDec) + '.hdf5'
# galaxy_file = 'galaxies_ccd' + chip.getChipLabel(chip.chipID) + '_p_RA'+str(self.pRa) + '_DE' + str(self.pDec) + '.hdf5'
# star_file = 'MMW_Cluster_D20_10_healpix.hdf5'
star_file = 'MMW_Cluster_D20_healpix_21.hdf5'
galaxy_file = 'galaxyCats_r_10.0_healpix.hdf5'
star_SED_file = 'SpecLib.hdf5'
self.star_path = os.path.join(cat_dir, star_file)
self.galaxy_path = os.path.join(cat_dir, galaxy_file)
self.star_SED_path = os.path.join(cat_dir, star_SED_file)
self.rotation = rotation
self._load_SED_lib_gals()
self._load_SED_lib_star()
# self._load_SED_info()
self._get_healpix_list()
self._load()
def _get_healpix_list(self):
self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2)
# self.sky_coverage_enlarged = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.5)
ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax
ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min]))
dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min]))
# phi, theta = ra, np.pi/2. - dec
# vertices_pix = np.unique(hp.ang2pix(nside, theta, phi))
# print(vertices_pix)
vertices = spherical_to_cartesian(1., dec, ra)
self.pix_list = hp.query_polygon(nside, np.array(vertices).T, inclusive=True)
print("HEALPix List: ", self.pix_list)
def _load_SED_lib_star(self):
self.tempSED_star = h5.File(self.star_SED_path,'r')
def _load_SED_lib_gals(self):
tempdir_gal = os.path.join(self.template_dir, "Galaxy/")
# tempdir_star = os.path.join(self.template_dir, "PicklesStars/")
self.tempSed_gal, self.tempRed_gal = seds("galaxy.list", seddir=tempdir_gal)
# self.tempSed_star, self.tempRed_star = seds("star.list", seddir=tempdir_star)
def _load_SED_info(self):
sed_info_file = os.path.join(self.sed_dir, "sed.info")
sed_info = Table.read(sed_info_file, format="ascii")
self.cosids = sed_info["IDCosmoDC2"]
self.objtypes = sed_info["objType"]
def _load_gals(self, gals):
ngals = len(gals['galaxyID'])
# print(ngals)
for igals in range(ngals):
param = {}
param['ra'] = gals['ra_true'][igals]
param['dec'] = gals['dec_true'][igals]
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
param['mag_use_normal'] = gals['mag_true_g_lsst'][igals]
if param['mag_use_normal'] >= 26.5:
continue
self.ids += 1
param['id'] = self.ids
param['z'] = gals['redshift_true'][igals]
param['model_tag'] = 'None'
param['gamma1'] = 0
param['gamma2'] = 0
param['kappa'] = 0
param['delta_ra'] = 0
param['delta_dec'] = 0
sersicB = gals['sersic_bulge'][igals]
hlrMajB = gals['size_bulge_true'][igals]
hlrMinB = gals['size_minor_bulge_true'][igals]
sersicD = gals['sersic_disk'][igals]
hlrMajD = gals['size_disk_true'][igals]
hlrMinD = gals['size_minor_disk_true'][igals]
aGal = gals['size_true'][igals]
bGal = gals['size_minor_true'][igals]
param['bfrac'] = gals['bulge_to_total_ratio_i'][igals]
param['theta'] = gals['position_angle_true'][igals]
param['hlr_bulge'] = np.sqrt(hlrMajB * hlrMinB)
param['hlr_disk'] = np.sqrt(hlrMajD * hlrMinD)
param['ell_bulge'] = (hlrMajB - hlrMinB)/(hlrMajB + hlrMinB)
param['ell_disk'] = (hlrMajD - hlrMinD)/(hlrMajD + hlrMinD)
param['ell_tot'] = (aGal - bGal) / (aGal + bGal)
# Assign each galaxy a template SED
param['sed_type'] = sed_assign(phz=param['z'], btt=param['bfrac'], rng=self.rng_sedGal)
param['redden'] = self.tempRed_gal[param['sed_type']]
param['av'] = self.avGal[int(self.ud()*self.nav)]
if param['sed_type'] <= 5:
param['av'] = 0.0
param['redden'] = 0
param['star'] = 0 # Galaxy
if param['sed_type'] >= 29:
param['av'] = 0.6 * param['av'] / 3.0 # for quasar, av=[0, 0.2], 3.0=av.max-av.im
param['star'] = 2 # Quasar
if param['star'] == 0:
obj = Galaxy(param, self.rotation)
self.objs.append(obj)
if param['star'] == 2:
obj = Quasar(param)
self.objs.append(obj)
def _load_stars(self, stars):
nstars = len(stars['sourceID'])
# print(nstars)
for istars in range(nstars):
param = {}
param['ra'] = stars['RA'][istars]
param['dec'] = stars['Dec'][istars]
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
param['mag_use_normal'] = stars['app_sdss_g'][istars]
if param['mag_use_normal'] >= 26.5:
continue
self.ids += 1
param['id'] = self.ids
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['teff'] = stars['teff'][istars]
param['logg'] = stars['grav'][istars]
param['feh'] = stars['feh'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
obj = Star(param)
self.objs.append(obj)
def _load(self):
self.nav = 15005
self.avGal = extAv(self.nav, seed=self.seed_Av)
# gals = Table.read(self.galaxy_path)
# stars = Table.read(self.star_path)
# self.ngals = gals['galaxyID'].size
# self.nstars = stars['sourceID'].size
gals_cat = h5.File(self.galaxy_path, 'r')['galaxies']
star_cat = h5.File(self.star_path, 'r')['catalog']
self.objs = []
self.ids = 0
for pix in self.pix_list:
gals = gals_cat[str(pix)]
stars = star_cat[str(pix)]
self._load_gals(gals)
self._load_stars(stars)
print("number of objects in catalog: ", len(self.objs))
del self.avGal
# for igals in range(self.ngals):
# param = {}
# param['id'] = igals + 1
# param['ra'] = gals['ra_true'][igals]
# param['dec'] = gals['dec_true'][igals]
# param['z'] = gals['redshift_true'][igals]
# param['model_tag'] = 'None'
# param['gamma1'] = 0
# param['gamma2'] = 0
# param['kappa'] = 0
# param['delta_ra'] = 0
# param['delta_dec'] = 0
# sersicB = gals['sersic_bulge'][igals]
# hlrMajB = gals['size_bulge_true'][igals]
# hlrMinB = gals['size_minor_bulge_true'][igals]
# sersicD = gals['sersic_disk'][igals]
# hlrMajD = gals['size_disk_true'][igals]
# hlrMinD = gals['size_minor_disk_true'][igals]
# aGal = gals['size_true'][igals]
# bGal = gals['size_minor_true'][igals]
# param['bfrac'] = gals['bulge_to_total_ratio_i'][igals]
# param['theta'] = gals['position_angle_true'][igals]
# param['hlr_bulge'] = np.sqrt(hlrMajB * hlrMinB)
# param['hlr_disk'] = np.sqrt(hlrMajD * hlrMinD)
# param['ell_bulge'] = (hlrMajB - hlrMinB)/(hlrMajB + hlrMinB)
# param['ell_disk'] = (hlrMajD - hlrMinD)/(hlrMajD + hlrMinD)
# param['ell_tot'] = (aGal - bGal) / (aGal + bGal)
# # Assign each galaxy a template SED
# param['sed_type'] = sed_assign(phz=param['z'], btt=param['bfrac'], rng=self.rng_sedGal)
# param['redden'] = self.tempRed_gal[param['sed_type']]
# param['av'] = avGal[int(self.ud()*nav)]
# if param['sed_type'] <= 5:
# param['av'] = 0.0
# param['redden'] = 0
# param['star'] = 0 # Galaxy
# if param['sed_type'] >= 29:
# param['av'] = 0.6 * param['av'] / 3.0 # for quasar, av=[0, 0.2], 3.0=av.max-av.im
# param['star'] = 2 # Quasar
# param['mag_use_normal'] = gals['mag_true_g_lsst'][igals]
# if param['star'] == 0:
# obj = Galaxy(param, self.rotation)
# self.objs.append(obj)
# if param['star'] == 2:
# obj = Quasar(param)
# self.objs.append(obj)
# for istars in range(self.nstars):
# param = {}
# param['id'] = self.ngals + istars + 1
# param['ra'] = stars['RA'][istars]
# param['dec'] = stars['Dec'][istars]
# param['sed_type'] = stars['sourceID'][istars]
# param['model_tag'] = stars['model_tag'][istars]
# param['z'] = 0.0
# param['star'] = 1 # Star
# param['mag_use_normal'] = stars['app_sdss_g'][istars]
# obj = Star(param)
# self.objs.append(obj)
# del gals
# del stars
# del avGal
\ No newline at end of file
import MockObject
class CosmicRay(MockObject):
pass
\ No newline at end of file
import numpy as np
import galsim
import os, sys
import astropy.constants as cons
from astropy.table import Table
from ._util import eObs, integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getObservedSED, getABMAG
from .SpecDisperser import SpecDisperser
from .MockObject import MockObject
from scipy import interpolate
class Galaxy(MockObject):
def __init__(self, param, rotation=None):
super().__init__(param)
self.thetaR = self.param["theta"]
self.bfrac = self.param["bfrac"]
self.hlr_disk = self.param["hlr_disk"]
self.hlr_bulge = self.param["hlr_bulge"]
# Extract ellipticity components
self.e_disk = galsim.Shear(g=self.param["ell_disk"], beta=self.thetaR*galsim.degrees)
self.e_bulge = galsim.Shear(g=self.param["ell_bulge"], beta=self.thetaR*galsim.degrees)
self.e_total = galsim.Shear(g=self.param["ell_tot"], beta=self.thetaR*galsim.degrees)
self.e1_disk, self.e2_disk = self.e_disk.g1, self.e_disk.g2
self.e1_bulge, self.e2_bulge = self.e_bulge.g1, self.e_bulge.g2
self.e1_total, self.e2_total = self.e_total.g1, self.e_total.g2
if rotation is not None:
self.rotateEllipticity(rotation)
def load_SED(self, survey_type, sed_path, cosids=None, objtypes=None, sed_templates=None, normFilter=None, target_filt=None):
if survey_type == "photometric":
norm_thr_rang_ids = normFilter['SENSITIVITY'] > 0.001
if sed_templates is None:
# Read SED data directly
itype = objtypes[cosids==self.sed_type][0]
sed_file = os.path.join(sed_path, itype + "_ID%s.sed"%(self.sed_type))
if not os.path.exists(sed_file):
raise ValueError("!!! No SED found.")
sed_data = Table.read(sed_file, format="ascii")
wave, flux = sed_data["observedLambda"].data, sed_data["observedFlux"].data
else:
# Load SED from templates
sed_data = sed_templates[self.sed_type]
# redshift, intrinsic extinction
sed_data = getObservedSED(
sedCat=sed_data,
redshift=self.z,
av=self.param['av'],
redden=self.param['redden'])
wave, flux = sed_data[0], sed_data[1]
flux_photon = flux * (wave / (cons.h.value * cons.c.value)) * 1e-13
sed_photon = Table(np.array([wave, flux_photon]).T, names=('WAVELENGTH', 'FLUX'))
# Get scaling factor for SED
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=self.param['mag_use_normal'],
spectrum=sed_photon,
norm_thr=normFilter,
sWave=np.floor(normFilter[norm_thr_rang_ids][0][0]),
eWave=np.ceil(normFilter[norm_thr_rang_ids][-1][0]))
sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T
# Convert to galsim.SED object
spec = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest')
self.sed = galsim.SED(spec, wave_type='A', flux_type='1', fast=False)
# Get magnitude
interFlux = integrate_sed_bandpass(sed=self.sed, bandpass=target_filt.bandpass_full)
self.param['mag_%s'%target_filt.filter_type] = getABMAG(
interFlux=interFlux,
bandpass=target_filt.bandpass_full)
# print('mag_use_normal = ', self.param['mag_use_normal'])
# print('mag_%s = '%target_filt.filter_type, self.param['mag_%s'%target_filt.filter_type])
# print('redshift = %.3f'%(self.z))
# print('sed_type = %d, av = %.2f, redden = %d'%(self.sed_type, self.param['av'], self.param['redden']))
elif survey_type == "spectroscopic":
if sed_templates is None:
self.sedPhotons(sed_path=sed_path, cosids=cosids, objtypes=objtypes)
else:
sed_data = sed_templates[self.sed_type]
sed_data = getObservedSED(
sedCat=sed_data,
redshift=self.z,
av=self.param['av'],
redden=self.param['redden'])
speci = interpolate.interp1d(sed_data[0], sed_data[1])
lamb = np.arange(2500, 10001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photo/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
self.sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
def unload_SED(self):
"""(Test) free up SED memory
"""
del self.sed
def sedPhotons(self, sed_path, cosids, objtypes):
itype = objtypes[cosids == self.sed_type][0]
sed_file = os.path.join(sed_path, itype + "_ID%s.sed" % (self.sed_type))
if not os.path.exists(sed_file):
raise ValueError("!!! No SED found.")
sed = Table.read(sed_file, format="ascii")
spec_data = {}
f_orig = sed["observedFlux"].data
w_orig = sed["observedLambda"].data
speci = interpolate.interp1d(w_orig, f_orig)
lamb = np.arange(2500, 10001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photo/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
self.sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
def getGSObj_multiband(self, tel, psf_list, bandpass_list, filt, nphotons_tot=None, g1=0, g2=0, exptime=150.):
if len(psf_list) != len(bandpass_list):
raise ValueError("!!!The number of PSF profiles and the number of bandpasses must be equal.")
objs = []
if nphotons_tot == None:
nphotons_tot = self.getElectronFluxFilt(filt, tel, exptime)
# print("nphotons_tot = ", nphotons_tot)
try:
full = integrate_sed_bandpass(sed=self.sed, bandpass=filt.bandpass_full)
except Exception as e:
print(e)
return -1
for i in range(len(bandpass_list)):
bandpass = bandpass_list[i]
try:
sub = integrate_sed_bandpass(sed=self.sed, bandpass=bandpass)
except Exception as e:
print(e)
return -1
ratio = sub/full
if not (ratio == -1 or (ratio != ratio)):
nphotons = ratio * nphotons_tot
else:
return -1
psf = psf_list[i]
disk = galsim.Sersic(n=1.0, half_light_radius=self.hlr_disk, flux=1.0)
disk_shape = galsim.Shear(g1=self.e1_disk, g2=self.e2_disk)
disk = disk.shear(disk_shape)
bulge = galsim.Sersic(n=4.0, half_light_radius=self.hlr_bulge, flux=1.0)
bulge_shape = galsim.Shear(g1=self.e1_bulge, g2=self.e2_bulge)
bulge = bulge.shear(bulge_shape)
gal = self.bfrac * bulge + (1.0 - self.bfrac) * disk
gal = gal.withFlux(nphotons)
gal_shear = galsim.Shear(g1=g1, g2=g2)
gal = gal.shear(gal_shear)
gal = galsim.Convolve(psf, gal)
objs.append(gal)
final = galsim.Sum(objs)
return final
def drawObj_multiband(self, tel, pos_img, psf_model, bandpass_list, filt, chip, nphotons_tot=None, g1=0, g2=0, exptime=150.):
if nphotons_tot == None:
nphotons_tot = self.getElectronFluxFilt(filt, tel, exptime)
# print("nphotons_tot = ", nphotons_tot)
try:
full = integrate_sed_bandpass(sed=self.sed, bandpass=filt.bandpass_full)
except Exception as e:
print(e)
return False
nphotons_sum = 0
photons_list = []
xmax, ymax = 0, 0
# print('hlr_disk = %.4f, hlr_bulge = %.4f'%(self.hlr_disk, self.hlr_bulge))
big_galaxy = False
if self.hlr_disk > 3.0: # Very big galaxy
big_galaxy = True
# (TEST) Galsim Parameters
if self.getMagFilter(filt) <= 15 and (not big_galaxy):
folding_threshold = 5.e-4
else:
folding_threshold = 5.e-3
gsp = galsim.GSParams(folding_threshold=folding_threshold)
for i in range(len(bandpass_list)):
bandpass = bandpass_list[i]
try:
sub = integrate_sed_bandpass(sed=self.sed, bandpass=bandpass)
except Exception as e:
print(e)
# return False
continue
ratio = sub/full
if not (ratio == -1 or (ratio != ratio)):
nphotons = ratio * nphotons_tot
else:
# return False
continue
nphotons_sum += nphotons
# print("nphotons_sub-band_%d = %.2f"%(i, nphotons))
psf, pos_shear = psf_model.get_PSF(chip=chip, pos_img=pos_img, bandpass=bandpass, folding_threshold=folding_threshold)
disk = galsim.Sersic(n=1.0, half_light_radius=self.hlr_disk, flux=1.0, gsparams=gsp)
disk_shape = galsim.Shear(g1=self.e1_disk, g2=self.e2_disk)
disk = disk.shear(disk_shape)
bulge = galsim.Sersic(n=4.0, half_light_radius=self.hlr_bulge, flux=1.0, gsparams=gsp)
bulge_shape = galsim.Shear(g1=self.e1_bulge, g2=self.e2_bulge)
bulge = bulge.shear(bulge_shape)
gal = self.bfrac * bulge + (1.0 - self.bfrac) * disk
gal = gal.withFlux(nphotons)
gal_shear = galsim.Shear(g1=g1, g2=g2)
gal = gal.shear(gal_shear)
if self.hlr_disk < 10.0: # Not apply PSF for very big galaxy
gal = galsim.Convolve(psf, gal)
# Use (explicit) stamps to draw
stamp = gal.drawImage(wcs=self.localWCS, method='phot', offset=self.offset, save_photons=True)
xmax = max(xmax, stamp.xmax)
ymax = max(ymax, stamp.ymax)
photons = stamp.photons
photons.x += self.x_nominal
photons.y += self.y_nominal
photons_list.append(photons)
# print('xmax = %d, ymax = %d '%(xmax, ymax))
stamp = galsim.ImageF(int(xmax*1.1), int(ymax*1.1))
stamp.wcs = self.localWCS
stamp.setCenter(self.x_nominal, self.y_nominal)
bounds = stamp.bounds & chip.img.bounds
stamp[bounds] = chip.img[bounds]
if not big_galaxy:
for i in range(len(photons_list)):
if i == 0:
chip.sensor.accumulate(photons_list[i], stamp)
else:
chip.sensor.accumulate(photons_list[i], stamp, resume=True)
else:
sensor = galsim.Sensor()
for i in range(len(photons_list)):
if i == 0:
sensor.accumulate(photons_list[i], stamp)
else:
sensor.accumulate(photons_list[i], stamp, resume=True)
# print(stamp.array.sum())
# chip.img[bounds] += stamp[bounds]
chip.img[bounds] = stamp[bounds]
# print("nphotons_sum = ", nphotons_sum)
del photons_list
del stamp
return True, pos_shear
def drawObj_slitless(self, tel, pos_img, psf_model, bandpass_list, filt, chip, nphotons_tot=None, g1=0, g2=0,
exptime=150., normFilter=None):
norm_thr_rang_ids = normFilter['SENSITIVITY'] > 0.001
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=self.param['mag_use_normal'], spectrum=self.sed,
norm_thr=normFilter,
sWave=np.floor(normFilter[norm_thr_rang_ids][0][0]),
eWave=np.ceil(normFilter[norm_thr_rang_ids][-1][0]))
if sedNormFactor == 0:
return False
normalSED = Table(np.array([self.sed['WAVELENGTH'], self.sed['FLUX'] * sedNormFactor]).T,
names=('WAVELENGTH', 'FLUX'))
big_galaxy = False
if self.hlr_disk > 3.0: # Very big galaxy
big_galaxy = True
if self.getMagFilter(filt) <= 15 and (not big_galaxy):
folding_threshold = 5.e-4
else:
folding_threshold = 5.e-3
gsp = galsim.GSParams(folding_threshold=folding_threshold)
# nphotons_sum = 0
for i in range(len(bandpass_list)):
bandpass = bandpass_list[i]
psf, pos_shear = psf_model.get_PSF(chip=chip, pos_img=pos_img, bandpass=bandpass, folding_threshold=folding_threshold)
disk = galsim.Sersic(n=1.0, half_light_radius=self.hlr_disk, flux=1.0, gsparams=gsp)
disk_shape = galsim.Shear(g1=self.e1_disk, g2=self.e2_disk)
disk = disk.shear(disk_shape)
bulge = galsim.Sersic(n=4.0, half_light_radius=self.hlr_bulge, flux=1.0, gsparams=gsp)
bulge_shape = galsim.Shear(g1=self.e1_bulge, g2=self.e2_bulge)
bulge = bulge.shear(bulge_shape)
gal = self.bfrac * bulge + (1.0 - self.bfrac) * disk
gal = gal.withFlux(tel.pupil_area * exptime)
gal_shear = galsim.Shear(g1=g1, g2=g2)
gal = gal.shear(gal_shear)
gal = galsim.Convolve(psf, gal)
starImg = gal.drawImage(wcs=self.localWCS)
origin_star = [self.y_nominal - (starImg.center.y - starImg.ymin),
self.x_nominal - (starImg.center.x - starImg.xmin)]
# print(self.y_nominal, starImg.center.y, starImg.ymin)
sdp = SpecDisperser(orig_img=starImg, xcenter=self.x_nominal-chip.bound.xmin,
ycenter=self.y_nominal-chip.bound.ymin, origin=origin_star,
tar_spec=normalSED,
band_start=bandpass.blue_limit * 10, band_end=bandpass.red_limit * 10,
conf=chip.sls_conf[1],
isAlongY=0)
spec_orders = sdp.compute_spec_orders()
for k, v in spec_orders.items():
img_s = v[0]
origin_order_x = v[1]
origin_order_y = v[2]
specImg = galsim.ImageF(img_s)
photons = galsim.PhotonArray.makeFromImage(specImg)
photons.x += origin_order_x
photons.y += origin_order_y
xlen_imf = int(specImg.xmax - specImg.xmin + 1)
ylen_imf = int(specImg.ymax - specImg.ymin + 1)
stamp = galsim.ImageF(xlen_imf, ylen_imf)
stamp.wcs = self.localWCS
stamp.setOrigin(origin_order_x, origin_order_y)
bounds = stamp.bounds & chip.img.bounds
if bounds.area() == 0:
continue
stamp[bounds] = chip.img[bounds]
if not big_galaxy:
chip.sensor.accumulate(photons, stamp)
else:
sensor = galsim.Sensor()
sensor.accumulate(photons, stamp)
# chip.sensor.accumulate(photons, stamp)
chip.img[bounds] = stamp[bounds]
del stamp
del sdp
del spec_orders
del psf
return True, pos_shear
def getGSObj(self, psf, g1=0, g2=0, flux=None, filt=None, tel=None, exptime=150.):
if flux == None:
flux = self.getElectronFluxFilt(filt, tel, exptime)
disk = galsim.Sersic(n=1.0, half_light_radius=self.hlr_disk, flux=1.0)
disk_shape = galsim.Shear(g1=self.e1_disk, g2=self.e2_disk)
disk = disk.shear(disk_shape)
bulge = galsim.Sersic(n=4.0, half_light_radius=self.hlr_bulge, flux=1.0)
bulge_shape = galsim.Shear(g1=self.e1_bulge, g2=self.e2_bulge)
bulge = bulge.shear(bulge_shape)
gal = self.bfrac * bulge + (1.0 - self.bfrac) * disk
gal = gal.withFlux(flux)
gal_shear = galsim.Shear(g1=g1, g2=g2)
gal = gal.shear(gal_shear)
final = galsim.Convolve(psf, gal)
return final
def rotateEllipticity(self, rotation):
if rotation == 1:
self.e1_disk, self.e2_disk, self.e1_bulge, self.e2_bulge, self.e1_total, self.e2_total = -self.e2_disk, self.e1_disk, -self.e2_bulge, self.e1_bulge, -self.e2_total, self.e1_total
if rotation == 2:
self.e1_disk, self.e2_disk, self.e1_bulge, self.e2_bulge, self.e1_total, self.e2_total = -self.e1_disk, -self.e2_disk, -self.e1_bulge, -self.e2_bulge, -self.e1_total, -self.e2_total
if rotation == 3:
self.e1_disk, self.e2_disk, self.e1_bulge, self.e2_bulge, self.e1_total, self.e2_total = self.e2_disk, -self.e1_disk, self.e2_bulge, -self.e1_bulge, self.e2_total, -self.e1_total
def drawObject(self, img, final, noise_level=0.0, flux=None, filt=None, tel=None, exptime=150.):
""" Override the method in parent class
Need to constrain the size of image stamp for extended objects
"""
isUpdated = True
if flux == None:
flux = self.getElectronFluxFilt(filt, tel, exptime)
stamp = final.drawImage(wcs=self.localWCS, offset=self.offset)
stamp_arr = stamp.array
mask = (stamp_arr >= 0.001*noise_level) # why 0.001?
err = int(np.sqrt(mask.sum()))
if np.mod(err, 2) == 1:
err += 1
# if err == 1:
if err == 0:
subSize = 16 # why 16?
else:
subSize = max([err, 16])
fluxRatio = flux / stamp_arr[mask].sum()
final = final.withScaledFlux(fluxRatio)
imgSub = galsim.ImageF(subSize, subSize)
# Draw with FFT
# stamp = final.drawImage(image=imgSub, wcs=self.localWCS, offset=self.offset)
# Draw with Photon Shoot
stamp = final.drawImage(image=imgSub, wcs=self.localWCS, method='phot', offset=self.offset)
stamp.setCenter(self.x_nominal, self.y_nominal)
if np.sum(np.isnan(stamp.array)) >= 1:
stamp.setZero()
bounds = stamp.bounds & img.bounds
if bounds.area() == 0:
isUpdated = False
else:
img[bounds] += stamp[bounds]
return img, stamp, isUpdated
def getObservedEll(self, g1=0, g2=0):
e1_obs, e2_obs, e_obs, theta = eObs(self.e1_total, self.e2_total, g1, g2)
return self.e1_total, self.e2_total, g1, g2, e1_obs, e2_obs
\ No newline at end of file
import galsim
import numpy as np
import astropy.constants as cons
from astropy.table import Table
from ._util import magToFlux, vc_A
from ._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getObservedSED, getABMAG
from .SpecDisperser import SpecDisperser
class MockObject(object):
def __init__(self, param):
self.param = param
if self.param["star"] == 0:
self.type = "galaxy"
elif self.param["star"] == 1:
self.type = "star"
elif self.param["star"] == 2:
self.type = "quasar"
self.id = self.param["id"]
self.ra = self.param["ra"]
self.dec = self.param["dec"]
self.z = self.param["z"]
self.sed_type = self.param["sed_type"]
self.model_tag = self.param["model_tag"]
self.mag_use_normal = self.param["mag_use_normal"]
def getMagFilter(self, filt):
if filt.filter_type in ["GI", "GV", "GU"]:
return self.param["mag_use_normal"]
return self.param["mag_%s"%filt.filter_type]
# (TEST) stamp size
# return 13.0
def getNumPhotons(self, flux, tel, exptime=150.):
pupil_area = tel.pupil_area * (100.)**2 # m^2 to cm^2
return flux * pupil_area * exptime
def getElectronFluxFilt(self, filt, tel, exptime=150.):
photonEnergy = filt.getPhotonE()
flux = magToFlux(self.getMagFilter(filt))
factor = 1.0e4 * flux/photonEnergy * vc_A * (1.0/filt.blue_limit - 1.0/filt.red_limit)
return factor * filt.efficiency * tel.pupil_area * exptime
def getPosWorld(self):
ra = self.param["ra"]
dec = self.param["dec"]
return galsim.CelestialCoord(ra=ra*galsim.degrees,dec=dec*galsim.degrees)
def getPosImg_Offset_WCS(self, img, fdmodel=None, chip=None, verbose=True):
self.posImg = img.wcs.toImage(self.getPosWorld())
self.localWCS = img.wcs.local(self.posImg)
if (fdmodel is not None) and (chip is not None):
if verbose:
print("\n")
print("Before field distortion:\n")
print("x = %.2f, y = %.2f\n"%(self.posImg.x, self.posImg.y), flush=True)
self.posImg = fdmodel.get_Distorted(chip=chip, pos_img=self.posImg)
if verbose:
print("After field distortion:\n")
print("x = %.2f, y = %.2f\n"%(self.posImg.x, self.posImg.y), flush=True)
x, y = self.posImg.x + 0.5, self.posImg.y + 0.5
self.x_nominal = int(np.floor(x + 0.5))
self.y_nominal = int(np.floor(y + 0.5))
dx = x - self.x_nominal
dy = y - self.y_nominal
self.offset = galsim.PositionD(dx, dy)
return self.posImg, self.offset, self.localWCS
def drawObject(self, img, final, flux=None, filt=None, tel=None, exptime=150.):
""" Draw (point like) object on img.
Should be overided for extended source, e.g. galaxy...
Paramter:
img: the "canvas"
final: final (after shear, PSF etc.) GSObject
Return:
img: the image with the GSObject added (or discarded)
isUpdated: is the "canvas" been updated? (a flag for updating statistcs)
"""
isUpdated = True
# Draw with FFT
# stamp = final.drawImage(wcs=self.localWCS, offset=self.offset)
# Draw with Photon Shoot
stamp = final.drawImage(wcs=self.localWCS, method='phot', offset=self.offset)
stamp.setCenter(self.x_nominal, self.y_nominal)
if np.sum(np.isnan(stamp.array)) >= 1:
stamp.setZero()
bounds = stamp.bounds & img.bounds
if bounds.area() == 0:
isUpdated = False
else:
img[bounds] += stamp[bounds]
return img, stamp, isUpdated
def drawObj_multiband(self, tel, pos_img, psf_model, bandpass_list, filt, chip, nphotons_tot=None, g1=0, g2=0, exptime=150.):
if nphotons_tot == None:
nphotons_tot = self.getElectronFluxFilt(filt, tel, exptime)
# print("nphotons_tot = ", nphotons_tot)
try:
full = integrate_sed_bandpass(sed=self.sed, bandpass=filt.bandpass_full)
except Exception as e:
print(e)
return False
nphotons_sum = 0
photons_list = []
xmax, ymax = 0, 0
# (TEST) Galsim Parameters
if self.getMagFilter(filt) <= 15:
folding_threshold = 5.e-4
else:
folding_threshold = 5.e-3
gsp = galsim.GSParams(folding_threshold=folding_threshold)
for i in range(len(bandpass_list)):
bandpass = bandpass_list[i]
try:
sub = integrate_sed_bandpass(sed=self.sed, bandpass=bandpass)
except Exception as e:
print(e)
# return False
continue
ratio = sub/full
if not (ratio == -1 or (ratio != ratio)):
nphotons = ratio * nphotons_tot
else:
# return False
continue
nphotons_sum += nphotons
# print("nphotons_sub-band_%d = %.2f"%(i, nphotons))
psf, pos_shear = psf_model.get_PSF(chip=chip, pos_img=pos_img, bandpass=bandpass, folding_threshold=folding_threshold)
star = galsim.DeltaFunction(gsparams=gsp)
star = star.withFlux(nphotons)
star = galsim.Convolve(psf, star)
stamp = star.drawImage(wcs=self.localWCS, method='phot', offset=self.offset, save_photons=True)
xmax = max(xmax, stamp.xmax)
ymax = max(ymax, stamp.ymax)
photons = stamp.photons
photons.x += self.x_nominal
photons.y += self.y_nominal
photons_list.append(photons)
# Test stamp size
# print(xmax, ymax)
stamp = galsim.ImageF(int(xmax*1.1), int(ymax*1.1))
stamp.wcs = self.localWCS
stamp.setCenter(self.x_nominal, self.y_nominal)
bounds = stamp.bounds & chip.img.bounds
stamp[bounds] = chip.img[bounds]
for i in range(len(photons_list)):
if i == 0:
chip.sensor.accumulate(photons_list[i], stamp)
else:
chip.sensor.accumulate(photons_list[i], stamp, resume=True)
chip.img[bounds] = stamp[bounds]
# print(chip.img.array.sum())
# print("nphotons_sum = ", nphotons_sum)
del photons_list
del stamp
return True, pos_shear
def drawObj_slitless(self, tel, pos_img, psf_model, bandpass_list, filt, chip, nphotons_tot=None, g1=0, g2=0,
exptime=150., normFilter=None):
norm_thr_rang_ids = normFilter['SENSITIVITY'] > 0.001
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=self.param['mag_use_normal'], spectrum=self.sed,
norm_thr=normFilter,
sWave=np.floor(normFilter[norm_thr_rang_ids][0][0]),
eWave=np.ceil(normFilter[norm_thr_rang_ids][-1][0]))
# print(self.x_nominal, self.y_nominal, chip.bound)
if sedNormFactor == 0:
return False
if self.getMagFilter(filt) <= 15:
folding_threshold = 5.e-4
else:
folding_threshold = 5.e-3
gsp = galsim.GSParams(folding_threshold=folding_threshold)
normalSED = Table(np.array([self.sed['WAVELENGTH'], self.sed['FLUX'] * sedNormFactor]).T,
names=('WAVELENGTH', 'FLUX'))
for i in range(len(bandpass_list)):
bandpass = bandpass_list[i]
psf, pos_shear = psf_model.get_PSF(chip=chip, pos_img=pos_img, bandpass=bandpass, folding_threshold=folding_threshold)
star = galsim.DeltaFunction(gsparams=gsp)
star = star.withFlux(tel.pupil_area * exptime)
star = galsim.Convolve(psf, star)
starImg = star.drawImage(nx=100, ny=100, wcs=self.localWCS)
origin_star = [self.y_nominal - (starImg.center.y - starImg.ymin),
self.x_nominal - (starImg.center.x - starImg.xmin)]
sdp = SpecDisperser(orig_img=starImg, xcenter=self.x_nominal-chip.bound.xmin,
ycenter=self.y_nominal-chip.bound.ymin, origin=origin_star,
tar_spec=normalSED,
band_start=bandpass.blue_limit * 10, band_end=bandpass.red_limit * 10,
conf=chip.sls_conf[1],
isAlongY=0)
spec_orders = sdp.compute_spec_orders()
for k, v in spec_orders.items():
img_s = v[0]
origin_order_x = v[1]
origin_order_y = v[2]
specImg = galsim.ImageF(img_s)
photons = galsim.PhotonArray.makeFromImage(specImg)
photons.x += origin_order_x
photons.y += origin_order_y
xlen_imf = int(specImg.xmax - specImg.xmin + 1)
ylen_imf = int(specImg.ymax - specImg.ymin + 1)
stamp = galsim.ImageF(xlen_imf, ylen_imf)
stamp.wcs = self.localWCS
stamp.setOrigin(origin_order_x, origin_order_y)
bounds = stamp.bounds & chip.img.bounds
if bounds.area() == 0:
continue
stamp[bounds] = chip.img[bounds]
chip.sensor.accumulate(photons, stamp)
chip.img[bounds] = stamp[bounds]
del stamp
del sdp
del spec_orders
del psf
return True, pos_shear
def SNRestimate(self, img_obj, flux, noise_level=0.0, seed=31415):
img_flux = img_obj.added_flux
stamp = img_obj.copy() * 0.0
rng = galsim.BaseDeviate(seed)
gaussianNoise = galsim.GaussianNoise(rng, sigma=noise_level)
stamp.addNoise(gaussianNoise)
sig_obj = np.std(stamp.array)
snr_obj = img_flux / sig_obj
return snr_obj
def getObservedEll(self, g1=0, g2=0):
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
\ No newline at end of file
import galsim
import os, sys
import numpy as np
import astropy.constants as cons
from .MockObject import MockObject
from astropy.table import Table
from scipy import interpolate
from ._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getObservedSED, getABMAG
from .MockObject import MockObject
class Quasar(MockObject):
def __init__(self, param):
super().__init__(param)
def load_SED(self, survey_type, sed_path, cosids=None, objtypes=None, sed_templates=None, normFilter=None, target_filt=None):
if survey_type == "photometric":
norm_thr_rang_ids = normFilter['SENSITIVITY'] > 0.001
if sed_templates is None:
# Read SED data directly
itype = objtypes[cosids==self.sed_type][0]
sed_file = os.path.join(sed_path, itype + "_ID%s.sed"%(self.sed_type))
if not os.path.exists(sed_file):
raise ValueError("!!! No SED found.")
sed_data = Table.read(sed_file, format="ascii")
wave, flux = sed_data["observedLambda"].data, sed_data["observedFlux"].data
else:
# Load SED from templates
sed_data = sed_templates[self.sed_type]
# redshift, intrinsic extinction
sed_data = getObservedSED(
sedCat=sed_data,
redshift=self.z,
av=self.param['av'],
redden=self.param['redden'])
wave, flux = sed_data[0], sed_data[1]
flux_photon = flux * (wave / (cons.h.value * cons.c.value)) * 1e-13
sed_photon = Table(np.array([wave, flux_photon]).T, names=('WAVELENGTH', 'FLUX'))
# Get scaling factor for SED
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=self.param['mag_use_normal'],
spectrum=sed_photon,
norm_thr=normFilter,
sWave=np.floor(normFilter[norm_thr_rang_ids][0][0]),
eWave=np.ceil(normFilter[norm_thr_rang_ids][-1][0]))
sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T
# Convert to galsim.SED object
spec = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest')
self.sed = galsim.SED(spec, wave_type='A', flux_type='1', fast=False)
# Get magnitude
interFlux = integrate_sed_bandpass(sed=self.sed, bandpass=target_filt.bandpass_full)
self.param['mag_%s'%target_filt.filter_type] = getABMAG(
interFlux=interFlux,
bandpass=target_filt.bandpass_full)
# print('mag_use_normal = ', self.param['mag_use_normal'])
# print('mag_%s = '%target_filt.filter_type, self.param['mag_%s'%target_filt.filter_type])
elif survey_type == "spectroscopic":
if sed_templates is None:
self.sedPhotons(sed_path=sed_path, cosids=cosids, objtypes=objtypes)
else:
sed_data = sed_templates[self.sed_type]
sed_data = getObservedSED(
sedCat=sed_data,
redshift=self.z,
av=self.param['av'],
redden=self.param['redden'])
speci = interpolate.interp1d(sed_data[0], sed_data[1])
lamb = np.arange(2500, 10001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photo/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
self.sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
def unload_SED(self):
"""(Test) free up SED memory
"""
del self.sed
def getGSObj(self, psf, g1=0, g2=0, flux=None, filt=None, tel=None, exptime=150.):
if flux == None:
flux = self.getElectronFluxFilt(filt, tel, exptime)
qso = galsim.Gaussian(sigma=1.e-8, flux=1.)
qso = qso.withFlux(flux)
final = galsim.Convolve(psf, qso)
return final
\ No newline at end of file
from .SpecDisperser import SpecDisperser
from .SpecDisperser import rotate90
import galsim
import numpy as np
from astropy.table import Table
from scipy import interpolate
import galsim
import os
###calculate sky map by sky SED
def calculateSkyMap_split_g(xLen=9232, yLen=9126, blueLimit=4200, redLimit=6500, skyfn='param/skybackground/sky_emiss_hubble_50_50_A.dat', conf=[''], pixelSize=0.074, isAlongY=0,
split_pos=100000):
skyMap = np.ones([yLen, xLen], dtype='float32')
# for i in range(len(conf)):
# conf[i] = os.path.join(SLSSIM_PATH, conf[i])
conf1 = conf[0]
conf2 = conf[0]
if np.size(conf) == 2:
conf2 = conf[1]
if isAlongY == 1:
skyMap = np.ones([xLen, yLen], dtype='float32')
skyImg = galsim.Image(skyMap, xmin=0, ymin=0)
tbstart = blueLimit
tbend = redLimit
fimg = np.zeros_like(skyMap)
fImg = galsim.Image(fimg)
# skyfn = os.path.join(SLSSIM_PATH, skyfn)
skySpec = np.loadtxt(skyfn)
spec = Table(np.array([skySpec[:, 0], skySpec[:, 1]]).T, names=('WAVELENGTH', 'FLUX'))
if isAlongY == 0:
directParm = 0
if isAlongY ==1:
directParm = 1
if split_pos >= skyImg.array.shape[directParm]:
skyImg1 = galsim.Image(skyImg.array)
origin1 = [0, 0]
# sdp = specDisperser.specDisperser(orig_img=skyImg1, xcenter=skyImg1.center.x, ycenter=skyImg1.center.y,
# full_img=fimg, tar_spec=spec, band_start=tbstart, band_end=tbend,
# origin=origin1,
# conf=conf1)
# sdp.compute_spec_orders()
sdp = SpecDisperser(orig_img=skyImg1, xcenter=skyImg1.center.x, ycenter=skyImg1.center.y, origin=origin1,
tar_spec=spec,
band_start=tbstart, band_end=tbend,
conf=conf2)
spec_orders = sdp.compute_spec_orders()
for k, v in spec_orders.items():
img_s = v[0]
origin_order_x = v[1]
origin_order_y = v[2]
ssImg = galsim.ImageF(img_s)
ssImg.setOrigin(origin_order_x, origin_order_y)
bounds = ssImg.bounds & fImg.bounds
fImg[bounds] = fImg[bounds] + ssImg[bounds]
else:
skyImg1 = galsim.Image(skyImg.array[:, 0:split_pos])
origin1 = [0, 0]
skyImg2 = galsim.Image(skyImg.array[:, split_pos:-1])
origin2 = [0, split_pos]
# sdp = specDisperser.specDisperser(orig_img=skyImg1, xcenter=skyImg1.center.x, ycenter=skyImg1.center.y,
# full_img=fimg, tar_spec=spec, band_start=tbstart, band_end=tbend,
# origin=origin1,
# conf=conf1)
# sdp.compute_spec_orders()
sdp = SpecDisperser(orig_img=skyImg1, xcenter=skyImg1.center.x, ycenter=skyImg1.center.y, origin=origin1,
tar_spec=spec,
band_start=tbstart, band_end=tbend,
conf=conf1)
spec_orders = sdp.compute_spec_orders()
for k, v in spec_orders.items():
img_s = v[0]
origin_order_x = v[1]
origin_order_y = v[2]
ssImg = galsim.ImageF(img_s)
ssImg.setOrigin(origin_order_x, origin_order_y)
bounds = ssImg.bounds & fImg.bounds
fImg[bounds] = fImg[bounds] + ssImg[bounds]
sdp = SpecDisperser(orig_img=skyImg2, xcenter=skyImg2.center.x, ycenter=skyImg2.center.y, origin=origin2,
tar_spec=spec,
band_start=tbstart, band_end=tbend,
conf=conf2)
spec_orders = sdp.compute_spec_orders()
for k, v in spec_orders.items():
img_s = v[0]
origin_order_x = v[1]
origin_order_y = v[2]
ssImg = galsim.ImageF(img_s)
ssImg.setOrigin(origin_order_x, origin_order_y)
bounds = ssImg.bounds & fImg.bounds
fImg[bounds] = fImg[bounds] + ssImg[bounds]
if isAlongY == 1:
fimg, tmx, tmy = rotate90(array_orig=fImg.array, xc=0, yc=0, isClockwise=0)
else:
fimg = fImg.array
fimg = fimg * pixelSize * pixelSize
return fimg
def calculateSkyMap(xLen=9232, yLen=9126, blueLimit=4200, redLimit=6500,
skyfn='param/skybackground/sky_emiss_hubble_50_50_A.dat', conf='', pixelSize=0.074, isAlongY=0):
skyMap = np.ones([yLen, xLen], dtype='float32')
if isAlongY == 1:
skyMap = np.ones([xLen, yLen], dtype='float32')
skyImg = galsim.Image(skyMap)
tbstart = blueLimit
tbend = redLimit
fimg = np.zeros_like(skyMap)
fImg = galsim.Image(fimg)
skySpec = np.loadtxt(skyfn)
spec = Table(np.array([skySpec[:, 0], skySpec[:, 1]]).T, names=('WAVELENGTH', 'FLUX'))
sdp = SpecDisperser(orig_img=skyImg, xcenter=skyImg.center.x, ycenter=skyImg.center.y, origin=[1, 1],
tar_spec=spec,
band_start=tbstart, band_end=tbend,
conf=conf)
spec_orders = sdp.compute_spec_orders()
for k, v in spec_orders.items():
img_s = v[0]
origin_order_x = v[1]
origin_order_y = v[2]
ssImg = galsim.ImageF(img_s)
ssImg.setOrigin(origin_order_x, origin_order_y)
bounds = ssImg.bounds & fImg.bounds
fImg[bounds] = fImg[bounds] + ssImg[bounds]
if isAlongY == 1:
fimg, tmx, tmy = rotate90(array_orig=fImg.array, xc=0, yc=0, isClockwise=0)
else:
fimg = fImg.array
fimg = fimg * pixelSize * pixelSize
return fimg
from astropy.table import Table
import astropy.constants as cons
import collections
from collections import OrderedDict
from scipy import interpolate
from astropy import units as u
# from numpy import *
import numpy as np
from pylab import *
import galsim
import sys
import os
def rotate90(array_orig=None, xc=0, yc=0, isClockwise=0):
if array_orig is None:
return
l1 = array_orig.shape[0]
l2 = array_orig.shape[1]
if xc < 0 or xc > l2 - 1 or yc < 0 or yc > l1 - 1:
return
n_xc = xc
n_yc = yc
array_final = np.zeros_like(array_orig.T)
if isClockwise == 1:
for i in np.arange(l2):
array_final[i] = array_orig[:, l2 - i - 1]
n_xc = yc
n_yc = l2 - 1 - xc
else:
for i in np.arange(l2):
array_final[i] = array_orig[::-1, i]
n_xc = l1 - 1 - yc
n_yc = xc
return array_final, n_xc, n_yc
class SpecDisperser(object):
def __init__(self, orig_img=None, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=None, band_start=6200,
band_end=10000, isAlongY=0, conf='../param/CONF/csst.conf', gid=0):
"""
orig_img: normal image,galsim image
xcenter, ycenter: the center of galaxy in orig_img
origin : [int, int]
`origin` defines the lower left pixel index (y,x) of the `direct`
cutout from a larger detector-frame image
tar_spec: galsim.SED
"""
# self.img_x = orig_img.shape[1]
# self.img_y = orig_img.shape[0]
self.thumb_img = np.abs(orig_img.array)
self.thumb_x = orig_img.center.x
self.thumb_y = orig_img.center.y
self.img_sh = orig_img.array.shape
self.id = gid
self.xcenter = xcenter
self.ycenter = ycenter
self.isAlongY = isAlongY
if self.isAlongY == 1:
self.thumb_img, self.thumb_x, self.thumb_y = rotate90(array_orig=self.thumb_img, xc=orig_img.center.x,
yc=orig_img.center.y, isClockwise=1)
self.img_sh = orig_img.array.T.shape
self.xcenter = ycenter
self.ycenter = xcenter
self.origin = origin
self.band_start = band_start
self.band_end = band_end
self.spec = tar_spec
self.beam_flux = OrderedDict()
self.grating_conf = aXeConf(conf)
self.grating_conf.get_beams()
self.grating_conf_file = conf
def compute_spec_orders(self):
all_orders = OrderedDict()
beam_names = self.grating_conf.beams
for beam in beam_names:
all_orders[beam] = self.compute_spec(beam)
return all_orders
def compute_spec(self, beam):
from .disperse_c import interp
from .disperse_c import disperse
# from MockObject.disperse_c import disperse
dx = self.grating_conf.dxlam[beam]
xoff = 0
ytrace_beam, lam_beam = self.grating_conf.get_beam_trace(x=self.xcenter, y=self.ycenter, dx=(dx + xoff),
beam=beam)
### Account for pixel centering of the trace
yfrac_beam = ytrace_beam - floor(ytrace_beam)
ysens = lam_beam * 0
lam_index = argsort(lam_beam)
conf_sens = self.grating_conf.sens[beam]
lam_intep = np.linspace(self.band_start, self.band_end, int((self.band_end - self.band_start) / 0.5))
thri = interpolate.interp1d(conf_sens['WAVELENGTH'], conf_sens['SENSITIVITY'])
spci = interpolate.interp1d(self.spec['WAVELENGTH'], self.spec['FLUX'])
beam_thr = thri(lam_intep)
spec_sample = spci(lam_intep)
bean_thr_spec = beam_thr * spec_sample
ysensitivity = lam_beam * 0
ysensitivity[lam_index] = interp.interp_conserve_c(lam_beam[lam_index], lam_intep,
beam_thr * math.pi * 100 * 100 * 1e-7 / (
cons.h.value * cons.c.value / (
lam_intep * 1e-10)), integrate=0, left=0,
right=0)
self.writerSensitivityFile(conffile = self.grating_conf_file, beam = beam, w = lam_beam[lam_index], sens = ysensitivity[lam_index])
ysens[lam_index] = interp.interp_conserve_c(lam_beam[lam_index], lam_intep, bean_thr_spec, integrate=1, left=0,
right=0)
sensitivity_beam = ysens
len_spec_x = len(dx)
len_spec_y = int(ceil(yfrac_beam[-1]) - floor(yfrac_beam[0]) + 1)
beam_sh = (self.img_sh[0] + len_spec_y, self.img_sh[1] + len_spec_x)
modelf = zeros(product(beam_sh), dtype=float)
model = modelf.reshape(beam_sh)
idx = np.arange(modelf.size, dtype=int64).reshape(beam_sh)
x0 = array((self.thumb_y, self.thumb_x), dtype=int64)
dxpix = dx - dx[0] + x0[1]
dyc = cast[int](ytrace_beam)
dypix = dyc - dyc[0] + x0[0]
flat_index = idx[dypix, dxpix]
nonz = sensitivity_beam != 0
status = disperse.disperse_grism_object(self.thumb_img,
flat_index[nonz], yfrac_beam[nonz],
sensitivity_beam[nonz],
modelf, x0,
array(self.img_sh, dtype=int64),
array(beam_sh, dtype=int64))
model = modelf.reshape(beam_sh)
self.beam_flux[beam] = sum(modelf)
origin_in = zeros_like(self.origin)
dx0_in = dx[0]
dy0_in = dyc[0]
if self.isAlongY == 1:
model, tmx, tmy = rotate90(array_orig=model, isClockwise=0)
origin_in[0] = self.origin[0]
origin_in[1] = self.origin[1] - len_spec_y
dx0_in = -dyc[0]
dy0_in = dx[0]
else:
origin_in[0] = self.origin[0]
origin_in[1] = self.origin[1]
dx0_in = dx[0]
dy0_in = dyc[0]
originOut_x = origin_in[1] + dx0_in - 1
originOut_y = origin_in[0] + dy0_in - 1
return model, originOut_x, originOut_y
def writerSensitivityFile(self, conffile = '', beam = '', w = None, sens = None):
orders={'A':'1st','B':'0st','C':'2st'}
sens_file_name = conffile[0:-5]+'_sensitivity_'+ orders[beam] + '.fits'
if not os.path.exists(sens_file_name) == True:
senstivity_out = Table(array([w,sens]).T, names=('WAVELENGTH', 'SENSITIVITY'))
senstivity_out.write(sens_file_name, format='fits')
"""
Demonstrate aXe trace polynomials.
"""
class aXeConf():
def __init__(self, conf_file='WFC3.IR.G141.V2.5.conf'):
"""Read an aXe-compatible configuration file
Parameters
----------
conf_file: str
Filename of the configuration file to read
"""
if conf_file is not None:
self.conf = self.read_conf_file(conf_file)
self.conf_file = conf_file
self.count_beam_orders()
## Global XOFF/YOFF offsets
if 'XOFF' in self.conf.keys():
self.xoff = np.float(self.conf['XOFF'])
else:
self.xoff = 0.
if 'YOFF' in self.conf.keys():
self.yoff = np.float(self.conf['YOFF'])
else:
self.yoff = 0.
def read_conf_file(self, conf_file='WFC3.IR.G141.V2.5.conf'):
"""Read an aXe config file, convert floats and arrays
Parameters
----------
conf_file: str
Filename of the configuration file to read.
Parameters are stored in an OrderedDict in `self.conf`.
"""
from collections import OrderedDict
conf = OrderedDict()
lines = open(conf_file).readlines()
for line in lines:
## empty / commented lines
if (line.startswith('#')) | (line.strip() == '') | ('"' in line):
continue
## split the line, taking out ; and # comments
spl = line.split(';')[0].split('#')[0].split()
param = spl[0]
if len(spl) > 2:
value = np.cast[float](spl[1:])
else:
try:
value = float(spl[1])
except:
value = spl[1]
conf[param] = value
return conf
def count_beam_orders(self):
"""Get the maximum polynomial order in DYDX or DLDP for each beam
"""
self.orders = {}
for beam in ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']:
order = 0
while 'DYDX_{0:s}_{1:d}'.format(beam, order) in self.conf.keys():
order += 1
while 'DLDP_{0:s}_{1:d}'.format(beam, order) in self.conf.keys():
order += 1
self.orders[beam] = order - 1
def get_beams(self):
"""Get beam parameters and read sensitivity curves
"""
import os
from collections import OrderedDict
from astropy.table import Table, Column
self.dxlam = OrderedDict()
self.nx = OrderedDict()
self.sens = OrderedDict()
self.beams = []
for beam in self.orders:
if self.orders[beam] > 0:
self.beams.append(beam)
self.dxlam[beam] = np.arange(self.conf['BEAM{0}'.format(beam)].min(),
self.conf['BEAM{0}'.format(beam)].max(), dtype=int)
self.nx[beam] = int(self.dxlam[beam].max() - self.dxlam[beam].min()) + 1
self.sens[beam] = Table.read(
'{0}/{1}'.format(os.path.dirname(self.conf_file), self.conf['SENSITIVITY_{0}'.format(beam)]))
# self.sens[beam].wave = np.cast[np.double](self.sens[beam]['WAVELENGTH'])
# self.sens[beam].sens = np.cast[np.double](self.sens[beam]['SENSITIVITY'])
### Need doubles for interpolating functions
for col in self.sens[beam].colnames:
data = np.cast[np.double](self.sens[beam][col])
self.sens[beam].remove_column(col)
self.sens[beam].add_column(Column(data=data, name=col))
self.beams.sort()
def field_dependent(self, xi, yi, coeffs):
"""aXe field-dependent coefficients
See the `aXe manual <http://axe.stsci.edu/axe/manual/html/node7.html#SECTION00721200000000000000>`_ for a description of how the field-dependent coefficients are specified.
Parameters
----------
xi, yi : float or array-like
Coordinate to evaluate the field dependent coefficients, where
`xi = x-REFX` and `yi = y-REFY`.
coeffs : array-like
Field-dependency coefficients
Returns
-------
a : float or array-like
Evaluated field-dependent coefficients
"""
## number of coefficients for a given polynomial order
## 1:1, 2:3, 3:6, 4:10, order:order*(order+1)/2
if isinstance(coeffs, float):
order = 1
else:
order = int(-1 + np.sqrt(1 + 8 * len(coeffs))) // 2
## Build polynomial terms array
## $a = a_0+a_1x_i+a_2y_i+a_3x_i^2+a_4x_iy_i+a_5yi^2+$ ...
xy = []
for p in range(order):
for px in range(p + 1):
# print 'x**%d y**%d' %(p-px, px)
xy.append(xi ** (p - px) * yi ** (px))
## Evaluate the polynomial, allowing for N-dimensional inputs
a = np.sum((np.array(xy).T * coeffs).T, axis=0)
return a
def evaluate_dp(self, dx, dydx):
"""Evalate arc length along the trace given trace polynomial coefficients
Parameters
----------
dx : array-like
x pixel to evaluate
dydx : array-like
Coefficients of the trace polynomial
Returns
-------
dp : array-like
Arc length along the trace at position `dx`.
For `dydx` polynomial orders 0, 1 or 2, integrate analytically.
Higher orders must be integrated numerically.
**Constant:**
.. math:: dp = dx
**Linear:**
.. math:: dp = \sqrt{1+\mathrm{DYDX}[1]}\cdot dx
**Quadratic:**
.. math:: u = \mathrm{DYDX}[1] + 2\ \mathrm{DYDX}[2]\cdot dx
.. math:: dp = (u \sqrt{1+u^2} + \mathrm{arcsinh}\ u) / (4\cdot \mathrm{DYDX}[2])
"""
## dp is the arc length along the trace
## $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ...
poly_order = len(dydx) - 1
if (poly_order == 2):
if np.abs(np.unique(dydx[2])).max() == 0:
poly_order = 1
if poly_order == 0: ## dy=0
dp = dx
elif poly_order == 1: ## constant dy/dx
dp = np.sqrt(1 + dydx[1] ** 2) * (dx)
elif poly_order == 2: ## quadratic trace
u0 = dydx[1] + 2 * dydx[2] * (0)
dp0 = (u0 * np.sqrt(1 + u0 ** 2) + np.arcsinh(u0)) / (4 * dydx[2])
u = dydx[1] + 2 * dydx[2] * (dx)
dp = (u * np.sqrt(1 + u ** 2) + np.arcsinh(u)) / (4 * dydx[2]) - dp0
else:
## high order shape, numerical integration along trace
## (this can be slow)
xmin = np.minimum((dx).min(), 0)
xmax = np.maximum((dx).max(), 0)
xfull = np.arange(xmin, xmax)
dyfull = 0
for i in range(1, poly_order):
dyfull += i * dydx[i] * (xfull - 0.5) ** (i - 1)
## Integrate from 0 to dx / -dx
dpfull = xfull * 0.
lt0 = xfull < 0
if lt0.sum() > 1:
dpfull[lt0] = np.cumsum(np.sqrt(1 + dyfull[lt0][::-1] ** 2))[::-1]
dpfull[lt0] *= -1
#
gt0 = xfull > 0
if gt0.sum() > 0:
dpfull[gt0] = np.cumsum(np.sqrt(1 + dyfull[gt0] ** 2))
dp = np.interp(dx, xfull, dpfull)
if dp[-1] == dp[-2]:
dp[-1] = dp[-2] + np.diff(dp)[-2]
return dp
def get_beam_trace(self, x=507, y=507, dx=0., beam='A'):
"""Get an aXe beam trace for an input reference pixel and list of output x pixels `dx`
Parameters
----------
x, y : float or array-like
Evaluate trace definition at detector coordinates `x` and `y`.
dx : float or array-like
Offset in x pixels from `(x,y)` where to compute trace offset and
effective wavelength
beam : str
Beam name (i.e., spectral order) to compute. By aXe convention,
`beam='A'` is the first order, 'B' is the zeroth order and
additional beams are the higher positive and negative orders.
Returns
-------
dy : float or array-like
Center of the trace in y pixels offset from `(x,y)` evaluated at
`dx`.
lam : float or array-like
Effective wavelength along the trace evaluated at `dx`.
"""
NORDER = self.orders[beam] + 1
xi, yi = x - self.xoff, y - self.yoff
xoff_beam = self.field_dependent(xi, yi, self.conf['XOFF_{0}'.format(beam)])
yoff_beam = self.field_dependent(xi, yi, self.conf['YOFF_{0}'.format(beam)])
## y offset of trace (DYDX)
dydx = np.zeros(NORDER) # 0 #+1.e-80
dydx = [0] * NORDER
for i in range(NORDER):
if 'DYDX_{0:s}_{1:d}'.format(beam, i) in self.conf.keys():
coeffs = self.conf['DYDX_{0:s}_{1:d}'.format(beam, i)]
dydx[i] = self.field_dependent(xi, yi, coeffs)
# $dy = dydx_0+dydx_1 dx+dydx_2 dx^2+$ ...
dy = yoff_beam
for i in range(NORDER):
dy += dydx[i] * (dx - xoff_beam) ** i
## wavelength solution
dldp = np.zeros(NORDER)
dldp = [0] * NORDER
for i in range(NORDER):
if 'DLDP_{0:s}_{1:d}'.format(beam, i) in self.conf.keys():
coeffs = self.conf['DLDP_{0:s}_{1:d}'.format(beam, i)]
dldp[i] = self.field_dependent(xi, yi, coeffs)
self.eval_input = {'x': x, 'y': y, 'beam': beam, 'dx': dx}
self.eval_output = {'xi': xi, 'yi': yi, 'dldp': dldp, 'dydx': dydx,
'xoff_beam': xoff_beam, 'yoff_beam': yoff_beam,
'dy': dy}
dp = self.evaluate_dp(dx - xoff_beam, dydx)
# ## dp is the arc length along the trace
# ## $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ...
# if self.conf['DYDX_ORDER_%s' %(beam)] == 0: ## dy=0
# dp = dx-xoff_beam
# elif self.conf['DYDX_ORDER_%s' %(beam)] == 1: ## constant dy/dx
# dp = np.sqrt(1+dydx[1]**2)*(dx-xoff_beam)
# elif self.conf['DYDX_ORDER_%s' %(beam)] == 2: ## quadratic trace
# u0 = dydx[1]+2*dydx[2]*(0)
# dp0 = (u0*np.sqrt(1+u0**2)+np.arcsinh(u0))/(4*dydx[2])
# u = dydx[1]+2*dydx[2]*(dx-xoff_beam)
# dp = (u*np.sqrt(1+u**2)+np.arcsinh(u))/(4*dydx[2])-dp0
# else:
# ## high order shape, numerical integration along trace
# ## (this can be slow)
# xmin = np.minimum((dx-xoff_beam).min(), 0)
# xmax = np.maximum((dx-xoff_beam).max(), 0)
# xfull = np.arange(xmin, xmax)
# dyfull = 0
# for i in range(1, NORDER):
# dyfull += i*dydx[i]*(xfull-0.5)**(i-1)
#
# ## Integrate from 0 to dx / -dx
# dpfull = xfull*0.
# lt0 = xfull <= 0
# if lt0.sum() > 1:
# dpfull[lt0] = np.cumsum(np.sqrt(1+dyfull[lt0][::-1]**2))[::-1]
# dpfull[lt0] *= -1
# #
# gt0 = xfull >= 0
# if gt0.sum() > 0:
# dpfull[gt0] = np.cumsum(np.sqrt(1+dyfull[gt0]**2))
#
# dp = np.interp(dx-xoff_beam, xfull, dpfull)
## Evaluate dldp
lam = dp * 0.
for i in range(NORDER):
lam += dldp[i] * dp ** i
return dy, lam
def show_beams(self, beams=['E', 'D', 'C', 'B', 'A']):
"""
Make a demo plot of the beams of a given configuration file
"""
import matplotlib.pyplot as plt
x0, x1 = 507, 507
dx = np.arange(-800, 1200)
if 'WFC3.UV' in self.conf_file:
x0, x1 = 2073, 250
dx = np.arange(-1200, 1200)
if 'G800L' in self.conf_file:
x0, x1 = 2124, 1024
dx = np.arange(-1200, 1200)
s = 200 # marker size
fig = plt.figure(figsize=[10, 3])
plt.scatter(0, 0, marker='s', s=s, color='black', edgecolor='0.8',
label='Direct')
for beam in beams:
if 'XOFF_{0}'.format(beam) not in self.conf.keys():
continue
xoff = self.field_dependent(x0, x1, self.conf['XOFF_{0}'.format(beam)])
dy, lam = self.get_beam_trace(x0, x1, dx=dx, beam=beam)
xlim = self.conf['BEAM{0}'.format(beam)]
ok = (dx >= xlim[0]) & (dx <= xlim[1])
plt.scatter(dx[ok] + xoff, dy[ok], c=lam[ok] / 1.e4, marker='s', s=s,
alpha=0.5, edgecolor='None')
plt.text(np.median(dx[ok]), np.median(dy[ok]) + 1, beam,
ha='center', va='center', fontsize=14)
print('Beam {0}, lambda=({1:.1f} - {2:.1f})'.format(beam, lam[ok].min(), lam[ok].max()))
plt.grid()
plt.xlabel(r'$\Delta x$')
plt.ylabel(r'$\Delta y$')
cb = plt.colorbar(pad=0.01, fraction=0.05)
cb.set_label(r'$\lambda\,(\mu\mathrm{m})$')
plt.title(self.conf_file)
plt.tight_layout()
plt.savefig('{0}.pdf'.format(self.conf_file))
# def load_grism_config(conf_file):
# """Load parameters from an aXe configuration file
# Parameters
# ----------
# conf_file : str
# Filename of the configuration file
# Returns
# -------
# conf : `~grizli.grismconf.aXeConf`
# Configuration file object. Runs `conf.get_beams()` to read the
# sensitivity curves.
# """
# conf = aXeConf(conf_file)
# conf.get_beams()
# return conf
from .SpecDisperser import *
from .disperse_c import disperse, interp
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment