Skip to content
ChipUtils.py 13.2 KiB
Newer Older
import os
import galsim
import ctypes
import numpy as np
from astropy.io import fits
from datetime import datetime

from ObservationSim.Instrument.Chip import Effects as effects
from ObservationSim.Config.Header import generatePrimaryHeader, generateExtensionHeader

try:
    import importlib.resources as pkg_resources
except ImportError:
    # Try backported to PY<37 'importlib_resources'
    import importlib_resources as pkg_resources


def log_info(msg, logger=None):
    if logger:
        logger.info(msg)
    else:
        print(msg, flush=True)

Zhang Xin's avatar
Zhang Xin committed
def getChipSLSGratingID(chipID):
    gratingID = ['','']
    if chipID == 1: gratingID = ['GI2', 'GI1']
    if chipID == 2: gratingID = ['GV4', 'GV3']
    if chipID == 3: gratingID = ['GU2', 'GU1']
    if chipID == 4: gratingID = ['GU4', 'GU3']
    if chipID == 5: gratingID = ['GV2', 'GV1']
    if chipID == 10: gratingID = ['GI4', 'GI3']
    if chipID == 21: gratingID = ['GI6', 'GI5']
    if chipID == 26: gratingID = ['GV8', 'GV7']
    if chipID == 27: gratingID = ['GU6', 'GU5']
    if chipID == 28: gratingID = ['GU8', 'GU7']
    if chipID == 29: gratingID = ['GV6', 'GV5']
    if chipID == 30: gratingID = ['GI8', 'GI7']
    return gratingID

def getChipSLSConf(chipID):
    confFile = ''
    if chipID == 1: confFile = ['CSST_GI2.conf', 'CSST_GI1.conf']
    if chipID == 2: confFile = ['CSST_GV4.conf', 'CSST_GV3.conf']
    if chipID == 3: confFile = ['CSST_GU2.conf', 'CSST_GU1.conf']
    if chipID == 4: confFile = ['CSST_GU4.conf', 'CSST_GU3.conf']
    if chipID == 5: confFile = ['CSST_GV2.conf', 'CSST_GV1.conf']
    if chipID == 10: confFile = ['CSST_GI4.conf', 'CSST_GI3.conf']
    if chipID == 21: confFile = ['CSST_GI6.conf', 'CSST_GI5.conf']
    if chipID == 26: confFile = ['CSST_GV8.conf', 'CSST_GV7.conf']
    if chipID == 27: confFile = ['CSST_GU6.conf', 'CSST_GU5.conf']
    if chipID == 28: confFile = ['CSST_GU8.conf', 'CSST_GU7.conf']
    if chipID == 29: confFile = ['CSST_GV6.conf', 'CSST_GV5.conf']
    if chipID == 30: confFile = ['CSST_GI8.conf', 'CSST_GI7.conf']
    return confFile

def generateHeader(chip, ra_cen, dec_cen, img_rot, im_type, pointing_ID, exptime=150., project_cycle=6, run_counter=0, timestamp = 1621915200):
    datetime_obs = datetime.utcfromtimestamp(timestamp)
    date_obs = datetime_obs.strftime("%y%m%d")
    time_obs = datetime_obs.strftime("%H%M%S")
    h_prim = generatePrimaryHeader(
        xlen=chip.npix_x, 
        ylen=chip.npix_y, 
        pointNum = str(pointing_ID),
        ra=ra_cen, 
        dec=dec_cen, 
        pixel_scale=chip.pix_scale,
        date=date_obs,
        time_obs=time_obs,
        im_type = im_type,
        exptime=exptime,
        project_cycle=project_cycle,
        run_counter=run_counter,
        chip_name=str(chip.chipID).rjust(2, '0')
        )
    h_ext = generateExtensionHeader(
        chip=chip,
        xlen=chip.npix_x, 
        ylen=chip.npix_y, 
        ra=ra_cen, 
        dec=dec_cen,
        pa=img_rot.deg, 
        gain=chip.gain, 
        readout=chip.read_noise, 
        dark=chip.dark_noise, 
        saturation=90000, 
        pixel_scale=chip.pix_scale, 
        pixel_size=chip.pix_size,
        xcen=chip.x_cen,
        ycen=chip.y_cen,
Fang Yuedong's avatar
Fang Yuedong committed
        extName='SCIE',
        timestamp = timestamp,
        exptime = exptime,
        readoutTime = chip.readout_time)
    return h_prim, h_ext

def outputCal(chip, img, ra_cen, dec_cen, img_rot, im_type, pointing_ID, output_dir, exptime=150., project_cycle=6, run_counter=0, timestamp = 1621915200):
    h_prim, h_ext = generateHeader(
        chip=chip,
        ra_cen=ra_cen,
        dec_cen=dec_cen,
        img_rot=img_rot,
        im_type=im_type,
        pointing_ID=pointing_ID,
        exptime=exptime,
        project_cycle=project_cycle,
        run_counter=run_counter,
        timestamp=timestamp)
    hdu1 = fits.PrimaryHDU(header=h_prim)
    hdu1.add_checksum()
    hdu1.header.comments['CHECKSUM'] = 'HDU checksum'
    hdu1.header.comments['DATASUM'] = 'data unit checksum'
    hdu2 = fits.ImageHDU(img.array, header=h_ext)
    hdu2.add_checksum()
    hdu2.header.comments['XTENSION'] = 'extension type'
    hdu2.header.comments['CHECKSUM'] = 'HDU checksum'
    hdu2.header.comments['DATASUM'] = 'data unit checksum'
    hdu1 = fits.HDUList([hdu1, hdu2])
    fname = os.path.join(output_dir, h_prim['FILENAME']+'.fits')
    hdu1.writeto(fname, output_verify='ignore', overwrite=True)

def add_sky_background(img, filt, exptime, sky_map=None, tel=None):
    # Add sky background
    if sky_map is None:
        sky_map = filt.getSkyNoise(exptime=exptime)
        sky_map = sky_map * np.ones_like(img.array)
        sky_map = galsim.Image(array=sky_map)
        # Apply Poisson noise to the sky map
        # # (NOTE): only for photometric chips if it utilizes the photon shooting to draw stamps
        # if self.survey_type == "photometric":
        #     sky_map.addNoise(poisson_noise)
    elif img.array.shape != sky_map.shape:
        raise ValueError("The shape img and sky_map must be equal.")
    elif tel is not None: # If sky_map is given in flux
        sky_map = sky_map * tel.pupil_area * exptime
    img += sky_map
    return img, sky_map

def get_flat(img, seed):
    flat_img = effects.MakeFlatSmooth(
                GSBounds=img.bounds, 
                seed=seed)
    flat_normal = flat_img / np.mean(flat_img.array)
    return flat_img, flat_normal

def add_cosmic_rays(img, chip, exptime=150, seed=0):
    cr_map, cr_event_num = effects.produceCR_Map(
        xLen=chip.npix_x, yLen=chip.npix_y, 
        exTime=exptime+0.5*chip.readout_time, 
        cr_pixelRatio=0.003*(exptime+0.5*chip.readout_time)/600.,
        gain=chip.gain, 
        attachedSizes=chip.attachedSizes,
        seed=seed)   # seed: obj-imaging:+0; bias:+1; dark:+2; flat:+3;
    img += cr_map
    cr_map[cr_map > 65535] = 65535
    cr_map[cr_map < 0] = 0
    crmap_gsimg = galsim.Image(cr_map, dtype=np.uint16)
    del cr_map
    return img, crmap_gsimg, cr_event_num

def add_PRNU(img, chip, seed=0):
    prnu_img = effects.PRNU_Img(
        xsize=chip.npix_x, 
        ysize=chip.npix_y, 
        sigma=0.01, 
        seed=seed)
    img *= prnu_img
    return img, prnu_img

def get_poisson(seed=0, sky_level=0.):
    rng_poisson = galsim.BaseDeviate(seed)
    poisson_noise = galsim.PoissonNoise(rng_poisson, sky_level=sky_level)
    return rng_poisson, poisson_noise

