data_manager.py 27.2 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
3
4
5
6
7
8
"""
Identifier:     KSC-SJ4-csst_common/data_manager.py
Name:           data_manager.py
Description:    file path generator
Author:         Bo Zhang
Created:        2022-09-13
Modified-History:
    2022-09-13, Bo Zhang, created
BO ZHANG's avatar
tweaks    
BO ZHANG committed
9
    2022-09-13, Bo Zhang, added CsstMbiDataManager
BO ZHANG's avatar
BO ZHANG committed
10
    2022-09-29, Bo Zhang, favor CsstMsDataManager instead of CsstMbiDataManager
BO ZHANG's avatar
BO ZHANG committed
11
    2022-10-26, Bo Zhang, reconstruct CsstMsDataManager, deprecate CsstMbiDataManager
BO ZHANG's avatar
BO ZHANG committed
12
    2022-10-28, Bo Zhang, added CsstMsDataManager.query_rc(), dm.use_dfs, dm.node
BO ZHANG's avatar
BO ZHANG committed
13
    2022-11-06, Bo Zhang, deleted CsstMbiDataManager
BO ZHANG's avatar
BO ZHANG committed
14
    2022-11-20, Bo Zhang, added DFS APIs
BO ZHANG's avatar
BO ZHANG committed
15
"""
BO ZHANG's avatar
BO ZHANG committed
16
import glob
BO ZHANG's avatar
BO ZHANG committed
17
import os
BO ZHANG's avatar
BO ZHANG committed
18
import re
BO ZHANG's avatar
BO ZHANG committed
19
from typing import Union
BO ZHANG's avatar
BO ZHANG committed
20

BO ZHANG's avatar
BO ZHANG committed
21
import numpy as np
BO ZHANG's avatar
BO ZHANG committed
22
from astropy.io import fits
BO ZHANG's avatar
BO ZHANG committed
23
from astropy.table import Table
BO ZHANG's avatar
BO ZHANG committed
24
25
26
from csst_dfs_api.facility.calmerge import CalMergeApi
from csst_dfs_api.facility.level0 import Level0DataApi
from csst_dfs_api.facility.level0prc import Level0PrcApi
BO ZHANG's avatar
BO ZHANG committed
27
from csst_dfs_api.msc.level1 import Level1DataApi
BO ZHANG's avatar
BO ZHANG committed
28
from csst_dfs_api.msc.level2 import Level2DataApi
BO ZHANG's avatar
BO ZHANG committed
29
from csst_dfs_api.common.catalog import CatalogApi
BO ZHANG's avatar
BO ZHANG committed
30

BO ZHANG's avatar
BO ZHANG committed
31
from .logger import get_logger
32
from .params import CSST_PARAMS as CP
BO ZHANG's avatar
BO ZHANG committed
33
from .params import DFS_CONF
BO ZHANG's avatar
BO ZHANG committed
34
35


BO ZHANG's avatar
BO ZHANG committed
36
class CsstMsDataManager:
BO ZHANG's avatar
BO ZHANG committed
37
38
    """
    CSST MS data manager, including MBI and SLS.
BO ZHANG's avatar
tweaks    
BO ZHANG committed
39

BO ZHANG's avatar
tweaks    
BO ZHANG committed
40
41
42
43
    ``CsstMsDataManager`` provides an interface to switch between DFS and local file system.
    To initialize ``CsstMsDataManager`` from local directory, use ``CsstMsDataManager.from_dir()``
    To initialize ``CsstMsDataManager`` on ``dandelion`` or ``PM node``, ``CsstMsDataManager.quickstart()``.
    To initialize ``CsstMsDataManager`` from DFS, use ``CsstMsDataManager.from_dfs()``.
BO ZHANG's avatar
tweaks    
BO ZHANG committed
44
    To generate L0 and L1 file paths, use ``CsstMsDataManager.l0_detector()``, ``CsstMsDataManager.l1_detector()``, etc.
BO ZHANG's avatar
BO ZHANG committed
45
    Here are some examples for simulation with different versions.
BO ZHANG's avatar
BO ZHANG committed
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
    C3:
        MSC_MS_210525220000_100000020_06_raw.fits
        MSC_CRS_210525220000_100000020_06_raw.fits
        MSC_210525120000_0000020_06.cat
    C5.1:
        CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
        CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
        MSC_10000100_chip_06_filt_y.cat
        MSC_10000100_chip_06_filt_y.log
    C5.2
        CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
        CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
        MSC_100000100_chip_06_filt_y.cat
        MSC_100000100_chip_06_filt_y.log

BO ZHANG's avatar
BO ZHANG committed
61
62
63
    Parameters
    ----------
    ver_sim : str
BO ZHANG's avatar
BO ZHANG committed
64
65
66
67
68
69
70
71
72
        The version of simulation data, see ``csst_common.params.CP``.
    datatype : str
        The options are {"mbi", "sls", "all"}.
        The "all" option is used for QC in particular.
        Note that in this case methods like ``get_bias`` are unavailable.
    available_detectors : list
        The list of available detector serial numbers of available images.
    target_detectors : list
        The list of target detector serial numbers of available images.
BO ZHANG's avatar
BO ZHANG committed
73
74
75
76
    dir_l0 : str
        The L0 directory.
    dir_l1 : str
        The L1 directory.
BO ZHANG's avatar
BO ZHANG committed
77
78
    path_aux : str
        The aux data directory (bias, flat, dark).
BO ZHANG's avatar
BO ZHANG committed
79
80
    use_dfs : bool
        If True, use DFS. In case some modules may have other options such as astroquery.
BO ZHANG's avatar
BO ZHANG committed
81
    dfs_node : str
BO ZHANG's avatar
BO ZHANG committed
82
83
        The environment in which the pipeline will run.
        Use "pml" for Purple Mountain Lab cluster and "local" for others.
BO ZHANG's avatar
BO ZHANG committed
84
85
    dfs_root : str
        The DFS root path.
BO ZHANG's avatar
BO ZHANG committed
86
    obs_id : int
BO ZHANG's avatar
BO ZHANG committed
87
        The exposure ID.
BO ZHANG's avatar
BO ZHANG committed
88
    exp_start : int
BO ZHANG's avatar
BO ZHANG committed
89
        The exposure start time in ``yyyymmddhhmmss`` format.
BO ZHANG's avatar
BO ZHANG committed
90
    exp_stop : int
BO ZHANG's avatar
BO ZHANG committed
91
92
93
94
95
96
97
        The exposure start time in ``yyyymmddhhmmss`` format.
    _telescope : str
        The telescope name. Defaults to ``CSST`` for C5.2 simulation.
    _instrument : str
        The instrument name. Defaults to ``MSC`` for C5.2 simulation.
    _survey : str
        The survey name. Defaults to ``MS`` for C5.2 simulation.
BO ZHANG's avatar
BO ZHANG committed
98
    obs_type : str
BO ZHANG's avatar
BO ZHANG committed
99
        The image type signature for science images. Defualts to ``SCI`` for C5.2 simulation.
BO ZHANG's avatar
BO ZHANG committed
100
    l0_post : str
BO ZHANG's avatar
BO ZHANG committed
101
        The postfix. Defaults to ``L0_1`` for C5.2 simulation.
BO ZHANG's avatar
BO ZHANG committed
102
103
104
105
    log_ppl : str
        The pipeline log file name.
    log_mod : str
        The module log file name.
106
107
    clear_dir : bool
        If True, clear ``dm.dir_l1`` directory.
BO ZHANG's avatar
BO ZHANG committed
108
109
    verbose : bool
        If True, print verbose info.
BO ZHANG's avatar
BO ZHANG committed
110
111
112
113
    n_jobs : int
        The number of jobs.
    backend : str
        The joblib backend.
BO ZHANG's avatar
BO ZHANG committed
114
115
116

    Examples
    --------
BO ZHANG's avatar
BO ZHANG committed
117
    >>> dm_mbi = CsstMsDataManager(...)
BO ZHANG's avatar
BO ZHANG committed
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
    >>> # access L0 directory
    >>> dm_mbi.dir_l0
    >>> # access L1 directory
    >>> dm_mbi.dir_l1
    >>> # access path_aux
    >>> dm_mbi.path_aux
    >>> # access ver_sim
    >>> dm_mbi.ver_sim
    >>> # access target detectors
    >>> dm_mbi.target_detectors
    >>> # access available detectors
    >>> dm_mbi.available_detectors
    >>> # define an L1 file (detector-specified)
    >>> dm_mbi.l1_detector(detector=6)
    >>> # define an L1 file (non-detector-specified)
    >>> dm_mbi.l1_file("flipped_image.fits")
BO ZHANG's avatar
BO ZHANG committed
134
135
    """

BO ZHANG's avatar
BO ZHANG committed
136
    def __init__(self,
BO ZHANG's avatar
tweaks    
BO ZHANG committed
137
138
139
                 ver_sim: str = "C5.2",
                 datatype: str = "mbi",
                 available_detectors: Union[None, list] = None,
BO ZHANG's avatar
BO ZHANG committed
140
                 target_detectors: Union[None, list, int] = None,
BO ZHANG's avatar
tweaks    
BO ZHANG committed
141
142
                 dir_l0: str = ".",
                 dir_l1: str = ".",
BO ZHANG's avatar
BO ZHANG committed
143
                 path_aux: str = "/L1Pipeline/aux",  # aux dir
BO ZHANG's avatar
BO ZHANG committed
144
                 use_dfs: bool = False,
BO ZHANG's avatar
BO ZHANG committed
145
                 dfs_node: str = "kmust",
BO ZHANG's avatar
BO ZHANG committed
146
                 dfs_root: str = "/share/dfs",
BO ZHANG's avatar
BO ZHANG committed
147
148
149
                 obs_id: str = "100000100",
                 exp_start: int = "20270810081950",
                 exp_stop: int = "20270810082220",
BO ZHANG's avatar
tweaks    
BO ZHANG committed
150
151
152
                 _telescope: str = "CSST",
                 _instrument: str = "MSC",
                 _survey: str = "MS",
BO ZHANG's avatar
BO ZHANG committed
153
154
                 obs_type: str = "SCI",
                 l0_post: str = "L0_1",
BO ZHANG's avatar
BO ZHANG committed
155
156
                 log_ppl="csst-l1ppl.log",
                 log_mod="csst-l1mod.log",
157
                 clear_dir=False,
BO ZHANG's avatar
BO ZHANG committed
158
                 verbose=True,
BO ZHANG's avatar
BO ZHANG committed
159
160
                 n_jobs=18,
                 backend="multiprocessing"
BO ZHANG's avatar
BO ZHANG committed
161
162
163
                 ):

        # version
BO ZHANG's avatar
BO ZHANG committed
164
        assert ver_sim in CP["sim"]["versions"]
BO ZHANG's avatar
tweaks    
BO ZHANG committed
165
        self.ver_sim = ver_sim
BO ZHANG's avatar
BO ZHANG committed
166

BO ZHANG's avatar
BO ZHANG committed
167
        # datatype, valid_detectors, detector2filter
168
        assert datatype in ["mbi", "sls", "all"]
BO ZHANG's avatar
tweaks    
BO ZHANG committed
169
        self.datatype = datatype
BO ZHANG's avatar
BO ZHANG committed
170
171
172
173
        if datatype == "mbi":
            # MBI
            self.valid_detectors = CP["mbi"]["detectors"]
            self.detector2filter = CP["mbi"]["detector2filter"]
174
        elif datatype == "sls":
BO ZHANG's avatar
BO ZHANG committed
175
176
177
            # SLS
            self.valid_detectors = CP["sls"]["detectors"]
            self.detector2filter = CP["sls"]["detector2filter"]
178
179
        else:
            # ALL
BO ZHANG's avatar
BO ZHANG committed
180
181
            self.valid_detectors = CP["all"]["detectors"]
            self.detector2filter = CP["all"]["detector2filter"]
BO ZHANG's avatar
BO ZHANG committed
182
183
184
        if verbose:
            print("Data type is: ", self.datatype)
            print("Valid detectors are: ", self.valid_detectors)
BO ZHANG's avatar
BO ZHANG committed
185
186

        # available_detectors
187
        self.available_detectors = available_detectors if available_detectors is not None else list()
BO ZHANG's avatar
BO ZHANG committed
188
189
        if verbose:
            print("Available detectors are:", self.available_detectors)
BO ZHANG's avatar
BO ZHANG committed
190
        # set all available detectors by default
191
        self.target_detectors = target_detectors
BO ZHANG's avatar
BO ZHANG committed
192
193
        if verbose:
            print("Target detectors are: ", self._target_detectors)
BO ZHANG's avatar
BO ZHANG committed
194
195

        # exposure info
BO ZHANG's avatar
BO ZHANG committed
196
197
198
        self.obs_id = obs_id
        self.exp_start = exp_start
        self.exp_stop = exp_stop
BO ZHANG's avatar
BO ZHANG committed
199

BO ZHANG's avatar
BO ZHANG committed
200
201
202
203
        # file name components
        self._telescope = _telescope
        self._instrument = _instrument
        self._survey = _survey
BO ZHANG's avatar
BO ZHANG committed
204
205
        self.obs_type = obs_type
        self.l0_post = l0_post
BO ZHANG's avatar
BO ZHANG committed
206

BO ZHANG's avatar
BO ZHANG committed
207
208
        # DFS configuration
        self.use_dfs = use_dfs
209
        self.dfs_node = dfs_node
BO ZHANG's avatar
BO ZHANG committed
210
        self.dfs_root = dfs_root
BO ZHANG's avatar
BO ZHANG committed
211
212

        # data directory
BO ZHANG's avatar
BO ZHANG committed
213
214
215
        self.dir_l0 = dir_l0
        self.dir_l1 = dir_l1
        self.path_aux = path_aux
BO ZHANG's avatar
BO ZHANG committed
216
        self.ref_version = None
BO ZHANG's avatar
BO ZHANG committed
217

BO ZHANG's avatar
BO ZHANG committed
218
        # record hard code names in history
BO ZHANG's avatar
BO ZHANG committed
219
220
        self.hardcode_history = []

BO ZHANG's avatar
BO ZHANG committed
221
222
        self.n_jobs = n_jobs
        self.backend = backend
BO ZHANG's avatar
BO ZHANG committed
223

BO ZHANG's avatar
BO ZHANG committed
224
        # aXe
BO ZHANG's avatar
BO ZHANG committed
225
        self.set_env()
BO ZHANG's avatar
BO ZHANG committed
226

BO ZHANG's avatar
BO ZHANG committed
227
228
        # change to working directory
        os.chdir(self.dir_l1)
229
230
231
        # clear dir_l1
        if clear_dir:
            self.clear_dir(self.dir_l1)
BO ZHANG's avatar
BO ZHANG committed
232

BO ZHANG's avatar
BO ZHANG committed
233
        # pipeline logger
BO ZHANG's avatar
BO ZHANG committed
234
        self.logger_ppl = get_logger(name="CSST L1 Pipeline Logger", filename=os.path.join(dir_l1, log_ppl))
BO ZHANG's avatar
BO ZHANG committed
235
236
        if verbose:
            self.logger_ppl.info("logger_ppl initialized")
BO ZHANG's avatar
BO ZHANG committed
237
238
        # module logger
        self.logger_mod = get_logger(name="CSST L1 Module Logger", filename=os.path.join(dir_l1, log_mod))
BO ZHANG's avatar
BO ZHANG committed
239
240
        if verbose:
            self.logger_mod.info("logger_mod initialized")
BO ZHANG's avatar
BO ZHANG committed
241

242
243
244
245
        self.custom_bias = None
        self.custom_dark = None
        self.custom_flat = None

246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
    # DFS APIs
    @property
    def dfs_L0DataApi(self):
        return Level0DataApi()

    @property
    def dfs_L1DataApi(self):
        return Level1DataApi()

    @property
    def dfs_L2DataApi(self):
        return Level2DataApi()

    @property
    def dfs_L0PrcApi(self):
        return Level0PrcApi()

    @property
    def dfs_CalApi(self):
        return CalMergeApi()

BO ZHANG's avatar
BO ZHANG committed
267
268
269
270
    @property
    def dfs_CatApi(self):
        return CatalogApi()

BO ZHANG's avatar
BO ZHANG committed
271
272
273
    def set_env(self):
        """ set environment variables """
        if os.uname()[1] == "dandelion":
BO ZHANG's avatar
BO ZHANG committed
274
275
            os.environ["LD_LIBRARY_PATH"] = "/home/csstpipeline/anaconda3/lib"
            os.environ["AXE_BINDIR"] = "/home/csstpipeline/PycharmProjects/axe/cextern/src"
