Commit 7c99b572 authored by GZhao's avatar GZhao
Browse files

update cpic sourcecode

parent 7283405d
from .main import quick_run_v2, vis_observation
from .optics import focal_mask
from .target import star_photlam, planet_contrast, extract_target_x_y, spectrum_generator
from .camera import CosmicRayFrameMaker, sky_frame_maker
from .config import __version__
__all__ = [
"CosmicRayFrameMaker",
"sky_frame_maker",
"star_photlam",
"planet_contrast",
"extract_target_x_y",
"spectrum_generator",
"focal_mask",
"quick_run_v2",
"vis_observation",
"__version__"
]
\ No newline at end of file
This diff is collapsed.
import os, yaml
import warnings
from datetime import datetime
import numpy as np
config_aim = os.path.dirname(os.path.dirname(__file__))
config_aim = os.path.join(config_aim, 'data/refdata_path.yaml')
# def set_config(refdata_path=None):
# if refdata_path is None:
# print("input cpism refencence data folder")
# refdata_path = input()
# refdata_path = os.path.abspath(refdata_path)
# with open(config_aim, 'w') as f:
# yaml.dump(refdata_path, f)
# return refdata_path
# try:
# with open(config_aim, 'r') as f:
# cpism_refdata = yaml.load(f, Loader=yaml.FullLoader)
# if not os.path.isdir(cpism_refdata):
# raise FileNotFoundError('cpism refdata path not found')
# config_set = True
# except FileNotFoundError:
# warnings.warn(f'refdata not setup yet, set it before use')
# cpism_refdata = set_config()
def load_refdata_path(config_aim):
"""Load refdata path.
The refdata path is stored in config_aim file. If not found, set it.
Parameters
----------
config_aim : str
config_aim file path
"""
with open(config_aim, 'r') as f:
refdata_list = yaml.load(f, Loader=yaml.FullLoader)
for refdata in refdata_list:
if os.path.isdir(refdata):
return refdata
print("csst_cpic_sim refdata folder not found, please input cpism refencence data folder")
refdata = input()
refdata = os.path.abspath(refdata)
if os.path.isdir(refdata):
refdata_list.append(refdata)
with open(config_aim, 'w') as f:
yaml.dump(refdata_list, f)
exit()
cpism_refdata = load_refdata_path(config_aim)
config = {}
config['cpism_refdata'] = cpism_refdata
config['utc0'] = '2024-05-01T00:00:00'
config['hybrid_model'] = f'{cpism_refdata}/target_model/hybrid_model.fits'
config['bcc_model'] = f'{cpism_refdata}/target_model/bccmodels'
config['mag_system'] = 'abmag'
config['apm_file'] = f'{cpism_refdata}/optics/apm.fits'
config['actuator_file'] = f'{cpism_refdata}/optics/actuator.fits'
config['aberration'] = f'{cpism_refdata}/optics/initial_phase_aberration.fits'
config['mask_width'] = 0.4
config['check_fits_header'] = False
config['bands'] = {
'f661': f'{cpism_refdata}/throughtput/f661_total.fits',
'f743': f'{cpism_refdata}/throughtput/f743_total.fits',
'f883': f'{cpism_refdata}/throughtput/f883_total.fits',
'f565': f'{cpism_refdata}/throughtput/f565_total.fits',
'f520': f'{cpism_refdata}/throughtput/f520.fits',
'f662': f'{cpism_refdata}/throughtput/f662.fits',
'f850': f'{cpism_refdata}/throughtput/f850.fits',
'f720': f'{cpism_refdata}/throughtput/f720.fits',
}
config['diameter'] = 2 # in meters
config['platescale'] = 0.016153
config['datamodel'] = f'{cpism_refdata}/io/csst-cpic-l0.yaml'
config['log_dir'] = f'{cpism_refdata}/log'
config['log_level'] = f'info'
config['output'] = f'./'
config['sp2teff_model'] = f'{cpism_refdata}/target_model/sptype2teff_lut.json'
config['dm_pickle'] = f'{cpism_refdata}/optics/dm_model.pkl'
config['pysyn_refdata'] = f'{cpism_refdata}/starmodel/grp/redcat/trds'
config['catalog_folder'] = f'{cpism_refdata}/demo_catalog'
config['csst_format'] = True
config['nsample'] = 5
update_able_keys = [
'apm_file', 'actuator_file', 'aberration', 'log_dir', 'log_level', 'catalog_folder', 'nsample', 'csst_format', 'output', 'check_fits_header'
]
def replace_cpism_refdata(
config: dict,
output: str = '$') -> None:
"""Replace the cpism_refdata in the config.
In the config file, we use ${cpism_refdata} to indicate the cpism_refdata.
This function is used to replace the cpism_refdata in the config, or replace back.
Parameters
----------
config: dict
config dict.
output: str
'$' or 'other'. If output is '$', then replace the cpism_refdata in the config with ${cpism_refdata}.
If output is 'other', then replace the ${cpism_refdata} in the config file with the real path.
'$' is used meanly to generate a demo config file.
"""
aim = cpism_refdata
target = '${cpism_refdata}'
if output != '$':
aim, target = target, aim
for key, value in config.items():
if isinstance(value, str):
config[key] = value.replace(aim, target)
if isinstance(value, dict):
replace_cpism_refdata(value, output)
with open(cpism_refdata + '/cpism_config.yaml', 'r') as f:
new_config = yaml.load(f, Loader=yaml.FullLoader)
replace_cpism_refdata(new_config, None)
config.update(new_config)
if os.environ.get('PYSYN_CDBS') is None:
os.environ['PYSYN_CDBS'] = config['pysyn_refdata']
__version__ = '2.0.0'
# we need to ignore the warning from pysynphot, because we only use the star models.
with warnings.catch_warnings(): # pragma: no cover
warnings.filterwarnings("ignore")
import pysynphot as S
def setup_config(new_config):
"""Set up config from a dict.
Some of the configs need to calcuate.
Parameters
----------
new_config: dict
new config dict.
Returns
-------
None
"""
config.update(new_config)
config['utc0_float'] = datetime.timestamp(datetime.fromisoformat(config['utc0']))
config['solar_spectrum'] = f"{os.environ['PYSYN_CDBS']}/grid/solsys/solar_spec.fits"
config['aperature_area'] = (config['diameter'] * 50)**2 * np.pi # cm^2
config['default_band'] = list(config['bands'].keys())[0]
config['default_filter'] = config['bands'][config['default_band']]
setup_config({})
def which_focalplane(band):
"""
Return the name of the focalplane which the band belongs to.
right now only support vis
Parameters
-----------
band: str
The name of the band.
Returns
--------
str
The name of the focalplane.
'vis' or 'nir' or 'wfs'
Raises
-------
ValueError
If the band is not in ['f565', 'f661', 'f743', 'f883', 'f940', 'f1265', 'f1425', 'f1542', 'wfs']
"""
# band = band.lower()
# if band in ['f565', 'f661', 'f743', 'f883']:
# return 'vis'
# if band in ['f940', 'f1265', 'f1425', 'f1542']:
# return 'nir'
# if band in ['wfs']:
# return 'wfs'
return 'vis'
# raise ValueError(f"未知的波段{band}")
def iso_time(time):
"""Transfer relative time to iso time format
Parameters
----------
time: str or float
The relative time in seconds.
Returns
-------
str
The iso time format.
"""
if isinstance(time, str):
_ = datetime.fromisoformat(time)
return time
utc0 = config['utc0']
time0 = datetime.timestamp(datetime.fromisoformat(utc0))
time = datetime.fromtimestamp(time0 + time)
return time.isoformat()
def relative_time(time):
"""Transfer iso time format to relative time in seconds
Parameters
----------
time: str or float
The iso time format.
Returns
-------
float
The relative time in seconds.
"""
if isinstance(time, float):
return time
if isinstance(time, int):
return float(time)
utc0 = config['utc0']
time0 = datetime.timestamp(datetime.fromisoformat(utc0))
return datetime.timestamp(datetime.fromisoformat(time)) - time0
\ No newline at end of file
import yaml, os, re
from datetime import datetime
import numpy as np
import pandas as pd
from astropy.io import fits
from astropy.coordinates import SkyCoord
from .config import __version__, which_focalplane
from .utils import Logger
from .config import config, iso_time
default_output_dir = config['output']
log_level = config['log_level']
header_check = config['check_fits_header']
def set_up_logger(log_dir):
if not os.path.exists(log_dir):
os.makedirs(log_dir)
return Logger(log_dir+'/cpism_pack.log', log_level).logger
log = set_up_logger(config['log_dir'])
def check_and_update_fits_header(header):
"""
Check the header keywords and update the description according to the data model.
Parameters
-----------
header: astropy.io.fits.header.Header
The header to be checked.
Returns
--------
None
"""
# model_file = f"{cpism_refdata}/io/image_header.json"
# if hdu == 'primary':
# model_file = f"{cpism_refdata}/io/primary_header.json"
model_file = config['datamodel']
with open(model_file, 'r', encoding='utf-8') as fid:
data_model = yaml.load(fid, Loader=yaml.FullLoader)
if 'FILETYPE' in header.keys():
header_model =data_model['HDU0']
hdu = 'hdu0'
else:
header_model = data_model['HDU1']
hdu = 'hdu1'
dm_comment = {}
def print_warning(info):
if header_check:
log.warning(info)
# check existance and format of keyword in fits header
for _, content in header_model.items():
comment = content['comment']
keyword = content['key']
dtype = content['dtype']
if pd.isnull(comment):
comment = ''
if len(comment) > 47:
# comment = comment[:46]
print_warning(
f"Keyword {keyword} has a comment longer than 47 characters.")
dm_comment[keyword] = comment
if keyword not in header.keys():
print_warning(f"Keyword {keyword} not found in header.")
elif not pd.isnull(dtype):
value = header[keyword]
# check the type of the value, I for int, R for float, C for str
if isinstance(value, str):
key_type = 'str'
elif isinstance(value, float):
key_type = 'flo'
elif isinstance(value, bool):
key_type = 'boo'
elif isinstance(value, int):
key_type = 'int'
elif isinstance(value, type(header['COMMENT'])):
key_type = 'str'
else:
key_type = 'ukn'
if key_type != dtype[0:3]:
print_warning(
f"Keyword {keyword} has wrong type in [{hdu}]. {dtype} expected, {key_type} found.")
# check if there are extral keyword in header, and update the comment
for keyword in header.keys():
# print(keyword)
if keyword not in dm_comment.keys():
print_warning(
f"Keyword {keyword} not found in the [{hdu}] data model.")
elif keyword != 'COMMENT': # comment keyword is not allowed to be updated
header[keyword] = (header[keyword], dm_comment[keyword])
return header
def obsid_parser(
obsid: int):
"""
Parse the obsid to get the obstype.
Parameters
----------
obsid: str
The obsid of the observation.
Obsid must be 11 digits and start with 5 for CPIC.
Returns
-------
str
The obstype of the observation.
Raises
------
ValueError
If the obsid is not 11 digits or does not start with 5.
"""
obsid = str(obsid)
if len(obsid) != 11:
raise ValueError('Obsid must be 11 digits.')
if obsid[0] != '4':
raise ValueError('Obsid must start with 4 for CPIC')
obstype_dict = {
'20': 'BIAS',
'21': 'DARK',
'22': 'FLAT',
'23': 'BKG',
'24': 'LASER',
'01': 'SCI',
'02': 'DSF',
'10': 'CALS',
}
obstype = obstype_dict.get(obsid[1:3], 'DEFT')
return obstype
def datetime_obj_to_mjd(time_obj):
"""
transfer datetime object to mean julian date (MJD).
Parameters
----------
time_obj: datetime.datetime
The datetime object.
Returns
-------
float
The mean julian date (MJD).
"""
return (time_obj - datetime(1858, 11, 17)).total_seconds() / 86400
def primary_hdu(
obs_info: dict,
gnc_info: dict,
filename_output=False):
"""
Generate the primary hdu of the fits file.
Parameters
----------
obs_info: dict
The parameters of the observation. See `save_fits` function.
gnc_info: dict
The gnc information of the observation.
filename_output: bool (optional)
If True, the folder and the filename will be returned.
Returns
-------
fits.PrimaryHDU
The primary hdu of the fits file.
str, str
The folder and filename of the fits file. Only returned if filename_output is True.
Notes
-----
The gnc_info dict should contain the information of orbit and observation.
these informations are used to genrated the hdu header. Refer to the data model for more information.
"""
# camera_config, _ = load_camera_and_optics_config(obs_info['band'])
obsid = obs_info['obsid']
exp_start = obs_info['EXPSTART']
exp_start = datetime.fromisoformat(exp_start)
exp_end = obs_info['EXPEND']
exp_end = datetime.fromisoformat(exp_end)
filename = "CSST_CPIC"
filename += "_" + which_focalplane(obs_info['band']).upper()
filename += "_" + obsid_parser(obsid)
filename += "_" + exp_start.strftime("%Y%m%d%H%M%S")
filename += "_" + exp_end.strftime("%Y%m%d%H%M%S")
filename += f"_{obsid}_X_L0_V01.fits"
type_dir = 'CAL'
if int(f'{obsid}'[1:3]) <= 10:
type_dir = 'SCI'
mjd_dir = f"{datetime_obj_to_mjd(exp_start):.0f}"
folder = f"{type_dir}/{mjd_dir}"
header = fits.Header()
# General keywords
header['SIMPLE'] = True
header['BITPIX'] = 8
header['NAXIS'] = 0
header['EXTEND'] = True
header['NEXTEND'] = 1 # + parameters['nframe']
# header['GROUPS'] = False
header['DATE'] = datetime.now().isoformat(timespec='seconds')
heaer_filename = filename[:-4]
if len(heaer_filename) > 68:
heaer_filename = heaer_filename[:68]
header['FILENAME'] = heaer_filename
header['FILETYPE'] = obsid_parser(obsid)
header['TELESCOP'] = 'CSST'
header['INSTRUME'] = 'CPIC'
header['RADECSYS'] = 'ICRS'
header['EQUINOX'] = 2000.0
header['FITSSWV'] = f'CPISM V{__version__}'
header['COMMENT'] = ''
cstar = {'ra': '0d', 'dec': '0d'}
if obs_info['target'] != {}:
cstar = obs_info['target']['cstar']
radec = SkyCoord(cstar['ra'], cstar['dec'])
target_name = radec.to_string('hmsdms')
target_name = re.sub(R'[hdms\s]', '', target_name)
header['OBJECT'] = cstar.get('name', target_name)
header['TARGET'] = target_name
header['OBSID'] = str(obsid)
header['RA_OBJ'] = radec.ra.degree
header['DEC_OBJ'] = radec.dec.degree
# telescope information
header['REFFRAME'] = 'CSSTGSC-1.0'
header['DATE-OBS'] = exp_start.isoformat(timespec='seconds')
header['SATESWV'] = '1'
header['EXPSTART'] = datetime_obj_to_mjd(exp_start)
cabstart = gnc_info.get('CABSTART', exp_start.isoformat(timespec='seconds'))
cabstart = iso_time(cabstart)
cabstart_mjd = datetime_obj_to_mjd(datetime.fromisoformat(cabstart))
header['CABSTART'] = cabstart_mjd
header['SUNANGL0'] = gnc_info.get('SUNANGL0', -1.0)
header['MOONANG0'] = gnc_info.get('MOONANG0', -1.0)
header['TEL_ALT0'] = gnc_info.get('TEL_ALT0', -1.0)
header['POS_ANG0'] = gnc_info.get(
'POS_ANG0', float(obs_info['rotation']))
header['POSI0_X'] = gnc_info.get('POSI0_X', -1.0)
header['POSI0_Y'] = gnc_info.get('POSI0_Y', -1.0)
header['POSI0_Z'] = gnc_info.get('POSI0_Z', -1.0)
header['VELO0_X'] = gnc_info.get('VELO0_X', -1.0)
header['VELO0_Y'] = gnc_info.get('VELO0_Y', -1.0)
header['VELO0_Z'] = gnc_info.get('VELO0_Z', -1.0)
header['EULER0_1'] = gnc_info.get('EULER0_1', -1.0)
header['EULER0_2'] = gnc_info.get('EULER0_2', -1.0)
header['EULER0_3'] = gnc_info.get('EULER0_3', -1.0)
header['RA_PNT0'] = gnc_info.get('RA_PNT0', header['RA_OBJ'])
header['DEC_PNT0'] = gnc_info.get('DEC_PNT0', header['DEC_OBJ'])
header['EXPEND'] = datetime_obj_to_mjd(exp_end)
cabend = gnc_info.get('CABEND', exp_end.isoformat(timespec='seconds'))
cabend = iso_time(cabend)
cabend_mjd = datetime_obj_to_mjd(datetime.fromisoformat(cabend))
header['CABEND'] = cabend_mjd
header['SUNANGL1'] = gnc_info.get('SUNANGL1', header['SUNANGL0'])
header['MOONANG1'] = gnc_info.get('MOONANG1', header['MOONANG0'])
header['TEL_ALT1'] = gnc_info.get('TEL_ALT1', header['TEL_ALT0'])
header['POS_ANG1'] = gnc_info.get('POS_ANG1', header['POS_ANG0'])
header['POSI1_X'] = gnc_info.get('POSI1_X', header['POSI0_x'])
header['POSI1_Y'] = gnc_info.get('POSI1_Y', header['POSI0_y'])
header['POSI1_Z'] = gnc_info.get('POSI1_Z', header['POSI0_z'])
header['VELO1_X'] = gnc_info.get('VELO1_X', header['VELO0_x'])
header['VELO1_Y'] = gnc_info.get('VELO1_Y', header['VELO0_y'])
header['VELO1_Z'] = gnc_info.get('VELO1_Z', header['VELO0_z'])
header['EULER1_1'] = gnc_info.get('EULER1_1', header['EULER0_1'])
header['EULER1_2'] = gnc_info.get('EULER1_2', header['EULER0_2'])
header['EULER1_3'] = gnc_info.get('EULER1_3', header['EULER0_3'])
header['RA_PNT1'] = gnc_info.get('RA_PNT1', header['RA_PNT0'])
header['DEC_PNT1'] = gnc_info.get('DEC_PNT1', header['DEC_PNT0'])
header['EPOCH'] = float(exp_start.year)
header['EXPTIME'] = (exp_end - exp_start).total_seconds()
header['CHECKSUM'] = '0000000000000000'
header['DATASUM'] = '0000000000'
check_and_update_fits_header(header)
# other information
hdu = fits.PrimaryHDU(header=header)
hdu.add_checksum()
if filename_output:
return hdu, folder, filename
else:
return hdu
def frame_header(obs_info, index, primary_header, camera_dict):
"""
Generate the header for a single frame.
Parameters
----------
obs_info : dict
Dictionary of parameters. See `save_fits` function.
index : int
Frame index.
primary_header : dict
Primary header
Returns
-------
astropy.io.fits.Header
"""
header = fits.Header()
# camera_config, optics_config = load_camera_and_optics_config(
# obs_info['band'])
camera_config = camera_dict
plszx = camera_config['plszx']
plszy = camera_config['plszy']
pscan1 = camera_config['pscan1']
pscan2 = camera_config['pscan2']
oscan1 = camera_config['oscan1']
oscan2 = camera_config['oscan2']
udark = camera_config['udark']
bdark = camera_config['bdark']
ldark = camera_config['ldark']
rdark = camera_config['rdark']
imgszx = plszx + pscan1 + oscan1 + ldark + rdark
imgszy = plszy + pscan2 + oscan2 + udark + bdark
header['XTENSION'] = 'IMAGE'
header['BITPIX'] = 16
header['NAXIS'] = 2
header['NAXIS1'] = 1080
header['NAXIS2'] = 1050
header['PCOUNT'] = 0
header['GCOUNT'] = 1
header['BSCALE'] = 1
header['BZERO'] = 32768
header['EXTNAME'] = 'IMAGE'
header['EXTVER'] = 1
header['BUNIT'] = 'ADU'
header['FILTER'] = obs_info['band']
header['DETSN'] = '0'
header['DETNAME'] = camera_config['detector_name']
header['CHIPLAB'] = camera_config['ccd_label']
header['DEWTEMP'] = float(camera_config['cooler_temp'])
frame_info = obs_info['frame_info'][index]
header['CHIPTEMP'] = float(frame_info['chiptemp'])
header['DETSIZE'] = f"{imgszx} * {imgszy}"
header['IMGINDEX'] = index + 1
utc0_float = config['utc0_float']
ra0 = primary_header.get('RA_PNT0', -1.0)
dec0 = primary_header.get('DEC_PNT0', -1.0)
pa0 = primary_header.get('POS_ANG0', -1.0)
cab0 = primary_header.get('CABSTART', obs_info['frame_info'][0]['expt_start'])
ra1 = primary_header.get('RA_PNT1', ra0)
dec1 = primary_header.get('DEC_PNT1', dec0)
pa1 = primary_header.get('POS_ANG1', pa0)
cab1 = primary_header.get('CABEND', cab0)
frame_stamp = frame_info['expt_start'] + utc0_float
frame_mjd = datetime_obj_to_mjd(datetime.fromtimestamp(frame_stamp))
cab0 = iso_time(cab0)
cab1 = iso_time(cab1)
cab0 = datetime_obj_to_mjd(datetime.fromisoformat(cab0))
cab1 = datetime_obj_to_mjd(datetime.fromisoformat(cab1))
ratio = (frame_mjd - cab0)/(cab1 - cab0)
ra = ra0 + (ra1 - ra0) * ratio
dec = dec0 + (dec1 - dec0) * ratio
pa = pa0 + (pa1 - pa0) * ratio
header['IMG_EXPT'] = datetime.fromtimestamp(frame_stamp).isoformat(timespec='seconds')
header['IMG_CABT'] = header['IMG_EXPT']
header['IMG_DUR'] = float(obs_info['expt'])
header['IMG_PA'] = ra
header['IMG_RA'] = dec
header['IMG_DEC'] = pa
header['DATASECT'] = f"{plszx} * {plszy}"
header['PIXSCAL'] = frame_info['platescale']
header['PIXSIZE'] = float(camera_config['pitch_size'])
header['NCHAN'] = 1
header['PSCAN1'] = pscan1
header['PSCAN2'] = pscan2
header['OSCAN1'] = oscan1
header['OSCAN2'] = oscan2
header['UDARK'] = udark
header['BDARK'] = bdark
header['LDARK'] = ldark
header['RDARK'] = rdark
# WCS
cstar = {'ra': '0d', 'dec': '0d'}
if obs_info['target'] != {}:
cstar = obs_info['target']['cstar']
radec = SkyCoord(cstar['ra'], cstar['dec'])
shift = obs_info['shift']
platescale = frame_info['platescale']
rotation = np.radians(obs_info['rotation'])
header['WCSAXES'] = 2
header['CRPIX1'] = (plszx + 1)/2 + pscan1 + ldark + shift[0] / platescale
header['CRPIX2'] = (plszy + 1)/2 + pscan2 + udark + shift[0] / platescale
header['CRVAL1'] = radec.ra.degree
header['CRVAL2'] = radec.dec.degree
header['CTYPE1'] = 'RA---TAN'
header['CTYPE2'] = 'DEC--TAN'
header['CD1_1'] = np.cos(rotation)
header['CD1_2'] = -np.sin(rotation)
header['CD2_1'] = np.sin(rotation)
header['CD2_2'] = np.cos(rotation)
# Readout information
header['EMGAIN'] = float(obs_info['emgain'])
header['GAIN'] = float(camera_config['ph_per_adu'])
header['DET_BIAS'] = float(camera_config['bias_level'])
header['RON'] = float(camera_config['readout_noise'])
header['READTIME'] = float(camera_config['readout_time'])
header['ROSPEED'] = float(camera_config['readout_speed'])
# CPIC information
header['LS_STAT'] = 'OFF'
header['IWA'] = frame_info['iwa']
header['WFSINFO1'] = -1.0
header['WFSINFO2'] = -1.0
header['CHECKSUM'] = '0000000000000000'
header['DATASUM'] = '0000000000'
header = check_and_update_fits_header(header)
return header
def save_fits_simple(images, obs_info, output_folder='./'):
"""
Save the image to a fits file with a simple header to TMP directory.
Parameters
----------
images : numpy.ndarray
Image array to be written.
obs_info : dict
Dictionary of observation informations. See `save_fits` function.
Returns
----------
Filename of the saved fits file.
"""
target = obs_info['target']
target_info = 'NO_TARGET'
if 'cstar' in target.keys():
target_info = ''
target_info = f"S{target['cstar']['magnitude']:.1f}"
target_info += f"_P{len(target.get('planets', '[]'))}"
name = target_info
if 'name' in target.keys():
name = target['name']
name = name.replace('/', '_')
name = name.replace(',', '_')
now = datetime.now()
time = now.strftime("%Y%m%d%H%M%S")
filename = f"{name}_{time}.fits"
header = fits.Header()
header['skybg'] = obs_info['skybg']
header['name'] = name
header['exptime'] = obs_info['expt']
header['nframe'] = obs_info['nframe']
header['band'] = obs_info['band']
header['emgain'] = obs_info['emgain']
header['obsid'] = obs_info['obsid']
header['rotation'] = obs_info['rotation']
shift = obs_info['shift']
header['shift'] = f"x:{shift[0]},y:{shift[1]}"
fullname = os.path.join(output_folder, filename)
print(fullname)
if not os.path.exists(output_folder):
os.makedirs(output_folder)
log.debug(f"Output folder {output_folder} is created.")
log.debug(f"save fits file to {fullname}")
fits.writeto(fullname, images, overwrite=True, header=header)
return fullname
def save_fits(images, obs_info, gnc_info, camera_dict={}, csst_format=True, output_folder='./'):
"""
Save the image to a fits file.
Parameters
----------
images : numpy.ndarray
Image array to be saved.
obs_info : dict
Dictionary of observation informations.
Must contain the following keys
- band: str
- Band of the image.
- expt: float
- Exposure time of the each image.
- nframe: int
- Number of frames in the image.
- emgain: int
- EM gain of the camera.
- obsid: str
- Observation ID. Obsid must be 11 digits and start with 5 for CPIC. See pharse_obsid() for details.
- rotation: float
- Rotation angle of the image.
- shift: list
- Shift of the image.
gnc_info : dict
Dictionary of GNC information.
Contains the keywords in the primary header. See primary_hdu() for details.
csst_format : bool, optional
Whether to save the fits file in CSST format, by default True.
"""
if not csst_format:
save_fits_simple(images, obs_info, output_folder=output_folder)
return
hdu_header, folder, filename = primary_hdu(obs_info, gnc_info, True)
hdu_list = fits.HDUList([hdu_header])
if len(images.shape) == 2:
images = np.array([images])
for index in range(images.shape[0]):
header = frame_header(
obs_info,
index,
hdu_header.header,
camera_dict=camera_dict,
)
frame_hdu = fits.ImageHDU(images[index, :, :], header=header)
frame_hdu.add_checksum()
hdu_list.append(frame_hdu)
folder = f"{output_folder}/{folder}"
if not os.path.exists(folder):
os.makedirs(folder)
log.debug(f'make new folder {folder}')
full_path = f"{folder}/{filename}"
log.debug(f'save fits file: {full_path}')
hdu_list.writeto(full_path, overwrite=True)
import argparse, sys, tqdm, time, os, yaml
from glob import glob
from datetime import datetime
import traceback
import numpy as np
from .target import spectrum_generator, target_file_load
from .optics import focal_mask, focal_convolve
from .camera import CosmicRayFrameMaker, sky_frame_maker, CpicVisEmccd
from .io import save_fits, log
from .config import update_able_keys, relative_time
from .config import config as default_config
def vis_observation(
target: dict,
skybg: float,
expt: float,
nframe: int,
band: str,
emset: int,
obsid: int = 41000000000,
rotation: float = 0,
shift: list = [0, 0],
gnc_info: dict = {},
csst_format: bool = True,
camera = CpicVisEmccd(),
crmaker = CosmicRayFrameMaker(),
nsample: int = 1,
emgain=None,
prograss_bar=None,
output='./'
) -> np.ndarray:
"""Observation simulation main process on visable band using EMCCD.
Parameters
-----------
target : dict
target dictionary. See the input of `target.spectrum_generator` for more details.
skybg : float
sky background in unit of magnitude/arcsec^2
expt: float
exposure time in unit of second
nframe: int
number of frames
band: str
band name
emset: int
EM gain setting value. 1023(0x3FF) for ~1.0× EM gain.
obsid: int
observation ID. Start from 4 for CPIC, 01 for science observation. See the input of io.obsid_parser for more details.
rotation: float
rotation of the telescope. in unit of degree. 0 means North is up.
shift: list
target shift in unit of arcsec
gnc_info: dict
gnc_info dictionary. See the input of io.primary_hdu for more details.
csst_format: bool
if True, the output fits file will be in the csst format.
crmaker: CosmicRayFrameMaker
CosmicRayFrameMaker object. See the input of camera.CosmicRayFrameMaker for more details.
nsample: int
number of samples for wide bandpass.
emgain: float or None
if None, emgain are set using emset parameter. Else, emgain are set using emgain parameter.
prograss_bar: bool
if True, a prograss_bar will be shown.
output: str
the output directory.
Returns
-------
image_cube: np.ndarray
"""
start_time = time.time()
platescale = default_config['platescale']
iwa = default_config['mask_width'] / 2
area = default_config['aperature_area']
expt_start = camera.system_time
target_list = []
if 'cstar' in target.keys():
target_list = spectrum_generator(target)
image_cube = []
if emgain is None:
emgain_value = camera.emgain_set(emset, self_update=False)
else:
emgain_value, emset = emgain, -emgain
params = {
'target': target,
'skybg': skybg,
'expt': expt,
'nframe': nframe,
'band': band,
'emset': emset,
'emgain': emgain_value,
'obsid': obsid,
'rotation': rotation,
'shift': shift,
}
all_frame_info = []
if prograss_bar:
pg_bar = tqdm.tqdm(total=nframe, leave=False)
for i in range(nframe):
if prograss_bar:
pg_bar.update(1)
else:
print(f'\r Simulation Running: Frame {i+1}/{nframe}', end='')
frame_info = {}
frame_info['expt_start'] = camera.system_time
focal_frame = focal_convolve(
band,
target_list,
init_shifts=shift,
rotation=rotation,
nsample=nsample,
platesize=camera.flat_shape
)
if skybg is None or skybg > 100:
sky_bkg_frame = 0
else:
sky_bkg_frame = sky_frame_maker(
band,
skybg,
platescale,
camera.flat_shape
)
sky_bkg_frame = sky_bkg_frame * area * expt
sky_bkg_frame = focal_mask(sky_bkg_frame, iwa, platescale)
cr_frame = crmaker.make_cr_frame(camera.dark_shape, expt)
camera.time_syn(expt, readout=True)
image = camera.readout(
focal_frame + sky_bkg_frame,
emset,
expt,
image_cosmic_ray=cr_frame,
emgain=emgain
)
image_cube.append(image)
frame_info['expt_end'] = camera.system_time
frame_info['chiptemp'] = camera.ccd_temp
frame_info['platescale'] = platescale
frame_info['iwa'] = iwa
all_frame_info.append(frame_info)
image_cube = np.array(image_cube)
expt_end = camera.system_time
utc0 = default_config['utc0']
utc0_stamp = datetime.timestamp(datetime.fromisoformat(utc0))
expt_start_iso = datetime.fromtimestamp(utc0_stamp + expt_start)
expt_end_iso = datetime.fromtimestamp(utc0_stamp + expt_end)
params['EXPSTART'] = expt_start_iso.isoformat()
params['EXPEND'] = expt_end_iso.isoformat()
params['frame_info'] = all_frame_info
save_fits(image_cube, params, gnc_info, camera.__dict__.copy(), csst_format=csst_format, output_folder=output)
if prograss_bar:
pg_bar.close()
print(f' Done [{time.time() - start_time:.1f}s] ')
else:
print(f'\r Done [{time.time() - start_time:.1f}s] ')
return image_cube
def quick_run_v2(
target_str: str,
band: str,
expt: float,
emgain: float,
nframe: int,
skybg: float = None,
rotation: float = 0,
shift: list = [0, 0],
emset_input: bool=False,
cr_frame: bool=True,
camera_effect: bool=True,
prograss_bar=False,
output='./') -> np.ndarray:
"""A quick run function for CPIC.
Parameters
----------
target_str: str
target string. See the input of `target.target_file_load` for more details.
band: str
band name
expt: float
exposure time in unit of second
emgain: float
EM gain value. Note that emgain is not the same as emset, controled by emset_input keyword.
nframe: int
number of frames
skybg: float or None
sky background in unit of magnitude. If None, sky background will not be set.
rotation: float
rotation of the telescope. in unit of degree. 0 means North is up.
shift: list
target shift in unit of arcsec
emset_input: bool
if True, emgain paramter is emset. Else, emgain is set using emgain parameter.
cr_frame: bool
if True, cosmic ray frame will be added to the image.
camera_effect: bool
if True, camera effect will be added to the image.
prograss_bar: bool
if True, a prograss_bar will be shown.
output: str
the output directory.
Returns
-------
image_cube: np.ndarray
"""
print(f'Quick Run: {target_str}')
log.debug(
f"""input parameters:
target_str: {target_str}
skybg: {skybg}
band: {band}
expt: {expt}
nframe: {nframe}
emgain: {emgain}
rotation: {rotation}
shift: {shift}
emset_input: {emset_input}
cr_frame: {cr_frame}
camera_effect: {camera_effect}
prograss_bar: {prograss_bar}
output: {output}
""")
target_dict = target_file_load(target_str)
if target_dict == {}:
target_dict['name'] = 'blank'
emgain_value = emgain
if emset_input:
emgain_value = None
camera = CpicVisEmccd()
if not camera_effect:
camera.switch['bias_vp'] = False
camera.switch['bias_hp'] = False
camera.switch['bias_ci'] = False
camera.switch['bias_shift'] = False
camera.switch['badcolumn'] = False
camera.switch['flat'] = False
camera.switch['cte'] = False
camera.switch['shutter'] = False
camera.switch['nonlinear'] = False
if not cr_frame:
camera.switch['cosmicray'] = False
return vis_observation(
target=target_dict,
skybg=skybg,
expt=expt,
nframe=nframe,
band=band,
emset=emgain,
emgain=emgain_value,
csst_format=False,
shift=shift,
rotation=rotation,
camera=camera,
prograss_bar=prograss_bar,
output=output
)
def deduplicate_names_add_count(names: list):
"""remove duplicate names and add count"""
for i in range(len(names)-1,-1,-1):
if names.count(names[i]) > 1:
names[i] = names[i] + '_' + str(names.count(names[i]))
def observation_simulation_from_config(obs_file, config_file):
"""Run observation simulation from config file
Parameters
-----------
obs_file: str
observation file or folder.
config_file: str
config file.
Examples:
see examples in `example` folder.
"""
config = {}
if config_file:
with open(config_file) as fid:
config = yaml.load(fid, Loader=yaml.FullLoader)
camera_config = config.get('camera', [{}])
all_camera = []
all_camera_name = []
for c_config in camera_config:
all_camera.append(CpicVisEmccd(c_config))
all_camera_name.append(c_config.get('name', 'camera'))
deduplicate_names_add_count(all_camera_name)
for key in config.keys():
if key in update_able_keys:
default_config[key] = config[key]
output_folder = default_config['output']
csst_format = default_config['csst_format']
nsample = default_config['nsample']
obs_file = os.path.abspath(obs_file)
file_list = []
if os.path.isdir(obs_file):
file_list = glob(f"{obs_file}/*.yaml")
elif '.yaml' == obs_file[-5:]:
file_list = [obs_file]
if not file_list:
log.warning(f"No observation file found in {obs_file}")
for ind_target, file in enumerate(file_list):
try:
with open(file, 'r') as fid:
obs_info = yaml.load(fid, Loader=yaml.FullLoader)
target = target_file_load(obs_info.get('target', {}))
skybg = obs_info.get('skybg', None)
expt = obs_info['expt']
band = obs_info['band']
emset = obs_info['emset']
nframe = obs_info['nframe']
obsid = obs_info['obsid']
rotation = obs_info.get('rotation', 0)
shift = obs_info.get('shift', [0, 0])
gnc_info = obs_info.get('gnc_info', {})
time = obs_info.get('time', 0)
emgain = obs_info.get('emgain', None)
time = relative_time(time)
except Exception as e:
log.error(f"{file} is not a valid yaml file.")
log.error(f"Failed with {type(e).__name__}{e}.\n\n {traceback.format_exc()}")
continue
ind_camera = 0
for camera_name, camera in zip(all_camera_name, all_camera):
ind_camera += 1
ind_run = ind_target * len(all_camera) + ind_camera
all_run = len(all_camera) * len(file_list)
info_text = f"({ind_run}/{all_run}) obsid[{obsid}] with {camera_name}"
log.info(info_text)
if time == 0:
camera.time_syn(time, initial=True)
else:
dt = time - camera.system_time
if dt < 0:
log.warning(f'Time is not synced. {dt} seconds are added.')
dt = 0
camera.time_syn(dt, readout=False)
if len(all_camera) > 1:
output = os.path.join(output_folder, camera_name)
else:
output = output_folder
try:
vis_observation(
target,
skybg,
expt,
nframe,
band,
emset,
obsid,
emgain=emgain,
rotation=rotation,
shift=shift,
gnc_info=gnc_info,
camera=camera,
output=output,
nsample=nsample,
csst_format=csst_format,
prograss_bar=True)
except Exception as e:
log.error(f"{info_text} failed with {type(e).__name__}{e}.\n\n {traceback.format_exc()}")
def main(argv=None):
"""
Command line interface of csst_cpic_sim
Parameters
-----------
argv: list
input arguments. Default is None. If None, sys.argv is used.
"""
parser = argparse.ArgumentParser(description='Cpic obsevation image simulation')
parser.set_defaults(func=lambda _: parser.print_usage())
subparsers = parser.add_subparsers(help='type of runs')
parser_quickrun = subparsers.add_parser('quickrun', help='a quick observation with no configration file')
parser_quickrun.add_argument('target_string', type=str, help='example: *5.1/25.3(1.3,1.5)/22.1(2.3,-4.5)')
parser_quickrun.add_argument('expt', type=float, help='exposure time [ms]')
parser_quickrun.add_argument('emgain', type=float, help='emgain or emgain set value if emgain_input is False')
parser_quickrun.add_argument('nframe', type=int, help='number of frames')
parser_quickrun.add_argument('-b', '--band', type=str, default='f661', help='band, one of f565/f661/f743/f883')
parser_quickrun.add_argument('-r', '--rotation', type=float, default=0, help='rotation angle [degree]')
parser_quickrun.add_argument('-s', '--skybk', type=float, default=21, help='magnitude of sky background [mag/arcsec^2]')
parser_quickrun.add_argument('-f', '--cr_frame', action='store_true', help='if True, cosmic ray frame will be added')
parser_quickrun.add_argument('-e', '--emset', action='store_true', help='if True, emgain set value will be used as input')
parser_quickrun.add_argument('-c', '--camera_effect', action='store_true', help='if True, camera effect will be added')
parser_quickrun.add_argument('-o', '--output', type=str, default='./', help='output folder')
def quick_run_call(args):
quick_run_v2(
target_str=args.target_string,
expt=args.expt,
emgain=args.emgain,
nframe=args.nframe,
band=args.band,
rotation=args.rotation,
skybg=args.skybk,
cr_frame=args.cr_frame,
emset_input=args.emset,
camera_effect=args.camera_effect,
output=os.path.abspath(args.output),
prograss_bar=True
)
parser_quickrun.set_defaults(func=quick_run_call)
parser_run = subparsers.add_parser('run', help='observation with configration files or folder')
parser_run.add_argument('target_config', type=str, help='configration file or folder')
parser_run.add_argument('-c', '--config', type=str, help='instrument configration file')
def run_all(args):
observation_simulation_from_config(
args.target_config,
args.config,
)
parser_run.set_defaults(func=run_all)
args = parser.parse_args(argv)
args.func(args)
# if __name__ == '__main__': # pragma: no cover
# sys.exit(main())
# target_example = {
# 'cstar': {
# 'magnitude': 1,
# 'ra': '120d',
# 'dec': '40d',
# 'distance': 10,
# 'sptype': 'F0III',
# },
# 'stars': [
# {
# 'magnitude': 20,
# 'pangle': 60,
# 'separation': 1,
# 'sptype': 'F0III'
# }
# ]
# }
# # quick_run('', 10, 'f661', 1, 1, 30)
# # quick_run('*2.4/10(3,5)/15(-4,2)', 21, 'f661', 1, 1, 30)
# # # normal target
# observation_simulation(
# target=target_example,
# skybg=21,
# expt=1,
# nframe=2,
# band='f661',
# emgain=30,
# obsid=51012345678,
# )
# # bias
# observation_simulation(
# target=target_example,
# skybg=999,
# expt=1,
# nframe=2,
# band='f661',
# emgain=1,
# obsid=51012345678,
# shift=[3, 3],
# rotation=60
# )
# # bias-gain
# observation_simulation(
# target={},
# skybg=999,
# expt=0.01,
# nframe=2,
# band='f661',
# emgain=1000,
# obsid=50012345678,
# )
# # dark
# observation_simulation(
# target={},
# skybg=999,
# expt=100,
# nframe=2,
# band='f661',
# emgain=30,
# obsid=50112345678,
# )
# # flat
# observation_simulation(
# target={},
# skybg=15,
# expt=10,
# nframe=2,
# band='f661',
# emgain=30,
# obsid=50112345678,
# )
import numpy as np
from scipy.signal import fftconvolve
from scipy.ndimage import rotate
from .config import config, S # S is synphot
from .utils import region_replace
from .io import log
from .psf_simulation import single_band_masked_psf, single_band_psf
FILTERS = {}
for key, value in config['bands'].items():
FILTERS[key] = S.FileBandpass(value)
default_band = config['default_band']
def filter_throughput(filter_name):
"""
Totally throughput of the each CPIC band.
Including the throughput of the filter, telescope, cpic, and camera QE.
If the filter_name is not supported, return the throughput of the default filter(f661).
Parameters
-----------
filter_name: str
The name of the filter.
One of ['f565', 'f661'(default), 'f743', 'f883', 'f940', 'f1265', 'f1425', 'f1542']
Returns
--------
synphot.Bandpass
The throughput of the filter.
"""
filter_name = filter_name.lower()
filter_name = default_band if filter_name == 'default' else filter_name
if filter_name not in FILTERS.keys():
log.warning(f"滤光片名称错误({filter_name}),返回默认滤光片({default_band})透过率")
filter_name = default_band
return FILTERS[filter_name]
def _rotate_and_shift(shift, rotation, init_shifts):
rotation_rad = rotation / 180 * np.pi
return np.array([
shift[0] * np.cos(rotation_rad) + shift[1] * np.sin(rotation_rad),
-shift[0] * np.sin(rotation_rad) + shift[1] * np.cos(rotation_rad)
]) + np.array(init_shifts)
def ideal_focus_image(
bandpass: S.spectrum.SpectralElement,
targets: list,
platescale,
platesize: list = [1024, 1024],
init_shifts: list = [0, 0],
rotation: float = 0) -> np.ndarray:
"""Ideal focus image of the targets.
Each star is a little point of 1pixel.
Parameters
-----------
bandpass: synphot.SpectralElement
The bandpass of the filter.
targets: list
The list of the targets. See the output of `spectrum_generator` for details.
platescale: float
The platescale of the camera. Unit: arcsec/pixel
platesize: list
The size of the image. Unit: pixel
init_shifts: list
The shifts of the targets to simulate the miss alignment. Unit: arcsec
rotation: float
The rotation of the image. Unit: degree
Returns
--------
np.ndarray
The ideal focus image.
"""
focal_image = np.zeros(platesize)
focal_shape = np.array(platesize)[::-1] # x, y
if not targets:
return focal_image
for target in targets:
sub_x, sub_y, sub_spectrum, sub_image = target
sub_shift = _rotate_and_shift([sub_x, sub_y], rotation, init_shifts) / platescale
sed = (sub_spectrum * bandpass).integrate()
if sub_image is None:
x = (focal_shape[0] - 1)/2 + sub_shift[0]
y = (focal_shape[1] - 1)/2 + sub_shift[1]
int_x = int(x)
int_y = int(y)
if int_x < 0 or int_x >= focal_shape[0] - 1 or int_y < 0 or int_y >= focal_shape[1] - 1:
continue
dx1 = x - int_x
dx0 = 1 - dx1
dy1 = y - int_y
dy0 = 1 - dy1
sub = np.array([
[dx0*dy0, dx1*dy0],
[dx0*dy1, dx1*dy1]]) * sed
focal_image[int_y: int_y+2, int_x: int_x+2] += sub
else:
# sub_image = sub_image
sub_image = np.abs(rotate(sub_image, rotation, reshape=False))
sub_image = sub_image / sub_image.sum()
sub_img_shape = np.array(sub_image.shape)[::-1]
sub_shift += (focal_shape-1)/2 - (sub_img_shape-1)/2
focal_image = region_replace(
focal_image,
sub_image * sed,
sub_shift,
subpix=True
)
return focal_image
def focal_convolve(
band: str,
targets: list,
init_shifts: list = [0, 0],
rotation: float = 0,
nsample: int = 5,
error: float = 0,
platesize: list = [1024, 1024]) -> np.ndarray :
"""PSF convolution of the ideal focus image.
Parameters
----------
band: str
The name of the band.
target: list
The list of thetargets. See the output of `spectrum_generator` for details.
init_shifts: list
The shifts of the targets to simulate the miss alignment. Unit: arcsec
rotation: float
The rotation of the image. Unit: degree
error: float
The error of the DM acceleration. Unit: nm
platesize: list
The size of the image. Unit: pixel
Returns
--------
np.ndarray
"""
# config = optics_config[which_focalplane(band)]
platescale = config['platescale']
# telescope_config = optics_config['telescope']
area = config['aperature_area']
filter = filter_throughput(band)
throughput = filter.throughput
wave = filter.wave
throughput_criterion = throughput.max() * 0.1
wave_criterion = wave[throughput > throughput_criterion]
min_wave = wave_criterion[0]
max_wave = wave_criterion[-1]
# print(min_wave, max_wave)
platescale = config['platescale']
iwa = config['mask_width'] / 2
if abs(init_shifts[0]) > 4 or abs(init_shifts[1]) > 4:
print('Input shifts are too large, and are set to zero')
init_shifts = [0, 0]
all_fp_image = []
if not targets:
return np.zeros((platesize[1], platesize[0]))
for i_wave in range(nsample):
d_wave = (max_wave - min_wave) / nsample
wave0 = min_wave + i_wave * d_wave
wave1 = min_wave + (i_wave + 1) * d_wave
center_wavelength = (wave0 + wave1) / 2 * 1e-10
i_throughput = throughput.copy()
i_throughput[(wave > wave1) | (wave < wave0)] = 0
i_band = S.ArrayBandpass(wave, i_throughput, waveunits='angstrom')
i_fp_image = ideal_focus_image(i_band, targets[1:], platescale, platesize, init_shifts, rotation)
psf = single_band_psf(center_wavelength, error=error)
_, _, cstar_sp, _ = targets[0]
cstar_flux = (cstar_sp * i_band).integrate()
cstar_psf = single_band_masked_psf(center_wavelength, error=error, shift=init_shifts)
c_fp_image = fftconvolve(i_fp_image, psf, mode='same')
c_fp_image = focal_mask(c_fp_image, iwa, platescale)
c_fp_image = c_fp_image + cstar_flux * cstar_psf
all_fp_image.append(c_fp_image * area) # trans to photon/second
return np.array(all_fp_image).sum(axis=0)
def focal_mask(image, iwa, platescale, throughtput=1e-6):
"""
Mask the image outside the inner working angle.
Parameters
-----------
image: np.ndarray
The image to be masked.
iwa: float
The inner working angle. Unit: arcsec.
platescale: float
The platescale of the image. Unit: arcsec/pixel.
throughtput: float
The throughtput of the mask. The default is 1e-6.
Returns
--------
np.ndarray
The masked image.
"""
xx, yy = np.mgrid[0:image.shape[0], 0:image.shape[1]]
center = np.array([(image.shape[0]-1)/2, (image.shape[1]-1)/2])
mask = (abs(xx - center[0]) < iwa /
platescale) | (abs(yy - center[1]) < iwa / platescale)
image_out = image.copy()
image_out[mask] *= throughtput
return image_out
import os, pickle
import numpy as np
from astropy.io import fits
from hcipy import Field, Wavefront, FraunhoferPropagator
from hcipy import SurfaceApodizer, SurfaceAberrationAtDistance
from hcipy import TipTiltMirror, DeformableMirror
from hcipy import ComplexSurfaceApodizer
from hcipy import make_pupil_grid, make_circular_aperture, make_obstructed_circular_aperture
from hcipy import make_xinetics_influence_functions, make_uniform_grid
from .config import config
DMCACHE = True
ARCSEC2RAD = np.radians(1 / 3600)
# initial psf simulation
apm, apm_header = fits.getdata(config['apm_file'], header=True)
actuator = fits.getdata(config['actuator_file'])
surface_aberration = fits.getdata(config['aberration'])
pupil_diameter = 2 # m
F_number = 83
focal_length = pupil_diameter * F_number
pupil_shape = apm.shape
pupil_rate = apm_header['PHRATE'] # meter/pixel
pupil_grid = make_pupil_grid(pupil_shape[0], pupil_shape[0] * pupil_rate)
aperture = make_circular_aperture(pupil_diameter)(pupil_grid)
aperture = aperture * Field(apm.flatten(), pupil_grid)
second_pupil_size = pupil_diameter * 2 # just gauss a number
second_aperture = make_circular_aperture(second_pupil_size)(pupil_grid)
lyot_stop = ComplexSurfaceApodizer(second_aperture, second_aperture*0, lambda _: 2)
emccd_pixel_size = 13e-6 # m
platescale = emccd_pixel_size / (ARCSEC2RAD * focal_length) # arcsec / pixel
focal_shape = np.array([1024, 1024])
focal_size = focal_shape * emccd_pixel_size
# platescale = vis_focal_config['platescale']
mask_width = config['mask_width']
focal_full_frame = make_uniform_grid(focal_shape, focal_size, has_center=True)
prop_full_frame = FraunhoferPropagator(pupil_grid, focal_full_frame, focal_length)
spider_width = mask_width/platescale*emccd_pixel_size
spider = make_obstructed_circular_aperture(focal_size[0]*2, 0, 4, spider_width)(focal_full_frame)
pupil_cross_mask = ComplexSurfaceApodizer(spider, spider*0, lambda _: 2)
num_actuators_across = 32
# dm spacing is little smaller than pupil
actuator_spacing = 0.95 * pupil_diameter / num_actuators_across
pickle_file = config['dm_pickle']
if os.path.exists(pickle_file) and DMCACHE:
with open(pickle_file, 'rb') as fid:
influence_functions = pickle.load(fid)
else:
influence_functions = make_xinetics_influence_functions(
pupil_grid, num_actuators_across, actuator_spacing)
with open(pickle_file, 'wb') as fid:
pickle.dump(influence_functions, fid)
deformable_mirror = DeformableMirror(influence_functions)
tiptilt_mirror = TipTiltMirror(pupil_grid)
aberration = SurfaceApodizer(
surface_sag=surface_aberration.flatten(), refractive_index=-1)
# arbitrary distance for the aberration to propagate
aberration_distance = 80 * focal_length
aberration = SurfaceAberrationAtDistance(aberration, aberration_distance)
def single_band_masked_psf(
wavelength: float,
error: float = 0,
shift: list = [0, 0]) -> np.ndarray:
"""CPIC PSF considering the focal plane mask.
Parameters
-----------
wavelength : float
observation wavelength in meter
error : float
deformable mirror control error in nm
shift : list
angular shift of the target in arcsec.
Returns
----------
psf : np.ndarray
psf in the focal plane. Normalized as the input flux is 1.
(Note that total flux of the psf is not 1, because it is masked)
"""
error = np.random.normal(0, error*1e-9, actuator.shape)
deformable_mirror.actuators = actuator + error
wf = Wavefront(aperture, wavelength)
shift = np.array(shift) * ARCSEC2RAD / 2
tiptilt_mirror.actuators = shift
wf = tiptilt_mirror(wf)
wf = aberration(wf)
first_focal = prop_full_frame(deformable_mirror(wf))
strength = first_focal.intensity.shaped.sum()
second_pupil = prop_full_frame.backward(pupil_cross_mask(first_focal))
second_focal = prop_full_frame(lyot_stop(second_pupil))
psf = second_focal.intensity.shaped
return psf / strength
def single_band_psf(
wavelength: float,
error: float = 0) -> np.ndarray:
"""CPIC PSF without considering the focal plane mask.
Used to simulate the off-axis PSF.
Parameters
-----------
wavelength : float
observation wavelength in meter
error : float
deformable mirror control error in nm
Returns
----------
psf : np.ndarray
psf in the focal plane. Normalized. The total flux is 1.
"""
error = np.random.normal(0, error*1e-9, actuator.shape)
deformable_mirror.actuators = actuator + error
wf = Wavefront(aperture, wavelength)
wf = aberration(wf)
first_focal = prop_full_frame(deformable_mirror(wf))
image = np.array(first_focal.intensity.shaped)
return image / image.sum()
# def single_band_psf(wavelength, error=0, aber_phase=None):
# error = np.random.normal(0, error*1e-9, actuator.shape)
# deformable_mirror.actuators = actuator + error
# wf = Wavefront(aperture, wavelength)
# wf = aberration(wf)
# if aber_phase is not None:
# other_error = SurfaceApodizer(
# surface_sag=aber_phase.flatten(), refractive_index=-1)
# wf = other_error(wf)
# first_focal = prop_full_frame(deformable_mirror(wf))
# image = np.array(first_focal.intensity.shaped)
# return image / image.sum()
\ No newline at end of file
This diff is collapsed.
import numpy as np
import scipy.ndimage as nd
import logging
import random
import matplotlib.pyplot as plt
# DO NOT IMPORT CPICIMGSIM MODULES HERE
class Logger(object):
def __init__(self, filename, level='INFO'):
self.logger = logging.getLogger('cpism_log')
self.logger.setLevel(logging.DEBUG)
shinfo = logging.StreamHandler()
onlyinfo = logging.Filter()
onlyinfo.filter = lambda record: (record.levelno < logging.WARNING)
fmtstr = '%(message)s'
shinfo.setFormatter(logging.Formatter(fmtstr)) # 设置屏幕上显示的格式
shinfo.setLevel(logging.INFO)
shinfo.addFilter(onlyinfo)
sh = logging.StreamHandler()
fmtstr = '!%(levelname)s!: %(message)s [%(filename)s - %(funcName)s (line: %(lineno)d)]: '
sh.setFormatter(logging.Formatter(fmtstr)) # 设置屏幕上显示的格式
sh.setLevel(logging.WARNING)
th = logging.FileHandler(filename) # 往文件里写入#指定间隔时间自动生成文件的处理器
fmtstr = '%(asctime)s %(filename)s [%(funcName)s] - %(levelname)s: %(message)s'
th.setFormatter(logging.Formatter(fmtstr)) # 设置文件里写入的格式
th.setLevel(logging.__dict__.get(level.upper()))
self.logger.addHandler(shinfo)
self.logger.addHandler(sh)
self.logger.addHandler(th)
def random_seed_select(seed=-1):
"""
Select a random seed for numpy.random and return it.
"""
if seed == -1:
seed = random.randint(0, 2**32-1)
np.random.seed(seed)
return seed
def region_replace(
background: np.ndarray,
front: np.ndarray,
shift: list,
bmask: float = 1.0,
fmask: float = 1.0,
padded_in: bool = False,
padded_out: bool = False,
subpix: bool = False
):
"""
replace a region of the background with the front image.
Parameters
----------
background: np.ndarray
The background image.
front: np.ndarray
The front image.
shift: list
The [x, y] shift of the front image. Unit: pixel.
Relative to the lower-left corner of the background image.
[0, 0] means the lower-left corner of the front image is at the lower-left corner of the background image.
bmask: float
The mask of the background image. Default: 1.0
0.0 means the background image is masked.
1.0 means the background image is fully added.
fmask: float
The mask of the front image. Default: 1.0
0.0 means the front image is masked (not added).
1.0 means the front image is fully added.
padded_in: bool
Whether the input background image is padded. Default: False
In the function, the background image is padded by the size of the front image.
If True, means the background image is padded.
padded_out: bool
Whether the output image is padded. Default: False
In the function, the background image is padded by the size of the front image.
If True, means the output image is padded.
padded_in and padded_out are designed for the case that replace_region fuction is called multiple times.
subpix: bool
Whether the shift is subpixel. Default: False
If True, the shift is subpixel, using scipy.ndimage.shift to shift the front image.
If False, the shift is integer, using numpy slicing to shift the front image.
Returns
-------
np.ndarray
The output image.
shape = background.shape if padded_out = False
shape = background.shape + 2 * front.shape if padded_out = True
"""
int_shift = np.array(shift).astype(int)
b_sz = np.array(background.shape)
f_sz = np.array(front.shape)
if padded_in:
padded = background
b_sz = b_sz - f_sz * 2
else:
padded = np.pad(background, ((f_sz[0], f_sz[0]), (f_sz[1], f_sz[1])))
if np.any((int_shift < -b_sz) | (int_shift > b_sz)):
if padded_out:
return padded
return background
if subpix:
subs = np.array(shift) - int_shift
front = nd.shift(front, (subs[0], subs[1]))
int_shift += f_sz
roi_y = int_shift[1]
roi_x = int_shift[0]
padded[roi_y: roi_y+f_sz[0], roi_x:roi_x+f_sz[1]] *= bmask
padded[roi_y: roi_y+f_sz[0], roi_x:roi_x+f_sz[1]] += fmask * front
if padded_out:
return padded
return padded[f_sz[0]:b_sz[0]+f_sz[0], f_sz[1]:b_sz[1]+f_sz[1]]
def psf_imshow(psf, vmin=1e-8, vmax=0.1, log=True, region=1):
focal_img = psf.copy()
focal_img = (focal_img - focal_img.min()) / (focal_img.max() - focal_img.min())
if log:
focal_img = np.log10(focal_img * 9 + 1)
plt.imshow(focal_img, origin='lower', cmap='gray', vmin=vmin, vmax=vmax)
shape = psf.shape
plt.xlim(shape[1] * (1 - region) / 2, shape[1] * (1 + region) / 2)
plt.ylim(shape[0] * (1 - region) / 2, shape[0] * (1 + region) / 2)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment