Skip to content
test_imaging.py 6.93 KiB
Newer Older
Wei Chengliang's avatar
Wei Chengliang committed
import unittest

from scipy import interpolate
import astropy.constants as cons
from astropy.table import Table
import h5py as h5
import sys
import os
import math
Wei Chengliang's avatar
Wei Chengliang committed
from itertools import islice
import numpy as np
import galsim
import yaml
import copy
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
from ObservationSim.MockObject._util import getObservedSED
from ObservationSim.MockObject import CatalogBase, Galaxy
Wei Chengliang's avatar
Wei Chengliang committed

from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.PSF.PSFInterp import PSFInterp
from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG


class Catalog(CatalogBase):
    def __init__(self):
        super().__init__()
        self.rotation = 0.

    def load_norm_filt(self, obj):
        return None

    def load_sed(self, obj, **kward):
        pcs = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'),
                                   'csst_msc_sim/csst_fz_msc/sedlibs/pcs.h5'), "r")
        lamb = h5.File(os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'),
                                    'csst_msc_sim/csst_fz_msc/sedlibs/lamb.h5'), "r")
        lamb_gal = lamb['lamb'][()]
        pcs = pcs['pcs'][()]

        cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
        factor = 10**(-.4 * cosmo.distmod(obj.param['z']).value)
        flux = np.matmul(pcs, obj.param['coeff']) * factor
        #  if np.any(flux < 0):
        #     raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
        flux[flux < 0] = 0.
        sedcat = np.vstack((lamb_gal, flux)).T
        sed_data = getObservedSED(
            sedCat=sedcat,
            redshift=obj.param['z'],
            av=obj.param["av"],
            redden=obj.param["redden"]
Wei Chengliang's avatar
Wei Chengliang committed
        )
        wave, flux = sed_data[0], sed_data[1]

        speci = interpolate.interp1d(wave, flux)
        lamb = np.arange(2000, 11001+0.5, 0.5)
        y = speci(lamb)
        # erg/s/cm2/A --> photon/s/m2/A
        all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
        sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))

        # obj.param["sed"] = sed
        del wave
        del flux
        return sed

    def _load(self, file_path):
        gals_cat = h5.File(file_path, 'r')['galaxies']
        for ikeys in gals_cat.keys():
            gals = gals_cat[ikeys]

        param = self.initialize_param()
        igals = 9
        param['z'] = gals['redshift'][igals]
        param['mag_use_normal'] = gals['mag_csst_g'][igals]
        print(param['mag_use_normal'])

        param['e1'] = gals['ellipticity_true'][igals][0]
        param['e2'] = gals['ellipticity_true'][igals][1]

        # For shape calculation
        param['e1'], param['e2'], param['ell_total'] = self.rotate_ellipticity(
            e1=gals['ellipticity_true'][igals][0],
            e2=gals['ellipticity_true'][igals][1],
            rotation=self.rotation,
            unit='radians')

        param['e1_disk'] = param['e1']
        param['e2_disk'] = param['e2']
        param['e1_bulge'] = param['e1']
        param['e2_bulge'] = param['e2']

        # Masses
        param['bulgemass'] = gals['bulgemass'][igals]
        param['diskmass'] = gals['diskmass'][igals]

        param['size'] = gals['size'][igals]

        # Sersic index
        param['disk_sersic_idx'] = 1.
        param['bulge_sersic_idx'] = 4.

        # Sizes
        param['bfrac'] = param['bulgemass'] / \
            (param['bulgemass'] + param['diskmass'])
        if param['bfrac'] >= 0.6:
            param['hlr_bulge'] = param['size']
            param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
        else:
            param['hlr_disk'] = param['size']
            param['hlr_bulge'] = param['size'] * param['bfrac']

        # SED coefficients
        param['coeff'] = gals['coeff'][igals]

        # TEST no redening and no extinction
        param['av'] = 0.0
        param['redden'] = 0

        obj = Galaxy(param)

        return obj
Wei Chengliang's avatar
Wei Chengliang committed


def defineCCD(iccd, config_file):
    with open(config_file, "r") as stream:
        try:
            config = yaml.safe_load(stream)
            # for key, value in config.items():
Wei Chengliang's avatar
Wei Chengliang committed
            #    print (key + " : " + str(value))
        except yaml.YAMLError as exc:
            print(exc)
    chip = Chip(chipID=iccd, config=config)
    chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
    focal_plane = FocalPlane(chip_list=[iccd])
    chip.img.wcs = focal_plane.getTanWCS(
        192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
Wei Chengliang's avatar
Wei Chengliang committed
    return chip

Wei Chengliang's avatar
Wei Chengliang committed
def defineFilt(chip):
    filter_param = FilterParam()
    filter_id, filter_type = chip.getChipFilter()
    filt = Filter(
        filter_id=filter_id,
        filter_type=filter_type,
        filter_param=filter_param,
        ccd_bandpass=chip.effCurve)
    return filt


class imagingModule_coverage(unittest.TestCase):
    def __init__(self, methodName='runTest'):
        super(imagingModule_coverage, self).__init__(methodName)
        self.dataPath = os.path.join(
            os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc')
Wei Chengliang's avatar
Wei Chengliang committed
        self.iccd = 8

    def test_imaging(self):
        config_file = os.path.join(self.dataPath, 'config_test.yaml')
        chip = defineCCD(self.iccd, config_file)
        bandpass = defineFilt(chip)
        filt = defineFilt(chip)
        print(chip.chipID)
        print(chip.cen_pix_x, chip.cen_pix_y)

        catalog = Catalog()

        obj = catalog._load(os.path.join(
            self.dataPath, 'galaxies_C6_bundle000287.h5'))
        sed_data = catalog.load_sed(obj)
        norm_filt = catalog.load_norm_filt(obj)

        obj_sed, obj_mag, obj_flux = catalog.convert_sed(
            mag=obj.param["mag_use_normal"],
Wei Chengliang's avatar
Wei Chengliang committed
            sed=sed_data,
            target_filt=filt,
            norm_filt=norm_filt,
        )

        pupil_area = np.pi * (0.5 * 2.)**2
Wei Chengliang's avatar
Wei Chengliang committed
        exptime = 150.
        # getElectronFluxFilt(filt, tel, exptime)
        nphotons_tot = obj_flux*pupil_area * exptime
Wei Chengliang's avatar
Wei Chengliang committed
        full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full)
        print(full, nphotons_tot, obj_mag)
        for i in range(4):
            sub = integrate_sed_bandpass(
                sed=obj_sed, bandpass=filt.bandpass_sub_list[i])

Wei Chengliang's avatar
Wei Chengliang committed
            ratio = sub / full
            nphotons = ratio * nphotons_tot
            disk = galsim.Sersic(
                n=obj.param['disk_sersic_idx'], half_light_radius=obj.param['hlr_disk'], flux=1.0)
            disk_shape = galsim.Shear(
                g1=obj.param['e1_disk'], g2=obj.param['e2_disk'])
Wei Chengliang's avatar
Wei Chengliang committed
            disk = disk.shear(disk_shape)
            gal_temp = disk
            gal_temp = gal_temp.withFlux(nphotons)

            psf = galsim.Gaussian(sigma=0.1, flux=1)
            gal_temp = galsim.Convolve(psf, gal_temp)

            if i == 0:
                gal = gal_temp
            else:
                gal = gal + gal_temp
        print(gal)

        self.assertTrue(gal != None)
Wei Chengliang's avatar
Wei Chengliang committed


if __name__ == '__main__':
    unittest.main()