BO ZHANG's avatar
BO ZHANG committed
276
277
        else:
            os.environ["LD_LIBRARY_PATH"] = ""
BO ZHANG's avatar
BO ZHANG committed
278
            os.environ["AXE_BINDIR"] = ""
BO ZHANG's avatar
BO ZHANG committed
279

280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    @property
    def target_detectors(self):
        return self._target_detectors

    @target_detectors.setter
    def target_detectors(self, detectors: Union[None, list, int] = None):
        assert detectors is None or type(detectors) in [list, int]
        if detectors is None:
            self._target_detectors = list(set(self.available_detectors) & set(self.valid_detectors))
        elif isinstance(detectors, list):
            self._target_detectors = list(set(self.available_detectors) & set(self.valid_detectors) & set(detectors))
        elif isinstance(detectors, int):
            self._target_detectors = list(set(self.available_detectors) & set(self.valid_detectors) & {detectors})

    def set_detectors(self, detectors=None):
        raise DeprecationWarning("This method is deprecated, please directly use dm.target_detectors = detectors!")

BO ZHANG's avatar
BO ZHANG committed
297
    @staticmethod
BO ZHANG's avatar
BO ZHANG committed
298
299
300
301
302
303
304
305
306
307
308
    def from_dir(
            ver_sim="C5.2",
            datatype="mbi",
            dir_l0=".",
            dir_l1=".",
            path_aux="",
            use_dfs=False,
            dfs_node="kmust",
            n_jobs=18,
            backend="multiprocessing"
    ):
BO ZHANG's avatar
BO ZHANG committed
309
        """ initialize the multi-band imaging data manager """
BO ZHANG's avatar
BO ZHANG committed
310

BO ZHANG's avatar
BO ZHANG committed
311
312
313
314
315
316
317
318
319
320
321
322
        assert ver_sim in ["C5.2", ]

        # glob files
        fps_img = CsstMsDataManager.glob_image(dir_l0, ver_sim=ver_sim)
        if len(fps_img) == 0:
            raise FileNotFoundError(f"No file found in dir_l0: {dir_l0}")

        # available detectors
        available_detectors = [int(re.split(r"[_.]", fp)[7]) for fp in fps_img]
        available_detectors.sort()

        # parse info
BO ZHANG's avatar
BO ZHANG committed
323
324
325
        (_telescope, _instrument, _survey, obs_type,
         exp_start, exp_stop, obs_id,
         _detector, *l0_post, _ext) = re.split(r"[_.]", fps_img[0])
BO ZHANG's avatar
BO ZHANG committed
326

BO ZHANG's avatar
BO ZHANG committed
327
328
329
        # exp_start = int(exp_start)
        # exp_stop = int(exp_stop)
        # obs_id = int(obs_id)
BO ZHANG's avatar
BO ZHANG committed
330

BO ZHANG's avatar
BO ZHANG committed
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
        return CsstMsDataManager(
            ver_sim=ver_sim,
            datatype=datatype,
            available_detectors=available_detectors,
            target_detectors=None,
            dir_l0=dir_l0,
            dir_l1=dir_l1,
            path_aux=path_aux,  # bias dark flat
            use_dfs=use_dfs,
            dfs_node=dfs_node,
            obs_id=obs_id,
            exp_start=exp_start,
            exp_stop=exp_stop,
            _telescope=_telescope,
            _instrument=_instrument,
            _survey=_survey,
            obs_type=obs_type,
            l0_post="_".join(l0_post),
            n_jobs=n_jobs,
            backend=backend
        )
BO ZHANG's avatar
BO ZHANG committed
352

BO ZHANG's avatar
BO ZHANG committed
353
    @staticmethod
BO ZHANG's avatar
BO ZHANG committed
354
    def glob_image(dir_l0, ver_sim="C5.2"):
BO ZHANG's avatar
BO ZHANG committed
355
356
357
358
359
360
361
362
363
364
        """ glob files in L0 data directory """
        if ver_sim == "C3":
            pattern = os.path.join(dir_l0, "MSC_MS_*_raw.fits")
        else:
            assert ver_sim in ["C5.1", "C5.2"]
            pattern = os.path.join(dir_l0, "CSST_MSC_MS_SCI_*.fits")
        fps = glob.glob(pattern)
        fps = [os.path.basename(fp) for fp in fps]
        fps.sort()

365
        print("{} files found with pattern: {}".format(len(fps), pattern))
BO ZHANG's avatar
BO ZHANG committed
366
367
368
369
370
        return fps

    @staticmethod
    def glob_cat(dir_l0, ver_sim="C5"):
        """ glob input catalogs in L0 data directory """
BO ZHANG's avatar
BO ZHANG committed
371
372
        assert ver_sim in ["C5.1", "C5.2"]
        pattern = os.path.join(dir_l0, "MSC_*.cat")
BO ZHANG's avatar
BO ZHANG committed
373
374
375
376
377
378
379
        fps = glob.glob(pattern)
        fps = [os.path.basename(fp) for fp in fps]
        fps.sort()

        print("@DM.glob_dir: {} files found with pattern: {}".format(len(fps), pattern))
        return fps

BO ZHANG's avatar
BO ZHANG committed
380
381
382
383
    def l0_id(self, detector=6):
        """ Level0 ID, consistent with DFS. """
        return f"{self.obs_id}{detector:02d}"

BO ZHANG's avatar
BO ZHANG committed
384
385
    def l0_cat(self, detector=6):
        """ the L0 cat file path"""
BO ZHANG's avatar
BO ZHANG committed
386
387
        assert self.ver_sim == "C5.2"
        fn = "{}_{}_chip_{:02d}_filt_{}.cat".format(
BO ZHANG's avatar
BO ZHANG committed
388
            self._instrument, self.obs_id, detector, self.detector2filter[detector])
BO ZHANG's avatar
BO ZHANG committed
389
390
391
392
        return os.path.join(self.dir_l0, fn)

    def l0_log(self, detector=6):
        """ L0 log file path """
BO ZHANG's avatar
BO ZHANG committed
393
394
        assert self.ver_sim == "C5.2"
        fn = "{}_{}_chip_{:02d}_filt_{}.log".format(
BO ZHANG's avatar
BO ZHANG committed
395
            self._instrument, self.obs_id, detector, self.detector2filter[detector])
BO ZHANG's avatar
BO ZHANG committed
396
397
398
399
        return os.path.join(self.dir_l0, fn)

    def l0_detector(self, detector=6):
        """ L0 detector-specific image file path """
BO ZHANG's avatar
BO ZHANG committed
400
401
402
        assert self.ver_sim in ["C5.1", "C5.2"]
        fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_L0_1.fits".format(
            self._telescope, self._instrument, self._survey,
BO ZHANG's avatar
BO ZHANG committed
403
            self.exp_start, self.exp_stop, self.obs_id, detector)
BO ZHANG's avatar
BO ZHANG committed
404
405
406
407
        return os.path.join(self.dir_l0, fn)

    def l0_crs(self, detector=6):
        """ L0 cosmic ray file path """
BO ZHANG's avatar
BO ZHANG committed
408
409
410
        assert self.ver_sim in ["C5.1", "C5.2"]
        fn = "{}_{}_{}_CRS_{}_{}_{}_{:02d}_L0_1.fits".format(
            self._telescope, self._instrument, self._survey,
BO ZHANG's avatar
BO ZHANG committed
411
            self.exp_start, self.exp_stop, self.obs_id, detector)
BO ZHANG's avatar
BO ZHANG committed
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
        return os.path.join(self.dir_l0, fn)

    def l1_detector(self, detector=6, post="img.fits"):
        """ generate L1 file path

        Parameters
        ----------
        detector:
            detector ID
        post:
            postfix
            e.g, {"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}

        Returns
        -------
        L1 file path

        """
BO ZHANG's avatar
BO ZHANG committed
430
431
432
        assert self.ver_sim in ["C5.1", "C5.2"]
        fn = "{}_{}_{}_SCI_{}_{}_{}_{:02d}_{}".format(
            self._telescope, self._instrument, self._survey,
BO ZHANG's avatar
BO ZHANG committed
433
            self.exp_start, self.exp_stop, self.obs_id, detector, post)
BO ZHANG's avatar
BO ZHANG committed
434
435
        return os.path.join(self.dir_l1, fn)

BO ZHANG's avatar
BO ZHANG committed
436
    def get_bias(self, detector=6):
BO ZHANG's avatar
tweaks    
BO ZHANG committed
437
        """ get bias data """
438
        if self.custom_bias is None:
BO ZHANG's avatar
BO ZHANG committed
439
440
441
442
443
444
445
            return os.path.join(
                self.path_aux,
                "C6.1_ref_crds",
                "csst_msc_{}_{:02d}_{:04d}.fits".format(
                    "bias", detector, 1 if self.ref_version is None else self.ref_version
                )
            )
BO ZHANG's avatar
BO ZHANG committed
446
        else:
447
            return self.custom_bias.format("bias", detector)
BO ZHANG's avatar
BO ZHANG committed
448

BO ZHANG's avatar
BO ZHANG committed
449
    def get_dark(self, detector=6):
BO ZHANG's avatar
tweaks    
BO ZHANG committed
450
        """ get dark data """
451
        if self.custom_dark is None:
BO ZHANG's avatar
BO ZHANG committed
452
453
454
455
456
457
458
            return os.path.join(
                self.path_aux,
                "C6.1_ref_crds",
                "csst_msc_{}_{:02d}_{:04d}.fits".format(
                    "dark", detector, 1 if self.ref_version is None else self.ref_version
                )
            )
BO ZHANG's avatar
BO ZHANG committed
459
        else:
460
            return self.custom_dark.format("dark", detector)
BO ZHANG's avatar
BO ZHANG committed
461

BO ZHANG's avatar
BO ZHANG committed
462
    def get_flat(self, detector=6):
BO ZHANG's avatar
tweaks    
BO ZHANG committed
463
        """ get flat data """
464
        if self.custom_flat is None:
BO ZHANG's avatar
BO ZHANG committed
465
466
467
468
469
470
471
            return os.path.join(
                self.path_aux,
                "C6.1_ref_crds",
                "csst_msc_{}_{:02d}_{:04d}.fits".format(
                    "flat", detector, 1 if self.ref_version is None else self.ref_version
                )
            )
BO ZHANG's avatar
BO ZHANG committed
472
        else:
473
            return self.custom_flat.format("flat", detector)
BO ZHANG's avatar
BO ZHANG committed
474

BO ZHANG's avatar
BO ZHANG committed
475
    def get_axeconf(self):
BO ZHANG's avatar
BO ZHANG committed
476
        return os.path.join(self.path_aux, "axeconf")  # "/home/csstpipeline/L1Pipeline/aux/axeconf"
BO ZHANG's avatar
BO ZHANG committed
477

BO ZHANG's avatar
BO ZHANG committed
478
479
480
481
482
    def l1_file(self, name="", comment=""):
        """ L1 file path

        Parameters
        ----------
BO ZHANG's avatar
BO ZHANG committed
483
        name : str
BO ZHANG's avatar
BO ZHANG committed
484
            file name
BO ZHANG's avatar
BO ZHANG committed
485
        comment : str
BO ZHANG's avatar
BO ZHANG committed
486
487
488
489
490
491
492
493
494
495
496
497
498
            use the function name plz

        Returns
        -------
        fp: str
            the synthetic file path

        """
        fp = os.path.join(self.dir_l1, name)
        # record hardcode history
        self.hardcode_history.append(dict(hdcd=fp, comment=comment))
        return fp

BO ZHANG's avatar
tweaks    
BO ZHANG committed
499
500
    def get_sls_info(self):
        """ Get the target SLS image header info and return. """
BO ZHANG's avatar
BO ZHANG committed
501
        # if self.use_dfs:
BO ZHANG's avatar
BO ZHANG committed
502
503
504
505
        #     raise NotImplementedError()
        # else:
        assert len(self.target_detectors) == 1
        header = fits.getheader(self.l0_detector(self.target_detectors[0]), ext=1)
BO ZHANG's avatar
BO ZHANG committed
506
507
        return header

BO ZHANG's avatar
tweaks    
BO ZHANG committed
508
509
    def get_mbi_info(self):
        """ Get all MBI image header info and return as a table. """
BO ZHANG's avatar
BO ZHANG committed
510
        # if self.use_dfs:
BO ZHANG's avatar
BO ZHANG committed
511
512
513
        #     raise NotImplementedError()
        # else:
        info = Table.read("/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/t_mbi_l1.fits")
BO ZHANG's avatar
BO ZHANG committed
514
515
        return info

BO ZHANG's avatar
BO ZHANG committed
516
    @staticmethod
517
    def quickstart(ver_sim="C5.2", datatype="mbi", dir_l1=".", exposure_id=100,
518
                   use_dfs=False, dfs_node="kmust", clear_l1=False, n_jobs=18, backend="multiprocessing"):
519
520
        """
        Quick dataset generator for tests on dandelion or PML
BO ZHANG's avatar
BO ZHANG committed
521
522
523

        Parameters
        ----------
BO ZHANG's avatar
BO ZHANG committed
524
        ver_sim : str
BO ZHANG's avatar
BO ZHANG committed
525
            {"C5.2"}
BO ZHANG's avatar
BO ZHANG committed
526
        datatype : str
BO ZHANG's avatar
BO ZHANG committed
527
            {"mbi", "sls"}
BO ZHANG's avatar
BO ZHANG committed
528
        dir_l1 : str
BO ZHANG's avatar
BO ZHANG committed
529
            output directory
BO ZHANG's avatar
BO ZHANG committed
530
        exposure_id : int
BO ZHANG's avatar
BO ZHANG committed
531
            The serial number of the exposure. 20-154 for C5.2.
BO ZHANG's avatar
BO ZHANG committed
532
        use_dfs : bool
BO ZHANG's avatar
BO ZHANG committed
533
            If True, use DFS.
BO ZHANG's avatar
BO ZHANG committed
534
535
        dfs_node : str
            The DFS node. Defaults to "kmust", could be "pml".
536
        clear_l1 : bool
537
            If True, clear dir_l1.
538
539
540
541
        n_jobs : int
            The number of jobs.
        backend : str
            The joblib backend.
BO ZHANG's avatar
BO ZHANG committed
542
543
544

        Returns
        -------
BO ZHANG's avatar
tweaks    
BO ZHANG committed
545
        CsstMsDataManager
BO ZHANG's avatar
BO ZHANG committed
546
            The Main Survey Data Manager instance.
BO ZHANG's avatar
BO ZHANG committed
547
548
549
550
551
552
553
554
555
556
        """
        assert datatype in ["mbi", "sls"]
        # auto identify node name
        hostname = os.uname()[1]
        assert hostname in ["dandelion", "ubuntu"]

        # dandelion
        if hostname == "dandelion" and datatype == "mbi":
            dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/multipleBandsImaging/" \
                     "NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
BO ZHANG's avatar
BO ZHANG committed
557
            path_aux = "/nfsdata/users/csstpipeline/L1Pipeline/aux"
BO ZHANG's avatar
BO ZHANG committed
558
        elif hostname == "dandelion" and datatype == "sls":
BO ZHANG's avatar
BO ZHANG committed
559
            dir_l0 = "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/" \
BO ZHANG's avatar
BO ZHANG committed
560
                     "NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
BO ZHANG's avatar
BO ZHANG committed
561
            path_aux = "/nfsdata/users/csstpipeline/L1Pipeline/aux"
BO ZHANG's avatar
BO ZHANG committed
562
563
564
565
566
567
568
569
570
571
572

        # PMO
        elif hostname == "ubuntu" and datatype == "mbi":
            dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
                     "NGP_AstrometryON_shearOFF/MSC_{:07d}/".format(exposure_id)
            path_aux = "/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits"
        elif hostname == "ubuntu" and datatype == "sls":
            dir_l0 = "/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/" \
                     "NGP_AstrometryON_shearOFF_Spec/MSC_{:07d}/".format(exposure_id)
            path_aux = ""
        else:
BO ZHANG's avatar
tweaks    
BO ZHANG committed
573
            raise ValueError("@DM: invalid hostname {} or datatype {}!".format(hostname, datatype))
BO ZHANG's avatar
BO ZHANG committed
574

BO ZHANG's avatar
BO ZHANG committed
575
        return CsstMsDataManager.from_dir(
BO ZHANG's avatar
BO ZHANG committed
576
            ver_sim=ver_sim, datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1, path_aux=path_aux,
BO ZHANG's avatar
BO ZHANG committed
577
578
            use_dfs=use_dfs, dfs_node=dfs_node, n_jobs=n_jobs, backend=backend
        )
BO ZHANG's avatar
BO ZHANG committed
579

