import numpy as np from import fits import yaml from CpicImgSim.config import cpism_refdata, __version__, which_focalplane from CpicImgSim.utils import Logger import os from datetime import datetime, timedelta from astropy.coordinates import SkyCoord import re import json import pandas as pd config_file = f'{cpism_refdata}/cpism_config.yaml' with open(config_file, 'r') as fid: config = yaml.load(fid, Loader=yaml.FullLoader) output_dir = config['output_dir'] if config['relative_path']: ref_dir_base = os.path.dirname(cpism_refdata) output_dir = f'{ref_dir_base}/{output_dir}' log_dir = output_dir + '/LOG' tmp_dir = config['tmp_dir'] log_level = config['log_level'] header_check = config['check_fits_header'] for dir in ['', 'TMP', 'CAL', 'SCI', 'LOG']: sub_dir = f"{output_dir}/{dir}" if not os.path.exists(sub_dir): os.makedirs(sub_dir) tmp_folder_path = '.' if tmp_dir == 'TMP': tmp_folder_path = output_dir + '/TMP' log = Logger(log_dir+'/cpism_pack.log', log_level).logger def check_and_update_fits_header(header): """ Check the header keywords and update the description according to the data model. Parameters ----------- header: The header to be checked. Returns -------- None """ hdu = 'image' if 'FILETYPE' in header.keys(): hdu = 'primary' model_file = f"{cpism_refdata}/io/image_header.json" if hdu == 'primary': model_file = f"{cpism_refdata}/io/primary_header.json" with open(model_file, 'r', encoding='utf-8') as fid: data_model = json.load(fid) dm_comment = {} def print_warning(info): if header_check: log.warning(info) # check existance and format of keyword in fits header for keyword, comment, format, _, _ in data_model: if pd.isnull(comment): comment = '' if len(comment) > 46: comment = comment[:46] print_warning( f"Keyword {keyword} has a comment longer than 46 characters. It will be truncated to 46 characters.") dm_comment[keyword] = comment if keyword not in header.keys(): print_warning(f"Keyword {keyword} not found in [{hdu}] header.") elif not pd.isnull(format): value = header[keyword] # check the type of the value, I for int, R for float, C for str if isinstance(value, str): type = 'C' elif isinstance(value, float): type = 'R' elif isinstance(value, bool): type = 'L' elif isinstance(value, int): type = 'I' else: type = 'U' if type != format[0]: print_warning( f"Keyword {keyword} has wrong type in [{hdu}]. {format[0]} expected, {type} found.") # check if there are extral keyword in header, and update the comment for keyword in header.keys(): # print(keyword) if keyword not in dm_comment.keys(): print_warning( f"Keyword {keyword} not found in the [{hdu}] data model.") else: header[keyword] = (header[keyword], dm_comment[keyword]) return header def obsid_parser( obsid: int): """ Parse the obsid to get the obstype. Parameters ---------- obsid: str The obsid of the observation. Obsid must be 11 digits and start with 5 for CPIC. Returns ------- str The obstype of the observation. Raises ------ ValueError If the obsid is not 11 digits or does not start with 5. """ obsid = str(obsid) if len(obsid) != 11: raise ValueError('Obsid must be 11 digits.') if obsid[0] != '5': raise ValueError('Obsid must start with 5 for CPIC') obstype_dict = { '00': 'BIAS', '01': 'DARK', '02': 'FLAT', '03': 'BKGD', '04': 'LASR', '10': 'SCIE', '11': 'DENF', '12': 'CALS', '15': 'TEMP' } obstype = obstype_dict.get(obsid[1:3], 'DEFT') return obstype def datetime_obj_to_mjd(time_obj): """ transfer datetime object to mean julian date (MJD). Parameters ---------- time_obj: datetime.datetime The datetime object. Returns ------- float The mean julian date (MJD). """ return (time_obj - datetime(1858, 11, 17)).total_seconds() / 86400 def primary_hdu( obs_info: dict, gnc_info: dict, filename_output=False): """ Generate the primary hdu of the fits file. Parameters ---------- obs_info: dict The parameters of the observation. See `save_fits` function. gnc_info: dict The gnc information of the observation. filename_output: bool (optional) If True, the folder and the filename will be returned. Returns ------- fits.PrimaryHDU The primary hdu of the fits file. str, str The folder and filename of the fits file. Only returned if filename_output is True. Notes ----- The gnc_info dict should contain the information of orbit and observation. these informations are used to genrated the hdu header. Refer to the data model for more information. """ camera_config, _ = load_camera_and_optics_config(obs_info['band']) obsid = obs_info['obsid'] exp_start = gnc_info.get( 'EXPSTART','seconds')) exp_start = datetime.fromisoformat(exp_start) duartion = (obs_info['expt'] + camera_config['readout_time']) * obs_info['nframe'] default_end = exp_start + timedelta(seconds=duartion) exp_end = gnc_info.get('EXPEND', default_end.isoformat(timespec='seconds')) exp_end = datetime.fromisoformat(exp_end) filename = "CSST_CPIC" filename += "_" + which_focalplane(obs_info['band']).upper() filename += "_" + obsid_parser(obsid) filename += "_" + exp_start.strftime("%Y%m%d%H%M%S") filename += "_" + exp_end.strftime("%Y%m%d%H%M%S") filename += f"_{obsid}_X_L0_V01.fits" type_dir = 'CAL' if str(obsid)[1] == '1': type_dir = 'SCI' mjd_dir = f"{datetime_obj_to_mjd(exp_start):.0f}" folder = f"{type_dir}/{mjd_dir}" header = fits.Header() # General keywords header['SIMPLE'] = True header['BITPIX'] = 8 header['NAXIS'] = 0 header['EXTEND'] = True header['NEXTEND'] = 1 # + parameters['nframe'] header['GROUPS'] = False header['DATE'] ='seconds') heaer_filename = filename[:-4] if len(heaer_filename) > 68: heaer_filename = heaer_filename[:68] header['FILENAME'] = heaer_filename header['FILETYPE'] = obsid_parser(obsid) header['TELESCOP'] = 'CSST' header['INSTRUME'] = 'CPIC' header['RADECSYS'] = 'ICRS' header['EQUINOX'] = 2000.0 header['FITSCREA'] = f'CPISM V{__version__}' cstar = {'ra': '0d', 'dec': '0d'} if obs_info['target'] != {}: cstar = obs_info['target']['cstar'] radec = SkyCoord(cstar['ra'], cstar['dec']) target_name = radec.to_string('hmsdms') target_name = re.sub(R'[hdms\s]', '', target_name) header['OBJECT'] = cstar.get('name', target_name) header['TARGET'] = target_name header['OBSID'] = str(obsid) header['OBJ_RA'] = header['OBJ_DEC'] = # telescope information header['REFFRAME'] = 'CSSTGSC-1.0' header['DATE-OBS'] = exp_start.isoformat(timespec='seconds') header['SATESWV'] = 'SIMULATION' header['EXPSTART'] = datetime_obj_to_mjd(exp_start) header['CABSTART'] = gnc_info.get('CABSTART', header['EXPSTART']) header['SUNANGL0'] = gnc_info.get('SUNANGL0', -1.0) header['MOONANG0'] = gnc_info.get('MOONANG0', -1.0) header['TEL_ALT0'] = gnc_info.get('TEL_ALT0', -1.0) header['POS_ANG0'] = gnc_info.get( 'POS_ANG0', float(obs_info['rotation'])) header['POSI0_X'] = gnc_info.get('POSI0_X', -1.0) header['POSI0_Y'] = gnc_info.get('POSI0_Y', -1.0) header['POSI0_Z'] = gnc_info.get('POSI0_Z', -1.0) header['VELO0_X'] = gnc_info.get('VELO0_X', -1.0) header['VELO0_Y'] = gnc_info.get('VELO0_Y', -1.0) header['VELO0_Z'] = gnc_info.get('VELO0_Z', -1.0) header['EULER0_1'] = gnc_info.get('EULER0_1', -1.0) header['EULER0_2'] = gnc_info.get('EULER0_2', -1.0) header['EULER0_3'] = gnc_info.get('EULER0_3', -1.0) header['RA_PNT0'] = gnc_info.get('RA_PNT0', header['OBJ_RA']) header['DEC_PNT0'] = gnc_info.get('DEC_PNT0', header['OBJ_DEC']) header['EXPEND'] = datetime_obj_to_mjd(exp_end) header['CABEND'] = gnc_info.get('CABEDN', header['EXPEND']) header['SUNANGL1'] = gnc_info.get('SUNANGL1', header['SUNANGL0']) header['MOONANG1'] = gnc_info.get('MOONANG1', header['MOONANG0']) header['TEL_ALT1'] = gnc_info.get('TEL_ALT1', header['TEL_ALT0']) header['POS_ANG1'] = gnc_info.get('POS_ANG1', header['POS_ANG0']) header['POSI1_X'] = gnc_info.get('POSI1_X', header['POSI0_x']) header['POSI1_Y'] = gnc_info.get('POSI1_Y', header['POSI0_y']) header['POSI1_Z'] = gnc_info.get('POSI1_Z', header['POSI0_z']) header['VELO1_X'] = gnc_info.get('VELO1_X', header['VELO0_x']) header['VELO1_Y'] = gnc_info.get('VELO1_Y', header['VELO0_y']) header['VELO1_Z'] = gnc_info.get('VELO1_Z', header['VELO0_z']) header['EULER1_1'] = gnc_info.get('EULER1_1', header['EULER0_1']) header['EULER1_2'] = gnc_info.get('EULER1_2', header['EULER0_2']) header['EULER1_3'] = gnc_info.get('EULER1_3', header['EULER0_3']) header['RA_PNT1'] = gnc_info.get('RA_PNT1', header['RA_PNT0']) header['DEC_PNT1'] = gnc_info.get('DEC_PNT1', header['DEC_PNT0']) header['EXPTIME'] = (exp_end - exp_start).total_seconds() header['EPOCH'] = float(exp_start.year) header['CHECKSUM'] = '0000000000000000' header['DATASUM'] = '0000000000' check_and_update_fits_header(header) # other information hdu = fits.PrimaryHDU(header=header) hdu.add_checksum() if filename_output: return hdu, folder, filename else: return hdu def load_camera_and_optics_config(band): """ Load camera and optics configuration from reference data. Parameters ---------- band : str Band name. Returns camera_config, optics_config : dict, dict """ camera = which_focalplane(band) if camera == 'vis': config_file = 'emccd_config.yaml' elif camera == 'nir': raise ValueError('NIR camera is not supported yet') config_file = 'nir_config.yaml' with open(f"{cpism_refdata}/camera/{config_file}", 'r') as fid: camera_config = yaml.load(fid, Loader=yaml.FullLoader) with open(f"{cpism_refdata}/optics/optics_config.yaml", 'r') as fid: optics_config = yaml.load(fid, Loader=yaml.FullLoader)[camera] return camera_config, optics_config def frame_header(obs_info, index, bunch_start, primary_header={}): """ Generate the header for a single frame. Parameters ---------- obs_info : dict Dictionary of parameters. See `save_fits` function. index : int Frame index. bunch_start : str Start time of the bunch. primary_header : dict (optional) Primary header. default: {} Returns ------- """ header = fits.Header() camera_config, optics_config = load_camera_and_optics_config( obs_info['band']) plszx = camera_config['plszx'] plszy = camera_config['plszy'] pscan1 = camera_config['pscan1'] pscan2 = camera_config['pscan2'] oscan1 = camera_config['oscan1'] oscan2 = camera_config['oscan2'] udark = camera_config['udark'] bdark = camera_config['bdark'] ldark = camera_config['ldark'] rdark = camera_config['rdark'] imgszx = plszx + pscan1 + oscan1 + ldark + rdark imgszy = plszy + pscan2 + oscan2 + udark + bdark header['XTENSION'] = 'IMAGE' header['BITPIX'] = 16 header['NAXIS'] = 2 header['NAXIS1'] = 1080 header['NAXIS2'] = 1050 header['EXTNAME'] = 'IMAGE' header['EXTVER'] = 1 header['BSCALE'] = 1.0 header['BZERO'] = 32768.0 header['BUNIT'] = 'ADU' header['FILTER'] = obs_info['band'] header['DETSN'] = '00000000000' header['DETNAME'] = camera_config['detector_name'] header['CHIPLAB'] = camera_config['ccd_label'] header['CHIPTEMP'] = float(camera_config['chip_temp']) header['DEWTEMP'] = float(camera_config['dewar_temp']) header['DETSIZE'] = f"{imgszx} * {imgszy}" header['IMGINDEX'] = index frame_time = obs_info['expt'] + camera_config['readout_time'] bunch_start = datetime.fromisoformat(bunch_start) expstart = bunch_start + timedelta(seconds=frame_time * (index - 1)) bunch_start_mjd = datetime_obj_to_mjd(bunch_start) ra0 = primary_header.get('RA_PNT0', -1.0) dec0 = primary_header.get('DEC_PNT0', -1.0) pa0 = primary_header.get('POS_ANG0', -1.0) cab0 = primary_header.get('CABSTART', bunch_start_mjd) delta_t = frame_time * index bunch_end = bunch_start + timedelta(seconds=delta_t) bunch_end_mjd = datetime_obj_to_mjd(bunch_end) ra1 = primary_header.get('RA_PNT1', ra0) dec1 = primary_header.get('DEC_PNT1', dec0) pa1 = primary_header.get('POS_ANG1', pa0) cab1 = primary_header.get('CABEND', bunch_end_mjd) img_cab = datetime_obj_to_mjd(expstart) ratio = (img_cab - cab0)/(cab1 - cab0) ra = ra0 + (ra1 - ra0) * ratio dec = dec0 + (dec1 - dec0) * ratio pa = pa0 + (pa1 - pa0) * ratio header['IMG_EXPT'] = expstart.isoformat(timespec='seconds') header['IMG_CABT'] = header['IMG_EXPT'] header['IMG_DUR'] = float(obs_info['expt']) header['IMG_PA'] = ra header['IMG_RA'] = dec header['IMG_DEC'] = pa header['DATASECT'] = f"{plszx} * {plszy}" header['PIXSCAL'] = optics_config['platescale'] header['PIXSIZE'] = camera_config['pitch_size'] header['NCHAN'] = 1 header['PSCAN1'] = pscan1 header['PSCAN2'] = pscan2 header['OSCAN1'] = oscan1 header['OSCAN2'] = oscan2 header['UDARK'] = udark header['BDARK'] = bdark header['LDARK'] = ldark header['RDARK'] = rdark # WCS cstar = {'ra': '0d', 'dec': '0d'} if obs_info['target'] != {}: cstar = obs_info['target']['cstar'] radec = SkyCoord(cstar['ra'], cstar['dec']) shift = obs_info['shift'] platescale = optics_config['platescale'] rotation = np.radians(obs_info['rotation']) header['WCSAXES'] = 2 header['CRPIX1'] = (plszx + 1)/2 + pscan1 + ldark + shift[0] / platescale header['CRPIX2'] = (plszy + 1)/2 + pscan2 + udark + shift[0] / platescale header['CRVAL1'] = header['CRVAL2'] = header['CTYPE1'] = 'RA---TAN' header['CTYPE2'] = 'DEC--TAN' header['CD1_1'] = np.cos(rotation) header['CD1_2'] = -np.sin(rotation) header['CD2_1'] = np.sin(rotation) header['CD2_2'] = np.cos(rotation) header['others'] = 'other' # Readout information header['EMGAIN'] = float(obs_info['emgain']) header['GAIN'] = float(camera_config['ph_per_adu']) header['DET_BIAS'] = float(camera_config['bias_level']) header['RON'] = float(camera_config['readout_noise']) header['READTIME'] = float(camera_config['readout_time']) header['ROSPEED'] = float(camera_config['readout_speed']) # CPIC information header['LS_STAT'] = 'OFF' header['IWA'] = optics_config['mask_width'] / 2 header['WFSINFO1'] = -1.0 header['WFSINFO2'] = -1.0 header['CHECKSUM'] = '0000000000000000' header['DATASUM'] = '0000000000' header = check_and_update_fits_header(header) return header def save_fits_simple(images, obs_info): """ Save the image to a fits file with a simple header to TMP directory. Parameters ---------- images : numpy.ndarray Image array to be written. obs_info : dict Dictionary of observation informations. See `save_fits` function. Returns ---------- Filename of the saved fits file. """ target = obs_info['target'] target_info = 'NO_TARGET' if 'cstar' in target.keys(): target_info = '' target_info = f"S{target['cstar']['magnitude']:.1f}" target_info += f"_P{len(target.get('planets', '[]'))}" name = target_info if 'name' in target.keys(): name = target['name'] name = name.replace('/', '_') name = name.replace(',', '_') now = time = now.strftime("%Y%m%d%H%M%S") filename = f"{name}_{time}.fits" header = fits.Header() header['skybg'] = obs_info['skybg'] header['name'] = name header['exptime'] = obs_info['expt'] header['nframe'] = obs_info['nframe'] header['band'] = obs_info['band'] header['emgain'] = obs_info['emgain'] header['obsid'] = obs_info['obsid'] header['rotation'] = obs_info['rotation'] shift = obs_info['shift'] header['shift'] = f"x:{shift[0]},y:{shift[1]}" fullname = f"{tmp_folder_path}/{filename}" fits.writeto(fullname, images, overwrite=True, header=header) return fullname def save_fits(images, obs_info, gnc_info, csst_format=True): """ Save the image to a fits file. Parameters ---------- images : numpy.ndarray Image array to be saved. obs_info : dict Dictionary of observation informations. Must contain the following keys - band: str - Band of the image. - expt: float - Exposure time of the each image. - nframe: int - Number of frames in the image. - emgain: int - EM gain of the camera. - obsid: str - Observation ID. Obsid must be 11 digits and start with 5 for CPIC. See pharse_obsid() for details. - rotation: float - Rotation angle of the image. - shift: list - Shift of the image. gnc_info : dict Dictionary of GNC information. Contains the keywords in the primary header. See primary_hdu() for details. csst_format : bool, optional Whether to save the fits file in CSST format, by default True. """ if not csst_format: save_fits_simple(images, obs_info) return hdu_header, folder, filename = primary_hdu(obs_info, gnc_info, True) hdu_list = fits.HDUList([hdu_header]) if len(images.shape) == 2: images = np.array([images]) for index in range(images.shape[0]): header = frame_header( obs_info, index + 1, hdu_header.header['DATE-OBS'], primary_header=hdu_header.header ) frame_hdu = fits.ImageHDU(images[index, :, :], header=header) frame_hdu.add_checksum() hdu_list.append(frame_hdu) folder = f"{output_dir}/{folder}" if not os.path.exists(folder): os.makedirs(folder) hdu_list.writeto(f"{folder}/{filename}", overwrite=True)