_util.py 4.47 KB
Newer Older
Fang Yuedong's avatar
Fang Yuedong committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import numpy as np
import os
from datetime import datetime
import argparse
from astropy.time import Time

from observation_sim.config import Pointing


def parse_args():
    '''
    Parse command line arguments. Many of the following
    can be set in the .yaml config file as well.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('--config_file', type=str, required=True,
                        help='.yaml config file for simulation settings.')
    parser.add_argument('--catalog', type=str,
                        help='name of the catalog interface class to be loaded.')
    parser.add_argument('-c', '--config_dir', type=str,
                        help='Directory that houses the .yaml config file.')
    parser.add_argument('-d', '--data_dir', type=str,
                        help='Directory that houses the input data.')
    parser.add_argument('-w', '--work_dir', type=str,
                        help='The path for output.')
    return parser.parse_args()


def generate_pointing_list(config, pointing_filename=None, data_dir=None):
    pointing_list = []

    # Only valid when the pointing list does not contain time stamp column
    t0 = datetime(2021, 5, 25, 12, 0, 0)
    delta_t = 10.  # Time elapsed between exposures (minutes)

    # Calculate starting time(s) for CAL exposures
    # NOTE: temporary implementation
    t = datetime.timestamp(t0)
    ipoint = 0

    run_pointings = config['obs_setting']['run_pointings']
    if "obs_config_file" in config['obs_setting']:
        obs_config_file = config['obs_setting']["obs_config_file"]
    else:
        obs_config_file = None

    # if pointing_filename and data_dir:
    if pointing_filename:
        if data_dir:
            pointing_file = os.path.join(data_dir, pointing_filename)
        else:
            pointing_file = pointing_filename
        f = open(pointing_file, 'r')
        # for _ in range(1):
        #     header = f.readline()
        iline = 0
        for line in f:
            if len(line.strip()) == 0 or line[0] == '#':
                continue
            if run_pointings and iline not in run_pointings:
                iline += 1
                ipoint += 1
                continue
            line = line.strip()
            columns = line.split()
            pointing = Pointing(obs_config_file=obs_config_file)
            pointing.read_pointing_columns(columns=columns, id=ipoint)
            t += delta_t * 60.
            pointing_list.append(pointing)
            iline += 1
            ipoint += 1
        f.close()
    else:
        if config["obs_setting"]["exp_time"]:
            exp_time = config["obs_setting"]["exp_time"]
        else:
            exp_time = 150.
        pointing = Pointing(
            id=ipoint,
            ra=config["obs_setting"]["ra_center"],
            dec=config["obs_setting"]["dec_center"],
            img_pa=config["obs_setting"]["image_rot"],
            timestamp=t,
            exp_time=exp_time,
            pointing_type='SCI',
            obs_config_file=obs_config_file
        )
        t += delta_t * 60.
        pointing_list.append(pointing)
        ipoint += 1
    return pointing_list


def make_run_dirs(work_dir, run_name, pointing_list):
    if not os.path.exists(work_dir):
        try:
            os.makedirs(work_dir, exist_ok=True)
        except OSError:
            pass
    imgDir = os.path.join(work_dir, run_name)
    if not os.path.exists(imgDir):
        try:
            os.makedirs(imgDir, exist_ok=True)
        except OSError:
            pass
    return imgDir


def make_output_pointing_dir(path_dict, config, pointing_ID=0):
    imgDir = os.path.join(path_dict["work_dir"], config["run_name"])
    if not os.path.exists(imgDir):
        try:
            os.makedirs(imgDir, exist_ok=True)
        except OSError:
            pass
    prefix = "MSC_" + str(pointing_ID).rjust(8, '0')
    subImgdir = os.path.join(imgDir, prefix)
    if not os.path.exists(subImgdir):
        try:
            os.makedirs(subImgdir, exist_ok=True)
        except OSError:
            pass
    return subImgdir, prefix


def get_shear_field(config):
    if not config["shear_setting"]["shear_type"] in ["constant", "catalog"]:
        raise ValueError("Please set a right 'shear_method' parameter.")

    if config["shear_setting"]["shear_type"] == "constant":
        g1 = config["shear_setting"]["reduced_g1"]
        g2 = config["shear_setting"]["reduced_g2"]
        nshear = 1
        # TODO logging
    else:
        g1, g2 = 0., 0.
        nshear = 0
    return g1, g2, nshear