import os import numpy as np import random import galsim import h5py as h5 import healpy as hp from astropy.table import Table from astropy.coordinates import spherical_to_cartesian from ObservationSim.MockObject.Star import Star from ObservationSim.MockObject.Galaxy import Galaxy from ObservationSim.MockObject.Quasar import Quasar from ObservationSim.MockObject._util import seds, sed_assign, extAv NSIDE = 128 class Catalog(object): def __init__(self, config, chip, cat_dir=None, pRa=None, pDec=None, sed_dir=None, rotation=None): if cat_dir is not None: self.cat_dir = cat_dir else: self.cat_dir = os.path.join(config["data_dir"], config["input_path"]["cat_dir"]) self.sed_dir = sed_dir self.chip = chip self.seed_Av = config["random_seeds"]["seed_Av"] if pRa is not None: self.pRa = float('{:8.4f}'.format(pRa)) if pDec is not None: self.pDec = float('{:8.4f}'.format(pDec)) # star_file = 'stars_ccd' + chip.getChipLabel(chip.chipID) + '_p_RA'+str(self.pRa) + '_DE' + str(self.pDec) + '.hdf5' # galaxy_file = 'galaxies_ccd' + chip.getChipLabel(chip.chipID) + '_p_RA'+str(self.pRa) + '_DE' + str(self.pDec) + '.hdf5' if "star_cat" in config["input_path"] and config["input_path"]["star_cat"]: star_file = config["input_path"]["star_cat"] star_SED_file = config["SED_templates_path"]["star_SED"] self.star_path = os.path.join(self.cat_dir, star_file) self.star_SED_path = os.path.join(config["data_dir"], star_SED_file) self._load_SED_lib_star() if "galaxy_cat" in config["input_path"] and config["input_path"]["galaxy_cat"]: galaxy_file = config["input_path"]["galaxy_cat"] self.galaxy_path = os.path.join(self.cat_dir, galaxy_file) self.galaxy_SED_path = os.path.join(config["data_dir"], config["SED_templates_path"]["galaxy_SED"]) self._load_SED_lib_gals() if "rotateEll" in config["shear_setting"]: self.rotation = float(int(config["shear_setting"]["rotateEll"]/45.)) else: self.rotation = 0. # self._load_SED_info() self._get_healpix_list() self._load() def _get_healpix_list(self): self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2) # self.sky_coverage_enlarged = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.5) ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min])) dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min])) # phi, theta = ra, np.pi/2. - dec # vertices_pix = np.unique(hp.ang2pix(NSIDE, theta, phi)) # print(vertices_pix) vertices = spherical_to_cartesian(1., dec, ra) self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True) print("HEALPix List: ", self.pix_list) def _load_SED_lib_star(self): self.tempSED_star = h5.File(self.star_SED_path,'r') def _load_SED_lib_gals(self): # tempdir_gal = os.path.join(self.template_dir, "Galaxy/") # tempdir_star = os.path.join(self.template_dir, "PicklesStars/") # self.tempSed_star, self.tempRed_star = seds("star.list", seddir=tempdir_star) self.tempSed_gal, self.tempRed_gal = seds("galaxy.list", seddir=self.galaxy_SED_path) def _load_SED_info(self): sed_info_file = os.path.join(self.sed_dir, "sed.info") sed_info = Table.read(sed_info_file, format="ascii") self.cosids = sed_info["IDCosmoDC2"] self.objtypes = sed_info["objType"] def _load_gals(self, gals, pix_id=None): ngals = len(gals['galaxyID']) self.rng_sedGal = random.Random() self.rng_sedGal.seed(pix_id) # Use healpix index as the random seed self.ud = galsim.UniformDeviate(pix_id) for igals in range(ngals): param = {} param['ra'] = gals['ra_true'][igals] param['dec'] = gals['dec_true'][igals] param['z'] = gals['redshift_true'][igals] param['model_tag'] = 'None' param['gamma1'] = 0 param['gamma2'] = 0 param['kappa'] = 0 param['delta_ra'] = 0 param['delta_dec'] = 0 sersicB = gals['sersic_bulge'][igals] hlrMajB = gals['size_bulge_true'][igals] hlrMinB = gals['size_minor_bulge_true'][igals] sersicD = gals['sersic_disk'][igals] hlrMajD = gals['size_disk_true'][igals] hlrMinD = gals['size_minor_disk_true'][igals] aGal = gals['size_true'][igals] bGal = gals['size_minor_true'][igals] param['bfrac'] = gals['bulge_to_total_ratio_i'][igals] param['theta'] = gals['position_angle_true'][igals] param['hlr_bulge'] = np.sqrt(hlrMajB * hlrMinB) param['hlr_disk'] = np.sqrt(hlrMajD * hlrMinD) param['ell_bulge'] = (hlrMajB - hlrMinB)/(hlrMajB + hlrMinB) param['ell_disk'] = (hlrMajD - hlrMinD)/(hlrMajD + hlrMinD) param['ell_tot'] = (aGal - bGal) / (aGal + bGal) # Assign each galaxy a template SED param['sed_type'] = sed_assign(phz=param['z'], btt=param['bfrac'], rng=self.rng_sedGal) param['redden'] = self.tempRed_gal[param['sed_type']] param['av'] = self.avGal[int(self.ud()*self.nav)] if param['sed_type'] <= 5: param['av'] = 0.0 param['redden'] = 0 param['star'] = 0 # Galaxy if param['sed_type'] >= 29: param['av'] = 0.6 * param['av'] / 3.0 # for quasar, av=[0, 0.2], 3.0=av.max-av.im param['star'] = 2 # Quasar if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200): continue param['mag_use_normal'] = gals['mag_true_g_lsst'][igals] if param['mag_use_normal'] >= 26.5: continue self.ids += 1 param['id'] = self.ids if param['star'] == 0: obj = Galaxy(param, self.rotation) self.objs.append(obj) if param['star'] == 2: obj = Quasar(param) self.objs.append(obj) def _load_stars(self, stars, pix_id=None): nstars = len(stars['sourceID']) for istars in range(nstars): param = {} param['ra'] = stars['RA'][istars] param['dec'] = stars['Dec'][istars] if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200): continue param['mag_use_normal'] = stars['app_sdss_g'][istars] if param['mag_use_normal'] >= 26.5: continue self.ids += 1 param['id'] = self.ids param['sed_type'] = stars['sourceID'][istars] param['model_tag'] = stars['model_tag'][istars] param['teff'] = stars['teff'][istars] param['logg'] = stars['grav'][istars] param['feh'] = stars['feh'][istars] param['z'] = 0.0 param['star'] = 1 # Star obj = Star(param) self.objs.append(obj) def _load(self): self.nav = 15005 self.avGal = extAv(self.nav, seed=self.seed_Av) gals_cat = h5.File(self.galaxy_path, 'r')['galaxies'] star_cat = h5.File(self.star_path, 'r')['catalog'] self.objs = [] self.ids = 0 for pix in self.pix_list: gals = gals_cat[str(pix)] stars = star_cat[str(pix)] self._load_gals(gals, pix_id=pix) self._load_stars(stars, pix_id=pix) print("number of objects in catalog: ", len(self.objs)) del self.avGal