580
581
582
    def __repr__(self):
        lines = ""
        lines += "<CsstMsDataManager>\n"
583
584
585
586
587
588
        lines += f"- Data type = {self.datatype}\n"
        lines += f"- Valid detectors = {self.valid_detectors}\n"
        lines += f"- Available detectors = {self.available_detectors}\n"
        lines += f"- Target detectors = {self.target_detectors}\n"
        lines += f"- dir_l0 = {self.dir_l0}\n"
        lines += f"- dir_l1 = {self.dir_l1}\n"
BO ZHANG's avatar
BO ZHANG committed
589
        lines += f"- use_dfs = {self.use_dfs}\n"
590
591
        lines += f"- dfs_node = {self.dfs_node}\n"
        lines += f"- CSST_DFS_GATEWAY = " + os.getenv("CSST_DFS_GATEWAY") + "\n"
592
593
        return lines

594
    def remove_files(self, fmt="*.fits"):
BO ZHANG's avatar
BO ZHANG committed
595
        """ Remove L1 files conforming the format. """
596
        os.system(f"rm -rf {os.path.join(self.dir_l1, fmt)}")
597

BO ZHANG's avatar
BO ZHANG committed
598
599
600
601
    def remove_dir(self, dir_name):
        """ Remove L1 (sub-)directory. """
        os.system(f"rm -rf {os.path.join(self.dir_l1, dir_name)}")

602
603
604
605
    @staticmethod
    def clear_dir(dir_path):
        os.system(f"rm -rf {dir_path}/*")

BO ZHANG's avatar
BO ZHANG committed
606
607
608
609
610
611
612
613
614
615
616
617
    # DFS interfaces
    @property
    def dfs_node(self):
        return self._dfs_node

    @dfs_node.setter
    def dfs_node(self, dfs_node):
        # for DFS configuration, defaults to "local", could be "pml"
        assert dfs_node in DFS_CONF.keys()
        self._dfs_node = dfs_node
        for k, v in DFS_CONF[dfs_node].items():
            os.environ[k] = v
618

619
620
    @staticmethod
    def dfs_is_available():
BO ZHANG's avatar
BO ZHANG committed
621
622
        """ Test if DFS works. """
        try:
BO ZHANG's avatar
BO ZHANG committed
623
            tbl = CatalogApi().catalog_query(
BO ZHANG's avatar
tweaks    
BO ZHANG committed
624
625
626
627
628
629
630
631
632
                catalog_name="gaia3",
                ra=180,
                dec=0,
                radius=.1,
                columns=("ra", "dec"),
                min_mag=0,
                max_mag=30,
                obstime=-1,
                limit=-1
BO ZHANG's avatar
BO ZHANG committed
633
634
            )
            return len(tbl) > 0
BO ZHANG's avatar
BO ZHANG committed
635
636
637
        except:
            return False

BO ZHANG's avatar
BO ZHANG committed
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
    def dfs_rc_query(
            self,
            ra=180,
            dec=0,
            radius=2,
            columns=(
                    'ref_epoch',
                    'ra',
                    'ra_error',
                    'dec',
                    'dec_error',
                    'parallax',
                    'parallax_error',
                    'pmra',
                    'pmra_error',
                    'pmdec',
                    'pmdec_error',
                    'phot_g_mean_mag',
                    'source_id'
            ),
            min_mag=0,
            max_mag=30,
            obstime=-1,
            limit=-1
    ):
BO ZHANG's avatar
BO ZHANG committed
663
664
665
666
667
        """ Query Reference Catalog (RC) from DFS.
        Ref.
        https://gea.esac.esa.int/archive/documentation/GDR3/Gaia_archive/chap_datamodel/
        sec_dm_main_source_catalogue/ssec_dm_gaia_source.html
        """
BO ZHANG's avatar
BO ZHANG committed
668
        try:
BO ZHANG's avatar
BO ZHANG committed
669
            cat = self.dfs_CatApi.catalog_query(
BO ZHANG's avatar
BO ZHANG committed
670
671
672
673
674
675
676
677
678
                catalog_name="gaia3",
                ra=ra,
                dec=dec,
                columns=columns,
                radius=radius,
                min_mag=min_mag,
                max_mag=max_mag,
                obstime=obstime,
                limit=limit
BO ZHANG's avatar
BO ZHANG committed
679
680
            )
            tbl = self.dfs_CatApi.to_table(cat)
BO ZHANG's avatar
BO ZHANG committed
681
682
683
684
685
686
687
688
689
            return tbl
        except:
            print("Error occurred during the query!")
            return None

    def dfs_l1_push(self):
        """ Push MBI/SLS L1 data to DFS. """
        # l1api = get_l1api()
        # l1api.write()
690
        return
BO ZHANG's avatar
BO ZHANG committed
691
692
693
694
695

    def dfs_l2_push(self):
        """ Push SLS spectra to DFS. """
        pass

BO ZHANG's avatar
BO ZHANG committed
696
697
698
699
700
701
702
703
704
705
706
707
708
    @staticmethod
    def from_l1id(
            l1_id="1000000001",
            datatype="sls",
            dir_l0="/L1Pipeline/L0",
            dir_l1="/L1Pipeline/L1",
            use_dfs=True,
            dfs_node="pml",
            clear_l1=False,
            dfs_root="/share/dfs"
    ):
        pass

BO ZHANG's avatar
BO ZHANG committed
709
    @staticmethod
