Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-sims
csst_msc_sim
Commits
2c2dac03
Commit
2c2dac03
authored
May 12, 2024
by
Fang Yuedong
Browse files
remove files
parent
4afd1181
Changes
211
Hide whitespace changes
Inline
Side-by-side
ObservationSim/sim_steps/add_LED_flat.py
deleted
100644 → 0
View file @
4afd1181
import
numpy
as
np
from
ObservationSim.MockObject
import
FlatLED
import
galsim
from
astropy.time
import
Time
from
datetime
import
datetime
,
timezone
import
gc
def
add_LED_Flat
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
if
not
hasattr
(
self
,
'h_ext'
):
_
,
_
=
self
.
prepare_headers
(
chip
=
chip
,
pointing
=
pointing
)
chip_wcs
=
galsim
.
FitsWCS
(
header
=
self
.
h_ext
)
pf_map
=
np
.
zeros_like
(
chip
.
img
.
array
)
if
obs_param
[
"LED_TYPE"
]
is
not
None
:
if
len
(
obs_param
[
"LED_TYPE"
])
!=
0
:
print
(
"LED OPEN--------"
)
led_obj
=
FlatLED
(
chip
,
filt
)
led_flat
,
ledstat
,
letts
=
led_obj
.
drawObj_LEDFlat
(
led_type_list
=
obs_param
[
"LED_TYPE"
],
exp_t_list
=
obs_param
[
"LED_TIME"
])
pf_map
=
led_flat
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'LEDSTAT'
],
values
=
[
ledstat
])
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'LEDT01'
,
'LEDT02'
,
'LEDT03'
,
'LEDT04'
,
'LEDT05'
,
'LEDT06'
,
'LEDT07'
,
'LEDT08'
,
'LEDT09'
,
'LEDT10'
,
'LEDT11'
,
'LEDT12'
,
'LEDT13'
,
'LEDT14'
],
values
=
letts
)
if
obs_param
[
"shutter_effect"
]
==
True
:
pf_map
=
pf_map
*
chip
.
shutter_img
pf_map
=
np
.
array
(
pf_map
,
dtype
=
'float32'
)
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
],
values
=
[
True
])
else
:
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
,
'SHTOPEN1'
,
'SHTCLOS0'
],
values
=
[
True
,
self
.
h_ext
[
'SHTCLOS1'
],
self
.
h_ext
[
'SHTOPEN0'
]])
chip
.
img
=
chip
.
img
+
pf_map
# renew header info
datetime_obs
=
datetime
.
utcfromtimestamp
(
pointing
.
timestamp
)
datetime_obs
=
datetime_obs
.
replace
(
tzinfo
=
timezone
.
utc
)
t_obs
=
Time
(
datetime_obs
)
##ccd刷新2s,等待0.5s,开灯后等待0.5s,开始曝光
t_obs_renew
=
Time
(
t_obs
.
mjd
-
(
2.
)
/
86400.
,
format
=
"mjd"
)
t_obs_utc
=
datetime
.
utcfromtimestamp
(
np
.
round
(
datetime
.
utcfromtimestamp
(
t_obs_renew
.
unix
).
replace
(
tzinfo
=
timezone
.
utc
).
timestamp
(),
1
))
self
.
updateHeaderInfo
(
header_flag
=
'prim'
,
keys
=
[
'DATE-OBS'
],
values
=
[
t_obs_utc
.
strftime
(
"%Y-%m-%dT%H:%M:%S.%f"
)[:
-
5
]])
#dark time :
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'DARKTIME'
],
values
=
[
pointing
.
exp_time
])
gc
.
collect
()
return
chip
,
filt
,
tel
,
pointing
\ No newline at end of file
ObservationSim/sim_steps/add_brighter_fatter_CTE.py
deleted
100644 → 0
View file @
4afd1181
import
numpy
as
np
import
galsim
from
ObservationSim.Instrument.Chip
import
ChipUtils
as
chip_utils
from
ObservationSim.Instrument.Chip.libCTI.CTI_modeling
import
CTI_sim
def
add_brighter_fatter
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
chip
.
img
=
chip_utils
.
add_brighter_fatter
(
img
=
chip
.
img
)
return
chip
,
filt
,
tel
,
pointing
def
apply_CTE
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
" Apply CTE Effect"
)
### 2*8 -> 1*16 img-layout
img
=
chip_utils
.
formatOutput
(
GSImage
=
chip
.
img
)
chip
.
nsecy
=
1
chip
.
nsecx
=
16
img_arr
=
img
.
array
ny
,
nx
=
img_arr
.
shape
dx
=
int
(
nx
/
chip
.
nsecx
)
dy
=
int
(
ny
/
chip
.
nsecy
)
newimg
=
galsim
.
Image
(
nx
,
int
(
ny
+
chip
.
overscan_y
),
init_value
=
0
)
for
ichannel
in
range
(
16
):
print
(
'
\n
***add CTI effects: pointing-{:} chip-{:} channel-{:}***'
.
format
(
pointing
.
id
,
chip
.
chipID
,
ichannel
+
1
))
noverscan
,
nsp
,
nmax
=
chip
.
overscan_y
,
3
,
10
beta
,
w
,
c
=
0.478
,
84700
,
0
t
=
np
.
array
([
0.74
,
7.7
,
37
],
dtype
=
np
.
float32
)
rho_trap
=
np
.
array
([
0.6
,
1.6
,
1.4
],
dtype
=
np
.
float32
)
trap_seeds
=
np
.
array
([
0
,
1000
,
10000
],
dtype
=
np
.
int32
)
+
ichannel
+
chip
.
chipID
*
16
release_seed
=
50
+
ichannel
+
pointing
.
id
*
30
+
chip
.
chipID
*
16
newimg
.
array
[:,
0
+
ichannel
*
dx
:
dx
+
ichannel
*
dx
]
=
CTI_sim
(
img_arr
[:,
0
+
ichannel
*
dx
:
dx
+
ichannel
*
dx
],
dx
,
dy
,
noverscan
,
nsp
,
nmax
,
beta
,
w
,
c
,
t
,
rho_trap
,
trap_seeds
,
release_seed
)
newimg
.
wcs
=
img
.
wcs
del
img
img
=
newimg
### 1*16 -> 2*8 img-layout
chip
.
img
=
chip_utils
.
formatRevert
(
GSImage
=
img
)
chip
.
nsecy
=
2
chip
.
nsecx
=
8
# [TODO] make overscan_y == 0
chip
.
overscan_y
=
0
return
chip
,
filt
,
tel
,
pointing
\ No newline at end of file
ObservationSim/sim_steps/add_cosmic_rays.py
deleted
100644 → 0
View file @
4afd1181
from
ObservationSim.Instrument.Chip
import
ChipUtils
as
chip_utils
def
add_cosmic_rays
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
" Adding Cosmic-Ray"
)
# Get exposure time
if
(
obs_param
)
and
(
"exptime"
in
obs_param
)
and
(
obs_param
[
"exptime"
]
is
not
None
):
exptime
=
obs_param
[
"exptime"
]
else
:
exptime
=
pointing
.
exp_time
chip
.
img
,
crmap_gsimg
,
cr_event_num
=
chip_utils
.
add_cosmic_rays
(
img
=
chip
.
img
,
chip
=
chip
,
exptime
=
exptime
,
seed
=
self
.
overall_config
[
"random_seeds"
][
"seed_CR"
]
+
pointing
.
id
*
30
+
chip
.
chipID
)
# Save cosmic ray image
if
(
obs_param
)
and
(
"save_cosmic_img"
in
obs_param
)
and
(
obs_param
[
"save_cosmic_img"
]
is
not
None
):
if
obs_param
[
"save_cosmic_img"
]:
chip_utils
.
output_fits_image
(
chip
=
chip
,
pointing
=
pointing
,
img
=
crmap_gsimg
,
output_dir
=
self
.
chip_output
.
subdir
,
img_type
=
'CRS'
,
img_type_code
=
pointing
.
pointing_type_code
,
project_cycle
=
self
.
overall_config
[
"project_cycle"
],
run_counter
=
self
.
overall_config
[
"run_counter"
]
)
return
chip
,
filt
,
tel
,
pointing
\ No newline at end of file
ObservationSim/sim_steps/add_objects.py
deleted
100644 → 0
View file @
4afd1181
import
os
import
gc
import
psutil
import
traceback
import
numpy
as
np
import
galsim
from
ObservationSim._util
import
get_shear_field
from
ObservationSim.PSF
import
PSFGauss
,
FieldDistortion
,
PSFInterp
,
PSFInterpSLS
from
astropy.time
import
Time
from
datetime
import
datetime
,
timezone
def
add_objects
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
# Get exposure time
if
(
obs_param
)
and
(
"exptime"
in
obs_param
)
and
(
obs_param
[
"exptime"
]
is
not
None
):
exptime
=
obs_param
[
"exptime"
]
else
:
exptime
=
pointing
.
exp_time
# Load catalogues
if
catalog
is
None
:
self
.
chip_output
.
Log_error
(
"Catalog interface class must be specified for SCIE-OBS"
)
raise
ValueError
(
"Catalog interface class must be specified for SCIE-OBS"
)
cat
=
catalog
(
config
=
self
.
overall_config
,
chip
=
chip
,
pointing
=
pointing
,
chip_output
=
self
.
chip_output
,
filt
=
filt
)
# Prepare output file(s) for this chip
# [NOTE] Headers of output .cat file may be updated by Catalog instance
# this should be called after the creation of Catalog instance
self
.
chip_output
.
create_output_file
()
# Prepare the PSF model
if
self
.
overall_config
[
"psf_setting"
][
"psf_model"
]
==
"Gauss"
:
psf_model
=
PSFGauss
(
chip
=
chip
,
psfRa
=
self
.
overall_config
[
"psf_setting"
][
"psf_rcont"
])
elif
self
.
overall_config
[
"psf_setting"
][
"psf_model"
]
==
"Interp"
:
if
chip
.
survey_type
==
"spectroscopic"
:
psf_model
=
PSFInterpSLS
(
chip
,
filt
,
PSF_data_prefix
=
self
.
overall_config
[
"psf_setting"
][
"psf_sls_dir"
])
else
:
psf_model
=
PSFInterp
(
chip
=
chip
,
npsf
=
chip
.
n_psf_samples
,
PSF_data_file
=
self
.
overall_config
[
"psf_setting"
][
"psf_pho_dir"
])
else
:
self
.
chip_output
.
Log_error
(
"unrecognized PSF model type!!"
,
flush
=
True
)
# Apply field distortion model
if
obs_param
[
"field_dist"
]
==
True
:
fd_model
=
FieldDistortion
(
chip
=
chip
,
img_rot
=
pointing
.
img_pa
.
deg
)
else
:
fd_model
=
None
# Update limiting magnitudes for all filters based on the exposure time
# Get the filter which will be used for magnitude cut
for
ifilt
in
range
(
len
(
self
.
all_filters
)):
temp_filter
=
self
.
all_filters
[
ifilt
]
temp_filter
.
update_limit_saturation_mags
(
exptime
=
pointing
.
get_full_depth_exptime
(
temp_filter
.
filter_type
),
chip
=
chip
)
if
temp_filter
.
filter_type
.
lower
()
==
self
.
overall_config
[
"obs_setting"
][
"cut_in_band"
].
lower
():
cut_filter
=
temp_filter
# Read in shear values from configuration file if the constant shear type is used
if
self
.
overall_config
[
"shear_setting"
][
"shear_type"
]
==
"constant"
:
g1_field
,
g2_field
,
_
=
get_shear_field
(
config
=
self
.
overall_config
)
self
.
chip_output
.
Log_info
(
"Use constant shear: g1=%.5f, g2=%.5f"
%
(
g1_field
,
g2_field
))
# Get chip WCS
if
not
hasattr
(
self
,
'h_ext'
):
_
,
_
=
self
.
prepare_headers
(
chip
=
chip
,
pointing
=
pointing
)
chip_wcs
=
galsim
.
FitsWCS
(
header
=
self
.
h_ext
)
# Loop over objects
nobj
=
len
(
cat
.
objs
)
missed_obj
=
0
bright_obj
=
0
dim_obj
=
0
for
j
in
range
(
nobj
):
# # [DEBUG] [TODO]
# if j >= 10:
# break
obj
=
cat
.
objs
[
j
]
# load and convert SED; also caculate object's magnitude in all CSST bands
try
:
sed_data
=
cat
.
load_sed
(
obj
)
norm_filt
=
cat
.
load_norm_filt
(
obj
)
obj
.
sed
,
obj
.
param
[
"mag_%s"
%
filt
.
filter_type
.
lower
()],
obj
.
param
[
"flux_%s"
%
filt
.
filter_type
.
lower
()]
=
cat
.
convert_sed
(
mag
=
obj
.
param
[
"mag_use_normal"
],
sed
=
sed_data
,
target_filt
=
filt
,
norm_filt
=
norm_filt
,
mu
=
obj
.
mu
)
_
,
obj
.
param
[
"mag_%s"
%
cut_filter
.
filter_type
.
lower
()],
obj
.
param
[
"flux_%s"
%
cut_filter
.
filter_type
.
lower
()]
=
cat
.
convert_sed
(
mag
=
obj
.
param
[
"mag_use_normal"
],
sed
=
sed_data
,
target_filt
=
cut_filter
,
norm_filt
=
norm_filt
,
mu
=
obj
.
mu
)
except
Exception
as
e
:
traceback
.
print_exc
()
self
.
chip_output
.
Log_error
(
e
)
continue
# [TODO] Testing
# self.chip_output.Log_info("mag_%s = %.3f"%(filt.filter_type.lower(), obj.param["mag_%s"%filt.filter_type.lower()]))
# Exclude very bright/dim objects (for now)
if
cut_filter
.
is_too_bright
(
mag
=
obj
.
param
[
"mag_%s"
%
self
.
overall_config
[
"obs_setting"
][
"cut_in_band"
].
lower
()],
margin
=
self
.
overall_config
[
"obs_setting"
][
"mag_sat_margin"
]):
self
.
chip_output
.
Log_info
(
"obj %s too birght!! mag_%s = %.3f"
%
(
obj
.
id
,
cut_filter
.
filter_type
,
obj
.
param
[
"mag_%s"
%
self
.
overall_config
[
"obs_setting"
][
"cut_in_band"
].
lower
()]))
bright_obj
+=
1
obj
.
unload_SED
()
continue
if
filt
.
is_too_dim
(
mag
=
obj
.
getMagFilter
(
filt
),
margin
=
self
.
overall_config
[
"obs_setting"
][
"mag_lim_margin"
]):
self
.
chip_output
.
Log_info
(
"obj %s too dim!! mag_%s = %.3f"
%
(
obj
.
id
,
filt
.
filter_type
,
obj
.
getMagFilter
(
filt
)))
dim_obj
+=
1
obj
.
unload_SED
()
continue
# Get corresponding shear values
if
self
.
overall_config
[
"shear_setting"
][
"shear_type"
]
==
"constant"
:
if
obj
.
type
==
'star'
:
obj
.
g1
,
obj
.
g2
=
0.
,
0.
else
:
# Figure out shear fields from overall configuration shear setting
obj
.
g1
,
obj
.
g2
=
g1_field
,
g2_field
elif
self
.
overall_config
[
"shear_setting"
][
"shear_type"
]
==
"catalog"
:
pass
else
:
self
.
chip_output
.
Log_error
(
"Unknown shear input"
)
raise
ValueError
(
"Unknown shear input"
)
# Get position of object on the focal plane
pos_img
,
_
,
_
,
_
,
fd_shear
=
obj
.
getPosImg_Offset_WCS
(
img
=
chip
.
img
,
fdmodel
=
fd_model
,
chip
=
chip
,
verbose
=
False
,
chip_wcs
=
chip_wcs
,
img_header
=
self
.
h_ext
)
# [TODO] For now, only consider objects which their centers (after field distortion) are projected within the focal plane
# Otherwise they will be considered missed objects
# if pos_img.x == -1 or pos_img.y == -1 or (not chip.isContainObj(x_image=pos_img.x, y_image=pos_img.y, margin=0.)):
if
pos_img
.
x
==
-
1
or
pos_img
.
y
==
-
1
:
self
.
chip_output
.
Log_info
(
'obj_ra = %.6f, obj_dec = %.6f, obj_ra_orig = %.6f, obj_dec_orig = %.6f'
%
(
obj
.
ra
,
obj
.
dec
,
obj
.
ra_orig
,
obj
.
dec_orig
))
self
.
chip_output
.
Log_error
(
"Objected missed: %s"
%
(
obj
.
id
))
missed_obj
+=
1
obj
.
unload_SED
()
continue
# Draw object & update output catalog
try
:
if
self
.
overall_config
[
"run_option"
][
"out_cat_only"
]:
isUpdated
=
True
obj
.
real_pos
=
obj
.
getRealPos
(
chip
.
img
,
global_x
=
obj
.
posImg
.
x
,
global_y
=
obj
.
posImg
.
y
,
img_real_wcs
=
obj
.
chip_wcs
)
pos_shear
=
0.
elif
chip
.
survey_type
==
"photometric"
and
not
self
.
overall_config
[
"run_option"
][
"out_cat_only"
]:
isUpdated
,
pos_shear
=
obj
.
drawObj_multiband
(
tel
=
tel
,
pos_img
=
pos_img
,
psf_model
=
psf_model
,
bandpass_list
=
filt
.
bandpass_sub_list
,
filt
=
filt
,
chip
=
chip
,
g1
=
obj
.
g1
,
g2
=
obj
.
g2
,
exptime
=
exptime
,
fd_shear
=
fd_shear
)
elif
chip
.
survey_type
==
"spectroscopic"
and
not
self
.
overall_config
[
"run_option"
][
"out_cat_only"
]:
isUpdated
,
pos_shear
=
obj
.
drawObj_slitless
(
tel
=
tel
,
pos_img
=
pos_img
,
psf_model
=
psf_model
,
bandpass_list
=
filt
.
bandpass_sub_list
,
filt
=
filt
,
chip
=
chip
,
g1
=
obj
.
g1
,
g2
=
obj
.
g2
,
exptime
=
exptime
,
normFilter
=
norm_filt
,
fd_shear
=
fd_shear
)
if
isUpdated
==
1
:
# TODO: add up stats
self
.
chip_output
.
cat_add_obj
(
obj
,
pos_img
,
pos_shear
)
pass
elif
isUpdated
==
0
:
missed_obj
+=
1
self
.
chip_output
.
Log_error
(
"Objected missed: %s"
%
(
obj
.
id
))
else
:
self
.
chip_output
.
Log_error
(
"Draw error, object omitted: %s"
%
(
obj
.
id
))
continue
except
Exception
as
e
:
traceback
.
print_exc
()
self
.
chip_output
.
Log_error
(
e
)
self
.
chip_output
.
Log_error
(
"pointing: #%d, chipID: %d"
%
(
pointing
.
id
,
chip
.
chipID
))
if
obj
.
type
==
"galaxy"
:
self
.
chip_output
.
Log_error
(
"obj id: %s"
%
(
obj
.
param
[
'id'
]))
self
.
chip_output
.
Log_error
(
" e1: %.5f
\n
e2: %.5f
\n
size: %f
\n
bfrac: %f
\n
detA: %f
\n
g1: %.5f
\n
g2: %.5f
\n
"
%
(
obj
.
param
[
'e1'
],
obj
.
param
[
'e2'
],
obj
.
param
[
'size'
],
obj
.
param
[
'bfrac'
],
obj
.
param
[
'detA'
],
obj
.
param
[
'g1'
],
obj
.
param
[
'g2'
]))
# Unload SED:
obj
.
unload_SED
()
del
obj
gc
.
collect
()
del
psf_model
gc
.
collect
()
self
.
chip_output
.
Log_info
(
"Running checkpoint #1 (Object rendering finished): pointing-%d chip-%d pid-%d memory-%6.2fGB"
%
(
pointing
.
id
,
chip
.
chipID
,
os
.
getpid
(),
(
psutil
.
Process
(
os
.
getpid
()).
memory_info
().
rss
/
1024
/
1024
/
1024
)))
self
.
chip_output
.
Log_info
(
"# objects that are too bright %d out of %d"
%
(
bright_obj
,
nobj
))
self
.
chip_output
.
Log_info
(
"# objects that are too dim %d out of %d"
%
(
dim_obj
,
nobj
))
self
.
chip_output
.
Log_info
(
"# objects that are missed %d out of %d"
%
(
missed_obj
,
nobj
))
# Apply flat fielding (with shutter effects)
flat_normal
=
np
.
ones_like
(
chip
.
img
.
array
)
if
obs_param
[
"flat_fielding"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
flat_img
.
array
/
\
np
.
mean
(
chip
.
flat_img
.
array
)
if
obs_param
[
"shutter_effect"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
shutter_img
flat_normal
=
np
.
array
(
flat_normal
,
dtype
=
'float32'
)
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
],
values
=
[
True
])
else
:
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
,
'SHTOPEN1'
,
'SHTCLOS0'
],
values
=
[
True
,
self
.
h_ext
[
'SHTCLOS1'
],
self
.
h_ext
[
'SHTOPEN0'
]])
chip
.
img
*=
flat_normal
del
flat_normal
# renew header info
datetime_obs
=
datetime
.
utcfromtimestamp
(
pointing
.
timestamp
)
datetime_obs
=
datetime_obs
.
replace
(
tzinfo
=
timezone
.
utc
)
t_obs
=
Time
(
datetime_obs
)
# ccd刷新2s,等待0.s,开始曝光
t_obs_renew
=
Time
(
t_obs
.
mjd
-
(
2.
+
0.
)
/
86400.
,
format
=
"mjd"
)
t_obs_utc
=
datetime
.
utcfromtimestamp
(
np
.
round
(
datetime
.
utcfromtimestamp
(
t_obs_renew
.
unix
).
replace
(
tzinfo
=
timezone
.
utc
).
timestamp
(),
1
))
self
.
updateHeaderInfo
(
header_flag
=
'prim'
,
keys
=
[
'DATE-OBS'
],
values
=
[
t_obs_utc
.
strftime
(
"%Y-%m-%dT%H:%M:%S.%f"
)[:
-
5
]])
# dark time : 曝光时间+刷新后等带时间0.s+关快门后读出前等待0.s
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'DARKTIME'
],
values
=
[
0.
+
0.
+
pointing
.
exp_time
])
return
chip
,
filt
,
tel
,
pointing
ObservationSim/sim_steps/add_pattern_noise.py
deleted
100644 → 0
View file @
4afd1181
from
numpy.random
import
Generator
,
PCG64
from
ObservationSim.Instrument.Chip
import
ChipUtils
as
chip_utils
from
ObservationSim.Instrument.Chip
import
Effects
def
apply_PRNU
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
chip
.
img
*=
chip
.
prnu_img
if
self
.
overall_config
[
"output_setting"
][
"prnu_output"
]
==
True
:
chip
.
prnu_img
.
write
(
"%s/FlatImg_PRNU_%s.fits"
%
(
self
.
chip_output
.
subdir
,
str
(
chip
.
chipID
).
rjust
(
2
,
'0'
)))
return
chip
,
filt
,
tel
,
pointing
def
add_poisson_and_dark
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
# Add dark current & Poisson noise
# Get exposure time
if
(
obs_param
)
and
(
"exptime"
in
obs_param
)
and
(
obs_param
[
"exptime"
]
is
not
None
):
exptime
=
obs_param
[
"exptime"
]
else
:
exptime
=
pointing
.
exp_time
if
obs_param
[
"add_dark"
]
==
True
:
chip
.
img
,
_
=
chip_utils
.
add_poisson
(
img
=
chip
.
img
,
chip
=
chip
,
exptime
=
pointing
.
exp_time
,
poisson_noise
=
chip
.
poisson_noise
,
InputDark
=
None
)
else
:
chip
.
img
,
_
=
chip_utils
.
add_poisson
(
img
=
chip
.
img
,
chip
=
self
,
exptime
=
exptime
,
poisson_noise
=
chip
.
poisson_noise
,
dark_noise
=
0.
)
return
chip
,
filt
,
tel
,
pointing
def
add_detector_defects
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
# Add Hot Pixels or/and Dead Pixels
rgbadpix
=
Generator
(
PCG64
(
int
(
self
.
overall_config
[
"random_seeds"
][
"seed_defective"
]
+
chip
.
chipID
)))
badfraction
=
5E-5
*
(
rgbadpix
.
random
()
*
0.5
+
0.7
)
chip
.
img
=
Effects
.
DefectivePixels
(
chip
.
img
,
IfHotPix
=
obs_param
[
"hot_pixels"
],
IfDeadPix
=
obs_param
[
"dead_pixels"
],
fraction
=
badfraction
,
seed
=
self
.
overall_config
[
"random_seeds"
][
"seed_defective"
]
+
chip
.
chipID
,
biaslevel
=
0
)
# Apply Bad columns
if
obs_param
[
"bad_columns"
]
==
True
:
chip
.
img
=
Effects
.
BadColumns
(
chip
.
img
,
seed
=
self
.
overall_config
[
"random_seeds"
][
"seed_badcolumns"
],
chipid
=
chip
.
chipID
)
return
chip
,
filt
,
tel
,
pointing
def
add_nonlinearity
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
" Applying Non-Linearity on the chip image"
)
chip
.
img
=
Effects
.
NonLinearity
(
GSImage
=
chip
.
img
,
beta1
=
5.e-7
,
beta2
=
0
)
return
chip
,
filt
,
tel
,
pointing
def
add_blooming
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
" Applying CCD Saturation & Blooming"
)
chip
.
img
=
Effects
.
SaturBloom
(
GSImage
=
chip
.
img
,
nsect_x
=
1
,
nsect_y
=
1
,
fullwell
=
int
(
chip
.
full_well
))
return
chip
,
filt
,
tel
,
pointing
def
add_bias
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
" Adding Bias level and 16-channel non-uniformity"
)
if
obs_param
[
"bias_16channel"
]
==
True
:
chip
.
img
=
Effects
.
AddBiasNonUniform16
(
chip
.
img
,
bias_level
=
float
(
chip
.
bias_level
),
nsecy
=
chip
.
nsecy
,
nsecx
=
chip
.
nsecx
,
seed
=
self
.
overall_config
[
"random_seeds"
][
"seed_biasNonUniform"
]
+
chip
.
chipID
)
elif
obs_param
[
"bias_16channel"
]
==
False
:
chip
.
img
+=
self
.
bias_level
return
chip
,
filt
,
tel
,
pointing
ObservationSim/sim_steps/add_sky_background.py
deleted
100644 → 0
View file @
4afd1181
import
numpy
as
np
import
galsim
from
ObservationSim.Straylight
import
calculateSkyMap_split_g
from
ObservationSim.Instrument
import
FilterParam
from
astropy.time
import
Time
from
datetime
import
datetime
,
timezone
def
add_sky_background_sci
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
# Get exposure time
if
(
obs_param
)
and
(
"exptime"
in
obs_param
)
and
(
obs_param
[
"exptime"
]
is
not
None
):
exptime
=
obs_param
[
"exptime"
]
else
:
exptime
=
pointing
.
exp_time
flat_normal
=
np
.
ones_like
(
chip
.
img
.
array
)
if
obs_param
[
"flat_fielding"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
flat_img
.
array
/
np
.
mean
(
chip
.
flat_img
.
array
)
if
obs_param
[
"shutter_effect"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
shutter_img
flat_normal
=
np
.
array
(
flat_normal
,
dtype
=
'float32'
)
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
],
values
=
[
True
])
else
:
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
,
'SHTOPEN1'
,
'SHTCLOS0'
],
values
=
[
True
,
self
.
h_ext
[
'SHTCLOS1'
],
self
.
h_ext
[
'SHTOPEN0'
]])
if
obs_param
[
"enable_straylight_model"
]:
# Filter.sky_background, Filter.zodical_spec will be updated
filt
.
setFilterStrayLightPixel
(
jtime
=
pointing
.
jdt
,
sat_pos
=
np
.
array
([
pointing
.
sat_x
,
pointing
.
sat_y
,
pointing
.
sat_z
]),
pointing_radec
=
np
.
array
([
pointing
.
ra
,
pointing
.
dec
]),
sun_pos
=
np
.
array
([
pointing
.
sun_x
,
pointing
.
sun_y
,
pointing
.
sun_z
]))
self
.
chip_output
.
Log_info
(
"================================================"
)
self
.
chip_output
.
Log_info
(
"sky background + stray light pixel flux value: %.5f"
%
(
filt
.
sky_background
))
if
chip
.
survey_type
==
"photometric"
:
sky_map
=
filt
.
getSkyNoise
(
exptime
=
exptime
)
sky_map
=
sky_map
*
np
.
ones_like
(
chip
.
img
.
array
)
*
flat_normal
sky_map
=
galsim
.
Image
(
array
=
sky_map
)
else
:
# chip.loadSLSFLATCUBE(flat_fn='flat_cube.fits')
sky_map
=
calculateSkyMap_split_g
(
skyMap
=
flat_normal
,
blueLimit
=
filt
.
blue_limit
,
redLimit
=
filt
.
red_limit
,
conf
=
chip
.
sls_conf
,
pixelSize
=
chip
.
pix_scale
,
isAlongY
=
0
,
flat_cube
=
chip
.
flat_cube
,
zoldial_spec
=
filt
.
zodical_spec
)
sky_map
=
(
sky_map
+
filt
.
sky_background
)
*
exptime
# sky_map = sky_map * tel.pupil_area * obs_param["exptime"]
chip
.
img
+=
sky_map
return
chip
,
filt
,
tel
,
pointing
def
add_sky_flat_calibration
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
if
not
hasattr
(
self
,
'h_ext'
):
_
,
_
=
self
.
prepare_headers
(
chip
=
chip
,
pointing
=
pointing
)
chip_wcs
=
galsim
.
FitsWCS
(
header
=
self
.
h_ext
)
# Get exposure time
if
(
obs_param
)
and
(
"exptime"
in
obs_param
)
and
(
obs_param
[
"exptime"
]
is
not
None
):
exptime
=
obs_param
[
"exptime"
]
else
:
exptime
=
pointing
.
exp_time
skyback_level
=
obs_param
[
"flat_level"
]
filter_param
=
FilterParam
()
sky_level_filt
=
obs_param
[
"flat_level_filt"
]
norm_scaler
=
skyback_level
/
exptime
/
filter_param
.
param
[
sky_level_filt
][
5
]
flat_normal
=
np
.
ones_like
(
chip
.
img
.
array
)
if
obs_param
[
"flat_fielding"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
flat_img
.
array
/
np
.
mean
(
chip
.
flat_img
.
array
)
if
obs_param
[
"shutter_effect"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
shutter_img
flat_normal
=
np
.
array
(
flat_normal
,
dtype
=
'float32'
)
if
self
.
overall_config
[
"output_setting"
][
"shutter_output"
]
==
True
:
# output 16-bit shutter effect image with pixel value <=65535
shutt_gsimg
=
galsim
.
ImageUS
(
chip
.
shutter_img
*
6E4
)
shutt_gsimg
.
write
(
"%s/ShutterEffect_%s_1.fits"
%
(
self
.
chip_output
.
subdir
,
str
(
chip
.
chipID
).
rjust
(
2
,
'0'
)))
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
],
values
=
[
True
])
else
:
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
,
'SHTOPEN1'
,
'SHTCLOS0'
],
values
=
[
True
,
self
.
h_ext
[
'SHTCLOS1'
],
self
.
h_ext
[
'SHTOPEN0'
]])
if
chip
.
survey_type
==
"photometric"
:
sky_map
=
flat_normal
*
np
.
ones_like
(
chip
.
img
.
array
)
*
norm_scaler
*
filter_param
.
param
[
chip
.
filter_type
][
5
]
/
tel
.
pupil_area
*
exptime
elif
chip
.
survey_type
==
"spectroscopic"
:
# flat_normal = np.ones_like(chip.img.array)
if
obs_param
[
"flat_fielding"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
flat_img
.
array
/
np
.
mean
(
chip
.
flat_img
.
array
)
if
obs_param
[
"shutter_effect"
]
==
True
:
flat_normal
=
flat_normal
*
chip
.
shutter_img
flat_normal
=
np
.
array
(
flat_normal
,
dtype
=
'float32'
)
sky_map
=
calculateSkyMap_split_g
(
skyMap
=
flat_normal
,
blueLimit
=
filt
.
blue_limit
,
redLimit
=
filt
.
red_limit
,
conf
=
chip
.
sls_conf
,
pixelSize
=
chip
.
pix_scale
,
isAlongY
=
0
,
flat_cube
=
chip
.
flat_cube
)
sky_map
=
sky_map
*
norm_scaler
*
exptime
chip
.
img
+=
sky_map
return
chip
,
filt
,
tel
,
pointing
def
add_sky_background
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
if
not
hasattr
(
self
,
'h_ext'
):
_
,
_
=
self
.
prepare_headers
(
chip
=
chip
,
pointing
=
pointing
)
chip_wcs
=
galsim
.
FitsWCS
(
header
=
self
.
h_ext
)
if
"flat_level"
not
in
obs_param
or
"flat_level_filt"
not
in
obs_param
:
chip
,
filt
,
tel
,
pointing
=
self
.
add_sky_background_sci
(
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
)
else
:
if
obs_param
.
get
(
'flat_level'
)
is
None
or
obs_param
.
get
(
'flat_level_filt'
)
is
None
:
chip
,
filt
,
tel
,
pointing
=
self
.
add_sky_background_sci
(
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
)
else
:
chip
,
filt
,
tel
,
pointing
=
self
.
add_sky_flat_calibration
(
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
)
# renew header info
datetime_obs
=
datetime
.
utcfromtimestamp
(
pointing
.
timestamp
)
datetime_obs
=
datetime_obs
.
replace
(
tzinfo
=
timezone
.
utc
)
t_obs
=
Time
(
datetime_obs
)
##ccd刷新2s,等待0.5s,开始曝光
t_obs_renew
=
Time
(
t_obs
.
mjd
-
(
2.
+
0.5
)
/
86400.
,
format
=
"mjd"
)
t_obs_utc
=
datetime
.
utcfromtimestamp
(
np
.
round
(
datetime
.
utcfromtimestamp
(
t_obs_renew
.
unix
).
replace
(
tzinfo
=
timezone
.
utc
).
timestamp
(),
1
))
self
.
updateHeaderInfo
(
header_flag
=
'prim'
,
keys
=
[
'DATE-OBS'
],
values
=
[
t_obs_utc
.
strftime
(
"%Y-%m-%dT%H:%M:%S.%f"
)[:
-
5
]])
#dark time : 曝光时间+刷新后等带时间0.5s+关闭快门时间1.5s+管快门后读出前等待0.5s
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'DARKTIME'
],
values
=
[
0.
+
0.0
+
0.0
+
pointing
.
exp_time
])
return
chip
,
filt
,
tel
,
pointing
\ No newline at end of file
ObservationSim/sim_steps/prepare_headers.py
deleted
100644 → 0
View file @
4afd1181
from
ObservationSim.Config.Header
import
generatePrimaryHeader
,
generateExtensionHeader
def
prepare_headers
(
self
,
chip
,
pointing
):
self
.
h_prim
=
generatePrimaryHeader
(
xlen
=
chip
.
npix_x
,
ylen
=
chip
.
npix_y
,
pointing_id
=
pointing
.
obs_id
,
pointing_type_code
=
pointing
.
pointing_type_code
,
ra
=
pointing
.
ra
,
dec
=
pointing
.
dec
,
pixel_scale
=
chip
.
pix_scale
,
time_pt
=
pointing
.
timestamp
,
exptime
=
pointing
.
exp_time
,
im_type
=
pointing
.
pointing_type
,
sat_pos
=
[
pointing
.
sat_x
,
pointing
.
sat_y
,
pointing
.
sat_z
],
sat_vel
=
[
pointing
.
sat_vx
,
pointing
.
sat_vy
,
pointing
.
sat_vz
],
project_cycle
=
self
.
overall_config
[
"project_cycle"
],
run_counter
=
self
.
overall_config
[
"run_counter"
],
chip_name
=
str
(
chip
.
chipID
).
rjust
(
2
,
'0'
))
self
.
h_ext
=
generateExtensionHeader
(
chip
=
chip
,
xlen
=
chip
.
npix_x
,
ylen
=
chip
.
npix_y
,
ra
=
pointing
.
ra
,
dec
=
pointing
.
dec
,
pa
=
pointing
.
img_pa
.
deg
,
gain
=
chip
.
gain
,
readout
=
chip
.
read_noise
,
dark
=
chip
.
dark_noise
,
saturation
=
90000
,
pixel_scale
=
chip
.
pix_scale
,
pixel_size
=
chip
.
pix_size
,
xcen
=
chip
.
x_cen
,
ycen
=
chip
.
y_cen
,
extName
=
pointing
.
pointing_type
,
timestamp
=
pointing
.
timestamp
,
exptime
=
pointing
.
exp_time
,
readoutTime
=
chip
.
readout_time
,
t_shutter_open
=
pointing
.
t_shutter_open
,
t_shutter_close
=
pointing
.
t_shutter_close
)
return
self
.
h_prim
,
self
.
h_ext
def
updateHeaderInfo
(
self
,
header_flag
=
'prim'
,
keys
=
[
'key'
],
values
=
[
0
]):
if
header_flag
==
'prim'
:
for
key
,
value
in
zip
(
keys
,
values
):
self
.
h_prim
[
key
]
=
value
if
header_flag
==
'ext'
:
for
key
,
value
in
zip
(
keys
,
values
):
self
.
h_ext
[
key
]
=
value
ObservationSim/sim_steps/readout_output.py
deleted
100644 → 0
View file @
4afd1181
import
os
import
galsim
import
numpy
as
np
from
astropy.io
import
fits
from
ObservationSim.Instrument.Chip
import
ChipUtils
as
chip_utils
from
ObservationSim.Instrument.Chip
import
Effects
from
astropy.time
import
Time
from
datetime
import
datetime
,
timezone
def
add_prescan_overscan
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
"Apply pre/over-scan"
)
chip
.
img
=
chip_utils
.
AddPreScan
(
GSImage
=
chip
.
img
,
pre1
=
chip
.
prescan_x
,
pre2
=
chip
.
prescan_y
,
over1
=
chip
.
overscan_x
,
over2
=
chip
.
overscan_y
)
if
obs_param
[
"add_dark"
]
==
True
:
ny
=
int
(
chip
.
npix_y
/
2
)
base_dark
=
(
ny
-
1
)
*
(
chip
.
readout_time
/
ny
)
*
chip
.
dark_noise
chip
.
img
.
array
[(
chip
.
prescan_y
+
ny
):
-
(
chip
.
prescan_y
+
ny
),:]
=
base_dark
return
chip
,
filt
,
tel
,
pointing
def
add_readout_noise
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
seed
=
int
(
self
.
overall_config
[
"random_seeds"
][
"seed_readout"
])
+
pointing
.
id
*
30
+
chip
.
chipID
rng_readout
=
galsim
.
BaseDeviate
(
seed
)
readout_noise
=
galsim
.
GaussianNoise
(
rng
=
rng_readout
,
sigma
=
chip
.
read_noise
)
chip
.
img
.
addNoise
(
readout_noise
)
return
chip
,
filt
,
tel
,
pointing
def
apply_gain
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
self
.
chip_output
.
Log_info
(
" Applying Gain"
)
if
obs_param
[
"gain_16channel"
]
==
True
:
chip
.
img
,
chip
.
gain_channel
=
Effects
.
ApplyGainNonUniform16
(
chip
.
img
,
gain
=
chip
.
gain
,
nsecy
=
chip
.
nsecy
,
nsecx
=
chip
.
nsecx
,
seed
=
self
.
overall_config
[
"random_seeds"
][
"seed_gainNonUniform"
]
+
chip
.
chipID
)
elif
obs_param
[
"gain_16channel"
]
==
False
:
chip
.
img
/=
chip
.
gain
return
chip
,
filt
,
tel
,
pointing
def
quantization_and_output
(
self
,
chip
,
filt
,
tel
,
pointing
,
catalog
,
obs_param
):
if
not
hasattr
(
self
,
'h_ext'
):
_
,
_
=
self
.
prepare_headers
(
chip
=
chip
,
pointing
=
pointing
)
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'SHTSTAT'
,
'SHTOPEN1'
,
'SHTCLOS0'
,
'SHTCLOS1'
,
'EXPTIME'
],
values
=
[
False
,
self
.
h_ext
[
'SHTOPEN0'
],
self
.
h_ext
[
'SHTOPEN0'
],
self
.
h_ext
[
'SHTOPEN0'
],
0.0
])
# renew header info
datetime_obs
=
datetime
.
utcfromtimestamp
(
pointing
.
timestamp
)
datetime_obs
=
datetime_obs
.
replace
(
tzinfo
=
timezone
.
utc
)
t_obs
=
Time
(
datetime_obs
)
##ccd刷新2s,等待0.5s,开灯后等待0.5s,开始曝光
t_obs_renew
=
Time
(
t_obs
.
mjd
-
2.
/
86400.
,
format
=
"mjd"
)
t_obs_utc
=
datetime
.
utcfromtimestamp
(
np
.
round
(
datetime
.
utcfromtimestamp
(
t_obs_renew
.
unix
).
replace
(
tzinfo
=
timezone
.
utc
).
timestamp
(),
1
))
self
.
updateHeaderInfo
(
header_flag
=
'prim'
,
keys
=
[
'DATE-OBS'
],
values
=
[
t_obs_utc
.
strftime
(
"%Y-%m-%dT%H:%M:%S.%f"
)[:
-
5
]])
gains1
=
list
(
chip
.
gain_channel
[
0
:
8
])
gains2
=
list
(
chip
.
gain_channel
[
8
:])
gains2
.
reverse
()
gains
=
np
.
append
(
gains1
,
gains2
)
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'GAIN01'
,
'GAIN02'
,
'GAIN03'
,
'GAIN04'
,
'GAIN05'
,
'GAIN06'
,
'GAIN07'
,
'GAIN08'
,
'GAIN09'
,
'GAIN10'
,
'GAIN11'
,
'GAIN12'
,
'GAIN13'
,
'GAIN14'
,
'GAIN15'
,
'GAIN16'
],
values
=
gains
)
if
obs_param
[
"format_output"
]
==
True
:
self
.
chip_output
.
Log_info
(
" Apply 1*16 format"
)
chip
.
img
=
chip_utils
.
formatOutput
(
GSImage
=
chip
.
img
)
chip
.
nsecy
=
1
chip
.
nsecx
=
16
chip
.
img
.
array
[
chip
.
img
.
array
>
65535
]
=
65535
chip
.
img
.
replaceNegative
(
replace_value
=
0
)
chip
.
img
.
quantize
()
chip
.
img
=
galsim
.
Image
(
chip
.
img
.
array
,
dtype
=
np
.
uint16
)
fname
=
os
.
path
.
join
(
self
.
chip_output
.
subdir
,
self
.
h_prim
[
'FILENAME'
]
+
'.fits'
)
f_name_size
=
68
if
(
len
(
self
.
h_prim
[
'FILENAME'
])
>
f_name_size
):
self
.
updateHeaderInfo
(
header_flag
=
'prim'
,
keys
=
[
'FILENAME'
],
values
=
[
self
.
h_prim
[
'FILENAME'
][
0
:
f_name_size
]])
hdu1
=
fits
.
PrimaryHDU
(
header
=
self
.
h_prim
)
self
.
updateHeaderInfo
(
header_flag
=
'ext'
,
keys
=
[
'DATASECT'
],
values
=
[
str
(
chip
.
img
.
array
.
shape
[
1
])
+
'x'
+
str
(
chip
.
img
.
array
.
shape
[
0
])])
hdu2
=
fits
.
ImageHDU
(
chip
.
img
.
array
,
header
=
self
.
h_ext
)
hdu2
.
header
.
comments
[
"XTENSION"
]
=
"image extension"
hdu
=
fits
.
HDUList
([
hdu1
,
hdu2
])
hdu
[
0
].
add_datasum
(
when
=
'data unit checksum'
)
hdu
[
0
].
add_checksum
(
when
=
'HDU checksum'
,
override_datasum
=
True
)
hdu
[
1
].
add_datasum
(
when
=
'data unit checksum'
)
hdu
[
1
].
add_checksum
(
when
=
'HDU checksum'
,
override_datasum
=
True
)
hdu
.
writeto
(
fname
,
output_verify
=
'ignore'
,
overwrite
=
True
)
return
chip
,
filt
,
tel
,
pointing
tools/TargetLocationCheck.py
deleted
100644 → 0
View file @
4afd1181
# NOTE: This is a stand-alone function, meaning that you do not need
# to install the entire CSST image simulation pipeline.
# For a given object's coordinate (Ra, Dec), the function will predict
# the object's image position and corresponding filter in the focal plane
# under a specified CSST pointing centered at (rap, decp).
import
galsim
import
numpy
as
np
import
argparse
import
matplotlib.pyplot
as
plt
import
os
,
sys
def
focalPlaneInf
(
ra_target
,
dec_target
,
ra_point
,
dec_point
,
image_rot
=-
113.4333
,
figout
=
"zTargetOnCCD.pdf"
):
"""
Input parameters:
ra_target : right ascension of the target/input object;
float, in unit of degrees;
dec_target: declination of the target/input object;
float, in unit of degrees;
ra_point : right ascension of telescope pointing center;
float, in unit of degrees;
dec_point : declination of telescope pointing center;
float, in unit of degrees;
image_rot : orientation of the camera with respect the sky;
float, in unit of degrees;
NOTE: image_rot=-113.4333 is the default value
in current CSST image simulation;
figout : location of the target object in the focal plane;
str
--------------------------------------------------------------
Usage:
0) specify the coordinate (ra_target, dec_target) of your target and
the pointing center (ra_point dec_point) of the telescope
1) open a terminal
2) type >> python TargetLocationCheck.py ra_target dec_target ra_point dec_point
or type >> python TargetLocationCheck.py ra_target dec_target ra_point dec_point -image_rot=floatNum
or type >> python TargetLocationCheck.py ra_target dec_target ra_point dec_point -image_rot=floatNum -figout=FigureName
"""
print
(
"^_^ Input target coordinate: [Ra, Dec] = [%10.6f, %10.6f]"
%
(
ra_target
,
dec_target
))
print
(
"^_^ Input telescope pointing center: [Ra, Dec] = [%10.6f, %10.6f]"
%
(
ra_point
,
dec_point
))
print
(
"^_^ Input camera orientation: %12.6f degree(s)"
%
image_rot
)
print
(
" "
)
# load ccd parameters
xsize
,
ysize
,
xchip
,
ychip
,
xgap
,
ygap
,
xnchip
,
ynchip
=
ccdParam
()
print
(
"^_^ Pixel range of focal plane: x = [%5d, %5d], y = [%5d, %5d]"
%
(
-
xsize
/
2
,
xsize
/
2
,
-
ysize
/
2
,
ysize
/
2
))
# wcs
wcs
=
getTanWCS
(
ra_point
,
dec_point
,
image_rot
,
pix_scale
=
0.074
)
skyObj
=
galsim
.
CelestialCoord
(
ra
=
ra_target
*
galsim
.
degrees
,
dec
=
dec_target
*
galsim
.
degrees
)
pixObj
=
wcs
.
toImage
(
skyObj
)
xpixObj
=
pixObj
.
x
ypixObj
=
pixObj
.
y
print
(
"^_^ Image position of target: [xImage, yImage] = [%9.3f, %9.3f]"
%
(
xpixObj
,
ypixObj
))
# first determine if the target is in the focal plane
xin
=
(
xpixObj
+
xsize
/
2
)
*
(
xpixObj
-
xsize
/
2
)
yin
=
(
ypixObj
+
ysize
/
2
)
*
(
ypixObj
-
ysize
/
2
)
if
xin
>
0
or
yin
>
0
:
raise
ValueError
(
"!!! Input target is out of the focal plane"
)
# second determine the location of the target
trigger
=
False
for
i
in
range
(
30
):
ichip
=
i
+
1
ischip
=
str
(
"0%d"
%
ichip
)[
-
2
:]
fId
,
fType
=
getChipFilter
(
ichip
)
ix0
,
ix1
,
iy0
,
iy1
=
getChipLim
(
ichip
)
ixin
=
(
xpixObj
-
ix0
)
*
(
xpixObj
-
ix1
)
iyin
=
(
ypixObj
-
iy0
)
*
(
ypixObj
-
iy1
)
if
ixin
<=
0
and
iyin
<=
0
:
trigger
=
True
idx
=
xpixObj
-
ix0
idy
=
ypixObj
-
iy0
print
(
" ---------------------------------------------"
)
print
(
" ** Target locates in CHIP#%s with filter %s **"
%
(
ischip
,
fType
))
print
(
" ** Target position in the chip: [x, y] = [%7.2f, %7.2f]"
%
(
idx
,
idy
))
print
(
" ---------------------------------------------"
)
break
if
not
trigger
:
print
(
"^|^ Target locates in CCD gap"
)
# show the figure
print
(
" Target on CCD layout is saved into %s"
%
figout
)
ccdLayout
(
xpixObj
,
ypixObj
,
figout
=
figout
)
return
def
ccdParam
():
xt
,
yt
=
59516
,
49752
x0
,
y0
=
9216
,
9232
xgap
,
ygap
=
(
534
,
1309
),
898
xnchip
,
ynchip
=
6
,
5
ccdSize
=
xt
,
yt
,
x0
,
y0
,
xgap
,
ygap
,
xnchip
,
ynchip
return
ccdSize
def
getTanWCS
(
ra
,
dec
,
img_rot
,
pix_scale
=
0.074
):
"""
Get the WCS of the image mosaic using Gnomonic/TAN projection
Parameter:
ra, dec: float
(RA, Dec) of pointing of optical axis
img_rot: galsim Angle object
Rotation of image
pix_scale: float
Pixel size in unit of as/pix
Returns:
WCS of the focal plane
"""
xcen
,
ycen
=
0
,
0
img_rot
=
img_rot
*
galsim
.
degrees
dudx
=
-
np
.
cos
(
img_rot
.
rad
)
*
pix_scale
dudy
=
-
np
.
sin
(
img_rot
.
rad
)
*
pix_scale
dvdx
=
-
np
.
sin
(
img_rot
.
rad
)
*
pix_scale
dvdy
=
+
np
.
cos
(
img_rot
.
rad
)
*
pix_scale
moscen
=
galsim
.
PositionD
(
x
=
xcen
,
y
=
ycen
)
sky_center
=
galsim
.
CelestialCoord
(
ra
=
ra
*
galsim
.
degrees
,
dec
=
dec
*
galsim
.
degrees
)
affine
=
galsim
.
AffineTransform
(
dudx
,
dudy
,
dvdx
,
dvdy
,
origin
=
moscen
)
WCS
=
galsim
.
TanWCS
(
affine
,
sky_center
,
units
=
galsim
.
arcsec
)
return
WCS
def
getChipFilter
(
chipID
):
"""
Return the filter index and type for a given chip #(chipID)
"""
filter_type_list
=
[
"nuv"
,
"u"
,
"g"
,
"r"
,
"i"
,
"z"
,
"y"
,
"GU"
,
"GV"
,
"GI"
]
# TODO: maybe a more elegent way other than hard coded?
# e.g. use something like a nested dict:
if
chipID
in
[
6
,
15
,
16
,
25
]:
filter_type
=
"y"
if
chipID
in
[
11
,
20
]:
filter_type
=
"z"
if
chipID
in
[
7
,
24
]:
filter_type
=
"i"
if
chipID
in
[
14
,
17
]:
filter_type
=
"u"
if
chipID
in
[
9
,
22
]:
filter_type
=
"r"
if
chipID
in
[
12
,
13
,
18
,
19
]:
filter_type
=
"nuv"
if
chipID
in
[
8
,
23
]:
filter_type
=
"g"
if
chipID
in
[
1
,
10
,
21
,
30
]:
filter_type
=
"GI"
if
chipID
in
[
2
,
5
,
26
,
29
]:
filter_type
=
"GV"
if
chipID
in
[
3
,
4
,
27
,
28
]:
filter_type
=
"GU"
filter_id
=
filter_type_list
.
index
(
filter_type
)
return
filter_id
,
filter_type
def
getChipLim
(
chipID
):
"""
Calculate the edges in pixel for a given CCD chip on the focal plane
NOTE: There are 5*4 CCD chips in the focus plane for photometric observation.
Parameters:
chipID: int
the index of the chip
Returns:
A galsim BoundsD object
"""
xt
,
yt
,
x0
,
y0
,
gx
,
gy
,
xnchip
,
ynchip
=
ccdParam
()
gx1
,
gx2
=
gx
rowID
=
((
chipID
-
1
)
%
5
)
+
1
colID
=
6
-
((
chipID
-
1
)
//
5
)
# xlim of a given CCD chip
xrem
=
2
*
(
colID
-
1
)
-
(
xnchip
-
1
)
xcen
=
(
x0
//
2
+
gx1
//
2
)
*
xrem
if
chipID
>=
26
or
chipID
==
21
:
xcen
=
(
x0
//
2
+
gx1
//
2
)
*
xrem
-
(
gx2
-
gx1
)
if
chipID
<=
5
or
chipID
==
10
:
xcen
=
(
x0
//
2
+
gx1
//
2
)
*
xrem
+
(
gx2
-
gx1
)
nx0
=
xcen
-
x0
//
2
+
1
nx1
=
xcen
+
x0
//
2
# ylim of a given CCD chip
yrem
=
(
rowID
-
1
)
-
ynchip
//
2
ycen
=
(
y0
+
gy
)
*
yrem
ny0
=
ycen
-
y0
//
2
+
1
ny1
=
ycen
+
y0
//
2
return
nx0
-
1
,
nx1
-
1
,
ny0
-
1
,
ny1
-
1
def
ccdLayout
(
xpixTar
,
ypixTar
,
figout
=
"ccdLayout.pdf"
):
fig
=
plt
.
figure
(
figsize
=
(
10.0
,
8.0
))
ax
=
fig
.
add_axes
([
0.1
,
0.1
,
0.80
,
0.80
])
# plot the layout of the ccd distribution
for
i
in
range
(
30
):
ichip
=
i
+
1
fId
,
fType
=
getChipFilter
(
ichip
)
ischip
=
str
(
"0%d"
%
ichip
)[
-
2
:]
ix0
,
ix1
,
iy0
,
iy1
=
getChipLim
(
ichip
)
ax
.
plot
([
ix0
,
ix1
],[
iy0
,
iy0
],
"k-"
,
linewidth
=
2.5
)
ax
.
plot
([
ix0
,
ix1
],[
iy1
,
iy1
],
"k-"
,
linewidth
=
2.5
)
ax
.
plot
([
ix0
,
ix0
],[
iy0
,
iy1
],
"k-"
,
linewidth
=
2.5
)
ax
.
plot
([
ix1
,
ix1
],[
iy0
,
iy1
],
"k-"
,
linewidth
=
2.5
)
ax
.
text
(
ix0
+
500
,
iy0
+
1500
,
"%s#%s"
%
(
fType
,
ischip
),
fontsize
=
12
,
color
=
"grey"
)
ax
.
plot
(
xpixTar
,
ypixTar
,
"r*"
,
ms
=
12
)
ax
.
set_xlabel
(
"$X\,[\mathrm{pixels}]$"
,
fontsize
=
20
)
ax
.
set_ylabel
(
"$Y\,[\mathrm{pixels}]$"
,
fontsize
=
20
)
ax
.
invert_yaxis
()
ax
.
axis
(
'off'
)
plt
.
savefig
(
figout
)
def
parseArguments
():
# Create argument parser
parser
=
argparse
.
ArgumentParser
()
# Positional arguments
parser
.
add_argument
(
"ra_target"
,
type
=
float
)
parser
.
add_argument
(
"dec_target"
,
type
=
float
)
parser
.
add_argument
(
"ra_point"
,
type
=
float
)
parser
.
add_argument
(
"dec_point"
,
type
=
float
)
# Optional arguments
parser
.
add_argument
(
"-image_rot"
,
type
=
float
,
default
=-
113.4333
)
parser
.
add_argument
(
"-figout"
,
type
=
str
,
default
=
"zTargetOnCCD.pdf"
)
# Parse arguments
args
=
parser
.
parse_args
()
return
args
if
__name__
==
"__main__"
:
# Parse the arguments
args
=
parseArguments
()
# Run function
focalPlaneInf
(
args
.
ra_target
,
args
.
dec_target
,
args
.
ra_point
,
args
.
dec_point
,
args
.
image_rot
,
args
.
figout
)
tools/getPSF.py
deleted
100644 → 0
View file @
4afd1181
import
os
import
numpy
as
np
import
ObservationSim.PSF.PSFInterp
as
PSFInterp
from
ObservationSim.Instrument
import
Chip
,
Filter
,
FilterParam
import
yaml
import
galsim
import
astropy.io.fits
as
fitsio
# Setup PATH
SIMPATH
=
"/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C8/testRun_FGS"
config_filename
=
SIMPATH
+
"/config_C6_fits.yaml"
cat_filename
=
SIMPATH
+
"/MSC_00000000/MSC_10106100000000_chip_40_filt_FGS.cat"
# Read cat file
catFn
=
open
(
cat_filename
,
"r"
)
line
=
catFn
.
readline
()
print
(
cat_filename
,
'
\n
'
,
line
)
imgPos
=
[]
chipID
=
-
1
for
line
in
catFn
:
line
=
line
.
strip
()
columns
=
line
.
split
()
if
chipID
==
-
1
:
chipID
=
int
(
columns
[
1
])
else
:
assert
chipID
==
int
(
columns
[
1
])
ximg
=
float
(
columns
[
3
])
yimg
=
float
(
columns
[
4
])
imgPos
.
append
([
ximg
,
yimg
])
imgPos
=
np
.
array
(
imgPos
)
nobj
=
imgPos
.
shape
[
0
]
print
(
'chipID, nobj::'
,
chipID
,
nobj
)
# Read config file
with
open
(
config_filename
,
"r"
)
as
stream
:
try
:
config
=
yaml
.
safe_load
(
stream
)
for
key
,
value
in
config
.
items
():
print
(
key
+
" : "
+
str
(
value
))
except
yaml
.
YAMLError
as
exc
:
print
(
exc
)
# Setup Chip
chip
=
Chip
(
chipID
=
chipID
,
config
=
config
)
print
(
'chip.bound::'
,
chip
.
bound
.
xmin
,
chip
.
bound
.
xmax
,
chip
.
bound
.
ymin
,
chip
.
bound
.
ymax
)
for
iobj
in
range
(
nobj
):
print
(
"
\n
get psf for iobj-"
,
iobj
,
'
\t
'
,
'bandpass:'
,
end
=
" "
,
flush
=
True
)
# Setup Position on focalplane
x
,
y
=
imgPos
[
iobj
,
:]
# try get the PSF at some location (1234, 1234) on the chip
x
=
x
+
chip
.
bound
.
xmin
y
=
y
+
chip
.
bound
.
ymin
pos_img
=
galsim
.
PositionD
(
x
,
y
)
# Setup sub-bandpass
# (There are 4 sub-bandpasses for each PSF sample)
filter_param
=
FilterParam
()
filter_id
,
filter_type
=
chip
.
getChipFilter
()
filt
=
Filter
(
filter_id
=
filter_id
,
filter_type
=
filter_type
,
filter_param
=
filter_param
,
ccd_bandpass
=
chip
.
effCurve
)
bandpass_list
=
filt
.
bandpass_sub_list
for
i
in
range
(
len
(
bandpass_list
)):
print
(
i
,
end
=
" "
,
flush
=
True
)
bandpass
=
bandpass_list
[
i
]
# say you want to access the PSF for the sub-bandpass at the blue end for that chip
# Get corresponding PSF model
psf_model
=
PSFInterp
(
chip
=
chip
,
npsf
=
100
,
PSF_data_file
=
config
[
"psf_setting"
][
"psf_dir"
])
psf
=
psf_model
.
get_PSF
(
chip
=
chip
,
pos_img
=
pos_img
,
bandpass
=
bandpass
,
galsimGSObject
=
False
)
if
True
:
fn
=
"psf_{:}.{:}.{:}.fits"
.
format
(
chipID
,
iobj
,
i
)
if
fn
!=
None
:
if
os
.
path
.
exists
(
fn
):
os
.
remove
(
fn
)
hdu
=
fitsio
.
PrimaryHDU
()
hdu
.
data
=
psf
hdu
.
header
.
set
(
'pixScale'
,
5
)
hdu
.
writeto
(
fn
)
tools/indexFits_hdf5.py
deleted
100644 → 0
View file @
4afd1181
# NAME:
# indexFits_hdf5
# PURPOSE:
# Return a healpix indexed catalog from a set of Fits
# CALLING:
# write_StampsIndex(dir_cat=dir_temp)
# INPUTS:
# dir_cat - the directory of Fits catalog, "<dir_cat>/stampCats/*.fits"
# OUTPUTS:
# <dir_cat>/stampCatsIndex.hdf5
# OPTIONAL:
# test_fits(nfits=100, dir_cat=None) - generate a set of Fits by galsim gaussian
# HISTORY:
# Written by Chengliang Wei, 13 Apr. 2023
# Included by csst-simulation, C.W. 25 Apr. 2023
#
#
import
os
import
numpy
as
np
import
astropy.io.fits
as
fitsio
import
h5py
import
healpy
import
galsim
def
test_fits
(
nfits
=
100
,
dir_cat
=
None
):
for
ifits
in
range
(
nfits
):
gal
=
galsim
.
Gaussian
(
sigma
=
np
.
random
.
uniform
(
0.2
,
0.3
)).
shear
(
g1
=
np
.
random
.
uniform
(
-
0.5
,
0.5
),
g2
=
np
.
random
.
uniform
(
-
0.5
,
0.5
))
arr
=
gal
.
drawImage
(
nx
=
64
,
ny
=
64
,
scale
=
0.074
).
array
hdu
=
fitsio
.
PrimaryHDU
()
hdu
.
data
=
arr
hdu
.
header
.
set
(
'index'
,
ifits
)
hdu
.
header
.
set
(
'ra'
,
60.
+
np
.
random
.
uniform
(
-
0.2
,
0.2
))
hdu
.
header
.
set
(
'dec'
,
-
40.
+
np
.
random
.
uniform
(
-
0.2
,
0.2
))
hdu
.
header
.
set
(
'mag_g'
,
22
+
np
.
random
.
uniform
(
-
1
,
1
))
hdu
.
header
.
set
(
'pixScale'
,
0.074
)
fout
=
dir_cat
+
"stampCats/testStamp_{:}.fits"
.
format
(
ifits
)
if
os
.
path
.
exists
(
fout
):
os
.
remove
(
fout
)
hdu
.
writeto
(
fout
)
def
write_StampsIndex
(
dir_cat
=
None
,
DEBUG
=
False
):
MAXNUMBERINDEX
=
10000
NSIDE
=
128
fp
=
h5py
.
File
(
dir_cat
+
'stampCatsIndex.hdf5'
,
'w'
)
grp1
=
fp
.
create_group
(
'Stamps'
)
dataSet_Size
=
np
.
zeros
(
healpy
.
nside2npix
(
NSIDE
),
dtype
=
np
.
int64
)
fitsList
=
os
.
listdir
(
dir_cat
+
'stampCats/'
)
#获取fits文件列表
for
istamp
in
range
(
len
(
fitsList
)):
print
(
istamp
,
': '
,
fitsList
[
istamp
],
end
=
'
\r
'
)
hdu
=
fitsio
.
open
(
dir_cat
+
"stampCats/"
+
fitsList
[
istamp
])
tra
=
hdu
[
0
].
header
[
'RA'
]
tdec
=
hdu
[
0
].
header
[
'DEC'
]
healpixID
=
healpy
.
ang2pix
(
NSIDE
,
tra
,
tdec
,
nest
=
False
,
lonlat
=
True
)
if
not
(
str
(
healpixID
)
in
grp1
):
grp2
=
grp1
.
create_group
(
str
(
healpixID
))
else
:
grp2
=
grp1
[
str
(
healpixID
)]
if
not
(
'ra'
in
grp2
):
dset_ra
=
grp2
.
create_dataset
(
'ra'
,
(
0
,),
dtype
=
'f16'
,
maxshape
=
(
MAXNUMBERINDEX
,
))
dset_dec
=
grp2
.
create_dataset
(
'dec'
,
(
0
,),
dtype
=
'f16'
,
maxshape
=
(
MAXNUMBERINDEX
,
))
dt
=
h5py
.
special_dtype
(
vlen
=
str
)
dset_fn
=
grp2
.
create_dataset
(
'filename'
,
(
0
,),
dtype
=
dt
,
maxshape
=
(
MAXNUMBERINDEX
,
))
else
:
dset_ra
=
grp2
[
'ra'
]
dset_dec
=
grp2
[
'dec'
]
dset_fn
=
grp2
[
'filename'
]
dataSet_Size
[
healpixID
]
=
dataSet_Size
[
healpixID
]
+
1
grp2
[
'ra'
].
resize
((
dataSet_Size
[
healpixID
],))
grp2
[
'dec'
].
resize
((
dataSet_Size
[
healpixID
],))
grp2
[
'filename'
].
resize
((
dataSet_Size
[
healpixID
],))
dset_ra
[
dataSet_Size
[
healpixID
]
-
1
]
=
tra
dset_dec
[
dataSet_Size
[
healpixID
]
-
1
]
=
tdec
dset_fn
[
dataSet_Size
[
healpixID
]
-
1
]
=
fitsList
[
istamp
]
fp
.
close
()
if
DEBUG
:
print
(
'
\n
'
)
ff
=
h5py
.
File
(
dir_cat
+
"stampCatsIndex.hdf5"
,
"r"
)
ss
=
0
for
kk
in
ff
[
'Stamps'
].
keys
():
print
(
kk
,
ff
[
'Stamps'
][
kk
][
'ra'
].
size
)
ss
=
ss
+
ff
[
'Stamps'
][
kk
][
'ra'
].
size
print
(
ss
)
if
__name__
==
'__main__'
:
dir_temp
=
"./Catalog_test/"
#test_fits(dir_cat=dir_temp)
write_StampsIndex
(
dir_cat
=
dir_temp
)
Prev
1
…
7
8
9
10
11
Next
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment