import os import galsim import ctypes import numpy as np from astropy.io import fits from datetime import datetime from observation_sim.instruments.chip import effects from observation_sim.config.header import generatePrimaryHeader, generateExtensionHeader try: import importlib.resources as pkg_resources except ImportError: # Try backported to PY<37 'importlib_resources' import importlib_resources as pkg_resources def log_info(msg, logger=None): if logger: logger.info(msg) else: print(msg, flush=True) def getChipSLSGratingID(chipID): gratingID = ['', ''] if chipID == 1: gratingID = ['GI2', 'GI1'] if chipID == 2: gratingID = ['GV4', 'GV3'] if chipID == 3: gratingID = ['GU2', 'GU1'] if chipID == 4: gratingID = ['GU4', 'GU3'] if chipID == 5: gratingID = ['GV2', 'GV1'] if chipID == 10: gratingID = ['GI4', 'GI3'] if chipID == 21: gratingID = ['GI6', 'GI5'] if chipID == 26: gratingID = ['GV8', 'GV7'] if chipID == 27: gratingID = ['GU6', 'GU5'] if chipID == 28: gratingID = ['GU8', 'GU7'] if chipID == 29: gratingID = ['GV6', 'GV5'] if chipID == 30: gratingID = ['GI8', 'GI7'] return gratingID def getChipSLSConf(chipID): confFile = '' if chipID == 1: confFile = ['CSST_GI2.conf', 'CSST_GI1.conf'] if chipID == 2: confFile = ['CSST_GV4.conf', 'CSST_GV3.conf'] if chipID == 3: confFile = ['CSST_GU2.conf', 'CSST_GU1.conf'] if chipID == 4: confFile = ['CSST_GU4.conf', 'CSST_GU3.conf'] if chipID == 5: confFile = ['CSST_GV2.conf', 'CSST_GV1.conf'] if chipID == 10: confFile = ['CSST_GI4.conf', 'CSST_GI3.conf'] if chipID == 21: confFile = ['CSST_GI6.conf', 'CSST_GI5.conf'] if chipID == 26: confFile = ['CSST_GV8.conf', 'CSST_GV7.conf'] if chipID == 27: confFile = ['CSST_GU6.conf', 'CSST_GU5.conf'] if chipID == 28: confFile = ['CSST_GU8.conf', 'CSST_GU7.conf'] if chipID == 29: confFile = ['CSST_GV6.conf', 'CSST_GV5.conf'] if chipID == 30: confFile = ['CSST_GI8.conf', 'CSST_GI7.conf'] return confFile def generateHeader(chip, pointing, img_type=None, img_type_code=None, project_cycle='9', run_counter='1'): if (img_type is None) or (img_type_code is None): img_type = pointing.pointing_type img_type_code = pointing.pointing_type_code h_prim = generatePrimaryHeader( xlen=chip.npix_x, ylen=chip.npix_y, pointing_id=pointing.obs_id, pointing_type_code=img_type_code, ra=pointing.ra, dec=pointing.dec, pixel_scale=chip.pix_scale, time_pt=pointing.timestamp, exptime=pointing.exp_time, im_type=img_type, sat_pos=[pointing.sat_x, pointing.sat_y, pointing.sat_z], sat_vel=[pointing.sat_vx, pointing.sat_vy, pointing.sat_vz], project_cycle=project_cycle, run_counter=run_counter, chip_name=str(chip.chipID).rjust(2, '0')) h_ext = generateExtensionHeader( chip=chip, xlen=chip.npix_x, ylen=chip.npix_y, ra=pointing.ra, dec=pointing.dec, pa=pointing.img_pa.deg, gain=chip.gain, readout=chip.read_noise, dark=chip.dark_noise, saturation=90000, pixel_scale=chip.pix_scale, pixel_size=chip.pix_size, xcen=chip.x_cen, ycen=chip.y_cen, extName=img_type, timestamp=pointing.timestamp, exptime=pointing.exp_time, readoutTime=chip.readout_time, t_shutter_open=pointing.t_shutter_open, t_shutter_close=pointing.t_shutter_close) return h_prim, h_ext def output_fits_image(chip, pointing, img, output_dir, img_type=None, img_type_code=None, project_cycle='9', run_counter='1'): h_prim, h_ext = generateHeader( chip=chip, pointing=pointing, img_type=img_type, img_type_code=img_type_code, project_cycle=project_cycle, run_counter=run_counter) hdu1 = fits.PrimaryHDU(header=h_prim) hdu1.add_checksum() hdu1.header.comments['CHECKSUM'] = 'HDU checksum' hdu1.header.comments['DATASUM'] = 'data unit checksum' hdu2 = fits.ImageHDU(img.array, header=h_ext) hdu2.add_checksum() hdu2.header.comments['XTENSION'] = 'extension type' hdu2.header.comments['CHECKSUM'] = 'HDU checksum' hdu2.header.comments['DATASUM'] = 'data unit checksum' hdu1 = fits.HDUList([hdu1, hdu2]) fname = os.path.join(output_dir, h_prim['FILENAME']+'.fits') hdu1.writeto(fname, output_verify='ignore', overwrite=True) def add_sky_background(img, filt, exptime, sky_map=None, tel=None): # Add sky background if sky_map is None: sky_map = filt.getSkyNoise(exptime=exptime) sky_map = sky_map * np.ones_like(img.array) sky_map = galsim.Image(array=sky_map) # Apply Poisson noise to the sky map # # (NOTE): only for photometric chips if it utilizes the photon shooting to draw stamps # if self.survey_type == "photometric": # sky_map.addNoise(poisson_noise) elif img.array.shape != sky_map.shape: raise ValueError("The shape img and sky_map must be equal.") elif tel is not None: # If sky_map is given in flux sky_map = sky_map * tel.pupil_area * exptime img += sky_map return img, sky_map def get_flat(img, seed): flat_img = effects.MakeFlatSmooth( GSBounds=img.bounds, seed=seed) flat_normal = flat_img / np.mean(flat_img.array) return flat_img, flat_normal def add_cosmic_rays(img, chip, exptime=150, seed=0): cr_map, cr_event_num = effects.produceCR_Map( xLen=chip.npix_x, yLen=chip.npix_y, exTime=exptime+0.5*chip.readout_time, cr_pixelRatio=0.003*(exptime+0.5*chip.readout_time)/600., gain=chip.gain, attachedSizes=chip.attachedSizes, seed=seed) # seed: obj-imaging:+0; bias:+1; dark:+2; flat:+3; img += cr_map cr_map[cr_map > 65535] = 65535 cr_map[cr_map < 0] = 0 crmap_gsimg = galsim.Image(cr_map, dtype=np.uint16) del cr_map return img, crmap_gsimg, cr_event_num def add_PRNU(img, chip, seed=0): prnu_img = effects.PRNU_Img( xsize=chip.npix_x, ysize=chip.npix_y, sigma=0.01, seed=seed) img *= prnu_img return img, prnu_img def get_poisson(seed=0, sky_level=0.): rng_poisson = galsim.BaseDeviate(seed) poisson_noise = galsim.PoissonNoise(rng_poisson, sky_level=sky_level) return rng_poisson, poisson_noise def get_base_img(img, chip, read_noise, readout_time, dark_noise, exptime=150., InputDark=None): if InputDark is None: # base_level = read_noise**2 + dark_noise*(exptime+0.5*readout_time) # base_level = dark_noise*(exptime+0.5*readout_time) base_level = dark_noise*(exptime) base_img1 = base_level * np.ones_like(img.array) else: base_img1 = np.zeros_like(img.array) ny = int(chip.npix_y/2) nx = chip.npix_x arr = np.arange(ny).reshape(ny, 1) arr = np.broadcast_to(arr, (ny, nx)) base_img2 = np.zeros_like(img.array) base_img2[:ny, :] = arr base_img2[ny:, :] = arr[::-1, :] base_img2[:, :] = base_img2[:, :]*(readout_time/ny)*dark_noise return base_img1+base_img2 def add_poisson(img, chip, exptime=150., seed=0, sky_level=0., poisson_noise=None, dark_noise=None, InputDark=None): if poisson_noise is None: _, poisson_noise = get_poisson(seed=seed, sky_level=sky_level) read_noise = chip.read_noise if dark_noise is None: dark_noise = chip.dark_noise base_img = get_base_img(img=img, chip=chip, read_noise=read_noise, readout_time=chip.readout_time, dark_noise=dark_noise, exptime=exptime, InputDark=InputDark) img += base_img img.addNoise(poisson_noise) # img -= read_noise**2 if InputDark is not None: # "Instrument/data/dark/dark_1000s_example_0.fits" hdu = fits.open(InputDark) img += hdu[0].data/hdu[0].header['exptime']*exptime hdu.close() return img, base_img def add_brighter_fatter(img): # Inital dynamic lib try: with pkg_resources.files('observation_sim.instruments.chip.libBF').joinpath("libmoduleBF.so") as lib_path: lib_bf = ctypes.CDLL(lib_path) except AttributeError: with pkg_resources.path('observation_sim.instruments.chip.libBF', "libmoduleBF.so") as lib_path: lib_bf = ctypes.CDLL(lib_path) lib_bf.addEffects.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.POINTER( ctypes.c_float), ctypes.POINTER(ctypes.c_float), ctypes.c_int] # Set bit flag bit_flag = 1 bit_flag = bit_flag | (1 << 2) nx, ny = img.array.shape nn = nx * ny arr_ima = (ctypes.c_float*nn)() arr_imc = (ctypes.c_float*nn)() arr_ima[:] = img.array.reshape(nn) arr_imc[:] = np.zeros(nn) lib_bf.addEffects(nx, ny, arr_ima, arr_imc, bit_flag) img.array[:, :] = np.reshape(arr_imc, [nx, ny]) del arr_ima, arr_imc return img """ def add_inputdark(img, chip, exptime): fname = "/share/home/weichengliang/CSST_git/test_new_sim/csst-simulation/ObservationSim/Instrument/data/dark/dark_1000s_example_0.fits" hdu = fits.open(fname) #ny, nx = img.array.shape #inputdark = np.zeros([ny, nx]) img.array[:, :] += hdu[0].data/hdu[0].header['exptime']*exptime hdu.close() del inputdark return img """ def AddPreScan(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy=2, nsecx=8): img = GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) imgt = np.zeros( [int(nsecy*nsecx), int(ny/nsecy+pre2+over2), int(nx/nsecx+pre1+over1)]) for iy in range(nsecy): for ix in range(nsecx): if iy % 2 == 0: tx = ix else: tx = (nsecx-1)-ix ty = iy # chunk1-[1,2,3,4], chunk2-[5,6,7,8], chunk3-[9,10,11,12], chunk4-[13,14,15,16] chunkidx = int(tx+ty*nsecx) imgtemp = np.zeros( [int(ny/nsecy+pre2+over2), int(nx/nsecx+pre1+over1)]) if int(chunkidx/4) == 0: imgtemp[pre2:pre2+dy, pre1:pre1 + dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp if int(chunkidx/4) == 1: imgtemp[pre2:pre2+dy, over1:over1 + dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp # [:, ::-1] if int(chunkidx/4) == 2: imgtemp[over2:over2+dy, over1:over1 + dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp # [::-1, ::-1] if int(chunkidx/4) == 3: imgtemp[over2:over2+dy, pre1:pre1 + dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp # [::-1, :] # hstack chunk(1,2)-[1,2,3,4,5,6,7,8] imgtx1 = np.hstack(imgt[:nsecx:, :, :]) # hstack chunk(4,3)-[16,15,14,13,12,11,,10,9] imgtx2 = np.hstack(imgt[:(nsecx-1):-1, :, :]) newimg = galsim.Image(int(nx+(pre1+over1)*nsecx), int(ny+(pre2+over2)*nsecy), init_value=0) newimg.array[:, :] = np.concatenate( [imgtx1, imgtx2]) # vstack chunk(1,2) & chunk(4,3) newimg.wcs = GSImage.wcs return newimg def AddPreScanFO(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy=1, nsecx=16): img = GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) newimg = galsim.Image(int(nx+(pre1+over1)*nsecx), int(ny+(pre2+over2)*nsecy), init_value=0) for ix in range(nsecx): newimg.array[pre2:pre2+dy, pre1+ix * (dx+pre1+over1):pre1+dx+ix*(dx+pre1+over1)] = img[0:dy, 0+ix*dx:dx+ix*dx] newimg.wcs = GSImage.wcs return newimg def formatOutput(GSImage, nsecy=2, nsecx=8): img = GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) imgt = np.zeros([int(nsecx*nsecy), dy, dx]) for iy in range(nsecy): for ix in range(nsecx): if iy % 2 == 0: tx = ix else: tx = (nsecx-1)-ix ty = iy chunkidx = int(tx+ty*nsecx) if int(chunkidx/4) == 0: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] if int(chunkidx/4) == 1: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] if int(chunkidx/4) == 2: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] if int(chunkidx/4) == 3: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgttx0 = np.hstack(imgt[0:4:, :, :]) imgttx1 = np.hstack(imgt[4:8:, :, ::-1]) imgttx2 = np.hstack(imgt[8:12:, ::-1, ::-1]) imgttx3 = np.hstack(imgt[12:16:, ::-1, :]) newimg = galsim.Image(int(dx*nsecx*nsecy), dy, init_value=0) newimg.array[:, :] = np.hstack([imgttx0, imgttx1, imgttx2, imgttx3]) return newimg def formatRevert(GSImage, nsecy=1, nsecx=16): img = GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) newimg = galsim.Image(int(dx*8), int(dy*2), init_value=0) for ix in range(0, 4): tx = ix newimg.array[0:dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx] for ix in range(4, 8): tx = ix newimg.array[0:dy, 0+tx*dx:dx+tx * dx] = img[:, 0+ix*dx:dx+ix*dx][:, ::-1] for ix in range(8, 12): tx = 7-(ix-8) newimg.array[0+dy:dy+dy, 0+tx*dx:dx+tx * dx] = img[:, 0+ix*dx:dx+ix*dx][::-1, ::-1] for ix in range(12, 16): tx = 7-(ix-8) newimg.array[0+dy:dy+dy, 0+tx*dx:dx+tx * dx] = img[:, 0+ix*dx:dx+ix*dx][::-1, :] return newimg