Commit 85162102 authored by Chen Yili's avatar Chen Yili
Browse files

Upload New File

parent e2688095
import numpy as np
from astropy.io import fits
import yaml
from .config import cpism_refdata, __version__, which_focalplane
from .utils import Logger
import os
from datetime import datetime, timedelta
from astropy.coordinates import SkyCoord
import re
import json
import pandas as pd
config_file = f'{cpism_refdata}/cpism_config.yaml'
with open(config_file, 'r') as fid:
config = yaml.load(fid, Loader=yaml.FullLoader)
output_dir = config['output_dir']
if config['relative_path']:
ref_dir_base = os.path.dirname(cpism_refdata)
output_dir = f'{ref_dir_base}/{output_dir}'
log_dir = output_dir + '/LOG'
tmp_dir = config['tmp_dir']
log_level = config['log_level']
header_check = config['check_fits_header']
for dir in ['', 'TMP', 'CAL', 'SCI', 'LOG']:
sub_dir = f"{output_dir}/{dir}"
if not os.path.exists(sub_dir):
os.makedirs(sub_dir)
tmp_folder_path = '.'
if tmp_dir == 'TMP':
tmp_folder_path = output_dir + '/TMP'
log = Logger(log_dir+'/cpism_pack.log', log_level).logger
def check_and_update_fits_header(header):
"""
Check the header keywords and update the description according to the data model.
Parameters
-----------
header: astropy.io.fits.header.Header
The header to be checked.
Returns
--------
None
"""
hdu = 'image'
if 'FILETYPE' in header.keys():
hdu = 'primary'
model_file = f"{cpism_refdata}/io/image_header.json"
if hdu == 'primary':
model_file = f"{cpism_refdata}/io/primary_header.json"
with open(model_file, 'r', encoding='utf-8') as fid:
data_model = json.load(fid)
dm_comment = {}
def print_warning(info):
if header_check:
log.warning(info)
# check existance and format of keyword in fits header
for keyword, comment, format, _, _ in data_model:
if pd.isnull(comment):
comment = ''
if len(comment) > 46:
comment = comment[:46]
print_warning(
f"Keyword {keyword} has a comment longer than 46 characters. It will be truncated to 46 characters.")
dm_comment[keyword] = comment
if keyword not in header.keys():
print_warning(f"Keyword {keyword} not found in [{hdu}] header.")
elif not pd.isnull(format):
value = header[keyword]
# check the type of the value, I for int, R for float, C for str
if isinstance(value, str):
type = 'C'
elif isinstance(value, float):
type = 'R'
elif isinstance(value, bool):
type = 'L'
elif isinstance(value, int):
type = 'I'
else:
type = 'U'
if type != format[0]:
print_warning(
f"Keyword {keyword} has wrong type in [{hdu}]. {format[0]} expected, {type} found.")
# check if there are extral keyword in header, and update the comment
for keyword in header.keys():
# print(keyword)
if keyword not in dm_comment.keys():
print_warning(
f"Keyword {keyword} not found in the [{hdu}] data model.")
else:
header[keyword] = (header[keyword], dm_comment[keyword])
return header
def obsid_parser(
obsid: int):
"""
Parse the obsid to get the obstype.
Parameters
----------
obsid: str
The obsid of the observation.
Obsid must be 11 digits and start with 5 for CPIC.
Returns
-------
str
The obstype of the observation.
Raises
------
ValueError
If the obsid is not 11 digits or does not start with 5.
"""
obsid = str(obsid)
if len(obsid) != 11:
raise ValueError('Obsid must be 11 digits.')
if obsid[0] != '5':
raise ValueError('Obsid must start with 5 for CPIC')
obstype_dict = {
'00': 'BIAS',
'01': 'DARK',
'02': 'FLAT',
'03': 'BKGD',
'04': 'LASR',
'10': 'SCIE',
'11': 'DENF',
'12': 'CALS',
'15': 'TEMP'
}
obstype = obstype_dict.get(obsid[1:3], 'DEFT')
return obstype
def datetime_obj_to_mjd(time_obj):
"""
transfer datetime object to mean julian date (MJD).
Parameters
----------
time_obj: datetime.datetime
The datetime object.
Returns
-------
float
The mean julian date (MJD).
"""
return (time_obj - datetime(1858, 11, 17)).total_seconds() / 86400
def primary_hdu(
obs_info: dict,
gnc_info: dict,
filename_output=False):
"""
Generate the primary hdu of the fits file.
Parameters
----------
obs_info: dict
The parameters of the observation. See `save_fits` function.
gnc_info: dict
The gnc information of the observation.
filename_output: bool (optional)
If True, the folder and the filename will be returned.
Returns
-------
fits.PrimaryHDU
The primary hdu of the fits file.
str, str
The folder and filename of the fits file. Only returned if filename_output is True.
Notes
-----
The gnc_info dict should contain the information of orbit and observation.
these informations are used to genrated the hdu header. Refer to the data model for more information.
"""
camera_config, _ = load_camera_and_optics_config(obs_info['band'])
obsid = obs_info['obsid']
exp_start = gnc_info.get(
'EXPSTART', datetime.now().isoformat(timespec='seconds'))
exp_start = datetime.fromisoformat(exp_start)
duartion = (obs_info['expt'] +
camera_config['readout_time']) * obs_info['nframe']
default_end = exp_start + timedelta(seconds=duartion)
exp_end = gnc_info.get('EXPEND', default_end.isoformat(timespec='seconds'))
exp_end = datetime.fromisoformat(exp_end)
filename = "CSST_CPIC"
filename += "_" + which_focalplane(obs_info['band']).upper()
filename += "_" + obsid_parser(obsid)
filename += "_" + exp_start.strftime("%Y%m%d%H%M%S")
filename += "_" + exp_end.strftime("%Y%m%d%H%M%S")
filename += f"_{obsid}_X_L0_V01.fits"
type_dir = 'CAL'
if str(obsid)[1] == '1':
type_dir = 'SCI'
mjd_dir = f"{datetime_obj_to_mjd(exp_start):.0f}"
folder = f"{type_dir}/{mjd_dir}"
header = fits.Header()
# General keywords
header['SIMPLE'] = True
header['BITPIX'] = 8
header['NAXIS'] = 0
header['EXTEND'] = True
header['NEXTEND'] = 1 # + parameters['nframe']
header['GROUPS'] = False
header['DATE'] = datetime.now().isoformat(timespec='seconds')
heaer_filename = filename[:-4]
if len(heaer_filename) > 68:
heaer_filename = heaer_filename[:68]
header['FILENAME'] = heaer_filename
header['FILETYPE'] = obsid_parser(obsid)
header['TELESCOP'] = 'CSST'
header['INSTRUME'] = 'CPIC'
header['RADECSYS'] = 'ICRS'
header['EQUINOX'] = 2000.0
header['FITSCREA'] = f'CPISM V{__version__}'
cstar = {'ra': '0d', 'dec': '0d'}
if obs_info['target'] != {}:
cstar = obs_info['target']['cstar']
radec = SkyCoord(cstar['ra'], cstar['dec'])
target_name = radec.to_string('hmsdms')
target_name = re.sub(R'[hdms\s]', '', target_name)
header['OBJECT'] = cstar.get('name', target_name)
header['TARGET'] = target_name
header['OBSID'] = str(obsid)
header['OBJ_RA'] = radec.ra.degree
header['OBJ_DEC'] = radec.dec.degree
# telescope information
header['REFFRAME'] = 'CSSTGSC-1.0'
header['DATE-OBS'] = exp_start.isoformat(timespec='seconds')
header['SATESWV'] = 'SIMULATION'
header['EXPSTART'] = datetime_obj_to_mjd(exp_start)
header['CABSTART'] = gnc_info.get('CABSTART', header['EXPSTART'])
header['SUNANGL0'] = gnc_info.get('SUNANGL0', -1.0)
header['MOONANG0'] = gnc_info.get('MOONANG0', -1.0)
header['TEL_ALT0'] = gnc_info.get('TEL_ALT0', -1.0)
header['POS_ANG0'] = gnc_info.get(
'POS_ANG0', float(obs_info['rotation']))
header['POSI0_X'] = gnc_info.get('POSI0_X', -1.0)
header['POSI0_Y'] = gnc_info.get('POSI0_Y', -1.0)
header['POSI0_Z'] = gnc_info.get('POSI0_Z', -1.0)
header['VELO0_X'] = gnc_info.get('VELO0_X', -1.0)
header['VELO0_Y'] = gnc_info.get('VELO0_Y', -1.0)
header['VELO0_Z'] = gnc_info.get('VELO0_Z', -1.0)
header['EULER0_1'] = gnc_info.get('EULER0_1', -1.0)
header['EULER0_2'] = gnc_info.get('EULER0_2', -1.0)
header['EULER0_3'] = gnc_info.get('EULER0_3', -1.0)
header['RA_PNT0'] = gnc_info.get('RA_PNT0', header['OBJ_RA'])
header['DEC_PNT0'] = gnc_info.get('DEC_PNT0', header['OBJ_DEC'])
header['EXPEND'] = datetime_obj_to_mjd(exp_end)
header['CABEND'] = gnc_info.get('CABEDN', header['EXPEND'])
header['SUNANGL1'] = gnc_info.get('SUNANGL1', header['SUNANGL0'])
header['MOONANG1'] = gnc_info.get('MOONANG1', header['MOONANG0'])
header['TEL_ALT1'] = gnc_info.get('TEL_ALT1', header['TEL_ALT0'])
header['POS_ANG1'] = gnc_info.get('POS_ANG1', header['POS_ANG0'])
header['POSI1_X'] = gnc_info.get('POSI1_X', header['POSI0_x'])
header['POSI1_Y'] = gnc_info.get('POSI1_Y', header['POSI0_y'])
header['POSI1_Z'] = gnc_info.get('POSI1_Z', header['POSI0_z'])
header['VELO1_X'] = gnc_info.get('VELO1_X', header['VELO0_x'])
header['VELO1_Y'] = gnc_info.get('VELO1_Y', header['VELO0_y'])
header['VELO1_Z'] = gnc_info.get('VELO1_Z', header['VELO0_z'])
header['EULER1_1'] = gnc_info.get('EULER1_1', header['EULER0_1'])
header['EULER1_2'] = gnc_info.get('EULER1_2', header['EULER0_2'])
header['EULER1_3'] = gnc_info.get('EULER1_3', header['EULER0_3'])
header['RA_PNT1'] = gnc_info.get('RA_PNT1', header['RA_PNT0'])
header['DEC_PNT1'] = gnc_info.get('DEC_PNT1', header['DEC_PNT0'])
header['EXPTIME'] = (exp_end - exp_start).total_seconds()
header['EPOCH'] = float(exp_start.year)
header['CHECKSUM'] = '0000000000000000'
header['DATASUM'] = '0000000000'
check_and_update_fits_header(header)
# other information
hdu = fits.PrimaryHDU(header=header)
hdu.add_checksum()
if filename_output:
return hdu, folder, filename
else:
return hdu
def load_camera_and_optics_config(band):
"""
Load camera and optics configuration from reference data.
Parameters
----------
band : str
Band name.
Returns camera_config, optics_config : dict, dict
"""
camera = which_focalplane(band)
if camera == 'vis':
config_file = 'emccd_config.yaml'
elif camera == 'nir':
raise ValueError('NIR camera is not supported yet')
config_file = 'nir_config.yaml'
with open(f"{cpism_refdata}/camera/{config_file}", 'r') as fid:
camera_config = yaml.load(fid, Loader=yaml.FullLoader)
with open(f"{cpism_refdata}/optics/optics_config.yaml", 'r') as fid:
optics_config = yaml.load(fid, Loader=yaml.FullLoader)[camera]
return camera_config, optics_config
def frame_header(obs_info, index, bunch_start, primary_header={}):
"""
Generate the header for a single frame.
Parameters
----------
obs_info : dict
Dictionary of parameters. See `save_fits` function.
index : int
Frame index.
bunch_start : str
Start time of the bunch.
primary_header : dict (optional)
Primary header. default: {}
Returns
-------
astropy.io.fits.Header
"""
header = fits.Header()
camera_config, optics_config = load_camera_and_optics_config(
obs_info['band'])
plszx = camera_config['plszx']
plszy = camera_config['plszy']
pscan1 = camera_config['pscan1']
pscan2 = camera_config['pscan2']
oscan1 = camera_config['oscan1']
oscan2 = camera_config['oscan2']
udark = camera_config['udark']
bdark = camera_config['bdark']
ldark = camera_config['ldark']
rdark = camera_config['rdark']
imgszx = plszx + pscan1 + oscan1 + ldark + rdark
imgszy = plszy + pscan2 + oscan2 + udark + bdark
header['XTENSION'] = 'IMAGE'
header['BITPIX'] = 16
header['NAXIS'] = 2
header['NAXIS1'] = 1080
header['NAXIS2'] = 1050
header['EXTNAME'] = 'IMAGE'
header['EXTVER'] = 1
header['BSCALE'] = 1.0
header['BZERO'] = 32768.0
header['BUNIT'] = 'ADU'
header['FILTER'] = obs_info['band']
header['DETSN'] = '00000000000'
header['DETNAME'] = camera_config['detector_name']
header['CHIPLAB'] = camera_config['ccd_label']
header['CHIPTEMP'] = float(camera_config['chip_temp'])
header['DEWTEMP'] = float(camera_config['dewar_temp'])
header['DETSIZE'] = f"{imgszx} * {imgszy}"
header['IMGINDEX'] = index
frame_time = obs_info['expt'] + camera_config['readout_time']
bunch_start = datetime.fromisoformat(bunch_start)
expstart = bunch_start + timedelta(seconds=frame_time * (index - 1))
bunch_start_mjd = datetime_obj_to_mjd(bunch_start)
ra0 = primary_header.get('RA_PNT0', -1.0)
dec0 = primary_header.get('DEC_PNT0', -1.0)
pa0 = primary_header.get('POS_ANG0', -1.0)
cab0 = primary_header.get('CABSTART', bunch_start_mjd)
delta_t = frame_time * index
bunch_end = bunch_start + timedelta(seconds=delta_t)
bunch_end_mjd = datetime_obj_to_mjd(bunch_end)
ra1 = primary_header.get('RA_PNT1', ra0)
dec1 = primary_header.get('DEC_PNT1', dec0)
pa1 = primary_header.get('POS_ANG1', pa0)
cab1 = primary_header.get('CABEND', bunch_end_mjd)
img_cab = datetime_obj_to_mjd(expstart)
ratio = (img_cab - cab0)/(cab1 - cab0)
ra = ra0 + (ra1 - ra0) * ratio
dec = dec0 + (dec1 - dec0) * ratio
pa = pa0 + (pa1 - pa0) * ratio
header['IMG_EXPT'] = expstart.isoformat(timespec='seconds')
header['IMG_CABT'] = header['IMG_EXPT']
header['IMG_DUR'] = float(obs_info['expt'])
header['IMG_PA'] = ra
header['IMG_RA'] = dec
header['IMG_DEC'] = pa
header['DATASECT'] = f"{plszx} * {plszy}"
header['PIXSCAL'] = optics_config['platescale']
header['PIXSIZE'] = camera_config['pitch_size']
header['NCHAN'] = 1
header['PSCAN1'] = pscan1
header['PSCAN2'] = pscan2
header['OSCAN1'] = oscan1
header['OSCAN2'] = oscan2
header['UDARK'] = udark
header['BDARK'] = bdark
header['LDARK'] = ldark
header['RDARK'] = rdark
# WCS
cstar = {'ra': '0d', 'dec': '0d'}
if obs_info['target'] != {}:
cstar = obs_info['target']['cstar']
radec = SkyCoord(cstar['ra'], cstar['dec'])
shift = obs_info['shift']
platescale = optics_config['platescale']
rotation = np.radians(obs_info['rotation'])
header['WCSAXES'] = 2
header['CRPIX1'] = (plszx + 1)/2 + pscan1 + ldark + shift[0] / platescale
header['CRPIX2'] = (plszy + 1)/2 + pscan2 + udark + shift[0] / platescale
header['CRVAL1'] = radec.ra.degree
header['CRVAL2'] = radec.dec.degree
header['CTYPE1'] = 'RA---TAN'
header['CTYPE2'] = 'DEC--TAN'
header['CD1_1'] = np.cos(rotation)
header['CD1_2'] = -np.sin(rotation)
header['CD2_1'] = np.sin(rotation)
header['CD2_2'] = np.cos(rotation)
header['others'] = 'other'
# Readout information
header['EMGAIN'] = float(obs_info['emgain'])
header['GAIN'] = float(camera_config['ph_per_adu'])
header['DET_BIAS'] = float(camera_config['bias_level'])
header['RON'] = float(camera_config['readout_noise'])
header['READTIME'] = float(camera_config['readout_time'])
header['ROSPEED'] = float(camera_config['readout_speed'])
# CPIC information
header['LS_STAT'] = 'OFF'
header['IWA'] = optics_config['mask_width'] / 2
header['WFSINFO1'] = -1.0
header['WFSINFO2'] = -1.0
header['CHECKSUM'] = '0000000000000000'
header['DATASUM'] = '0000000000'
header = check_and_update_fits_header(header)
return header
def save_fits_simple(images, obs_info):
"""
Save the image to a fits file with a simple header to TMP directory.
Parameters
----------
images : numpy.ndarray
Image array to be written.
obs_info : dict
Dictionary of observation informations. See `save_fits` function.
Returns
----------
Filename of the saved fits file.
"""
target = obs_info['target']
target_info = 'NO_TARGET'
if 'cstar' in target.keys():
target_info = ''
target_info = f"S{target['cstar']['magnitude']:.1f}"
target_info += f"_P{len(target.get('planets', '[]'))}"
name = target_info
if 'name' in target.keys():
name = target['name']
name = name.replace('/', '_')
name = name.replace(',', '_')
now = datetime.now()
time = now.strftime("%Y%m%d%H%M%S")
filename = f"{name}_{time}.fits"
header = fits.Header()
header['skybg'] = obs_info['skybg']
header['name'] = name
header['exptime'] = obs_info['expt']
header['nframe'] = obs_info['nframe']
header['band'] = obs_info['band']
header['emgain'] = obs_info['emgain']
header['obsid'] = obs_info['obsid']
header['rotation'] = obs_info['rotation']
shift = obs_info['shift']
header['shift'] = f"x:{shift[0]},y:{shift[1]}"
fullname = f"{tmp_folder_path}/{filename}"
fits.writeto(fullname, images, overwrite=True, header=header)
return fullname
def save_fits(images, obs_info, gnc_info, csst_format=True):
"""
Save the image to a fits file.
Parameters
----------
images : numpy.ndarray
Image array to be saved.
obs_info : dict
Dictionary of observation informations.
Must contain the following keys
- band: str
- Band of the image.
- expt: float
- Exposure time of the each image.
- nframe: int
- Number of frames in the image.
- emgain: int
- EM gain of the camera.
- obsid: str
- Observation ID. Obsid must be 11 digits and start with 5 for CPIC. See pharse_obsid() for details.
- rotation: float
- Rotation angle of the image.
- shift: list
- Shift of the image.
gnc_info : dict
Dictionary of GNC information.
Contains the keywords in the primary header. See primary_hdu() for details.
csst_format : bool, optional
Whether to save the fits file in CSST format, by default True.
"""
if not csst_format:
save_fits_simple(images, obs_info)
return
hdu_header, folder, filename = primary_hdu(obs_info, gnc_info, True)
hdu_list = fits.HDUList([hdu_header])
if len(images.shape) == 2:
images = np.array([images])
for index in range(images.shape[0]):
header = frame_header(
obs_info,
index + 1,
hdu_header.header['DATE-OBS'],
primary_header=hdu_header.header
)
frame_hdu = fits.ImageHDU(images[index, :, :], header=header)
frame_hdu.add_checksum()
hdu_list.append(frame_hdu)
folder = f"{output_dir}/{folder}"
if not os.path.exists(folder):
os.makedirs(folder)
hdu_list.writeto(f"{folder}/{filename}", overwrite=True)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment