import os import galsim import random import numpy as np import h5py as h5 import healpy as hp import astropy.constants as cons import traceback from astropy.coordinates import spherical_to_cartesian from astropy.table import Table from scipy import interpolate from datetime import datetime from ObservationSim.MockObject import CatalogBase, Star, Galaxy, Quasar, Stamp from ObservationSim.MockObject._util import tag_sed, getObservedSED, getABMAG, integrate_sed_bandpass, comoving_dist from ObservationSim.Astrometry.Astrometry_util import on_orbit_obs_position import astropy.io.fits as fitsio from ObservationSim.MockObject._util import seds, sed_assign, extAv # (TEST) from astropy.cosmology import FlatLambdaCDM from astropy import constants from astropy import units as U try: import importlib.resources as pkg_resources except ImportError: # Try backported to PY<37 'importlib_resources' import importlib_resources as pkg_resources NSIDE = 128 class Catalog(CatalogBase): """An user customizable class for reading in catalog(s) of objects and SEDs. NOTE: must inherit the "CatalogBase" abstract class ... Attributes ---------- cat_dir : str a directory that contains the catalog file(s) star_path : str path to the star catalog file star_SED_path : str path to the star SED data objs : list a list of ObservationSim.MockObject (Star, Galaxy, or Quasar) NOTE: must have "obj" list when implement your own Catalog Methods ---------- load_sed(obj, **kwargs): load the corresponding SED data for one object load_norm_filt(obj): load the filter throughput for the input catalog's photometric system. """ def __init__(self, config, chip, pointing, chip_output, filt, **kwargs): """Constructor method. Parameters ---------- config : dict configuration dictionary which is parsed from the input YAML file chip: ObservationSim.Instrument.Chip an ObservationSim.Instrument.Chip instance, can be used to identify the band etc. pointing: ObservationSim.Config.Pointing an ObservationSim.Config.Pointing instance, can be used to configure the astrometry module chip_output: ObservationSim.Config.ChipOutput an ObservationSim.Config.ChipOutput instance, can be used to setup the output format filt: ObservationSim.Instrument.Filter an ObservationSim.Instrument.Filter instance, can be used to identify the filter type **kwargs : dict other needed input parameters (in key-value pairs), please modify corresponding initialization call in "ObservationSim.py" as you need. Returns ---------- None """ super().__init__() self.cat_dir = os.path.join(config["data_dir"], config["catalog_options"]["input_path"]["cat_dir"]) self.seed_Av = config["catalog_options"]["seed_Av"] #self.cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111) self.chip_output = chip_output self.filt = filt self.logger = chip_output.logger with pkg_resources.path('Catalog.data', 'SLOAN_SDSS.g.fits') as filter_path: self.normF_star = Table.read(str(filter_path)) self.config = config self.chip = chip self.pointing = pointing self.max_size = 0. if "star_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["star_cat"] and config["catalog_options"]["star_yes"]: self.star_path = os.path.join(self.cat_dir, config["catalog_options"]["input_path"]["star_cat"]) self.star_SED_path = os.path.join(self.cat_dir, config["catalog_options"]["SED_templates_path"]["star_SED"]) self._load_SED_lib_star() if "rotateEll" in config["catalog_options"]: self.rotation = float(int(config["catalog_options"]["rotateEll"]/45.)) else: self.rotation = 0. # Update output .cat header with catalog specific output columns self._add_output_columns_header() self._get_healpix_list() self._load() def _add_output_columns_header(self): self.add_hdr = " model_tag teff logg feh" self.add_hdr += " bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp " self.add_fmt = " %10s %8.4f %8.4f %8.4f" self.add_fmt += " %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f " self.chip_output.update_ouptut_header(additional_column_names=self.add_hdr) def _get_healpix_list(self): self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2) ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min])) dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min])) # vertices = spherical_to_cartesian(1., dec, ra) self.pix_list = hp.query_polygon( NSIDE, hp.ang2vec(np.radians(90.) - dec, ra), inclusive=True ) # self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True) if self.logger is not None: msg = str(("HEALPix List: ", self.pix_list)) self.logger.info(msg) else: print("HEALPix List: ", self.pix_list) def load_norm_filt(self, obj): """Load the corresponding thourghput for the input magnitude "param["mag_use_normal"]". NOTE: if the input magnitude is already in CSST magnitude, simply return None Parameters ---------- obj : ObservationSim.MockObject the object to get thourghput data for Returns ---------- norm_filt : Astropy.Table the throughput Table with two columns (namely, "WAVELENGTH", "SENSITIVITY"): norm_filt["WAVELENGTH"] : wavelengthes in Angstroms norm_filt["SENSITIVITY"] : efficiencies """ if obj.type == "star": return self.normF_star else: return None def _load_SED_lib_star(self): self.tempSED_star = h5.File(self.star_SED_path,'r') def _load_stars(self, stars, pix_id=None): nstars = len(stars['sourceID']) # Apply astrometric modeling ra_arr = stars["RA"][:] dec_arr = stars["Dec"][:] pmra_arr = stars['pmra'][:] pmdec_arr = stars['pmdec'][:] rv_arr = stars['RV'][:] parallax_arr = stars['parallax'][:] if self.config["obs_setting"]["enable_astrometric_model"]: ra_list = ra_arr.tolist() dec_list = dec_arr.tolist() pmra_list = pmra_arr.tolist() pmdec_list = pmdec_arr.tolist() rv_list = rv_arr.tolist() parallax_list = parallax_arr.tolist() dt = datetime.utcfromtimestamp(self.pointing.timestamp) date_str = dt.date().isoformat() time_str = dt.time().isoformat() ra_arr, dec_arr = on_orbit_obs_position( input_ra_list=ra_list, input_dec_list=dec_list, input_pmra_list=pmra_list, input_pmdec_list=pmdec_list, input_rv_list=rv_list, input_parallax_list=parallax_list, input_nstars=nstars, input_x=self.pointing.sat_x, input_y=self.pointing.sat_y, input_z=self.pointing.sat_z, input_vx=self.pointing.sat_vx, input_vy=self.pointing.sat_vy, input_vz=self.pointing.sat_vz, input_epoch="J2000", input_date_str=date_str, input_time_str=time_str ) for istars in range(nstars): # # (TEST) # if istars > 100: # break param = self.initialize_param() param['ra'] = ra_arr[istars] param['dec'] = dec_arr[istars] param['ra_orig'] = stars["RA"][istars] param['dec_orig'] = stars["Dec"][istars] param['pmra'] = pmra_arr[istars] param['pmdec'] = pmdec_arr[istars] param['rv'] = rv_arr[istars] param['parallax'] = parallax_arr[istars] if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200): continue param['mag_use_normal'] = stars['app_sdss_g'][istars] # if param['mag_use_normal'] >= 26.5: # continue self.ids += 1 param['id'] = stars['sourceID'][istars] param['sed_type'] = stars['sourceID'][istars] param['model_tag'] = stars['model_tag'][istars] param['teff'] = stars['teff'][istars] param['logg'] = stars['grav'][istars] param['feh'] = stars['feh'][istars] param['z'] = 0.0 param['star'] = 1 # Star obj = Star(param, logger=self.logger) # Append additional output columns to the .cat file obj.additional_output_str = self.add_fmt%(param["model_tag"], param['teff'], param['logg'], param['feh'], 0., 0., 0., 0., 0., 0., 0., 0., 0., -1, 0.) self.objs.append(obj) def _load(self, **kwargs): """Read in all objects in from the catalog file(s). This is a must implemented method which is used to read in all objects, and then convert them to ObservationSim.MockObject (Star, Galaxy, or Quasar). Currently, the model of ObservationSim.MockObject.Star class requires: param["star"] : int specify the object type: 0: galaxy, 1: star, 2: quasar param["id"] : int ID number of the object param["ra"] : float Right ascension (in degrees) param["dec"] : float Declination (in degrees) param["mag_use_normal"]: float the absolute magnitude in a particular filter NOTE: if that filter is not the corresponding CSST filter, the load_norm_filt(obj) function must be implemented to load the filter throughput of that particular photometric system the model of ObservationSim.MockObject.Galaxy class requires: param["star"] : int specify the object type: 0: galaxy, 1: star, 2: quasar param["id"] : int ID number of the object param["ra"] : float Right ascension (in degrees) param["dec"] : float Declination (in degrees) param["mag_use_normal"]: float the absolute magnitude in a particular filter NOTE: if that filter is not the corresponding CSST filter, the load_norm_filt(obj) function must be implemented to load the filter throughput of that particular photometric system param["bfrac"] : float the bulge fraction param["hlr_bulge"] : float the half-light-radius of the bulge param["hlr_disk"] : float the half-light-radius of the disk param["e1_bulge"], param["e2_bulge"] : float the ellipticity of the bulge components param["e1_disk"], param["e2_disk"] : float the ellipticity of the disk components (Optional parameters): param['disk_sersic_idx']: float Sersic index for galaxy disk component param['bulge_sersic_idx']: float Sersic index for galaxy bulge component param['g1'], param['g2']: float Reduced weak lensing shear components (valid for shear type: catalog) the model of ObservationSim.MockObject.Galaxy class requires: Currently a Quasar is modeled as a point source, just like a Star. NOTE: To construct an object, according to its type, just call: Star(param), Galaxy(param), or Quasar(param) NOTE: All constructed objects should be appened to "self.objs". NOTE: Any other parameters can also be set within "param" dict: Used to calculate required quantities and/or SEDs etc. Parameters ---------- **kwargs : dict other needed input parameters (in key-value pairs), please modify corresponding initialization call in "ObservationSim.py" as you need. Returns ---------- None """ self.objs = [] self.ids = 0 #if "star_cat" in self.config["input_path"] and self.config["input_path"]["star_cat"] and not self.config["run_option"]["galaxy_only"]: if "star_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["star_cat"] and self.config["catalog_options"]["star_yes"]: star_cat = h5.File(self.star_path, 'r')['stars'] for pix in self.pix_list: try: stars = star_cat[str(pix)] self._load_stars(stars, pix_id=pix) del stars except Exception as e: self.logger.error(str(e)) print(e) if self.logger is not None: self.logger.info("number of objects in catalog: %d"%(len(self.objs))) else: print("number of objects in catalog: ", len(self.objs)) def load_sed(self, obj, **kwargs): """Load the corresponding SED data for a particular obj. Parameters ---------- obj : ObservationSim.MockObject the object to get SED data for **kwargs : dict other needed input parameters (in key-value pairs), please modify corresponding initialization call in "ObservationSim.py" as you need. Returns ---------- sed : Astropy.Table the SED Table with two columns (namely, "WAVELENGTH", "FLUX"): sed["WAVELENGTH"] : wavelength in Angstroms sed["FLUX"] : fluxes in photons/s/m^2/A NOTE: the range of wavelengthes must at least cover [2450 - 11000] Angstorms """ if obj.type == 'star': _, wave, flux = tag_sed( h5file=self.tempSED_star, model_tag=obj.param['model_tag'], teff=obj.param['teff'], logg=obj.param['logg'], feh=obj.param['feh'] ) else: raise ValueError("Object type not known") speci = interpolate.interp1d(wave, flux) lamb = np.arange(2000, 11001+0.5, 0.5) y = speci(lamb) # erg/s/cm2/A --> photon/s/m2/A all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13 sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX')) del wave del flux return sed