import os import galsim import ctypes import numpy as np from astropy.io import fits from datetime import datetime from ObservationSim.Instrument.Chip import Effects as effects from ObservationSim.Config.Header import generatePrimaryHeader, generateExtensionHeader try: import importlib.resources as pkg_resources except ImportError: # Try backported to PY<37 'importlib_resources' import importlib_resources as pkg_resources def log_info(msg, logger=None): if logger: logger.info(msg) else: print(msg, flush=True) def getChipSLSGratingID(chipID): gratingID = ['',''] if chipID == 1: gratingID = ['GI2', 'GI1'] if chipID == 2: gratingID = ['GV4', 'GV3'] if chipID == 3: gratingID = ['GU2', 'GU1'] if chipID == 4: gratingID = ['GU4', 'GU3'] if chipID == 5: gratingID = ['GV2', 'GV1'] if chipID == 10: gratingID = ['GI4', 'GI3'] if chipID == 21: gratingID = ['GI6', 'GI5'] if chipID == 26: gratingID = ['GV8', 'GV7'] if chipID == 27: gratingID = ['GU6', 'GU5'] if chipID == 28: gratingID = ['GU8', 'GU7'] if chipID == 29: gratingID = ['GV6', 'GV5'] if chipID == 30: gratingID = ['GI8', 'GI7'] return gratingID def getChipSLSConf(chipID): confFile = '' if chipID == 1: confFile = ['CSST_GI2.conf', 'CSST_GI1.conf'] if chipID == 2: confFile = ['CSST_GV4.conf', 'CSST_GV3.conf'] if chipID == 3: confFile = ['CSST_GU2.conf', 'CSST_GU1.conf'] if chipID == 4: confFile = ['CSST_GU4.conf', 'CSST_GU3.conf'] if chipID == 5: confFile = ['CSST_GV2.conf', 'CSST_GV1.conf'] if chipID == 10: confFile = ['CSST_GI4.conf', 'CSST_GI3.conf'] if chipID == 21: confFile = ['CSST_GI6.conf', 'CSST_GI5.conf'] if chipID == 26: confFile = ['CSST_GV8.conf', 'CSST_GV7.conf'] if chipID == 27: confFile = ['CSST_GU6.conf', 'CSST_GU5.conf'] if chipID == 28: confFile = ['CSST_GU8.conf', 'CSST_GU7.conf'] if chipID == 29: confFile = ['CSST_GV6.conf', 'CSST_GV5.conf'] if chipID == 30: confFile = ['CSST_GI8.conf', 'CSST_GI7.conf'] return confFile def generateHeader(chip, ra_cen, dec_cen, img_rot, im_type, pointing_ID, exptime=150., project_cycle=6, run_counter=0, timestamp = 1621915200): datetime_obs = datetime.utcfromtimestamp(timestamp) date_obs = datetime_obs.strftime("%y%m%d") time_obs = datetime_obs.strftime("%H%M%S") h_prim = generatePrimaryHeader( xlen=chip.npix_x, ylen=chip.npix_y, pointNum = str(pointing_ID), ra=ra_cen, dec=dec_cen, pixel_scale=chip.pix_scale, date=date_obs, time_obs=time_obs, im_type = im_type, exptime=exptime, project_cycle=project_cycle, run_counter=run_counter, chip_name=str(chip.chipID).rjust(2, '0') ) h_ext = generateExtensionHeader( chip=chip, xlen=chip.npix_x, ylen=chip.npix_y, ra=ra_cen, dec=dec_cen, pa=img_rot.deg, gain=chip.gain, readout=chip.read_noise, dark=chip.dark_noise, saturation=90000, pixel_scale=chip.pix_scale, pixel_size=chip.pix_size, xcen=chip.x_cen, ycen=chip.y_cen, extName='SCIE', timestamp = timestamp, exptime = exptime, readoutTime = chip.readout_time) return h_prim, h_ext def outputCal(chip, img, ra_cen, dec_cen, img_rot, im_type, pointing_ID, output_dir, exptime=150., project_cycle=6, run_counter=0, timestamp = 1621915200): h_prim, h_ext = generateHeader( chip=chip, ra_cen=ra_cen, dec_cen=dec_cen, img_rot=img_rot, im_type=im_type, pointing_ID=pointing_ID, exptime=exptime, project_cycle=project_cycle, run_counter=run_counter, timestamp=timestamp) hdu1 = fits.PrimaryHDU(header=h_prim) hdu1.add_checksum() hdu1.header.comments['CHECKSUM'] = 'HDU checksum' hdu1.header.comments['DATASUM'] = 'data unit checksum' hdu2 = fits.ImageHDU(img.array, header=h_ext) hdu2.add_checksum() hdu2.header.comments['XTENSION'] = 'extension type' hdu2.header.comments['CHECKSUM'] = 'HDU checksum' hdu2.header.comments['DATASUM'] = 'data unit checksum' hdu1 = fits.HDUList([hdu1, hdu2]) fname = os.path.join(output_dir, h_prim['FILENAME']+'.fits') hdu1.writeto(fname, output_verify='ignore', overwrite=True) def add_sky_background(img, filt, exptime, sky_map=None, tel=None): # Add sky background if sky_map is None: sky_map = filt.getSkyNoise(exptime=exptime) sky_map = sky_map * np.ones_like(img.array) sky_map = galsim.Image(array=sky_map) # Apply Poisson noise to the sky map # # (NOTE): only for photometric chips if it utilizes the photon shooting to draw stamps # if self.survey_type == "photometric": # sky_map.addNoise(poisson_noise) elif img.array.shape != sky_map.shape: raise ValueError("The shape img and sky_map must be equal.") elif tel is not None: # If sky_map is given in flux sky_map = sky_map * tel.pupil_area * exptime img += sky_map return img, sky_map def get_flat(img, seed): flat_img = effects.MakeFlatSmooth( GSBounds=img.bounds, seed=seed) flat_normal = flat_img / np.mean(flat_img.array) return flat_img, flat_normal def add_cosmic_rays(img, chip, exptime=150, seed=0): cr_map, cr_event_num = effects.produceCR_Map( xLen=chip.npix_x, yLen=chip.npix_y, exTime=exptime+0.5*chip.readout_time, cr_pixelRatio=0.003*(exptime+0.5*chip.readout_time)/600., gain=chip.gain, attachedSizes=chip.attachedSizes, seed=seed) # seed: obj-imaging:+0; bias:+1; dark:+2; flat:+3; img += cr_map cr_map[cr_map > 65535] = 65535 cr_map[cr_map < 0] = 0 crmap_gsimg = galsim.Image(cr_map, dtype=np.uint16) del cr_map return img, crmap_gsimg, cr_event_num def add_PRNU(img, chip, seed=0): prnu_img = effects.PRNU_Img( xsize=chip.npix_x, ysize=chip.npix_y, sigma=0.01, seed=seed) img *= prnu_img return img, prnu_img def get_poisson(seed=0, sky_level=0.): rng_poisson = galsim.BaseDeviate(seed) poisson_noise = galsim.PoissonNoise(rng_poisson, sky_level=sky_level) return rng_poisson, poisson_noise def get_base_img(img, read_noise, readout_time, dark_noise, exptime=150.): # base_level = read_noise**2 + dark_noise*(exptime+0.5*readout_time) base_level = dark_noise*(exptime+0.5*readout_time) base_img = base_level * np.ones_like(img.array) return base_img def add_poisson(img, chip, exptime=150., seed=0, sky_level=0., poisson_noise=None, dark_noise=None): if poisson_noise is None: _, poisson_noise = get_poisson(seed=seed, sky_level=sky_level) read_noise = chip.read_noise if dark_noise is None: dark_noise = chip.dark_noise base_img = get_base_img(img=img, read_noise=read_noise, readout_time=chip.readout_time, dark_noise=dark_noise, exptime=exptime) img += base_img img.addNoise(poisson_noise) # img -= read_noise**2 return img, base_img def add_brighter_fatter(img): #Inital dynamic lib try: with pkg_resources.files('ObservationSim.Instrument.Chip.libBF').joinpath("libmoduleBF.so") as lib_path: lib_bf = ctypes.CDLL(lib_path) except AttributeError: with pkg_resources.path('ObservationSim.Instrument.Chip.libBF', "libmoduleBF.so") as lib_path: lib_bf = ctypes.CDLL(lib_path) lib_bf.addEffects.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_float), ctypes.POINTER(ctypes.c_float), ctypes.c_int] # Set bit flag bit_flag = 1 bit_flag = bit_flag | (1 << 2) nx, ny = img.array.shape nn = nx * ny arr_ima= (ctypes.c_float*nn)() arr_imc= (ctypes.c_float*nn)() arr_ima[:]= img.array.reshape(nn) arr_imc[:]= np.zeros(nn) lib_bf.addEffects(nx, ny, arr_ima, arr_imc, bit_flag) img.array[:, :] = np.reshape(arr_imc, [nx, ny]) del arr_ima, arr_imc return img def add_inputdark(img, chip, exptime): fname = "/share/home/weichengliang/CSST_git/test_new_sim/csst-simulation/ObservationSim/Instrument/data/dark/dark_1000s_example_0.fits" hdu = fits.open(fname) #ny, nx = img.array.shape #inputdark = np.zeros([ny, nx]) img.array[:, :] += hdu[0].data/hdu[0].header['exptime']*exptime hdu.close() del inputdark return img def AddPreScan(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy = 2, nsecx=8): img= GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) imgt=np.zeros([int(nsecy*nsecx), int(ny/nsecy+pre2+over2), int(nx/nsecx+pre1+over1)]) for iy in range(nsecy): for ix in range(nsecx): if iy % 2 == 0: tx = ix else: tx = (nsecx-1)-ix ty = iy chunkidx = int(tx+ty*nsecx) #chunk1-[1,2,3,4], chunk2-[5,6,7,8], chunk3-[9,10,11,12], chunk4-[13,14,15,16] imgtemp = np.zeros([int(ny/nsecy+pre2+over2), int(nx/nsecx+pre1+over1)]) if int(chunkidx/4) == 0: imgtemp[pre2:pre2+dy, pre1:pre1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp if int(chunkidx/4) == 1: imgtemp[pre2:pre2+dy, over1:over1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp #[:, ::-1] if int(chunkidx/4) == 2: imgtemp[over2:over2+dy, over1:over1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp #[::-1, ::-1] if int(chunkidx/4) == 3: imgtemp[over2:over2+dy, pre1:pre1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgt[chunkidx, :, :] = imgtemp #[::-1, :] imgtx1 = np.hstack(imgt[:nsecx:, :, :]) #hstack chunk(1,2)-[1,2,3,4,5,6,7,8] imgtx2 = np.hstack(imgt[:(nsecx-1):-1, :, :]) #hstack chunk(4,3)-[16,15,14,13,12,11,,10,9] newimg = galsim.Image(int(nx+(pre1+over1)*nsecx), int(ny+(pre2+over2)*nsecy), init_value=0) newimg.array[:, :] = np.concatenate([imgtx1, imgtx2]) #vstack chunk(1,2) & chunk(4,3) newimg.wcs = GSImage.wcs return newimg def AddPreScanFO(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy = 1, nsecx=16): img= GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) newimg = galsim.Image(int(nx+(pre1+over1)*nsecx), int(ny+(pre2+over2)*nsecy), init_value=0) for ix in range(nsecx): newimg.array[pre2:pre2+dy, pre1+ix*(dx+pre1+over1):pre1+dx+ix*(dx+pre1+over1)] = img[0:dy, 0+ix*dx:dx+ix*dx] newimg.wcs = GSImage.wcs return newimg def formatOutput(GSImage, nsecy = 2, nsecx=8): img = GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) imgt = np.zeros([int(nsecx*nsecy), dy, dx]) for iy in range(nsecy): for ix in range(nsecx): if iy % 2 == 0: tx = ix else: tx = (nsecx-1)-ix ty = iy chunkidx = int(tx+ty*nsecx) if int(chunkidx/4) == 0: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] if int(chunkidx/4) == 1: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] if int(chunkidx/4) == 2: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] if int(chunkidx/4) == 3: imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx] imgttx0 = np.hstack(imgt[ 0:4:, :, :]) imgttx1 = np.hstack(imgt[ 4:8:, :, ::-1]) imgttx2 = np.hstack(imgt[8:12:, ::-1, ::-1]) imgttx3 = np.hstack(imgt[12:16:,::-1, :]) newimg = galsim.Image(int(dx*nsecx*nsecy), dy, init_value=0) newimg.array[:, :] = np.hstack([imgttx0, imgttx1, imgttx2, imgttx3]) return newimg def formatRevert(GSImage, nsecy = 1, nsecx=16): img = GSImage.array ny, nx = img.shape dx = int(nx/nsecx) dy = int(ny/nsecy) newimg = galsim.Image(int(dx*8), int(dy*2), init_value=0) for ix in range(0,4): tx = ix newimg.array[0:dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx] for ix in range(4,8): tx = ix newimg.array[0:dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx][:, ::-1] for ix in range(8,12): tx = 7-(ix-8) newimg.array[0+dy:dy+dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx][::-1, ::-1] for ix in range(12,16): tx = 7-(ix-8) newimg.array[0+dy:dy+dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx][::-1, :] return newimg