Commit 4afd1181 authored by Fang Yuedong's avatar Fang Yuedong
Browse files

add renamed files

parent 4c9c940a
This diff is collapsed.
import os
import numpy as np
import astropy.constants as cons
from astropy.table import Table
from scipy import interpolate
from observation_sim.mock_objects import CatalogBase, Star, Galaxy, Quasar
class Catalog(CatalogBase):
"""An user customizable class for reading in catalog(s) of objects and SEDs.
NOTE: must inherit the "CatalogBase" abstract class
...
Attributes
----------
cat_dir : str
a directory that contains the catalog file(s)
star_path : str
path to the star catalog file
star_SED_path : str
path to the star SED data
objs : list
a list of ObservationSim.MockObject (Star, Galaxy, or Quasar)
NOTE: must have "obj" list when implement your own Catalog
Methods
----------
load_sed(obj, **kwargs):
load the corresponding SED data for one object
load_norm_filt(obj):
load the filter throughput for the input catalog's photometric system.
"""
def __init__(self, config, chip, **kwargs):
"""Constructor method.
Parameters
----------
config : dict
configuration dictionary which is parsed from the input YAML file
chip: ObservationSim.Instrument.Chip
an ObservationSim.Instrument.Chip instance, can be used to identify the band etc.
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
None
"""
super().__init__()
self.cat_dir = os.path.join(
config["data_dir"], config["catalog_options"]["input_path"]["cat_dir"])
self.chip = chip
if "star_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["star_cat"]:
star_file = config["catalog_options"]["input_path"]["star_cat"]
star_SED_file = config["catalog_options"]["SED_templates_path"]["star_SED"]
self.star_path = os.path.join(self.cat_dir, star_file)
self.star_SED_path = os.path.join(
config["data_dir"], star_SED_file)
# NOTE: must call _load() method here to read in all objects
self.objs = []
self._load()
def _load(self, **kwargs):
"""Read in all objects in from the catalog file(s).
This is a must implemented method which is used to read in all objects, and
then convert them to ObservationSim.MockObject (Star, Galaxy, or Quasar).
Currently,
the model of ObservationSim.MockObject.Star class requires:
param["star"] : int
specify the object type: 0: galaxy, 1: star, 2: quasar
param["id"] : int
ID number of the object
param["ra"] : float
Right ascension (in degrees)
param["dec"] : float
Declination (in degrees)
param["mag_use_normal"]: float
the absolute magnitude in a particular filter
NOTE: if that filter is not the corresponding CSST filter, the
load_norm_filt(obj) function must be implemented to load the filter
throughput of that particular photometric system
the model of ObservationSim.MockObject.Galaxy class requires:
param["star"] : int
specify the object type: 0: galaxy, 1: star, 2: quasar
param["id"] : int
ID number of the object
param["ra"] : float
Right ascension (in degrees)
param["dec"] : float
Declination (in degrees)
param["mag_use_normal"]: float
the absolute magnitude in a particular filter
NOTE: if that filter is not the corresponding CSST filter, the
load_norm_filt(obj) function must be implemented to load the filter
throughput of that particular photometric system
param["bfrac"] : float
the bulge fraction
param["hlr_bulge"] : float
the half-light-radius of the bulge
param["hlr_disk"] : float
the half-light-radius of the disk
param["e1_bulge"], param["e2_bulge"] : float
the ellipticity of the bulge components
param["e1_disk"], param["e2_disk"] : float
the ellipticity of the disk components
(Optional parameters):
param['disk_sersic_idx']: float
Sersic index for galaxy disk component
param['bulge_sersic_idx']: float
Sersic index for galaxy bulge component
param['g1'], param['g2']: float
Reduced weak lensing shear components (valid for shear type: catalog)
the model of ObservationSim.MockObject.Galaxy class requires:
Currently a Quasar is modeled as a point source, just like a Star.
NOTE: To construct an object, according to its type, just call:
Star(param), Galaxy(param), or Quasar(param)
NOTE: All constructed objects should be appened to "self.objs".
NOTE: Any other parameters can also be set within "param" dict:
Used to calculate required quantities and/or SEDs etc.
Parameters
----------
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
None
"""
stars = Table.read(self.star_path)
nstars = stars['sourceID'].size
for istars in range(nstars):
param = self.initialize_param()
param['id'] = istars + 1
param['ra'] = stars['RA'][istars]
param['dec'] = stars['Dec'][istars]
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
param['mag_use_normal'] = stars['app_sdss_g'][istars]
obj = Star(param)
self.objs.append(obj)
def load_sed(self, obj, **kwargs):
"""Load the corresponding SED data for a particular obj.
Parameters
----------
obj : ObservationSim.MockObject
the object to get SED data for
**kwargs : dict
other needed input parameters (in key-value pairs), please modify corresponding
initialization call in "ObservationSim.py" as you need.
Returns
----------
sed : Astropy.Table
the SED Table with two columns (namely, "WAVELENGTH", "FLUX"):
sed["WAVELENGTH"] : wavelength in Angstroms
sed["FLUX"] : fluxes in photons/s/m^2/A
NOTE: the range of wavelengthes must at least cover [2450 - 11000] Angstorms
"""
if obj.type == 'star':
wave = Table.read(self.star_SED_path,
path=f"/SED/wave_{obj.model_tag}")
flux = Table.read(self.star_SED_path, path=f"/SED/{obj.sed_type}")
wave, flux = wave['col0'].data, flux['col0'].data
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2400, 11001 + 0.5, 0.5)
y = speci(lamb)
# erg/s/cm^2/A --> photons/s/m^2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
return sed
def load_norm_filt(self, obj):
"""Load the corresponding thourghput for the input magnitude "param["mag_use_normal"]".
NOTE: if the input magnitude is already in CSST magnitude, simply return None
Parameters
----------
obj : ObservationSim.MockObject
the object to get thourghput data for
Returns
----------
norm_filt : Astropy.Table
the throughput Table with two columns (namely, "WAVELENGTH", "SENSITIVITY"):
norm_filt["WAVELENGTH"] : wavelengthes in Angstroms
norm_filt["SENSITIVITY"] : efficiencies
"""
return None
import os
import galsim
import random
import numpy as np
import h5py as h5
import healpy as hp
import astropy.constants as cons
import traceback
from astropy.coordinates import spherical_to_cartesian
from astropy.table import Table
from scipy import interpolate
from datetime import datetime
from observation_sim.mock_objects import CatalogBase, Star, Galaxy, Quasar, Stamp
from observation_sim.mock_objects._util import tag_sed, getObservedSED, getABMAG, integrate_sed_bandpass, comoving_dist
from observation_sim.astrometry.Astrometry_util import on_orbit_obs_position
import astropy.io.fits as fitsio
from observation_sim.mock_objects._util import seds, sed_assign, extAv
# (TEST)
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
NSIDE = 128
class Catalog(CatalogBase):
def __init__(self, config, chip, pointing, chip_output, filt, **kwargs):
super().__init__()
self.cat_dir = config["catalog_options"]["input_path"]["cat_dir"]
self.seed_Av = 121212 # config["catalog_options"]["seed_Av"]
# (TEST)
self.cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
self.chip_output = chip_output
self.filt = filt
self.logger = chip_output.logger
with pkg_resources.path('catalog.data', 'SLOAN_SDSS.g.fits') as filter_path:
self.normF_star = Table.read(str(filter_path))
with pkg_resources.path('catalog.data', 'lsst_throuput_g.fits') as filter_path:
self.normF_galaxy = Table.read(str(filter_path))
self.config = config
self.chip = chip
self.pointing = pointing
self.max_size = 0.
if "stamp_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"]["stamp_cat"] and config["catalog_options"]["stamp_yes"]:
stamp_file = config["catalog_options"]["input_path"]["stamp_cat"]
self.stamp_path = os.path.join(self.cat_dir, stamp_file)
# self.stamp_SED_path = os.path.join(config["data_dir"], config["SED_templates_path"]["stamp_SED"]) ###shoule be stamp-SED
# self._load_SED_lib_stamps() ###shoule be stamp-SED
self.tempSed_gal, self.tempRed_gal = seds(
"galaxy.list", seddir="/public/home/chengliang/CSSOSDataProductsSims/testCats/Templates/Galaxy/") # only for test
self._add_output_columns_header()
self._get_healpix_list()
self._load()
def _add_output_columns_header(self):
self.add_hdr = " model_tag teff logg feh"
self.add_hdr += " bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp "
self.add_fmt = " %10s %8.4f %8.4f %8.4f"
self.add_fmt += " %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f "
self.chip_output.update_output_header(
additional_column_names=self.add_hdr)
def _get_healpix_list(self):
self.sky_coverage = self.chip.getSkyCoverageEnlarged(
self.chip.img.wcs, margin=0.2)
ra_min, ra_max, dec_min, dec_max = self.sky_coverage.xmin, self.sky_coverage.xmax, self.sky_coverage.ymin, self.sky_coverage.ymax
ra = np.deg2rad(np.array([ra_min, ra_max, ra_max, ra_min]))
dec = np.deg2rad(np.array([dec_max, dec_max, dec_min, dec_min]))
# vertices = spherical_to_cartesian(1., dec, ra)
self.pix_list = hp.query_polygon(
NSIDE,
hp.ang2vec(np.radians(90.) - dec, ra),
inclusive=True
)
# self.pix_list = hp.query_polygon(NSIDE, np.array(vertices).T, inclusive=True)
if self.logger is not None:
msg = str(("HEALPix List: ", self.pix_list))
self.logger.info(msg)
else:
print("HEALPix List: ", self.pix_list)
def load_norm_filt(self, obj):
if obj.type == "stamp":
return self.normF_galaxy # normalize_filter for stamp
else:
return None
def _load_stamps(self, stamps, pix_id=None):
print("debug:: load_stamps")
nstamps = len(stamps['filename'])
self.rng_sedGal = random.Random()
# Use healpix index as the random seed
self.rng_sedGal.seed(float(pix_id))
self.ud = galsim.UniformDeviate(pix_id)
for istamp in range(nstamps):
print("debug::", istamp)
fitsfile = os.path.join(
self.cat_dir, "stampCats/"+stamps['filename'][istamp].decode('utf-8'))
print("debug::", istamp, fitsfile)
hdu = fitsio.open(fitsfile)
param = self.initialize_param()
param['id'] = hdu[0].header['index'] # istamp
param['star'] = 3 # Stamp type in .cat file
param['ra'] = hdu[0].header['ra']
param['dec'] = hdu[0].header['dec']
param['pixScale'] = hdu[0].header['pixScale']
# param['srcGalaxyID'] = hdu[0].header['srcGID']
# param['mu']= hdu[0].header['mu']
# param['PA']= hdu[0].header['PA']
# param['bfrac']= hdu[0].header['bfrac']
# param['z']= hdu[0].header['z']
# gals['mag_true_g_lsst']
param['mag_use_normal'] = hdu[0].header['mag_g']
# Apply astrometric modeling
# in C3 case only aberration
param['ra_orig'] = param['ra']
param['dec_orig'] = param['dec']
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = [param['ra']] # ra_arr.tolist()
dec_list = [param['dec']] # dec_arr.tolist()
pmra_list = np.zeros(1).tolist()
pmdec_list = np.zeros(1).tolist()
rv_list = np.zeros(1).tolist()
parallax_list = [1e-9] * 1
dt = datetime.fromtimestamp(self.pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
input_ra_list=ra_list,
input_dec_list=dec_list,
input_pmra_list=pmra_list,
input_pmdec_list=pmdec_list,
input_rv_list=rv_list,
input_parallax_list=parallax_list,
input_nstars=1,
input_x=self.pointing.sat_x,
input_y=self.pointing.sat_y,
input_z=self.pointing.sat_z,
input_vx=self.pointing.sat_vx,
input_vy=self.pointing.sat_vy,
input_vz=self.pointing.sat_vz,
input_epoch="J2015.5",
input_date_str=date_str,
input_time_str=time_str
)
param['ra'] = ra_arr[0]
param['dec'] = dec_arr[0]
# Assign each galaxy a template SED
param['sed_type'] = sed_assign(
phz=param['z'], btt=param['bfrac'], rng=self.rng_sedGal)
param['redden'] = self.tempRed_gal[param['sed_type']]
param['av'] = 0.0
param['redden'] = 0
param['mu'] = 1
# param["CSSTmag"]= True
# param["mag_r"] = 20.
# param['']
### more keywords for stamp###
param['image'] = hdu[0].data
param['image'] = param['image']/(np.sum(param['image']))
obj = Stamp(param)
self.objs.append(obj)
def _load(self, **kwargs):
self.objs = []
self.ids = 0
if "stamp_cat" in self.config["catalog_options"]["input_path"] and self.config["catalog_options"]["input_path"]["stamp_cat"] and self.config["catalog_options"]["stamp_yes"]:
stamps_cat = h5.File(self.stamp_path, 'r')['Stamps']
print("debug::", stamps_cat.keys())
for pix in self.pix_list:
try:
stamps = stamps_cat[str(pix)]
print("debug::", stamps.keys())
self._load_stamps(stamps, pix_id=pix)
del stamps
except Exception as e:
self.logger.error(str(e))
print(e)
if self.logger is not None:
self.logger.info("maximum galaxy size: %.4f" % (self.max_size))
self.logger.info("number of objects in catalog: %d" %
(len(self.objs)))
else:
print("number of objects in catalog: ", len(self.objs))
def load_sed(self, obj, **kwargs):
if obj.type == 'stamp':
sed_data = getObservedSED(
sedCat=self.tempSed_gal[obj.sed_type],
redshift=obj.z,
av=obj.param["av"],
redden=obj.param["redden"]
)
wave, flux = sed_data[0], sed_data[1]
else:
raise ValueError("Object type not known")
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
del wave
del flux
return sed
import os
import numpy as np
import mpi4py.MPI as MPI
import galsim
import psutil
import gc
from datetime import datetime
import traceback
from observation_sim.config import ChipOutput
from observation_sim.instruments import Telescope, Filter, FilterParam, FocalPlane, Chip
from observation_sim.instruments.chip import effects
from observation_sim.instruments.chip import chip_utils as chip_utils
from observation_sim.astrometry.Astrometry_util import on_orbit_obs_position
from observation_sim.sim_steps import SimSteps, SIM_STEP_TYPES
class Observation(object):
def __init__(self, config, Catalog, work_dir=None, data_dir=None):
self.config = config
self.tel = Telescope()
self.filter_param = FilterParam()
self.Catalog = Catalog
def prepare_chip_for_exposure(self, chip, ra_cen, dec_cen, pointing, wcs_fp=None):
# Get WCS for the focal plane
if wcs_fp == None:
wcs_fp = self.focal_plane.getTanWCS(
ra_cen, dec_cen, pointing.img_pa, chip.pix_scale)
# Create chip Image
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
chip.img.setOrigin(chip.bound.xmin, chip.bound.ymin)
chip.img.wcs = wcs_fp
# Get random generators for this chip
chip.rng_poisson, chip.poisson_noise = chip_utils.get_poisson(
seed=int(self.config["random_seeds"]["seed_poisson"]) + pointing.id*30 + chip.chipID, sky_level=0.)
# Get flat, shutter, and PRNU images
chip.flat_img, _ = chip_utils.get_flat(
img=chip.img, seed=int(self.config["random_seeds"]["seed_flat"]))
if chip.chipID > 30:
chip.shutter_img = np.ones_like(chip.img.array)
else:
chip.shutter_img = effects.ShutterEffectArr(
chip.img, t_shutter=1.3, dist_bearing=735, dt=1E-3)
chip.prnu_img = effects.PRNU_Img(xsize=chip.npix_x, ysize=chip.npix_y, sigma=0.01,
seed=int(self.config["random_seeds"]["seed_prnu"]+chip.chipID))
return chip
def run_one_chip(self, chip, filt, pointing, chip_output, wcs_fp=None, psf_model=None, cat_dir=None, sed_dir=None):
chip_output.Log_info(
':::::::::::::::::::Current Pointing Information::::::::::::::::::')
chip_output.Log_info("RA: %f, DEC; %f" % (pointing.ra, pointing.dec))
chip_output.Log_info("Time: %s" % datetime.utcfromtimestamp(
pointing.timestamp).isoformat())
chip_output.Log_info("Exposure time: %f" % pointing.exp_time)
chip_output.Log_info("Satellite Position (x, y, z): (%f, %f, %f)" % (
pointing.sat_x, pointing.sat_y, pointing.sat_z))
chip_output.Log_info("Satellite Velocity (x, y, z): (%f, %f, %f)" % (
pointing.sat_vx, pointing.sat_vy, pointing.sat_vz))
chip_output.Log_info("Position Angle: %f" % pointing.img_pa.deg)
chip_output.Log_info('Chip : %d' % chip.chipID)
chip_output.Log_info(
':::::::::::::::::::::::::::END:::::::::::::::::::::::::::::::::::')
# Apply astrometric simulation for pointing
if self.config["obs_setting"]["enable_astrometric_model"]:
dt = datetime.utcfromtimestamp(pointing.timestamp)
date_str = dt.date().isoformat()
time_str = dt.time().isoformat()
ra_cen, dec_cen = on_orbit_obs_position(
input_ra_list=[pointing.ra],
input_dec_list=[pointing.dec],
input_pmra_list=[0.],
input_pmdec_list=[0.],
input_rv_list=[0.],
input_parallax_list=[1e-9],
input_nstars=1,
input_x=pointing.sat_x,
input_y=pointing.sat_y,
input_z=pointing.sat_z,
input_vx=pointing.sat_vx,
input_vy=pointing.sat_vy,
input_vz=pointing.sat_vz,
input_epoch="J2000",
input_date_str=date_str,
input_time_str=time_str
)
ra_cen, dec_cen = ra_cen[0], dec_cen[0]
else:
ra_cen = pointing.ra
dec_cen = pointing.dec
# Prepare necessary chip properties for simulation
chip = self.prepare_chip_for_exposure(chip, ra_cen, dec_cen, pointing)
# Initialize SimSteps
sim_steps = SimSteps(overall_config=self.config,
chip_output=chip_output, all_filters=self.all_filters)
for step in pointing.obs_param["call_sequence"]:
if self.config["run_option"]["out_cat_only"]:
if step != "scie_obs":
continue
chip_output.Log_info("Starting simulation step: %s, calling function: %s" % (
step, SIM_STEP_TYPES[step]))
obs_param = pointing.obs_param["call_sequence"][step]
step_name = SIM_STEP_TYPES[step]
try:
step_func = getattr(sim_steps, step_name)
chip, filt, tel, pointing = step_func(
chip=chip,
filt=filt,
tel=self.tel,
pointing=pointing,
catalog=self.Catalog,
obs_param=obs_param)
chip_output.Log_info("Finished simulation step: %s" % (step))
except Exception as e:
traceback.print_exc()
chip_output.Log_error(e)
chip_output.Log_error("Failed simulation on step: %s" % (step))
break
chip_output.Log_info("check running:1: pointing-%d chip-%d pid-%d memory-%6.2fGB" % (pointing.id,
chip.chipID, os.getpid(), (psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024)))
del chip.img
def runExposure_MPI_PointingList(self, pointing_list, chips=None):
comm = MPI.COMM_WORLD
ind_thread = comm.Get_rank()
num_thread = comm.Get_size()
process_counter = 0
for ipoint in range(len(pointing_list)):
# Construct chips & filters:
pointing = pointing_list[ipoint]
# pointing_ID = pointing.id
pointing_ID = pointing.obs_id
pointing.make_output_pointing_dir(
overall_config=self.config, copy_obs_config=True)
self.focal_plane = FocalPlane(
chip_list=pointing.obs_param["run_chips"])
# Make Chip & Filter lists
self.chip_list = []
self.filter_list = []
self.all_filters = []
for i in range(self.focal_plane.nchips):
chipID = i + 1
chip = Chip(chipID=chipID, config=self.config)
filter_id, filter_type = chip.getChipFilter()
filt = Filter(
filter_id=filter_id,
filter_type=filter_type,
filter_param=self.filter_param)
if not self.focal_plane.isIgnored(chipID=chipID):
self.chip_list.append(chip)
self.filter_list.append(filt)
self.all_filters.append(filt)
if chips is None:
# Run all chips defined in configuration of this pointing
run_chips = self.chip_list
run_filts = self.filter_list
nchips_per_fp = len(self.chip_list)
else:
# Only run a particular set of chips
run_chips = []
run_filts = []
for ichip in range(len(self.chip_list)):
chip = self.chip_list[ichip]
filt = self.filter_list[ichip]
if chip.chipID in chips:
run_chips.append(chip)
run_filts.append(filt)
nchips_per_fp = len(chips)
for ichip in range(nchips_per_fp):
i_process = process_counter + ichip
if i_process % num_thread != ind_thread:
continue
pid = os.getpid()
chip = run_chips[ichip]
filt = run_filts[ichip]
chip_output = ChipOutput(
config=self.config,
chip=chip,
filt=filt,
pointing=pointing
)
chip_output.Log_info("running pointing#%d, chip#%d, at PID#%d..." % (
int(pointing_ID), chip.chipID, pid))
self.run_one_chip(
chip=chip,
filt=filt,
chip_output=chip_output,
pointing=pointing)
chip_output.Log_info(
"finished running chip#%d..." % (chip.chipID))
for handler in chip_output.logger.handlers[:]:
chip_output.logger.removeHandler(handler)
gc.collect()
process_counter += nchips_per_fp
import galsim
import numpy as np
import cmath
class FieldDistortion(object):
def __init__(self, chip, fdModel=None, fdModel_path=None, img_rot=0.):
if fdModel is None:
if hasattr(chip, 'fdModel'):
self.fdModel = chip.fdModel
elif fdModel_path is not None:
import pickle
with open(fdModel_path, "rb") as f:
self.fdModel = pickle.load(f)
else:
raise ValueError(
"Error: no field distortion model has been specified!")
else:
self.fdModel = fdModel
self.img_rot = img_rot
self.ifdModel = self.fdModel["wave1"]
self.ixfdModel = self.ifdModel["xImagePos"]
self.iyfdModel = self.ifdModel["yImagePos"]
# first-order derivatives of the global field distortion model
self.ifx_dx = self.ixfdModel.partial_derivative(1, 0)
self.ifx_dy = self.ixfdModel.partial_derivative(0, 1)
self.ify_dx = self.iyfdModel.partial_derivative(1, 0)
self.ify_dy = self.iyfdModel.partial_derivative(0, 1)
if "residual" in self.fdModel["wave1"]:
self.irsModel = self.fdModel["wave1"]["residual"]["ccd" +
chip.getChipLabel(chipID=chip.chipID)]
self.ixrsModel = self.irsModel["xResidual"]
self.iyrsModel = self.irsModel["yResidual"]
# first-order derivatives of the residual field distortion model
self.irx_dx = self.ixrsModel.partial_derivative(1, 0)
self.irx_dy = self.ixrsModel.partial_derivative(0, 1)
self.iry_dx = self.iyrsModel.partial_derivative(1, 0)
self.iry_dy = self.iyrsModel.partial_derivative(0, 1)
else:
self.irsModel = None
def isContainObj_FD(self, chip, pos_img):
xLowI, xUpI, yLowI, yUpI = self.ifdModel["interpLimit"]
x, y = pos_img.x, pos_img.y
if (xLowI - x) * (xUpI - x) > 0 or (yLowI - y) * (yUpI - y) > 0:
return False
return True
def get_distorted(self, chip, pos_img, bandpass=None, img_rot=None):
""" Get the distored position for an undistorted image position
Parameters:
chip: A 'Chip' object representing the
chip we want to extract PSF from.
pos_img: A 'galsim.Position' object representing
the image position.
bandpass: A 'galsim.Bandpass' object representing
the wavelength range.
Returns:
pos_distorted: A 'galsim.Position' object representing
the distored position.
"""
if not self.isContainObj_FD(chip=chip, pos_img=pos_img):
return galsim.PositionD(-1, -1), None
if not img_rot:
img_rot = np.radians(self.img_rot)
else:
img_rot = np.radians(img_rot)
x, y = pos_img.x, pos_img.y
x = self.ixfdModel(x, y)[0][0]
y = self.iyfdModel(x, y)[0][0]
if self.irsModel:
# x1LowI, x1UpI, y1LowI, y1UpI = self.irsModel["interpLimit"]
# if (x1LowI-x)*(x1UpI-x) <=0 and (y1LowI-y)*(y1UpI-y)<=0:
# dx = self.ixrsModel(x, y)[0][0]
# dy = self.iyrsModel(x, y)[0][0]
# x += dx
# y += dy
# # field distortion induced ellipticity
# ix_dx = self.ifx_dx(x, y) + self.irx_dx(x, y)
# ix_dy = self.ifx_dy(x, y) + self.irx_dy(x, y)
# iy_dx = self.ify_dx(x, y) + self.iry_dx(x, y)
# iy_dy = self.ify_dy(x, y) + self.iry_dy(x, y)
# else:
# return galsim.PositionD(-1, -1), None
dx = self.ixrsModel(x, y)[0][0]
dy = self.iyrsModel(x, y)[0][0]
x += dx
y += dy
# field distortion induced ellipticity
ix_dx = self.ifx_dx(x, y) + self.irx_dx(x, y)
ix_dy = self.ifx_dy(x, y) + self.irx_dy(x, y)
iy_dx = self.ify_dx(x, y) + self.iry_dx(x, y)
iy_dy = self.ify_dy(x, y) + self.iry_dy(x, y)
else:
ix_dx = self.ifx_dx(x, y)
ix_dy = self.ifx_dy(x, y)
iy_dx = self.ify_dx(x, y)
iy_dy = self.ify_dy(x, y)
g1k_fd = 0.0 + (iy_dy - ix_dx) / (iy_dy + ix_dx)
g2k_fd = 0.0 - (iy_dx + ix_dy) / (iy_dy + ix_dx)
# [TODO] [TESTING] Rotate the shear:
g_abs = np.sqrt(g1k_fd**2 + g2k_fd**2)
phi = cmath.phase(complex(g1k_fd, g2k_fd))
# g_abs = 0.7
g1k_fd = g_abs * np.cos(phi + 2*img_rot)
g2k_fd = g_abs * np.sin(phi + 2*img_rot)
fd_shear = galsim.Shear(g1=g1k_fd, g2=g2k_fd)
return galsim.PositionD(x, y), fd_shear
import galsim
import sep
import numpy as np
from scipy.interpolate import interp1d
from observation_sim.PSF.PSFModel import PSFModel
class PSFGauss(PSFModel):
def __init__(self, chip, fwhm=0.187, sigSpin=0., psfRa=None):
self.pix_size = chip.pix_scale
self.chip = chip
if psfRa is None:
self.fwhm = fwhm
self.sigGauss = 0.15
else:
self.fwhm = self.fwhmGauss(r=psfRa)
self.sigGauss = psfRa # 80% light radius
self.sigSpin = sigSpin
self.psf = galsim.Gaussian(flux=1.0, fwhm=fwhm)
def perfGauss(self, r, sig):
"""
pseudo-error function, i.e. Cumulative distribution function of Gaussian distribution
Parameter:
r: radius
sig: sigma of the Gaussian distribution
Return:
the value of the pseudo CDF
"""
def gaussFun(sigma, r): return 1.0/(np.sqrt(2.0*np.pi)
* sigma) * np.exp(-r**2/(2.0*sigma**2))
nxx = 1000
rArr = np.linspace(0.0, r, nxx)
gauss = gaussFun(sig, rArr)
erf = 2.0*np.trapz(gauss, rArr)
return erf
def fracGauss(self, sig, r=0.15, pscale=None):
"""
For a given Gaussian PSF with sigma=sig,
derive the flux ratio ar the given radius r
Parameters:
sig: sigma of the Gauss PSF Function in arcsec
r: radius in arcsec
pscale: pixel scale
Return: the flux ratio
"""
if pscale == None:
pscale = self.pix_size
gaussx = galsim.Gaussian(flux=1.0, sigma=sig)
gaussImg = gaussx.drawImage(scale=pscale, method='no_pixel')
gaussImg = gaussImg.array
size = np.size(gaussImg, axis=0)
cxy = 0.5*(size-1)
flux, ferr, flag = sep.sum_circle(
gaussImg, [cxy], [cxy], [r/pscale], subpix=0)
return flux
def fwhmGauss(self, r=0.15, fr=0.8, pscale=None):
"""
Given a total flux ratio 'fr' within a fixed radius 'r',
estimate the fwhm of the Gaussian function
return the fwhm in arcsec
"""
if pscale == None:
pscale = self.pix_size
err = 1.0e-3
nxx = 100
sig = np.linspace(0.5*pscale, 1.0, nxx)
frA = np.zeros(nxx)
for i in range(nxx):
frA[i] = self.fracGauss(sig[i], r=r, pscale=pscale)
index = [i for i in range(nxx-1) if (fr-frA[i])
* (fr-frA[i+1]) <= 0.0][0]
while abs(frA[index]-fr) > 1.0e-3:
sig = np.linspace(sig[index], sig[index+1], nxx)
for i in range(nxx):
frA[i] = self.fracGauss(sig[i], r=r, pscale=pscale)
index = [i for i in range(
nxx-1) if (fr-frA[i])*(fr-frA[i+1]) <= 0.0][0]
fwhm = 2.35482*sig[index]
return fwhm
def get_PSF(self, pos_img, chip=None, bandpass=None, folding_threshold=5.e-3):
dx = pos_img.x - self.chip.cen_pix_x
dy = pos_img.y - self.chip.cen_pix_y
return self.PSFspin(dx, dy)
def PSFspin(self, x, y):
"""
The PSF profile at a given image position relative to the axis center
Parameters:
theta : spin angles in a given exposure in unit of [arcsecond]
dx, dy: relative position to the axis center in unit of [pixels]
Return:
Spinned PSF: g1, g2 and axis ratio 'a/b'
"""
a2Rad = np.pi/(60.0*60.0*180.0)
ff = self.sigGauss * 0.107 * (1000.0/10.0) # in unit of [pixels]
rc = np.sqrt(x*x + y*y)
cpix = rc*(self.sigSpin*a2Rad)
beta = (np.arctan2(y, x) + np.pi/2)
ell = cpix**2/(2.0*ff**2+cpix**2)
# ell *= 10.0
qr = np.sqrt((1.0+ell)/(1.0-ell))
# psfShape = galsim.Shear(e=ell, beta=beta)
# g1, g2 = psfShape.g1, psfShape.g2
# qr = np.sqrt((1.0+ell)/(1.0-ell))
# return ell, beta, qr
PSFshear = galsim.Shear(e=ell, beta=beta*galsim.radians)
return self.psf.shear(PSFshear), PSFshear
'''
PSF interpolation for CSST-Sim
NOTE: [iccd, iwave, ipsf] are counted from 1 to n, but [tccd, twave, tpsf] are counted from 0 to n-1
'''
import sys
import time
import copy
import numpy as np
import scipy.spatial as spatial
import galsim
import h5py
from observation_sim.PSF.PSFModel import PSFModel
NPSF = 900 # ***# 30*30
PixSizeInMicrons = 5. # ***# in microns
### find neighbors-KDtree###
def findNeighbors(tx, ty, px, py, dr=0.1, dn=1, OnlyDistance=True):
"""
find nearest neighbors by 2D-KDTree
Parameters:
tx, ty (float, float): a given position
px, py (numpy.array, numpy.array): position data for tree
dr (float-optional): distance
dn (int-optional): nearest-N
OnlyDistance (bool-optional): only use distance to find neighbors. Default: True
Returns:
dataq (numpy.array): index
"""
datax = px
datay = py
tree = spatial.KDTree(list(zip(datax.ravel(), datay.ravel())))
dataq = []
rr = dr
if OnlyDistance == True:
dataq = tree.query_ball_point([tx, ty], rr)
if OnlyDistance == False:
while len(dataq) < dn:
dataq = tree.query_ball_point([tx, ty], rr)
rr += dr
dd = np.hypot(datax[dataq]-tx, datay[dataq]-ty)
ddSortindx = np.argsort(dd)
dataq = np.array(dataq)[ddSortindx[0:dn]]
return dataq
### find neighbors-hoclist###
def hocBuild(partx, party, nhocx, nhocy, dhocx, dhocy):
if np.max(partx) > nhocx*dhocx:
print('ERROR')
sys.exit()
if np.max(party) > nhocy*dhocy:
print('ERROR')
sys.exit()
npart = partx.size
hoclist = np.zeros(npart, dtype=np.int32)-1
hoc = np.zeros([nhocy, nhocx], dtype=np.int32)-1
for ipart in range(npart):
ix = int(partx[ipart]/dhocx)
iy = int(party[ipart]/dhocy)
hoclist[ipart] = hoc[iy, ix]
hoc[iy, ix] = ipart
return hoc, hoclist
def hocFind(px, py, dhocx, dhocy, hoc, hoclist):
ix = int(px/dhocx)
iy = int(py/dhocy)
neigh = []
it = hoc[iy, ix]
while it != -1:
neigh.append(it)
it = hoclist[it]
return neigh
def findNeighbors_hoclist(px, py, tx=None, ty=None, dn=4, hoc=None, hoclist=None):
nhocy = nhocx = 20
pxMin = np.min(px)
pxMax = np.max(px)
pyMin = np.min(py)
pyMax = np.max(py)
dhocx = (pxMax - pxMin)/(nhocx-1)
dhocy = (pyMax - pyMin)/(nhocy-1)
partx = px - pxMin + dhocx/2
party = py - pyMin + dhocy/2
if hoc is None:
hoc, hoclist = hocBuild(partx, party, nhocx, nhocy, dhocx, dhocy)
return hoc, hoclist
if hoc is not None:
tx = tx - pxMin + dhocx/2
ty = ty - pyMin + dhocy/2
itx = int(tx/dhocx)
ity = int(ty/dhocy)
ps = [-1, 0, 1]
neigh = []
for ii in range(3):
for jj in range(3):
ix = itx + ps[ii]
iy = ity + ps[jj]
if ix < 0:
continue
if iy < 0:
continue
if ix > nhocx-1:
continue
if iy > nhocy-1:
continue
# neightt = myUtil.hocFind(ppx, ppy, dhocx, dhocy, hoc, hoclist)
it = hoc[iy, ix]
while it != -1:
neigh.append(it)
it = hoclist[it]
# neigh.append(neightt)
# ll = [i for k in neigh for i in k]
if dn != -1:
ptx = np.array(partx[neigh])
pty = np.array(party[neigh])
dd = np.hypot(ptx-tx, pty-ty)
idx = np.argsort(dd)
neigh = np.array(neigh)[idx[0:dn]]
return neigh
### PSF-IDW###
def psfMaker_IDW(px, py, PSFMat, cen_col, cen_row, IDWindex=2, OnlyNeighbors=True, hoc=None, hoclist=None, PSFCentroidWgt=False):
"""
psf interpolation by IDW
Parameters:
px, py (float, float): position of the target
PSFMat (numpy.array): image
cen_col, cen_row (numpy.array, numpy.array): potions of the psf centers
IDWindex (int-optional): the power index of IDW
OnlyNeighbors (bool-optional): only neighbors are used for psf interpolation
Returns:
psfMaker (numpy.array)
"""
minimum_psf_weight = 1e-8
ref_col = px
ref_row = py
ngy, ngx = PSFMat[0, :, :].shape
npsf = PSFMat[:, :, :].shape[0]
psfWeight = np.zeros([npsf])
if OnlyNeighbors == True:
if hoc is None:
neigh = findNeighbors(px, py, cen_col, cen_row,
dr=5., dn=4, OnlyDistance=False)
if hoc is not None:
neigh = findNeighbors_hoclist(
cen_col, cen_row, tx=px, ty=py, dn=4, hoc=hoc, hoclist=hoclist)
neighFlag = np.zeros(npsf)
neighFlag[neigh] = 1
for ipsf in range(npsf):
if OnlyNeighbors == True:
if neighFlag[ipsf] != 1:
continue
dist = np.sqrt((ref_col - cen_col[ipsf])
** 2 + (ref_row - cen_row[ipsf])**2)
if IDWindex == 1:
psfWeight[ipsf] = dist
if IDWindex == 2:
psfWeight[ipsf] = dist**2
if IDWindex == 3:
psfWeight[ipsf] = dist**3
if IDWindex == 4:
psfWeight[ipsf] = dist**4
psfWeight[ipsf] = max(psfWeight[ipsf], minimum_psf_weight)
psfWeight[ipsf] = 1./psfWeight[ipsf]
psfWeight /= np.sum(psfWeight)
psfMaker = np.zeros([ngy, ngx], dtype=np.float32)
for ipsf in range(npsf):
if OnlyNeighbors == True:
if neighFlag[ipsf] != 1:
continue
iPSFMat = PSFMat[ipsf, :, :].copy()
ipsfWeight = psfWeight[ipsf]
psfMaker += iPSFMat * ipsfWeight
psfMaker /= np.nansum(psfMaker)
return psfMaker
### define PSFInterp###
class PSFInterp(PSFModel):
def __init__(self, chip, npsf=NPSF, PSF_data=None, PSF_data_file=None, PSF_data_prefix="", sigSpin=0, psfRa=0.15, HocBuild=False, LOG_DEBUG=False):
self.LOG_DEBUG = LOG_DEBUG
if self.LOG_DEBUG:
print('===================================================')
print('DEBUG: psf module for csstSim '
+ time.strftime("(%Y-%m-%d %H:%M:%S)", time.localtime()), flush=True)
print('===================================================')
self.sigSpin = sigSpin
self.sigGauss = psfRa
self.iccd = int(chip.getChipLabel(chipID=chip.chipID))
# self.iccd = chip.chip_name
if PSF_data_file == None:
print('Error - PSF_data_file is None')
sys.exit()
self.nwave = self._getPSFwave(
self.iccd, PSF_data_file, PSF_data_prefix)
self.npsf = npsf
self.PSF_data = self._loadPSF(
self.iccd, PSF_data_file, PSF_data_prefix)
if self.LOG_DEBUG:
print('nwave-{:} on ccd-{:}::'.format(self.nwave,
self.iccd), flush=True)
print('self.PSF_data ... ok', flush=True)
print(
'Preparing self.[psfMat,cen_col,cen_row] for psfMaker ... ', end='', flush=True)
ngy, ngx = self.PSF_data[0][0]['psfMat'].shape
self.psfMat = np.zeros(
[self.nwave, self.npsf, ngy, ngx], dtype=np.float32)
self.cen_col = np.zeros([self.nwave, self.npsf], dtype=np.float32)
self.cen_row = np.zeros([self.nwave, self.npsf], dtype=np.float32)
self.hoc = []
self.hoclist = []
for twave in range(self.nwave):
for tpsf in range(self.npsf):
self.psfMat[twave, tpsf, :,
:] = self.PSF_data[twave][tpsf]['psfMat']
self.PSF_data[twave][tpsf]['psfMat'] = 0 # free psfMat
self.pixsize = self.PSF_data[twave][tpsf]['pixsize']*1e-3 # mm
self.cen_col[twave, tpsf] = self.PSF_data[twave][tpsf]['image_x'] + \
self.PSF_data[twave][tpsf]['centroid_x']
self.cen_row[twave, tpsf] = self.PSF_data[twave][tpsf]['image_y'] + \
self.PSF_data[twave][tpsf]['centroid_y']
if HocBuild:
# hoclist on twave for neighborsFinding
hoc, hoclist = findNeighbors_hoclist(
self.cen_col[twave], self.cen_row[twave])
self.hoc.append(hoc)
self.hoclist.append(hoclist)
if self.LOG_DEBUG:
print('ok', flush=True)
def _getPSFwave(self, iccd, PSF_data_file, PSF_data_prefix):
# fq = h5py.File(PSF_data_file+'/' +PSF_data_prefix +'psfCube_ccd{:}.h5'.format(iccd), 'r')
fq = h5py.File(PSF_data_file+'/' + PSF_data_prefix +
'psfCube_{:}.h5'.format(iccd), 'r')
nwave = len(fq.keys())
fq.close()
return nwave
def _loadPSF(self, iccd, PSF_data_file, PSF_data_prefix):
psfSet = []
# fq = h5py.File(PSF_data_file+'/' +PSF_data_prefix +'psfCube_ccd{:}.h5'.format(iccd), 'r')
fq = h5py.File(PSF_data_file+'/' + PSF_data_prefix +
'psfCube_{:}.h5'.format(iccd), 'r')
for ii in range(self.nwave):
iwave = ii+1
psfWave = []
fq_iwave = fq['w_{:}'.format(iwave)]
for jj in range(self.npsf):
ipsf = jj+1
psfInfo = {}
psfInfo['wavelength'] = fq_iwave['wavelength'][()]
fq_iwave_ipsf = fq_iwave['psf_{:}'.format(ipsf)]
psfInfo['pixsize'] = PixSizeInMicrons
psfInfo['field_x'] = fq_iwave_ipsf['field_x'][()]
psfInfo['field_y'] = fq_iwave_ipsf['field_y'][()]
psfInfo['image_x'] = fq_iwave_ipsf['image_x'][()]
psfInfo['image_y'] = fq_iwave_ipsf['image_y'][()]
psfInfo['centroid_x'] = fq_iwave_ipsf['cx'][()]
psfInfo['centroid_y'] = fq_iwave_ipsf['cy'][()]
psfInfo['psfMat'] = fq_iwave_ipsf['psfMat'][()]
psfWave.append(psfInfo)
psfSet.append(psfWave)
fq.close()
if self.LOG_DEBUG:
print('psfSet has been loaded:', flush=True)
print('psfSet[iwave][ipsf][keys]:',
psfSet[0][0].keys(), flush=True)
return psfSet
def _findWave(self, bandpass):
if isinstance(bandpass, int):
twave = bandpass
return twave
for twave in range(self.nwave):
bandwave = self.PSF_data[twave][0]['wavelength']
if bandpass.blue_limit < bandwave and bandwave < bandpass.red_limit:
return twave
return -1
def get_PSF(self, chip, pos_img, bandpass, galsimGSObject=True, findNeighMode='treeFind', folding_threshold=5.e-3, pointing_pa=0.0):
"""
Get the PSF at a given image position
Parameters:
chip: A 'Chip' object representing the chip we want to extract PSF from.
pos_img: A 'galsim.Position' object representing the image position.
bandpass: A 'galsim.Bandpass' object representing the wavelength range.
pixSize: The pixels size of psf matrix
findNeighMode: 'treeFind' or 'hoclistFind'
Returns:
PSF: A 'galsim.GSObject'.
"""
pixSize = np.rad2deg(self.pixsize*1e-3/28)*3600 # set psf pixsize
# assert self.iccd == int(chip.getChipLabel(chipID=chip.chipID)), 'ERROR: self.iccd != chip.chipID'
twave = self._findWave(bandpass)
if twave == -1:
print("!!!PSF bandpass does not match.")
exit()
PSFMat = self.psfMat[twave]
cen_col = self.cen_col[twave]
cen_row = self.cen_row[twave]
px = (pos_img.x - chip.cen_pix_x)*0.01
py = (pos_img.y - chip.cen_pix_y)*0.01
if findNeighMode == 'treeFind':
imPSF = psfMaker_IDW(px, py, PSFMat, cen_col, cen_row,
IDWindex=2, OnlyNeighbors=True, PSFCentroidWgt=True)
if findNeighMode == 'hoclistFind':
assert (self.hoc != 0), 'hoclist should be built correctly!'
imPSF = psfMaker_IDW(px, py, PSFMat, cen_col, cen_row, IDWindex=2, OnlyNeighbors=True,
hoc=self.hoc[twave], hoclist=self.hoclist[twave], PSFCentroidWgt=True)
'''
############TEST: START
TestGaussian = False
if TestGaussian:
gsx = galsim.Gaussian(sigma=0.04)
#pointing_pa = -23.433333
imPSF= gsx.shear(g1=0.8, g2=0.).rotate(0.*galsim.degrees).drawImage(nx = 256, ny=256, scale=pixSize).array
############TEST: END
'''
if galsimGSObject:
imPSFt = np.zeros([257, 257])
imPSFt[0:256, 0:256] = imPSF
# imPSFt[120:130, 0:256] = 1.
img = galsim.ImageF(imPSFt, scale=pixSize)
gsp = galsim.GSParams(folding_threshold=folding_threshold)
# TEST: START
# Use sheared PSF to test the PSF orientation
# self.psf = galsim.InterpolatedImage(img, gsparams=gsp).shear(g1=0.8, g2=0.)
# TEST: END
self.psf = galsim.InterpolatedImage(img, gsparams=gsp)
wcs = chip.img.wcs.local(pos_img)
scale = galsim.PixelScale(0.074)
self.psf = wcs.toWorld(scale.toImage(
self.psf), image_pos=(pos_img))
# return self.PSFspin(x=px/0.01, y=py/0.01)
return self.psf, galsim.Shear(e=0., beta=(np.pi/2)*galsim.radians)
return imPSF
'''
def PSFspin(self, x, y):
"""
The PSF profile at a given image position relative to the axis center
Parameters:
theta : spin angles in a given exposure in unit of [arcsecond]
dx, dy: relative position to the axis center in unit of [pixels]
Return:
Spinned PSF: g1, g2 and axis ratio 'a/b'
"""
a2Rad = np.pi/(60.0*60.0*180.0)
ff = self.sigGauss * 0.107 * (1000.0/10.0) # in unit of [pixels]
rc = np.sqrt(x*x + y*y)
cpix = rc*(self.sigSpin*a2Rad)
beta = (np.arctan2(y,x) + np.pi/2)
ell = cpix**2/(2.0*ff**2+cpix**2)
qr = np.sqrt((1.0+ell)/(1.0-ell))
PSFshear = galsim.Shear(e=ell, beta=beta*galsim.radians)
return self.psf.shear(PSFshear), PSFshear
'''
This diff is collapsed.
import galsim
import sep
import numpy as np
from scipy.interpolate import interp1d
import pylab as pl
import os
import sys
class PSFModel(object):
def __init__(self, sigSpin=0., psfRa=0.15):
# TODO: what are the nesseary fields in PSFModel class?
pass
def PSFspin(self, psf, sigSpin, sigGauss, dx, dy):
"""
The PSF profile at a given image position relative to the axis center
Parameters:
theta : spin angles in a given exposure in unit of [arcsecond]
dx, dy: relative position to the axis center in unit of [pixels]
Return:
Spinned PSF: g1, g2 and axis ratio 'a/b'
"""
a2Rad = np.pi/(60.0*60.0*180.0)
ff = sigGauss * 0.107 * (1000.0/10.0) # in unit of [pixels]
rc = np.sqrt(dx*dx + dy*dy)
cpix = rc*(sigSpin*a2Rad)
beta = (np.arctan2(dy, dx) + np.pi/2)
ell = cpix**2/(2.0*ff**2+cpix**2)
# ell *= 10.0
qr = np.sqrt((1.0+ell)/(1.0-ell))
# psfShape = galsim.Shear(e=ell, beta=beta)
# g1, g2 = psfShape.g1, psfShape.g2
# qr = np.sqrt((1.0+ell)/(1.0-ell))
# return ell, beta, qr
PSFshear = galsim.Shear(e=ell, beta=beta*galsim.radians)
return psf.shear(PSFshear), PSFshear
from .PSFModel import PSFModel
from .PSFGauss import PSFGauss
# from .PSFInterp.PSFInterp import PSFInterp
from .PSFInterp import PSFInterp
from .PSFInterpSLS import PSFInterpSLS
from .FieldDistortion import FieldDistortion
\ No newline at end of file
import numpy as np
import os
from datetime import datetime
import argparse
from astropy.time import Time
from observation_sim.config import Pointing
def parse_args():
'''
Parse command line arguments. Many of the following
can be set in the .yaml config file as well.
'''
parser = argparse.ArgumentParser()
parser.add_argument('--config_file', type=str, required=True,
help='.yaml config file for simulation settings.')
parser.add_argument('--catalog', type=str,
help='name of the catalog interface class to be loaded.')
parser.add_argument('-c', '--config_dir', type=str,
help='Directory that houses the .yaml config file.')
parser.add_argument('-d', '--data_dir', type=str,
help='Directory that houses the input data.')
parser.add_argument('-w', '--work_dir', type=str,
help='The path for output.')
return parser.parse_args()
def generate_pointing_list(config, pointing_filename=None, data_dir=None):
pointing_list = []
# Only valid when the pointing list does not contain time stamp column
t0 = datetime(2021, 5, 25, 12, 0, 0)
delta_t = 10. # Time elapsed between exposures (minutes)
# Calculate starting time(s) for CAL exposures
# NOTE: temporary implementation
t = datetime.timestamp(t0)
ipoint = 0
run_pointings = config['obs_setting']['run_pointings']
if "obs_config_file" in config['obs_setting']:
obs_config_file = config['obs_setting']["obs_config_file"]
else:
obs_config_file = None
# if pointing_filename and data_dir:
if pointing_filename:
if data_dir:
pointing_file = os.path.join(data_dir, pointing_filename)
else:
pointing_file = pointing_filename
f = open(pointing_file, 'r')
# for _ in range(1):
# header = f.readline()
iline = 0
for line in f:
if len(line.strip()) == 0 or line[0] == '#':
continue
if run_pointings and iline not in run_pointings:
iline += 1
ipoint += 1
continue
line = line.strip()
columns = line.split()
pointing = Pointing(obs_config_file=obs_config_file)
pointing.read_pointing_columns(columns=columns, id=ipoint)
t += delta_t * 60.
pointing_list.append(pointing)
iline += 1
ipoint += 1
f.close()
else:
if config["obs_setting"]["exp_time"]:
exp_time = config["obs_setting"]["exp_time"]
else:
exp_time = 150.
pointing = Pointing(
id=ipoint,
ra=config["obs_setting"]["ra_center"],
dec=config["obs_setting"]["dec_center"],
img_pa=config["obs_setting"]["image_rot"],
timestamp=t,
exp_time=exp_time,
pointing_type='SCI',
obs_config_file=obs_config_file
)
t += delta_t * 60.
pointing_list.append(pointing)
ipoint += 1
return pointing_list
def make_run_dirs(work_dir, run_name, pointing_list):
if not os.path.exists(work_dir):
try:
os.makedirs(work_dir, exist_ok=True)
except OSError:
pass
imgDir = os.path.join(work_dir, run_name)
if not os.path.exists(imgDir):
try:
os.makedirs(imgDir, exist_ok=True)
except OSError:
pass
return imgDir
def make_output_pointing_dir(path_dict, config, pointing_ID=0):
imgDir = os.path.join(path_dict["work_dir"], config["run_name"])
if not os.path.exists(imgDir):
try:
os.makedirs(imgDir, exist_ok=True)
except OSError:
pass
prefix = "MSC_" + str(pointing_ID).rjust(8, '0')
subImgdir = os.path.join(imgDir, prefix)
if not os.path.exists(subImgdir):
try:
os.makedirs(subImgdir, exist_ok=True)
except OSError:
pass
return subImgdir, prefix
def get_shear_field(config):
if not config["shear_setting"]["shear_type"] in ["constant", "catalog"]:
raise ValueError("Please set a right 'shear_method' parameter.")
if config["shear_setting"]["shear_type"] == "constant":
g1 = config["shear_setting"]["reduced_g1"]
g2 = config["shear_setting"]["reduced_g2"]
nshear = 1
# TODO logging
else:
g1, g2 = 0., 0.
nshear = 0
return g1, g2, nshear
from ctypes import *
import numpy as np
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
def checkInputList(input_list, n):
if not isinstance(input_list, list):
raise TypeError("Input type is not list!", input_list)
for i in input_list:
if type(i) != type(1.1):
if type(i) != type(1):
raise TypeError(
"Input list's element is not float or int!", input_list)
if len(input_list) != n:
raise RuntimeError(
"Length of input list is not equal to stars' number!", input_list)
def on_orbit_obs_position(input_ra_list, input_dec_list, input_pmra_list, input_pmdec_list, input_rv_list, input_parallax_list, input_nstars, input_x, input_y, input_z, input_vx, input_vy, input_vz, input_epoch, input_date_str, input_time_str, lib_path=None):
# Check input parameters
if not isinstance(input_nstars, int):
raise TypeError("Parameter 7 is not int!", input_nstars)
checkInputList(input_ra_list, input_nstars)
checkInputList(input_dec_list, input_nstars)
checkInputList(input_pmra_list, input_nstars)
checkInputList(input_pmdec_list, input_nstars)
checkInputList(input_rv_list, input_nstars)
checkInputList(input_parallax_list, input_nstars)
if not isinstance(input_x, float):
raise TypeError("Parameter 8 is not double!", input_x)
if not isinstance(input_y, float):
raise TypeError("Parameter 9 is not double!", input_y)
if not isinstance(input_z, float):
raise TypeError("Parameter 10 is not double!", input_z)
if not isinstance(input_vx, float):
raise TypeError("Parameter 11 is not double!", input_vx)
if not isinstance(input_vy, float):
raise TypeError("Parameter 12 is not double!", input_vy)
if not isinstance(input_vz, float):
raise TypeError("Parameter 13 is not double!", input_vz)
# Convert km -> m
input_x = input_x*1000.0
input_y = input_y*1000.0
input_z = input_z*1000.0
input_vx = input_vx*1000.0
input_vy = input_vy*1000.0
input_vz = input_vz*1000.0
if not isinstance(input_date_str, str):
raise TypeError("Parameter 15 is not string!", input_date_str)
else:
input_date_str = input_date_str.strip()
if not (input_date_str[4] == "-" and input_date_str[7] == "-"):
raise TypeError("Parameter 15 format error (1)!", input_date_str)
else:
tmp = input_date_str.split("-")
if len(tmp) != 3:
raise TypeError(
"Parameter 15 format error (2)!", input_date_str)
input_year = int(tmp[0])
input_month = int(tmp[1])
input_day = int(tmp[2])
if not (input_year >= 1900 and input_year <= 2100):
raise TypeError(
"Parameter 15 year range error [1900 ~ 2100]!", input_year)
if not (input_month >= 1 and input_month <= 12):
raise TypeError(
"Parameter 15 month range error [1 ~ 12]!", input_month)
if not (input_day >= 1 and input_day <= 31):
raise TypeError(
"Parameter 15 day range error [1 ~ 31]!", input_day)
if not isinstance(input_time_str, str):
raise TypeError("Parameter 16 is not string!", input_time_str)
else:
input_time_str = input_time_str.strip()
if not (input_time_str[2] == ":" and input_time_str[5] == ":"):
raise TypeError("Parameter 16 format error (1)!", input_time_str)
else:
tmp = input_time_str.split(":")
if len(tmp) != 3:
raise TypeError(
"Parameter 16 format error (2)!", input_time_str)
input_hour = int(tmp[0])
input_minute = int(tmp[1])
input_second = float(tmp[2])
if not (input_hour >= 0 and input_hour <= 23):
raise TypeError(
"Parameter 16 hour range error [0 ~ 23]!", input_hour)
if not (input_minute >= 0 and input_minute <= 59):
raise TypeError(
"Parameter 16 minute range error [0 ~ 59]!", input_minute)
if not (input_second >= 0 and input_second < 60.0):
raise TypeError(
"Parameter 16 second range error [0 ~ 60)!", input_second)
# Inital dynamic lib
try:
with pkg_resources.files('observation_sim.astrometry.lib').joinpath("libshao.so") as lib_path:
shao = cdll.LoadLibrary(lib_path)
except AttributeError:
with pkg_resources.path('observation_sim.astrometry.lib', "libshao.so") as lib_path:
shao = cdll.LoadLibrary(lib_path)
shao.onOrbitObs.restype = c_int
d3 = c_double * 3
shao.onOrbitObs.argtypes = [c_double, c_double, c_double, c_double, c_double, c_double,
c_int, c_int, c_int, c_int, c_int, c_double,
c_double, POINTER(d3), POINTER(d3),
c_int, c_int, c_int, c_int, c_int, c_double,
POINTER(c_double), POINTER(c_double)]
output_ra_list = list()
output_dec_list = list()
for i in range(input_nstars):
input_ra = c_double(input_ra_list[i])
input_dec = c_double(input_dec_list[i])
input_pmra = c_double(input_pmra_list[i])
input_pmdec = c_double(input_pmdec_list[i])
# input_rv = c_double(input_rv_list[i] * 3600.) # Convert from km/s to km/h
input_rv = c_double(input_rv_list[i])
input_parallax = c_double(input_parallax_list[i])
p3 = d3(input_x, input_y, input_z)
v3 = d3(input_vx, input_vy, input_vz)
input_year_c = c_int(input_year)
input_month_c = c_int(input_month)
input_day_c = c_int(input_day)
input_hour_c = c_int(input_hour)
input_minute_c = c_int(input_minute)
input_second_c = c_double(input_second)
DAT = c_double(37.0)
output_ra = c_double(0.0)
output_dec = c_double(0.0)
rs = shao.onOrbitObs(input_ra, input_dec, input_parallax, input_pmra, input_pmdec, input_rv,
input_year_c, input_month_c, input_day_c, input_hour_c, input_minute_c, input_second_c,
DAT, byref(p3), byref(v3),
input_year_c, input_month_c, input_day_c, input_hour_c, input_minute_c, input_second_c,
byref(output_ra), byref(output_dec))
if rs != 0:
raise RuntimeError("Calculate error!")
output_ra_list.append(output_ra.value)
output_dec_list.append(output_dec.value)
return np.array(output_ra_list), np.array(output_dec_list)
import os
import logging
import observation_sim.config._util as _util
from observation_sim.config.header import generatePrimaryHeader
class ChipOutput(object):
def __init__(self, config, chip, filt, pointing, logger_filename=None):
self.config = config
self.chip = chip
self.filt = filt
self.pointing_type = pointing.pointing_type
self.chip_label = str(chip.chipID).rjust(2, '0')
# Get primary header based on chip and pointing
self.h_prim = generatePrimaryHeader(
xlen=chip.npix_x,
ylen=chip.npix_y,
pointing_id=pointing.obs_id,
pointing_type_code=pointing.pointing_type_code,
ra=pointing.ra,
dec=pointing.dec,
pixel_scale=chip.pix_scale,
time_pt=pointing.timestamp,
exptime=pointing.exp_time,
im_type=pointing.pointing_type,
sat_pos=[pointing.sat_x, pointing.sat_y, pointing.sat_z],
sat_vel=[pointing.sat_vx, pointing.sat_vy, pointing.sat_vz],
project_cycle=self.config["project_cycle"],
run_counter=self.config["run_counter"],
chip_name=self.chip_label)
obs_id = _util.get_obs_id(img_type=self.pointing_type, project_cycle=config["project_cycle"], run_counter=config[
"run_counter"], pointing_id=pointing.obs_id, pointing_type_code=pointing.pointing_type_code)
self.subdir = pointing.output_dir
self.cat_name = self.h_prim['FILENAME'] + '.cat'
if logger_filename is None:
logger_filename = self.h_prim['FILENAME'] + '.log'
self.logger = logging.getLogger()
fh = logging.FileHandler(os.path.join(
self.subdir, logger_filename), mode='w+', encoding='utf-8')
fh.setLevel(logging.DEBUG)
self.logger.setLevel(logging.DEBUG)
logging.getLogger('numba').setLevel(logging.WARNING)
formatter = logging.Formatter(
'%(asctime)s - %(msecs)d - %(levelname)-8s - [%(filename)s:%(lineno)d] - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
hdr1 = "# obj_ID ID_chip filter xImage yImage ra dec ra_orig dec_orig z mag obj_type "
hdr2 = "pm_ra pm_dec RV parallax"
fmt1 = "%20s %4d %5s %10.3f %10.3f %15.8f %15.8f %15.8f %15.8f %7.4f %8.4f %15s "
fmt2 = "%15.8f %15.8f %15.8f %15.8f"
self.hdr = hdr1 + hdr2
self.fmt = fmt1 + fmt2
self.logger.info("pointing_type = %s\n" % (self.pointing_type))
def Log_info(self, message):
print(message)
self.logger.info(message)
def Log_error(self, message):
print(message)
self.logger.error(message)
def update_output_header(self, additional_column_names=""):
self.hdr += additional_column_names
def create_output_file(self):
if self.pointing_type == 'SCI':
self.cat = open(os.path.join(self.subdir, self.cat_name), "w")
self.logger.info("Creating catalog file %s ...\n" %
(os.path.join(self.subdir, self.cat_name)))
if not self.hdr.endswith("\n"):
self.hdr += "\n"
self.cat.write(self.hdr)
def cat_add_obj(self, obj, pos_img, pos_shear):
ximg = obj.real_pos.x + 1.0
yimg = obj.real_pos.y + 1.0
line = self.fmt % (
obj.id, int(self.chip_label), self.filt.filter_type, ximg, yimg, obj.ra, obj.dec, obj.ra_orig, obj.dec_orig, obj.z, obj.getMagFilter(
self.filt), obj.type,
obj.pmra, obj.pmdec, obj.rv, obj.parallax)
line += obj.additional_output_str
if not line.endswith("\n"):
line += "\n"
self.cat.write(line)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment