import os import galsim import random import numpy as np import h5py as h5 import healpy as hp import astropy.constants as cons import traceback from astropy.coordinates import spherical_to_cartesian from astropy.table import Table from scipy import interpolate from datetime import datetime from observation_sim.mock_objects import CatalogBase, Star, Galaxy, Quasar from observation_sim.mock_objects._util import tag_sed, getObservedSED, getABMAG, integrate_sed_bandpass, comoving_dist from observation_sim.astrometry.Astrometry_util import on_orbit_obs_position # (TEST) from astropy.cosmology import FlatLambdaCDM from astropy import constants from astropy import units as U try: import importlib.resources as pkg_resources except ImportError: # Try backported to PY<37 'importlib_resources' import importlib_resources as pkg_resources NSIDE = 128 def get_bundleIndex(healpixID_ring, bundleOrder=4, healpixOrder=7): assert NSIDE == 2**healpixOrder shift = healpixOrder - bundleOrder shift = 2*shift nside_bundle = 2**bundleOrder nside_healpix= 2**healpixOrder healpixID_nest= hp.ring2nest(nside_healpix, healpixID_ring) bundleID_nest = (healpixID_nest >> shift) bundleID_ring = hp.nest2ring(nside_bundle, bundleID_nest) return bundleID_ring class Catalog(CatalogBase): def __init__(self, config, chip, pointing, chip_output, filt, **kwargs): super().__init__() # self.cat_dir = config["catalog_options"]["input_path"]["cat_dir"] # self.seed_Av = config["catalog_options"]["seed_Av"] self.cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) self.chip_output = chip_output self.filt = filt self.logger = chip_output.logger with pkg_resources.path('catalog.data', 'SLOAN_SDSS.g.fits') as filter_path: self.normF_star = Table.read(str(filter_path)) self.config = config self.chip = chip self.pointing = pointing self.max_size = 0. if "CALIB_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"][ "CALIB_cat"] and not config["catalog_options"]["star_only"]: self.CALIB_cat_path = config["catalog_options"]["input_path"]["CALIB_cat"] self.CALIB_SED_path = config["catalog_options"]["SED_templates_path"]["CALIB_SED"] if "rotateEll" in config["catalog_options"]: self.rotation = np.radians(float(config["catalog_options"]["rotateEll"])) else: self.rotation = 0. # Update output .cat header with catalog specific output columns self._add_output_columns_header() # self._get_healpix_list() self._load() def _add_output_columns_header(self): self.add_hdr = " model_tag teff logg feh" self.add_hdr += " bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp " self.add_fmt = " %10s %8.4f %8.4f %8.4f" self.add_fmt += " %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f " self.chip_output.update_output_header(additional_column_names=self.add_hdr) def load_norm_filt(self, obj): if obj.type == "star": return self.normF_star elif obj.type == "galaxy" or obj.type == "quasar": # return self.normF_galaxy return None else: return None # def _load_SED_lib_star(self): # self.tempSED_star = h5.File(self.star_SED_path,'r') # def _load_SED_lib_gals(self): # pcs = h5.File(os.path.join(self.galaxy_SED_path, "pcs.h5"), "r") # lamb = h5.File(os.path.join(self.galaxy_SED_path, "lamb.h5"), "r") # self.lamb_gal = lamb['lamb'][()] # self.pcs = pcs['pcs'][()] # def _load_SED_lib_AGN(self): # from astropy.io import fits # self.SED_AGN = fits.open(self.AGN_SED_path)[0].data # self.lamb_AGN = np.load(self.AGN_SED_wave_path) def _load_stars(self, stars, pix_id=None): nstars = len(stars['sourceID']) # Apply astrometric modeling ra_arr = stars["RA"][:] dec_arr = stars["Dec"][:] pmra_arr = stars['pmra'][:] pmdec_arr = stars['pmdec'][:] rv_arr = stars['RV'][:] parallax_arr = stars['parallax'][:] if self.config["obs_setting"]["enable_astrometric_model"]: ra_list = ra_arr.tolist() dec_list = dec_arr.tolist() pmra_list = pmra_arr.tolist() pmdec_list = pmdec_arr.tolist() rv_list = rv_arr.tolist() parallax_list = parallax_arr.tolist() dt = datetime.utcfromtimestamp(self.pointing.timestamp) date_str = dt.date().isoformat() time_str = dt.time().isoformat() ra_arr, dec_arr = on_orbit_obs_position( input_ra_list=ra_list, input_dec_list=dec_list, input_pmra_list=pmra_list, input_pmdec_list=pmdec_list, input_rv_list=rv_list, input_parallax_list=parallax_list, input_nstars=nstars, input_x=self.pointing.sat_x, input_y=self.pointing.sat_y, input_z=self.pointing.sat_z, input_vx=self.pointing.sat_vx, input_vy=self.pointing.sat_vy, input_vz=self.pointing.sat_vz, input_epoch="J2000", input_date_str=date_str, input_time_str=time_str ) for istars in range(nstars): # # (TEST) # if istars > 100: # break param = self.initialize_param() param['ra'] = ra_arr[istars] param['dec'] = dec_arr[istars] param['ra_orig'] = stars["RA"][istars] param['dec_orig'] = stars["Dec"][istars] param['pmra'] = pmra_arr[istars] param['pmdec'] = pmdec_arr[istars] param['rv'] = rv_arr[istars] param['parallax'] = parallax_arr[istars] if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200): continue param['mag_use_normal'] = stars['app_sdss_g'][istars] # if param['mag_use_normal'] >= 26.5: # continue self.ids += 1 param['id'] = stars['sourceID'][istars] param['sed_type'] = stars['sourceID'][istars] param['model_tag'] = stars['model_tag'][istars] param['teff'] = stars['teff'][istars] param['logg'] = stars['grav'][istars] param['feh'] = stars['feh'][istars] param['z'] = 0.0 param['star'] = 1 # Star obj = Star(param, logger=self.logger) # Append additional output columns to the .cat file obj.additional_output_str = self.add_fmt%(param["model_tag"], param['teff'], param['logg'], param['feh'], 0., 0., 0., 0., 0., 0., 0., 0., 0., -1, 0.) self.objs.append(obj) def _load_calibObj(self): data = Table.read(self.CALIB_cat_path) ra_arr = data['RA'] dec_arr = data['DEC'] pSource_flag = data['POINTSOURCE_FLAG'] ngals = len(data) for igals in range(ngals): # # (TEST) # if igals > 100: # break param = self.initialize_param() param['ra'] = ra_arr[igals] param['dec'] = dec_arr[igals] param['ra_orig'] = data['RA'][igals] param['dec_orig'] = data['DEC'][igals] param['mag_use_normal'] = data['MAG_g'][igals] # if self.filt.is_too_dim(mag=param['mag_use_normal'], margin=self.config["obs_setting"]["mag_lim_margin"]): # continue param['z'] = -99 param['model_tag'] = 'None' param['g1'] = 0 param['g2'] = 0 param['kappa'] = 0 param['e1'] = 0 param['e2'] = 0 param['id'] = data['ID'][igals] param['SPEC_FN'] = data['SPEC_FN'][igals] # NOTE: this cut cannot be put before the SED type has been assigned if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200): continue # For shape calculation if pSource_flag[igals]: param['star'] = 4 obj = Star(param, logger=self.logger) else: param['ell_total'] = np.sqrt(param['e1'] ** 2 + param['e2'] ** 2) if param['ell_total'] > 0.9: continue param['e1_disk'] = 0 param['e2_disk'] = 0 param['e1_bulge'] = 0 param['e2_bulge'] = 0 param['delta_ra'] = 0 param['delta_dec'] = 0 # Masses # param['bulgemass'] = gals['bulgemass'][igals] # param['diskmass'] = gals['diskmass'][igals] # param['size'] = gals['size'][igals] # if param['size'] > self.max_size: # self.max_size = param['size'] # Sersic index param['disk_sersic_idx'] = data['SERSIC_N'][igals] param['bulge_sersic_idx'] = 1. param['hlr_bulge'] = data['RE'][igals] param['hlr_disk'] = data['RE'][igals] param['bfrac'] = 0 # TEST no redening and no extinction param['av'] = 0.0 param['redden'] = 0 param['star'] = 4 obj = Galaxy(param, logger=self.logger) # TEMP self.ids += 1 # param['id'] = self.ids self.objs.append(obj) def _load(self, **kwargs): self.objs = [] self.ids = 0 if "CALIB_cat" in self.config["catalog_options"]["input_path"] and \ self.config["catalog_options"]["input_path"][ "CALIB_cat"] and not self.config["catalog_options"]["star_only"]: try: self._load_calibObj() except Exception as e: traceback.print_exc() self.logger.error(str(e)) print(e) if self.logger is not None: self.logger.info("maximum galaxy size: %.4f"%(self.max_size)) self.logger.info("number of objects in catalog: %d"%(len(self.objs))) else: print("number of objects in catalog: ", len(self.objs)) def load_sed(self, obj, **kwargs): if obj.type == 'calib': data = Table.read(os.path.join(self.CALIB_SED_path,obj.SPEC_FN)) obj_w = data['WAVELENGTH'] obj_f = data['FLUX'] input_delt_w = np.min(obj_w[1:]-obj_w[0:-1]) if input_delt_w > 0.5: lamb = np.arange(2000, 11000 + 0.5, 0.5) speci = interpolate.interp1d(obj_w, obj_f) y1 = speci(lamb) else: lamb = obj_w y1 = obj_f # erg/s/cm2/A --> photon/s/m2/A y1_phot = y1 * lamb / (cons.h.value * cons.c.value) * 1e-13 sed = Table(np.array([lamb, y1_phot]).T, names=('WAVELENGTH', 'FLUX')) sed_photon = np.array([sed['WAVELENGTH'], sed['FLUX']]).T sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest') sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False) interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=self.filt.bandpass_full) obj.param['mag_use_normal'] = getABMAG(interFlux, self.filt.bandpass_full) return sed else: raise ValueError("Object type not known")