Wei Chengliang's avatar
Wei Chengliang committed
def get_base_img(img, chip, read_noise, readout_time, dark_noise, exptime=150., InputDark=None):
    if InputDark == None:
        # base_level = read_noise**2 + dark_noise*(exptime+0.5*readout_time)
        ## base_level = dark_noise*(exptime+0.5*readout_time)
        base_level = dark_noise*(exptime)
        base_img1 = base_level * np.ones_like(img.array)
    else:
        base_img1 = np.zeros_like(img.array)

    ny = int(chip.npix_y/2)
    nx = chip.npix_x
    arr = np.arange(ny).reshape(ny, 1)
    arr = np.broadcast_to(arr, (ny, nx))
    base_img2 = np.zeros_like(img.array)
    base_img2[:ny, :] = arr
    base_img2[ny:, :] = arr[::-1,:]
    base_img2[:,:]    = base_img2[:,:]*(readout_time/ny)*dark_noise
    return base_img1+base_img2

def add_poisson(img, chip, exptime=150., seed=0, sky_level=0., poisson_noise=None, dark_noise=None, InputDark=None):
    if poisson_noise is None:
        _, poisson_noise = get_poisson(seed=seed, sky_level=sky_level)
    read_noise = chip.read_noise
    if dark_noise is None:
        dark_noise = chip.dark_noise
Wei Chengliang's avatar
Wei Chengliang committed
    base_img = get_base_img(img=img, chip, read_noise=read_noise, readout_time=chip.readout_time, dark_noise=dark_noise, exptime=exptime, InputDark=None)
    img += base_img
    img.addNoise(poisson_noise)
Wei Chengliang's avatar
Wei Chengliang committed

    if InputDark != None:
        hdu = fits.open(InputDark)  ##"Instrument/data/dark/dark_1000s_example_0.fits"
        img += hdu[0].data/hdu[0].header['exptime']*exptime
        hdu.close()
    return img, base_img

def add_brighter_fatter(img):
    #Inital dynamic lib
    try:
        with pkg_resources.files('ObservationSim.Instrument.Chip.libBF').joinpath("libmoduleBF.so") as lib_path:
            lib_bf = ctypes.CDLL(lib_path)
    except AttributeError:
        with pkg_resources.path('ObservationSim.Instrument.Chip.libBF', "libmoduleBF.so") as lib_path:
            lib_bf = ctypes.CDLL(lib_path)
    lib_bf.addEffects.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_float), ctypes.POINTER(ctypes.c_float), ctypes.c_int]
    
    # Set bit flag
    bit_flag = 1
    bit_flag = bit_flag | (1 << 2)

    nx, ny = img.array.shape
    nn = nx * ny
    arr_ima= (ctypes.c_float*nn)()
    arr_imc= (ctypes.c_float*nn)()

    arr_ima[:]= img.array.reshape(nn)
    arr_imc[:]= np.zeros(nn)

    lib_bf.addEffects(nx, ny, arr_ima, arr_imc, bit_flag)
    img.array[:, :] = np.reshape(arr_imc, [nx, ny])
    return img
Wei Chengliang's avatar
Wei Chengliang committed
"""
def add_inputdark(img, chip, exptime):
    fname = "/share/home/weichengliang/CSST_git/test_new_sim/csst-simulation/ObservationSim/Instrument/data/dark/dark_1000s_example_0.fits"
    hdu = fits.open(fname)
    #ny, nx = img.array.shape
    #inputdark = np.zeros([ny, nx])
    img.array[:, :] += hdu[0].data/hdu[0].header['exptime']*exptime
    hdu.close()
    del inputdark
    return img
Wei Chengliang's avatar
Wei Chengliang committed
"""
def AddPreScan(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy = 2, nsecx=8):
    img= GSImage.array
    ny, nx = img.shape
    dx = int(nx/nsecx)
    dy = int(ny/nsecy)

    imgt=np.zeros([int(nsecy*nsecx), int(ny/nsecy+pre2+over2), int(nx/nsecx+pre1+over1)])
    for iy in range(nsecy):
        for ix in range(nsecx):
            if iy % 2 == 0:
                tx = ix
            else:
                tx = (nsecx-1)-ix
            ty = iy 
            chunkidx = int(tx+ty*nsecx) #chunk1-[1,2,3,4], chunk2-[5,6,7,8], chunk3-[9,10,11,12], chunk4-[13,14,15,16]

            imgtemp = np.zeros([int(ny/nsecy+pre2+over2), int(nx/nsecx+pre1+over1)])
            if int(chunkidx/4) == 0:
                imgtemp[pre2:pre2+dy, pre1:pre1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
                imgt[chunkidx, :, :] = imgtemp
            if int(chunkidx/4) == 1:
                imgtemp[pre2:pre2+dy, over1:over1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
                imgt[chunkidx, :, :] = imgtemp #[:, ::-1]
            if int(chunkidx/4) == 2:
                imgtemp[over2:over2+dy, over1:over1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
                imgt[chunkidx, :, :] = imgtemp #[::-1, ::-1]
            if int(chunkidx/4) == 3:
                imgtemp[over2:over2+dy, pre1:pre1+dx] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
                imgt[chunkidx, :, :] = imgtemp #[::-1, :]

    imgtx1 = np.hstack(imgt[:nsecx:,       :, :])  #hstack chunk(1,2)-[1,2,3,4,5,6,7,8]
    imgtx2 = np.hstack(imgt[:(nsecx-1):-1, :, :])  #hstack chunk(4,3)-[16,15,14,13,12,11,,10,9]

    newimg = galsim.Image(int(nx+(pre1+over1)*nsecx), int(ny+(pre2+over2)*nsecy), init_value=0)
    newimg.array[:, :] = np.concatenate([imgtx1, imgtx2]) #vstack chunk(1,2) & chunk(4,3)

    newimg.wcs = GSImage.wcs
    return newimg

def AddPreScanFO(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy = 1, nsecx=16):
    img= GSImage.array
    ny, nx = img.shape
    dx = int(nx/nsecx)
    dy = int(ny/nsecy)

    newimg = galsim.Image(int(nx+(pre1+over1)*nsecx), int(ny+(pre2+over2)*nsecy), init_value=0)
    for ix in range(nsecx):
        newimg.array[pre2:pre2+dy, pre1+ix*(dx+pre1+over1):pre1+dx+ix*(dx+pre1+over1)] = img[0:dy, 0+ix*dx:dx+ix*dx]

    newimg.wcs = GSImage.wcs
    return newimg


def formatOutput(GSImage, nsecy = 2, nsecx=8):
    img = GSImage.array
    ny, nx = img.shape
    dx = int(nx/nsecx)
    dy = int(ny/nsecy)
    
    imgt = np.zeros([int(nsecx*nsecy), dy, dx])
    for iy in range(nsecy):
        for ix in range(nsecx):
            if iy % 2 == 0:
                tx = ix
            else:
                tx = (nsecx-1)-ix
            ty = iy
            chunkidx = int(tx+ty*nsecx)
            if int(chunkidx/4) == 0:
                imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
            if int(chunkidx/4) == 1:
                imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
            if int(chunkidx/4) == 2:
                imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
            if int(chunkidx/4) == 3:
                imgt[chunkidx, :, :] = img[iy*dy:(iy+1)*dy, ix*dx:(ix+1)*dx]
    
    imgttx0 = np.hstack(imgt[ 0:4:,    :,    :])
    imgttx1 = np.hstack(imgt[ 4:8:,    :, ::-1])
    imgttx2 = np.hstack(imgt[8:12:, ::-1, ::-1])
    imgttx3 = np.hstack(imgt[12:16:,::-1,    :])
    
    newimg = galsim.Image(int(dx*nsecx*nsecy), dy, init_value=0)
    newimg.array[:, :] = np.hstack([imgttx0, imgttx1, imgttx2, imgttx3])
    return newimg

def formatRevert(GSImage, nsecy = 1, nsecx=16):
    img = GSImage.array
    ny, nx = img.shape
    dx = int(nx/nsecx)
    dy = int(ny/nsecy)

    newimg = galsim.Image(int(dx*8), int(dy*2), init_value=0)

    for ix in range(0,4):
        tx = ix
        newimg.array[0:dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx]
    for ix in range(4,8):
        tx = ix
        newimg.array[0:dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx][:, ::-1]
    for ix in range(8,12):
        tx = 7-(ix-8)
        newimg.array[0+dy:dy+dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx][::-1, ::-1]
    for ix in range(12,16):
        tx = 7-(ix-8)
        newimg.array[0+dy:dy+dy, 0+tx*dx:dx+tx*dx] = img[:, 0+ix*dx:dx+ix*dx][::-1, :]

    return newimg