import os
import galsim
import random
import numpy as np
import h5py as h5
import healpy as hp
import astropy.constants as cons
import traceback
from astropy.coordinates import spherical_to_cartesian
from astropy.table import Table
from scipy import interpolate
from datetime import datetime
from observation_sim.mock_objects import CatalogBase, Star, Galaxy, Quasar
from observation_sim.mock_objects._util import tag_sed, getObservedSED, getABMAG, integrate_sed_bandpass, comoving_dist
from observation_sim.astrometry.Astrometry_util import on_orbit_obs_position
# (TEST)
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 'importlib_resources'
import importlib_resources as pkg_resources
NSIDE = 128
def get_bundleIndex(healpixID_ring, bundleOrder=4, healpixOrder=7):
assert NSIDE == 2**healpixOrder
shift = healpixOrder - bundleOrder
shift = 2*shift
nside_bundle = 2**bundleOrder
nside_healpix= 2**healpixOrder
healpixID_nest= hp.ring2nest(nside_healpix, healpixID_ring)
bundleID_nest = (healpixID_nest >> shift)
bundleID_ring = hp.nest2ring(nside_bundle, bundleID_nest)
return bundleID_ring
class Catalog(CatalogBase):
def __init__(self, config, chip, pointing, chip_output, filt, **kwargs):
# self.cat_dir = config["catalog_options"]["input_path"]["cat_dir"]
# self.seed_Av = config["catalog_options"]["seed_Av"]
self.cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
self.chip_output = chip_output
self.filt = filt
self.logger = chip_output.logger
with pkg_resources.path('', 'SLOAN_SDSS.g.fits') as filter_path:
self.normF_star =
self.config = config
self.chip = chip
self.pointing = pointing
self.max_size = 0.
if "CALIB_cat" in config["catalog_options"]["input_path"] and config["catalog_options"]["input_path"][
"CALIB_cat"] and not config["catalog_options"]["star_only"]:
self.CALIB_cat_path = config["catalog_options"]["input_path"]["CALIB_cat"]
self.CALIB_SED_path = config["catalog_options"]["SED_templates_path"]["CALIB_SED"]
if "rotateEll" in config["catalog_options"]:
self.rotation = np.radians(float(config["catalog_options"]["rotateEll"]))
self.rotation = 0.
# Update output .cat header with catalog specific output columns
# self._get_healpix_list()
def _add_output_columns_header(self):
self.add_hdr = " model_tag teff logg feh"
self.add_hdr += " bulgemass diskmass detA e1 e2 kappa g1 g2 size galType veldisp "
self.add_fmt = " %10s %8.4f %8.4f %8.4f"
self.add_fmt += " %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %8.4f %4d %8.4f "
def load_norm_filt(self, obj):
if obj.type == "star":
return self.normF_star
elif obj.type == "galaxy" or obj.type == "quasar":
# return self.normF_galaxy
return None
return None
# def _load_SED_lib_star(self):
# self.tempSED_star = h5.File(self.star_SED_path,'r')
# def _load_SED_lib_gals(self):
# pcs = h5.File(os.path.join(self.galaxy_SED_path, "pcs.h5"), "r")
# lamb = h5.File(os.path.join(self.galaxy_SED_path, "lamb.h5"), "r")
# self.lamb_gal = lamb['lamb'][()]
# self.pcs = pcs['pcs'][()]
# def _load_SED_lib_AGN(self):
# from import fits
# self.SED_AGN =[0].data
# self.lamb_AGN = np.load(self.AGN_SED_wave_path)
def _load_stars(self, stars, pix_id=None):
nstars = len(stars['sourceID'])
# Apply astrometric modeling
ra_arr = stars["RA"][:]
dec_arr = stars["Dec"][:]
pmra_arr = stars['pmra'][:]
pmdec_arr = stars['pmdec'][:]
rv_arr = stars['RV'][:]
parallax_arr = stars['parallax'][:]
if self.config["obs_setting"]["enable_astrometric_model"]:
ra_list = ra_arr.tolist()
dec_list = dec_arr.tolist()
pmra_list = pmra_arr.tolist()
pmdec_list = pmdec_arr.tolist()
rv_list = rv_arr.tolist()
parallax_list = parallax_arr.tolist()
dt = datetime.utcfromtimestamp(self.pointing.timestamp)
date_str =
time_str = dt.time().isoformat()
ra_arr, dec_arr = on_orbit_obs_position(
for istars in range(nstars):
# # (TEST)
# if istars > 100:
# break
param = self.initialize_param()
param['ra'] = ra_arr[istars]
param['dec'] = dec_arr[istars]
param['ra_orig'] = stars["RA"][istars]
param['dec_orig'] = stars["Dec"][istars]
param['pmra'] = pmra_arr[istars]
param['pmdec'] = pmdec_arr[istars]
param['rv'] = rv_arr[istars]
param['parallax'] = parallax_arr[istars]
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
param['mag_use_normal'] = stars['app_sdss_g'][istars]
# if param['mag_use_normal'] >= 26.5:
# continue
self.ids += 1
param['id'] = stars['sourceID'][istars]
param['sed_type'] = stars['sourceID'][istars]
param['model_tag'] = stars['model_tag'][istars]
param['teff'] = stars['teff'][istars]
param['logg'] = stars['grav'][istars]
param['feh'] = stars['feh'][istars]
param['z'] = 0.0
param['star'] = 1 # Star
obj = Star(param, logger=self.logger)
# Append additional output columns to the .cat file
obj.additional_output_str = self.add_fmt%(param["model_tag"], param['teff'], param['logg'], param['feh'],
0., 0., 0., 0., 0., 0., 0., 0., 0., -1, 0.)
def _load_calibObj(self):
data =
ra_arr = data['RA']
dec_arr = data['DEC']
pSource_flag = data['POINTSOURCE_FLAG']
ngals = len(data)
for igals in range(ngals):
# # (TEST)
# if igals > 100:
# break
param = self.initialize_param()
param['ra'] = ra_arr[igals]
param['dec'] = dec_arr[igals]
param['ra_orig'] = data['RA'][igals]
param['dec_orig'] = data['DEC'][igals]
param['mag_use_normal'] = data['MAG_g'][igals]
# if self.filt.is_too_dim(mag=param['mag_use_normal'], margin=self.config["obs_setting"]["mag_lim_margin"]):
# continue
param['z'] = -99
param['model_tag'] = 'None'
param['g1'] = 0
param['g2'] = 0
param['kappa'] = 0
param['e1'] = 0
param['e2'] = 0
param['id'] = data['ID'][igals]
param['SPEC_FN'] = data['SPEC_FN'][igals]
# NOTE: this cut cannot be put before the SED type has been assigned
if not self.chip.isContainObj(ra_obj=param['ra'], dec_obj=param['dec'], margin=200):
# For shape calculation
if pSource_flag[igals]:
param['star'] = 4
obj = Star(param, logger=self.logger)
param['ell_total'] = np.sqrt(param['e1'] ** 2 + param['e2'] ** 2)
if param['ell_total'] > 0.9:
param['e1_disk'] = 0
param['e2_disk'] = 0
param['e1_bulge'] = 0
param['e2_bulge'] = 0
param['delta_ra'] = 0
param['delta_dec'] = 0
# Masses
# param['bulgemass'] = gals['bulgemass'][igals]
# param['diskmass'] = gals['diskmass'][igals]
# param['size'] = gals['size'][igals]
# if param['size'] > self.max_size:
# self.max_size = param['size']
# Sersic index
param['disk_sersic_idx'] = data['SERSIC_N'][igals]
param['bulge_sersic_idx'] = 1.
param['hlr_bulge'] = data['RE'][igals]
param['hlr_disk'] = data['RE'][igals]
param['bfrac'] = 0
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
param['star'] = 4
obj = Galaxy(param, logger=self.logger)
self.ids += 1
# param['id'] = self.ids
def _load(self, **kwargs):
self.objs = []
self.ids = 0
if "CALIB_cat" in self.config["catalog_options"]["input_path"] and \
"CALIB_cat"] and not self.config["catalog_options"]["star_only"]:
except Exception as e:
if self.logger is not None:"maximum galaxy size: %.4f"%(self.max_size))"number of objects in catalog: %d"%(len(self.objs)))
print("number of objects in catalog: ", len(self.objs))
def load_sed(self, obj, **kwargs):
if obj.type == 'calib':
data =,obj.SPEC_FN))
obj_w = data['WAVELENGTH']
obj_f = data['FLUX']
input_delt_w = np.min(obj_w[1:]-obj_w[0:-1])
if input_delt_w > 0.5:
lamb = np.arange(2000, 11000 + 0.5, 0.5)
speci = interpolate.interp1d(obj_w, obj_f)
y1 = speci(lamb)
lamb = obj_w
y1 = obj_f
# erg/s/cm2/A --> photon/s/m2/A
y1_phot = y1 * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, y1_phot]).T,
names=('WAVELENGTH', 'FLUX'))
sed_photon = np.array([sed['WAVELENGTH'], sed['FLUX']]).T
sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]),
sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False)
interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=self.filt.bandpass_full)
obj.param['mag_use_normal'] = getABMAG(interFlux, self.filt.bandpass_full)
return sed
raise ValueError("Object type not known")
# Configuration file for CSST simulation
# Overall settings
# CSST-Sim Group, 2024/01/08
# Base diretories and naming setup
# can add some of the command-line arguments here as well;
# ok to pass either way or both, as long as they are consistent
work_dir: "/home/zhangxin/CSST_SIM/CSST_sim_develop"
# run_name: "shutter_test"
run_name: "calib_test"
# Project cycle and run counter are used to name the outputs
project_cycle: 9
run_counter: 1
# Run options
use_mpi: YES
# Output catalog only?
# If yes, no imaging simulation will be run. Only the catalogs
# of corresponding footprints will be generated.
out_cat_only: NO
# Catalog setting
# Configure the input catalog: options should be implemented
# in the corresponding (user defined) 'Catalog' class
CALIB_cat: "/nfsdata/share/CSSOSDataProductsSims/data/calibration_data/PNE/calibrationPNESPEC_CHIP_ALL_SLS.fits"
# CALIB_cat: "/home/zhangxin/CSST_SIM/CSST_sim_develop/test_config/calibrationPNESPEC_CHIP2_GV_test.fits"
CALIB_SED: "/nfsdata/share/CSSOSDataProductsSims/data/calibration_data/PNE/spec/"
# Only simulate stars?
star_only: NO
# Only simulate galaxies?
galaxy_only: NO
# Observation setting
# (Optional) a file of point list
# if you just want to run default pointing:
# - pointing_dir: null
# - pointing_file: null
# pointing_file: "/nfsdata/share/CSSOSDataProductsSims/data/pointing_denseFiled.dat"
# pointing_file: "/nfsdata/share/CSSOSDataProductsSims/data/pointing_50_1_n.dat"
pointing_file: "/home/zhangxin/CSST_SIM/CSST_sim_develop/test_config/pointing_calib.dat"
# obs_config_file: "/home/zhangxin/CSST_SIM/CSST_sim_develop/test_config/obs_config_SCI_WIDE_phot.yaml"
# obs_config_file: "/home/zhangxin/CSST_SIM/CSST_sim_develop/test_config/obs_config_LEDFLAT_LED01_SHT-YES.yaml"
# Run specific pointing(s):
# - give a list of indexes of pointings: [ip_1, ip_2...]
# - run all pointings: null
# Note: only valid when a pointing list is specified
run_pointings: null
# Whether to enable astrometric modeling
enable_astrometric_model: YES
# Cut by saturation magnitude in which band?
cut_in_band: "z"
# saturation magnitude margin
mag_sat_margin: -2.5
# mag_sat_margin: -15.
# limiting magnitude margin
mag_lim_margin: +1.0
# PSF setting
# Which PSF model to use:
# "Gauss": simple gaussian profile
# "Interp": Interpolated PSF from sampled ray-tracing data
psf_model: "Interp"
# PSF size [arcseconds]
# radius of 80% energy encircled
# NOTE: only valid for "Gauss" PSF
psf_rcont: 0.3
# path to PSF data
# NOTE: only valid for "Interp" PSF
# PSF models for photometry survey simulation
psf_pho_dir: "/nfsdata/share/CSSOSDataProductsSims/data/psfcube/set1_dynamic/"
# PSF models for slitless spectrum survey simulation
# psf_sls_dir: "/nfsdata/share/CSSOSDataProductsSims/data/SLS_PSF_PCA_fp_orders/"
psf_sls_dir: "/nfsdata/share/CSSOSDataProductsSims/data/SLS_PSF_PCA_fp/"
# Shear setting
# Options to generate mock shear field:
# "constant": all galaxies are assigned a constant reduced shear
# "catalog": get shear values from catalog
shear_type: "constant"
# For constant shear field
reduced_g1: 0.
reduced_g2: 0.
# Output options
output_format: "channels" # Whether to export as 16 channels (subimages) with pre- and over-scan ("image"/"channels")
shutter_output: NO # Whether to export shutter effect 16-bit image
prnu_output: NO # Whether to export the PRNU (pixel-to-pixel flat-fielding) files
# Random seeds
seed_poisson: 20210601 # Seed for Poisson noise
seed_CR: 20210317 # Seed for generating random cosmic ray maps
seed_flat: 20210101 # Seed for generating random flat fields
seed_prnu: 20210102 # Seed for photo-response non-uniformity
seed_gainNonUniform: 20210202 # Seed for gain nonuniformity
seed_biasNonUniform: 20210203 # Seed for bias nonuniformity
seed_rnNonUniform: 20210204 # Seed for readout-noise nonuniformity
seed_badcolumns: 20240309 # Seed for bad columns
seed_defective: 20210304 # Seed for defective (bad) pixels
seed_readout: 20210601 # Seed for read-out gaussian noise
\ No newline at end of file
# Configuration file for CSST simulation
# For single exposure type:
# CSST-Sim Group, 2024/01/08
# Observation type
obs_type: "SCI"
obs_type_code: "101"
obs_id: "00000001" # this setting will only be used if pointing list file is not given
# Define list of chips
# run_chips: [6,7,8,9,11,12,13,14,15,16,17,18,19,20,22,23,24,25] # Photometric chips
#run_chips: [1,2,3,4,5,10,21,26,27,28,29,30] # Spectroscopic chips
run_chips: [1]
# Define observation sequence
# Accumulate fluxes from objects
# [Optional]: exposure time of the pointing will be used as default.
# Set it here is you want to override the default
# exptime: 150. # [s]
shutter_effect: YES
flat_fielding: YES
field_dist: NO
# Accumulate fluxes from sky background
# sky_background:
# # [Optional]: exposure time of the pointing will be used as default.
# # Set it here is you want to override the default
# # exptime: 150. # [s]
# shutter_effect: YES
# flat_fielding: YES
# enable_straylight_model: NO
# # flat_level: set the total skybackground value (e-) in the exptime,if none,set null, or delete the key
# # flat_level_filt: the vale of "flat_level" is in the filter "flat_level_filt", can set NUV, u, g, r, i, z, y, if
# # none,set null,or delete the key
# flat_level: 0.1
# flat_level_filt: g
# Apply PRNU to accumulated photons
PRNU_effect: {}
# Accumulate photons caused by cosmic rays
# cosmic_rays:
# # [Optional]: exposure time of the pointing will be used as default.
# # Set it here is you want to override the default
# # exptime: 150. # [s]
# save_cosmic_img: YES # # Whether to export cosmic ray image
# # Add Poission noise and dark current
# [Optional]: exposure time of the pointing will be used as default.
# Set it here is you want to override the default
# exptime: 150. # [s]
add_dark: YES
# Simulate brighter fatter effects
bright_fatter: {}
# Add detector defects: hot/warm pixels, bad columns
# detector_defects:
# hot_pixels: YES
# dead_pixels: YES
# bad_columns: YES
# Apply response nonlinearity
nonlinearity: {}
# Apply CCD Saturation & Blooming
blooming: {}
# Run CTE simulation
# CTE_effect: {}
# Add prescan and overscan
# prescan_overscan:
# add_dark: YES
# Add bias
bias_16channel: NO
# Add readout noise
readout_noise: {}
# Apply gain
gain_16channel: NO
# Output the final image
format_output: NO
