Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-sims
csst_ifs_sim
Commits
41b0c78a
Commit
41b0c78a
authored
May 11, 2024
by
Yan Zhaojun
Browse files
debug
parent
e6a27c29
Changes
1
Hide whitespace changes
Inline
Side-by-side
csst_ifs_sim/csst_ifs_sim.py
View file @
41b0c78a
...
...
@@ -34,10 +34,10 @@ import astropy.coordinates as coord
import
ctypes
import
sys
from
csst_ifs_sim.support
import
IFSinstrumentModel
from
csst_ifs_sim.support
import
cosmicrays
from
csst_ifs_sim.support
import
logger
as
lg
from
csst_ifs_sim.CTI
import
CTI
#
from csst_ifs_sim.support import IFSinstrumentModel
#
from csst_ifs_sim.support import cosmicrays
#
from csst_ifs_sim.support import logger as lg
#
from csst_ifs_sim.CTI import CTI
sys
.
path
.
append
(
'./csst_ifs_sim'
)
conf
.
auto_max_age
=
None
...
...
@@ -90,9 +90,531 @@ Note:: This class is Python 3 compatible.
2024.5.10 ---updata and correct the bug of frame transfer effect simulation
"""
#### functions definition #####
######################## functions definition ################################
"""
Charge Transfer Inefficiency
============================
This file contains a simple class to run a CDM03 CTI model developed by Alex Short (ESA).
This now contains both the official CDM03 and a new version that allows different trap
parameters in parallel and serial direction.
:requires: NumPy
:requires: CDM03 (FORTRAN code, f2py -c -m cdm03bidir cdm03bidir.f90)
"""
#CDM03bidir
class
CDM03bidir
():
"""
Class to run CDM03 CTI model, class Fortran routine to perform the actual CDM03 calculations.
:param settings: input parameters
:type settings: dict
:param data: input data to be radiated
:type data: ndarray
:param log: instance to Python logging
:type log: logging instance
"""
def
__init__
(
self
,
settings
,
data
,
log
=
None
):
"""
Class constructor.
:param settings: input parameters
:type settings: dict
:param data: input data to be radiated
:type data: ndarray
:param log: instance to Python logging
:type log: logging instance
"""
self
.
data
=
data
self
.
values
=
dict
(
quads
=
(
0
,
1
,
2
,
3
),
xsize
=
2048
,
ysize
=
2066
,
dob
=
0.0
,
rdose
=
8.0e9
)
self
.
values
.
update
(
settings
)
self
.
log
=
log
self
.
_setupLogger
()
# #default CDM03 settings
# self.params = dict(beta_p=0.6, beta_s=0.6, fwc=200000., vth=1.168e7, vg=6.e-11, t=20.48e-3,
# sfwc=730000., svg=1.0e-10, st=5.0e-6, parallel=1., serial=1.)
self
.
params
=
dict
(
beta_p
=
0.6
,
beta_s
=
0.6
,
fwc
=
100000.
,
vth
=
1.168e7
,
vg
=
6.e-11
,
t
=
1.0e-3
,
sfwc
=
700000.
,
svg
=
1.0e-10
,
st
=
1.0e-6
,
parallel
=
1.
,
serial
=
0.
)
#update with inputs
self
.
params
.
update
(
self
.
values
)
#read in trap information
trapdata
=
np
.
loadtxt
(
self
.
values
[
'dir_path'
]
+
self
.
values
[
'paralleltrapfile'
])
if
trapdata
.
ndim
>
1
:
self
.
nt_p
=
trapdata
[:,
0
]
self
.
sigma_p
=
trapdata
[:,
1
]
self
.
taur_p
=
trapdata
[:,
2
]
else
:
#only one trap species
self
.
nt_p
=
[
trapdata
[
0
],]
self
.
sigma_p
=
[
trapdata
[
1
],]
self
.
taur_p
=
[
trapdata
[
2
],]
trapdata
=
np
.
loadtxt
(
self
.
values
[
'dir_path'
]
+
self
.
values
[
'serialtrapfile'
])
if
trapdata
.
ndim
>
1
:
self
.
nt_s
=
trapdata
[:,
0
]
self
.
sigma_s
=
trapdata
[:,
1
]
self
.
taur_s
=
trapdata
[:,
2
]
else
:
#only one trap species
self
.
nt_s
=
[
trapdata
[
0
],]
self
.
sigma_s
=
[
trapdata
[
1
],]
self
.
taur_s
=
[
trapdata
[
2
],]
#scale thibaut's values
if
'thibaut'
in
self
.
values
[
'parallelTrapfile'
]:
self
.
nt_p
/=
0.576
#thibaut's values traps / pixel
self
.
sigma_p
*=
1.e4
#thibaut's values in m**2
if
'thibaut'
in
self
.
values
[
'serialTrapfile'
]:
self
.
nt_s
*=
0.576
#thibaut's values traps / pixel #should be division?
self
.
sigma_s
*=
1.e4
#thibaut's values in m**2
def
_setupLogger
(
self
):
"""
Set up the logger.
"""
self
.
logger
=
True
# if self.log is None:
# self.logger = False
def
applyRadiationDamage
(
self
,
data
,
iquadrant
=
0
):
"""
Apply radian damage based on FORTRAN CDM03 model. The method assumes that
input data covers only a single quadrant defined by the iquadrant integer.
:param data: imaging data to which the CDM03 model will be applied to.
:type data: ndarray
:param iquandrant: number of the quadrant to process
:type iquandrant: int
cdm03 - Function signature::
sout = cdm03(sinp,iflip,jflip,dob,rdose,in_nt,in_sigma,in_tr,[xdim,ydim,zdim])
Required arguments:
sinp : input rank-2 array('d') with bounds (xdim,ydim)
iflip : input int
jflip : input int
dob : input float
rdose : input float
in_nt : input rank-1 array('d') with bounds (zdim)
in_sigma : input rank-1 array('d') with bounds (zdim)
in_tr : input rank-1 array('d') with bounds (zdim)
Optional arguments:
xdim := shape(sinp,0) input int
ydim := shape(sinp,1) input int
zdim := len(in_nt) input int
Return objects:
sout : rank-2 array('d') with bounds (xdim,ydim)
.. Note:: Because Python/NumPy arrays are different row/column based, one needs
to be extra careful here. NumPy.asfortranarray will be called to get
an array laid out in Fortran order in memory. Before returning the
array will be laid out in memory in C-style (row-major order).
:return: image that has been run through the CDM03 model
:rtype: ndarray
"""""
#return data
iflip
=
iquadrant
/
2
jflip
=
iquadrant
%
2
params
=
[
self
.
params
[
'beta_p'
],
self
.
params
[
'beta_s'
],
self
.
params
[
'fwc'
],
self
.
params
[
'vth'
],
self
.
params
[
'vg'
],
self
.
params
[
't'
],
self
.
params
[
'sfwc'
],
self
.
params
[
'svg'
],
self
.
params
[
'st'
],
self
.
params
[
'parallel'
],
self
.
params
[
'serial'
]]
if
self
.
logger
:
self
.
log
.
info
(
'nt_p='
+
str
(
self
.
nt_p
))
self
.
log
.
info
(
'nt_s='
+
str
(
self
.
nt_s
))
self
.
log
.
info
(
'sigma_p= '
+
str
(
self
.
sigma_p
))
self
.
log
.
info
(
'sigma_s= '
+
str
(
self
.
sigma_s
))
self
.
log
.
info
(
'taur_p= '
+
str
(
self
.
taur_p
))
self
.
log
.
info
(
'taur_s= '
+
str
(
self
.
taur_s
))
self
.
log
.
info
(
'dob=%f'
%
self
.
values
[
'dob'
])
self
.
log
.
info
(
'rdose=%e'
%
self
.
values
[
'rdose'
])
self
.
log
.
info
(
'xsize=%i'
%
data
.
shape
[
1
])
self
.
log
.
info
(
'ysize=%i'
%
data
.
shape
[
0
])
self
.
log
.
info
(
'quadrant=%i'
%
iquadrant
)
self
.
log
.
info
(
'iflip=%i'
%
iflip
)
self
.
log
.
info
(
'jflip=%i'
%
jflip
)
#################################################################################
###modify
#sys.path.append('../so')
from
ifs_so
import
cdm03bidir
# from ifs_so.cdm03.cpython-38-x86_64-linux-gnu import cdm03bidir
# import cdm03bidir
CTIed
=
cdm03bidir
.
cdm03
(
np
.
asfortranarray
(
data
),
jflip
,
iflip
,
self
.
values
[
'dob'
],
self
.
values
[
'rdose'
],
self
.
nt_p
,
self
.
sigma_p
,
self
.
taur_p
,
self
.
nt_s
,
self
.
sigma_s
,
self
.
taur_s
,
params
,
[
data
.
shape
[
0
],
data
.
shape
[
1
],
len
(
self
.
nt_p
),
len
(
self
.
nt_s
),
len
(
self
.
params
)])
return
np
.
asanyarray
(
CTIed
)
#################################################################################################################
#################################################################################################################
"""
These functions can be used for logging information.
.. Warning:: logger is not multiprocessing safe.
:version: 0.3
"""
import
logging
import
logging.handlers
def
setUpLogger
(
log_filename
,
loggername
=
'logger'
):
"""
Sets up a logger.
:param: log_filename: name of the file to save the log.
:param: loggername: name of the logger
:return: logger instance
"""
# create logger
logger
=
logging
.
getLogger
(
loggername
)
logger
.
setLevel
(
logging
.
DEBUG
)
# Add the log message handler to the logger
handler
=
logging
.
handlers
.
RotatingFileHandler
(
log_filename
)
#maxBytes=20, backupCount=5)
# create formatter
formatter
=
logging
.
Formatter
(
'%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s'
)
# add formatter to ch
handler
.
setFormatter
(
formatter
)
# add handler to logger
if
(
logger
.
hasHandlers
()):
logger
.
handlers
.
clear
()
logger
.
addHandler
(
handler
)
return
logger
##############################################################################
"""
IFS Instrument Model
====================
The file provides a function that returns IFS related information such as pixel
size, dark current, gain...
"""
def
IFSinformation
():
"""
Returns a dictionary describing VIS. The following information is provided (id: value - reference)::
:return: instrument model parameters
:rtype: dict
"""
#########################################################################################################
#out = dict(readnoise=4, pixel_size=0.1, dark=0.0008333, fullwellcapacity=90000, bluesize=4000, redsize=6000, readtime=300.)
out
=
dict
()
out
.
update
({
'dob'
:
0
,
'rdose'
:
8.0e9
,
'parallelTrapfile'
:
'cdm_euclid_parallel.dat'
,
'serialTrapfile'
:
'cdm_euclid_serial.dat'
,
'beta_s'
:
0.6
,
'beta_p'
:
0.6
,
'fwc'
:
90000
,
'vth'
:
1.168e7
,
't'
:
20.48e-3
,
'vg'
:
6.e-11
,
'st'
:
5.0e-6
,
'sfwc'
:
730000.
,
'svg'
:
1.0e-10
})
return
out
def
CCDnonLinearityModel
(
data
,
beta
=
6e-7
):
"""
The non-linearity is modelled based on the results presented.
:param data: data to which the non-linearity model is being applied to
:type data: ndarray
:return: input data after conversion with the non-linearity model
:rtype: float or ndarray
"""
out
=
data
-
beta
*
data
**
2
return
out
#############################################################################
"""
Cosmic Rays
===========
This simple class can be used to include cosmic ray events to an image.
By default the cosmic ray events are drawn from distributions describing
the length and energy of the events. Such distributions can be generated
for example using Stardust code (http://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=04636917).
The energy of the cosmic ray events can also be set to constant for
testing purposes. The class can be used to draw a single cosmic ray
event or up to a covering fraction.
:requires: NumPy
:requires: SciPy
:version: 0.2
"""
from
scipy.interpolate
import
InterpolatedUnivariateSpline
class
cosmicrays
():
"""
Cosmic ray generation class. Can either draw events from distributions or
set the energy of the events to a constant.
:param log: logger instance
:param image: image to which cosmic rays are added to (a copy is made not to change the original numpy array)
:param crInfo: column information (cosmic ray file)
:param information: cosmic ray track information (file containing track length and energy information) and
exposure time.
"""
def
__init__
(
self
,
log
,
image
,
crInfo
=
None
,
information
=
None
):
"""
Cosmic ray generation class. Can either draw events from distributions or
set the energy of the events to a constant.
:param log: logger instance
:param image: image to which cosmic rays are added to (a copy is made not to change the original numpy array)
:param crInfo: column information (cosmic ray file)
:param information: cosmic ray track information (file containing track length and energy information) and
exposure time.
"""
#setup logger
self
.
log
=
log
#image and size
self
.
image
=
image
.
copy
()
self
.
ysize
,
self
.
xsize
=
self
.
image
.
shape
#set up the information dictionary, first with defaults and then overwrite with inputs if given
self
.
information
=
(
dict
(
cosmicraylengths
=
'/home/yan/csst-master/data/cdf_cr_length.dat'
,
cosmicraydistance
=
'/home/yan/csst-master/data/cdf_cr_total.dat'
,
exptime
=
565
))
if
information
is
not
None
:
self
.
information
.
update
(
information
)
if
crInfo
is
not
None
:
self
.
cr
=
crInfo
else
:
self
.
_readCosmicrayInformation
()
##############################################################################
def
_cosmicRayIntercepts
(
self
,
lum
,
x0
,
y0
,
l
,
phi
):
"""
Derive cosmic ray streak intercept points.
:param lum: luminosities of the cosmic ray tracks
:param x0: central positions of the cosmic ray tracks in x-direction
:param y0: central positions of the cosmic ray tracks in y-direction
:param l: lengths of the cosmic ray tracks
:param phi: orientation angles of the cosmic ray tracks
:return: cosmic ray map (image)
:rtype: nd-array
"""
#create empty array
crImage
=
np
.
zeros
((
self
.
ysize
,
self
.
xsize
),
dtype
=
np
.
float64
)
#x and y shifts
dx
=
l
*
np
.
cos
(
phi
)
/
2.
dy
=
l
*
np
.
sin
(
phi
)
/
2.
mskdx
=
np
.
abs
(
dx
)
<
1e-8
mskdy
=
np
.
abs
(
dy
)
<
1e-8
dx
[
mskdx
]
=
0.
dy
[
mskdy
]
=
0.
#pixels in x-direction
ilo
=
np
.
round
(
x0
.
copy
()
-
dx
)
msk
=
ilo
<
0.
ilo
[
msk
]
=
0
ilo
=
ilo
.
astype
(
int
)
ihi
=
1
+
np
.
round
(
x0
.
copy
()
+
dx
)
msk
=
ihi
>
self
.
xsize
ihi
[
msk
]
=
self
.
xsize
ihi
=
ihi
.
astype
(
int
)
#pixels in y-directions
jlo
=
np
.
round
(
y0
.
copy
()
-
dy
)
msk
=
jlo
<
0.
jlo
[
msk
]
=
0
jlo
=
jlo
.
astype
(
int
)
jhi
=
1
+
np
.
round
(
y0
.
copy
()
+
dy
)
msk
=
jhi
>
self
.
ysize
jhi
[
msk
]
=
self
.
ysize
jhi
=
jhi
.
astype
(
int
)
#loop over the individual events
for
i
,
luminosity
in
enumerate
(
lum
):
n
=
0
# count the intercepts
u
=
[]
x
=
[]
y
=
[]
#Compute X intercepts on the pixel grid
if
ilo
[
i
]
<
ihi
[
i
]:
for
xcoord
in
range
(
ilo
[
i
],
ihi
[
i
]):
ok
=
(
xcoord
-
x0
[
i
])
/
dx
[
i
]
if
np
.
abs
(
ok
)
<=
0.5
:
n
+=
1
u
.
append
(
ok
)
x
.
append
(
xcoord
)
y
.
append
(
y0
[
i
]
+
ok
*
dy
[
i
])
else
:
for
xcoord
in
range
(
ihi
[
i
],
ilo
[
i
]):
ok
=
(
xcoord
-
x0
[
i
])
/
dx
[
i
]
if
np
.
abs
(
ok
)
<=
0.5
:
n
+=
1
u
.
append
(
ok
)
x
.
append
(
xcoord
)
y
.
append
(
y0
[
i
]
+
ok
*
dy
[
i
])
#Compute Y intercepts on the pixel grid
if
jlo
[
i
]
<
jhi
[
i
]:
for
ycoord
in
range
(
jlo
[
i
],
jhi
[
i
]):
ok
=
(
ycoord
-
y0
[
i
])
/
dy
[
i
]
if
np
.
abs
(
ok
)
<=
0.5
:
n
+=
1
u
.
append
(
ok
)
x
.
append
(
x0
[
i
]
+
ok
*
dx
[
i
])
y
.
append
(
ycoord
)
else
:
for
ycoord
in
range
(
jhi
[
i
],
jlo
[
i
]):
ok
=
(
ycoord
-
y0
[
i
])
/
dy
[
i
]
if
np
.
abs
(
ok
)
<=
0.5
:
n
+=
1
u
.
append
(
ok
)
x
.
append
(
x0
[
i
]
+
ok
*
dx
[
i
])
y
.
append
(
ycoord
)
#check if no intercepts were found
if
n
<
1
:
xc
=
int
(
np
.
floor
(
x0
[
i
]))
yc
=
int
(
np
.
floor
(
y0
[
i
]))
crImage
[
yc
,
xc
]
+=
luminosity
#Find the arguments that sort the intersections along the track
u
=
np
.
asarray
(
u
)
x
=
np
.
asarray
(
x
)
y
=
np
.
asarray
(
y
)
args
=
np
.
argsort
(
u
)
u
=
u
[
args
]
x
=
x
[
args
]
y
=
y
[
args
]
#Decide which cell each interval traverses, and the path length
for
i
in
range
(
1
,
n
-
1
):
w
=
u
[
i
+
1
]
-
u
[
i
]
cx
=
int
(
1
+
np
.
floor
((
x
[
i
+
1
]
+
x
[
i
])
/
2.
))
cy
=
int
(
1
+
np
.
floor
((
y
[
i
+
1
]
+
y
[
i
])
/
2.
))
if
0
<=
cx
<
self
.
xsize
and
0
<=
cy
<
self
.
ysize
:
crImage
[
cy
,
cx
]
+=
(
w
*
luminosity
)
return
crImage
def
_drawEventsToCoveringFactor
(
self
,
coveringFraction
=
3.0
,
limit
=
1000
,
verbose
=
False
):
"""
Generate cosmic ray events up to a covering fraction and include it to a cosmic ray map (self.cosmicrayMap).
:param coveringFraction: covering fraction of cosmic rya events in per cent of total number of pixels
:type coveringFraction: float
:param limit: limiting energy for the cosmic ray event [None = draw from distribution]
:type limit: None or float
:param verbose: print out information to stdout
:type verbose: bool
:return: None
"""
self
.
cosmicrayMap
=
np
.
zeros
((
self
.
ysize
,
self
.
xsize
))
#how many events to draw at once, too large number leads to exceeding the covering fraction
cr_n
=
int
(
295
*
self
.
information
[
'exptime'
]
/
565.
*
coveringFraction
/
1.4
)
covering
=
0.0
while
covering
<
coveringFraction
:
#pseudo-random numbers taken from a uniform distribution between 0 and 1
np
.
random
.
seed
()
luck
=
np
.
random
.
rand
(
cr_n
)
#draw the length of the tracks
ius
=
InterpolatedUnivariateSpline
(
self
.
cr
[
'cr_cdf'
],
self
.
cr
[
'cr_u'
])
self
.
cr
[
'cr_l'
]
=
ius
(
luck
)
if
limit
is
None
:
ius
=
InterpolatedUnivariateSpline
(
self
.
cr
[
'cr_cde'
],
self
.
cr
[
'cr_v'
])
self
.
cr
[
'cr_e'
]
=
ius
(
luck
)
else
:
#set the energy directly to the limit
self
.
cr
[
'cr_e'
]
=
np
.
asarray
([
limit
,])
#Choose the properties such as positions and an angle from a random Uniform dist
np
.
random
.
seed
()
cr_x
=
self
.
xsize
*
np
.
random
.
rand
(
int
(
np
.
floor
(
cr_n
)))
np
.
random
.
seed
()
cr_y
=
self
.
ysize
*
np
.
random
.
rand
(
int
(
np
.
floor
(
cr_n
)))
np
.
random
.
seed
()
cr_phi
=
np
.
pi
*
np
.
random
.
rand
(
int
(
np
.
floor
(
cr_n
)))
#find the intercepts
self
.
cosmicrayMap
+=
self
.
_cosmicRayIntercepts
(
self
.
cr
[
'cr_e'
],
cr_x
,
cr_y
,
self
.
cr
[
'cr_l'
],
cr_phi
)
#count the covering factor
area_cr
=
np
.
count_nonzero
(
self
.
cosmicrayMap
)
covering
=
100.
*
area_cr
/
(
self
.
xsize
*
self
.
ysize
)
def
addUpToFraction
(
self
,
coveringFraction
,
limit
=
None
,
verbose
=
False
):
"""
Add cosmic ray events up to the covering Fraction.
:param coveringFraction: covering fraction of cosmic rya events in per cent of total number of pixels
:type coveringFraction: float
:param limit: limiting energy for the cosmic ray event [None = draw from distribution]
:type limit: None or float
:param verbose: print out information to stdout
:type verbose: bool
:return: image with cosmic rays
:rtype: ndarray
"""
self
.
_drawEventsToCoveringFactor
(
coveringFraction
,
limit
=
limit
,
verbose
=
verbose
)
#paste cosmic rays
self
.
image
+=
self
.
cosmicrayMap
return
self
.
image
###############################################################################
def
transRaDec2D
(
ra
,
dec
):
"""
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment