from ObservationSim.ObservationSim import Observation from ObservationSim._util import parse_args from datetime import datetime import os import numpy as np import galsim import yaml import gc gc.enable() def Pointing(config, pointing_filename=None, data_dir=None): pRA = [] pDEC = [] if pointing_filename and data_dir: pointing_file = os.path.join(data_dir, pointing_filename) f = open(pointing_file, 'r') for _ in range(1): header = f.readline() iline = 0 for line in f: line = line.strip() columns = line.split() pRA.append(float(columns[0])) pDEC.append(float(columns[1])) f.close() else: pRa.append(config["obs_setting"]["ra_center"]) pDec.append(config["obs_setting"]["dec_center"]) pRA = np.array(pRA) pDEC = np.array(pDEC) # Create calibration pointings # NOTE: temporary implementation ncal = config['obs_setting']['np_cal'] pointing_type = ['MS']*len(pRA) pRA = np.append([pRA[0]]*ncal, pRA) pDEC = np.append([pDEC[0]]*ncal, pDEC) pointing_type = ['CAL']*ncal + pointing_type # Calculate starting time(s) # NOTE: temporary implementation t0 = datetime(2021, 5, 25, 12, 0, 0) t = datetime.timestamp(t0) timestamp_obs = [] delta_t = 10 # Time elapsed between exposures (minutes) for i in range(len(pointing_type)): timestamp_obs.append(t) if pointing_type[i] == 'CAL': t += 3 * delta_t * 60 # 3 calibration exposures for each pointing elif pointing_type[i] == 'MS': t += delta_t * 60 timestamp_obs = np.array(timestamp_obs) pointing_type = np.array(pointing_type) return pRA, pDEC, timestamp_obs, pointing_type def MakeDirectories(work_dir, run_name, nPointings, pRange=None): if not os.path.exists(work_dir): try: os.makedirs(work_dir, exist_ok=True) except OSError: pass imgDir = os.path.join(work_dir, run_name) if not os.path.exists(imgDir): try: os.makedirs(imgDir, exist_ok=True) except OSError: pass prefix = "MSC_" for pointing_ID in range(nPointings): if pRange is not None: if pointing_ID not in pRange: continue fname=prefix + str(pointing_ID).rjust(7, '0') subImgDir = os.path.join(imgDir, fname) if not os.path.exists(subImgDir): try: os.makedirs(subImgDir, exist_ok=True) except OSError: pass def runSim(): """ Main simulation call. """ args = parse_args() if args.config_dir is None: args.config_dir = '' args.config_dir = os.path.abspath(args.config_dir) args.config_file = os.path.join(args.config_dir, args.config_file) with open(args.config_file, "r") as stream: try: config = yaml.safe_load(stream) for key, value in config.items(): print (key + " : " + str(value)) except yaml.YAMLError as exc: print(exc) config["obs_setting"]["image_rot"] = float(config["obs_setting"]["image_rot"])*galsim.degrees if args.data_dir is not None: config['data_dir'] = args.data_dir if args.work_dir is not None: config['work_dir'] = args.work_dir pRA, pDEC, timestamp_obs, pointing_type = Pointing(config=config, pointing_filename=config['pointing_file'], data_dir=config['pointing_dir']) MakeDirectories(work_dir=config['work_dir'], run_name=config['run_name'], nPointings=len(pRA), pRange=config['obs_setting']['run_pointings']) obs = Observation(config=config, work_dir=config['work_dir'], data_dir=config['data_dir']) if config["pointing_file"] is None: obs.runExposure(chips=config["obs_setting"]["run_chips"]) else: obs.runExposure_MPI_PointingList( ra_cen=pRA, dec_cen=pDEC, pRange=config["obs_setting"]["run_pointings"], timestamp_obs=timestamp_obs, pointing_type=pointing_type, exptime=config["obs_setting"]["exp_time"], use_mpi=config["run_option"]["use_mpi"], chips=config["obs_setting"]["run_chips"] ) if __name__=='__main__': runSim()