import os import yaml import numpy as np import scipy.ndimage as nd from astropy.io import fits import matplotlib.pyplot as plt import math from CpicImgSim.config import cpism_refdata, solar_spectrum, MAG_SYSTEM from CpicImgSim.utils import region_replace, random_seed_select from CpicImgSim.io import log from CpicImgSim.optics import filter_throughput def sky_frame_maker(band, skybg, platescale, shape): """ generate a sky background frame. Parameters ---------- band : str The band of the sky background. skybg : str The sky background file name. platescale : float The platescale of the camera in arcsec/pixel. shape : tuple The shape of the output frame. (y, x) Returns ------- sky_bkg_frame : numpy.ndarray The sky background frame. """ filter = filter_throughput(band) sk_spec = solar_spectrum.renorm(skybg, MAG_SYSTEM, filter) sky_bkg_frame = np.zeros(shape) sky_bkg_frame += (sk_spec * filter).integrate() * platescale**2 return sky_bkg_frame class CRobj(object): """ Cosmic ray object. Attributes ---------- flux : float The flux of the cosmic ray. angle : float The angle of the cosmic ray. sigma : float The width of the cosmic ray. length : int The length of the cosmic ray. """ def __init__(self, flux, angle, sigma, length) -> None: self.flux = flux self.angle = angle self.sigma = sigma self.length = length class CosmicRayFrameMaker(object): """ Cosmic ray frame maker. Parameters ---------- depth : float The depth of the camera pixel in um. pitch : float The pitch of the camera pixel in um. cr_rate : float The cosmic ray rate per second per cm2. """ def __init__(self) -> None: self.tmp_size = [7, 101] self.freq_power = -0.9 self.trail_std = 0.1 self.depth = 10 # um self.pitch = 13 # um self.cr_rate = 1 # particle per s per cm2 from Miles et al. 2021 def make_CR(self, length, sigma, seed=-1): """ make a image of cosmic ray with given length and sigma. Parameters ---------- length : int The length of the cosmic ray in pixel. sigma : float The width of the cosmic ray in pixel. Returns ------- output : numpy.ndarray The image of cosmic ray. """ h = self.tmp_size[0] w = self.tmp_size[1] freq = ((w-1)/2-np.abs(np.arange(w)-(w-1)/2)+1)**(self.freq_power) x = np.arange(w) - (w-1)/2 hl = (length-1)/2 x_wing = np.exp(-(np.abs(x)-hl)**2/sigma/sigma/2) x_wing[np.abs(x) < hl] = 1 cr = np.zeros([h, w]) center = (h-1)/2 for i in range(h): phase = np.random.rand(w)*2*np.pi trail0 = abs(np.fft.fft(freq*np.sin(phase) + 1j*x*np.cos(phase))) # TODO maybe somthing wrong trail_norm = (trail0 - trail0.mean())/trail0.std() cr[i, :] = np.exp(-(i - center)**2/sigma/sigma/2) \ * (trail_norm * self.trail_std + 1) * x_wing output = np.zeros([w, w]) d = (w-h)//2 output[d:d+h, :] = cr return output def _length_rand(self, N, seed=-1): """ randomly generate N cosmic ray length. """ len_out = [] seed = random_seed_select(seed=seed) log.debug(f"cr length seed: {seed}") for i in range(N): x2y2 = 2 while x2y2 > 1: lx = 1 - 2 * np.random.rand() ly = 1 - 2 * np.random.rand() x2y2 = lx * lx + ly * ly z = 1 - 2 * x2y2 r = 2 * np.sqrt(x2y2 * (1 - x2y2)) length = abs(r / z * self.depth) pitch = self.pitch len_out.append(int(length/pitch)) return np.array(len_out) def _number_rand(self, expt, pixsize, random=False, seed=-1): """ randomly generate the number of cosmic rays. """ area = (self.pitch / 1e4)**2 * pixsize[0] * pixsize[1] ncr = area * expt * self.cr_rate if random: seed = random_seed_select(seed=seed) log.debug(f"cr count: {seed}") ncr = np.random.poisson(ncr) else: ncr = int(ncr) self.ncr = ncr return ncr def _sigma_rand(self, N, seed=-1): """ randomly generate N cosmic ray sigma. """ sig_sig = 0.5 # asuming the sigma of size of cosmic ray is 0.5 seed = random_seed_select(seed=seed) log.debug(f"cr width seed: {seed}") sig = abs(np.random.randn(N))*sig_sig + 1/np.sqrt(12) * 1.2 # assume sigma is 1.2 times of pictch sig return sig def _flux_rand(self, N, seed=-1): """ randomly generate N cosmic ray flux. """ seed = random_seed_select(seed=seed) log.debug(f"cr flux seed: {seed}") u = np.random.rand(N) S0 = 800 lam = 0.57 S = (-np.log(1-u)/lam + S0**0.25)**4 return S def random_CR_parameter(self, expt, pixsize): """ randomly generate cosmic ray parameters, including number, length, flux, sigma and angle. Parameters ---------- expt : float The exposure time in second. pixsize : list The size of the image in pixel. Returns ------- CRs : list A list of cosmic ray objects. """ N = self._number_rand(expt, pixsize) log.debug(f"cr count: {N}") length = self._length_rand(N) if N > 0: log.debug(f"cr length, max: {length.max()}, min: {length.min()}") flux = self._flux_rand(N) log.debug(f"cr flux, max: {flux.max()}, min: {flux.min()}") sig = self._sigma_rand(N) log.debug(f"cr width, max: {sig.max()}, min: {sig.min()}") seed = random_seed_select(seed=-1) log.debug(f"cr angle seed: {seed}") angle = np.random.rand(N) * 180 CRs = [] for i in range(N): CRs.append(CRobj(flux[i], angle[i], sig[i], length[i])) return CRs def make_cr_frame(self, shape, expt, seed=-1): """ make a cosmic ray frame. Parameters ---------- shape : list The size of the image in pixel. expt : float The exposure time in second. seed : int, optional The random seed. The default is -1. If seed is -1, the seed will be randomly selected. Returns ------- image : numpy.ndarray The cosmic ray frame. """ image = np.zeros(shape) sz = shape cr_array = self.random_CR_parameter(expt, shape) cr_center = (self.tmp_size[1] - 1)/2 seed = random_seed_select(seed=seed) log.debug(f"cr position seed: {seed}") for i in range(len(cr_array)): cr = cr_array[i] x = np.random.rand() * sz[1] y = np.random.rand() * sz[0] cr_img = self.make_CR(cr.length, cr.sigma) cr_img *= cr.flux cr_img = abs(nd.rotate(cr_img, cr.angle, reshape=False)) if i == 0: pdin = False else: pdin = True if i == len(cr_array) - 1: pdout = False else: pdout = True image = region_replace( image, cr_img, [y-cr_center, x-cr_center], padded_in=pdin, padded_out=pdout, subpix=True ) image = np.maximum(image, 0) log.debug(f"cr image max: {image.max()}, min: {image.min()}") return image class CpicVisEmccd(object): def __init__(self): self._defaut_config() self.config_init() @classmethod def from_config(cls, config): obj = cls.__new__() obj._defaut_config() obj.__dict__.update(config) obj._refresh_config() return obj def _defaut_config(self): self.plszx = 1024 self.plszy = 1024 self.pscan1 = 8 self.pscan2 = 0 self.oscan1 = 16 self.oscan2 = 18 self.udark = 6 self.bdark = 2 self.ldark = 16 self.rdark = 16 self.max_adu = 16_383 self.switch = { 'flat': True, 'dark': True, 'stripe': True, 'cic': True, 'cte': True, 'badcolumn': True, 'nonlinear': True, 'cosmicray': True, 'blooming': False, 'bias_vp': True, 'bias_hp': True, 'bias_ci': True, 'bias_shift': True, } self.dark_file = cpism_refdata + '/camera/emccd_dark_current.fits' self.flat_file = cpism_refdata + '/camera/emccd_flat_field.fits' self.cic_file = cpism_refdata + '/camera/emccd_cic.fits' self.bad_col_file = cpism_refdata + '/camera/emccd_bad_columns.fits' self.cic = 0.2 self.fullwell = 80_000 self.em_fullwell = 500_000 #780_000 self.em_cte = 0.9996 self.emreg_cal_num = 10 # 用来加速计算 self.emreg_num = 604 self.readout_speed = 6.25e6 # Hz self.readout_time = 0.365 # s self.heat_speed = 1000 # voltage / 1000 degree per frame self.temper_speed = 0.05 # degree per second self.cooler_temp = -80 self.readout_noise = 160 self.ph_per_adu = 59 self.bias_level = 200 self.vertical_param = [0, 14] self.horizontal1_param = [5.3/2, 12, 5/2, 50] self.horizontal2_param = [5.3/4, 16.17, 5/4, 76.65] self.bias_hp_resd = 2 self.cooler_interfence = 20 self.bias_level_std = 3 self.bias_shift_per_volt = -1.34 self.shift_time = 1 / 1000 def shutter_effect(self, image, expt): image_mean = image.mean(axis=0) image_shutter = np.broadcast_to(image_mean, image.shape) image_shutter /= image_shutter.mean() image_shutter *= image.mean() * self.shift_time / expt return image_shutter.astype(int) def config_init(self): self._img_index = 0 self._ccd_temp = self.cooler_temp self._vertical_i0 = np.random.randint(0, 2) self._frame_read_time = 2080 * 1055 / 6.25e6 self.flat_shape = [self.plszy, self.plszx] darksz_x = self.plszx + self.rdark + self.ldark darksz_y = self.plszy + self.udark + self.bdark self.dark_shape = [darksz_y, darksz_x] biassz_x = darksz_x + self.pscan1 + self.oscan1 biassz_y = darksz_y + self.pscan2 + self.oscan2 self.image_shape = [biassz_y, biassz_x] self.flat = fits.getdata(self.flat_file) # self.cic = fits.getdata(self.cic_file) self.dark = fits.getdata(self.dark_file) self.bad_col = fits.getdata(self.bad_col_file) self.ccd_temp = self.cooler_temp self.system_time = 0 self.time_syn(0, initial=True) self.emgain_set(1024, -30) def CTE_cal(n, N_p, CTE, S_0): ''' CTE_cal(order of trail pixels, number of pixel transfers, CTE, initial intensity of target pixel) ''' CTI = 1 - CTE S_0_n = S_0 * ((N_p * CTI) ** n) / math.factorial(n) * math.exp(-N_p * CTI) return S_0_n def cte_201(cte, start=0, length=10): N_p = 604 S_0 = 1 res = [0]*start for n in range(length): s = CTE_cal(n, N_p, cte, S_0) res.append(s) return np.array(res) cti_trail = cte_201(self.em_cte, start=0, length=10) self.cti_trail = cti_trail / cti_trail.sum() def bias_frame(self, show=False): shape = self.image_shape TPI = np.pi * 2 # vertical pattern # 使用一维的曲线描述竖条纹的截面 vp_1d = np.zeros(shape[1]) # 以下代码用于模拟垂直间隔的竖条纹在奇偶幅图像时的不同表现 # 后续相机更新过程中,已经将改特性进行修改,因此不再使用此代码 # 这个代码写的还是有点意思,所以保留下来了 # vp_1d[0::2] = self.vertical_param[self._vertical_i0] # self._vertical_i0 = 1 - self._vertical_i0 # vp_1d[1::2] = self.vertical_param[self._vertical_i0] vp_1d[0::2] = self.vertical_param[self._vertical_i0] vp_1d[1::2] = self.vertical_param[1 - self._vertical_i0] vp_frame = np.zeros(shape) if self.switch['bias_vp']: vp_frame += vp_1d if show: # pragma: no cover plt.figure(figsize=(10, 3)) plt.plot(vp_1d) plt.xlim([0, 100]) plt.xlabel('x-axis') plt.ylabel('ADU') plt.title('vertical pattern') # horizontal pattern # 图像上的横条纹是梳状,分为两个部分,左边大约77个像素是周期小一点,其余的会大一点 boundary = 77 # boundary between left and width boundary_width = 5 # 左右需要平滑过度一下 y = np.arange(self.image_shape[0]) hp_left_param = self.horizontal1_param # 实测数据拟合得到的 hp_left_1d = hp_left_param[0] * np.sin(TPI * (y / hp_left_param[1] + np.random.rand())) hp_left_1d += hp_left_param[2] * np.sin(TPI * (y / hp_left_param[3] + np.random.rand())) hp_left_frame = np.broadcast_to(hp_left_1d, [boundary+boundary_width, len(hp_left_1d),]).T hp_right_param = self.horizontal2_param hp_right_1d = hp_right_param[0] * np.sin(TPI * (y / hp_right_param[1] + np.random.rand())) hp_right_1d += hp_right_param[2] * np.sin(TPI * (y / hp_right_param[3] + np.random.rand())) hp_right_frame = np.broadcast_to(hp_right_1d, [shape[1] - boundary, len(hp_right_1d)]).T combine_profile_left = np.ones(boundary + boundary_width) combine_profile_left[-boundary_width:] = (boundary_width - np.arange(boundary_width) - 1) / boundary_width combine_profile_right = np.ones(shape[1] - boundary) combine_profile_right[:boundary_width] = np.arange(boundary_width)/boundary_width hp_frame = np.zeros(shape) hp_frame[:, :boundary+boundary_width] = hp_left_frame * combine_profile_left hp_frame[:, boundary:] = hp_right_frame * combine_profile_right # residual frame 横条纹外,还有一个垂直方向的梯度,根据测试数据,使用直线加指数函数的方式来拟合 exp_a, exp_b, lin_a, lin_b = (-1.92377463e+00, 1.32698365e-01, 8.39509583e-04, 4.25384480e-01) y_trend = exp_a * np.exp(-(y + 1) * exp_b) + y * lin_a - lin_b # random horizontal pattern generated in frequency domain # 除了规则横条纹外,还有随机的横条纹,随机的横条纹在频域空间生成,具有相同的频率谱和随机的相位 rsd_freq_len = len(y_trend) * 4 red_freq = np.arange(rsd_freq_len) red_freq = red_freq - (len(red_freq) - 1)/2 red_freq = np.exp(-red_freq**2 / 230**2) * 240 + np.random.randn(rsd_freq_len)*30 red_freq = np.fft.fftshift(red_freq) phase = np.random.rand(rsd_freq_len) * TPI hp_rsd_1d = np.fft.ifft(np.exp(1j * phase) * red_freq) hp_rsd_1d = np.abs(hp_rsd_1d) * self.bias_hp_resd hp_rsd_1d = hp_rsd_1d[:rsd_freq_len//4] hp_rsd_1d = hp_rsd_1d - np.mean(hp_rsd_1d) hp_rsd_1d = y_trend + hp_rsd_1d hp_frame = (hp_frame.T + hp_rsd_1d).T if not self.switch['bias_hp']: hp_frame *= 0 if show: plt.figure(figsize=(10, 3)) # plt.plot(hp_right_1d) plt.plot(hp_rsd_1d) # plt.xlim([0, 200]) plt.xlabel('y-axis') plt.ylabel('ADU') plt.title('vertical pattern') # 接上制冷机后,会有亮暗点 #cooler interfence effect ci_position = 10 ci_sub_struct = 80 ci_sub_exp = 2.5 ci_x_shft = 3 ci_interval = 250 # 6.25MHz readout / 2.5KHz interfence ci_dn = self.cooler_interfence npix = shape[0] * shape[1] n_ci_event = npix // ci_interval ci_align = np.zeros((n_ci_event, ci_interval)) ci_align[:, ci_position] = np.random.randn(n_ci_event) * ci_dn ci_align[:, ci_position+1] = np.random.randn(n_ci_event) * ci_dn yi0 = np.random.randint(0, ci_sub_struct) xs0 = (ci_interval - ci_position) / (ci_sub_struct / 2)**ci_sub_exp for yi in range(n_ci_event): sub_yi = (yi - yi0) % ci_sub_struct sub_yi = abs(sub_yi - ci_sub_struct / 2) shiftx = int(sub_yi**ci_sub_exp * xs0) ci_align[yi, :] = np.roll(ci_align[yi, :], shiftx) ci_align = np.pad(ci_align.flatten(), (0, npix-n_ci_event*ci_interval)) ci_frame = ci_align.reshape(shape[0], shape[1]) for yi in range(shape[0]): ci_frame[yi, :] = np.roll(ci_frame[yi, :], yi * ci_x_shft) if not self.switch['bias_ci']: ci_frame *= 0 bias_shift = 0 if self.switch['bias_shift']: bias_shift = (self.volt - 25) * self.bias_shift_per_volt # 混合在一起 rn_adu = self.readout_noise / self.ph_per_adu bias_level = self.bias_level + np.random.randn() * self.bias_level_std bias_frame = vp_frame + ci_frame + hp_frame + bias_level bias_frame += rn_adu * np.random.randn(shape[0], shape[1]) + bias_shift return bias_frame def nonlinear_effect(self, image): """ nonlinear effect """ fullwell = self.fullwell nonlinear_coefficient = 0.1 log.debug( f"nonlinear effect added with coefficient {nonlinear_coefficient}") image += (image / fullwell)**2 * nonlinear_coefficient * fullwell return image def time_syn(self, t, readout=True, initial=False): if initial: self.ccd_temp = self.cooler_temp self.system_time = t return dt = t heat = 0 if readout: heat = self.volt / self.heat_speed self.ccd_temp = heat + self.cooler_temp + (self.ccd_temp - self.cooler_temp) * np.exp(-dt * self.temper_speed) if self.ccd_temp < self.cooler_temp: # self.ccd_temp = self.cooler_temp self.system_time += dt # def em_cte(self, img): # i_shift = 1 # cte_coe = 0.1 # img_shift_i = np.zeros_like(img) # img_shift_i = np.random.poisson(img * cte_coe) # pass def emgain_set(self, em_set, ccd_temp=None, self_update=True): if ccd_temp is None: ccd_temp = self.ccd_temp volt_coe_a = -0.01828 volt_coe_b = 43.61 volt_func = lambda es: volt_coe_a * es + volt_coe_b self.volt = volt_func(em_set) volt_3ff = volt_func(int('3ff', 16)) volt_190 = volt_func(int('190', 16)) em_coe_c = 0.24 # using the expression of em_b = ln(g199) / constant to make fitting easier constant = (np.exp(em_coe_c * volt_190) - np.exp(em_coe_c * volt_3ff)) # fitting from the ccd test result ln_g190 = (-ccd_temp - 7) * 0.0325 em_coe_b = ln_g190 / constant emgain = np.exp(em_coe_b * np.exp(em_coe_c * self.volt)) emgain = np.maximum(1, emgain) # print(emgain, em_coe_b, em_coe_c * self.volt, self.volt, np.exp(em_coe_c * self.volt)) if self_update: self.emgain = emgain self.emset = em_set return emgain def add_em_cte_conv(self, image): shape = image.shape img_line = np.convolve(image.flatten(), self.cti_trail, mode='full') return img_line[:shape[0]*shape[1]].reshape(shape) def readout(self, image_focal, em_set, expt_set, image_cosmic_ray=False): expt = expt_set if expt_set == 0: expt = 0.001 dt = self.readout_time + expt self.time_syn(dt, readout=True) emgain = self.emgain_set(em_set, self.ccd_temp) image = image_focal if self.switch['flat']: image = image * self.flat if self.switch['nonlinear']: image = self.nonlinear_effect(image) darksz_x = self.plszx + self.rdark + self.ldark darksz_y = self.plszy + self.udark + self.bdark img_dark = np.zeros((darksz_y, darksz_x)) img_dark[ self.bdark:self.plszy+self.bdark, self.ldark:self.ldark+self.plszx ] = image image = img_dark if self.switch['dark']: image += self.dark image *= expt if self.switch['nonlinear']: image = self.nonlinear_effect(image) if self.switch['cic']: image += self.cic if image_cosmic_ray is not None and self.switch['cosmicray']: image += image_cosmic_ray if self.switch['blooming']: image = self.vertical_blooming(image) if self.switch['badcolumn']: for i in range(self.bad_col.shape[1]): deadpix_x = self.bad_col[0, i] deadpix_y = self.bad_col[1, i] image[deadpix_y:, deadpix_x] = 0 biassz_x = darksz_x + self.pscan1 + self.oscan1 biassz_y = darksz_y + self.pscan2 + self.oscan2 img_bias = np.zeros((biassz_y, biassz_x), dtype=int) img_bias[ self.pscan2:self.pscan2+darksz_y, self.pscan1:self.pscan1+darksz_x ] = np.random.poisson(image) sub_img = image[100:-100, 100:-100] print(np.random.poisson(sub_img).std(), sub_img.mean()) image = img_bias lower_limit_n = int(np.log(emgain)/np.log(1 + 0.2)) # to make pEM < 0.2 emregister_num = np.maximum(lower_limit_n, self.emreg_cal_num) emregister_num = np.minimum(emregister_num, self.emreg_num) pEM = np.exp(np.log(emgain)/emregister_num) - 1 big_cte = self.em_cte ** (self.emreg_num / emregister_num) image += np.random.poisson(image ) # for _ in range(emregister_num): # # image += np.random.binomial(image, pEM) # residual = np.random.binomial(image, 1-big_cte) # image[:, 1:] += residual[:, :-1] - residual[:, 1:] sub_img = image[100:-100, 100:-100] / emgain print(sub_img.std(), sub_img.mean(), emgain) bias = self.bias_frame() image = image / self.ph_per_adu + bias image = np.minimum(image, self.max_adu) image = np.maximum(image, 0) return image#.astype(dtype=np.uint16) class EMCCD(object): """ EMCCD camera class Parameters ---------- config_file : str config file name Attributes ---------- switch : dict switch for each camera effects, including: - 'flat': bool, - 'dark': bool, - 'stripe': bool, - 'cic': bool, - 'cte': bool, - 'badcolumn': bool, - 'nonlinear': bool, - 'cosmicray': bool, - 'blooming': bool, """ def __init__(self, config_file="ccd201_config.yaml"): self.plszx = 1024 self.plszy = 1024 self.pscan1 = 8 self.pscan2 = 0 self.oscan1 = 16 self.oscan2 = 18 self.udark = 6 self.bdark = 2 self.ldark = 16 self.rdark = 16 self.fullwell = 80_000 self.em_fullwell = 780_000 # if config file exists, load it, otherwise use default values config_file = cpism_refdata + '/camera/' + config_file log.debug(f"emccd config file: {config_file}") if os.path.exists(config_file): self.load_config(config_file) else: raise(ValueError('config_file error')) self.flat_shape = [self.plszy, self.plszx] darksz_x = self.plszx + self.rdark + self.ldark darksz_y = self.plszy + self.udark + self.bdark self.dark_shape = [darksz_y, darksz_x] biassz_x = darksz_x + self.pscan1 + self.oscan1 biassz_y = darksz_y + self.pscan2 + self.oscan2 self.image_shape = [biassz_y, biassz_x] self.flat = fits.getdata(self.flat_file) self.cic = fits.getdata(self.cic_file) self.dark = fits.getdata(self.dark_file) self.bad_col = fits.getdata(self.bad_col_file) def load_config(self, config_file): """ load config file. Only for internal use. """ with open(config_file, 'r') as f: config = yaml.load(f, Loader=yaml.FullLoader) self.switch = config['switch'] self.readout_noise = config['readout_noise'] self.ph_per_adu = config['ph_per_adu'] self.bias_level = config['bias_level'] self.max_adu = config['max_adu'] self.dark_file = cpism_refdata + "/camera/" + config['dark_file'] self.flat_file = cpism_refdata + "/camera/" + config['flat_file'] self.cic_file = cpism_refdata + "/camera/" + config['cic_file'] self.bad_col_file = cpism_refdata + \ "/camera/" + config['bad_col_file'] def vertical_blooming(self, image): """ vertical blooming effect """ fullwell = self.fullwell line = np.arange(image.shape[0]) yp, xp = np.where(image > fullwell) n_saturated = len(xp) log.debug(f"{len(xp)} pixels are saturated!") if n_saturated > 5000: log.warning(f"More than 5000({len(xp)}) pixels are saturated!") img0 = image.copy() for x, y in zip(xp, yp): image[:, x] += np.exp(-(line-y)**2/20**2) * img0[y, x] * 0.2 return np.minimum(image, fullwell) def nonlinear_effect(self, image): """ nonlinear effect """ fullwell = self.fullwell nonlinear_coefficient = 0.1 log.debug( f"nonlinear effect added with coefficient {nonlinear_coefficient}") image += (image / fullwell)**2 * nonlinear_coefficient * fullwell return image def emregester_blooming(self, image, max_iteration=5): """ emregester blooming effect """ line = image.flatten().copy() curve_x = np.arange(1300)+2 curve_y = np.exp(11*curve_x**(-0.19)-11) curve_y[0] = 0 curve_y /= curve_y.sum() over_limit_coe = 0.999 saturated = image > self.em_fullwell n_saturated = saturated.sum() if n_saturated > 0: log.debug(f"{n_saturated} pixels are saturated during EM process.") if n_saturated > 2000: log.warning( f"More than 2000 ({n_saturated}) pixels are saturated during EM process!") for index in range(max_iteration): over_limit = np.maximum( line - self.em_fullwell * over_limit_coe, 0) line = np.minimum(line, self.em_fullwell * over_limit_coe) blooming = np.convolve(over_limit, curve_y, mode='full')[ :len(line)] line = line + blooming n_over = (line > self.em_fullwell).sum() if n_over <= 0: break log.debug( f'{index}/{max_iteration} loop: saturated pixel number: {n_over}') return line.reshape(image.shape) def cte(self, image): """ cte effect """ image = self.emregester_blooming(image, max_iteration=5) return image def readout(self, image_focal, emgain, expt, image_cosmic_ray=None): """ emccd readout. Generate a image with emccd readout effect. Parameters ---------- image_focal : numpy.ndarray image at focal plane. Unit: electron / second emgain : float emgain of emccd expt : float exposure time. Unit: second image_cosmic_ray : numpy.ndarray, optional cosmic ray image. Unit: electron, by default None Returns ------- numpy.ndarray image with emccd readout effect. Unit: ADU Notes ----- 1. effects include: dark, flat, cte, blooming, nonlinear, etc. Can be turned on/off by switch. 2. size of input image_focal must be 1024x1024 3. size of output image is 1080x1056 (including overscan and dark reference region) 4. Q.E is not included in this function. It should be included in image_focal. See optics.py for details. """ log.debug( fr"EMCCD readout: {emgain=:}, {expt=:}, image_comic_ray:{'None' if image_cosmic_ray is None else 'Not None'}") log.debug( f"camera effects switch={self.switch}" ) image = image_focal if self.switch['flat']: image = image * self.flat if self.switch['nonlinear']: image = self.nonlinear_effect(image) darksz_x = self.plszx + self.rdark + self.ldark darksz_y = self.plszy + self.udark + self.bdark img_dark = np.zeros((darksz_y, darksz_x)) img_dark[ self.bdark:self.plszy+self.bdark, self.ldark:self.ldark+self.plszx ] = image image = img_dark if self.switch['dark']: image += self.dark image *= expt if self.switch['cic']: image += self.cic if image_cosmic_ray is not None and self.switch['cosmicray']: image += image_cosmic_ray if self.switch['blooming']: image = self.vertical_blooming(image) if self.switch['badcolumn']: for i in range(self.bad_col.shape[1]): deadpix_x = self.bad_col[0, i] deadpix_y = self.bad_col[1, i] image[deadpix_y:, deadpix_x] = 0 biassz_x = darksz_x + self.pscan1 + self.oscan1 biassz_y = darksz_y + self.pscan2 + self.oscan2 img_bias = np.zeros((biassz_y, biassz_x), dtype=int) seed = random_seed_select() log.debug(f"photon noise seed: {seed}") print(image.mean()) img_bias[ self.pscan2:self.pscan2+darksz_y, self.pscan1:self.pscan1+darksz_x ] = np.random.poisson(image) image = img_bias if self.switch['cte']: image = self.cte(image * emgain) / emgain seed = random_seed_select() log.debug(f"gamma noise seed: {seed}") if emgain != 1: image = np.random.gamma(image, emgain) image = np.minimum(image, self.em_fullwell) seed = random_seed_select() log.debug(f"readout noise seed: {seed}") image += np.random.randn(biassz_y, biassz_x) * self.readout_noise image = image / self.ph_per_adu + self.bias_level if self.switch['stripe']: image += self.add_stripe_effect(image) image = np.minimum(image, self.max_adu) image = np.maximum(image, 0) return image.astype(np.uint16) def add_stripe_effect(self, image): """ add stripe effect """ shape = image.shape v_width = 1 v_amplitude = 30 v_limit = 0.01 v_base = 10 h_width = 20 h_amplitude = 10 h_limit = 0.9 h_base = 20 index = np.linspace(0, np.pi, shape[0] * shape[1]) def stripe(width, limit, amplitude, base, axis=0): seed = random_seed_select() log.debug(f"stripe noise seed: {seed}") dim_axis = shape[axis] dim_other = shape[0] * shape[1] // shape[axis] value = np.sin(index / width * dim_axis + np.pi * dim_axis / width * np.random.randint(1024)) value = np.maximum(value, -limit) value = np.minimum(value, limit) value = (value / limit + limit) / 2 * amplitude + base value = value.reshape(dim_axis, dim_other) if axis == 1: value = value.T return value output = stripe(v_width, v_limit, v_amplitude, v_base, axis=1) output += stripe(h_width, h_limit, h_amplitude, h_base, axis=0) return output # # plt.plot(horizontal_index, horizontal_value) # # # plt.xlim([0, 6.28]) # # plt.show() # fits.writeto('horizontal_value.fits', output, overwrite=True) # if __name__ == '__main__': # import matplotlib.pyplot as plt # emccd = EMCCD() # image_focal = np.zeros((emccd.plszy, emccd.plszx)) + 1000 # image_focal[100:105, 100:105] = 10_000_000 # after_cte = emccd.emregester_blooming(image_focal, max_iteration=100) # print(after_cte.sum(), image_focal.sum()) # fits.writeto('after_cte.fits', after_cte, overwrite=True) # # darksz_x = emccd.plszx + emccd.rdark + emccd.ldark # # darksz_y = emccd.plszy + emccd.udark + emccd.bdark # # iamge_cosmic_ray = np.zeros((darksz_y, darksz_x)) # # emgain = 10 # # expt = 10 # # image = emccd.readout(image_focal, emgain, expt, iamge_cosmic_ray) # # fits.writeto('test.fits', image, overwrite=True) # image = np.zeros((1000, 1000)) # make_cosmic_ray_frame = CosmicRayFrameMaker() # crimage = make_cosmic_ray_frame(image.shape, 3000) # fits.writeto('crimage.fits', crimage, overwrite=True) # # emccd.add_stripe_effect(image)