import numpy as np import os from datetime import datetime import argparse def parse_args(): ''' Parse command line arguments. Many of the following can be set in the .yaml config file as well. ''' parser = argparse.ArgumentParser() parser.add_argument('config_file', help='.yaml config file for simulation settings.') parser.add_argument('-c', '--config_dir', help='Directory that houses the ,yaml config file.') parser.add_argument('-d', '--data_dir', help='Directory that houses the input data.') parser.add_argument('-w', '--work_dir', help='The path for output.') return parser.parse_args() def generate_pointings(config, pointing_filename=None, data_dir=None): pRA = [] pDEC = [] if pointing_filename and data_dir: pointing_file = os.path.join(data_dir, pointing_filename) f = open(pointing_file, 'r') for _ in range(1): header = f.readline() iline = 0 for line in f: line = line.strip() columns = line.split() pRA.append(float(columns[0])) pDEC.append(float(columns[1])) f.close() else: pRA.append(config["obs_setting"]["ra_center"]) pDEC.append(config["obs_setting"]["dec_center"]) pRA = np.array(pRA) pDEC = np.array(pDEC) # Create calibration pointings # NOTE: temporary implementation ncal = config['obs_setting']['np_cal'] pointing_type = ['MS']*len(pRA) pRA = np.append([pRA[0]]*ncal, pRA) pDEC = np.append([pDEC[0]]*ncal, pDEC) pointing_type = ['CAL']*ncal + pointing_type # Calculate starting time(s) # NOTE: temporary implementation t0 = datetime(2021, 5, 25, 12, 0, 0) t = datetime.timestamp(t0) timestamp_obs = [] delta_t = 10 # Time elapsed between exposures (minutes) for i in range(len(pointing_type)): timestamp_obs.append(t) if pointing_type[i] == 'CAL': t += 3 * delta_t * 60 # 3 calibration exposures for each pointing elif pointing_type[i] == 'MS': t += delta_t * 60 timestamp_obs = np.array(timestamp_obs) pointing_type = np.array(pointing_type) if config['obs_setting']['run_pointings'] is None: pRange = list(range(len(pRA))) else: ncal = config['obs_setting']['np_cal'] plist = config['obs_setting']['run_pointings'] pRange = list(range(ncal)) + [x + ncal for x in plist] return pRA, pDEC, timestamp_obs, pointing_type, pRange def make_run_dirs(work_dir, run_name, nPointings, pRange=None): if not os.path.exists(work_dir): try: os.makedirs(work_dir, exist_ok=True) except OSError: pass imgDir = os.path.join(work_dir, run_name) if not os.path.exists(imgDir): try: os.makedirs(imgDir, exist_ok=True) except OSError: pass prefix = "MSC_" for pointing_ID in range(nPointings): if pRange is not None: if pointing_ID not in pRange: continue fname=prefix + str(pointing_ID).rjust(7, '0') subImgDir = os.path.join(imgDir, fname) if not os.path.exists(subImgDir): try: os.makedirs(subImgDir, exist_ok=True) except OSError: pass def imgName(tt=0): ut = datetime.utcnow() eye, emo, eda, eho, emi, ese = str(ut.year), str(ut.month), str(ut.day), str(ut.hour), str(ut.minute), str(ut.second) emse = str(ut.microsecond) if int(emo)<10: emo = "0%s"%emo if int(eda)<10: eda = "0%s"%eda if int(eho)<10: eho = "0%s"%eho if int(emi)<10: emi = "0%s"%emi if int(ese)<10: ese = "0%s"%ese if tt==0: namekey = "CSST%s%s%sT%s%s%s"%(eye,emo,eda,eho,emi,ese) elif tt==1: namekey = "%s-%s-%sT%s:%s:%s.%s"%(eye,emo,eda,eho,emi,ese,emse) elif tt==2: namekey = "%s%s%s%s%s%s"%(eye,emo,eda,eho,emi,ese) else: raise ValueError("!!! Give a right 'tt' value.") return namekey def makeSubDir_PointingList(path_dict, config, pointing_ID=0): imgDir = os.path.join(path_dict["work_dir"], config["run_name"]) if not os.path.exists(imgDir): try: os.makedirs(imgDir, exist_ok=True) except OSError: pass prefix = "MSC_" + str(pointing_ID).rjust(7, '0') subImgdir = os.path.join(imgDir, prefix) if not os.path.exists(subImgdir): try: os.makedirs(subImgdir, exist_ok=True) except OSError: pass return subImgdir, prefix def getShearFiled(config, shear_cat_file=None): if not config["shear_setting"]["shear_type"] in ["constant", "extra"]: raise ValueError("Please set a right 'shear_method' parameter.") if config["shear_setting"]["shear_type"] == "constant": g1 = config["shear_setting"]["reduced_g1"] g2 = config["shear_setting"]["reduced_g2"] reduced_shear = np.sqrt(g1**2 + g2**2) nshear = 1 # TODO logging else: # TODO logging if not os.path.exists(shear_cat_file): raise ValueError("Cannot find shear catalog file.") try: shearCat = np.loadtxt(shear_cat_file) nshear = shearCat.shape[0] g1, g2 = shearCat[:, 0], shearCat[:, 1] except: print("Failed to the shear catalog file.") print("Setting to no shear.") g1, g2 = 0., 0. return g1, g2, nshear