Commit 0697a198 authored by Fang Yuedong's avatar Fang Yuedong
Browse files

v0 for testing

parent e1458fd3
import unittest
from ObservationSim.MockObject.SpecDisperser import rotate90, SpecDisperser
from ObservationSim.Config import ConfigDir, ReadConfig, ChipOutput
from ObservationSim.Instrument import Telescope, Chip, FilterParam, Filter, FocalPlane
from ObservationSim.MockObject import MockObject, Star
from ObservationSim.PSF import PSFGauss
import numpy as np
import galsim
from astropy.table import Table
from scipy import interpolate
import matplotlib.pyplot as plt
from lmfit.models import LinearModel, GaussianModel
def fit_SingleGauss(xX, yX, contmX, iHa0):
background = LinearModel(prefix='line_')
pars = background.make_params(intercept=yX.max(), slope=0)
pars = background.guess(yX, x=xX)
gauss = GaussianModel(prefix='g_')
pars.update(gauss.make_params())
pars['g_center'].set(iHa0, min=iHa0 - 3, max=iHa0 + 3)
pars['g_amplitude'].set(50, min=0)
pars['g_sigma'].set(12, min=0.01)
mod = gauss + background
init = mod.eval(pars, x=xX)
outX = mod.fit(yX, pars, x=xX)
compsX = outX.eval_components(x=xX)
# print(outX.fit_report(min_correl=0.25))
# print outX.params['g_center']
outX.fit_report(min_correl=0.25)
# print(outX.fit_report(min_correl=0.25))
line_slopeX = float(outX.fit_report(min_correl=0.25).split('line_slope:')[1].split('+/-')[0]) * contmX
err_line_slopeX = float(
outX.fit_report(min_correl=0.25).split('line_slope:')[1].split('+/-')[1].split('(')[0]) * contmX
line_interceptX = float(outX.fit_report(min_correl=0.25).split('line_intercept:')[1].split('+/-')[0]) * contmX
err_line_interceptX = float(
outX.fit_report(min_correl=0.25).split('line_intercept:')[1].split('+/-')[1].split('(')[0]) * contmX
sigmaX = float(outX.fit_report(min_correl=0.25).split('g_sigma:')[1].split('+/-')[0])
err_sigmaX = float(outX.fit_report(min_correl=0.25).split('g_sigma:')[1].split('+/-')[1].split('(')[0])
fwhmX = float(outX.fit_report(min_correl=0.25).split('g_fwhm:')[1].split('+/-')[0])
err_fwhmX = float(outX.fit_report(min_correl=0.25).split('g_fwhm:')[1].split('+/-')[1].split('(')[0])
centerX = float(outX.fit_report(min_correl=0.25).split('g_center:')[1].split('+/-')[0])
err_centerX = float(outX.fit_report(min_correl=0.25).split('g_center:')[1].split('+/-')[1].split('(')[0])
return sigmaX, err_sigmaX, fwhmX, err_fwhmX, centerX, err_centerX
def produceObj(x,y,chip):
pos_img = galsim.PositionD(chip.bound.xmin + x, chip.bound.ymin + y)
param = {}
param["star"] = 1
param["id"] = 1
param["ra"] = chip.img.wcs.posToWorld(pos_img).ra.deg
param["dec"] = chip.img.wcs.posToWorld(pos_img).dec.deg
param["z"] = 0
param["sed_type"] = 1
param["model_tag"] = 1
param["mag_use_normal"] = 10
obj = Star(param)
pos_img, offset, local_wcs = obj.getPosImg_Offset_WCS(img=chip.img, chip=chip)
wave = np.arange(2500, 11000.5, 0.5)
# sedLen = wave.shape[0]
flux = pow(wave, -2) * 1e8
flux[200] = flux[200] * 10
flux[800] = flux[800] * 30
flux[2000] = flux[2000] * 5
obj.sed = Table(np.array([wave, flux]).T,
names=('WAVELENGTH', 'FLUX'))
return obj, pos_img
class TestSpecDisperse(unittest.TestCase):
def __init__(self, methodName='runTest',conff = '', throughputf = ''):
super(TestSpecDisperse,self).__init__(methodName)
self.conff = conff
self.throughputf = throughputf
def test_rotate901(self):
m = np.array([[1,2,3,4,5],[6,7,8,9,10],[11,12,13,14,15],[16,17,18,19,20],[21,22,23,24,25]])
m1 = np.array([[21,16,11,6,1],[22,17,12,7,2],[23,18,13,8,3],[24,19,14,9,4],[25,20,15,10,5]])
m2 = np.array([[5,10,15,20,25],[4,9,14,19,24],[3,8,13,18,23],[2,7,12,17,22],[1,6,11,16,21]])
xc = 2
yc = 2
isClockwise = 0
m1, xc1, yc1 = rotate90(array_orig=m, xc=xc, yc=yc, isClockwise=isClockwise)
self.assertTrue(xc1-xc == 0)
self.assertTrue(yc1-yc == 0)
self.assertTrue(np.sum(m-m1) == 0)
def test_rotate902(self):
m = np.array([[1,2,3,4,5],[6,7,8,9,10],[11,12,13,14,15],[16,17,18,19,20],[21,22,23,24,25]])
m1 = np.array([[21,16,11,6,1],[22,17,12,7,2],[23,18,13,8,3],[24,19,14,9,4],[25,20,15,10,5]])
m2 = np.array([[5,10,15,20,25],[4,9,14,19,24],[3,8,13,18,23],[2,7,12,17,22],[1,6,11,16,21]])
xc = 2
yc = 2
isClockwise =1
m1, xc1, yc1 = rotate90(array_orig=m, xc=xc, yc=yc, isClockwise=isClockwise)
self.assertTrue(xc1-xc == 0)
self.assertTrue(yc1-yc == 0)
self.assertTrue(np.sum(m-m2) == 0)
def test_Specdistperse1(self):
star = galsim.Gaussian(fwhm=0.39)
g_img = galsim.Image(100, 100, scale=0.074)
starImg = star.drawImage(image=g_img)
wave = np.arange(6200, 10000.5, 0.5)
# sedLen = wave.shape[0]
flux = pow(wave, -2) * 1e8
# flux[200] = flux[200] * 10
# flux[800] = flux[800] * 30
# flux[2000] = flux[2000] * 5
sed = Table(np.array([wave, flux]).T,
names=('WAVELENGTH', 'FLUX'))
conff = self.conff
throughput_f = self.throughputf
thp = Table.read(throughput_f)
thp_i = interpolate.interp1d(thp['WAVELENGTH'], thp['SENSITIVITY'])
sdp = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200,
band_end=10000, isAlongY=0, conf=conff, gid=0)
spec = sdp.compute_spec_orders()
Aimg = spec['A'][0]
wave_pix = spec['A'][5]
wave_pos = spec['A'][3]
sens = spec['A'][6]
sh = Aimg.shape
spec_pix = np.zeros(sh[1])
for i in range(sh[1]):
spec_pix[i] = sum(Aimg[:, i])
# figure()
# imshow(Aimg)
wave_flux = np.zeros(wave_pix.shape[0])
for i in np.arange(1, wave_pix.shape[0] - 1):
w = wave_pix[i]
thp_w = thp_i(w)
deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2
f = spec_pix[wave_pos[0] - 1 + i]
if 6200 <= w <= 10000:
f = f / thp_w
else:
f = 0
wave_flux[i] = f / deltW
sed_i = interpolate.interp1d(sed['WAVELENGTH'], sed['FLUX'])
ids = wave_pix < 9700
ids1 = wave_pix[ids] > 6500
print('Spec disperse flux test')
self.assertTrue(np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))<0.004)
plt.figure()
plt.plot(wave_pix, wave_flux)
plt.plot(sed['WAVELENGTH'], sed['FLUX'])
plt.xlim(6200, 10000)
plt.ylim(1, 3)
plt.yscale('log')
plt.xlabel('$\lambda$')
plt.ylabel('$F\lambda$')
plt.legend(['extracted', 'input'])
plt.show()
def test_Specdistperse2(self):
psf_fwhm = 0.39
pix_scale = 0.074
star = galsim.Gaussian(fwhm=psf_fwhm)
g_img = galsim.Image(100, 100, scale=pix_scale)
starImg = star.drawImage(image=g_img)
wave = np.arange(6200, 10000.5, 0.5)
# sedLen = wave.shape[0]
flux = pow(wave, -2) * 1e8
flux[200] = flux[200] * 10
flux[800] = flux[800] * 30
flux[2000] = flux[2000] * 5
sed = Table(np.array([wave, flux]).T,
names=('WAVELENGTH', 'FLUX'))
conff = self.conff
throughput_f = self.throughputf
thp = Table.read(throughput_f)
thp_i = interpolate.interp1d(thp['WAVELENGTH'], thp['SENSITIVITY'])
sdp = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200,
band_end=10000, isAlongY=0, conf=conff, gid=0)
spec = sdp.compute_spec_orders()
Aimg = spec['A'][0]
wave_pix = spec['A'][5]
wave_pos = spec['A'][3]
sens = spec['A'][6]
sh = Aimg.shape
spec_pix = np.zeros(sh[1])
for i in range(sh[1]):
spec_pix[i] = sum(Aimg[:, i])
# figure()
# imshow(Aimg)
wave_flux = np.zeros(wave_pix.shape[0])
for i in np.arange(1, wave_pix.shape[0] - 1):
w = wave_pix[i]
thp_w = thp_i(w)
deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2
f = spec_pix[wave_pos[0] - 1 + i]
if 6200 <= w <= 10000:
f = f / thp_w
else:
f = 0
wave_flux[i] = f / deltW
sed_i = interpolate.interp1d(sed['WAVELENGTH'], sed['FLUX'])
input_em_lam = 6600
ids = wave_pix < input_em_lam+200
ids1 = wave_pix[ids] > input_em_lam-200
deltLamda_pix = (max(wave_pix[ids][ids1]) - min(wave_pix[ids][ids1])) / (wave_pix[ids][ids1].shape[0] - 1)
_, _, fwhmx, fwhmx_err, center, center_err = fit_SingleGauss(wave_pix[ids][ids1], wave_flux[ids][ids1], 1.0, 6600)
print('Emission line position and shape test')
self.assertTrue(input_em_lam-center < deltLamda_pix)
self.assertTrue(fwhmx/deltLamda_pix*pix_scale - psf_fwhm < np.abs(0.01))
# print('error is ',np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1])))
# self.assertTrue(np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))<0.004)
plt.figure()
plt.plot(wave_pix, wave_flux)
plt.plot(sed['WAVELENGTH'], sed['FLUX'])
plt.xlim(6200, 10000)
plt.ylim(1, 75)
plt.yscale('log')
plt.xlabel('$\lambda$')
plt.ylabel('$F\lambda$')
plt.legend(['extracted', 'input'])
plt.show()
def test_Specdistperse3(self):
psf_fwhm = 0.39
pix_scale = 0.074
star = galsim.Gaussian(fwhm=psf_fwhm)
g_img = galsim.Image(100, 100, scale=pix_scale)
starImg = star.drawImage(image=g_img)
wave = np.arange(6200, 10000.5, 0.5)
# sedLen = wave.shape[0]
flux = pow(wave, -2) * 1e8
flux[200] = flux[200] * 10
flux[800] = flux[800] * 30
flux[2000] = flux[2000] * 5
sed = Table(np.array([wave, flux]).T,
names=('WAVELENGTH', 'FLUX'))
conff = self.conff
throughput_f = self.throughputf
thp = Table.read(throughput_f)
thp_i = interpolate.interp1d(thp['WAVELENGTH'], thp['SENSITIVITY'])
sdp = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200,
band_end=8000, isAlongY=0, conf=conff, gid=0)
sdp1 = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=8000,
band_end=10000, isAlongY=0, conf=conff, gid=0)
spec = sdp.compute_spec_orders()
spec1 = sdp1.compute_spec_orders()
Aimg = spec['A'][0] + spec1['A'][0]
wave_pix = spec['A'][5]
wave_pos = spec['A'][3]
sens = spec['A'][6]
sh = Aimg.shape
spec_pix = np.zeros(sh[1])
for i in range(sh[1]):
spec_pix[i] = sum(Aimg[:, i])
wave_flux = np.zeros(wave_pix.shape[0])
for i in np.arange(1, wave_pix.shape[0] - 1):
w = wave_pix[i]
thp_w = thp_i(w)
deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2
f = spec_pix[wave_pos[0] - 1 + i]
if 6200 <= w <= 10000:
f = f / thp_w
else:
f = 0
wave_flux[i] = f / deltW
sdp2 = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200,
band_end=10000, isAlongY=0, conf=conff, gid=0)
spec2 = sdp2.compute_spec_orders()
Aimg2 = spec2['A'][0]
spec_pix2 = np.zeros(sh[1])
for i in range(sh[1]):
spec_pix2[i] = sum(Aimg2[:, i])
wave_flux2 = np.zeros(wave_pix.shape[0])
for i in np.arange(1, wave_pix.shape[0] - 1):
w = wave_pix[i]
thp_w = thp_i(w)
deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2
f = spec_pix2[wave_pos[0] - 1 + i]
if 6200 <= w <= 10000:
f = f / thp_w
else:
f = 0
wave_flux2[i] = f / deltW
r1_i = interpolate.interp1d(wave_pix, wave_flux2)
r2_i = interpolate.interp1d(wave_pix, wave_flux)
print('Spec Splicing test')
self.assertTrue(r1_i(8000)-r2_i(8000) < np.abs(0.0001))
# self.assertTrue(fwhmx/deltLamda_pix*pix_scale - psf_fwhm < np.abs(0.01))
# print('error is ',np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1])))
# self.assertTrue(np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))<0.004)
plt.figure()
plt.plot(wave_pix, wave_flux2)
plt.plot(wave_pix, wave_flux)
# plt.plot(sed['WAVELENGTH'], sed['FLUX'])
plt.xlim(6200, 10000)
plt.ylim(1, 4)
plt.yscale('log')
plt.xlabel('$\lambda$')
plt.ylabel('$F\lambda$')
plt.legend(['one spec', 'split in 8000 A'])
plt.show()
def test_double_disperse(self):
work_dir = "/public/home/fangyuedong/CSST_unittest/CSST/test/"
# data_dir = "/Volumes/Extreme SSD/SimData/"
data_dir = "/data/simudata/CSSOSDataProductsSims/data/"
path_dict = ConfigDir(work_dir=work_dir, data_dir=data_dir)
config = ReadConfig(path_dict["config_file"])
filter_param = FilterParam(filter_dir=path_dict["filter_dir"])
focal_plane = FocalPlane(survey_type=config["survey_type"])
chip = Chip(1, ccdEffCurve_dir=path_dict["ccd_dir"], CRdata_dir=path_dict["CRdata_dir"],
normalize_dir=path_dict["normalize_dir"], sls_dir=path_dict["sls_dir"],
config=config)
filter_id, filter_type = chip.getChipFilter()
filt = Filter(filter_id=filter_id, filter_type=filter_type, filter_param=filter_param,
ccd_bandpass=chip.effCurve)
tel = Telescope(optEffCurve_path=path_dict["mirror_file"])
psf_model = PSFGauss(chip=chip)
wcs_fp = focal_plane.getTanWCS(config["ra_center"], config["dec_center"], config["image_rot"], chip.pix_scale)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
chip.img.setOrigin(chip.bound.xmin, chip.bound.ymin)
chip.img.wcs = wcs_fp
obj, pos_img = produceObj(2000,4500, chip)
obj.drawObj_slitless(
tel=tel,
pos_img=pos_img,
psf_model=psf_model,
bandpass_list=filt.bandpass_sub_list,
filt=filt,
chip=chip,
g1=0,
g2=0,
exptime=150,
normFilter=chip.normF_star)
obj, pos_img = produceObj(3685, 6500, chip)
obj.drawObj_slitless(
tel=tel,
pos_img=pos_img,
psf_model=psf_model,
bandpass_list=filt.bandpass_sub_list,
filt=filt,
chip=chip,
g1=0,
g2=0,
exptime=150,
normFilter=chip.normF_star)
obj, pos_img = produceObj(5000, 2500, chip)
obj.drawObj_slitless(
tel=tel,
pos_img=pos_img,
psf_model=psf_model,
bandpass_list=filt.bandpass_sub_list,
filt=filt,
chip=chip,
g1=0,
g2=0,
exptime=150,
normFilter=chip.normF_star)
print('Spec double disperse test')
from astropy.io import fits
fits.writeto('test.fits',chip.img.array, overwrite = True)
# plt.figure()
# plt.imshow(chip.img.array)
# plt.show()
if __name__ == '__main__':
conff='/data/simudata/CSSOSDataProductsSims/data/CONF/CSST_GI2.conf'
throughputf='/data/simudata/CSSOSDataProductsSims/data/CONF/GI.Throughput.1st.fits'
suit = unittest.TestSuite()
case1 = TestSpecDisperse('test_Specdistperse1',conff,throughputf)
suit.addTest(case1)
case2 = TestSpecDisperse('test_Specdistperse2', conff, throughputf)
suit.addTest(case2)
case3 = TestSpecDisperse('test_Specdistperse3', conff, throughputf)
suit.addTest(case3)
case4 = TestSpecDisperse('test_double_disperse', conff, throughputf)
suit.addTest(case4)
unittest.TextTestRunner(verbosity=2).run(suit)
# runner = unittest.TextTestRunner()
# runner.run(suit)
\ No newline at end of file
File added
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment