import os import shutil import yaml import galsim import numpy as np from astropy.time import Time from observation_sim.config._util import get_obs_id import observation_sim.instruments._util as _util class Pointing(object): def __init__(self, id=0, ra=0., dec=0., img_pa=0., timestamp=1621915200, sat_x=0., sat_y=0., sat_z=0., sun_x=0., sun_y=0., sun_z=0., sat_vx=0., sat_vy=0., sat_vz=0., exp_time=150., pointing_type='SCI', pointing_type_code='101', pointing_id='00000001', obs_config_file=None, t_shutter_open=1.3, t_shutter_close=1.3): self.id = id self.ra = ra self.dec = dec self.img_pa = img_pa * galsim.degrees self.timestamp = timestamp self.sat_x, self.sat_y, self.sat_z = sat_x, sat_y, sat_z self.sun_x, self.sun_y, self.sun_z = sun_x, sun_y, sun_z self.sat_vx, self.sat_vy, self.sat_vz = sat_vx, sat_vy, sat_vz self.exp_time = exp_time self.pointing_type = pointing_type self.pointing_type_code = pointing_type_code self.obs_id = pointing_id self.survey_field_type = 'WIDE' self.jdt = 0. self.obs_config_file = obs_config_file self.t_shutter_open = t_shutter_open self.t_shutter_close = t_shutter_close self.output_dir = "." if self.obs_config_file is not None: with open(self.obs_config_file, "r") as stream: try: self.obs_param = yaml.safe_load(stream) except yaml.YAMLError as exc: print(exc) if self.obs_param["obs_type"]: self.pointing_type = self.obs_param["obs_type"] if self.obs_param["obs_type_code"]: self.pointing_type_code = self.obs_param["obs_type_code"] if self.obs_param["obs_id"]: self.obs_id = str(self.obs_param["obs_id"]) def get_full_depth_exptime(self, filter_type): if self.survey_field_type == 'WIDE': if filter_type in _util.SPEC_FILTERS: return 150. * 4 else: if filter_type.lower() in ['nuv', 'y']: return 150. * 4 elif filter_type.lower() in ['u', 'g', 'r', 'i', 'z']: return 150. * 2 else: return max(150., self.exp_time) # [TODO] for FGS elif self.survey_field_type == 'DEEP': if filter_type in _util.SPEC_FILTERS: return 250. * 4 * 4 else: if filter_type.lower() in ['nuv', 'y']: return 250. * 4 * 4 elif filter_type.lower() in ['u', 'g', 'r', 'i', 'z']: return 250. * 2 * 4 else: return max(150., self.exp_time) # [TODO] for FGS def read_pointing_columns(self, columns, id=0, t=1621915200, pointing_type='SCI'): self.id = id col_len = len(columns) self.ra = float(columns[0]) self.dec = float(columns[1]) self.img_pa = float(columns[4]) * galsim.degrees # self.pointing_type = pointing_type if col_len > 5: jdt = np.double(columns[5]) t_temp = Time(jdt, format='jd') self.jdt = jdt self.timestamp = t_temp.unix self.sat_x = float(columns[6]) self.sat_y = float(columns[7]) self.sat_z = float(columns[8]) self.sun_x = float(columns[9]) self.sun_y = float(columns[10]) self.sun_z = float(columns[11]) self.sat_vx = float(columns[15]) self.sat_vy = float(columns[16]) self.sat_vz = float(columns[17]) self.exp_time = float(columns[18]) is_deep = float(columns[19]) # [TODO] Can also define other survey types if is_deep != -1.0: self.survey_field_type = "DEEP" if not self.obs_config_file: self.obs_config_file = str(columns[20]) with open(self.obs_config_file, "r") as stream: try: self.obs_param = yaml.safe_load(stream) except yaml.YAMLError as exc: print(exc) self.pointing_type_code = columns[21][0:3] self.obs_id = columns[21][3:] self.pointing_type = self.obs_param["obs_type"] else: self.timestamp = t def make_output_pointing_dir(self, overall_config, copy_obs_config=False): run_dir = os.path.join( overall_config["work_dir"], overall_config["run_name"]) if not os.path.exists(run_dir): try: os.makedirs(run_dir, exist_ok=True) except OSError: pass self.output_prefix = get_obs_id( img_type=self.pointing_type, project_cycle=overall_config["project_cycle"], run_counter=overall_config["run_counter"], pointing_id=self.obs_id, pointing_type_code=self.pointing_type_code) self.output_dir = os.path.join(run_dir, self.output_prefix) if not os.path.exists(self.output_dir): try: os.makedirs(self.output_dir, exist_ok=True) except OSError: pass if copy_obs_config and self.obs_config_file: obs_config_output_path = os.path.join( self.output_dir, os.path.basename(self.obs_config_file)) if not os.path.exists(obs_config_output_path): try: shutil.copy(self.obs_config_file, self.output_dir) except OSError: pass