Commit 36189a3e authored by JX's avatar JX 😵
Browse files

Merge remote-tracking branch 'origin/develop'

parents dd26d370 27646bc4
Pipeline #4509 passed with stage
in 0 seconds
import unittest
import sys,os,math
import sys
import os
import math
from itertools import islice
import numpy as np
import galsim
import yaml
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
#from ObservationSim.Instrument.Chip import ChipUtils as chip_utils
from observation_sim.instruments import Chip, Filter, FilterParam, FocalPlane
### test FUNCTION --- START ###
def get_base_img(img, chip, read_noise, readout_time, dark_noise, exptime=150., InputDark=None):
if InputDark == None:
# base_level = read_noise**2 + dark_noise*(exptime+0.5*readout_time)
## base_level = dark_noise*(exptime+0.5*readout_time)
# base_level = dark_noise*(exptime+0.5*readout_time)
base_level = dark_noise*(exptime)
base_img1 = base_level * np.ones_like(img.array)
......@@ -25,8 +28,8 @@ def get_base_img(img, chip, read_noise, readout_time, dark_noise, exptime=150.,
arr = np.broadcast_to(arr, (ny, nx))
base_img2 = np.zeros_like(img.array)
base_img2[:ny, :] = arr
base_img2[ny:, :] = arr[::-1,:]
base_img2[:,:] = base_img2[:,:]*(readout_time/ny)*dark_noise
base_img2[ny:, :] = arr[::-1, :]
base_img2[:, :] = base_img2[:, :]*(readout_time/ny)*dark_noise
return base_img1+base_img2
### test FUNCTION --- END ###
......@@ -35,16 +38,18 @@ def defineCCD(iccd, config_file):
with open(config_file, "r") as stream:
config = yaml.safe_load(stream)
#for key, value in config.items():
# for key, value in config.items():
# print (key + " : " + str(value))
except yaml.YAMLError as exc:
chip = Chip(chipID=iccd, config=config)
chip.img = galsim.ImageF(chip.npix_x, chip.npix_y)
focal_plane = FocalPlane(chip_list=[iccd])
chip.img.wcs= focal_plane.getTanWCS(192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
chip.img.wcs = focal_plane.getTanWCS(
192.8595, 27.1283, -113.4333*galsim.degrees, chip.pix_scale)
return chip
def defineFilt(chip):
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
......@@ -60,7 +65,8 @@ def defineFilt(chip):
class detModule_coverage(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(detModule_coverage, self).__init__(methodName)
self.dataPath = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc')
self.dataPath = os.path.join(
os.getenv('UNIT_TEST_DATA_ROOT'), 'csst_msc_sim/csst_fz_msc')
self.iccd = 1
def test_add_dark(self):
......@@ -70,16 +76,19 @@ class detModule_coverage(unittest.TestCase):
print(chip.cen_pix_x, chip.cen_pix_y)
base_img = get_base_img(img=chip.img, chip=chip, read_noise=chip.read_noise, readout_time=chip.readout_time, dark_noise=chip.dark_noise, exptime=exptime, InputDark=None)
exptime = 150.
base_img = get_base_img(img=chip.img, chip=chip, read_noise=chip.read_noise,
readout_time=chip.readout_time, dark_noise=chip.dark_noise, exptime=exptime, InputDark=None)
ny = int(chip.npix_y/2)
self.assertTrue( np.abs(np.max(base_img) - (exptime*chip.dark_noise+(ny-1)*(chip.readout_time/ny)*chip.dark_noise )) < 1e-6 )
self.assertTrue( np.min(base_img) == 3 )
self.assertTrue(np.abs(np.max(base_img) - (exptime*chip.dark_noise +
(ny-1)*(chip.readout_time/ny)*chip.dark_noise)) < 1e-6)
self.assertTrue(np.min(base_img) == 3)
base_img = get_base_img(img=chip.img, chip=chip, read_noise=chip.read_noise, readout_time=chip.readout_time, dark_noise=chip.dark_noise, exptime=150., InputDark="testTag")
self.assertTrue( np.abs(np.max(base_img) - ((ny-1)*(chip.readout_time/ny)*chip.dark_noise )) < 1e-6 )
base_img = get_base_img(img=chip.img, chip=chip, read_noise=chip.read_noise,
readout_time=chip.readout_time, dark_noise=chip.dark_noise, exptime=150., InputDark="testTag")
self.assertTrue(np.abs(np.max(base_img) - ((ny-1) *
(chip.readout_time/ny)*chip.dark_noise)) < 1e-6)
if __name__ == '__main__':
import unittest
import numpy as np
from ObservationSim.Instrument.Chip import Effects
from observation_sim.instruments.chip import effects
import galsim
import matplotlib.pyplot as plt
import os,sys,math,copy
import os
import sys
import math
import copy
from numpy.random import Generator, PCG64
import warnings
from import fits
......@@ -13,20 +16,20 @@ warnings.filterwarnings("ignore", '.*Numba.*',)
width = 9216
height = 9232
class DetTest(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(DetTest, self).__init__(methodName)
def filePath(self, file_name):
self.datafn = os.path.join(os.getenv('UNIT_TEST_DATA_ROOT'), file_name)
self.outDataFn = os.path.join(self.datafn,'output')
self.outDataFn = os.path.join(self.datafn, 'output')
if os.path.isdir(self.outDataFn):
def test_prnu(self):
......@@ -35,13 +38,14 @@ class DetTest(unittest.TestCase):
print('PRNU Test:')
sigma = 0.01
seed = 20210911
prnuimg = Effects.PRNU_Img(width, height, sigma=sigma, seed=seed)
prnuimg = effects.PRNU_Img(width, height, sigma=sigma, seed=seed)
meanval, stdval = np.mean(prnuimg.array), np.std(prnuimg.array)
print(' Mean & STDDEV of PRNU image are %6.4f & %6.4f.' % (meanval, stdval))
print(' Mean & STDDEV of PRNU image are %6.4f & %6.4f.' %
(meanval, stdval))
print(' PRNU Image Array:')
print(' ',prnuimg.array)
print(' ', prnuimg.array)
self.assertTrue(np.abs(meanval-1) < 1e-6)
self.assertTrue(np.abs(stdval-sigma) < 0.002)
print('\nUnit test for PRNU has been passed.')
del prnuimg
......@@ -50,15 +54,17 @@ class DetTest(unittest.TestCase):
Test add dark current to image. Expected result: an image with dark current 3.4 e- and noise=1.844 e-.
rng_poisson = galsim.BaseDeviate(20210911)
dark_noise = galsim.DeviateNoise(galsim.PoissonDeviate(rng_poisson, 0.02*(150+0.5*40)))
img = galsim.Image(200,200,dtype=np.float32, init_value=0)
print('Initial Mean & STD = %6.3f & %6.3f' % (np.mean(img.array), np.std(img.array)))
dark_noise = galsim.DeviateNoise(
galsim.PoissonDeviate(rng_poisson, 0.02*(150+0.5*40)))
img = galsim.Image(200, 200, dtype=np.float32, init_value=0)
print('Initial Mean & STD = %6.3f & %6.3f' %
(np.mean(img.array), np.std(img.array)))
meanval = np.mean(img.array)
stdval = np.std(img.array)
print('Dark added Mean & STD = %6.3f & %6.3f' % (meanval, stdval))
self.assertTrue(np.abs(meanval-3.4) < 0.05)
self.assertTrue(np.abs(stdval-1.844) < 0.02)
print('\nUnit test for dark current has been passed.')
del img
......@@ -66,149 +72,161 @@ class DetTest(unittest.TestCase):
Test saturation and bleeding. Expected result: an image with bleeding effect.
img = galsim.Image(500,500,dtype=np.float32)
star = galsim.Gaussian(flux=60e5,fwhm=3)
img = star.drawImage(image=img,center=(150,200))
img = galsim.Image(500, 500, dtype=np.float32)
star = galsim.Gaussian(flux=60e5, fwhm=3)
img = star.drawImage(image=img, center=(150, 200))
# gal = galsim.Sersic(n=1, half_light_radius=3,flux=50e5)
# img = gal.drawImage(image=img,center=(350,300))
# plt.imshow(img.array)
filename1 = os.path.join(self.outDataFn,'test_satu_initimg.fits')
filename1 = os.path.join(self.outDataFn, 'test_satu_initimg.fits')
newimg = Effects.SaturBloom(img, fullwell=9e4)
newimg = effects.SaturBloom(img, fullwell=9e4)
# plt.imshow(newimg.array)
filename2 = os.path.join(self.outDataFn,'test_satu_bleedimg.fits')
filename2 = os.path.join(self.outDataFn, 'test_satu_bleedimg.fits')
del img,newimg, star
del img, newimg, star
def test_nonlinear(self):
Test non-linear effect. Expected result: an image with non-linearity effect.
imgarr = np.arange(1,9e4,4).reshape((150,150))
imgarr = np.arange(1, 9e4, 4).reshape((150, 150))
img = galsim.Image(copy.deepcopy(imgarr))
filename1 = os.path.join(self.outDataFn,'test_nonlinear_initimg.fits')
filename1 = os.path.join(self.outDataFn, 'test_nonlinear_initimg.fits')
newimg = Effects.NonLinearity(img, beta1=5E-7, beta2=0)
filename2 = os.path.join(self.outDataFn,'test_nonlinear_finalimg.fits')
newimg = effects.NonLinearity(img, beta1=5E-7, beta2=0)
filename2 = os.path.join(
self.outDataFn, 'test_nonlinear_finalimg.fits')
plt.scatter(imgarr.flatten(), newimg.array.flatten(), s=2, alpha=0.5)
plt.plot([-1e3,9e4],[-1e3,9e4],color='black', lw=1, ls='--')
plt.plot([-1e3, 9e4], [-1e3, 9e4], color='black', lw=1, ls='--')
plt.xlabel('input (e-)')
plt.ylabel('output (e-)')
plt.savefig(os.path.join(self.outDataFn,'test_nonlinearity.png'), dpi=200)
'test_nonlinearity.png'), dpi=200)
del img,newimg,imgarr
del img, newimg, imgarr
def test_badpixel_HtrDtr(self):
img = galsim.Image(500,500,init_value=1000)
img = galsim.Image(500, 500, init_value=1000)
rgbadpix = Generator(PCG64(20210911))
badfraction = 5E-5*(rgbadpix.random()*0.5+0.7)
img = Effects.DefectivePixels(img, IfHotPix=True, IfDeadPix=True, fraction=badfraction, seed=20210911, biaslevel=0)
img = effects.DefectivePixels(
img, IfHotPix=True, IfDeadPix=True, fraction=badfraction, seed=20210911, biaslevel=0)
img.write(os.path.join(self.outDataFn, 'test_badpixel_HtrDtr.fits'))
del img
def test_badpixel_HfsDtr(self):
img = galsim.Image(500,500,init_value=1000)
img = galsim.Image(500, 500, init_value=1000)
rgbadpix = Generator(PCG64(20210911))
badfraction = 5E-5*(rgbadpix.random()*0.5+0.7)
img = Effects.DefectivePixels(img, IfHotPix=False, IfDeadPix=True, fraction=badfraction, seed=20210911, biaslevel=0)
img = effects.DefectivePixels(
img, IfHotPix=False, IfDeadPix=True, fraction=badfraction, seed=20210911, biaslevel=0)
img.write(os.path.join(self.outDataFn, 'test_badpixel_HfsDtr.fits'))
del img
def test_badpixel_HtrDfs(self):
img = galsim.Image(500,500,init_value=1000)
img = galsim.Image(500, 500, init_value=1000)
rgbadpix = Generator(PCG64(20210911))
badfraction = 5E-5*(rgbadpix.random()*0.5+0.7)
img = Effects.DefectivePixels(img, IfHotPix=True, IfDeadPix=False, fraction=badfraction, seed=20210911, biaslevel=0)
img = effects.DefectivePixels(
img, IfHotPix=True, IfDeadPix=False, fraction=badfraction, seed=20210911, biaslevel=0)
img.write(os.path.join(self.outDataFn, 'test_badpixel_HtrDfs.fits'))
del img
def test_badpixel_HfsDfs(self):
img = galsim.Image(500,500,init_value=1000)
img = galsim.Image(500, 500, init_value=1000)
rgbadpix = Generator(PCG64(20210911))
badfraction = 5E-5*(rgbadpix.random()*0.5+0.7)
img = Effects.DefectivePixels(img, IfHotPix=False, IfDeadPix=False, fraction=badfraction, seed=20210911, biaslevel=0)
img = effects.DefectivePixels(
img, IfHotPix=False, IfDeadPix=False, fraction=badfraction, seed=20210911, biaslevel=0)
img.write(os.path.join(self.outDataFn, 'test_badpixel_HfsDfs.fits'))
del img
def test_badlines(self):
img = galsim.Image(500,500,init_value=-1000)
img = galsim.Image(500, 500, init_value=-1000)
newimg = Effects.BadColumns(copy.deepcopy(img), seed=20210911)
del newimg,img
newimg = effects.BadColumns(copy.deepcopy(img), seed=20210911)
newimg.write(os.path.join(self.outDataFn, 'test_badlines.fits'))
del newimg, img
# def test_cte(self):
# img = galsim.Image(200,200,init_value=1000)
# img.array[50,80] = 1e4
# img.array[150,150] = 3e4
# newimgcol = Effects.CTE_Effect(copy.deepcopy(img),direction='column')
# newimgrow = Effects.CTE_Effect(copy.deepcopy(img),direction='row')
# newimgcol = effects.CTE_Effect(copy.deepcopy(img),direction='column')
# newimgrow = effects.CTE_Effect(copy.deepcopy(img),direction='row')
# newimgcol.write(os.path.join(self.outDataFn,'test_ctecol.fits'))
# newimgrow.write(os.path.join(self.outDataFn,'test_cterow.fits'))
# del img,newimgcol,newimgrow
def test_readnoise(self):
img = galsim.Image(200,200,init_value=1000)
img = galsim.Image(200, 200, init_value=1000)
seed = 20210911
rng_readout = galsim.BaseDeviate(seed)
readout_noise = galsim.GaussianNoise(rng=rng_readout, sigma=5)
img.write(os.path.join(self.outDataFn, 'test_readnoise.fits'))
stdval = np.std(img.array)
self.assertTrue(np.abs(stdval-5) < 0.01*5)
print('\nUnit test for readout noise has been passed.')
del img
def test_addbias(self):
img = galsim.Image(200,200,init_value=0)
img = Effects.AddBiasNonUniform16(img,bias_level=500, nsecy = 2, nsecx=8,seed=20210911)
img = galsim.Image(200, 200, init_value=0)
img = effects.AddBiasNonUniform16(
img, bias_level=500, nsecy=2, nsecx=8, seed=20210911)
del img
def test_apply16gains(self):
img = galsim.Image(500,500,init_value=100)
img,_ = Effects.ApplyGainNonUniform16(img, gain=1.5, nsecy=2, nsecx=8, seed=202102)
img = galsim.Image(500, 500, init_value=100)
img, _ = effects.ApplyGainNonUniform16(
img, gain=1.5, nsecy=2, nsecx=8, seed=202102)
img.write(os.path.join(self.outDataFn, 'test_apply16gains.fits'))
rightedge = int(500/8)*8
print('gain=%6.2f' % 1.5)
meanimg = np.mean(img.array[:,:rightedge])
sigmaimg = np.std(img.array[:,:rightedge])
print('mean, sigma = %6.2f, %6.2f' % (meanimg,sigmaimg))
meanimg = np.mean(img.array[:, :rightedge])
sigmaimg = np.std(img.array[:, :rightedge])
print('mean, sigma = %6.2f, %6.2f' % (meanimg, sigmaimg))
self.assertTrue(np.abs(meanimg-100/1.5) < 1)
self.assertTrue(np.abs(sigmaimg/meanimg-0.01) < 0.001)
print('\nUnit test for applying 16 channel gains has been passed.')
del img
def test_cosmicray(self):
attachedSizes = np.loadtxt(os.path.join(self.datafn,'wfc-cr-attachpixel.dat'))
cr_map,_ = Effects.produceCR_Map(
xLen=500, yLen=500, exTime=150+0.5*40,
gain=1, attachedSizes=attachedSizes, seed=20210911)
attachedSizes = np.loadtxt(os.path.join(
self.datafn, 'wfc-cr-attachpixel.dat'))
cr_map, _ = effects.produceCR_Map(
xLen=500, yLen=500, exTime=150+0.5*40,
gain=1, attachedSizes=attachedSizes, seed=20210911)
crimg = galsim.Image(cr_map)
del cr_map,crimg
crimg.write(os.path.join(self.outDataFn, 'test_cosmicray.fits'))
del cr_map, crimg
def test_shutter(self):
img = galsim.Image(5000,5000,init_value=1000)
shuttimg = Effects.ShutterEffectArr(img, t_exp=150, t_shutter=1.3, dist_bearing=735, dt=1E-3) # shutter effect normalized image for this chip
img = galsim.Image(5000, 5000, init_value=1000)
# shutter effect normalized image for this chip
shuttimg = effects.ShutterEffectArr(
img, t_exp=150, t_shutter=1.3, dist_bearing=735, dt=1E-3)
img *= shuttimg
img.write(os.path.join(self.outDataFn, 'test_shutter.fits'))
del img
def test_vignette(self):
img = galsim.Image(2000,2000,init_value=1000)
img = galsim.Image(2000, 2000, init_value=1000)
# # img.bounds = galsim.BoundsI(1, width, 1, height)
flat_img = Effects.MakeFlatSmooth(img.bounds,20210911)
img.setOrigin(10000, 10000)
flat_img = effects.MakeFlatSmooth(img.bounds, 20210911)
flat_normal = flat_img / np.mean(flat_img.array)
del flat_img,img,flat_normal
flat_normal.write(os.path.join(self.outDataFn, 'test_vignette.fits'))
del flat_img, img, flat_normal
if __name__ == '__main__':
\ No newline at end of file
import unittest
import os
import galsim
from ObservationSim.Instrument import FocalPlane, Chip
from observation_sim.instruments import FocalPlane, Chip
class TestFocalPlane(unittest.TestCase):
......@@ -15,12 +15,12 @@ import copy
from astropy.cosmology import FlatLambdaCDM
from astropy import constants
from astropy import units as U
from ObservationSim.MockObject._util import getObservedSED
from ObservationSim.MockObject import CatalogBase, Galaxy
from observation_sim.mock_objects._util import getObservedSED
from observation_sim.mock_objects import CatalogBase, Galaxy
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
from ObservationSim.PSF.PSFInterp import PSFInterp
from ObservationSim.MockObject._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG
from observation_sim.instruments import Chip, Filter, FilterParam, FocalPlane
from observation_sim.PSF.PSFInterp import PSFInterp
from observation_sim.mock_objects._util import integrate_sed_bandpass, getNormFactorForSpecWithABMAG, getABMAG
class Catalog(CatalogBase):
......@@ -6,8 +6,7 @@ import numpy as np
import galsim
import yaml
from ObservationSim.Instrument import Chip, Filter, FilterParam, FocalPlane
#from ObservationSim.Instrument.Chip import ChipUtils as chip_utils
from observation_sim.instruments import Chip, Filter, FilterParam, FocalPlane
### test FUNCTION --- START ###
def AddPreScan(GSImage, pre1=27, pre2=4, over1=71, over2=80, nsecy = 2, nsecx=8):
......@@ -14,17 +14,19 @@ chip_filename = 'chip_definition.json'
# "npix_y": 7680,
# "x_cen": -273.35, # [mm]
# "y_cen": 211.36, # [mm]
# "rotate_angle": 90. # [deg]
# "rotate_angle": 90. # [deg]
# }
# chip_list[chip_id] = chip_dict
def get_chip_row_col_main_fp(chip_id):
rowID = ((chip_id - 1) % 5) + 1
colID = 6 - ((chip_id - 1) // 5)
return rowID, colID
def get_chip_center_main_fp(chip_id, pixel_size=1e-2):
row, col = get_chip_row_col_main_fp(chip_id)
npix_x = 9216
npix_y = 9232
......@@ -39,16 +41,20 @@ def get_chip_center_main_fp(chip_id, pixel_size=1e-2):
xcen = (npix_x//2 + gx1//2) * xrem - (gx2-gx1)
if chip_id <= 5 or chip_id == 10:
xcen = (npix_x//2 + gx1//2) * xrem + (gx2-gx1)
# ylim of a given CCD chip
yrem = (row - 1) - nchip_y // 2
ycen = (npix_y + gy) * yrem
return xcen * pixel_size, ycen * pixel_size
def create_chip_dict_main_fp(chip_id, pixel_size=1e-2):
filter_list = ["GV", "GI", "y", "z", "y", "GI", "GU", "r", "u", "NUV", "i", "GV", "GU", "g", "NUV", "NUV", "g", "GU", "GV", "i", "NUV", "u", "r", "GU", "GI", "y", "z", "y", "GI", "GV"]
chip_label_list = [3,3,3,1,1,1,3,2,2,1,1,1,4,2,3,2,1,1,4,2,4,1,1,2,4,2,2,4,2,2]
chip_id_list = [26, 21, 16, 11, 6, 1, 27, 22, 17, 12, 7, 2, 28, 23, 18, 13, 8, 3, 29, 24, 19, 14, 9, 4, 30, 25, 20, 15, 10, 5]
filter_list = ["GV", "GI", "y", "z", "y", "GI", "GU", "r", "u", "NUV", "i", "GV", "GU", "g",
"NUV", "NUV", "g", "GU", "GV", "i", "NUV", "u", "r", "GU", "GI", "y", "z", "y", "GI", "GV"]
chip_label_list = [3, 3, 3, 1, 1, 1, 3, 2, 2, 1, 1, 1,
4, 2, 3, 2, 1, 1, 4, 2, 4, 1, 1, 2, 4, 2, 2, 4, 2, 2]
chip_id_list = [26, 21, 16, 11, 6, 1, 27, 22, 17, 12, 7, 2, 28,
23, 18, 13, 8, 3, 29, 24, 19, 14, 9, 4, 30, 25, 20, 15, 10, 5]
npix_x = 9216
npix_y = 9232
idx = chip_id_list.index(chip_id)
......@@ -63,10 +69,10 @@ def create_chip_dict_main_fp(chip_id, pixel_size=1e-2):
chip_dict = {
"chip_name": chip_name,
"pix_size": 1e-2, # [mm]
"pix_scale": 0.074, # [arcsec/pix]
"pix_scale": 0.074, # [arcsec/pix]
"npix_x": npix_x,
"npix_y": npix_y,
"x_cen": xcen, # [mm]
"x_cen": xcen, # [mm]
"y_cen": ycen, # [mm]
"rotate_angle": rotate_angle, # [deg]
"n_psf_samples": 900,
......@@ -80,6 +86,7 @@ def create_chip_dict_main_fp(chip_id, pixel_size=1e-2):
return chip_dict
def set_fgs_chips(filepath):
with open(filepath, "r") as f:
data = json.load(f)
......@@ -94,7 +101,7 @@ def set_fgs_chips(filepath):
data[chip_id]["full_well"] = 90000
with open(filepath, "w") as f:
json.dump(data, f, indent=4)
def add_main_fp(filepath):
for i in range(30):
......@@ -102,6 +109,7 @@ def add_main_fp(filepath):
chip_dict = create_chip_dict_main_fp(chip_id)
add_dict_to_json(filepath, str(chip_id), chip_dict)
def add_dict_to_json(filepath, key, value):
with open(filepath, 'r') as f:
data = json.load(f)
......@@ -109,8 +117,9 @@ def add_dict_to_json(filepath, key, value):
with open(filepath, "w") as f:
json.dump(data, f, indent=4)
if __name__=="__main__":
src = "../ObservationSim/Instrument/data/ccd/chip_definition.json"
if __name__ == "__main__":
src = "../observation_sim/instruments/data/ccd/chip_definition.json"
shutil.copy(src, chip_filename)
\ No newline at end of file
import os
import numpy as np
import ObservationSim.PSF.PSFInterp as PSFInterp
from ObservationSim.Instrument import Chip, Filter, FilterParam
import observation_sim.PSF.PSFInterp as PSFInterp
from observation_sim.instruments import Chip, Filter, FilterParam
import yaml
import galsim
import as fitsio
# Setup PATH
SIMPATH = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C8/testRun_FGS"
config_filename= SIMPATH+"/config_C6_fits.yaml"
cat_filename = SIMPATH+"/MSC_00000000/"
config_filename = SIMPATH+"/config_C6_fits.yaml"
cat_filename = SIMPATH+"/MSC_00000000/"
# Read cat file
catFn = open(cat_filename,"r")
catFn = open(cat_filename, "r")
line = catFn.readline()
print(cat_filename, '\n', line)
imgPos = []
chipID = -1
for line in catFn:
line = line.strip()
columns = line.split()
if chipID == -1:
chipID = int(columns[1])
......@@ -37,41 +37,46 @@ with open(config_filename, "r") as stream:
config = yaml.safe_load(stream)
for key, value in config.items():
print (key + " : " + str(value))
print(key + " : " + str(value))
except yaml.YAMLError as exc:
# Setup Chip
chip = Chip(chipID=chipID, config=config)
print('chip.bound::', chip.bound.xmin, chip.bound.xmax, chip.bound.ymin, chip.bound.ymax)
print('chip.bound::', chip.bound.xmin, chip.bound.xmax,
chip.bound.ymin, chip.bound.ymax)
for iobj in range(nobj):
print("\nget psf for iobj-", iobj, '\t', 'bandpass:', end=" ", flush=True)
# Setup Position on focalplane
x, y = imgPos[iobj, :] # try get the PSF at some location (1234, 1234) on the chip
# try get the PSF at some location (1234, 1234) on the chip
x, y = imgPos[iobj, :]
x = x+chip.bound.xmin
y = y+chip.bound.ymin
pos_img = galsim.PositionD(x, y)
# Setup sub-bandpass
# (There are 4 sub-bandpasses for each PSF sample)
filter_param = FilterParam()
filter_id, filter_type = chip.getChipFilter()
filt = Filter(
bandpass_list = filt.bandpass_sub_list
for i in range(len(bandpass_list)):
print(i, end=" ", flush=True)
bandpass = bandpass_list[i] # say you want to access the PSF for the sub-bandpass at the blue end for that chip
# say you want to access the PSF for the sub-bandpass at the blue end for that chip
bandpass = bandpass_list[i]
# Get corresponding PSF model
psf_model = PSFInterp(chip=chip, npsf=100, PSF_data_file=config["psf_setting"]["psf_dir"])
psf = psf_model.get_PSF(chip=chip, pos_img=pos_img, bandpass=bandpass, galsimGSObject=False)
psf_model = PSFInterp(chip=chip, npsf=100,
psf = psf_model.get_PSF(
chip=chip, pos_img=pos_img, bandpass=bandpass, galsimGSObject=False)
if True:
fn = "psf_{:}.{:}.{:}.fits".format(chipID, iobj, i)
if fn != None:
......@@ -81,6 +86,3 @@ for iobj in range(nobj): = psf
hdu.header.set('pixScale', 5)
......@@ -26,7 +26,8 @@ import galsim
def test_fits(nfits=100, dir_cat=None):
for ifits in range(nfits):
gal = galsim.Gaussian(sigma=np.random.uniform(0.2, 0.3)).shear(g1=np.random.uniform(-0.5, 0.5), g2=np.random.uniform(-0.5, 0.5))
gal = galsim.Gaussian(sigma=np.random.uniform(0.2, 0.3)).shear(
g1=np.random.uniform(-0.5, 0.5), g2=np.random.uniform(-0.5, 0.5))
arr = gal.drawImage(nx=64, ny=64, scale=0.074).array
hdu = fitsio.PrimaryHDU()
......@@ -38,41 +39,44 @@ def test_fits(nfits=100, dir_cat=None):
hdu.header.set('mag_g', 22+np.random.uniform(-1, 1))
hdu.header.set('pixScale', 0.074)
fout = dir_cat+"stampCats/testStamp_{:}.fits".format(ifits)
if os.path.exists(fout):
def write_StampsIndex(dir_cat=None, DEBUG=False):
NSIDE = 128
fp = h5py.File(dir_cat+'stampCatsIndex.hdf5', 'w')
grp1 = fp.create_group('Stamps')
dataSet_Size = np.zeros(healpy.nside2npix(NSIDE), dtype=np.int64)
fitsList = os.listdir(dir_cat+'stampCats/') #获取fits文件列表
fitsList = os.listdir(dir_cat+'stampCats/') # 获取fits文件列表
for istamp in range(len(fitsList)):
print(istamp, ': ', fitsList[istamp], end='\r')"stampCats/"+fitsList[istamp])
hdu ="stampCats/"+fitsList[istamp])
tra = hdu[0].header['RA']
tdec= hdu[0].header['DEC']
tdec = hdu[0].header['DEC']
healpixID= healpy.ang2pix(NSIDE, tra, tdec, nest=False, lonlat=True)
healpixID = healpy.ang2pix(NSIDE, tra, tdec, nest=False, lonlat=True)
if not(str(healpixID) in grp1):
if not (str(healpixID) in grp1):
grp2 = grp1.create_group(str(healpixID))
grp2 = grp1[str(healpixID)]
if not('ra' in grp2):
dset_ra = grp2.create_dataset('ra', (0,), dtype='f16' , maxshape=(MAXNUMBERINDEX, ))
dset_dec= grp2.create_dataset('dec', (0,), dtype='f16', maxshape=(MAXNUMBERINDEX, ))
if not ('ra' in grp2):
dset_ra = grp2.create_dataset(
'ra', (0,), dtype='f16', maxshape=(MAXNUMBERINDEX, ))
dset_dec = grp2.create_dataset(
'dec', (0,), dtype='f16', maxshape=(MAXNUMBERINDEX, ))
dt = h5py.special_dtype(vlen=str)
dset_fn = grp2.create_dataset('filename', (0,), dtype=dt, maxshape=(MAXNUMBERINDEX, ))
dset_fn = grp2.create_dataset(
'filename', (0,), dtype=dt, maxshape=(MAXNUMBERINDEX, ))
dset_ra = grp2['ra']
dset_ra = grp2['ra']
dset_dec = grp2['dec']
dset_fn = grp2['filename']
......@@ -82,13 +86,13 @@ def write_StampsIndex(dir_cat=None, DEBUG=False):
dset_ra[dataSet_Size[healpixID]-1] = tra
dset_dec[dataSet_Size[healpixID]-1]= tdec
dset_fn[dataSet_Size[healpixID]-1]= fitsList[istamp]
dset_dec[dataSet_Size[healpixID]-1] = tdec
dset_fn[dataSet_Size[healpixID]-1] = fitsList[istamp]
ff = h5py.File(dir_cat+"stampCatsIndex.hdf5","r")
ff = h5py.File(dir_cat+"stampCatsIndex.hdf5", "r")
ss = 0
for kk in ff['Stamps'].keys():
print(kk, ff['Stamps'][kk]['ra'].size)
......@@ -98,6 +102,5 @@ def write_StampsIndex(dir_cat=None, DEBUG=False):
if __name__ == '__main__':
dir_temp = "./Catalog_test/"
# test_fits(dir_cat=dir_temp)
# NOTE: This is a stand-alone function, meaning that you do not need
# to install the entire CSST image simulation pipeline.
# NOTE: This is a stand-alone function, meaning that you do not need
# to install the entire CSST image simulation pipeline.
# For a given object's coordinate (Ra, Dec), the function will predict
# the object's image position and corresponding filter in the focal plane
# For a given object's coordinate (Ra, Dec), the function will predict
# the object's image position and corresponding filter in the focal plane
# under a specified CSST pointing centered at (rap, decp).
import galsim
import numpy as np
import argparse
import matplotlib.pyplot as plt
import os, sys
import os
import sys
def focalPlaneInf(ra_target, dec_target, ra_point, dec_point, image_rot=-113.4333, figout="zTargetOnCCD.pdf"):
......@@ -37,60 +39,71 @@ def focalPlaneInf(ra_target, dec_target, ra_point, dec_point, image_rot=-113.433
or type >> python ra_target dec_target ra_point dec_point -image_rot=floatNum
or type >> python ra_target dec_target ra_point dec_point -image_rot=floatNum -figout=FigureName
print("^_^ Input target coordinate: [Ra, Dec] = [%10.6f, %10.6f]"%(ra_target,dec_target))
print("^_^ Input telescope pointing center: [Ra, Dec] = [%10.6f, %10.6f]"%(ra_point,dec_point))
print("^_^ Input camera orientation: %12.6f degree(s)"%image_rot)
print("^_^ Input target coordinate: [Ra, Dec] = [%10.6f, %10.6f]" % (
ra_target, dec_target))
print("^_^ Input telescope pointing center: [Ra, Dec] = [%10.6f, %10.6f]" % (
ra_point, dec_point))
print("^_^ Input camera orientation: %12.6f degree(s)" % image_rot)
print(" ")
# load ccd parameters
xsize, ysize, xchip, ychip, xgap, ygap, xnchip, ynchip = ccdParam()
print("^_^ Pixel range of focal plane: x = [%5d, %5d], y = [%5d, %5d]"%(-xsize/2,xsize/2,-ysize/2,ysize/2))
# wcs
wcs = getTanWCS(ra_point, dec_point, image_rot, pix_scale=0.074)
skyObj = galsim.CelestialCoord(ra=ra_target*galsim.degrees,dec=dec_target*galsim.degrees)
pixObj = wcs.toImage(skyObj)
print("^_^ Pixel range of focal plane: x = [%5d, %5d], y = [%5d, %5d]" % (
-xsize/2, xsize/2, -ysize/2, ysize/2))
# wcs
wcs = getTanWCS(ra_point, dec_point, image_rot, pix_scale=0.074)
skyObj = galsim.CelestialCoord(
ra=ra_target*galsim.degrees, dec=dec_target*galsim.degrees)
pixObj = wcs.toImage(skyObj)
xpixObj = pixObj.x
ypixObj = pixObj.y
print("^_^ Image position of target: [xImage, yImage] = [%9.3f, %9.3f]"%(xpixObj,ypixObj))
print("^_^ Image position of target: [xImage, yImage] = [%9.3f, %9.3f]" % (
xpixObj, ypixObj))
# first determine if the target is in the focal plane
xin = (xpixObj+xsize/2)*(xpixObj-xsize/2)
yin = (ypixObj+ysize/2)*(ypixObj-ysize/2)
if xin>0 or yin>0: raise ValueError("!!! Input target is out of the focal plane")
if xin > 0 or yin > 0:
raise ValueError("!!! Input target is out of the focal plane")
# second determine the location of the target
trigger = False
for i in range(30):
ichip = i+1
ischip = str("0%d"%ichip)[-2:]
fId, fType = getChipFilter(ichip)
ichip = i+1
ischip = str("0%d" % ichip)[-2:]
fId, fType = getChipFilter(ichip)
ix0, ix1, iy0, iy1 = getChipLim(ichip)
ixin = (xpixObj-ix0)*(xpixObj-ix1)
iyin = (ypixObj-iy0)*(ypixObj-iy1)
if ixin<=0 and iyin<=0:
ixin = (xpixObj-ix0)*(xpixObj-ix1)
iyin = (ypixObj-iy0)*(ypixObj-iy1)
if ixin <= 0 and iyin <= 0:
trigger = True
idx = xpixObj - ix0
idy = ypixObj - iy0
idx = xpixObj - ix0
idy = ypixObj - iy0
print(" ---------------------------------------------")
print(" ** Target locates in CHIP#%s with filter %s **"%(ischip,fType))
print(" ** Target position in the chip: [x, y] = [%7.2f, %7.2f]"%(idx, idy))
print(" ** Target locates in CHIP#%s with filter %s **" %
(ischip, fType))
" ** Target position in the chip: [x, y] = [%7.2f, %7.2f]" % (idx, idy))
print(" ---------------------------------------------")
if not trigger: print("^|^ Target locates in CCD gap")
if not trigger:
print("^|^ Target locates in CCD gap")
# show the figure
print(" Target on CCD layout is saved into %s"%figout)
print(" Target on CCD layout is saved into %s" % figout)
ccdLayout(xpixObj, ypixObj, figout=figout)
def ccdParam():
xt, yt = 59516, 49752
x0, y0 = 9216, 9232
xgap, ygap = (534,1309), 898
xgap, ygap = (534, 1309), 898
xnchip, ynchip = 6, 5
ccdSize = xt, yt, x0, y0, xgap, ygap, xnchip, ynchip
return ccdSize
def getTanWCS(ra, dec, img_rot, pix_scale=0.074):
Get the WCS of the image mosaic using Gnomonic/TAN projection
......@@ -105,40 +118,53 @@ def getTanWCS(ra, dec, img_rot, pix_scale=0.074):
WCS of the focal plane
xcen, ycen = 0, 0
img_rot = img_rot * galsim.degrees
dudx = -np.cos(img_rot.rad) * pix_scale
dudy = -np.sin(img_rot.rad) * pix_scale
dvdx = -np.sin(img_rot.rad) * pix_scale
dvdy = +np.cos(img_rot.rad) * pix_scale
moscen = galsim.PositionD(x=xcen, y=ycen)
sky_center = galsim.CelestialCoord(ra=ra*galsim.degrees, dec=dec*galsim.degrees)
affine = galsim.AffineTransform(dudx, dudy, dvdx, dvdy, origin=moscen)
WCS = galsim.TanWCS(affine, sky_center, units=galsim.arcsec)
img_rot = img_rot * galsim.degrees
dudx = -np.cos(img_rot.rad) * pix_scale
dudy = -np.sin(img_rot.rad) * pix_scale
dvdx = -np.sin(img_rot.rad) * pix_scale
dvdy = +np.cos(img_rot.rad) * pix_scale
moscen = galsim.PositionD(x=xcen, y=ycen)
sky_center = galsim.CelestialCoord(
ra=ra*galsim.degrees, dec=dec*galsim.degrees)
affine = galsim.AffineTransform(dudx, dudy, dvdx, dvdy, origin=moscen)
WCS = galsim.TanWCS(affine, sky_center, units=galsim.arcsec)
return WCS
def getChipFilter(chipID):
Return the filter index and type for a given chip #(chipID)
filter_type_list = ["nuv","u", "g", "r", "i","z","y","GU", "GV", "GI"]
filter_type_list = ["nuv", "u", "g", "r", "i", "z", "y", "GU", "GV", "GI"]
# TODO: maybe a more elegent way other than hard coded?
# e.g. use something like a nested dict:
if chipID in [6, 15, 16, 25]: filter_type = "y"
if chipID in [11, 20]: filter_type = "z"
if chipID in [7, 24]: filter_type = "i"
if chipID in [14, 17]: filter_type = "u"
if chipID in [9, 22]: filter_type = "r"
if chipID in [12, 13, 18, 19]: filter_type = "nuv"
if chipID in [8, 23]: filter_type = "g"
if chipID in [1, 10, 21, 30]: filter_type = "GI"
if chipID in [2, 5, 26, 29]: filter_type = "GV"
if chipID in [3, 4, 27, 28]: filter_type = "GU"
if chipID in [6, 15, 16, 25]:
filter_type = "y"
if chipID in [11, 20]:
filter_type = "z"
if chipID in [7, 24]:
filter_type = "i"
if chipID in [14, 17]:
filter_type = "u"
if chipID in [9, 22]:
filter_type = "r"
if chipID in [12, 13, 18, 19]:
filter_type = "nuv"
if chipID in [8, 23]:
filter_type = "g"
if chipID in [1, 10, 21, 30]:
filter_type = "GI"
if chipID in [2, 5, 26, 29]:
filter_type = "GV"
if chipID in [3, 4, 27, 28]:
filter_type = "GU"
filter_id = filter_type_list.index(filter_type)
return filter_id, filter_type
def getChipLim(chipID):
Calculate the edges in pixel for a given CCD chip on the focal plane
......@@ -173,20 +199,22 @@ def getChipLim(chipID):
return nx0-1, nx1-1, ny0-1, ny1-1
def ccdLayout(xpixTar, ypixTar, figout="ccdLayout.pdf"):
fig = plt.figure(figsize=(10.0,8.0))
ax = fig.add_axes([0.1,0.1,0.80,0.80])
fig = plt.figure(figsize=(10.0, 8.0))
ax = fig.add_axes([0.1, 0.1, 0.80, 0.80])
# plot the layout of the ccd distribution
for i in range(30):
ichip = i+1
fId, fType = getChipFilter(ichip)
ischip = str("0%d"%ichip)[-2:]
ischip = str("0%d" % ichip)[-2:]
ix0, ix1, iy0, iy1 = getChipLim(ichip)
ax.plot([ix0,ix1],[iy0,iy0],"k-", linewidth=2.5)
ax.plot([ix0,ix1],[iy1,iy1],"k-", linewidth=2.5)
ax.plot([ix0,ix0],[iy0,iy1],"k-", linewidth=2.5)
ax.plot([ix1,ix1],[iy0,iy1],"k-", linewidth=2.5)
ax.text(ix0+500,iy0+1500,"%s#%s"%(fType, ischip), fontsize=12, color="grey")
ax.plot([ix0, ix1], [iy0, iy0], "k-", linewidth=2.5)
ax.plot([ix0, ix1], [iy1, iy1], "k-", linewidth=2.5)
ax.plot([ix0, ix0], [iy0, iy1], "k-", linewidth=2.5)
ax.plot([ix1, ix1], [iy0, iy1], "k-", linewidth=2.5)
ax.text(ix0+500, iy0+1500, "%s#%s" %
(fType, ischip), fontsize=12, color="grey")
ax.plot(xpixTar, ypixTar, "r*", ms=12)
ax.set_xlabel("$X\,[\mathrm{pixels}]$", fontsize=20)
ax.set_ylabel("$Y\,[\mathrm{pixels}]$", fontsize=20)
......@@ -194,6 +222,7 @@ def ccdLayout(xpixTar, ypixTar, figout="ccdLayout.pdf"):
def parseArguments():
# Create argument parser
parser = argparse.ArgumentParser()
......@@ -203,7 +232,7 @@ def parseArguments():
parser.add_argument("dec_target", type=float)
parser.add_argument("ra_point", type=float)
parser.add_argument("dec_point", type=float)
# Optional arguments
parser.add_argument("-image_rot", type=float, default=-113.4333)
parser.add_argument("-figout", type=str, default="zTargetOnCCD.pdf")
......@@ -213,10 +242,11 @@ def parseArguments():
return args
if __name__ == "__main__":
# Parse the arguments
args = parseArguments()
# Run function
focalPlaneInf(args.ra_target, args.dec_target, args.ra_point, args.dec_point, args.image_rot, args.figout)
focalPlaneInf(args.ra_target, args.dec_target, args.ra_point,
args.dec_point, args.image_rot, args.figout)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment