An error occurred while loading the file. Please try again.
-
Fang Yuedong authored4afd1181
import numpy as np
import os
from datetime import datetime
import argparse
from astropy.time import Time
from observation_sim.config import Pointing
def parse_args():
'''
Parse command line arguments. Many of the following
can be set in the .yaml config file as well.
'''
parser = argparse.ArgumentParser()
parser.add_argument('--config_file', type=str, required=True,
help='.yaml config file for simulation settings.')
parser.add_argument('--catalog', type=str,
help='name of the catalog interface class to be loaded.')
parser.add_argument('-c', '--config_dir', type=str,
help='Directory that houses the .yaml config file.')
parser.add_argument('-d', '--data_dir', type=str,
help='Directory that houses the input data.')
parser.add_argument('-w', '--work_dir', type=str,
help='The path for output.')
return parser.parse_args()
def generate_pointing_list(config, pointing_filename=None, data_dir=None):
pointing_list = []
# Only valid when the pointing list does not contain time stamp column
t0 = datetime(2021, 5, 25, 12, 0, 0)
delta_t = 10. # Time elapsed between exposures (minutes)
# Calculate starting time(s) for CAL exposures
# NOTE: temporary implementation
t = datetime.timestamp(t0)
ipoint = 0
run_pointings = config['obs_setting']['run_pointings']
if "obs_config_file" in config['obs_setting']:
obs_config_file = config['obs_setting']["obs_config_file"]
else:
obs_config_file = None
# if pointing_filename and data_dir:
if pointing_filename:
if data_dir:
pointing_file = os.path.join(data_dir, pointing_filename)
else:
pointing_file = pointing_filename
f = open(pointing_file, 'r')
# for _ in range(1):
# header = f.readline()
iline = 0
for line in f:
if len(line.strip()) == 0 or line[0] == '#':
continue
if run_pointings and iline not in run_pointings:
iline += 1
ipoint += 1
continue
line = line.strip()
columns = line.split()
pointing = Pointing(obs_config_file=obs_config_file)
pointing.read_pointing_columns(columns=columns, id=ipoint)
t += delta_t * 60.
pointing_list.append(pointing)
iline += 1
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
ipoint += 1
f.close()
else:
if config["obs_setting"]["exp_time"]:
exp_time = config["obs_setting"]["exp_time"]
else:
exp_time = 150.
pointing = Pointing(
id=ipoint,
ra=config["obs_setting"]["ra_center"],
dec=config["obs_setting"]["dec_center"],
img_pa=config["obs_setting"]["image_rot"],
timestamp=t,
exp_time=exp_time,
pointing_type='SCI',
obs_config_file=obs_config_file
)
t += delta_t * 60.
pointing_list.append(pointing)
ipoint += 1
return pointing_list
def make_run_dirs(work_dir, run_name, pointing_list):
if not os.path.exists(work_dir):
try:
os.makedirs(work_dir, exist_ok=True)
except OSError:
pass
imgDir = os.path.join(work_dir, run_name)
if not os.path.exists(imgDir):
try:
os.makedirs(imgDir, exist_ok=True)
except OSError:
pass
return imgDir
def make_output_pointing_dir(path_dict, config, pointing_ID=0):
imgDir = os.path.join(path_dict["work_dir"], config["run_name"])
if not os.path.exists(imgDir):
try:
os.makedirs(imgDir, exist_ok=True)
except OSError:
pass
prefix = "MSC_" + str(pointing_ID).rjust(8, '0')
subImgdir = os.path.join(imgDir, prefix)
if not os.path.exists(subImgdir):
try:
os.makedirs(subImgdir, exist_ok=True)
except OSError:
pass
return subImgdir, prefix
def get_shear_field(config):
if not config["shear_setting"]["shear_type"] in ["constant", "catalog"]:
raise ValueError("Please set a right 'shear_method' parameter.")
if config["shear_setting"]["shear_type"] == "constant":
g1 = config["shear_setting"]["reduced_g1"]
g2 = config["shear_setting"]["reduced_g2"]
nshear = 1
# TODO logging
else:
g1, g2 = 0., 0.
nshear = 0
return g1, g2, nshear