Commit 38e2c452 authored by Fang Yuedong's avatar Fang Yuedong
Browse files

Merge branch 'master' into 'current_stable_for_tests'

for tests before release

See merge request !23
parents e0f2b9f7 1c35c0e2
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import os
import galsim
import random
import numpy as np
import h5py as h5
import healpy as hp
import astropy.constants as cons
from astropy.coordinates import spherical_to_cartesian
from astropy.table import Table
from scipy import interpolate
from datetime import datetime
from ObservationSim.MockObject import CatalogBase, Star, Galaxy, Quasar
from ObservationSim.MockObject._util import seds, sed_assign, extAv, tag_sed, getObservedSED
from ObservationSim.Astrometry.Astrometry_util import on_orbit_obs_position
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
NSIDE = 128
class Catalog(CatalogBase):
def __init__(self, config, chip, pointing, chip_output, **kwargs):
super().__init__()
self.cat_dir = os.path.join(config["data_dir"], config["catalog_options"]["input_path"]["cat_dir"])
self.seed_Av = config["catalog_options"]["seed_Av"]
self.chip_output = chip_output
self.logger = chip_output.logger
with pkg_resources.path('Catalog.data', 'SLOAN_SDSS.g.fits') as filter_path:
self.normF_star = Table.read(str(filter_path))
with pkg_resources.path('Catalog.data', 'lsst_throuput_g.fits') as filter_path:
self.normF_galaxy = Table.read(str(filter_path))
self.config = config
self.chip = chip
self.pointing = pointing
if "star_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["star_cat"] and not config["catalog_options"]["galaxy_only"]:
star_file = config["catalog_options"]["input_path"]["star_cat"]
star_SED_file = config["catalog_options"]["SED_templates_path"]["star_SED"]
self.star_path = os.path.join(self.cat_dir, star_file)
self.star_SED_path = os.path.join(config["data_dir"], star_SED_file)
self._load_SED_lib_star()
if "galaxy_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["galaxy_cat"] and not config["catalog_options"]["star_only"]:
galaxy_file = config["catalog_options"]["input_path"]["galaxy_cat"]
self.galaxy_path = os.path.join(self.cat_dir, galaxy_file)
self.galaxy_SED_path = os.path.join(config["data_dir"], config["catalog_options"]["SED_templates_path"]["galaxy_SED"])
self._load_SED_lib_gals()
if "rotateEll" in config["catalog_options"]:
self.rotation = np.radians(float(config["catalog_options"]["rotateEll"]))
else:
self.rotation = 0.
# Update output .cat header with catalog specific output columns
self._add_output_columns_header()
self._get_healpix_list()
self._load()
def _add_output_columns_header(self):
self.add_hdr = " model_tag teff logg feh"
self.add_fmt = " %10s %8.4f %8.4f %8.4f"
self.chip_output.update_output_header(additional_column_names=self.add_hdr)
def _get_healpix_list(self):
self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2)
ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax
ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min]))
dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min]))
vertices = spherical_to_cartesian(1., dec, ra)
self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True)
if self.logger is not None:
msg = str(("HEALPix List: ", self.pix_list))
self.logger.info(msg)
else:
print("HEALPix List: ", self.pix_list)
def load_norm_filt(self, obj):
if obj.type == "star":
return self.normF_star
elif obj.type == "galaxy" or obj.type == "quasar":
return self.normF_galaxy
else:
return None
def _load_SED_lib_star(self):
self.tempSED_star = h5.File(self.star_SED_path,'r')
def _load_SED_lib_gals(self):
self.tempSed_gal, self.tempRed_gal = seds("galaxy.list", seddir=self.galaxy_SED_path)
def _load_gals(self, gals, pix_id=None):
ngals = len(gals['galaxyID'])
self.rng_sedGal = random.Random()
self.rng_sedGal.seed(pix_id) # Use healpix index as the random seed
self.ud = galsim.UniformDeviate(pix_id)
# Apply astrometric modeling
# in C3 case only aberration
ra_arr = gals['ra_true'][:]
dec_arr = gals['dec_true'][:]
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = ra_arr.tolist()
dec_list = dec_arr.tolist()
pmra_list = np.zeros(ngals).tolist()
pmdec_list = np.zeros(ngals).tolist()
rv_list = np.zeros(ngals).tolist()
parallax_list = [1e-9] * ngals
dt = datetime.utcfromtimestamp(self.pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
input_ra_list=ra_list,
input_dec_list=dec_list,
input_pmra_list=pmra_list,
input_pmdec_list=pmdec_list,
input_rv_list=rv_list,
input_parallax_list=parallax_list,
input_nstars=ngals,
input_x=self.pointing.sat_x,
input_y=self.pointing.sat_y,
input_z=self.pointing.sat_z,
input_vx=self.pointing.sat_vx,
input_vy=self.pointing.sat_vy,
input_vz=self.pointing.sat_vz,
input_epoch="J2015.5",
input_date_str=date_str,
input_time_str=time_str
)
for igals in range(ngals):
param = self.initialize_param()
param['ra'] = ra_arr[igals]
param['dec'] = dec_arr[igals]
param['ra_orig'] = gals['ra_true'][igals]
param['dec_orig'] = gals['dec_true'][igals]
param['mag_use_normal'] = gals['mag_true_g_lsst'][igals]
# if param['mag_use_normal'] >= 26.5:
# continue
param['z'] = gals['redshift_true'][igals]
param['model_tag'] = 'None'
param['g1'] = 0
param['g2'] = 0
param['kappa'] = 0
param['delta_ra'] = 0
param['delta_dec'] = 0
# sersicB = gals['sersic_bulge'][igals]
hlrMajB = gals['size_bulge_true'][igals]
hlrMinB = gals['size_minor_bulge_true'][igals]
# sersicD = gals['sersic_disk'][igals]
hlrMajD = gals['size_disk_true'][igals]
hlrMinD = gals['size_minor_disk_true'][igals]
aGal = gals['size_true'][igals]
bGal = gals['size_minor_true'][igals]
param['bfrac'] = gals['bulge_to_total_ratio_i'][igals]
param['theta'] = gals['position_angle_true'][igals]
param['hlr_bulge'] = np.sqrt(hlrMajB * hlrMinB)
param['hlr_disk'] = np.sqrt(hlrMajD * hlrMinD)
param['ell_bulge'] = (hlrMajB - hlrMinB)/(hlrMajB + hlrMinB)
param['ell_disk'] = (hlrMajD - hlrMinD)/(hlrMajD + hlrMinD)
param['ell_tot'] = (aGal - bGal) / (aGal + bGal)
# Assign each galaxy a template SED
param['sed_type'] = sed_assign(phz=param['z'], btt=param['bfrac'], rng=self.rng_sedGal)
# param['redden'] = self.tempRed_gal[param['sed_type']]
# param['av'] = self.avGal[int(self.ud()*self.nav)]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
if param['sed_type'] <= 5:
param['av'] = 0.0
param['redden'] = 0
param['star'] = 0 # Galaxy
if param['sed_type'] >= 29:
param['av'] = 0.6 * param['av'] / 3.0 # for quasar, av=[0, 0.2], 3.0=av.max-av.im
param['star'] = 2 # Quasar
# NOTE: this cut cannot be put before the SED type has been assigned
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
self.ids += 1
# param['id'] = self.ids
param['id'] = gals['galaxyID'][igals]
if param['star'] == 0:
obj = Galaxy(param, logger=self.logger)
if param['star'] == 2:
obj = Quasar(param, logger=self.logger)
# Need to deal with additional output columns
obj.additional_output_str = self.add_fmt%("n", 0., 0., 0.)
self.objs.append(obj)
def _load_stars(self, stars, pix_id=None):
nstars = len(stars['sourceID'])
# Apply astrometric modeling
ra_arr = stars["RA"][:]
dec_arr = stars["Dec"][:]
pmra_arr = stars['pmra'][:]
pmdec_arr = stars['pmdec'][:]
rv_arr = stars['RV'][:]
parallax_arr = stars['parallax'][:]
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = ra_arr.tolist()
dec_list = dec_arr.tolist()
pmra_list = pmra_arr.tolist()
pmdec_list = pmdec_arr.tolist()
rv_list = rv_arr.tolist()
parallax_list = parallax_arr.tolist()
dt = datetime.utcfromtimestamp(self.pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
input_ra_list=ra_list,
input_dec_list=dec_list,
input_pmra_list=pmra_list,
input_pmdec_list=pmdec_list,
input_rv_list=rv_list,
input_parallax_list=parallax_list,
input_nstars=nstars,
input_x=self.pointing.sat_x,
input_y=self.pointing.sat_y,
input_z=self.pointing.sat_z,
input_vx=self.pointing.sat_vx,
input_vy=self.pointing.sat_vy,
input_vz=self.pointing.sat_vz,
input_epoch="J2015.5",
input_date_str=date_str,
input_time_str=time_str
)
for istars in range(nstars):
param = self.initialize_param()
param['ra'] = ra_arr[istars]
param['dec'] = dec_arr[istars]
param['ra_orig'] = stars["RA"][istars]
param['dec_orig'] = stars["Dec"][istars]
param['pmra'] = pmra_arr[istars]
param['pmdec'] = pmdec_arr[istars]
param['rv'] = rv_arr[istars]
param['parallax'] = parallax_arr[istars]
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
param['mag_use_normal'] = stars['app_sdss_g'][istars]
# if param['mag_use_normal'] >= 26.5:
# continue
self.ids += 1
# param['id'] = self.ids
param['id'] = stars['sourceID'][istars]
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['teff'] = stars['teff'][istars]
param['logg'] = stars['grav'][istars]
param['feh'] = stars['feh'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
obj = Star(param, logger=self.logger)
# Append additional output columns to the .cat file
obj.additional_output_str = self.add_fmt%(param["model_tag"], param['teff'], param['logg'], param['feh'])
self.objs.append(obj)
def _load(self, **kwargs):
self.nav = 15005
self.avGal = extAv(self.nav, seed=self.seed_Av)
self.objs = []
self.ids = 0
if "star_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["star_cat"] and not self.config["catalog_options"]["galaxy_only"]:
star_cat = h5.File(self.star_path, 'r')['catalog']
for pix in self.pix_list:
try:
stars = star_cat[str(pix)]
self._load_stars(stars, pix_id=pix)
del stars
except Exception as e:
self.logger.error(str(e))
print(e)
if "galaxy_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["galaxy_cat"] and not self.config["catalog_options"]["star_only"]:
gals_cat = h5.File(self.galaxy_path, 'r')['galaxies']
for pix in self.pix_list:
try:
gals = gals_cat[str(pix)]
self._load_gals(gals, pix_id=pix)
del gals
except Exception as e:
self.logger.error(str(e))
print(e)
if self.logger is not None:
self.logger.info("number of objects in catalog: %d"%(len(self.objs)))
else:
print("number of objects in catalog: ", len(self.objs))
del self.avGal
def load_sed(self, obj, **kwargs):
if obj.type == 'star':
_, wave, flux = tag_sed(
h5file=self.tempSED_star,
model_tag=obj.param['model_tag'],
teff=obj.param['teff'],
logg=obj.param['logg'],
feh=obj.param['feh']
)
elif obj.type == 'galaxy' or obj.type == 'quasar':
sed_data = getObservedSED(
sedCat=self.tempSed_gal[obj.sed_type],
redshift=obj.z,
av=obj.param["av"],
redden=obj.param["redden"]
)
wave, flux = sed_data[0], sed_data[1]
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 18001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photo/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
del wave
del flux
return sed
This diff is collapsed.
import os
import galsim
import random
import numpy as np
import h5py as h5
import healpy as hp
import astropy.constants as cons
import traceback
from astropy.coordinates import spherical_to_cartesian
from astropy.table import Table
from scipy import interpolate
from datetime import datetime
from ObservationSim.MockObject import CatalogBase, Star, Galaxy, Quasar, Stamp
from ObservationSim.MockObject._util import tag_sed, getObservedSED, getABMAG, integrate_sed_bandpass, comoving_dist
from ObservationSim.Astrometry.Astrometry_util import on_orbit_obs_position
import astropy.io.fits as fitsio
from ObservationSim.MockObject._util import seds, sed_assign, extAv
# (TEST)
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
NSIDE = 128
def get_bundleIndex(healpixID_ring, bundleOrder=4, healpixOrder=7):
assert NSIDE == 2**healpixOrder
shift = healpixOrder - bundleOrder
shift = 2*shift
nside_bundle = 2**bundleOrder
nside_healpix= 2**healpixOrder
healpixID_nest= hp.ring2nest(nside_healpix, healpixID_ring)
bundleID_nest = (healpixID_nest >> shift)
bundleID_ring = hp.nest2ring(nside_bundle, bundleID_nest)
return bundleID_ring
class Catalog(CatalogBase):
def __init__(self, config, chip, pointing, chip_output, filt, **kwargs):
super().__init__()
self.cat_dir = os.path.join(config["data_dir"], config["catalog_options"]["input_path"]["cat_dir"])
self.seed_Av = config["catalog_options"]["seed_Av"]
# (TEST)
self.cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
self.chip_output = chip_output
self.filt = filt
self.logger = chip_output.logger
with pkg_resources.path('Catalog.data', 'SLOAN_SDSS.g.fits') as filter_path:
self.normF_star = Table.read(str(filter_path))
self.config = config
self.chip = chip
self.pointing = pointing
self.max_size = 0.
if "galaxy_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["galaxy_cat"] and config["catalog_options"]["galaxy_yes"]:
self.galaxy_path = os.path.join(self.cat_dir, config["catalog_options"]["input_path"]["galaxy_cat"])
self.galaxy_SED_path = os.path.join(self.cat_dir, config["catalog_options"]["SED_templates_path"]["galaxy_SED"])
self._load_SED_lib_gals()
if "AGN_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["AGN_cat"] and config["catalog_options"]["galaxy_yes"]:
self.AGN_path = os.path.join(self.cat_dir, config["catalog_options"]["input_path"]["AGN_cat"])
self.AGN_SED_path = os.path.join(self.cat_dir, config["catalog_options"]["SED_templates_path"]["AGN_SED"])
self.AGN_SED_wave_path = os.path.join(self.cat_dir, config["catalog_options"]["SED_templates_path"]["AGN_SED_WAVE"])
self._load_SED_lib_AGN()
if "rotateEll" in config["catalog_options"]:
self.rotation = np.radians(float(config["catalog_options"]["rotateEll"]))
else:
self.rotation = 0.
# Update output .cat header with catalog specific output columns
self._add_output_columns_header()
self._get_healpix_list()
self._load()
def _add_output_columns_header(self):
self.add_hdr = " model_tag teff logg feh"
self.add_hdr += " bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp "
self.add_fmt = " %10s %8.4f %8.4f %8.4f"
self.add_fmt += " %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f "
self.chip_output.update_output_header(additional_column_names=self.add_hdr)
def _get_healpix_list(self):
self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2)
ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax
ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min]))
dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min]))
# vertices = spherical_to_cartesian(1., dec, ra)
self.pix_list = hp.query_polygon(
NSIDE,
hp.ang2vec(np.radians(90.) - dec, ra),
inclusive=True
)
# self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True)
if self.logger is not None:
msg = str(("HEALPix List: ", self.pix_list))
self.logger.info(msg)
else:
print("HEALPix List: ", self.pix_list)
def load_norm_filt(self, obj):
if obj.type == "galaxy" or obj.type == "quasar":
# return self.normF_galaxy
return None
else:
return None
def _load_SED_lib_gals(self):
pcs = h5.File(os.path.join(self.galaxy_SED_path, "pcs.h5"), "r")
lamb = h5.File(os.path.join(self.galaxy_SED_path, "lamb.h5"), "r")
self.lamb_gal = lamb['lamb'][()]
self.pcs = pcs['pcs'][()]
def _load_SED_lib_AGN(self):
from astropy.io import fits
self.SED_AGN = fits.open(self.AGN_SED_path)[0].data
self.lamb_AGN = np.load(self.AGN_SED_wave_path)
def _load_gals(self, gals, pix_id=None, cat_id=0):
ngals = len(gals['ra'])
# Apply astrometric modeling
# in C3 case only aberration
ra_arr = gals['ra'][:]
dec_arr = gals['dec'][:]
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = ra_arr.tolist()
dec_list = dec_arr.tolist()
pmra_list = np.zeros(ngals).tolist()
pmdec_list = np.zeros(ngals).tolist()
rv_list = np.zeros(ngals).tolist()
parallax_list = [1e-9] * ngals
dt = datetime.utcfromtimestamp(self.pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
input_ra_list=ra_list,
input_dec_list=dec_list,
input_pmra_list=pmra_list,
input_pmdec_list=pmdec_list,
input_rv_list=rv_list,
input_parallax_list=parallax_list,
input_nstars=ngals,
input_x=self.pointing.sat_x,
input_y=self.pointing.sat_y,
input_z=self.pointing.sat_z,
input_vx=self.pointing.sat_vx,
input_vy=self.pointing.sat_vy,
input_vz=self.pointing.sat_vz,
input_epoch="J2000",
input_date_str=date_str,
input_time_str=time_str
)
for igals in range(ngals):
# # (TEST)
# if igals > 100:
# break
# if igals < 3447:
# continue
param = self.initialize_param()
param['ra'] = ra_arr[igals]
param['dec'] = dec_arr[igals]
param['ra_orig'] = gals['ra'][igals]
param['dec_orig'] = gals['dec'][igals]
param['mag_use_normal'] = gals['mag_csst_%s'%(self.filt.filter_type)][igals]
if self.filt.is_too_dim(mag=param['mag_use_normal'], margin=self.config["obs_setting"]["mag_lim_margin"]):
continue
# if param['mag_use_normal'] >= 26.5:
# continue
param['z'] = gals['redshift'][igals]
param['model_tag'] = 'None'
param['g1'] = gals['shear'][igals][0]
param['g2'] = gals['shear'][igals][1]
param['kappa'] = gals['kappa'][igals]
param['e1'] = gals['ellipticity_true'][igals][0]
param['e2'] = gals['ellipticity_true'][igals][1]
# For shape calculation
param['ell_total'] = np.sqrt(param['e1']**2 + param['e2']**2)
if param['ell_total'] > 0.9:
continue
param['e1_disk'] = param['e1']
param['e2_disk'] = param['e2']
param['e1_bulge'] = param['e1']
param['e2_bulge'] = param['e2']
param['delta_ra'] = 0
param['delta_dec'] = 0
# Masses
param['bulgemass'] = gals['bulgemass'][igals]
param['diskmass'] = gals['diskmass'][igals]
param['size'] = gals['size'][igals]
if param['size'] > self.max_size:
self.max_size = param['size']
# Sizes
param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass'])
if param['bfrac'] >= 0.6:
param['hlr_bulge'] = param['size']
param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
else:
param['hlr_disk'] = param['size']
param['hlr_bulge'] = param['size'] * param['bfrac']
# SED coefficients
param['coeff'] = gals['coeff'][igals]
param['detA'] = gals['detA'][igals]
# Others
param['galType'] = gals['type'][igals]
param['veldisp'] = gals['veldisp'][igals]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
param['star'] = 0 # Galaxy
# NOTE: this cut cannot be put before the SED type has been assigned
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
# TEMP
self.ids += 1
# param['id'] = self.ids
param['id'] = '%06d'%(int(pix_id)) + '%06d'%(cat_id) + '%08d'%(igals)
if param['star'] == 0:
obj = Galaxy(param, logger=self.logger)
# Need to deal with additional output columns
obj.additional_output_str = self.add_fmt%("n", 0., 0., 0.,
param['bulgemass'], param['diskmass'], param['detA'],
param['e1'], param['e2'], param['kappa'], param['g1'], param['g2'], param['size'],
param['galType'], param['veldisp'])
self.objs.append(obj)
def _load_AGNs(self):
data = Table.read(self.AGN_path)
ra_arr = data['ra']
dec_arr = data['dec']
nAGNs = len(data)
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = ra_arr.tolist()
dec_list = dec_arr.tolist()
pmra_list = np.zeros(nAGNs).tolist()
pmdec_list = np.zeros(nAGNs).tolist()
rv_list = np.zeros(nAGNs).tolist()
parallax_list = [1e-9] * nAGNs
dt = datetime.utcfromtimestamp(self.pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
input_ra_list=ra_list,
input_dec_list=dec_list,
input_pmra_list=pmra_list,
input_pmdec_list=pmdec_list,
input_rv_list=rv_list,
input_parallax_list=parallax_list,
input_nstars=nAGNs,
input_x=self.pointing.sat_x,
input_y=self.pointing.sat_y,
input_z=self.pointing.sat_z,
input_vx=self.pointing.sat_vx,
input_vy=self.pointing.sat_vy,
input_vz=self.pointing.sat_vz,
input_epoch="J2000",
input_date_str=date_str,
input_time_str=time_str
)
for iAGNs in range(nAGNs):
param = self.initialize_param()
param['ra'] = ra_arr[iAGNs]
param['dec'] = dec_arr[iAGNs]
param['ra_orig'] = data['ra'][iAGNs]
param['dec_orig'] = data['dec'][iAGNs]
param['z'] = data['z'][iAGNs]
param['appMag'] = data['appMag'][iAGNs]
param['absMag'] = data['absMag'][iAGNs]
# NOTE: this cut cannot be put before the SED type has been assigned
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
param['star'] = 2 # Quasar
param['id'] = data['igmlos'][iAGNs]
if param['star'] == 2:
obj = Quasar(param, logger=self.logger)
# Append additional output columns to the .cat file
obj.additional_output_str = self.add_fmt%("n", 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., -1, 0.)
self.objs.append(obj)
def _load(self, **kwargs):
self.objs = []
self.ids = 0
if "galaxy_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["galaxy_cat"] and self.config["catalog_options"]["galaxy_yes"]:
# for i in range(76):
# file_path = os.path.join(self.galaxy_path, "galaxies%04d_C6.h5"%(i))
# gals_cat = h5.File(file_path, 'r')['galaxies']
# for pix in self.pix_list:
# try:
# gals = gals_cat[str(pix)]
# self._load_gals(gals, pix_id=pix, cat_id=i)
# del gals
# except Exception as e:
# traceback.print_exc()
# self.logger.error(str(e))
# print(e)
for pix in self.pix_list:
try:
bundleID = get_bundleIndex(pix)
# # (TEST C6):
# if pix != 35421 or bundleID != 523:
# continue
gals_cat = h5.File(self.galaxy_path, 'r')['galaxies']
gals = gals_cat[str(pix)]
self._load_gals(gals, pix_id=pix, cat_id=bundleID)
del gals
except Exception as e:
traceback.print_exc()
self.logger.error(str(e))
print(e)
#if "AGN_cat" in self.config["input_path"] and self.config["input_path"]["AGN_cat"] and not self.config["run_option"]["star_only"]:
if "AGN_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["AGN_cat"] and self.config["catalog_options"]["galaxy_yes"]:
try:
self._load_AGNs()
except Exception as e:
traceback.print_exc()
self.logger.error(str(e))
print(e)
if self.logger is not None:
self.logger.info("maximum galaxy size: %.4f"%(self.max_size))
self.logger.info("number of objects in catalog: %d"%(len(self.objs)))
else:
print("number of objects in catalog: ", len(self.objs))
# (TEST)
# def convert_mags(self, obj):
# spec = np.matmul(obj.coeff, self.pcs.T)
# lamb = self.lamb_gal * U.angstrom
# unit_sed = ((lamb * lamb)/constants.c*U.erg/U.second/U.cm**2/U.angstrom).to(U.jansky)
# unit_sed = unit_sed.value
# lamb = lamb.value
# lamb *= (1 + obj.z)
# spec *= (1 + obj.z)
# mags = -2.5 * np.log10(unit_sed*spec) + 8.9
# mags += self.cosmo.distmod(obj.z).value
# return lamb, mags
def load_sed(self, obj, **kwargs):
if obj.type == 'galaxy' or obj.type == 'quasar':
# dist_L_pc = (1 + obj.z) * comoving_dist(z=obj.z)[0]
# factor = (10 / dist_L_pc)**2
factor = 10**(-.4 * self.cosmo.distmod(obj.z).value)
if obj.type == 'galaxy':
flux = np.matmul(self.pcs, obj.coeff) * factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux[flux < 0] = 0.
sedcat = np.vstack((self.lamb_gal, flux)).T
sed_data = getObservedSED(
sedCat=sedcat,
redshift=obj.z,
av=obj.param["av"],
redden=obj.param["redden"]
)
wave, flux = sed_data[0], sed_data[1]
elif obj.type == 'quasar':
# flux = self.SED_AGN[int(obj.id)] * factor * 1e-17
flux = self.SED_AGN[int(obj.id)] * 1e-17
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"%obj.id)
flux[flux < 0] = 0.
# sedcat = np.vstack((self.lamb_AGN, flux)).T
wave = self.lamb_AGN
# print("sed (erg/s/cm2/A) = ", sed_data)
# np.savetxt(os.path.join(self.config["work_dir"], "%s_sed.txt"%(obj.id)), sedcat)
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
if obj.type == 'quasar':
# integrate to get the magnitudes
sed_photon = np.array([sed['WAVELENGTH'], sed['FLUX']]).T
sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest')
sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False)
interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=self.filt.bandpass_full)
obj.param['mag_use_normal'] = getABMAG(interFlux, self.filt.bandpass_full)
# if obj.param['mag_use_normal'] >= 30:
# print("obj ID = %d"%obj.id)
# print("mag_use_normal = %.3f"%obj.param['mag_use_normal'])
# print("integrated flux = %.7f"%(interFlux))
# print("app mag = %.3f"%obj.param['appMag'])
# np.savetxt('./AGN_SED_test/sed_objID_%d.txt'%obj.id, np.transpose([self.lamb_AGN, self.SED_AGN[int(obj.id)]]))
# print("obj ID = %d"%obj.id)
# print("mag_use_normal = %.3f"%obj.param['mag_use_normal'])
# print("integrated flux = %.7f"%(interFlux))
# print("app mag = %.3f"%obj.param['appMag'])
# print("abs mag = %.3f"%obj.param['absMag'])
# mag = getABMAG(interFlux, self.filt.bandpass_full)
# print("mag diff = %.3f"%(mag - obj.param['mag_use_normal']))
del wave
del flux
return sed
import os
import galsim
import random
import numpy as np
import h5py as h5
import healpy as hp
import astropy.constants as cons
import traceback
from astropy.coordinates import spherical_to_cartesian
from astropy.table import Table
from scipy import interpolate
from datetime import datetime
from ObservationSim.MockObject import CatalogBase, Star, Galaxy, Quasar, Stamp
from ObservationSim.MockObject._util import tag_sed, getObservedSED, getABMAG, integrate_sed_bandpass, comoving_dist
from ObservationSim.Astrometry.Astrometry_util import on_orbit_obs_position
import astropy.io.fits as fitsio
from ObservationSim.MockObject._util import seds, sed_assign, extAv
# (TEST)
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
NSIDE = 128
class Catalog(CatalogBase):
"""An user customizable class for reading in catalog(s) of objects and SEDs.
NOTE: must inherit the "CatalogBase" abstract class
...
Attributes
----------
cat_dir : str
a directory that contains the catalog file(s)
star_path : str
path to the star catalog file
star_SED_path : str
path to the star SED data
objs : list
a list of ObservationSim.MockObject (Star, Galaxy, or Quasar)
NOTE: must have "obj" list when implement your own Catalog
Methods
----------
load_sed(obj, **kwargs):
load the corresponding SED data for one object
load_norm_filt(obj):
load the filter throughput for the input catalog's photometric system.
"""
def __init__(self, config, chip, pointing, chip_output, filt, **kwargs):
"""Constructor method.
Parameters
----------
config : dict
configuration dictionary which is parsed from the input YAML file
chip: ObservationSim.Instrument.Chip
an ObservationSim.Instrument.Chip instance, can be used to identify the band etc.
pointing: ObservationSim.Config.Pointing
an ObservationSim.Config.Pointing instance, can be used to configure the astrometry module
chip_output: ObservationSim.Config.ChipOutput
an ObservationSim.Config.ChipOutput instance, can be used to setup the output format
filt: ObservationSim.Instrument.Filter
an ObservationSim.Instrument.Filter instance, can be used to identify the filter type
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
None
"""
super().__init__()
self.cat_dir = os.path.join(config["data_dir"], config["catalog_options"]["input_path"]["cat_dir"])
self.seed_Av = config["catalog_options"]["seed_Av"]
#self.cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
self.chip_output = chip_output
self.filt = filt
self.logger = chip_output.logger
with pkg_resources.path('Catalog.data', 'SLOAN_SDSS.g.fits') as filter_path:
self.normF_star = Table.read(str(filter_path))
self.config = config
self.chip = chip
self.pointing = pointing
self.max_size = 0.
if "star_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["star_cat"] and config["catalog_options"]["star_yes"]:
self.star_path = os.path.join(self.cat_dir, config["catalog_options"]["input_path"]["star_cat"])
self.star_SED_path = os.path.join(self.cat_dir, config["catalog_options"]["SED_templates_path"]["star_SED"])
self._load_SED_lib_star()
if "rotateEll" in config["catalog_options"]:
self.rotation = float(int(config["catalog_options"]["rotateEll"]/45.))
else:
self.rotation = 0.
# Update output .cat header with catalog specific output columns
self._add_output_columns_header()
self._get_healpix_list()
self._load()
def _add_output_columns_header(self):
self.add_hdr = " model_tag teff logg feh"
self.add_hdr += " bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp "
self.add_fmt = " %10s %8.4f %8.4f %8.4f"
self.add_fmt += " %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f "
self.chip_output.update_output_header(additional_column_names=self.add_hdr)
def _get_healpix_list(self):
self.sky_coverage = self.chip.getSkyCoverageEnlarged(self.chip.img.wcs, margin=0.2)
ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax
ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min]))
dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min]))
# vertices = spherical_to_cartesian(1., dec, ra)
self.pix_list = hp.query_polygon(
NSIDE,
hp.ang2vec(np.radians(90.) - dec, ra),
inclusive=True
)
# self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True)
if self.logger is not None:
msg = str(("HEALPix List: ", self.pix_list))
self.logger.info(msg)
else:
print("HEALPix List: ", self.pix_list)
def load_norm_filt(self, obj):
"""Load the corresponding thourghput for the input magnitude "param["mag_use_normal"]".
NOTE: if the input magnitude is already in CSST magnitude, simply return None
Parameters
----------
obj : ObservationSim.MockObject
the object to get thourghput data for
Returns
----------
norm_filt : Astropy.Table
the throughput Table with two columns (namely, "WAVELENGTH", "SENSITIVITY"):
norm_filt["WAVELENGTH"] : wavelengthes in Angstroms
norm_filt["SENSITIVITY"] : efficiencies
"""
if obj.type == "star":
return self.normF_star
else:
return None
def _load_SED_lib_star(self):
self.tempSED_star = h5.File(self.star_SED_path,'r')
def _load_stars(self, stars, pix_id=None):
nstars = len(stars['sourceID'])
# Apply astrometric modeling
ra_arr = stars["RA"][:]
dec_arr = stars["Dec"][:]
pmra_arr = stars['pmra'][:]
pmdec_arr = stars['pmdec'][:]
rv_arr = stars['RV'][:]
parallax_arr = stars['parallax'][:]
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = ra_arr.tolist()
dec_list = dec_arr.tolist()
pmra_list = pmra_arr.tolist()
pmdec_list = pmdec_arr.tolist()
rv_list = rv_arr.tolist()
parallax_list = parallax_arr.tolist()
dt = datetime.utcfromtimestamp(self.pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
input_ra_list=ra_list,
input_dec_list=dec_list,
input_pmra_list=pmra_list,
input_pmdec_list=pmdec_list,
input_rv_list=rv_list,
input_parallax_list=parallax_list,
input_nstars=nstars,
input_x=self.pointing.sat_x,
input_y=self.pointing.sat_y,
input_z=self.pointing.sat_z,
input_vx=self.pointing.sat_vx,
input_vy=self.pointing.sat_vy,
input_vz=self.pointing.sat_vz,
input_epoch="J2000",
input_date_str=date_str,
input_time_str=time_str
)
for istars in range(nstars):
# # (TEST)
# if istars > 100:
# break
param = self.initialize_param()
param['ra'] = ra_arr[istars]
param['dec'] = dec_arr[istars]
param['ra_orig'] = stars["RA"][istars]
param['dec_orig'] = stars["Dec"][istars]
param['pmra'] = pmra_arr[istars]
param['pmdec'] = pmdec_arr[istars]
param['rv'] = rv_arr[istars]
param['parallax'] = parallax_arr[istars]
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
continue
param['mag_use_normal'] = stars['app_sdss_g'][istars]
# if param['mag_use_normal'] >= 26.5:
# continue
self.ids += 1
param['id'] = stars['sourceID'][istars]
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['teff'] = stars['teff'][istars]
param['logg'] = stars['grav'][istars]
param['feh'] = stars['feh'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
obj = Star(param, logger=self.logger)
# Append additional output columns to the .cat file
obj.additional_output_str = self.add_fmt%(param["model_tag"], param['teff'], param['logg'], param['feh'],
0., 0., 0., 0., 0., 0., 0., 0., 0., -1, 0.)
self.objs.append(obj)
def _load(self, **kwargs):
"""Read in all objects in from the catalog file(s).
This is a must implemented method which is used to read in all objects, and
then convert them to ObservationSim.MockObject (Star, Galaxy, or Quasar).
Currently,
the model of ObservationSim.MockObject.Star class requires:
param["star"] : int
specify the object type: 0: galaxy, 1: star, 2: quasar
param["id"] : int
ID number of the object
param["ra"] : float
Right ascension (in degrees)
param["dec"] : float
Declination (in degrees)
param["mag_use_normal"]: float
the absolute magnitude in a particular filter
NOTE: if that filter is not the corresponding CSST filter, the
load_norm_filt(obj) function must be implemented to load the filter
throughput of that particular photometric system
the model of ObservationSim.MockObject.Galaxy class requires:
param["star"] : int
specify the object type: 0: galaxy, 1: star, 2: quasar
param["id"] : int
ID number of the object
param["ra"] : float
Right ascension (in degrees)
param["dec"] : float
Declination (in degrees)
param["mag_use_normal"]: float
the absolute magnitude in a particular filter
NOTE: if that filter is not the corresponding CSST filter, the
load_norm_filt(obj) function must be implemented to load the filter
throughput of that particular photometric system
param["bfrac"] : float
the bulge fraction
param["hlr_bulge"] : float
the half-light-radius of the bulge
param["hlr_disk"] : float
the half-light-radius of the disk
param["e1_bulge"], param["e2_bulge"] : float
the ellipticity of the bulge components
param["e1_disk"], param["e2_disk"] : float
the ellipticity of the disk components
(Optional parameters):
param['disk_sersic_idx']: float
Sersic index for galaxy disk component
param['bulge_sersic_idx']: float
Sersic index for galaxy bulge component
param['g1'], param['g2']: float
Reduced weak lensing shear components (valid for shear type: catalog)
the model of ObservationSim.MockObject.Galaxy class requires:
Currently a Quasar is modeled as a point source, just like a Star.
NOTE: To construct an object, according to its type, just call:
Star(param), Galaxy(param), or Quasar(param)
NOTE: All constructed objects should be appened to "self.objs".
NOTE: Any other parameters can also be set within "param" dict:
Used to calculate required quantities and/or SEDs etc.
Parameters
----------
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
None
"""
self.objs = []
self.ids = 0
#if "star_cat" in self.config["input_path"] and self.config["input_path"]["star_cat"] and not self.config["run_option"]["galaxy_only"]:
if "star_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["star_cat"] and self.config["catalog_options"]["star_yes"]:
star_cat = h5.File(self.star_path, 'r')['stars']
for pix in self.pix_list:
try:
stars = star_cat[str(pix)]
self._load_stars(stars, pix_id=pix)
del stars
except Exception as e:
self.logger.error(str(e))
print(e)
if self.logger is not None:
self.logger.info("number of objects in catalog: %d"%(len(self.objs)))
else:
print("number of objects in catalog: ", len(self.objs))
def load_sed(self, obj, **kwargs):
"""Load the corresponding SED data for a particular obj.
Parameters
----------
obj : ObservationSim.MockObject
the object to get SED data for
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
sed : Astropy.Table
the SED Table with two columns (namely, "WAVELENGTH", "FLUX"):
sed["WAVELENGTH"] : wavelength in Angstroms
sed["FLUX"] : fluxes in photons/s/m^2/A
NOTE: the range of wavelengthes must at least cover [2450 - 11000] Angstorms
"""
if obj.type == 'star':
_, wave, flux = tag_sed(
h5file=self.tempSED_star,
model_tag=obj.param['model_tag'],
teff=obj.param['teff'],
logg=obj.param['logg'],
feh=obj.param['feh']
)
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
del wave
del flux
return sed
This diff is collapsed.
"""
generate image header
"""
import numpy as np
from astropy.io import fits
import astropy.wcs as pywcs
from scipy import math
import random
import os
import sys
def chara2digit(char):
""" Function to judge and convert characters to digitals
Parameters
----------
"""
try:
float(char) # for int, long and float
except ValueError:
pass
return char
else:
data = float(char)
return data
def read_header_parameter(filename='global_header.param'):
""" Function to read the header parameters
Parameters
----------
"""
name = []
value = []
description = []
for line in open(filename):
line = line.strip("\n")
arr = line.split('|')
# csvReader = csv.reader(csvDataFile)
# for arr in csvReader:
name.append(arr[0])
value.append(chara2digit(arr[1]))
description.append(arr[2])
# print(value)
return name, value, description
def rotate_CD_matrix(cd, pa_aper):
"""Rotate CD matrix
Parameters
----------
cd: (2,2) array
CD matrix
pa_aper: float
Position angle, in degrees E from N, of y axis of the detector
Returns
-------
cd_rot: (2,2) array
Rotated CD matrix
Comments
--------
`astropy.wcs.WCS.rotateCD` doesn't work for non-square pixels in that it
doesn't preserve the pixel scale! The bug seems to come from the fact
that `rotateCD` assumes a transposed version of its own CD matrix.
"""
rad = np.deg2rad(-pa_aper)
mat = np.zeros((2,2))
mat[0,:] = np.array([np.cos(rad),-np.sin(rad)])
mat[1,:] = np.array([np.sin(rad),np.cos(rad)])
cd_rot = np.dot(mat, cd)
return cd_rot
def Header_extention(xlen = 9232, ylen = 9216, gain = 1.0, readout = 5.0, dark = 0.02,saturation=90000, row_num = 1, col_num = 1):
""" Creat an image frame for CCST with multiple extensions
Parameters
----------
"""
flag_ltm_x = [0,1,-1,1,-1]
flag_ltm_y = [0,1,1,-1,-1]
flag_ltv_x = [0,0,1,0,1]
flag_ltv_y = [0,0,0,1,1]
detector_size_x = int(xlen)
detector_size_y = int(ylen)
data_x = str(int(detector_size_x))
data_y = str(int(detector_size_y))
data_sec = '[1:'+data_x+',1:'+data_y+']'
name = []
value = []
description = []
for k in range(1,2):
# f = open("extension"+str(k)+"_image.param","w")
j = row_num
i = col_num
ccdnum = str((j-1)*5+i)
name = ['EXTNAME',
'BSCALE',
'BZERO',
'OBSID',
'CCDNAME',
'AMPNAME',
'GAIN',
'RDNOISE',
'DARK',
'SATURATE',
'RSPEED',
'CHIPTEMP',
'CCDCHIP',
'DATASEC',
'CCDSUM',
'NSUM',
'LTM1_1',
'LTM2_2',
'LTV1',
'LTV2',
'ATM1_1',
'ATM2_2',
'ATV1',
'ATV2',
'DTV1',
'DTV2',
'DTM1_1',
'DTM2_2']
value = ['IM'+str(k),
1.0,
0.0,
'CSST.20200101T000000',
'ccd' + ccdnum.rjust(2,'0'),
'ccd' + ccdnum.rjust(2,'0') + ':'+str(k),
gain,
readout,
dark,
saturation,
10.0,
-100.0,
'ccd' + ccdnum.rjust(2,'0'),
data_sec,
'1 1',
'1 1',
flag_ltm_x[k],
flag_ltm_y[k],
flag_ltv_x[k]*(detector_size_x-20*2+1),
flag_ltv_y[k]*(detector_size_y+1),
flag_ltm_x[k],
flag_ltm_y[k],
flag_ltv_x[k]*(detector_size_x-20*2+1),
flag_ltv_y[k]*(detector_size_y+1),
0,
0,
1,
1]
description = ['Extension name',
' ',
' ',
'Observation ID',
'CCD name',
'Amplifier name',
'Gain (e-/ADU)',
'Readout noise (e-/pixel)',
'Dark noise (e-/pixel/s)',
'Saturation (e-)',
'Read speed',
'Chip temperature',
'CCD chip ID',
'Data section',
'CCD pixel summing',
'CCD pixel summing',
'CCD to image transformation',
'CCD to image transformation',
'CCD to image transformation',
'CCD to image transformation',
'CCD to amplifier transformation',
'CCD to amplifier transformation',
'CCD to amplifier transformation',
'CCD to amplifier transformation',
'CCD to detector transformatio',
'CCD to detector transformatio',
'CCD to detector transformatio',
'CCD to detector transformatio']
return name, value, description
##9232 9216 898 534 1309 60 -40 -23.4333
def WCS_def(xlen = 9232, ylen = 9216, gapx = 898.0, gapy1 = 534, gapy2 = 1309, ra = 60, dec = -40, pa = -23.433,psize = 0.074, row_num = 1, col_num = 1):
""" Creat a wcs frame for CCST with multiple extensions
Parameters
----------
"""
flag_x = [0, 1, -1, 1, -1]
flag_y = [0, 1, 1, -1, -1]
flag_ext_x = [0,-1,1,-1,1]
flag_ext_y = [0,-1,-1,1,1]
x_num = 5
y_num = 6
detector_num = x_num*y_num
detector_size_x = xlen
detector_size_y = ylen
gap_x = gapx
gap_y = [gapy1,gapy2]
ra_ref = ra
dec_ref = dec
pa_aper = pa
pixel_size = psize
gap_y1_num = 3
gap_y2_num = 2
x_center = (detector_size_x*x_num+gap_x*(x_num-1))/2
y_center = (detector_size_y*y_num+gap_y[0]*gap_y1_num+gap_y[1]*gap_y2_num)/2
gap_y_map = np.array([[0,0,0,0,0],[gap_y[0],gap_y[1],gap_y[1],gap_y[1],gap_y[1]],[gap_y[1],gap_y[0],gap_y[0],gap_y[0],gap_y[0]],[gap_y[0],gap_y[0],gap_y[0],gap_y[0],gap_y[0]],[gap_y[0],gap_y[0],gap_y[0],gap_y[0],gap_y[1]],[gap_y[1],gap_y[1],gap_y[1],gap_y[1],gap_y[0]]])
frame_array = np.empty((5,6),dtype=np.float64)
# print(x_center,y_center)
j = row_num
i = col_num
ccdnum = str((j-1)*5+i)
x_ref, y_ref = (detector_size_x+gap_x)*i-gap_x-detector_size_x/2, detector_size_y*j + sum(gap_y_map[0:j,i-1]) - detector_size_y/2
# print(i,j,x_ref,y_ref,ra_ref,dec_ref)
name = []
value = []
description = []
for k in range(1,2):
cd = np.array([[ pixel_size, 0], [0, pixel_size]])/3600.*flag_x[k]
cd_rot = rotate_CD_matrix(cd, pa_aper)
# f = open("CCD"+ccdnum.rjust(2,'0')+"_extension"+str(k)+"_wcs.param","w")
name = ['EQUINOX',
'WCSDIM',
'CTYPE1',
'CTYPE2',
'CRVAL1',
'CRVAL2',
'CRPIX1',
'CRPIX2',
'CD1_1',
'CD1_2',
'CD2_1',
'CD2_2']
value = [2000.0,
2.0,
'RA---TAN',
'DEC--TAN',
ra_ref,
dec_ref,
flag_ext_x[k]*((x_ref+flag_ext_x[k]*detector_size_x/2)-x_center),
flag_ext_y[k]*((y_ref+flag_ext_y[k]*detector_size_y/2)-y_center),
cd_rot[0,0],
cd_rot[0,1],
cd_rot[1,0],
cd_rot[1,1]]
description = ['Equinox of WCS',
'WCS Dimensionality',
'Coordinate type',
'Coordinate typ',
'Coordinate reference value',
'Coordinate reference value',
'Coordinate reference pixel',
'Coordinate reference pixel',
'Coordinate matrix',
'Coordinate matrix',
'Coordinate matrix',
'Coordinate matrix']
return name, value, description
def generatePrimaryHeader(xlen = 9232, ylen = 9216,pointNum = '1', ra = 60, dec = -40, psize = 0.074, row_num = 1, col_num = 1):
# array_size1, array_size2, flux, sigma = int(argv[1]), int(argv[2]), 1000.0, 5.0
filerParm_fn = os.path.split(os.path.realpath(__file__))[0] + '/filter.lst'
f = open(filerParm_fn)
s = f.readline()
s = s.strip("\n")
filter = s.split(' ')
k = (row_num-1)*5+col_num
ccdnum = str(k)
g_header_fn = os.path.split(os.path.realpath(__file__))[0] + '/global_header.param'
name, value, description = read_header_parameter(g_header_fn)
h_prim = fits.Header()
date = '200930'
time_obs = '120000'
for i in range(len(name)):
if(name[i]=='FILTER'):
value[i] = filter[k-1]
if(name[i]=='FILENAME'):
value[i] = 'CSST_' + date + '_' +time_obs + '_' + pointNum.rjust(6,'0') + '_' +ccdnum.rjust(2,'0')+'_raw'
if(name[i]=='DETSIZE'):
value[i] = '[1:' + str(int(xlen)) + ',1:'+ str(int(ylen)) + ']'
if(name[i]=='PIXSCAL1'):
value[i] = str(psize)
if(name[i]=='PIXSCAL2'):
value[i] = str(psize)
h_prim[name[i]] = (value[i],description[i])
h_prim.add_comment('==================================================================',after='FILETYPE')
h_prim.add_comment('Target information')
h_prim.add_comment('==================================================================')
h_prim.add_comment('==================================================================',after='EQUINOX')
h_prim.add_comment('Exposure information')
h_prim.add_comment('==================================================================')
h_prim.add_comment('==================================================================',after='MJDEND')
h_prim.add_comment('Telescope information')
h_prim.add_comment('==================================================================')
h_prim.add_comment('==================================================================',after='REFFRAME')
h_prim.add_comment('Detector information')
h_prim.add_comment('==================================================================')
h_prim.add_comment('==================================================================',after='FILTER')
h_prim.add_comment('Other information')
h_prim.add_comment('==================================================================')
return h_prim
def generateExtensionHeader(xlen = 9232, ylen = 9216,ra = 60, dec = -40, pa = -23.433, gain = 1.0, readout = 5.0, dark = 0.02, saturation=90000, psize = 0.074, row_num = 1, col_num = 1):
h_ext = fits.Header()
for i in range(1,2):
# NAXIS1:Number of pixels per row; NAXIS2:Number of rows
h_ext['NAXIS1'] = xlen
h_ext['NAXIS2'] = ylen
name, value, description = Header_extention(xlen = xlen, ylen = ylen, gain = gain, readout = readout, dark = dark, saturation=saturation, row_num = row_num, col_num = col_num)
for j in range(len(name)):
h_ext[name[j]] = (value[j],description[j])
name, value, description = WCS_def(xlen = xlen, ylen = ylen, gapx = 898.0, gapy1 = 534, gapy2 = 1309, ra = ra, dec = dec, pa = pa ,psize = psize, row_num = row_num, col_num = col_num)
for j in range(len(name)):
h_ext[name[j]] = (value[j],description[j])
h_ext.add_comment('==================================================================',after='OBSID')
h_ext.add_comment('Readout information')
h_ext.add_comment('==================================================================')
h_ext.add_comment('==================================================================',after='CHIPTEMP')
h_ext.add_comment('Chip information')
h_ext.add_comment('==================================================================')
h_ext.add_comment('==================================================================',after='DTM2_2')
h_ext.add_comment('WCS information')
h_ext.add_comment('==================================================================')
return h_ext
def main(argv):
xlen = int(argv[1])
ylen = int(argv[2])
pointingNum = argv[3]
ra = float(argv[4])
dec = float(argv[5])
pSize = float(argv[6])
ccd_row_num = int(argv[7])
ccd_col_num = int(argv[8])
pa_aper = float(argv[9])
gain = float(argv[10])
readout = float(argv[11])
dark = float(argv[12])
fw = float(argv[13])
h_prim = generatePrimaryHeader(xlen = xlen, ylen = ylen,ra = ra, dec = dec, psize = pSize, row_num = ccd_row_num, col_num = ccd_col_num, pointNum = pointingNum)
h_ext = generateExtensionHeader(xlen = xlen, ylen = ylen,ra = ra, dec = dec, pa = pa_aper, gain = gain, readout = readout, dark = dark, saturation=fw, psize = pSize, row_num = ccd_row_num, col_num = ccd_col_num)
hdu1 = fits.PrimaryHDU(header=h_prim)
hdu2 = fits.ImageHDU(np.zeros([ylen,xlen]),header = h_ext)
hdul = fits.HDUList([hdu1,hdu2])
hdul.writeto(h_prim['FILENAME']+'.fits',output_verify='ignore')
# if __name__ == "__main__":
# main(sys.argv)
This diff is collapsed.
import numpy as np
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
class Telescope(object):
def __init__(self, param=None, optEffCurve_path=None):
self.diameter = 2.0 # in unit of meter
if param is not None:
self.diameter = param["diameter"]
self.pupil_area = np.pi * (0.5 * self.diameter)**2
if optEffCurve_path is not None:
self.efficiency = self._get_efficiency(optEffCurve_path)
else:
try:
with pkg_resources.files('ObservationSim.Instrument.data').joinpath('mirror_ccdnote.txt') as optEffCurve_path:
self.efficiency = self._get_efficiency(optEffCurve_path)
except AttributeError:
with pkg_resources.path('ObservationSim.Instrument.data', 'mirror_ccdnote.txt') as optEffCurve_path:
self.efficiency = self._get_efficiency(optEffCurve_path)
def _get_efficiency(self, effCurve_path):
""" Read in the efficiency of optics
for each band
Parameters:
effCurve_path: the path for efficiency file
Returns:
opticsEff: a dictionary of efficiency (a scalar) for each band
"""
f = open(effCurve_path, 'r')
for _ in range(2):
header = f.readline()
iline = 0
opticsEff = {}
for line in f:
line = line.strip()
columns = line.split()
opticsEff[str(columns[0])] = float(columns[2])
f.close()
return opticsEff
\ No newline at end of file
This diff is collapsed.
from ObservationSim.MockObject.MockObject import MockObject
class CosmicRay(MockObject):
pass
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment