import unittest
import numpy as np
import galsim
import os
import sys
from astropy.table import Table
from scipy import interpolate
import pickle
class test_field_distortion(unittest.TestCase):
def __init__(self, methodName="runTest"):
super(test_field_distortion, self).__init__(methodName)
self.dataMainPath = os.path.join(
os.getenv("UNIT_TEST_DATA_ROOT"), "csst_msc_sim/field_distortion"
self.dataInputPath = os.path.join(self.dataMainPath, "input_catalog")
self.fdModelName = "FieldDistModel_v2.0_test.pickle"
def test_fd_model(self):
cat_dir = self.dataInputPath
model_dir = self.dataMainPath
model_date = "2024-05-08"
model_name = self.fdModelName
def test_fd_apply(self):
model_name = self.fdModelName
model_dir = self.dataMainPath
cat_dir = self.dataMainPath
model_name, model_dir, cat_dir, ra_cen=60.0, dec_cen=-40.0, img_rot=0.0
def ccdParam():
Basic CCD size and noise parameters.
# CCD size
xt, yt = 59516, 49752
x0, y0 = 9216, 9232
xgap, ygap = (534, 1309), 898
xnchip, ynchip = 6, 5
ccdSize = xt, yt, x0, y0, xgap, ygap, xnchip, ynchip
# other parameters
readNoise = 5.0 # e/pix
darkNoise = 0.02 # e/pix/s
pixel_scale = 0.074 # pixel scale
gain = 1.0
ccdBase = readNoise, darkNoise, pixel_scale, gain
return ccdSize, ccdBase
def chipLim(chip):
ccdSize, ccdBase = ccdParam()
xt, yt, x0, y0, gx, gy, xnchip, ynchip = ccdSize
gx1, gx2 = gx
rowID = ((chip - 1) % 5) + 1
colID = 6 - ((chip - 1) // 5)
xrem = 2 * (colID - 1) - (xnchip - 1)
xcen = (x0 // 2 + gx1 // 2) * xrem
if chip <= 5 or chip == 10:
xcen = (x0 // 2 + gx1 // 2) * xrem + (gx2 - gx1)
if chip >= 26 or chip == 21:
xcen = (x0 // 2 + gx1 // 2) * xrem - (gx2 - gx1)
nx0 = xcen - x0 // 2 + 1
nx1 = xcen + x0 // 2
yrem = (rowID - 1) - ynchip // 2
ycen = (y0 + gy) * yrem
ny0 = ycen - y0 // 2 + 1
ny1 = ycen + y0 // 2
return nx0, nx1, ny0, ny1
def chip_filter(nchip):
return filter name of a given chip
filtype = ["nuv", "u", "g", "r", "i", "z", "y"]
# updated configurations
# if nchip>24 or nchip<7: raise ValueError("!!! Chip ID: [7,24]")
if nchip in [6, 15, 16, 25]:
filter_name = "y"
if nchip in [11, 20]:
filter_name = "z"
if nchip in [7, 24]:
filter_name = "i"
if nchip in [14, 17]:
filter_name = "u"
if nchip in [9, 22]:
filter_name = "r"
if nchip in [12, 13, 18, 19]:
filter_name = "nuv"
if nchip in [8, 23]:
filter_name = "g"
filter_id = filtype.index(filter_name)
return filter_id, filter_name
def skyLim(wcs, x0, x1, y0, y1):
The sky coverage of a single exposure image
r2d = 180.0 / np.pi
# xt, yt, x0, y0, gx, gy, xnchip, ynchip = ccdSize()
s1 = wcs.toWorld(galsim.PositionD(x0, y0))
s2 = wcs.toWorld(galsim.PositionD(x0, y1))
s3 = wcs.toWorld(galsim.PositionD(x1, y0))
s4 = wcs.toWorld(galsim.PositionD(x1, y1))
ra = [s1.ra.rad * r2d, s2.ra.rad * r2d, s3.ra.rad * r2d, s4.ra.rad * r2d]
dec = [s1.dec.rad * r2d, s2.dec.rad * r2d, s3.dec.rad * r2d, s4.dec.rad * r2d]
return min(ra), max(ra), min(dec), max(dec)
def wcsMain(imgRotation=0.0, raCenter=0.0, decCenter=0.0):
ccdSize, ccdBase = ccdParam()
xsize, ysize, _, _, _, _, _, _ = ccdSize
_, _, pixelScale, _ = ccdBase
xmcen, ymcen = 0.0, 0.0
imrot = imgRotation * galsim.degrees
racen = raCenter * galsim.degrees
deccen = decCenter * galsim.degrees
# define the wcs
dudx = -np.cos(imrot.rad) * pixelScale
dudy = +np.sin(imrot.rad) * pixelScale
dvdx = -np.sin(imrot.rad) * pixelScale
dvdy = -np.cos(imrot.rad) * pixelScale
moscen = galsim.PositionD(x=xmcen, y=ymcen)
skyCenter = galsim.CelestialCoord(ra=racen, dec=deccen)
affine = galsim.AffineTransform(dudx, dudy, dvdx, dvdy, origin=moscen)
wcs = galsim.TanWCS(affine, skyCenter, units=galsim.arcsec)
return wcs
# FD model
def field_distortion_model(
# default parameter setup
nccd, nwave, npsf = 30, 4, 30 * 30
# load a CSST-like wcs
wcs = wcsMain()
cd11, cd12 =[0, 0],[0, 1]
cd21, cd22 =[1, 0],[1, 1]
xmcen, ymcen = wcs.crpix
# obtain the interpolation model
fdFunList = {}
fdFunList["date"] = model_date
for iwave in range(1, nwave + 1):
# if iwave!=1: continue
iwaveKey = "wave%d" % iwave
# first construct the global interpolation
xwList, ywList = [], []
xdList, ydList = [], []
fdFunList[iwaveKey] = {}
for iccd in range(1, nccd + 1):
# if iccd!=9: continue
iccdKey = "ccd" + str("0%d" % (iccd))[-2:]
# load PSF data
ipsfDatn = cat_dir + "ccd%d_%s.dat" % (iccd, iwaveKey)
ipsfDat =, format="ascii")
for ipsf in range(1, npsf + 1):
# if ipsf!=2: continue
xField = ipsfDat["field_x"][ipsf - 1]
yField = ipsfDat["field_y"][ipsf - 1]
# image coordinate with field distortion
xImage = 100.0 * (
ipsfDat["image_x"][ipsf - 1] + ipsfDat["centroid_x"][ipsf - 1]
yImage = 100.0 * (
ipsfDat["image_y"][ipsf - 1] + ipsfDat["centroid_y"][ipsf - 1]
# image coordinate only with wcs projection
xwcs = (cd12 * yField - cd22 * xField) / (
cd12 * cd21 - cd11 * cd22
) + xmcen
ywcs = (cd21 * xField - cd11 * yField) / (
cd12 * cd21 - cd11 * cd22
) + ymcen
xwList += [xwcs]
ywList += [ywcs]
xdList += [xImage]
ydList += [yImage]
# global interpolation
xImageFun = interpolate.SmoothBivariateSpline(
xwList, ywList, xdList, kx=poly_degree, ky=poly_degree
yImageFun = interpolate.SmoothBivariateSpline(
xwList, ywList, ydList, kx=poly_degree, ky=poly_degree
fdFunList[iwaveKey] = {
"xImagePos": xImageFun,
"yImagePos": yImageFun,
"interpLimit": [
# construct the residual interpolation
fdFunList[iwaveKey]["residual"] = {}
for iccd in range(1, nccd + 1):
# if iccd!=1: continue
iccdKey = "ccd" + str("0%d" % (iccd))[-2:]
# open the ditortion data
ipsfDatn = cat_dir + "ccd%d_%s.dat" % (iccd, iwaveKey)
ipsfDat =, format="ascii")
ixwList, iywList = [], []
idxList, idyList = [], []
for ipsf in range(1, npsf + 1):
# if ipsf!=1: continue
"^_^ loading: iccd-{:} iwave-{:} ipsf-{:}".format(iccd, iwave, ipsf)
xField = ipsfDat["field_x"][ipsf - 1]
yField = ipsfDat["field_y"][ipsf - 1]
xImage = 100.0 * (
ipsfDat["image_x"][ipsf - 1] + ipsfDat["centroid_x"][ipsf - 1]
yImage = 100.0 * (
ipsfDat["image_y"][ipsf - 1] + ipsfDat["centroid_y"][ipsf - 1]
# image coordinate only with wcs projection
xwcs = (cd12 * yField - cd22 * xField) / (
cd12 * cd21 - cd11 * cd22
) + xmcen
ywcs = (cd21 * xField - cd11 * yField) / (
cd12 * cd21 - cd11 * cd22
) + ymcen
ixPred = xImageFun(xwcs, ywcs)[0][0]
iyPred = yImageFun(xwcs, ywcs)[0][0]
idx = xImage - ixPred
idy = yImage - iyPred
# print(idx, idy)
ixwList += [xwcs]
iywList += [ywcs]
idxList += [idx]
idyList += [idy]
# interpolation
xResFun = interpolate.SmoothBivariateSpline(
ixwList, iywList, idxList, kx=poly_degree, ky=poly_degree
yResFun = interpolate.SmoothBivariateSpline(
ixwList, iywList, idyList, kx=poly_degree, ky=poly_degree
fdFunList[iwaveKey]["residual"][iccdKey] = {
"xResidual": xResFun,
"yResidual": yResFun,
"interpLimit": [
# save the interpolation functions
model_name_full = os.path.join(model_dir, model_name)
with open(model_name_full, "wb") as out:
pickle.dump(fdFunList, out, pickle.HIGHEST_PROTOCOL)
def field_distortion_apply(
model_name, model_dir, cat_dir, ra_cen=60.0, dec_cen=-40.0, img_rot=0.0
# CCD and observation
ccdSize, ccdBase = ccdParam()
xsize, ysize, xchip, ychip, xgap, ygap, xnchip, ynchip = ccdSize
nchip = xnchip * ynchip
xmcen, ymcen = 0.0, 0.0
badchip = list(range(1, 6)) + list(range(26, 31)) + [10, 21]
# define the wcs of the image mosaic
"^_^ Construct the wcs of the entire image mosaic using Gnomonic/TAN projection"
wcs = wcsMain(imgRotation=img_rot, raCenter=ra_cen, decCenter=dec_cen)
# load the field distortion model
model_name_full = os.path.join(model_dir, model_name)
with open(model_name_full, "rb") as f:
fdModel = pickle.load(f)
raLow, raUp, decLow, decUp = skyLim(
wcs, -xsize // 2 + 1, xsize // 2, -ysize // 2 + 1, ysize // 2
dra = (raUp - raLow) * np.cos(dec_cen * np.pi / 180.0)
ddec = decUp - decLow
" Image pixel size: %d*%d; center: (Ra, Dec)=(%.3f, %.3f)."
% (xsize, ysize, ra_cen, dec_cen)
print(" Field of Veiw: %.2f * %.2f deg^2." % (dra, ddec))
# filters and corresponding bounds in the image mosaic
fbound = {}
print(" Model the filter distributions in the image mosaic ...")
stats = {}
for i in range(nchip):
chip_id = i + 1
if chip_id in badchip:
cx0, cx1, cy0, cy1 = chipLim(chip_id)
chip_bound = galsim.BoundsD(cx0 - 1, cx1 - 1, cy0 - 1, cy1 - 1)
chip_filter_id, chip_filt = chip_filter(chip_id)
# print "^_^ CHIP %d, Filter %s"%(chip_id,chip_filter)
fbound[chip_id] = [chip_filter_id, chip_filt, chip_bound]
stats[chip_id] = [0, 0, 0]
# generate object grid
ra_input = np.arange(ra_cen - 1.0, ra_cen + 1.0, 0.00125)
dec_input = np.arange(dec_cen - 1.0, dec_cen + 1.0, 0.00125)
nobj = len(ra_input) * len(dec_input)
crdCat = np.zeros((nobj, 2))
cid = 0
for id1 in range(len(ra_input)):
ira = ra_input[id1]
for id2 in range(len(dec_input)):
idec = dec_input[id2]
crdCat[cid, :] = ira, idec
cid += 1
print("^_^ Total %d objects are generaged" % nobj)
# main program
for i in range(nchip):
# if i not in [6]: continue
if i + 1 in badchip:
filtidk, filtnmk, boundk = fbound[i + 1]
idStr = str("0%d" % (i + 1))[-2:]
# 1) Use global field distortion model: FieldDistModelGlobal_v2.0.pickle
ifdModel = fdModel["wave1"]
irsModel = fdModel["wave1"]["residual"]["ccd" + idStr]
xLowI, xUpI, yLowI, yUpI = ifdModel["interpLimit"]
xlLowI, xlUpI, ylLowI, ylUpI = irsModel["interpLimit"]
# field distortion model along x/y-axis
ixfdModel = ifdModel["xImagePos"]
iyfdModel = ifdModel["yImagePos"]
ixrsModel = irsModel["xResidual"]
iyrsModel = irsModel["yResidual"]
# first-order derivatives of the global field distortion model
ifx_dx = ixfdModel.partial_derivative(1, 0)
ifx_dy = ixfdModel.partial_derivative(0, 1)
ify_dx = iyfdModel.partial_derivative(1, 0)
ify_dy = iyfdModel.partial_derivative(0, 1)
# first-order derivatives of the residual field distortion model
irx_dx = ixrsModel.partial_derivative(1, 0)
irx_dy = ixrsModel.partial_derivative(0, 1)
iry_dx = iyrsModel.partial_derivative(1, 0)
iry_dy = iyrsModel.partial_derivative(0, 1)
# construct the image mosaic firstly
xorigin, yorigin = xmcen - boundk.xmin, ymcen - boundk.ymin
print(" Construct the chip mosaic ...")
fimage = galsim.ImageF(xchip, ychip)
fimage.setOrigin(boundk.xmin, boundk.ymin)
fimage.wcs = wcs
raLow, raUp, decLow, decUp = skyLim(
wcs, boundk.xmin, boundk.xmax, boundk.ymin, boundk.ymax
dra = (raUp - raLow) * np.cos(dec_cen * np.pi / 180.0)
ddec = decUp - decLow
print(" Image coverage: %.2f * %.2f arcmin^2." % (dra * 60.0, ddec * 60.0))
# enlarge the sky coverage in order to catch the galaxies at the chip edge
raLow -= 0.2 / 60.0
decLow -= 0.2 / 60.0
raUp += 0.2 / 60.0
decUp += 0.2 / 60.0
" Range: RA=[%.4f, %.4f]; DEC=[%.4f, %.4f]"
% (raLow, raUp, decLow, decUp)
# generate the galaxy and star images
catxxn = os.path.join(
cat_dir, "" % (idStr, filtnmk)
hdrxx = "#id_obj id_chip filter ra_true dec_ture x_image_ture y_image_ture x_image y_image g1_fd g2_fd\n"
fmtxx = "%8d %3d %4s %12.6f %12.6f %13.6f %13.6f %13.6f %13.6f %9.5f %9.5f\n"
catxx = open(catxxn, "w")
oidxx = 0
for k in range(nobj):
# if k != 0: continue
# input galaxy parameters
rak = crdCat[k, 0]
deck = crdCat[k, 1]
# reject objects out of the image
if (rak - raLow) * (rak - raUp) > 0.0 or (deck - decLow) * (
deck - decUp
) > 0.0:
world_pos = galsim.CelestialCoord(
ra=rak * galsim.degrees, dec=deck * galsim.degrees
image_pos = fimage.wcs.toImage(world_pos)
xk_true = image_pos.x
yk_true = image_pos.y
# field distortion
if (xLowI - xk_true) * (xUpI - xk_true) > 0 or (yLowI - yk_true) * (
yUpI - yk_true
) > 0:
xk = ixfdModel(xk_true, yk_true)[0][0]
yk = iyfdModel(xk_true, yk_true)[0][0]
# global offset correction
if (xlLowI - xk) * (xlUpI - xk) > 0 or (ylLowI - yk) * (ylUpI - yk) > 0:
dxk = ixrsModel(xk, yk)[0][0]
dyk = iyrsModel(xk, yk)[0][0]
xk = xk + dxk
yk = yk + dyk
# field distortion induced ellipticity
ix_dx = ifx_dx(xk, yk) + irx_dx(xk, yk)
ix_dy = ifx_dy(xk, yk) + irx_dy(xk, yk)
iy_dx = ify_dx(xk, yk) + iry_dx(xk, yk)
iy_dy = ify_dy(xk, yk) + iry_dy(xk, yk)
g1k_fd = 0.0 + (iy_dy - ix_dx) / (iy_dy + ix_dx)
g2k_fd = 0.0 - (iy_dx + ix_dy) / (iy_dy + ix_dx)
dxk_true, dyk_true = xk_true - xmcen, yk_true - ymcen
xLock_true, yLock_true = dxk_true + xorigin + 1.0, dyk_true + yorigin + 1.0
dxk, dyk = xk - xmcen, yk - ymcen
xLock, yLock = dxk + xorigin + 1.0, dyk + yorigin + 1.0
if (xLock_true < 0) or (xLock_true > xchip):
if (yLock_true < 0) or (yLock_true > ychip):
if (xLock < 0) or (xLock > xchip):
if (yLock < 0) or (yLock > ychip):
linexx = fmtxx % (
k + 1,
i + 1,
if __name__ == "__main__":
