Newer
Older
from astropy.time import Time
from ObservationSim.Config import Pointing
def parse_args(argv=None):
    '''
    Parse command line arguments. Many of the following
    can be set in the .yaml config file as well.

    Parameters
    ----------
    argv : list of str, optional
        Argument strings to parse. When None (the default), argparse
        reads them from sys.argv[1:], which is the usual command-line
        behaviour.

    Returns
    -------
    argparse.Namespace
        Parsed values with the attributes config_file, catalog,
        config_dir, data_dir and work_dir. The three optional ones are
        None when not given.

    Raises
    ------
    SystemExit
        Raised by argparse when a required option (--config_file,
        --catalog) is missing or an unknown option is passed.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('--config_file', type=str, required=True, help='.yaml config file for simulation settings.')
    parser.add_argument('--catalog', type=str, required=True, help='name of the catalog interface class to be loaded.')
    parser.add_argument('-c', '--config_dir', type=str, help='Directory that houses the .yaml config file.')
    parser.add_argument('-d', '--data_dir', type=str, help='Directory that houses the input data.')
    parser.add_argument('-w', '--work_dir', type=str, help='The path for output.')
    # The parser was previously built but its result never handed back.
    return parser.parse_args(argv)
def generate_pointing_list(config, pointing_filename=None, data_dir=None):
    '''
    Build the list of Pointing objects to be simulated.

    When both pointing_filename and data_dir are given, one Pointing is
    created per selected data row of that file (the first line is treated
    as a header and skipped). Otherwise a single 'SCI' pointing is built
    from the centre coordinates in config["obs_setting"].

    Parameters
    ----------
    config : dict
        Simulation settings. Reads config['obs_setting']['run_pointings']
        and, for the single-pointing case, the keys exp_time, ra_center,
        dec_center and image_rot of config['obs_setting'].
    pointing_filename : str, optional
        Name of the pointing list file inside data_dir.
    data_dir : str, optional
        Directory that houses the pointing list file.

    Returns
    -------
    list
        The Pointing objects, in file order (or a one-element list).
    '''
    pointing_list = []

    # Only valid when the pointing list does not contain time stamp column
    t0 = datetime(2021, 5, 25, 12, 0, 0)
    delta_t = 10.  # Time elapsed between exposures (minutes)

    # Calculate starting time(s) for CAL exposures
    # NOTE: temporary implementation
    # NOTE(review): t0 is a naive datetime, so this timestamp depends on
    # the local time zone of the machine.
    t = datetime.timestamp(t0)
    ipoint = 0

    obs_setting = config['obs_setting']
    run_pointings = obs_setting['run_pointings']

    if pointing_filename and data_dir:
        pointing_file = os.path.join(data_dir, pointing_filename)
        # Context manager so the file is closed even if a row fails to parse.
        with open(pointing_file, 'r') as f:
            f.readline()  # skip the single header line
            for iline, line in enumerate(f):
                # An empty/None run_pointings selects every row; otherwise
                # only rows whose 0-based index is listed are kept.
                if run_pointings and iline not in run_pointings:
                    continue
                columns = line.strip().split()
                pointing = Pointing()
                # NOTE(review): ids are consecutive over the *selected* rows
                # only; confirm whether they should track the row index.
                pointing.read_pointing_columns(columns=columns, id=ipoint)
                t += delta_t * 60.
                pointing_list.append(pointing)
                ipoint += 1
    else:
        # NOTE(review): treated as the alternative to the pointing file;
        # the original nesting was lost, confirm this is the intended
        # structure.
        # Fall back to 150 when exp_time is unset or falsy in the config.
        exp_time = obs_setting["exp_time"] or 150.
        pointing = Pointing(
            id=ipoint,
            ra=obs_setting["ra_center"],
            dec=obs_setting["dec_center"],
            img_pa=obs_setting["image_rot"],
            timestamp=t,
            # NOTE(review): exp_time was computed but never forwarded;
            # confirm Pointing accepts this keyword.
            exp_time=exp_time,
            pointing_type='SCI'
        )
        pointing_list.append(pointing)
    return pointing_list
def make_run_dirs(work_dir, run_name, pointing_list):
    '''
    Create the output directory for a run and return its path.

    Parameters
    ----------
    work_dir : str
        Top-level output directory; created if it does not exist.
    run_name : str
        Name of the sub-directory of work_dir that holds this run.
    pointing_list : list
        Not used by this function; kept so existing callers keep working.

    Returns
    -------
    str
        os.path.join(work_dir, run_name). The path is returned even when
        the directory could not be created (creation is best effort).
    '''
    imgDir = os.path.join(work_dir, run_name)
    for path in (work_dir, imgDir):
        try:
            # exist_ok=True already tolerates an existing directory, so no
            # separate (and race-prone) os.path.exists() check is needed.
            os.makedirs(path, exist_ok=True)
        except OSError:
            # Best effort, as before: a failure here is left to surface
            # when something later tries to write into the directory.
            pass
    return imgDir
imgDir = os.path.join(path_dict["work_dir"], config["run_name"])
if not os.path.exists(imgDir):
try:
os.makedirs(imgDir, exist_ok=True)
except OSError:
pass
prefix = "MSC_" + str(pointing_ID).rjust(8, '0')
subImgdir = os.path.join(imgDir, prefix)
if not os.path.exists(subImgdir):
try:
os.makedirs(subImgdir, exist_ok=True)
except OSError:
pass
return subImgdir, prefix
def get_shear_field(config):
if not config["shear_setting"]["shear_type"] in ["constant", "catalog"]:
raise ValueError("Please set a right 'shear_method' parameter.")
if config["shear_setting"]["shear_type"] == "constant":
g1 = config["shear_setting"]["reduced_g1"]
g2 = config["shear_setting"]["reduced_g2"]
nshear = 1
# TODO logging