BO ZHANG's avatar
BO ZHANG committed
710
711
712
713
714
    def from_dfs(
            obs_id="100000100",
            datatype="mbi",
            dir_l0="/L1Pipeline/L0",
            dir_l1="/L1Pipeline/L1",
BO ZHANG's avatar
tweaks    
BO ZHANG committed
715
            path_aux="/L1Pipeline/aux",
BO ZHANG's avatar
BO ZHANG committed
716
            use_dfs=True,
BO ZHANG's avatar
BO ZHANG committed
717
718
            dfs_node="pml",
            clear_l1=False,
BO ZHANG's avatar
BO ZHANG committed
719
720
721
            dfs_root="/share/dfs",
            n_jobs=18,
            backend="multiprocessing"
BO ZHANG's avatar
BO ZHANG committed
722
    ):
BO ZHANG's avatar
BO ZHANG committed
723
        """ Initialize CsstMsDataManager from DFS. """
724
725
726
727
728
        # (clear and) make directories
        if os.path.exists(dir_l0):
            os.system(f"rm -rf {dir_l0}/*")
        else:
            os.mkdir(dir_l0)
BO ZHANG's avatar
BO ZHANG committed
729
730
731
732
        # if os.path.exists(dir_l1):
        #     os.system(f"rm -rf {dir_l1}/*")
        # else:
        #     os.mkdir(dir_l1)
BO ZHANG's avatar
BO ZHANG committed
733
734
735
        # os.chdir(dir_l1)
        if not os.path.exists(dir_l1):
            os.mkdir(dir_l1)
BO ZHANG's avatar
BO ZHANG committed
736
737
        elif clear_l1:
            os.system(f"rm -rf {dir_l1}/*")
738
739
        os.chdir(dir_l1)

BO ZHANG's avatar
BO ZHANG committed
740
        print(f"Query obs_id={obs_id} ...", end="")
BO ZHANG's avatar
BO ZHANG committed
741
        records = CsstMsDataManager(dfs_node=dfs_node, verbose=False).dfs_L0DataApi.find(obs_id=obs_id)
BO ZHANG's avatar
tweaks    
BO ZHANG committed
742
        print(f"{records['totalCount']} records obtained!")
BO ZHANG's avatar
BO ZHANG committed
743
744
745
        tbl = Table([_.__dict__ for _ in records["data"]])
        tbl.sort(["detector_no", "obs_type"])

BO ZHANG's avatar
tweaks    
BO ZHANG committed
746
747
        print("Making symbolic links ...")
        for i_rec in range(len(tbl)):
BO ZHANG's avatar
BO ZHANG committed
748
            os.symlink(
BO ZHANG's avatar
tweaks    
BO ZHANG committed
749
750
                src=os.path.join(dfs_root, tbl["file_path"][i_rec]),
                dst=os.path.join(dir_l0, os.path.basename(tbl["file_path"][i_rec])),
BO ZHANG's avatar
BO ZHANG committed
751
752
            )

753

BO ZHANG's avatar
BO ZHANG committed
754
755
756
        # initialize dm
        dm = CsstMsDataManager.from_dir(
            ver_sim="C5.2", datatype=datatype, dir_l0=dir_l0, dir_l1=dir_l1,
BO ZHANG's avatar
BO ZHANG committed
757
758
            path_aux=path_aux, use_dfs=use_dfs, dfs_node=dfs_node,
            n_jobs=n_jobs, backend=backend
BO ZHANG's avatar
BO ZHANG committed
759
        )
BO ZHANG's avatar
BO ZHANG committed
760
        assert dm.obs_id == obs_id
BO ZHANG's avatar
BO ZHANG committed
761
762
763

        return dm

BO ZHANG's avatar
BO ZHANG committed
764
    def dfs_l0_query(self, obs_id: str = "100000100"):
BO ZHANG's avatar
BO ZHANG committed
765
        """ Query L0 data from DFS. """
BO ZHANG's avatar
BO ZHANG committed
766
        result = self.dfs_L0DataApi.find(obs_id=str(obs_id))
BO ZHANG's avatar
BO ZHANG committed
767
        print(f"{result['totalCount']} records returned from DFS.")
BO ZHANG's avatar
tweaks    
BO ZHANG committed
768
769
770
        if not result["code"] == 0:
            raise ValueError(f"DFS returns non-zero code! ({result['code']})")
        tbl = Table([_.__dict__ for _ in result["data"]])
BO ZHANG's avatar
BO ZHANG committed
771
772
773
774
775
776
777
        tbl.sort(["detector_no", "obs_type"])
        # Check if all 30 detectors are available
        for detector in CP["all"]["detectors"]:
            for obs_type in ["sci", "cosmic_ray"]:
                if np.sum((tbl["detector_no"] == f"{detector:02d}") & (tbl["obs_type"] == obs_type)) == 0:
                    self.logger_ppl.warning(f"Record not found for detector {detector:02d} and obs_type {obs_type}")
        return tbl
BO ZHANG's avatar
BO ZHANG committed
778

BO ZHANG's avatar
BO ZHANG committed
779
780
781
782
783
784
785
786
787
788
789
790
    def dfs_l0_check_all(self):
        """ Check all C5.2 L0 data is available in DFS. """
        is_good = True
        for obs_id in range(100000020, 100000155):
            tbl = self.dfs_l0_query(obs_id=f"{obs_id}")
            if len(tbl) == 60:
                self.logger_ppl.info(f"DFS returns {len(tbl)} records for obs_id={obs_id}")
            else:
                is_good = False
                self.logger_ppl.warning(f"DFS returns {len(tbl)} records for obs_id={obs_id}")
        return is_good

791
    def dfs_l1_query(self, obs_id, detector):
BO ZHANG's avatar
BO ZHANG committed
792
793
        """ Query L1 data from DFS. """
        pass
794
795
796
797


# temporarily compatible with old interface
CsstMbiDataManager = CsstMsDataManager