unittest of PSF module:
--testdata: S30x30_psfCube_1.h5 config_test.yaml
--testdata: S30x30_psfCube_1.h5 S20x20_psfCube_1.h5 config_test.yaml
--testdata: ccd1-w1
test galsim.interpolatedImage & galsim.convolve
import os
import unittest
import numpy as np
import matplotlib.pyplot as plt
import galsim
from scipy import ndimage
def psfEncircle(img, fraction=0.8, psfSampleSizeInMicrons=2.5, focalLengthInMeters=28, cenPix=None):
#imgMaxPix_x, imgMaxPix_y = findMaxPix(img)
y,x = ndimage.center_of_mass(img) #y-rows, x-cols
imgMaxPix_x = x #int(x)
imgMaxPix_y = y #int(y)
if cenPix != None:
imgMaxPix_x = cenPix[0]
imgMaxPix_y = cenPix[1]
im1 = img.copy()
im1size = im1.shape
dis = np.zeros_like(img)
for irow in range(im1size[0]):
for icol in range(im1size[1]):
dx = icol - imgMaxPix_x
dy = irow - imgMaxPix_y
dis[irow, icol] = np.hypot(dx, dy)
nn = im1size[1]*im1size[0]
disX = dis.reshape(nn)
disXsortId = np.argsort(disX)
imgX = img.reshape(nn)
imgY = imgX[disXsortId]
psfFrac = np.cumsum(imgY)/np.sum(imgY)
ind = np.where(psfFrac > fraction)[0][0]
REE80 = np.rad2deg(dis[np.where(img == imgY[ind])]*psfSampleSizeInMicrons*1e-6/focalLengthInMeters)*3600
return REE80
def check_galsimConvolve(path=None, plotImage=True):
#load psf data
imPSF = data['psf']
pixSize = np.rad2deg(5.*1e-6/28)*3600
imPSF = imPSF/np.sum(imPSF)
#psf -> galsimInterpolatedImage
img = galsim.ImageF(imPSF, scale=pixSize)
imgt= galsim.InterpolatedImage(img)
#imPSFt = imgt.drawImage(nx=256, ny=256, scale=pixSize, method='no_pixel')
imPSFt = imgt.drawImage(nx=256, ny=256, scale=pixSize)
ree80 = psfEncircle(imPSF, fraction=0.8, psfSampleSizeInMicrons=5.)
ree80_pix = ree80/(np.rad2deg((5.*1e-6/28))*3600)
sliceX = slice(128-int(np.round(ree80_pix[0])), 128+int(np.round(ree80_pix[0]))+1, 1)
#set a point sorce
src = galsim.DeltaFunction(flux=1.0)
result = galsim.Convolve(src, imgt)
#drawImage with same pixSize
#tmp = result.drawImage(nx=256, ny=256, scale=pixSize, method='no_pixel')
tmp = result.drawImage(nx=256, ny=256, scale=pixSize)
if plotImage != True:
res = (imPSFt.array - imPSF)/imPSF
d0 = np.mean(res[sliceX, sliceX].flatten())
res = (tmp.array - imPSFt.array)/imPSFt.array
d1 = np.mean(res[sliceX, sliceX].flatten())
return d0, d1
#plot images
fig = plt.figure(figsize=(22, 5))
plt.imshow(imPSF[128-10:128+10, 128-10:128+10])
plt.annotate("ORG", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="w")
plt.imshow(imPSFt.array[128-10:128+10, 128-10:128+10])
plt.annotate("InterpolatedImage", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="w")
plt.imshow(tmp.array[128-10:128+10, 128-10:128+10])
plt.annotate("ConvolvedImage", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="w")
fig = plt.figure(figsize=(13, 10))
res = (imPSFt.array - imPSF)/imPSF
plt.imshow(res[128-10:128+10, 128-10:128+10])
plt.annotate("$\Delta_1$", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="w")
plt.hist(res[sliceX, sliceX].flatten(), alpha=0.75, bins=4)
#plt.annotate("$\Delta_1^{\\rm REE80}$", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="k")
plt.xlabel("$\Delta_1^{\\rm REE80}$", fontsize=16)
plt.ylabel("PDF", fontsize=16)
res = (tmp.array - imPSFt.array)/imPSFt.array
plt.imshow(res[128-10:128+10, 128-10:128+10])
plt.annotate("$\Delta_2$", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="w")
plt.hist(res[sliceX, sliceX].flatten(), alpha=0.75, bins=4)
#plt.annotate("$\Delta_2^{\\rm REE80}$", [0.1, 0.9],xycoords="axes fraction", fontsize=16, color="k")
plt.xlabel("$\Delta_2^{\\rm REE80}$", fontsize=16)
plt.ylabel("PDF", fontsize=16)
def check_galsimConvolveALL(dataPath):
d0 = np.zeros(900)
d1 = np.zeros(900)
for ipsf in range(1,901,1):
print("ipsf={:}".format(ipsf), end="\r")
psfPath = dataPath+"/ccd1-w1/psf_{:}_centroidWgt_BC.mat".format(ipsf)
t0,t1=check_galsimConvolve(path = psfPath, plotImage=False)
d0[ipsf-1] = t0
d1[ipsf-1] = t1
fig = plt.figure(figsize=(12,6))
ax = plt.subplot(1,2,1)
#plt.scatter(np.linspace(1,900,900), d0)
plt.hist(d0, bins=8, alpha=0.75)
plt.xlabel("mean($\Delta_1^{\\rm REE80}$)", fontsize=16)
plt.ylabel("PDF", fontsize=16)
ax = plt.subplot(1,2,2)
#plt.scatter(np.linspace(1,900,900), d1)
plt.hist(d1, bins=8, alpha=0.75)
plt.xlabel("mean($\Delta_2^{\\rm REE80}$)", fontsize=16)
plt.ylabel("PDF", fontsize=16)
class testConvolve(unittest.TestCase):
def __init__(self, methodName='runTest'):
self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
OUTPUTPATH = os.path.join(self.dataPath, 'outputs')
def test_galsimConvolve(self):
ipsf = 1
psfPath = self.dataPath+"/ccd1-w1/psf_{:}_centroidWgt_BC.mat".format(ipsf)
check_galsimConvolve(path = psfPath)
def test_galsimConvolveALL(self):
if __name__ == "__main__":
import unittest
import sys,os,math
from itertools import islice
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import yaml
from ObservationSim.Instrument import Chip
from ObservationSim.PSF.PSFInterp import PSFInterp
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
config = yaml.safe_load(stream)
#for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
chip = Chip(chipID=iccd, config=config)
#chip = Chip(chipID=iccd, ccdEffCurve_dir=path_dict["ccd_dir"], CRdata_dir=path_dict["CRdata_dir"], normalize_dir=path_dict["normalize_dir"], sls_dir=path_dict['sls_dir'], config=config)
return chip
def psfSecondMoments(psfMat, cenX, cenY, pixSize=1):
apr = 0.5 #arcsec, 0.5角秒内测量
fl = 28. #meters
pxs = 2.5 #microns
apr = np.deg2rad(apr/3600.)*fl*1e6
apr = apr/pxs
apr = int(np.ceil(apr))
I = psfMat
ncol = I.shape[1]
nrow = I.shape[0]
w = 0.0
w11 = 0.0
w12 = 0.0
w22 = 0.0
for icol in range(ncol):
for jrow in range(nrow):
x = icol*pixSize - cenX
y = jrow*pixSize - cenY
rr = np.sqrt(x*x + y*y)
wgt= 0.0
if rr <= apr:
wgt = 1.0
w += I[jrow, icol]*wgt
w11 += x*x*I[jrow, icol]*wgt
w12 += x*y*I[jrow, icol]*wgt
w22 += y*y*I[jrow, icol]*wgt
w11 /= w
w12 /= w
w22 /= w
sz = w11 + w22
e1 = (w11 - w22)/sz
e2 = 2.0*w12/sz
return sz, e1, e2
def test_psfEll(iccd, iwave, psfMat):
psfMat_iwave = psfMat.psfMat[iwave-1, :,:,:]
npsf = np.shape(psfMat_iwave)[0]
imx = np.zeros(npsf)
imy = np.zeros(npsf)
psf_e1 = np.zeros(npsf)
psf_e2 = np.zeros(npsf)
psf_sz = np.zeros(npsf)
for ipsf in range(1, npsf+1):
print('ipsf-{:}'.format(ipsf), end='\r')
imx[ipsf-1] = psfMat.cen_col[iwave-1, ipsf-1]
imy[ipsf-1] = psfMat.cen_row[iwave-1, ipsf-1]
psfMat_iwave_ipsf = psfMat_iwave[ipsf-1, :, :]
cenX = 256
cenY = 256
sz, e1, e2 = psfSecondMoments(psfMat_iwave_ipsf, cenX, cenY, pixSize=1)
psf_e1[ipsf-1] = e1
psf_e2[ipsf-1] = e2
psf_sz[ipsf-1] = sz
#print('ell======', ipsf, np.sqrt(e1**2 + e2**2))
arr = [imx, imy, psf_e1, psf_e2, psf_sz]'/psfEll{:}_{:}_{:}'.format(int(np.sqrt(npsf)),iccd, iwave), arr)
def test_psfEllPlot(OVERPLOT=False):
#if ThisTask == 0:
if True:
prefix = 'psfEll30'
iccd = 1
iwave= 1
data = np.load(OUTPUTPATH+'/'+prefix+'_1_1.npy')
imx= data[0]
imy= data[1]
psf_e1 = data[2]
psf_e2 = data[3]
npsf = np.shape(imx)[0]
fig = plt.figure(figsize=(12, 12))
plt.subplots_adjust(wspace=0.1, hspace=0.1)
ax = plt.subplot(1, 1, 1)
for ipsf in range(npsf):
plt.plot(imx[ipsf], imy[ipsf], 'r.')
ang = np.arctan2(psf_e2[ipsf], psf_e1[ipsf])/2
ell = np.sqrt(psf_e1[ipsf]**2 + psf_e2[ipsf]**2)
ell *= 15
lcos = ell*np.cos(ang)
lsin = ell*np.sin(ang)
plt.plot([imx[ipsf]-lcos, imx[ipsf]+lcos],[imy[ipsf]-lsin, imy[ipsf]+lsin],'r', lw=2)
ang = 0.
ell = 0.05
ell*= 15
lcos = ell*np.cos(ang)
lsin = ell*np.sin(ang)
plt.plot([imx[898]-lcos, imx[898]+lcos],[imy[898]+5.-lsin, imy[898]+5.+lsin],'k', lw=2)
plt.annotate('{:}'.format(ell/15), (imx[898]-2., imy[898]+6.), xycoords='data', fontsize=10)
plt.xlabel('CCD X (mm)')
plt.ylabel('CCD Y (mm)')
if OVERPLOT == True:
prefix = 'psfEll20'
data = np.load(OUTPUTPATH+'/'+prefix+'_1_1.npy')
imx= data[0]
imy= data[1]
psf_e1 = data[2]
psf_e2 = data[3]
npsf = np.shape(imx)[0]
for ipsf in range(npsf):
plt.plot(imx[ipsf], imy[ipsf], 'b.')
ang = np.arctan2(psf_e2[ipsf], psf_e1[ipsf])/2
ell = np.sqrt(psf_e1[ipsf]**2 + psf_e2[ipsf]**2)
ell *= 15
lcos = ell*np.cos(ang)
lsin = ell*np.sin(ang)
plt.plot([imx[ipsf]-lcos, imx[ipsf]+lcos],[imy[ipsf]-lsin, imy[ipsf]+lsin],'b', lw=2)
if OVERPLOT == True:
prefix = 'psfEllOP'
def test_psfIDW(iccd, iwave, psfMatA, chip, psfMatB):
bandpass = iwave-1
class pos_img():
def __init__(self,x, y):
self.x = x*1e3/10. #in unit of pixels
self.y = y*1e3/10.
psfMat_iwave = psfMatA.psfMat[iwave-1, :,:,:]
npsf = np.shape(psfMat_iwave)[0]
psf_e1 = np.zeros(npsf)
psf_e2 = np.zeros(npsf)
psf_sz = np.zeros(npsf)
for ipsf in range(1, npsf+1):
print('ipsf:', ipsf, end='\r', flush=True)
tpos_img = pos_img(psfMatA.cen_col[iwave-1, ipsf-1], psfMatA.cen_row[iwave-1, ipsf-1])
psfIDW = psfMatB.get_PSF(chip, tpos_img, bandpass, galsimGSObject=False, findNeighMode='treeFind')'/psfIDW_{:}_{:}_{:}'.format(iccd, iwave, ipsf), psfIDW)
cenX = 256
cenY = 256
sz, e1, e2 = psfSecondMoments(psfIDW, cenX, cenY, pixSize=1)
psf_e1[ipsf-1] = e1
psf_e2[ipsf-1] = e2
psf_sz[ipsf-1] = sz
arr = [psf_e1, psf_e2, psf_sz]'/psfEll20IDW_{:}_{:}'.format(iccd, iwave), arr)
def test_psfResidualPlot(iccd, iwave, ipsf, psfMatA):
psfMat_iwave = psfMatA.psfMat[iwave-1, :,:,:]
psfMatORG = psfMat_iwave[ipsf-1, :, :]
psfMatIDW = np.load(OUTPUTPATH+'/psfIDW_{:}_{:}_{:}.npy'.format(iccd, iwave, ipsf))
npix = psfMatORG.shape[0]
pixCutEdge= int(npix/2-15)
img0 = psfMatORG[pixCutEdge:npix-pixCutEdge, pixCutEdge:npix-pixCutEdge]
img1 = psfMatIDW[pixCutEdge:npix-pixCutEdge, pixCutEdge:npix-pixCutEdge]
imgX = (img1 - img0)/img0
img0 = np.log10(img0)
img1 = np.log10(img1)
imgX = np.log10(np.abs(imgX))
fig = plt.figure(figsize=(18,4))
ax = plt.subplot(1,3,1)
plt.imshow(img0, origin='lower', vmin=-7, vmax=-1.3)
plt.plot([npix/2-pixCutEdge, npix/2-pixCutEdge],[0, (npix/2-pixCutEdge)*2-1],'w--')
plt.plot([0, (npix/2-pixCutEdge)*2-1],[npix/2-pixCutEdge, npix/2-pixCutEdge],'w--')
plt.annotate('ORG', [0,(npix/2-pixCutEdge)*2-5], c='w', size=15)
cticks=[-7, -6, -5, -4, -3, -2, -1]
cbar = plt.colorbar(ticks=cticks)['$10^{-7}$', '$10^{-6}$', '$10^{-5}$','$10^{-4}$','$10^{-3}$','$10^{-2}$', '$10^{-1}$'])
print(img0.min(), img0.max())
ax = plt.subplot(1,3,2)
plt.imshow(img1, origin='lower', vmin=-7, vmax=-1.3)
plt.plot([npix/2-pixCutEdge, npix/2-pixCutEdge],[0, (npix/2-pixCutEdge)*2-1],'w--')
plt.plot([0, (npix/2-pixCutEdge)*2-1],[npix/2-pixCutEdge, npix/2-pixCutEdge],'w--')
plt.annotate('IDW', [0,(npix/2-pixCutEdge)*2-5], c='w', size=15)
cticks=[-7, -6, -5, -4, -3, -2, -1]
cbar = plt.colorbar(ticks=cticks)['$10^{-7}$', '$10^{-6}$', '$10^{-5}$','$10^{-4}$','$10^{-3}$','$10^{-2}$', '$10^{-1}$'])
print(img1.min(), img1.max())
ax = plt.subplot(1,3,3)
plt.imshow(imgX, origin='lower', vmin =-3, vmax =np.log10(3e-1))
plt.plot([npix/2-pixCutEdge, npix/2-pixCutEdge],[0, (npix/2-pixCutEdge)*2-1],'w--')
plt.plot([0, (npix/2-pixCutEdge)*2-1],[npix/2-pixCutEdge, npix/2-pixCutEdge],'w--')
#plt.annotate('(IDW-ORG)/ORG', [0,(npix/2-pixCutEdge)*2-5], c='w', size=15)
cticks=[-5, -4, -3, -2, -1]
cbar = plt.colorbar(ticks=cticks)['$10^{-5}$','$10^{-4}$','$10^{-3}$','$10^{-2}$', '$10^{-1}$'])
def test_psfEllIDWPlot(OVERPLOT=False):
#if ThisTask == 0:
if True:
prefix = 'psfEll20'
iccd = 1
iwave= 1
data = np.load(OUTPUTPATH+'/'+prefix+'_1_1.npy')
imx= data[0]
imy= data[1]
psf_e1 = data[2]
psf_e2 = data[3]
npsf = np.shape(imx)[0]
fig = plt.figure(figsize=(12, 12))
plt.subplots_adjust(wspace=0.1, hspace=0.1)
ax = plt.subplot(1, 1, 1)
for ipsf in range(npsf):
plt.plot(imx[ipsf], imy[ipsf], 'b.')
ang = np.arctan2(psf_e2[ipsf], psf_e1[ipsf])/2
ell = np.sqrt(psf_e1[ipsf]**2 + psf_e2[ipsf]**2)
ell *= 15
lcos = ell*np.cos(ang)
lsin = ell*np.sin(ang)
plt.plot([imx[ipsf]-lcos, imx[ipsf]+lcos],[imy[ipsf]-lsin, imy[ipsf]+lsin],'b', lw=2)
ang = 0.
ell = 0.05
ell*= 15
lcos = ell*np.cos(ang)
lsin = ell*np.sin(ang)
#plt.plot([imx[898]-lcos, imx[898]+lcos],[imy[898]+5.-lsin, imy[898]+5.+lsin],'k', lw=2)
#plt.annotate('{:}'.format(ell/15), (imx[898]-2., imy[898]+6.), xycoords='data', fontsize=10)
plt.xlabel('CCD X (mm)')
plt.ylabel('CCD Y (mm)')
if OVERPLOT == True:
prefix = 'psfEll20IDW'
data = np.load(OUTPUTPATH+'/'+prefix+'_1_1.npy')
#imx= data[0]
#imy= data[1]
psf_e1 = data[0]
psf_e2 = data[1]
npsf = np.shape(imx)[0]
for ipsf in range(npsf):
#plt.plot(imx[ipsf], imy[ipsf], 'r.')
ang = np.arctan2(psf_e2[ipsf], psf_e1[ipsf])/2
ell = np.sqrt(psf_e1[ipsf]**2 + psf_e2[ipsf]**2)
ell *= 15
lcos = ell*np.cos(ang)
lsin = ell*np.sin(ang)
plt.plot([imx[ipsf]-lcos, imx[ipsf]+lcos],[imy[ipsf]-lsin, imy[ipsf]+lsin],'r', lw=1)
if OVERPLOT == True:
prefix = 'psfEllOPIDW'
def test_psfdEllabsPlot(iccd):
#if ThisTask == 0:
if True:
prefix = 'psfEll20'
#iccd = 1
#iwave= 1
data = np.load(OUTPUTPATH+'/'+prefix+'_{:}_1.npy'.format(iccd))
imx= data[0]
imy= data[1]
psf_e1 = data[2]
psf_e2 = data[3]
psf_sz = data[4]
npsf = np.shape(imx)[0]
ellX = np.sqrt(psf_e1**2 + psf_e2**2)
angX = np.arctan2(psf_e2, psf_e1)/2
angX = np.rad2deg(angX)
szX = psf_sz
prefix = 'psfEll20IDW'
data = np.load(OUTPUTPATH+'/'+prefix+'_{:}_1.npy'.format(iccd))
#imx= data[0]
#imy= data[1]
psf_e1 = data[0]
psf_e2 = data[1]
psf_sz = data[2]
ellY = np.sqrt(psf_e1**2 + psf_e2**2)
angY = np.arctan2(psf_e2, psf_e1)/2
angY = np.rad2deg(angY)
szY = psf_sz
fig=plt.figure(figsize=(6, 5))
grid = plt.GridSpec(3,1,left=0.15, right=0.95, wspace=None, hspace=0.02)
ax = plt.subplot(grid[0:2,0])
plt.plot([0.01,0.1],[0.01,0.1], 'k--', lw=1. )
plt.scatter(ellX, ellY, color='b', alpha=1., s=3., edgecolors='None')
#plt.xlim([0.015, 0.085])
#plt.ylim([0.015, 0.085])
plt.ylabel('$\epsilon_{\\rm IDW}$')
ax = plt.subplot(grid[2,0])
plt.plot([0.015,0.085],[0.,0.], 'k--', lw=1. )
plt.scatter(ellX, (ellY-ellX), color='b', s=3., edgecolors='None')
#plt.xlim([0.015, 0.085])
#plt.ylim([-0.0018, 0.0018])
plt.xlabel('$\epsilon_{\\rm ORG}$')
fig=plt.figure(figsize=(6, 6))
plt.hist((szY-szX)/szX, bins=20, color='r', alpha=0.5)
plt.xlabel('$(R_{\\rm IDW}-R_{\\rm ORG})/R_{\\rm ORG}$')
class PSFInterpModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
OUTPUTPATH = os.path.join(self.dataPath, 'outputs')
def test_psfEll_(self):
iccd = 1
iwave= 1
config_file = os.path.join(self.dataPath, 'config_test.yaml')
chip = defineCCD(iccd, config_file)
print(chip.cen_pix_x, chip.cen_pix_y)
psfMatA = PSFInterp(chip, npsf=400, PSF_data_file=self.dataPath, PSF_data_prefix="S20x20_")
psfMatB = PSFInterp(chip, npsf=900, PSF_data_file=self.dataPath, PSF_data_prefix="S30x30_")
test_psfEll(iccd, iwave, psfMatA)
test_psfEll(iccd, iwave, psfMatB)
test_psfIDW(iccd, iwave, psfMatA, chip, psfMatB)
ipsf = 1
test_psfResidualPlot(iccd, iwave, ipsf, psfMatA)
if __name__ == '__main__':
import unittest
import sys,os,math
from itertools import islice
import numpy as np
import yaml
from ObservationSim.Instrument import Chip
from ObservationSim.PSF.PSFInterp import PSFInterp
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
config = yaml.safe_load(stream)
#for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
chip = Chip(chipID=iccd, config=config)
#chip = Chip(chipID=iccd, ccdEffCurve_dir=path_dict["ccd_dir"], CRdata_dir=path_dict["CRdata_dir"], normalize_dir=path_dict["normalize_dir"], sls_dir=path_dict['sls_dir'], config=config)
return chip
def loadPSFSet(iccd, dataPath):
config_file = os.path.join(dataPath, 'config_test.yaml')
chip = defineCCD(iccd, config_file)
print(chip.cen_pix_x, chip.cen_pix_y)
psfMat= PSFInterp(chip, npsf=900, PSF_data_file=dataPath, PSF_data_prefix="S30x30_")
psfSet= psfMat._loadPSF(iccd, dataPath, PSF_data_prefix="S30x30_")
twave = 0 #[0...3]
tpsf = 0 #[0...899]
field_x= psfSet[twave][tpsf]['field_x']
field_y= psfSet[twave][tpsf]['field_y']
image_x= psfSet[twave][tpsf]['image_x']
image_y= psfSet[twave][tpsf]['image_y']
centroid_x= psfSet[twave][tpsf]['centroid_x']
centroid_y= psfSet[twave][tpsf]['centroid_y']
print("pos_info:", field_x, field_y, image_x, image_y, centroid_x, centroid_y)
return psfSet
class PSFInterpModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
def test_psfEll_(self):
iccd = 1 #[1...30]
psfSet = loadPSFSet(iccd, dataPath=self.dataPath)
if __name__ == '__main__':
# Configuration file for CSST simulation
# Overall settings
# CSST-Sim Group, 2024/01/08
# Base diretories and naming setup
# can add some of the command-line arguments here as well;
# ok to pass either way or both, as long as they are consistent
work_dir: "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA/"
data_dir: "/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/"
run_name: "testRun"
# Project cycle and run counter are used to name the outputs
project_cycle: 9
run_counter: 0
# Run options
use_mpi: YES
# NOTE: "n_threads" paramters is currently not used in the backend
# simulation codes. It should be implemented later in the web frontend
# in order to config the number of threads to request from NAOC cluster
n_threads: 80
# Output catalog only?
# If yes, no imaging simulation will run
out_cat_only: NO
# Catalog setting
# Configure the input catalog: options should be implemented
# in the corresponding (user defined) 'Catalog' class
# cat_dir: "Catalog_C6_20221212"
cat_dir: ""
star_cat: "starcat/"
galaxy_cat: "qsocat/cat2CSSTSim_bundle-50sqDeg/"
# AGN_cat: "AGN_C6_ross13_rand_pos_rmax-1.3.fits"
star_SED: "SpecLib.hdf5"
galaxy_SED: "sedlibs/"
AGN_SED: "qsocat/qsosed/"
# AGN_SED_WAVE: "wave_ross13.npy"
# Only simulate stars?
star_only: NO
# Only simulate galaxies?
galaxy_only: NO
# rotate galaxy ellipticity
rotateEll: 0. # [degree]
seed_Av: 121212 # Seed for generating random intrinsic extinction
# Observation setting
# (Optional) a file of point list
# if you just want to run default pointing:
# - pointing_dir: null
# - pointing_file: null
pointing_dir: "/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/pointing_gir25/"
pointing_file: "pointing_50_1.dat"
obs_config_file: "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/config/obs_config_SCI_WIDE_phot.yaml"
# Run specific pointing(s):
# - give a list of indexes of pointings: [ip_1, ip_2...]
# - run all pointings: null
# Note: only valid when a pointing list is specified
run_pointings: [0]
# Whether to enable astrometric modeling
enable_astrometric_model: True
# Cut by saturation magnitude in which band?
cut_in_band: "z"
# saturation magnitude margin
mag_sat_margin: -2.5
# mag_sat_margin: -15.
# limiting magnitude margin
mag_lim_margin: +1.0
# PSF setting
# Which PSF model to use:
# "Gauss": simple gaussian profile
# "Interp": Interpolated PSF from sampled ray-tracing data
psf_model: "Interp"
# PSF size [arcseconds]
# radius of 80% energy encircled
# NOTE: only valid for "Gauss" PSF
# psf_rcont: 0.15
# path to PSF data
# NOTE: only valid for "Interp" PSF
# PSF models for photometry survey simulation
psf_pho_dir: "/public/share/yangxuliu/CSSOSDataProductsSims/psfCube/set1_dynamic/"
# PSF models for slitless spectrum survey simulation
psf_sls_dir: "/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/SLS_PSF_PCA_fp/"
# Shear setting
# Options to generate mock shear field:
# "constant": all galaxies are assigned a constant reduced shear
# "catalog": get shear values from catalog
shear_type: "constant"
# For constant shear field
reduced_g1: 0.
reduced_g2: 0.
# Output options
output_format: "channels" # Whether to export as 16 channels (subimages) with pre- and over-scan ("image"/"channels")
shutter_output: OFF # Whether to export shutter effect 16-bit image
prnu_output: OFF # Whether to export the PRNU (pixel-to-pixel flat-fielding) files
# Random seeds
seed_poisson: 20210601 # Seed for Poisson noise
seed_CR: 20210317 # Seed for generating random cosmic ray maps
seed_flat: 20210101 # Seed for generating random flat fields
seed_prnu: 20210102 # Seed for photo-response non-uniformity
seed_gainNonUniform: 20210202 # Seed for gain nonuniformity
seed_biasNonUniform: 20210203 # Seed for bias nonuniformity
seed_rnNonUniform: 20210204 # Seed for readout-noise nonuniformity
seed_badcolumns: 20240309 # Seed for bad columns
seed_defective: 20210304 # Seed for defective (bad) pixels
seed_readout: 20210601 # Seed for read-out gaussian noise
class detModule_coverage(unittest.TestCase):
class detModule_coverage(unittest.TestCase): class detModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(detModule_coverage, self).__init__(methodName) super(detModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1') ##self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_msc')
self.iccd = 1 self.iccd = 1
class detModule_coverage(unittest.TestCase):
rho_trap = np.array([0.6,1.6,1.4],dtype=np.float32) rho_trap = np.array([0.6,1.6,1.4],dtype=np.float32)
trap_seeds = np.array([0,100,1000],dtype=np.int32) trap_seeds = np.array([0,100,1000],dtype=np.int32)
release_seed = 500 release_seed = 500
image = fits.getdata("UNIT_TEST_DATA/testCTE_image_before.fits").astype(np.int32) image = fits.getdata(os.path.join(self.dataPath, "testCTE_image_before.fits")).astype(np.int32)
#get_trap_map(trap_seeds,nx,ny,nmax,rho_trap,beta,c,".") #get_trap_map(trap_seeds,nx,ny,nmax,rho_trap,beta,c,".")
#bin2fits("trap.bin",".",nsp,nx,ny,nmax) #bin2fits("trap.bin",".",nsp,nx,ny,nmax)
image_cti = CTI_sim(image,nx,ny,noverscan,nsp,nmax,beta,w,c,t,rho_trap,trap_seeds,release_seed) image_cti = CTI_sim(image,nx,ny,noverscan,nsp,nmax,beta,w,c,t,rho_trap,trap_seeds,release_seed)
fits.writeto("UNIT_TEST_DATA/testCTE_image_after.fits",data=image_cti,overwrite=True) fits.writeto(os.path.join(self.dataPath, "testCTE_image_after.fits"),data=image_cti,overwrite=True)
if __name__ == '__main__': if __name__ == '__main__':
class PSFInterpModule_coverage(unittest.TestCase):
class PSFInterpModule_coverage(unittest.TestCase): class PSFInterpModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(PSFInterpModule_coverage, self).__init__(methodName) super(PSFInterpModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1') self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_msc')
self.iccd = 1 self.iccd = 8
def test_loadPSFSet(self): def test_loadPSFSet(self):
config_file = os.path.join(self.dataPath, 'config_test.yaml') config_file = os.path.join(self.dataPath, 'config_test.yaml')
...@@ -49,7 +49,7 @@ class PSFInterpModule_coverage(unittest.TestCase): ...@@ -49,7 +49,7 @@ class PSFInterpModule_coverage(unittest.TestCase):
print(chip.chipID) print(chip.chipID)
print(chip.cen_pix_x, chip.cen_pix_y) print(chip.cen_pix_x, chip.cen_pix_y)
pathTemp = "/public/share/yangxuliu/CSSOSDataProductsSims/psfCube/set1_dynamic/" pathTemp = self.dataPath #"/public/share/yangxuliu/CSSOSDataProductsSims/psfCube/set1_dynamic/"
psfModel= PSFInterp(chip, npsf=900, PSF_data_file=pathTemp, PSF_data_prefix="", HocBuild=True, LOG_DEBUG=True) psfModel= PSFInterp(chip, npsf=900, PSF_data_file=pathTemp, PSF_data_prefix="", HocBuild=True, LOG_DEBUG=True)
x, y = 4096, 4096 #imgPos[iobj, :] # try get the PSF at some location (1234, 1234) on the chip x, y = 4096, 4096 #imgPos[iobj, :] # try get the PSF at some location (1234, 1234) on the chip
class detModule_coverage(unittest.TestCase):
class detModule_coverage(unittest.TestCase): class detModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(detModule_coverage, self).__init__(methodName) super(detModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1') self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_msc')
self.iccd = 1 self.iccd = 1
def test_add_dark(self): def test_add_dark(self):
import unittest
from scipy import interpolate
import astropy.constants as cons
from astropy.table import Table
import h5py as h5
import sys,os,math
from itertools import islice
import numpy as np
import galsim
import yaml
import copy
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
from ObservationSim.MockObject._util import getObservedSED
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.PSF.PSFInterp import PSFInterp
from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG
def convert_sed(mag, sed, target_filt, norm_filt=None):
bandpass = target_filt.bandpass_full
if norm_filt is not None:
norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001
norm_filt = Table(
np.array(np.array([bandpass.wave_list*10.0, bandpass.func(bandpass.wave_list)])).T, names=(['WAVELENGTH', 'SENSITIVITY'])
norm_thr_rang_ids = norm_filt['SENSITIVITY'] > 0.001
sedNormFactor = getNormFactorForSpecWithABMAG(ABMag=mag,
sed_photon = copy.copy(sed)
sed_photon = np.array([sed_photon['WAVELENGTH'], sed_photon['FLUX']*sedNormFactor]).T
sed_photon = galsim.LookupTable(x=np.array(sed_photon[:, 0]), f=np.array(sed_photon[:, 1]), interpolant='nearest')
# Get magnitude
sed_photon = galsim.SED(sed_photon, wave_type='A', flux_type='1', fast=False)
interFlux = integrate_sed_bandpass(sed=sed_photon, bandpass=bandpass)
mag_csst = getABMAG(
if target_filt.survey_type == "photometric":
return sed_photon, mag_csst, interFlux
elif target_filt.survey_type == "spectroscopic":
del sed_photon
return sed, mag_csst, interFlux
def _load_gals(file_path):
gals_cat = h5.File(file_path, 'r')['galaxies']
for ikeys in gals_cat.keys():
gals = gals_cat[ikeys]
param = {}
igals = 9
param['z'] = gals['redshift'][igals]
param['mag_use_normal'] = gals['mag_csst_g'][igals]
print(param['mag_use_normal'] )
param['e1'] = gals['ellipticity_true'][igals][0]
param['e2'] = gals['ellipticity_true'][igals][1]
param['e1_disk'] = param['e1']
param['e2_disk'] = param['e2']
param['e1_bulge'] = param['e1']
param['e2_bulge'] = param['e2']
# Masses
param['bulgemass'] = gals['bulgemass'][igals]
param['diskmass'] = gals['diskmass'][igals]
param['size'] = gals['size'][igals]
# Sersic index
param['disk_sersic_idx'] = 1.
param['bulge_sersic_idx'] = 4.
# Sizes
param['bfrac'] = param['bulgemass']/(param['bulgemass'] + param['diskmass'])
if param['bfrac'] >= 0.6:
param['hlr_bulge'] = param['size']
param['hlr_disk'] = param['size'] * (1. - param['bfrac'])
param['hlr_disk'] = param['size']
param['hlr_bulge'] = param['size'] * param['bfrac']
# SED coefficients
param['coeff'] = gals['coeff'][igals]
# TEST no redening and no extinction
param['av'] = 0.0
param['redden'] = 0
pcs = h5.File("/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/sedlibs/"+"pcs.h5", "r")
lamb = h5.File("/public/share/yangxuliu/CSSOSDataProductsSims/data_50sqDeg/sedlibs/"+"lamb.h5", "r")
lamb_gal = lamb['lamb'][()]
pcs = pcs['pcs'][()]
cosmo = FlatLambdaCDM(H0=67.66, Om0=0.3111)
factor = 10**(-.4 * cosmo.distmod(param['z']).value)
flux = np.matmul(pcs, param['coeff']) * factor
# if np.any(flux < 0):
# raise ValueError("Glaxy %s: negative SED fluxes"
flux[flux < 0] = 0.
sedcat = np.vstack((lamb_gal, flux)).T
sed_data = getObservedSED(
wave, flux = sed_data[0], sed_data[1]
speci = interpolate.interp1d(wave, flux)
lamb = np.arange(2000, 11001+0.5, 0.5)
y = speci(lamb)
# erg/s/cm2/A --> photon/s/m2/A
all_sed = y * lamb / (cons.h.value * cons.c.value) * 1e-13
sed = Table(np.array([lamb, all_sed]).T, names=('WAVELENGTH', 'FLUX'))
param["sed"] = sed
del wave
del flux
return param
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
config = yaml.safe_load(stream)
#for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
chip = Chip(chipID=iccd, config=config)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
focal_plane = FocalPlane(chip_list=[iccd])
chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
return chip
def defineFilt(chip):
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
filt = Filter(
return filt
class imagingModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(imagingModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
self.iccd = 8
def test_imaging(self):
config_file = os.path.join(self.dataPath, 'config_test.yaml')
chip = defineCCD(self.iccd, config_file)
bandpass = defineFilt(chip)
filt = defineFilt(chip)
print(chip.cen_pix_x, chip.cen_pix_y)
obj = _load_gals("UNIT_TEST_DATA/galaxies_C6_bundle000287.h5")
sed_data = obj['sed']
norm_filt = None
obj_sed, obj_mag, obj_flux = convert_sed(
pupil_area= np.pi * (0.5 * 2.)**2
exptime = 150.
nphotons_tot = obj_flux*pupil_area * exptime #getElectronFluxFilt(filt, tel, exptime)
full = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_full)
print(full, nphotons_tot, obj_mag)
for i in range(4):
sub = integrate_sed_bandpass(sed=obj_sed, bandpass=filt.bandpass_sub_list[i])
ratio = sub / full
nphotons = ratio * nphotons_tot
disk = galsim.Sersic(n=obj['disk_sersic_idx'], half_light_radius=obj['hlr_disk'], flux=1.0)
disk_shape = galsim.Shear(g1=obj['e1_disk'], g2=obj['e2_disk'])
disk = disk.shear(disk_shape)
gal_temp = disk
gal_temp = gal_temp.withFlux(nphotons)
psf = galsim.Gaussian(sigma=0.1, flux=1)
gal_temp = galsim.Convolve(psf, gal_temp)
if i == 0:
gal = gal_temp
gal = gal + gal_temp
self.assertTrue( gal != None )
if __name__ == '__main__':
import unittest
import sys,os,math
from itertools import islice
import numpy as np
import galsim
import yaml
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.Instrument.Chip import ChipUtils as chip_utils
def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
config = yaml.safe_load(stream)
#for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
chip = Chip(chipID=iccd, config=config)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
focal_plane = FocalPlane(chip_list=[iccd])
chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
return chip
def defineFilt(chip):
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
filt = Filter(
bandpass_list = filt.bandpass_sub_list
return bandpass_list
class detModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(detModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1')
self.iccd = 1
def test_add_prescan_overscan(self):
config_file = os.path.join(self.dataPath, 'config_test.yaml')
chip = defineCCD(self.iccd, config_file)
bandpass = defineFilt(chip)
print(chip.cen_pix_x, chip.cen_pix_y)
chip.img = chip_utils.AddPreScan(GSImage=chip.img,
self.assertTrue( (chip.prescan_x+chip.overscan_x)*8+chip.npix_x == np.shape(chip.img.array)[1] )
self.assertTrue( (chip.prescan_y+chip.overscan_y)*2+chip.npix_y == np.shape(chip.img.array)[0] )
if __name__ == '__main__':
class detModule_coverage(unittest.TestCase):
class detModule_coverage(unittest.TestCase): class detModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(detModule_coverage, self).__init__(methodName) super(detModule_coverage, self).__init__(methodName)
self.dataPath = "/public/home/chengliang/CSSOSDataProductsSims/csst-simulation/tests/UNIT_TEST_DATA" ##os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_gc1') self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_fz_msc')
self.iccd = 1 self.iccd = 1
def test_add_prescan_overscan(self): def test_add_prescan_overscan(self):
