from astropy.table import Table import astropy.constants as cons import collections from collections import OrderedDict from scipy import interpolate from astropy import units as u # from numpy import * import numpy as np from pylab import * import galsim import sys import os def rotate90(array_orig=None, xc=0, yc=0, isClockwise=0): if array_orig is None: return l1 = array_orig.shape[0] l2 = array_orig.shape[1] if xc < 0 or xc > l2 - 1 or yc < 0 or yc > l1 - 1: return n_xc = xc n_yc = yc array_final = np.zeros_like(array_orig.T) if isClockwise == 1: for i in np.arange(l2): array_final[i] = array_orig[:, l2 - i - 1] n_xc = yc n_yc = l2 - 1 - xc else: for i in np.arange(l2): array_final[i] = array_orig[::-1, i] n_xc = l1 - 1 - yc n_yc = xc return array_final, n_xc, n_yc class SpecDisperser(object): def __init__( self, orig_img=None, xcenter=0, ycenter=0, origin=[100, 100], tar_spec=None, band_start=6200, band_end=10000, isAlongY=0, conf="../param/CONF/csst.conf", gid=0, flat_cube=None, ignoreBeam=[], ): """ orig_img: normal image,galsim image xcenter, ycenter: the center of galaxy in orig_img origin : [int, int] `origin` defines the lower left pixel index (y,x) of the `direct` cutout from a larger detector-frame image tar_spec: galsim.SED """ # self.img_x = orig_img.shape[1] # self.img_y = orig_img.shape[0] # 5 orders, A, B , orderName = ["A", "B", "C", "D", "E"] self.orig_img_orders = OrderedDict() if isinstance(orig_img, list): orig_img_list = orig_img list_len = len(orig_img_list) if list_len < 5: for i in np.arange(5 - list_len): orig_img_list.append(orig_img_list[list_len - 1]) for i, k in enumerate(orig_img_list): self.orig_img_orders[orderName[i]] = k if isinstance(orig_img, galsim.Image): for i in np.arange(5): self.orig_img_orders[orderName[i]] = orig_img orig_img_one = self.orig_img_orders["A"] self.thumb_img = np.abs(orig_img_one.array) self.thumb_x = orig_img_one.center.x self.thumb_y = orig_img_one.center.y self.img_sh = orig_img_one.array.shape self.id = gid self.xcenter = xcenter self.ycenter = ycenter self.isAlongY = isAlongY self.flat_cube = flat_cube if self.isAlongY == 1: for order in orderName: self.orig_img_orders[order], self.thumb_x, self.thumb_y = rotate90( array_orig=self.orig_img_orders[order], xc=orig_img_one.center.x, yc=orig_img_one.center.y, isClockwise=1, ) # self.thumb_img, self.thumb_x, self.thumb_y = rotate90(array_orig=self.thumb_img, xc=orig_img_one.center.x, # yc=orig_img_one.center.y, isClockwise=1) self.img_sh = self.orig_img_orders[order].array.T.shape self.xcenter = ycenter self.ycenter = xcenter self.origin = origin self.band_start = band_start self.band_end = band_end self.spec = tar_spec self.beam_flux = OrderedDict() self.grating_conf = aXeConf(conf) self.grating_conf.get_beams() self.grating_conf_file = conf self.ignoreBeam = ignoreBeam def compute_spec_orders(self): all_orders = OrderedDict() beam_names = self.grating_conf.beams for beam in beam_names: if beam in self.ignoreBeam: continue all_orders[beam] = self.compute_spec(beam) return all_orders def compute_spec(self, beam): # if beam == "B": # return self.thumb_img, self.origin[1], self.origin[0], None, None, None from .disperse_c import interp from .disperse_c import disperse # from MockObject.disperse_c import disperse self.thumb_img = np.abs(self.orig_img_orders[beam].array) self.thumb_x = self.orig_img_orders[beam].center.x self.thumb_y = self.orig_img_orders[beam].center.y self.img_sh = self.orig_img_orders[beam].array.shape dx = self.grating_conf.dxlam[beam] xoff = 0 ytrace_beam, lam_beam = self.grating_conf.get_beam_trace( x=self.xcenter, y=self.ycenter, dx=(dx + xoff), beam=beam ) # Account for pixel centering of the trace yfrac_beam = ytrace_beam - floor(ytrace_beam + 0.5) ysens = lam_beam * 0 lam_index = argsort(lam_beam) conf_sens = self.grating_conf.sens[beam] lam_intep = np.linspace(self.band_start, self.band_end, int((self.band_end - self.band_start) / 0.1)) thri = interpolate.interp1d(conf_sens["WAVELENGTH"], conf_sens["SENSITIVITY"]) spci = interpolate.interp1d(self.spec["WAVELENGTH"], self.spec["FLUX"]) beam_thr = thri(lam_intep) spec_sample = spci(lam_intep) bean_thr_spec = beam_thr * spec_sample # generate sensitivity file for aXe # ysensitivity = lam_beam * 0 # # ysensitivity[lam_index] = interp.interp_conserve_c(lam_beam[lam_index], lam_intep, # beam_thr * math.pi * 100 * 100 * 1e-7 / ( # cons.h.value * cons.c.value / ( # lam_intep * 1e-10)), integrate=0, left=0, # right=0) # # self.writerSensitivityFile(conffile = self.grating_conf_file, beam = beam, w = lam_beam[lam_index], sens = ysensitivity[lam_index]) ysens[lam_index] = interp.interp_conserve_c( lam_beam[lam_index], lam_intep, bean_thr_spec, integrate=1, left=0, right=0 ) sensitivity_beam = ysens len_spec_x = len(dx) len_spec_y = int(abs(ceil(ytrace_beam[-1]) - floor(ytrace_beam[0])) + 1) beam_sh = (self.img_sh[0] + len_spec_y, self.img_sh[1] + len_spec_x) modelf = zeros(product(beam_sh), dtype=float) model = modelf.reshape(beam_sh) idx = np.arange(modelf.size, dtype=int64).reshape(beam_sh) x0 = array((self.thumb_y, self.thumb_x), dtype=int64) dxpix = dx - dx[0] + x0[1] dyc = cast[int](np.floor(ytrace_beam + 0.5)) # dypix = cast[int](np.floor(ytrace_beam - dyc[0] + x0[0] + 0.5)) dypix = dyc - dyc[0] + x0[0] frac_ids = yfrac_beam < 0 dypix[frac_ids] = dypix[frac_ids] - 1 yfrac_beam[frac_ids] = 1 + yfrac_beam[frac_ids] flat_index = idx[dypix, dxpix] nonz = sensitivity_beam != 0 origin_in = zeros_like(self.origin) dx0_in = dx[0] dy0_in = dyc[0] if self.isAlongY == 1: origin_in[0] = self.origin[0] origin_in[1] = self.origin[1] - len_spec_y dx0_in = -dyc[0] dy0_in = dx[0] else: origin_in[0] = self.origin[0] origin_in[1] = self.origin[1] dx0_in = dx[0] dy0_in = dyc[0] originOut_x = origin_in[1] + dx0_in originOut_y = origin_in[0] + dy0_in if self.flat_cube is None: beam_flat = None else: beam_flat = zeros([len(modelf), len(self.flat_cube)]) sub_flat_cube = zeros([len(self.flat_cube), beam_sh[0], beam_sh[1]]) sub_flat_cube[0] = sub_flat_cube[0] + 1.0 overlap_flag = 1 sub_y_s = originOut_y sub_y_e = originOut_y + beam_sh[0] - 1 sub_x_s = originOut_x sub_x_e = originOut_x + beam_sh[1] - 1 beam_x_s = max(sub_x_s, 0) if beam_x_s > self.flat_cube[0].shape[1] - 1: overlap_flag = 0 if overlap_flag == 1: beam_x_e = min(sub_x_e, self.flat_cube[0].shape[1] - 1) if beam_x_e < 0: overlap_flag = 0 if overlap_flag == 1: beam_y_s = max(sub_y_s, 0) if beam_y_s > self.flat_cube[0].shape[0] - 1: overlap_flag = 0 if overlap_flag == 1: beam_y_e = min(sub_y_e, self.flat_cube[0].shape[0] - 1) if beam_y_e < 0: overlap_flag = 0 if overlap_flag == 1: sub_flat_cube[ :, beam_y_s - originOut_y : beam_y_e - originOut_y + 1, beam_x_s - originOut_x : beam_x_e - originOut_x + 1, ] = self.flat_cube[:, beam_y_s : beam_y_e + 1, beam_x_s : beam_x_e + 1] for i in arange(0, len(self.flat_cube), 1): beam_flat[:, i] = sub_flat_cube[i].flatten() # beam_flat = zeros([len(modelf), len(self.flat_cube)]) # flat_sh = self.flat_cube[0].shape # for i in arange(0, beam_sh[0], 1): # for j in arange(0, beam_sh[1], 1): # k = i * beam_sh[1] + j # if originOut_y + i >= flat_sh[0] or originOut_y + i < 0 or originOut_x + j>= flat_sh[1] or originOut_x+j < 0: # temp_bf = np.zeros_like(self.flat_cube[:, 0, 0]) # temp_bf[0] = 1.0 # beam_flat[k] = temp_bf # else: # beam_flat[k] = self.flat_cube[:, originOut_y + i, originOut_x + j] status = disperse.disperse_grism_object( self.thumb_img.astype(np.float32), flat_index[nonz], yfrac_beam[nonz], sensitivity_beam[nonz], modelf, x0, array(self.img_sh, dtype=int64), array(beam_sh, dtype=int64), beam_flat, lam_beam[lam_index][nonz], ) model = modelf.reshape(beam_sh) # n1 = np.sum(np.isinf(model)) # n2 = np.sum(np.isnan(model)) # n3 = np.sum(np.isinf(modelf)) # n4 = np.sum(np.isnan(modelf)) # if n1>0 or n2 > 0: # print("DEBUG: SpecDisperser, inf:%d, nan:%d--------%d,%d"%(n1, n2, n3, n4)) # print(dypix) # n1 = np.sum(np.isinf(self.thumb_img.astype(np.float32))) # n2 = np.sum(np.isnan(self.thumb_img.astype(np.float32))) # n3 = np.sum(np.isinf(yfrac_beam)) # n4 = np.sum(np.isnan(yfrac_beam)) # n5 = np.sum(np.isinf(sensitivity_beam)) # n6 = np.sum(np.isnan(sensitivity_beam)) # print("DEBUG: SpecDisperser, innput ---inf:%d, nan:%d, yfrac_beam:%d/%d, sensitivity_beam:%d/%d"%(n1, n2, n3, n4, n5, n6)) self.beam_flux[beam] = sum(modelf) if self.isAlongY == 1: model, _, _ = rotate90(array_orig=model, isClockwise=0) return model, originOut_x, originOut_y, dxpix, dypix, lam_beam, ysens def writerSensitivityFile(self, conffile="", beam="", w=None, sens=None): orders = {"A": "1st", "B": "0st", "C": "2st", "D": "-1st", "E": "-2st"} sens_file_name = conffile[0:-5] + "_sensitivity_" + orders[beam] + ".fits" if not os.path.exists(sens_file_name) == True: senstivity_out = Table(array([w, sens]).T, names=("WAVELENGTH", "SENSITIVITY")) senstivity_out.write(sens_file_name, format="fits") """ Demonstrate aXe trace polynomials. """ class aXeConf: def __init__(self, conf_file="WFC3.IR.G141.V2.5.conf"): """Read an aXe-compatible configuration file Parameters ---------- conf_file: str Filename of the configuration file to read """ if conf_file is not None: self.conf = self.read_conf_file(conf_file) self.conf_file = conf_file self.count_beam_orders() # Global XOFF/YOFF offsets if "XOFF" in self.conf.keys(): self.xoff = np.float(self.conf["XOFF"]) else: self.xoff = 0.0 if "YOFF" in self.conf.keys(): self.yoff = np.float(self.conf["YOFF"]) else: self.yoff = 0.0 def read_conf_file(self, conf_file="WFC3.IR.G141.V2.5.conf"): """Read an aXe config file, convert floats and arrays Parameters ---------- conf_file: str Filename of the configuration file to read. Parameters are stored in an OrderedDict in `self.conf`. """ from collections import OrderedDict conf = OrderedDict() lines = open(conf_file).readlines() for line in lines: # empty / commented lines if (line.startswith("#")) | (line.strip() == "") | ('"' in line): continue # split the line, taking out ; and # comments spl = line.split(";")[0].split("#")[0].split() param = spl[0] if len(spl) > 2: value = np.cast[float](spl[1:]) else: try: value = float(spl[1]) except: value = spl[1] conf[param] = value return conf def count_beam_orders(self): """Get the maximum polynomial order in DYDX or DLDP for each beam""" self.orders = {} for beam in ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"]: order = 0 while "DYDX_{0:s}_{1:d}".format(beam, order) in self.conf.keys(): order += 1 while "DLDP_{0:s}_{1:d}".format(beam, order) in self.conf.keys(): order += 1 self.orders[beam] = order - 1 def get_beams(self): """Get beam parameters and read sensitivity curves""" import os from collections import OrderedDict from astropy.table import Table, Column self.dxlam = OrderedDict() self.nx = OrderedDict() self.sens = OrderedDict() self.beams = [] for beam in self.orders: if self.orders[beam] > 0: self.beams.append(beam) self.dxlam[beam] = np.arange( self.conf["BEAM{0}".format(beam)].min(), self.conf["BEAM{0}".format(beam)].max(), dtype=int ) self.nx[beam] = int(self.dxlam[beam].max() - self.dxlam[beam].min()) + 1 self.sens[beam] = Table.read( "{0}/{1}".format(os.path.dirname(self.conf_file), self.conf["SENSITIVITY_{0}".format(beam)]) ) # self.sens[beam].wave = np.cast[np.double](self.sens[beam]['WAVELENGTH']) # self.sens[beam].sens = np.cast[np.double](self.sens[beam]['SENSITIVITY']) # Need doubles for interpolating functions for col in self.sens[beam].colnames: data = np.cast[np.double](self.sens[beam][col]) self.sens[beam].remove_column(col) self.sens[beam].add_column(Column(data=data, name=col)) self.beams.sort() def field_dependent(self, xi, yi, coeffs): """aXe field-dependent coefficients See the `aXe manual `_ for a description of how the field-dependent coefficients are specified. Parameters ---------- xi, yi : float or array-like Coordinate to evaluate the field dependent coefficients, where `xi = x-REFX` and `yi = y-REFY`. coeffs : array-like Field-dependency coefficients Returns ------- a : float or array-like Evaluated field-dependent coefficients """ # number of coefficients for a given polynomial order # 1:1, 2:3, 3:6, 4:10, order:order*(order+1)/2 if isinstance(coeffs, float): order = 1 else: order = int(-1 + np.sqrt(1 + 8 * len(coeffs))) // 2 # Build polynomial terms array # $a = a_0+a_1x_i+a_2y_i+a_3x_i^2+a_4x_iy_i+a_5yi^2+$ ... xy = [] for p in range(order): for px in range(p + 1): # print 'x**%d y**%d' %(p-px, px) xy.append(xi ** (p - px) * yi ** (px)) # Evaluate the polynomial, allowing for N-dimensional inputs a = np.sum((np.array(xy).T * coeffs).T, axis=0) return a def evaluate_dp(self, dx, dydx): """Evalate arc length along the trace given trace polynomial coefficients Parameters ---------- dx : array-like x pixel to evaluate dydx : array-like Coefficients of the trace polynomial Returns ------- dp : array-like Arc length along the trace at position `dx`. For `dydx` polynomial orders 0, 1 or 2, integrate analytically. Higher orders must be integrated numerically. **Constant:** .. math:: dp = dx **Linear:** .. math:: dp = \sqrt{1+\mathrm{DYDX}[1]}\cdot dx **Quadratic:** .. math:: u = \mathrm{DYDX}[1] + 2\ \mathrm{DYDX}[2]\cdot dx .. math:: dp = (u \sqrt{1+u^2} + \mathrm{arcsinh}\ u) / (4\cdot \mathrm{DYDX}[2]) """ # dp is the arc length along the trace # $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ... poly_order = len(dydx) - 1 if poly_order == 2: if np.abs(np.unique(dydx[2])).max() == 0: poly_order = 1 if poly_order == 0: # dy=0 dp = dx elif poly_order == 1: # constant dy/dx dp = np.sqrt(1 + dydx[1] ** 2) * (dx) elif poly_order == 2: # quadratic trace u0 = dydx[1] + 2 * dydx[2] * (0) dp0 = (u0 * np.sqrt(1 + u0**2) + np.arcsinh(u0)) / (4 * dydx[2]) u = dydx[1] + 2 * dydx[2] * (dx) dp = (u * np.sqrt(1 + u**2) + np.arcsinh(u)) / (4 * dydx[2]) - dp0 else: # high order shape, numerical integration along trace # (this can be slow) xmin = np.minimum((dx).min(), 0) xmax = np.maximum((dx).max(), 0) xfull = np.arange(xmin, xmax) dyfull = 0 for i in range(1, poly_order): dyfull += i * dydx[i] * (xfull - 0.5) ** (i - 1) # Integrate from 0 to dx / -dx dpfull = xfull * 0.0 lt0 = xfull < 0 if lt0.sum() > 1: dpfull[lt0] = np.cumsum(np.sqrt(1 + dyfull[lt0][::-1] ** 2))[::-1] dpfull[lt0] *= -1 # gt0 = xfull > 0 if gt0.sum() > 0: dpfull[gt0] = np.cumsum(np.sqrt(1 + dyfull[gt0] ** 2)) dp = np.interp(dx, xfull, dpfull) if dp[-1] == dp[-2]: dp[-1] = dp[-2] + np.diff(dp)[-2] return dp def get_beam_trace(self, x=507, y=507, dx=0.0, beam="A"): """Get an aXe beam trace for an input reference pixel and list of output x pixels `dx` Parameters ---------- x, y : float or array-like Evaluate trace definition at detector coordinates `x` and `y`. dx : float or array-like Offset in x pixels from `(x,y)` where to compute trace offset and effective wavelength beam : str Beam name (i.e., spectral order) to compute. By aXe convention, `beam='A'` is the first order, 'B' is the zeroth order and additional beams are the higher positive and negative orders. Returns ------- dy : float or array-like Center of the trace in y pixels offset from `(x,y)` evaluated at `dx`. lam : float or array-like Effective wavelength along the trace evaluated at `dx`. """ NORDER = self.orders[beam] + 1 xi, yi = x - self.xoff, y - self.yoff xoff_beam = self.field_dependent(xi, yi, self.conf["XOFF_{0}".format(beam)]) yoff_beam = self.field_dependent(xi, yi, self.conf["YOFF_{0}".format(beam)]) # y offset of trace (DYDX) dydx = np.zeros(NORDER) # 0 #+1.e-80 dydx = [0] * NORDER for i in range(NORDER): if "DYDX_{0:s}_{1:d}".format(beam, i) in self.conf.keys(): coeffs = self.conf["DYDX_{0:s}_{1:d}".format(beam, i)] dydx[i] = self.field_dependent(xi, yi, coeffs) # $dy = dydx_0+dydx_1 dx+dydx_2 dx^2+$ ... dy = yoff_beam for i in range(NORDER): dy += dydx[i] * (dx - xoff_beam) ** i # wavelength solution dldp = np.zeros(NORDER) dldp = [0] * NORDER for i in range(NORDER): if "DLDP_{0:s}_{1:d}".format(beam, i) in self.conf.keys(): coeffs = self.conf["DLDP_{0:s}_{1:d}".format(beam, i)] dldp[i] = self.field_dependent(xi, yi, coeffs) self.eval_input = {"x": x, "y": y, "beam": beam, "dx": dx} self.eval_output = { "xi": xi, "yi": yi, "dldp": dldp, "dydx": dydx, "xoff_beam": xoff_beam, "yoff_beam": yoff_beam, "dy": dy, } dp = self.evaluate_dp(dx - xoff_beam, dydx) # ## dp is the arc length along the trace # ## $\lambda = dldp_0 + dldp_1 dp + dldp_2 dp^2$ ... # if self.conf['DYDX_ORDER_%s' %(beam)] == 0: ## dy=0 # dp = dx-xoff_beam # elif self.conf['DYDX_ORDER_%s' %(beam)] == 1: ## constant dy/dx # dp = np.sqrt(1+dydx[1]**2)*(dx-xoff_beam) # elif self.conf['DYDX_ORDER_%s' %(beam)] == 2: ## quadratic trace # u0 = dydx[1]+2*dydx[2]*(0) # dp0 = (u0*np.sqrt(1+u0**2)+np.arcsinh(u0))/(4*dydx[2]) # u = dydx[1]+2*dydx[2]*(dx-xoff_beam) # dp = (u*np.sqrt(1+u**2)+np.arcsinh(u))/(4*dydx[2])-dp0 # else: # ## high order shape, numerical integration along trace # ## (this can be slow) # xmin = np.minimum((dx-xoff_beam).min(), 0) # xmax = np.maximum((dx-xoff_beam).max(), 0) # xfull = np.arange(xmin, xmax) # dyfull = 0 # for i in range(1, NORDER): # dyfull += i*dydx[i]*(xfull-0.5)**(i-1) # # ## Integrate from 0 to dx / -dx # dpfull = xfull*0. # lt0 = xfull <= 0 # if lt0.sum() > 1: # dpfull[lt0] = np.cumsum(np.sqrt(1+dyfull[lt0][::-1]**2))[::-1] # dpfull[lt0] *= -1 # # # gt0 = xfull >= 0 # if gt0.sum() > 0: # dpfull[gt0] = np.cumsum(np.sqrt(1+dyfull[gt0]**2)) # # dp = np.interp(dx-xoff_beam, xfull, dpfull) # Evaluate dldp lam = dp * 0.0 for i in range(NORDER): lam += dldp[i] * dp**i return dy, lam def show_beams(self, beams=["E", "D", "C", "B", "A"]): """ Make a demo plot of the beams of a given configuration file """ import matplotlib.pyplot as plt x0, x1 = 507, 507 dx = np.arange(-800, 1200) if "WFC3.UV" in self.conf_file: x0, x1 = 2073, 250 dx = np.arange(-1200, 1200) if "G800L" in self.conf_file: x0, x1 = 2124, 1024 dx = np.arange(-1200, 1200) s = 200 # marker size fig = plt.figure(figsize=[10, 3]) plt.scatter(0, 0, marker="s", s=s, color="black", edgecolor="0.8", label="Direct") for beam in beams: if "XOFF_{0}".format(beam) not in self.conf.keys(): continue xoff = self.field_dependent(x0, x1, self.conf["XOFF_{0}".format(beam)]) dy, lam = self.get_beam_trace(x0, x1, dx=dx, beam=beam) xlim = self.conf["BEAM{0}".format(beam)] ok = (dx >= xlim[0]) & (dx <= xlim[1]) plt.scatter(dx[ok] + xoff, dy[ok], c=lam[ok] / 1.0e4, marker="s", s=s, alpha=0.5, edgecolor="None") plt.text(np.median(dx[ok]), np.median(dy[ok]) + 1, beam, ha="center", va="center", fontsize=14) print("Beam {0}, lambda=({1:.1f} - {2:.1f})".format(beam, lam[ok].min(), lam[ok].max())) plt.grid() plt.xlabel(r"$\Delta x$") plt.ylabel(r"$\Delta y$") cb = plt.colorbar(pad=0.01, fraction=0.05) cb.set_label(r"$\lambda\,(\mu\mathrm{m})$") plt.title(self.conf_file) plt.tight_layout() plt.savefig("{0}.pdf".format(self.conf_file)) # def load_grism_config(conf_file): # """Load parameters from an aXe configuration file # Parameters # ---------- # conf_file : str # Filename of the configuration file # Returns # ------- # conf : `~grizli.grismconf.aXeConf` # Configuration file object. Runs `conf.get_beams()` to read the # sensitivity curves. # """ # conf = aXeConf(conf_file) # conf.get_beams() # return conf