import unittest from ObservationSim.MockObject.SpecDisperser import rotate90, SpecDisperser from ObservationSim.Config import ConfigDir, ReadConfig, ChipOutput from ObservationSim.Instrument import Telescope, Chip, FilterParam, Filter, FocalPlane from ObservationSim.MockObject import MockObject, Star from ObservationSim.PSF import PSFGauss import numpy as np import galsim from astropy.table import Table from scipy import interpolate import matplotlib.pyplot as plt from lmfit.models import LinearModel, GaussianModel def fit_SingleGauss(xX, yX, contmX, iHa0): background = LinearModel(prefix='line_') pars = background.make_params(intercept=yX.max(), slope=0) pars = background.guess(yX, x=xX) gauss = GaussianModel(prefix='g_') pars.update(gauss.make_params()) pars['g_center'].set(iHa0, min=iHa0 - 3, max=iHa0 + 3) pars['g_amplitude'].set(50, min=0) pars['g_sigma'].set(12, min=0.01) mod = gauss + background init = mod.eval(pars, x=xX) outX = mod.fit(yX, pars, x=xX) compsX = outX.eval_components(x=xX) # print(outX.fit_report(min_correl=0.25)) # print outX.params['g_center'] outX.fit_report(min_correl=0.25) # print(outX.fit_report(min_correl=0.25)) line_slopeX = float(outX.fit_report(min_correl=0.25).split('line_slope:')[1].split('+/-')[0]) * contmX err_line_slopeX = float( outX.fit_report(min_correl=0.25).split('line_slope:')[1].split('+/-')[1].split('(')[0]) * contmX line_interceptX = float(outX.fit_report(min_correl=0.25).split('line_intercept:')[1].split('+/-')[0]) * contmX err_line_interceptX = float( outX.fit_report(min_correl=0.25).split('line_intercept:')[1].split('+/-')[1].split('(')[0]) * contmX sigmaX = float(outX.fit_report(min_correl=0.25).split('g_sigma:')[1].split('+/-')[0]) err_sigmaX = float(outX.fit_report(min_correl=0.25).split('g_sigma:')[1].split('+/-')[1].split('(')[0]) fwhmX = float(outX.fit_report(min_correl=0.25).split('g_fwhm:')[1].split('+/-')[0]) err_fwhmX = float(outX.fit_report(min_correl=0.25).split('g_fwhm:')[1].split('+/-')[1].split('(')[0]) centerX = float(outX.fit_report(min_correl=0.25).split('g_center:')[1].split('+/-')[0]) err_centerX = float(outX.fit_report(min_correl=0.25).split('g_center:')[1].split('+/-')[1].split('(')[0]) return sigmaX, err_sigmaX, fwhmX, err_fwhmX, centerX, err_centerX def produceObj(x,y,chip): pos_img = galsim.PositionD(chip.bound.xmin + x, chip.bound.ymin + y) param = {} param["star"] = 1 param["id"] = 1 param["ra"] = chip.img.wcs.posToWorld(pos_img).ra.deg param["dec"] = chip.img.wcs.posToWorld(pos_img).dec.deg param["z"] = 0 param["sed_type"] = 1 param["model_tag"] = 1 param["mag_use_normal"] = 10 obj = Star(param) pos_img, offset, local_wcs = obj.getPosImg_Offset_WCS(img=chip.img, chip=chip) wave = np.arange(2500, 11000.5, 0.5) # sedLen = wave.shape[0] flux = pow(wave, -2) * 1e8 flux[200] = flux[200] * 10 flux[800] = flux[800] * 30 flux[2000] = flux[2000] * 5 obj.sed = Table(np.array([wave, flux]).T, names=('WAVELENGTH', 'FLUX')) return obj, pos_img class TestSpecDisperse(unittest.TestCase): def __init__(self, methodName='runTest',conff = '', throughputf = ''): super(TestSpecDisperse,self).__init__(methodName) self.conff = conff self.throughputf = throughputf def test_rotate901(self): m = np.array([[1,2,3,4,5],[6,7,8,9,10],[11,12,13,14,15],[16,17,18,19,20],[21,22,23,24,25]]) m1 = np.array([[21,16,11,6,1],[22,17,12,7,2],[23,18,13,8,3],[24,19,14,9,4],[25,20,15,10,5]]) m2 = np.array([[5,10,15,20,25],[4,9,14,19,24],[3,8,13,18,23],[2,7,12,17,22],[1,6,11,16,21]]) xc = 2 yc = 2 isClockwise = 0 m1, xc1, yc1 = rotate90(array_orig=m, xc=xc, yc=yc, isClockwise=isClockwise) self.assertTrue(xc1-xc == 0) self.assertTrue(yc1-yc == 0) self.assertTrue(np.sum(m-m1) == 0) def test_rotate902(self): m = np.array([[1,2,3,4,5],[6,7,8,9,10],[11,12,13,14,15],[16,17,18,19,20],[21,22,23,24,25]]) m1 = np.array([[21,16,11,6,1],[22,17,12,7,2],[23,18,13,8,3],[24,19,14,9,4],[25,20,15,10,5]]) m2 = np.array([[5,10,15,20,25],[4,9,14,19,24],[3,8,13,18,23],[2,7,12,17,22],[1,6,11,16,21]]) xc = 2 yc = 2 isClockwise =1 m1, xc1, yc1 = rotate90(array_orig=m, xc=xc, yc=yc, isClockwise=isClockwise) self.assertTrue(xc1-xc == 0) self.assertTrue(yc1-yc == 0) self.assertTrue(np.sum(m-m2) == 0) def test_Specdistperse1(self): star = galsim.Gaussian(fwhm=0.39) g_img = galsim.Image(100, 100, scale=0.074) starImg = star.drawImage(image=g_img) wave = np.arange(6200, 10000.5, 0.5) # sedLen = wave.shape[0] flux = pow(wave, -2) * 1e8 # flux[200] = flux[200] * 10 # flux[800] = flux[800] * 30 # flux[2000] = flux[2000] * 5 sed = Table(np.array([wave, flux]).T, names=('WAVELENGTH', 'FLUX')) conff = self.conff throughput_f = self.throughputf thp = Table.read(throughput_f) thp_i = interpolate.interp1d(thp['WAVELENGTH'], thp['SENSITIVITY']) sdp = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200, band_end=10000, isAlongY=0, conf=conff, gid=0) spec = sdp.compute_spec_orders() Aimg = spec['A'][0] wave_pix = spec['A'][5] wave_pos = spec['A'][3] sens = spec['A'][6] sh = Aimg.shape spec_pix = np.zeros(sh[1]) for i in range(sh[1]): spec_pix[i] = sum(Aimg[:, i]) # figure() # imshow(Aimg) wave_flux = np.zeros(wave_pix.shape[0]) for i in np.arange(1, wave_pix.shape[0] - 1): w = wave_pix[i] thp_w = thp_i(w) deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2 f = spec_pix[wave_pos[0] - 1 + i] if 6200 <= w <= 10000: f = f / thp_w else: f = 0 wave_flux[i] = f / deltW sed_i = interpolate.interp1d(sed['WAVELENGTH'], sed['FLUX']) ids = wave_pix < 9700 ids1 = wave_pix[ids] > 6500 print('Spec disperse flux test') self.assertTrue(np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))<0.004) plt.figure() plt.plot(wave_pix, wave_flux) plt.plot(sed['WAVELENGTH'], sed['FLUX']) plt.xlim(6200, 10000) plt.ylim(1, 3) plt.yscale('log') plt.xlabel('$\lambda$') plt.ylabel('$F\lambda$') plt.legend(['extracted', 'input']) plt.show() def test_Specdistperse2(self): psf_fwhm = 0.39 pix_scale = 0.074 star = galsim.Gaussian(fwhm=psf_fwhm) g_img = galsim.Image(100, 100, scale=pix_scale) starImg = star.drawImage(image=g_img) wave = np.arange(6200, 10000.5, 0.5) # sedLen = wave.shape[0] flux = pow(wave, -2) * 1e8 flux[200] = flux[200] * 10 flux[800] = flux[800] * 30 flux[2000] = flux[2000] * 5 sed = Table(np.array([wave, flux]).T, names=('WAVELENGTH', 'FLUX')) conff = self.conff throughput_f = self.throughputf thp = Table.read(throughput_f) thp_i = interpolate.interp1d(thp['WAVELENGTH'], thp['SENSITIVITY']) sdp = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200, band_end=10000, isAlongY=0, conf=conff, gid=0) spec = sdp.compute_spec_orders() Aimg = spec['A'][0] wave_pix = spec['A'][5] wave_pos = spec['A'][3] sens = spec['A'][6] sh = Aimg.shape spec_pix = np.zeros(sh[1]) for i in range(sh[1]): spec_pix[i] = sum(Aimg[:, i]) # figure() # imshow(Aimg) wave_flux = np.zeros(wave_pix.shape[0]) for i in np.arange(1, wave_pix.shape[0] - 1): w = wave_pix[i] thp_w = thp_i(w) deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2 f = spec_pix[wave_pos[0] - 1 + i] if 6200 <= w <= 10000: f = f / thp_w else: f = 0 wave_flux[i] = f / deltW sed_i = interpolate.interp1d(sed['WAVELENGTH'], sed['FLUX']) input_em_lam = 6600 ids = wave_pix < input_em_lam+200 ids1 = wave_pix[ids] > input_em_lam-200 deltLamda_pix = (max(wave_pix[ids][ids1]) - min(wave_pix[ids][ids1])) / (wave_pix[ids][ids1].shape[0] - 1) _, _, fwhmx, fwhmx_err, center, center_err = fit_SingleGauss(wave_pix[ids][ids1], wave_flux[ids][ids1], 1.0, 6600) print('Emission line position and shape test') self.assertTrue(input_em_lam-center < deltLamda_pix) self.assertTrue(fwhmx/deltLamda_pix*pix_scale - psf_fwhm < np.abs(0.01)) # print('error is ',np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))) # self.assertTrue(np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))<0.004) plt.figure() plt.plot(wave_pix, wave_flux) plt.plot(sed['WAVELENGTH'], sed['FLUX']) plt.xlim(6200, 10000) plt.ylim(1, 75) plt.yscale('log') plt.xlabel('$\lambda$') plt.ylabel('$F\lambda$') plt.legend(['extracted', 'input']) plt.show() def test_Specdistperse3(self): psf_fwhm = 0.39 pix_scale = 0.074 star = galsim.Gaussian(fwhm=psf_fwhm) g_img = galsim.Image(100, 100, scale=pix_scale) starImg = star.drawImage(image=g_img) wave = np.arange(6200, 10000.5, 0.5) # sedLen = wave.shape[0] flux = pow(wave, -2) * 1e8 flux[200] = flux[200] * 10 flux[800] = flux[800] * 30 flux[2000] = flux[2000] * 5 sed = Table(np.array([wave, flux]).T, names=('WAVELENGTH', 'FLUX')) conff = self.conff throughput_f = self.throughputf thp = Table.read(throughput_f) thp_i = interpolate.interp1d(thp['WAVELENGTH'], thp['SENSITIVITY']) sdp = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200, band_end=8000, isAlongY=0, conf=conff, gid=0) sdp1 = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=8000, band_end=10000, isAlongY=0, conf=conff, gid=0) spec = sdp.compute_spec_orders() spec1 = sdp1.compute_spec_orders() Aimg = spec['A'][0] + spec1['A'][0] wave_pix = spec['A'][5] wave_pos = spec['A'][3] sens = spec['A'][6] sh = Aimg.shape spec_pix = np.zeros(sh[1]) for i in range(sh[1]): spec_pix[i] = sum(Aimg[:, i]) wave_flux = np.zeros(wave_pix.shape[0]) for i in np.arange(1, wave_pix.shape[0] - 1): w = wave_pix[i] thp_w = thp_i(w) deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2 f = spec_pix[wave_pos[0] - 1 + i] if 6200 <= w <= 10000: f = f / thp_w else: f = 0 wave_flux[i] = f / deltW sdp2 = SpecDisperser(orig_img=starImg, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=sed, band_start=6200, band_end=10000, isAlongY=0, conf=conff, gid=0) spec2 = sdp2.compute_spec_orders() Aimg2 = spec2['A'][0] spec_pix2 = np.zeros(sh[1]) for i in range(sh[1]): spec_pix2[i] = sum(Aimg2[:, i]) wave_flux2 = np.zeros(wave_pix.shape[0]) for i in np.arange(1, wave_pix.shape[0] - 1): w = wave_pix[i] thp_w = thp_i(w) deltW = (w - wave_pix[i - 1]) / 2 + (wave_pix[i + 1] - w) / 2 f = spec_pix2[wave_pos[0] - 1 + i] if 6200 <= w <= 10000: f = f / thp_w else: f = 0 wave_flux2[i] = f / deltW r1_i = interpolate.interp1d(wave_pix, wave_flux2) r2_i = interpolate.interp1d(wave_pix, wave_flux) print('Spec Splicing test') self.assertTrue(r1_i(8000)-r2_i(8000) < np.abs(0.0001)) # self.assertTrue(fwhmx/deltLamda_pix*pix_scale - psf_fwhm < np.abs(0.01)) # print('error is ',np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))) # self.assertTrue(np.mean((wave_flux[ids][ids1] - sed_i(wave_pix[ids][ids1]))/sed_i(wave_pix[ids][ids1]))<0.004) plt.figure() plt.plot(wave_pix, wave_flux2) plt.plot(wave_pix, wave_flux) # plt.plot(sed['WAVELENGTH'], sed['FLUX']) plt.xlim(6200, 10000) plt.ylim(1, 4) plt.yscale('log') plt.xlabel('$\lambda$') plt.ylabel('$F\lambda$') plt.legend(['one spec', 'split in 8000 A']) plt.show() def test_double_disperse(self): work_dir = "/public/home/fangyuedong/CSST_unittest/CSST/test/" # data_dir = "/Volumes/Extreme SSD/SimData/" data_dir = "/data/simudata/CSSOSDataProductsSims/data/" path_dict = ConfigDir(work_dir=work_dir, data_dir=data_dir) config = ReadConfig(path_dict["config_file"]) filter_param = FilterParam(filter_dir=path_dict["filter_dir"]) focal_plane = FocalPlane(survey_type=config["survey_type"]) chip = Chip(1, ccdEffCurve_dir=path_dict["ccd_dir"], CRdata_dir=path_dict["CRdata_dir"], normalize_dir=path_dict["normalize_dir"], sls_dir=path_dict["sls_dir"], config=config) filter_id, filter_type = chip.getChipFilter() filt = Filter(filter_id=filter_id, filter_type=filter_type, filter_param=filter_param, ccd_bandpass=chip.effCurve) tel = Telescope(optEffCurve_path=path_dict["mirror_file"]) psf_model = PSFGauss(chip=chip) wcs_fp = focal_plane.getTanWCS(config["ra_center"], config["dec_center"], config["image_rot"], chip.pix_scale) chip.img = galsim.ImageF(chip.npix_x, chip.npix_y) chip.img.setOrigin(chip.bound.xmin, chip.bound.ymin) chip.img.wcs = wcs_fp obj, pos_img = produceObj(2000,4500, chip) obj.drawObj_slitless( tel=tel, pos_img=pos_img, psf_model=psf_model, bandpass_list=filt.bandpass_sub_list, filt=filt, chip=chip, g1=0, g2=0, exptime=150, normFilter=chip.normF_star) obj, pos_img = produceObj(3685, 6500, chip) obj.drawObj_slitless( tel=tel, pos_img=pos_img, psf_model=psf_model, bandpass_list=filt.bandpass_sub_list, filt=filt, chip=chip, g1=0, g2=0, exptime=150, normFilter=chip.normF_star) obj, pos_img = produceObj(5000, 2500, chip) obj.drawObj_slitless( tel=tel, pos_img=pos_img, psf_model=psf_model, bandpass_list=filt.bandpass_sub_list, filt=filt, chip=chip, g1=0, g2=0, exptime=150, normFilter=chip.normF_star) print('Spec double disperse test') from astropy.io import fits fits.writeto('test.fits',chip.img.array, overwrite = True) # plt.figure() # plt.imshow(chip.img.array) # plt.show() if __name__ == '__main__': conff='/data/simudata/CSSOSDataProductsSims/data/CONF/CSST_GI2.conf' throughputf='/data/simudata/CSSOSDataProductsSims/data/CONF/GI.Throughput.1st.fits' suit = unittest.TestSuite() case1 = TestSpecDisperse('test_Specdistperse1',conff,throughputf) suit.addTest(case1) case2 = TestSpecDisperse('test_Specdistperse2', conff, throughputf) suit.addTest(case2) case3 = TestSpecDisperse('test_Specdistperse3', conff, throughputf) suit.addTest(case3) case4 = TestSpecDisperse('test_double_disperse', conff, throughputf) suit.addTest(case4) unittest.TextTestRunner(verbosity=2).run(suit) # runner = unittest.TextTestRunner() # runner.run(suit)