An error occurred while loading the file. Please try again.
_util.py 4.47 KiB
import numpy as np
import os
from datetime import datetime
import argparse
from astropy.time import Time
from observation_sim.config import Pointing
def parse_args():
    '''
    Parse command line arguments. Many of the following
    can be set in the .yaml config file as well.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('--config_file', type=str, required=True,
                        help='.yaml config file for simulation settings.')
    parser.add_argument('--catalog', type=str,
                        help='name of the catalog interface class to be loaded.')
    parser.add_argument('-c', '--config_dir', type=str,
                        help='Directory that houses the .yaml config file.')
    parser.add_argument('-d', '--data_dir', type=str,
                        help='Directory that houses the input data.')
    parser.add_argument('-w', '--work_dir', type=str,
                        help='The path for output.')
    return parser.parse_args()
def generate_pointing_list(config, pointing_filename=None, data_dir=None):
    pointing_list = []
    # Only valid when the pointing list does not contain time stamp column
    t0 = datetime(2021, 5, 25, 12, 0, 0)
    delta_t = 10.  # Time elapsed between exposures (minutes)
    # Calculate starting time(s) for CAL exposures
    # NOTE: temporary implementation
    t = datetime.timestamp(t0)
    ipoint = 0
    run_pointings = config['obs_setting']['run_pointings']
    if "obs_config_file" in config['obs_setting']:
        obs_config_file = config['obs_setting']["obs_config_file"]
    else:
        obs_config_file = None
    # if pointing_filename and data_dir:
    if pointing_filename:
        if data_dir:
            pointing_file = os.path.join(data_dir, pointing_filename)
        else:
            pointing_file = pointing_filename
        f = open(pointing_file, 'r')
        # for _ in range(1):
        #     header = f.readline()
        iline = 0
        for line in f:
            if len(line.strip()) == 0 or line[0] == '#':
                continue
            if run_pointings and iline not in run_pointings:
                iline += 1
                ipoint += 1
                continue
            line = line.strip()
            columns = line.split()
            pointing = Pointing(obs_config_file=obs_config_file)
            pointing.read_pointing_columns(columns=columns, id=ipoint)
            t += delta_t * 60.
            pointing_list.append(pointing)
            iline += 1
ipoint += 1 f.close() else: if config["obs_setting"]["exp_time"]: exp_time = config["obs_setting"]["exp_time"] else: exp_time = 150. pointing = Pointing( id=ipoint, ra=config["obs_setting"]["ra_center"], dec=config["obs_setting"]["dec_center"], img_pa=config["obs_setting"]["image_rot"], timestamp=t, exp_time=exp_time, pointing_type='SCI', obs_config_file=obs_config_file ) t += delta_t * 60. pointing_list.append(pointing) ipoint += 1 return pointing_list def make_run_dirs(work_dir, run_name, pointing_list): if not os.path.exists(work_dir): try: os.makedirs(work_dir, exist_ok=True) except OSError: pass imgDir = os.path.join(work_dir, run_name) if not os.path.exists(imgDir): try: os.makedirs(imgDir, exist_ok=True) except OSError: pass return imgDir def make_output_pointing_dir(path_dict, config, pointing_ID=0): imgDir = os.path.join(path_dict["work_dir"], config["run_name"]) if not os.path.exists(imgDir): try: os.makedirs(imgDir, exist_ok=True) except OSError: pass prefix = "MSC_" + str(pointing_ID).rjust(8, '0') subImgdir = os.path.join(imgDir, prefix) if not os.path.exists(subImgdir): try: os.makedirs(subImgdir, exist_ok=True) except OSError: pass return subImgdir, prefix def get_shear_field(config): if not config["shear_setting"]["shear_type"] in ["constant", "catalog"]: raise ValueError("Please set a right 'shear_method' parameter.") if config["shear_setting"]["shear_type"] == "constant": g1 = config["shear_setting"]["reduced_g1"] g2 = config["shear_setting"]["reduced_g2"] nshear = 1 # TODO logging else: g1, g2 = 0., 0. nshear = 0 return g1, g2, nshear