Commit 6078791f authored by Fang Yuedong's avatar Fang Yuedong
Browse files

first commit of user-customizable catalog version

No related merge requests found
Showing with 403 additions and 0 deletions
+403 -0
import os
import galsim
import random
import numpy as np
import h5py as h5
import healpy as hp
import astropy.constants as cons
from astropy.coordinates import spherical_to_cartesian
from astropy.table import Table
from scipy import interpolate
from ObservationSim.MockObject import CatalogBase, Star, Galaxy, Quasar
from ObservationSim.MockObject._util import seds, sed_assign, extAv, tag_sed, getObservedSED
NSIDE = 128
class C3Catalog(CatalogBase):
def __init__(self, config, **kwargs):
super().__init__()
self.cat_dir = os.path.join(config["data_dir"], config["input_path"]["cat_dir"])
self.seed_Av = config["random_seeds"]["seed_Av"]
self.normalize_dir = os.path.join(config["data_dir"], config["SLS_path"]["SLS_norm"])
self.normF_star = Table.read(os.path.join(self.normalize_dir, 'SLOAN_SDSS.g.fits'))
self.normF_galaxy = Table.read(os.path.join(self.normalize_dir, 'lsst_throuput_g.fits'))
try:
self.chip = kwargs["chip"]
except:
raise ValueError('For Cycle-3 Catalog class, must give a "chip" object to initiate')
if "star_cat" in config["input_path"] and config["input_path"]["star_cat"]:
star_file = config["input_path"]["star_cat"]
star_SED_file = config["SED_templates_path"]["star_SED"]
self.star_path = os.path.join(self.cat_dir, star_file)
self.star_SED_path = os.path.join(config["data_dir"], star_SED_file)
self._load_SED_lib_star()
if "galaxy_cat" in config["input_path"] and config["input_path"]["galaxy_cat"]:
galaxy_file = config["input_path"]["galaxy_cat"]
self.galaxy_path = os.path.join(self.cat_dir, galaxy_file)
self.galaxy_SED_path = os.path.join(config["data_dir"], config["SED_templates_path"]["galaxy_SED"])
self._load_SED_lib_gals()
if "rotateEll" in config["shear_setting"]:
self.rotation = float(int(config["shear_setting"]["rotateEll"]/45.))
else:
self.rotation = 0.
self._get_healpix_list()
self._load()
def _get_healpix_list(self):
self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2)
ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax
ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min]))
dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min]))
vertices = spherical_to_cartesian(1., dec, ra)
self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True)
print("HEALPix List: ", self.pix_list)
def load_norm_filt(self, obj):
if obj.type == "star":
return self.normF_star
elif obj.type == "galaxy" or obj.type == "quasar":
return self.normF_galaxy
else:
return None
def _load_SED_lib_star(self):
self.tempSED_star = h5.File(self.star_SED_path,'r')
def _load_SED_lib_gals(self):
self.tempSed_gal, self.tempRed_gal = seds("galaxy.list", seddir=self.galaxy_SED_path)
def _load_gals(self, gals, pix_id=None):
ngals = len(gals['galaxyID'])
self.rng_sedGal = random.Random()
self.rng_sedGal.seed(pix_id) # Use healpix index as the random seed
self.ud = galsim.UniformDeviate(pix_id)
for igals in range(ngals):
param = self.initialize_param()
param['ra'] = gals['ra_true'][igals]
param['dec'] = gals['dec_true'][igals]
param['z'] = gals['redshift_true'][igals]
param['model_tag'] = 'None'
param['gamma1'] = 0
param['gamma2'] = 0
param['kappa'] = 0
param['delta_ra'] = 0
param['delta_dec'] = 0
sersicB = gals['sersic_bulge'][igals]
hlrMajB = gals['size_bulge_true'][igals]
hlrMinB = gals['size_minor_bulge_true'][igals]
sersicD = gals['sersic_disk'][igals]
hlrMajD = gals['size_disk_true'][igals]
hlrMinD = gals['size_minor_disk_true'][igals]
aGal = gals['size_true'][igals]
bGal = gals['size_minor_true'][igals]
param['bfrac'] = gals['bulge_to_total_ratio_i'][igals]
param['theta'] = gals['position_angle_true'][igals]
param['hlr_bulge'] = np.sqrt(hlrMajB * hlrMinB)
param['hlr_disk'] = np.sqrt(hlrMajD * hlrMinD)
param['ell_bulge'] = (hlrMajB - hlrMinB)/(hlrMajB + hlrMinB)
param['ell_disk'] = (hlrMajD - hlrMinD)/(hlrMajD + hlrMinD)
param['ell_tot'] = (aGal - bGal) / (aGal + bGal)
# Assign each galaxy a template SED
param['sed_type'] = sed_assign(phz=param['z'], btt=param['bfrac'], rng=self.rng_sedGal)
param['redden'] = self.tempRed_gal[param['sed_type']]
param['av'] = self.avGal[int(self.ud()*self.nav)]
if param['sed_type'] <= 5:
param['av'] = 0.0
param['redden'] = 0
param['star'] = 0 # Galaxy
if param['sed_type'] >= 29:
param['av'] = 0.6 * param['av'] / 3.0 # for quasar, av=[0, 0.2], 3.0=av.max-av.im
param['star'] = 2 # Quasar
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
param['mag_use_normal'] = gals['mag_true_g_lsst'][igals]
if param['mag_use_normal'] >= 26.5:
continue
self.ids += 1
param['id'] = self.ids
if param['star'] == 0:
obj = Galaxy(param, self.rotation)
self.objs.append(obj)
if param['star'] == 2:
obj = Quasar(param)
self.objs.append(obj)
def _load_stars(self, stars, pix_id=None):
nstars = len(stars['sourceID'])
for istars in range(nstars):
param = self.initialize_param()
param['ra'] = stars['RA'][istars]
param['dec'] = stars['Dec'][istars]
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
param['mag_use_normal'] = stars['app_sdss_g'][istars]
if param['mag_use_normal'] >= 26.5:
continue
self.ids += 1
param['id'] = self.ids
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['teff'] = stars['teff'][istars]
param['logg'] = stars['grav'][istars]
param['feh'] = stars['feh'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
obj = Star(param)
self.objs.append(obj)
def _load(self, **kwargs):
self.nav = 15005
self.avGal = extAv(self.nav, seed=self.seed_Av)
gals_cat = h5.File(self.galaxy_path, 'r')['galaxies']
star_cat = h5.File(self.star_path, 'r')['catalog']
self.objs = []
self.ids = 0
for pix in self.pix_list:
gals = gals_cat[str(pix)]
stars = star_cat[str(pix)]
self._load_gals(gals, pix_id=pix)
self._load_stars(stars, pix_id=pix)
print("number of objects in catalog: ", len(self.objs))
del self.avGal
def load_sed(self, obj, **kwargs):
if obj.type == 'star':
_, wave, flux = tag_sed(
h5file=self.tempSED_star,
model_tag=obj.param['model_tag'],
teff=obj.param['teff'],
logg=obj.param['logg'],
feh=obj.param['feh']
)
elif obj.type == 'galaxy' or obj.type == 'quasar':
sed_data = getObservedSED(
sedCat=self.tempSed_gal[obj.sed_type],
redshift=obj.z,
av=obj.param["av"],
redden=obj.param["redden"]
)
wave, flux = sed_data[0], sed_data[1]
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
# lamb = np.arange(2500, 10001 + 0.5, 0.5)
lamb = np.arange(2400, 11001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photo/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
return sed
import os
import numpy as np
import astropy.constants as cons
from astropy.table import Table
from scipy import interpolate
from ObservationSim.MockObject import CatalogBase, Star, Galaxy, Quasar
class Catalog_example(CatalogBase):
"""An user customizable class for reading in catalog(s) of objects and SEDs.
NOTE: must inherit the "CatalogBase" abstract class
...
Attributes
----------
cat_dir : str
a directory that contains the catalog file(s)
star_path : str
path to the star catalog file
star_SED_path : str
path to the star SED data
objs : list
a list of ObservationSim.MockObject (Star, Galaxy, or Quasar)
NOTE: must have "obj" list when implement your own Catalog
Methods
----------
load_sed(obj, **kwargs):
load the corresponding SED data for one object
load_norm_filt(obj):
load the filter throughput for the input catalog's photometric system.
"""
def __init__(self, config, **kwargs):
"""Constructor method.
Parameters
----------
config : dict
configuration dictionary which is parsed from the input YAML file
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
None
"""
super().__init__()
self.cat_dir = os.path.join(config["data_dir"], config["input_path"]["cat_dir"])
if "star_cat" in config["input_path"] and config["input_path"]["star_cat"]:
star_file = config["input_path"]["star_cat"]
star_SED_file = config["SED_templates_path"]["star_SED"]
self.star_path = os.path.join(self.cat_dir, star_file)
self.star_SED_path = os.path.join(config["data_dir"], star_SED_file)
# NOTE: must call _load() method here to read in all objects
self.objs = []
self._load()
def _load(self, **kwargs):
"""Read in all objects in from the catalog file(s).
This is a must implemented method which is used to read in all objects, and
then convert them to ObservationSim.MockObject (Star, Galaxy, or Quasar).
Currently,
the model of ObservationSim.MockObject.Star class requires:
param["star"] : int
specify the object type: 0: galaxy, 1: star, 2: quasar
param["id"] : int
ID number of the object
param["ra"] : float
Right ascension (in degrees)
param["dec"] : float
Declination (in degrees)
param["mag_use_normal"]: float
the absolute magnitude in a particular filter
NOTE: if that filter is not the corresponding CSST filter, the
load_norm_filt(obj) function must be implemented to load the filter
throughput of that particular photometric system
the model of ObservationSim.MockObject.Galaxy class requires:
param["star"] : int
specify the object type: 0: galaxy, 1: star, 2: quasar
param["id"] : int
ID number of the object
param["ra"] : float
Right ascension (in degrees)
param["dec"] : float
Declination (in degrees)
param["mag_use_normal"]: float
the absolute magnitude in a particular filter
NOTE: if that filter is not the corresponding CSST filter, the
load_norm_filt(obj) function must be implemented to load the filter
throughput of that particular photometric system
param["theta"] : float
the position angle (in degrees)
param["bfrac"] : float
the bulge fraction
param["hlr_bulge] : float
the half-light-radius of the bulge
param["hlr_disk] : float
the half-light-radius of the disk
param["ell_bulge] : float
the ellipticity of the bulge
param["ell_disk] : float
the ellipticity of the disk
param["ell_tot] : float
the total ellipticity
the model of ObservationSim.MockObject.Galaxy class requires:
Currently a Quasar is modeled as a point source, just like a Star.
NOTE: To construct an object, according to its type, just call:
Star(param), Galaxy(param), or Quasar(param)
NOTE: All constructed objects should be appened to "self.objs".
Parameters
----------
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
None
"""
stars = Table.read(self.star_path)
nstars = stars['sourceID'].size
for istars in range(nstars):
param = self.initialize_param()
param['id'] = istars + 1
param['ra'] = stars['RA'][istars]
param['dec'] = stars['Dec'][istars]
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
param['mag_use_normal'] = stars['app_sdss_g'][istars]
obj = Star(param)
self.objs.append(obj)
def load_sed(self, obj, **kwargs):
"""Load the corresponding SED data for a particular obj.
Parameters
----------
obj : ObservationSim.MockObject
the object to get SED data for
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
sed : Astropy.Table
the SED Table with two columns (namely, "WAVELENGTH", "FLUX"):
sed["WAVELENGTH"] : wavelength in Angstroms
sed["FLUX"] : fluxes in photons/s/m^2/A
NOTE: the range of wavelengthes must at least cover [2450 - 11000] Angstorms
"""
if obj.type == 'star':
wave = Table.read(self.star_SED_path, path=f"/SED/wave_{obj.model_tag}")
flux = Table.read(self.star_SED_path, path=f"/SED/{obj.sed_type}")
wave, flux = wave['col0'].data, flux['col0'].data
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2400, 11001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm^2/A --> photons/s/m^2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
return sed
def load_norm_filt(self, obj):
"""Load the corresponding thourghput for the input magnitude "param["mag_use_normal"]".
NOTE: if the input magnitude is already in CSST magnitude, simply return None
Parameters
----------
obj : ObservationSim.MockObject
the object to get thourghput data for
Returns
----------
norm_filt : Astropy.Table
the throughput Table with two columns (namely, "WAVELENGTH", "SENSITIVITY"):
norm_filt["WAVELENGTH"] : wavelengthes in Angstroms
norm_filt["SENSITIVITY"] : efficiencies
"""
return None
\ No newline at end of file
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
......@@ -14,6 +14,13 @@ class Filter(object):
self._getParam(filter_param, filter_type)
if self.filter_dir is not None:
self.bandpass_full, self.bandpass_sub_list = self._get_bandpasses(self.filter_dir)
self.survey_type = self._getSurveyType()
def _getSurveyType(self):
if self.filter_type in ["GI", "GV", "GU"]:
return "spectroscopic"
else:
return "photometric"
def _getParam(self, filter_param, filter_type, filter_id=None):
self.effective_wavelength = filter_param.param[filter_type][0]
......
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment