import os import galsim import numpy as np from astropy.io import fits from observation_sim.instruments.chip import chip_utils from observation_sim.instruments.chip import effects from astropy.time import Time from datetime import datetime, timezone def add_prescan_overscan(self, chip, filt, tel, pointing, catalog, obs_param): self.chip_output.Log_info("Apply pre/over-scan") chip.img = chip_utils.AddPreScan(GSImage=chip.img, pre1=chip.prescan_x, pre2=chip.prescan_y, over1=chip.overscan_x, over2=chip.overscan_y) if obs_param["add_dark"] is True: ny = int(chip.npix_y/2) base_dark = (ny-1)*(chip.readout_time/ny)*chip.dark_noise chip.img.array[(chip.prescan_y+ny):-(chip.prescan_y+ny), :] = base_dark return chip, filt, tel, pointing def add_crosstalk(self, chip, filt, tel, pointing, catalog, obs_param): crosstalk=np.zeros([16,16]) crosstalk[0,:] = np.array([1., 1e-4, 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[1,:] = np.array([1e-4, 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[2,:] = np.array([0., 0., 1., 1e-4, 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[3,:] = np.array([0., 0., 1e-4, 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[4,:] = np.array([0., 0., 0., 0., 1., 1e-4, 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[5,:] = np.array([0., 0., 0., 0., 1e-4, 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[6,:] = np.array([0., 0., 0., 0., 0., 0., 1., 1e-4, 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[7,:] = np.array([0., 0., 0., 0., 0., 0., 1e-4, 1., 0., 0., 0., 0., 0., 0., 0., 0.]) crosstalk[8,:] = np.array([0., 0., 0., 0., 0., 0., 0., 0., 1., 1e-4, 0., 0., 0., 0., 0., 0.]) crosstalk[9,:] = np.array([0., 0., 0., 0., 0., 0., 0., 0., 1e-4, 1., 0., 0., 0., 0., 0., 0.]) crosstalk[10,:]= np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1e-4, 0., 0., 0., 0.]) crosstalk[11,:]= np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1e-4, 1., 0., 0., 0., 0.]) crosstalk[12,:]= np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1e-4, 0., 0.]) crosstalk[13,:]= np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1e-4, 1., 0., 0.]) crosstalk[14,:]= np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1e-4]) crosstalk[15,:]= np.array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1e-4, 1.]) # 2*8 -> 1*16 img = formatOutput(chip.img) ny, nx = img.array.shape nsecy=1 nsecx=16 dy = int(ny/nsecy) dx = int(nx/nsecx) newimg = galsim.Image(nx, ny, init_value=0) for i in range(16): for j in range(16): newimg.array[:, int(i*dx):int(i*dx+dx)] += crosstalk[i,j]*img.array[:, int(j*dx):int(j*dx+dx)] # 1*16 -> 2*8 newimg = formatRevert(newimg) chip.img.array[:, :] = newimg.array[:, :] return chip, filt, tel, pointing def add_readout_noise(self, chip, filt, tel, pointing, catalog, obs_param): seed = int(self.overall_config["random_seeds"] ["seed_readout"]) + pointing.id*30 + chip.chipID rng_readout = galsim.BaseDeviate(seed) readout_noise = galsim.GaussianNoise( rng=rng_readout, sigma=chip.read_noise) chip.img.addNoise(readout_noise) return chip, filt, tel, pointing def apply_gain(self, chip, filt, tel, pointing, catalog, obs_param): self.chip_output.Log_info(" Applying Gain") if obs_param["gain_16channel"] is True: chip.img, chip.gain_channel = effects.ApplyGainNonUniform16(chip.img, gain=chip.gain, nsecy=chip.nsecy, nsecx=chip.nsecx, seed=self.overall_config["random_seeds"]["seed_gainNonUniform"]+chip.chipID) elif obs_param["gain_16channel"] is False: chip.img /= chip.gain chip.gain_channel = np.ones(chip.nsecy*chip.nsecx)*chip.gain return chip, filt, tel, pointing def quantization_and_output(self, chip, filt, tel, pointing, catalog, obs_param): if not hasattr(self, 'h_ext'): _, _ = self.prepare_headers(chip=chip, pointing=pointing) self.updateHeaderInfo(header_flag='ext', keys=['SHTSTAT', 'SHTOPEN1', 'SHTCLOS0', 'SHTCLOS1', 'EXPTIME'], values=[ False, self.h_ext['SHTOPEN0'], self.h_ext['SHTOPEN0'], self.h_ext['SHTOPEN0'], 0.0]) # renew header info datetime_obs = datetime.utcfromtimestamp(pointing.timestamp) datetime_obs = datetime_obs.replace(tzinfo=timezone.utc) t_obs = Time(datetime_obs) # ccd刷新2s,等待0.5s,开灯后等待0.5s,开始曝光 t_obs_renew = Time(t_obs.mjd - 2. / 86400., format="mjd") t_obs_utc = datetime.utcfromtimestamp(np.round(datetime.utcfromtimestamp( t_obs_renew.unix).replace(tzinfo=timezone.utc).timestamp(), 1)) self.updateHeaderInfo(header_flag='prim', keys=[ 'DATE-OBS'], values=[t_obs_utc.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-5]]) gains1 = list(chip.gain_channel[0:8]) gains2 = list(chip.gain_channel[8:]) gains2.reverse() gains = np.append(gains1, gains2) self.updateHeaderInfo(header_flag='ext', keys=['GAIN01', 'GAIN02', 'GAIN03', 'GAIN04', 'GAIN05', 'GAIN06', 'GAIN07', 'GAIN08', 'GAIN09', 'GAIN10', 'GAIN11', 'GAIN12', 'GAIN13', 'GAIN14', 'GAIN15', 'GAIN16'], values=gains) if obs_param["format_output"] is True: self.chip_output.Log_info(" Apply 1*16 format") chip.img = chip_utils.formatOutput(GSImage=chip.img) chip.nsecy = 1 chip.nsecx = 16 chip.img.array[chip.img.array > 65535] = 65535 chip.img.replaceNegative(replace_value=0) chip.img.quantize() chip.img = galsim.Image(chip.img.array, dtype=np.uint16) fname = os.path.join(self.chip_output.subdir, self.h_prim['FILENAME'] + '.fits') # f_name_size = 68 # if (len(self.h_prim['FILENAME']) > f_name_size): # self.updateHeaderInfo(header_flag='prim', keys=['FILENAME'], values=[ # self.h_prim['FILENAME'][0:f_name_size]]) hdu1 = fits.PrimaryHDU(header=self.h_prim) self.updateHeaderInfo(header_flag='ext', keys=['DATASECT'], values=[ str(chip.img.array.shape[1]) + 'x' + str(chip.img.array.shape[0])]) hdu2 = fits.ImageHDU(chip.img.array, header=self.h_ext) hdu2.header.comments["XTENSION"] = "image extension" hdu = fits.HDUList([hdu1, hdu2]) hdu[0].add_datasum(when='data unit checksum') hdu[0].add_checksum(when='HDU checksum', override_datasum=True) hdu[1].add_datasum(when='data unit checksum') hdu[1].add_checksum(when='HDU checksum', override_datasum=True) hdu.writeto(fname, output_verify='ignore', overwrite=True) return chip, filt, tel, pointing