Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-sims
csst_msc_sim
Commits
07f72bde
Commit
07f72bde
authored
Nov 26, 2024
by
Wei Chengliang
Browse files
added C10_Catalog.py
parent
a03daa94
Pipeline
#7366
failed with stage
in 0 seconds
Changes
1
Pipelines
1
Show whitespace changes
Inline
Side-by-side
catalog/C10_Catalog.py
0 → 100644
View file @
07f72bde
import
os
import
galsim
import
random
import
copy
import
numpy
as
np
import
h5py
as
h5
import
healpy
as
hp
import
astropy.constants
as
cons
import
traceback
from
astropy.coordinates
import
spherical_to_cartesian
from
astropy.table
import
Table
from
scipy
import
interpolate
from
datetime
import
datetime
from
observation_sim.mock_objects
import
CatalogBase
,
Star
,
Galaxy
,
Quasar
,
ExtinctionMW
from
observation_sim.mock_objects._util
import
tag_sed
,
getObservedSED
,
getABMAG
,
integrate_sed_bandpass
,
comoving_dist
from
observation_sim.astrometry.Astrometry_util
import
on_orbit_obs_position
# (TEST)
from
astropy.cosmology
import
FlatLambdaCDM
from
astropy
import
constants
from
astropy
import
units
as
U
from
astropy.coordinates
import
SkyCoord
from
astropy.io
import
fits
import
astropy.constants
as
atcons
import
ctypes
try
:
import
importlib.resources
as
pkg_resources
except
ImportError
:
# Try backported to PY<37 'importlib_resources'
import
importlib_resources
as
pkg_resources
NSIDE
=
128
pointing_file_list
=
[
'pointing_C10_1000deg_lat80.dat'
,
'pointing_C10_1000deg_latm80_.dat'
,
'pointing_C10_1000deg_p180_m30.dat'
,
'pointing_C10_1000deg_p320_m40.dat'
]
star_file_list
=
[
'CSST_C10_1000sqd_1.hdf5'
,
'CSST_C10_1000sqd_2_Fornax.hdf5'
,
'CSST_C10_1000sqd_3.hdf5'
,
'CSST_C10_1000sqd_4.hdf5'
]
def
get_galaxy_qso_list
(
config
):
cat_dir
=
config
[
"catalog_options"
][
"input_path"
][
"cat_dir"
]
with
open
(
cat_dir
+
"galcat_C10/qsocat/gal_C10_file"
,
'r'
,
encoding
=
'utf-8'
)
as
fn1
:
fn1_list
=
fn1
.
readlines
()
bundle_file_list
=
[
line
.
strip
()
for
line
in
fn1_list
]
with
open
(
cat_dir
+
"galcat_C10/qsocat/qso_sed_C10_file"
,
'r'
,
encoding
=
'utf-8'
)
as
fn2
:
fn2_list
=
fn2
.
readlines
()
qsosed_file_list
=
[
line
.
strip
()
for
line
in
fn2_list
]
return
bundle_file_list
,
qsosed_file_list
class
StarParm
(
ctypes
.
Structure
):
_fields_
=
[
(
'logte'
,
ctypes
.
c_float
),
(
'logg'
,
ctypes
.
c_float
),
(
'Mass'
,
ctypes
.
c_float
),
(
'Av'
,
ctypes
.
c_float
),
(
'mu0'
,
ctypes
.
c_float
),
(
'Z'
,
ctypes
.
c_float
)]
def
get_star_cat
(
config
):
idx
=
pointing_file_list
.
index
(
os
.
path
.
basename
(
config
[
'obs_setting'
][
'pointing_file'
]))
return_star_path
=
star_file_list
[
idx
]
return
return_star_path
def
get_bundleIndex
(
healpixID_ring
,
bundleOrder
=
4
,
healpixOrder
=
7
):
assert
NSIDE
==
2
**
healpixOrder
shift
=
healpixOrder
-
bundleOrder
shift
=
2
*
shift
nside_bundle
=
2
**
bundleOrder
nside_healpix
=
2
**
healpixOrder
healpixID_nest
=
hp
.
ring2nest
(
nside_healpix
,
healpixID_ring
)
bundleID_nest
=
(
healpixID_nest
>>
shift
)
bundleID_ring
=
hp
.
nest2ring
(
nside_bundle
,
bundleID_nest
)
return
bundleID_ring
def
get_agnsed_file
(
bundle_file_name
,
config
):
bundle_file_list
,
qsosed_file_list
=
get_galaxy_qso_list
(
config
)
return
qsosed_file_list
[
bundle_file_list
.
index
(
bundle_file_name
)]
class
Catalog
(
CatalogBase
):
def
__init__
(
self
,
config
,
chip
,
pointing
,
chip_output
,
filt
,
**
kwargs
):
super
().
__init__
()
self
.
cat_dir
=
config
[
"catalog_options"
][
"input_path"
][
"cat_dir"
]
self
.
cosmo
=
FlatLambdaCDM
(
H0
=
67.66
,
Om0
=
0.3111
)
self
.
chip_output
=
chip_output
self
.
filt
=
filt
self
.
logger
=
chip_output
.
logger
with
pkg_resources
.
path
(
'catalog.data'
,
'SLOAN_SDSS.g.fits'
)
as
filter_path
:
self
.
normF_star
=
Table
.
read
(
str
(
filter_path
))
self
.
config
=
config
self
.
chip
=
chip
self
.
pointing
=
pointing
self
.
max_size
=
0.
# [TODO] Milky Way extinction
if
"enable_mw_ext_gal"
in
config
[
"catalog_options"
]
and
config
[
"catalog_options"
][
"enable_mw_ext_gal"
]:
if
"planck_ebv_map"
not
in
config
[
"catalog_options"
]:
raise
ValueError
(
"Planck dust map must be given to enable Milky Way extinction calculation for galaxies."
)
self
.
mw_ext
=
ExtinctionMW
()
self
.
mw_ext
.
init_ext_model
(
model_name
=
"odonnell"
)
self
.
mw_ext
.
load_Planck_ext
(
file_path
=
config
[
"catalog_options"
][
"planck_ebv_map"
])
if
"star_cat"
in
config
[
"catalog_options"
][
"input_path"
]
and
config
[
"catalog_options"
][
"input_path"
][
"star_cat"
]
and
not
config
[
"catalog_options"
][
"galaxy_only"
]:
# Get the cloest star catalog file
star_file_name
=
get_star_cat
(
self
.
config
)
star_path
=
os
.
path
.
join
(
config
[
"catalog_options"
][
"input_path"
][
"star_cat"
],
star_file_name
)
self
.
star_path
=
os
.
path
.
join
(
self
.
cat_dir
,
star_path
)
self
.
star_SED_path
=
config
[
"catalog_options"
][
"SED_templates_path"
][
"star_SED"
]
self
.
_load_SED_lib_star
()
if
"galaxy_cat"
in
config
[
"catalog_options"
][
"input_path"
]
and
config
[
"catalog_options"
][
"input_path"
][
"galaxy_cat"
]
and
not
config
[
"catalog_options"
][
"star_only"
]:
galaxy_dir
=
config
[
"catalog_options"
][
"input_path"
][
"galaxy_cat"
]
self
.
galaxy_path
=
os
.
path
.
join
(
self
.
cat_dir
,
galaxy_dir
)
self
.
galaxy_SED_path
=
config
[
"catalog_options"
][
"SED_templates_path"
][
"galaxy_SED"
]
self
.
_load_SED_lib_gals
()
self
.
agn_seds
=
{}
if
"AGN_SED"
in
config
[
"catalog_options"
][
"SED_templates_path"
]
and
not
config
[
"catalog_options"
][
"star_only"
]:
self
.
AGN_SED_path
=
config
[
"catalog_options"
][
"SED_templates_path"
][
"AGN_SED"
]
if
"rotateEll"
in
config
[
"catalog_options"
]:
self
.
rotation
=
np
.
radians
(
float
(
config
[
"catalog_options"
][
"rotateEll"
]))
else
:
self
.
rotation
=
0.
# Update output .cat header with catalog specific output columns
self
.
_add_output_columns_header
()
self
.
_get_healpix_list
()
self
.
_load
()
def
_add_output_columns_header
(
self
):
self
.
add_hdr
=
" av stellarmass dm teff logg feh"
self
.
add_hdr
+=
" bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp "
self
.
add_fmt
=
"%8.4f %8.4f %8.4f %8.4f %8.4f %8.4f"
self
.
add_fmt
+=
" %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f "
self
.
chip_output
.
update_output_header
(
additional_column_names
=
self
.
add_hdr
)
def
_get_healpix_list
(
self
):
self
.
sky_coverage
=
self
.
chip
.
getSkyCoverageEnlarged
(
self
.
chip
.
img
.
wcs
,
margin
=
0.2
)
ra_min
,
ra_max
,
dec_min
,
dec_max
=
self
.
sky_coverage
.
xmin
,
self
.
sky_coverage
.
xmax
,
self
.
sky_coverage
.
ymin
,
self
.
sky_coverage
.
ymax
ra
=
np
.
deg2rad
(
np
.
array
([
ra_min
,
ra_max
,
ra_max
,
ra_min
]))
dec
=
np
.
deg2rad
(
np
.
array
([
dec_max
,
dec_max
,
dec_min
,
dec_min
]))
self
.
pix_list
=
hp
.
query_polygon
(
NSIDE
,
hp
.
ang2vec
(
np
.
radians
(
90.
)
-
dec
,
ra
),
inclusive
=
True
)
if
self
.
logger
is
not
None
:
msg
=
str
((
"HEALPix List: "
,
self
.
pix_list
))
self
.
logger
.
info
(
msg
)
else
:
print
(
"HEALPix List: "
,
self
.
pix_list
)
def
load_norm_filt
(
self
,
obj
):
if
obj
.
type
==
"star"
:
return
self
.
normF_star
elif
obj
.
type
==
"galaxy"
or
obj
.
type
==
"quasar"
:
# return self.normF_galaxy
return
None
else
:
return
None
# def _load_SED_lib_star(self):
# self.tempSED_star = h5.File(self.star_SED_path,'r')
def
_load_SED_lib_star
(
self
):
# self.tempSED_star = h5.File(self.star_SED_path,'r')
with
pkg_resources
.
path
(
'catalog.data'
,
'starSpecInterp.so'
)
as
ddl_path
:
self
.
starDDL
=
ctypes
.
CDLL
(
str
(
ddl_path
))
self
.
starDDL
.
loadSpecLibs
.
argtypes
=
[
ctypes
.
c_char_p
,
ctypes
.
c_char_p
]
self
.
starDDL
.
loadExts
.
argtypes
=
[
ctypes
.
c_char_p
]
nwv
=
self
.
starDDL
.
loadSpecLibs
(
str
.
encode
(
os
.
path
.
join
(
self
.
star_SED_path
,
'file_BT-Settl_CSST_wl1000-24000_R1000.par'
)),
str
.
encode
(
self
.
star_SED_path
))
self
.
starDDL
.
loadExts
(
str
.
encode
(
os
.
path
.
join
(
self
.
star_SED_path
,
"Ext_odonnell94_R3.1_CSST_wl1000-24000_R1000.fits"
)))
self
.
star_spec_len
=
nwv
def
_interp_star_sed
(
self
,
obj
):
spec
=
(
ctypes
.
c_float
*
self
.
star_spec_len
)()
wave
=
(
ctypes
.
c_float
*
self
.
star_spec_len
)()
self
.
starDDL
.
interpSingleStar
.
argtypes
=
[
ctypes
.
Structure
,
ctypes
.
POINTER
(
ctypes
.
c_float
)]
# s=Star(obj.param['teff'], obj.param['grav''], obj.paramstar['mwmsc_mass'], obj.param['AV'], obj.param['DM'], obj.param['z_met'])
s
=
StarParm
(
obj
.
param
[
'teff'
],
obj
.
param
[
'logg'
],
obj
.
param
[
'stellarMass'
],
obj
.
param
[
'av'
],
obj
.
param
[
'DM'
],
obj
.
param
[
'feh'
])
self
.
starDDL
.
interpSingleStar
(
s
,
spec
,
wave
)
rv_c
=
obj
.
param
[
'rv'
]
/
(
atcons
.
c
.
value
/
1000.
)
Doppler_factor
=
np
.
sqrt
((
1
+
rv_c
)
/
(
1
-
rv_c
))
wave_RV
=
wave
*
Doppler_factor
return
wave_RV
,
np
.
power
(
10
,
spec
[:])
def
_load_SED_lib_gals
(
self
):
pcs
=
h5
.
File
(
os
.
path
.
join
(
self
.
galaxy_SED_path
,
"pcs.h5"
),
"r"
)
lamb
=
h5
.
File
(
os
.
path
.
join
(
self
.
galaxy_SED_path
,
"lamb.h5"
),
"r"
)
self
.
lamb_gal
=
lamb
[
'lamb'
][()]
self
.
pcs
=
pcs
[
'pcs'
][()]
def
_load_gals
(
self
,
gals
,
pix_id
=
None
,
cat_id
=
0
,
agnsed_file
=
""
):
ngals
=
len
(
gals
[
'ra'
])
# Apply astrometric modeling
ra_arr
=
gals
[
'ra'
][:]
dec_arr
=
gals
[
'dec'
][:]
if
self
.
config
[
"obs_setting"
][
"enable_astrometric_model"
]:
ra_list
=
ra_arr
.
tolist
()
dec_list
=
dec_arr
.
tolist
()
pmra_list
=
np
.
zeros
(
ngals
).
tolist
()
pmdec_list
=
np
.
zeros
(
ngals
).
tolist
()
rv_list
=
np
.
zeros
(
ngals
).
tolist
()
parallax_list
=
[
1e-9
]
*
ngals
dt
=
datetime
.
utcfromtimestamp
(
self
.
pointing
.
timestamp
)
date_str
=
dt
.
date
().
isoformat
()
time_str
=
dt
.
time
().
isoformat
()
ra_arr
,
dec_arr
=
on_orbit_obs_position
(
input_ra_list
=
ra_list
,
input_dec_list
=
dec_list
,
input_pmra_list
=
pmra_list
,
input_pmdec_list
=
pmdec_list
,
input_rv_list
=
rv_list
,
input_parallax_list
=
parallax_list
,
input_nstars
=
ngals
,
input_x
=
self
.
pointing
.
sat_x
,
input_y
=
self
.
pointing
.
sat_y
,
input_z
=
self
.
pointing
.
sat_z
,
input_vx
=
self
.
pointing
.
sat_vx
,
input_vy
=
self
.
pointing
.
sat_vy
,
input_vz
=
self
.
pointing
.
sat_vz
,
input_epoch
=
"J2000"
,
input_date_str
=
date_str
,
input_time_str
=
time_str
)
# [TODO] get Milky Way extinction AVs
if
"enable_mw_ext_gal"
in
self
.
config
[
"catalog_options"
]
and
self
.
config
[
"catalog_options"
][
"enable_mw_ext_gal"
]:
MW_Av_arr
=
self
.
mw_ext
.
Av_from_Planck
(
ra
=
ra_arr
,
dec
=
dec_arr
)
else
:
MW_Av_arr
=
np
.
zeros
(
len
(
ra_arr
))
for
igals
in
range
(
ngals
):
# # (TEST)
# if igals > 100:
# break
param
=
self
.
initialize_param
()
param
[
'ra'
]
=
ra_arr
[
igals
]
param
[
'dec'
]
=
dec_arr
[
igals
]
param
[
'ra_orig'
]
=
gals
[
'ra'
][
igals
]
param
[
'dec_orig'
]
=
gals
[
'dec'
][
igals
]
# [TODO] get Milky Way extinction AVs
param
[
'mw_Av'
]
=
MW_Av_arr
[
igals
]
if
not
self
.
chip
.
isContainObj
(
ra_obj
=
param
[
'ra'
],
dec_obj
=
param
[
'dec'
],
margin
=
200
):
continue
# param['mag_use_normal'] = gals['mag_csst_%s'%(self.filt.filter_type)][igals]
# if self.filt.is_too_dim(mag=param['mag_use_normal'], margin=self.config["obs_setting"]["mag_lim_margin"]):
# continue
param
[
'z'
]
=
gals
[
'redshift'
][
igals
]
param
[
'model_tag'
]
=
'None'
param
[
'g1'
]
=
gals
[
'shear'
][
igals
][
0
]
param
[
'g2'
]
=
gals
[
'shear'
][
igals
][
1
]
param
[
'kappa'
]
=
gals
[
'kappa'
][
igals
]
param
[
'e1'
]
=
gals
[
'ellipticity_true'
][
igals
][
0
]
param
[
'e2'
]
=
gals
[
'ellipticity_true'
][
igals
][
1
]
# For shape calculation
param
[
'e1'
],
param
[
'e2'
],
param
[
'ell_total'
]
=
self
.
rotate_ellipticity
(
e1
=
gals
[
'ellipticity_true'
][
igals
][
0
],
e2
=
gals
[
'ellipticity_true'
][
igals
][
1
],
rotation
=
self
.
rotation
,
unit
=
'radians'
)
# param['ell_total'] = np.sqrt(param['e1']**2 + param['e2']**2)
if
param
[
'ell_total'
]
>
0.9
:
continue
# phi_e = cmath.phase(complex(param['e1'], param['e2']))
# param['e1'] = param['ell_total'] * np.cos(phi_e + 2*self.rotation)
# param['e2'] = param['ell_total'] * np.sin(phi_e + 2*self.rotation)
param
[
'e1_disk'
]
=
param
[
'e1'
]
param
[
'e2_disk'
]
=
param
[
'e2'
]
param
[
'e1_bulge'
]
=
param
[
'e1'
]
param
[
'e2_bulge'
]
=
param
[
'e2'
]
param
[
'delta_ra'
]
=
0
param
[
'delta_dec'
]
=
0
# Masses
param
[
'bulgemass'
]
=
gals
[
'bulgemass'
][
igals
]
param
[
'diskmass'
]
=
gals
[
'diskmass'
][
igals
]
param
[
'size'
]
=
gals
[
'size'
][
igals
]
if
param
[
'size'
]
>
self
.
max_size
:
self
.
max_size
=
param
[
'size'
]
# Sersic index
param
[
'disk_sersic_idx'
]
=
1.
param
[
'bulge_sersic_idx'
]
=
4.
# Sizes
param
[
'bfrac'
]
=
param
[
'bulgemass'
]
/
\
(
param
[
'bulgemass'
]
+
param
[
'diskmass'
])
if
param
[
'bfrac'
]
>=
0.6
:
param
[
'hlr_bulge'
]
=
param
[
'size'
]
param
[
'hlr_disk'
]
=
param
[
'size'
]
*
(
1.
-
param
[
'bfrac'
])
else
:
param
[
'hlr_disk'
]
=
param
[
'size'
]
param
[
'hlr_bulge'
]
=
param
[
'size'
]
*
param
[
'bfrac'
]
# SED coefficients
param
[
'coeff'
]
=
gals
[
'coeff'
][
igals
]
param
[
'detA'
]
=
gals
[
'detA'
][
igals
]
# Others
param
[
'galType'
]
=
gals
[
'type'
][
igals
]
param
[
'veldisp'
]
=
gals
[
'veldisp'
][
igals
]
# TEST no redening and no extinction
param
[
'av'
]
=
0.0
param
[
'redden'
]
=
0
# TEMP
self
.
ids
+=
1
param
[
'id'
]
=
'%06d'
%
(
int
(
pix_id
))
+
\
'%06d'
%
(
cat_id
)
+
'%08d'
%
(
igals
)
# Is this an Quasar?
param
[
'qsoindex'
]
=
gals
[
'qsoindex'
][
igals
]
if
param
[
'qsoindex'
]
==
-
1
:
param
[
'star'
]
=
0
# Galaxy
param
[
'agnsed_file'
]
=
""
obj
=
Galaxy
(
param
,
logger
=
self
.
logger
)
else
:
param_qso
=
copy
.
deepcopy
(
param
)
param_qso
[
'star'
]
=
2
# Quasar
param_qso
[
'agnsed_file'
]
=
agnsed_file
# First add QSO model
obj
=
Quasar
(
param_qso
,
logger
=
self
.
logger
)
# Need to deal with additional output columns
obj
.
additional_output_str
=
self
.
add_fmt
%
(
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0
,
0.
)
self
.
objs
.
append
(
obj
)
# Then add host galaxy model
param
[
'star'
]
=
0
# Galaxy
param
[
'agnsed_file'
]
=
""
obj
=
Galaxy
(
param
,
logger
=
self
.
logger
)
# Need to deal with additional output columns for (host) galaxy
obj
.
additional_output_str
=
self
.
add_fmt
%
(
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
param
[
'bulgemass'
],
param
[
'diskmass'
],
param
[
'detA'
],
param
[
'e1'
],
param
[
'e2'
],
param
[
'kappa'
],
param
[
'g1'
],
param
[
'g2'
],
param
[
'size'
],
param
[
'galType'
],
param
[
'veldisp'
])
self
.
objs
.
append
(
obj
)
def
_load_stars
(
self
,
stars
,
pix_id
=
None
):
nstars
=
len
(
stars
[
'RA'
])
# Apply astrometric modeling
ra_arr
=
stars
[
"RA"
][:]
dec_arr
=
stars
[
"DEC"
][:]
pmra_arr
=
stars
[
'pmra'
][:]
pmdec_arr
=
stars
[
'pmdec'
][:]
rv_arr
=
stars
[
'RV'
][:]
parallax_arr
=
stars
[
'parallax'
][:]
if
self
.
config
[
"obs_setting"
][
"enable_astrometric_model"
]:
ra_list
=
ra_arr
.
tolist
()
dec_list
=
dec_arr
.
tolist
()
pmra_list
=
pmra_arr
.
tolist
()
pmdec_list
=
pmdec_arr
.
tolist
()
rv_list
=
rv_arr
.
tolist
()
parallax_list
=
parallax_arr
.
tolist
()
dt
=
datetime
.
utcfromtimestamp
(
self
.
pointing
.
timestamp
)
date_str
=
dt
.
date
().
isoformat
()
time_str
=
dt
.
time
().
isoformat
()
ra_arr
,
dec_arr
=
on_orbit_obs_position
(
input_ra_list
=
ra_list
,
input_dec_list
=
dec_list
,
input_pmra_list
=
pmra_list
,
input_pmdec_list
=
pmdec_list
,
input_rv_list
=
rv_list
,
input_parallax_list
=
parallax_list
,
input_nstars
=
nstars
,
input_x
=
self
.
pointing
.
sat_x
,
input_y
=
self
.
pointing
.
sat_y
,
input_z
=
self
.
pointing
.
sat_z
,
input_vx
=
self
.
pointing
.
sat_vx
,
input_vy
=
self
.
pointing
.
sat_vy
,
input_vz
=
self
.
pointing
.
sat_vz
,
input_epoch
=
"J2000"
,
input_date_str
=
date_str
,
input_time_str
=
time_str
)
for
istars
in
range
(
nstars
):
# # (TEST)
# if istars > 100:
# break
param
=
self
.
initialize_param
()
param
[
'ra'
]
=
ra_arr
[
istars
]
param
[
'dec'
]
=
dec_arr
[
istars
]
param
[
'ra_orig'
]
=
stars
[
"RA"
][
istars
]
param
[
'dec_orig'
]
=
stars
[
"DEC"
][
istars
]
param
[
'pmra'
]
=
pmra_arr
[
istars
]
param
[
'pmdec'
]
=
pmdec_arr
[
istars
]
param
[
'rv'
]
=
rv_arr
[
istars
]
param
[
'parallax'
]
=
parallax_arr
[
istars
]
if
not
self
.
chip
.
isContainObj
(
ra_obj
=
param
[
'ra'
],
dec_obj
=
param
[
'dec'
],
margin
=
200
):
continue
param
[
'mag_use_normal'
]
=
stars
[
'app_SDSS_g'
][
istars
]
self
.
ids
+=
1
param
[
'id'
]
=
'%06d'
%
(
int
(
pix_id
))
+
'%08d'
%
(
istars
)
# param['sed_type'] = istars
# param['model_tag'] = ''
param
[
'teff'
]
=
stars
[
'teff'
][
istars
]
param
[
'logg'
]
=
stars
[
'grav'
][
istars
]
param
[
'feh'
]
=
stars
[
'z_met'
][
istars
]
param
[
'stellarMass'
]
=
stars
[
'mass'
][
istars
]
param
[
'av'
]
=
stars
[
'AV'
][
istars
]
param
[
'DM'
]
=
stars
[
'DM'
][
istars
]
# param['z_met'] = stars['z_met'][istars]
param
[
'z'
]
=
0.0
param
[
'star'
]
=
1
# Star
try
:
obj
=
Star
(
param
,
logger
=
self
.
logger
)
except
Exception
as
e
:
print
(
e
)
# Append additional output columns to the .cat file
obj
.
additional_output_str
=
self
.
add_fmt
%
(
param
[
"av"
],
param
[
'stellarMass'
],
param
[
'DM'
],
param
[
'teff'
],
param
[
'logg'
],
param
[
'feh'
],
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
0.
,
-
1
,
0.
)
self
.
objs
.
append
(
obj
)
def
_load
(
self
,
**
kwargs
):
self
.
objs
=
[]
self
.
ids
=
0
if
"star_cat"
in
self
.
config
[
"catalog_options"
][
"input_path"
]
and
self
.
config
[
"catalog_options"
][
"input_path"
][
"star_cat"
]
and
not
self
.
config
[
"catalog_options"
][
"galaxy_only"
]:
star_cat
=
h5
.
File
(
self
.
star_path
,
'r'
)[
'star_catalog'
]
for
pix
in
self
.
pix_list
:
try
:
stars
=
star_cat
[
str
(
pix
)]
self
.
_load_stars
(
stars
,
pix_id
=
pix
)
del
stars
except
Exception
as
e
:
self
.
logger
.
error
(
str
(
e
))
# print(e)
if
"galaxy_cat"
in
self
.
config
[
"catalog_options"
][
"input_path"
]
and
self
.
config
[
"catalog_options"
][
"input_path"
][
"galaxy_cat"
]
and
not
self
.
config
[
"catalog_options"
][
"star_only"
]:
for
pix
in
self
.
pix_list
:
try
:
bundleID
=
get_bundleIndex
(
pix
)
bundle_file
=
"galaxies_C10_bundle{:06}.h5"
.
format
(
bundleID
)
file_path
=
os
.
path
.
join
(
self
.
galaxy_path
,
bundle_file
)
gals_cat
=
h5
.
File
(
file_path
,
'r'
)[
'galaxies'
]
gals
=
gals_cat
[
str
(
pix
)]
# Get corresponding AGN SED file
agnsed_file
=
get_agnsed_file
(
bundle_file
,
self
.
config
)
agnsed_path
=
os
.
path
.
join
(
self
.
AGN_SED_path
,
agnsed_file
)
self
.
agn_seds
[
agnsed_file
]
=
fits
.
open
(
agnsed_path
)[
0
].
data
self
.
_load_gals
(
gals
,
pix_id
=
pix
,
cat_id
=
bundleID
,
agnsed_file
=
agnsed_file
)
del
gals
except
Exception
as
e
:
traceback
.
print_exc
()
self
.
logger
.
error
(
str
(
e
))
print
(
e
)
if
self
.
logger
is
not
None
:
self
.
logger
.
info
(
"maximum galaxy size: %.4f"
%
(
self
.
max_size
))
self
.
logger
.
info
(
"number of objects in catalog: %d"
%
(
len
(
self
.
objs
)))
else
:
print
(
"number of objects in catalog: "
,
len
(
self
.
objs
))
def
load_sed
(
self
,
obj
,
**
kwargs
):
if
obj
.
type
==
'star'
:
# _, wave, flux = tag_sed(
# h5file=self.tempSED_star,
# model_tag=obj.param['model_tag'],
# teff=obj.param['teff'],
# logg=obj.param['logg'],
# feh=obj.param['feh']
# )
wave
,
flux
=
self
.
_interp_star_sed
(
obj
)
elif
obj
.
type
==
'galaxy'
or
obj
.
type
==
'quasar'
:
factor
=
10
**
(
-
.
4
*
self
.
cosmo
.
distmod
(
obj
.
z
).
value
)
if
obj
.
type
==
'galaxy'
:
flux
=
np
.
matmul
(
self
.
pcs
,
obj
.
coeff
)
*
factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux
[
flux
<
0
]
=
0.
sedcat
=
np
.
vstack
((
self
.
lamb_gal
,
flux
)).
T
sed_data
=
getObservedSED
(
sedCat
=
sedcat
,
redshift
=
obj
.
z
,
av
=
obj
.
param
[
"av"
],
redden
=
obj
.
param
[
"redden"
]
)
wave
,
flux
=
sed_data
[
0
],
sed_data
[
1
]
elif
obj
.
type
==
'quasar'
:
flux
=
self
.
agn_seds
[
obj
.
agnsed_file
][
int
(
obj
.
qsoindex
)]
*
1e-17
flux
[
flux
<
0
]
=
0.
wave
=
self
.
lamb_gal
*
(
1.0
+
obj
.
z
)
else
:
raise
ValueError
(
"Object type not known"
)
speci
=
interpolate
.
interp1d
(
wave
,
flux
,
fill_value
=
0.
,
bounds_error
=
False
)
lamb
=
np
.
arange
(
2000
,
11001
+
0.5
,
0.5
)
y
=
speci
(
lamb
)
# [TODO] Apply Milky Way extinction
if
obj
.
type
!=
'star'
and
(
"enable_mw_ext_gal"
in
self
.
config
[
"catalog_options"
]
and
self
.
config
[
"catalog_options"
][
"enable_mw_ext_gal"
]):
y
=
self
.
mw_ext
.
apply_extinction
(
y
,
Av
=
obj
.
mw_Av
)
# erg/s/cm2/A --> photon/s/m2/A
all_sed
=
y
*
lamb
/
(
cons
.
h
.
value
*
cons
.
c
.
value
)
*
1e-13
sed
=
Table
(
np
.
array
([
lamb
,
all_sed
]).
T
,
names
=
(
'WAVELENGTH'
,
'FLUX'
))
# if obj.type == 'quasar':
# # integrate to get the magnitudes
# sed_photon = np.array([sed['WAVELENGTH'], sed['FLUX']]).T
# sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(
# sed_photon[:, 1]), interpolant='nearest')
# sed_photon = galsim.SED(
# sed_photon, wave_type='A', flux_type='1', fast=False)
# interFlux = integrate_sed_bandpass(
# sed=sed_photon, bandpass=self.filt.bandpass_full)
# obj.param['mag_use_normal'] = getABMAG(
# interFlux, self.filt.bandpass_full)
# # mag = getABMAG(interFlux, self.filt.bandpass_full)
# # print("mag diff = %.3f"%(mag - obj.param['mag_use_normal']))
# integrate to get the magnitudes
if
obj
.
type
==
'quasar'
or
obj
.
type
==
'galaxy'
:
sed_photon
=
np
.
array
([
sed
[
'WAVELENGTH'
],
sed
[
'FLUX'
]]).
T
sed_photon
=
galsim
.
LookupTable
(
x
=
np
.
array
(
sed_photon
[:,
0
]),
f
=
np
.
array
(
sed_photon
[:,
1
]),
interpolant
=
'nearest'
)
sed_photon
=
galsim
.
SED
(
sed_photon
,
wave_type
=
'A'
,
flux_type
=
'1'
,
fast
=
False
)
interFlux
=
integrate_sed_bandpass
(
sed
=
sed_photon
,
bandpass
=
self
.
filt
.
bandpass_full
)
obj
.
param
[
'mag_use_normal'
]
=
getABMAG
(
interFlux
,
self
.
filt
.
bandpass_full
)
# mag = getABMAG(interFlux, self.filt.bandpass_full)
# print("mag diff = %.3f"%(mag - obj.param['mag_use_normal']))
del
wave
del
flux
return
sed
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment