# --- repository-hosting page header (web chrome, not part of the source file) ---
# Commit 50ccbd08, authored by BO ZHANG: "remove data_manager.py and dfs.py"
# parent ea9aa5ea; Pipeline #11718 passed with stage
"""
Identifier: KSC-SJ4-csst_common/data_manager.py
Name: data_manager.py
Description: file path generator
Author: Bo Zhang
Created: 2022-09-13
Modified-History:
2022-09-13, Bo Zhang, created
2022-09-13, Bo Zhang, added CsstMbiDataManager
2022-09-29, Bo Zhang, favor CsstMsDataManager instead of CsstMbiDataManager
2022-10-26, Bo Zhang, reconstruct CsstMsDataManager, deprecate CsstMbiDataManager
2022-10-28, Bo Zhang, added CsstMsDataManager.query_rc(), dm.use_dfs, dm.node
2022-11-06, Bo Zhang, deleted CsstMbiDataManager
2022-11-20, Bo Zhang, added DFS APIs
"""
import glob
import os
import re
from typing import Union
import numpy as np
from astropy import time
from astropy.io import fits
from astropy.table import Table
from csst_dfs_api.common.catalog import CatalogApi
from csst_dfs_api.facility.level0 import Level0DataApi
from csst_dfs_api.facility.level0prc import Level0PrcApi
from csst_dfs_api.facility.level1 import Level1DataApi
from csst_dfs_api.facility.level1prc import Level1PrcApi
from csst_dfs_api.mbi.level2 import Level2DataApi as MbiLevel2DataApi
from csst_dfs_api.sls.level2spectra import Level2SpectraApi as SlsLevel2DataApi
from .dfs import DFS
from .logger import get_logger
from .params import CSST_PARAMS as CP
from .time import now
# NOTE: deliberate hard deprecation (commit message: "remove data_manager.py and
# dfs.py") -- raising at module level makes every import of this module fail,
# so all code below is intentionally unreachable. DeprecationWarning is raised
# as an exception here rather than emitted via warnings.warn().
raise DeprecationWarning("DataManager is deprecated and will be no longer available")
class CsstMsDataManager:
    """
    CSST MS data manager, including MBI and SLS.

    ``CsstMsDataManager`` provides an interface to switch between DFS and local file system.
    To initialize ``CsstMsDataManager`` from a local directory, use ``CsstMsDataManager.from_dir()``.
    To initialize ``CsstMsDataManager`` on ``dandelion`` or ``PM node``, use ``CsstMsDataManager.quickstart()``.
    To initialize ``CsstMsDataManager`` from DFS, use ``CsstMsDataManager.from_dfs()``.
    To generate L0 and L1 file paths, use ``CsstMsDataManager.l0_detector()``, ``CsstMsDataManager.l1_detector()``, etc.

    Here are some example file names for simulations with different versions.

    C3:
        * MSC_MS_210525220000_100000020_06_raw.fits
        * MSC_CRS_210525220000_100000020_06_raw.fits
        * MSC_210525120000_0000020_06.cat
    C5.1:
        * CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
        * CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
        * MSC_10000100_chip_06_filt_y.cat
        * MSC_10000100_chip_06_filt_y.log
    C5.2:
        * CSST_MSC_MS_SCI_20270810081950_20270810082220_100000100_06_L0_1.fits
        * CSST_MSC_MS_CRS_20270810081950_20270810082220_100000100_06_L0_1.fits
        * MSC_100000100_chip_06_filt_y.cat
        * MSC_100000100_chip_06_filt_y.log
    C6.2:
        * CSST_MSC_MS_SCIE_20280716233448_20280716233718_10160000100_06_L0_V01.fits
        * CSST_MSC_MS_CRS_20280716233448_20280716233718_10160000100_06_L0_V01.fits
        * MSC_10160000100_chip_06_filt_y.cat
        * MSC_10160000100_chip_06_filt_y.log

    Parameters
    ----------
    ver_sim : str
        The version of simulation data, see ``csst_common.params.CP``.
    datatype : str
        The options are {"mbi", "sls", "all"}.
        The "all" option is used for QC in particular.
        Note that in this case methods like ``get_bias`` are unavailable.
    available_detectors : list
        The list of available detector serial numbers of available images.
    target_detectors : list
        The list of target detector serial numbers of available images.
    dir_l0 : str
        The L0 directory.
    dir_l1 : str
        The L1 directory.
    path_aux : str
        The aux data directory (bias, flat, dark).
    dfs_root : str
        The DFS root path.
    telescope : str
        The telescope name. Defaults to ``CSST`` for C5.2 simulation.
    instrument : str
        The instrument name. Defaults to ``MSC`` for C5.2 simulation.
    project : str
        The survey name. Defaults to ``MS`` for C5.2 simulation.
    obs_type : str
        The image type signature for science images. Defaults to ``SCI`` for C5.2 simulation.
    exp_start : str
        The exposure start time in ``yyyymmddhhmmss`` format.
    exp_stop : str
        The exposure stop time in ``yyyymmddhhmmss`` format.
    obs_id : str
        The exposure ID.
    level : str
        The data level, i.e., ``0``.
    version : str
        The L0 version.
    ext : str
        The file extension, i.e., ``fits``.
    log_ppl : str
        The pipeline log file name.
    log_mod : str
        The module log file name.
    clear_dir : bool
        If True, clear the ``dm.dir_l1`` directory.
    verbose : bool
        If True, print verbose info.
    n_jobs : int
        The number of CPU jobs.
    n_jobs_gpu : int
        The number of GPU jobs.
    backend : str
        The joblib backend.
    device : str
        The device for neural network. "CPU" or "GPU".
    stamps : str
        The time stamps file name ("" disables time stamping).

    Examples
    --------
    >>> dm = CsstMsDataManager.from_dir(
    >>>     ver_sim="C6.2", datatype="mbi", dir_l0="/Users/cham/L1Pipeline/L0_MBI_C6.2", dir_l1=".")
    >>> # access L0 directory
    >>> dm.dir_l0
    >>> # access L1 directory
    >>> dm.dir_l1
    >>> # access path_aux
    >>> dm.path_aux
    >>> # access target detectors
    >>> dm.target_detectors
    >>> # access available detectors
    >>> dm.available_detectors
    >>> # generate an L1 file path (detector-specified)
    >>> dm.l1_detector(detector=6, post="IMG.fits")
    >>> # define an L1 file (non-detector-specified)
    >>> dm.l1_file("flipped_image.fits")
    """

    # TODO: update docstring
    def __init__(
        self,
        ver_sim: str = "C6.2",
        datatype: str = "mbi",
        available_detectors: Union[None, list] = None,
        target_detectors: Union[None, list, int] = None,
        dir_l0: str = ".",
        dir_l1: str = ".",
        path_aux: str = "/pipeline/aux",  # aux dir
        dfs_root: str = "/dfsroot",
        telescope: str = "CSST",
        instrument: str = "MSC",
        project: str = "MS",
        obs_type: str = "SCI",
        exp_start: str = "20270810081950",  # annotation fixed: defaults are strings
        exp_stop: str = "20270810082220",
        obs_id: str = "100000100",
        level: str = "0",
        version: str = "01",
        ext: str = "fits",
        log_ppl: str = "",
        log_mod: str = "",
        clear_dir: bool = False,
        verbose: bool = True,
        n_jobs: int = 18,
        n_jobs_gpu: int = 1,
        backend: str = "multiprocessing",
        device: str = "CPU",
        stamps: str = "",
    ):
        """Initialize the data manager.

        Side effects: sets the ``CSST_DFS_LOGS_DIR`` env var, ``os.chdir``s into
        *dir_l1*, optionally wipes *dir_l1*, creates loggers (and log files),
        and may write a time stamp file.
        """
        # set DFS log dir
        os.environ["CSST_DFS_LOGS_DIR"] = "."
        # version
        assert ver_sim in CP["sim"]["versions"]
        self.ver_sim = ver_sim
        # datatype, valid_detectors, detector2filter
        assert datatype in ["mbi", "sls", "all"]
        self.datatype = datatype
        if datatype == "mbi":
            # MBI
            self.valid_detectors = CP["mbi"]["detectors"]
            self.detector2filter = CP["mbi"]["detector2filter"]
        elif datatype == "sls":
            # SLS
            self.valid_detectors = CP["sls"]["detectors"]
            self.detector2filter = CP["sls"]["detector2filter"]
        else:
            # ALL
            self.valid_detectors = CP["all"]["detectors"]
            self.detector2filter = CP["all"]["detector2filter"]
        if verbose:
            print("Data type is: ", self.datatype)
            print("Valid detectors are: ", self.valid_detectors)
        # available_detectors
        self.available_detectors = (
            available_detectors if available_detectors is not None else list()
        )
        if verbose:
            print("Available detectors are:", self.available_detectors)
        # set all available detectors by default (property setter intersects
        # with available & valid detectors)
        self.target_detectors = target_detectors
        if verbose:
            print("Target detectors are: ", self._target_detectors)
        # exposure info
        self.obs_id = obs_id
        self.exp_start = exp_start
        self.exp_stop = exp_stop
        # file name components
        self.telescope = telescope
        self.instrument = instrument
        self.project = project
        self.obs_type = obs_type
        self.level = level
        self.version = version
        self.ext = ext
        # DFS configuration
        # self.use_dfs = use_dfs
        # self.dfs_node = dfs_node
        self.dfs_root = dfs_root
        # data directory
        self.dir_l0 = dir_l0
        self.dir_l1 = dir_l1
        self.path_aux = path_aux
        self.ref_version = None
        # record hard code names in history
        self.hardcode_history = []
        # parallel configuration
        self.n_jobs = n_jobs
        self.n_jobs_gpu = n_jobs_gpu
        self.backend = backend
        self.device = device
        # change to working directory
        os.chdir(self.dir_l1)
        # clear dir_l1
        if clear_dir:
            self.clear_dir(self.dir_l1)
        # L1 whitelist: files that l1_cleanup() must NOT delete
        self.l1_whitelist = [
            self.l1_file(name="mrs.csv"),
        ]
        for detector in self.target_detectors:
            if self.datatype == "mbi":
                self.l1_whitelist.append(
                    self.l1_detector(detector=detector, post="img_L1.fits")
                )
                self.l1_whitelist.append(
                    self.l1_detector(detector=detector, post="wht_L1.fits")
                )
                self.l1_whitelist.append(
                    self.l1_detector(detector=detector, post="flg_L1.fits")
                )
                self.l1_whitelist.append(
                    self.l1_detector(detector=detector, post="cat.fits")
                )
                self.l1_whitelist.append(
                    self.l1_detector(detector=detector, post="psf.fits")
                )
            elif self.datatype == "sls":
                self.l1_whitelist.append(
                    self.l1_detector(detector=detector, post="L1_1.fits")
                )
        # pipeline logger
        if log_ppl == "":
            self.logger_ppl = get_logger(name="CSST L1 Pipeline Logger", filename="")
        else:
            self.logger_ppl = get_logger(
                name="CSST L1 Pipeline Logger", filename=os.path.join(dir_l1, log_ppl)
            )
            self.l1_whitelist.append(os.path.join(dir_l1, log_ppl))
        # module logger
        if log_mod == "":
            self.logger_mod = get_logger(name="CSST L1 Module Logger", filename="")
        else:
            self.logger_mod = get_logger(
                name="CSST L1 Module Logger", filename=os.path.join(dir_l1, log_mod)
            )
            self.l1_whitelist.append(os.path.join(dir_l1, log_mod))
        # custom calibration frames (None -> use get_bias/get_dark/get_flat paths)
        self.custom_bias = None
        self.custom_dark = None
        self.custom_flat = None
        self.stamps = stamps
        if self.stamps != "":
            self.l1_whitelist.append(self.l1_file(self.stamps))
            self.write_stamp()
        # NOTE(review): ``CRDS`` is not imported anywhere in this module --
        # this line raises NameError at runtime (module is deprecated anyway).
        self.crds = CRDS()
        self.dfs = DFS()
def write_stamp(self):
if self.stamps is not None and self.stamps != "":
with open(self.l1_file(self.stamps), "a+") as f:
f.write(f"{time.Time.now().isot}+08:00\n")
    @staticmethod
    def now():
        # Thin wrapper around csst_common.time.now() so callers can obtain a
        # timestamp through the data manager without importing .time directly.
        return now()
    # DFS APIs
    # NOTE: each property constructs a *fresh* API client on every access;
    # nothing is cached on the instance.
    @property
    def dfs_L0DataApi(self):
        # facility Level0 data API client
        return Level0DataApi()

    @property
    def dfs_L0PrcApi(self):
        # facility Level0 processing-record API client
        return Level0PrcApi()

    @property
    def dfs_L1DataApi(self):
        # facility Level1 data API client
        return Level1DataApi()

    @property
    def dfs_L1PrcApi(self):
        # facility Level1 processing-record API client
        return Level1PrcApi()

    @property
    def dfs_MbiL2DataApi(self):
        # MBI Level2 data API client
        return MbiLevel2DataApi()

    @property
    def dfs_SlsL2DataApi(self):
        # SLS Level2 spectra API client
        return SlsLevel2DataApi()

    @property
    def dfs_CatApi(self):
        # catalog query API client
        return CatalogApi()
    @property
    def target_detectors(self):
        # the detectors the pipeline will actually process (see setter below)
        return self._target_detectors

    @target_detectors.setter
    def target_detectors(self, detectors: Union[None, list, int] = None):
        # The result is always the intersection with available & valid detectors:
        # None -> all, list -> subset, int -> single detector.
        # NOTE(review): ``type(...) in [list, int]`` rejects subclasses of
        # list/int -- presumably intentional, but isinstance is used below; confirm.
        assert detectors is None or type(detectors) in [list, int]
        if detectors is None:
            self._target_detectors = list(
                set(self.available_detectors) & set(self.valid_detectors)
            )
        elif isinstance(detectors, list):
            self._target_detectors = list(
                set(self.available_detectors)
                & set(self.valid_detectors)
                & set(detectors)
            )
        elif isinstance(detectors, int):
            self._target_detectors = list(
                set(self.available_detectors) & set(self.valid_detectors) & {detectors}
            )

    def set_detectors(self, detectors=None):
        # kept for backward compatibility; always raises
        raise DeprecationWarning(
            "This method is deprecated, please directly use dm.target_detectors = detectors!"
        )
    @staticmethod
    def from_dir(
        ver_sim="C6.2",
        datatype="mbi",
        dir_l0=".",
        dir_l1=".",
        path_aux="",
        pattern="CSST_MSC_*_SCIE_*.fits",
        log_ppl="csst-l1ppl.log",
        log_mod="csst-l1mod.log",
        n_jobs=18,
        backend="multiprocessing",
        device="CPU",
        **kwargs,
    ):
        """Initialize the data manager from a local L0 directory.

        Globs *pattern* under *dir_l0*, parses the first matched file name to
        recover the exposure metadata (telescope/instrument/.../version),
        collects the detector numbers of all matched files, and builds a
        ``CsstMsDataManager``.

        Raises
        ------
        FileNotFoundError
            If no file matches *pattern* in *dir_l0*.
        """
        assert ver_sim in CP["sim"]["versions"]
        # glob files
        fps_img = CsstMsDataManager.glob_image(dir_l0=dir_l0, pattern=pattern)
        if len(fps_img) == 0:
            raise FileNotFoundError(f"No file found with pattern {pattern} in {dir_l0}")
        # parse filename
        if ver_sim in ["C5.2", "C6.1"]:
            # C5.2/C6.1 scheme: 9-digit obs_id, 1-digit version without "V" prefix
            pattern = re.compile(
                r"(?P<telescope>[A-Z]+)_"
                r"(?P<instrument>[A-Z]+)_"
                r"(?P<project>[A-Z]+)_"
                r"(?P<obs_type>[A-Z]+)_"
                r"(?P<exp_start>[0-9]{14})_"
                r"(?P<exp_stop>[0-9]{14})_"
                r"(?P<obs_id>[0-9]{9})_"
                r"(?P<detector>[0-9]{2})_"
                r"L(?P<level>[0-9]{1})_"
                r"(?P<version>[0-9]{1})"
                # NOTE(review): unescaped dot -- matches any char, presumably meant r"\."
                r".(?P<ext>[a-z]{4})"
            )
            mo = re.fullmatch(pattern, fps_img[0])
            assert mo is not None
            mogd = mo.groupdict()
            # "detector" is per-file, not per-exposure -- dropped from the kwargs
            mogd.pop("detector")
        else:
            # C6.2+ scheme: 11-digit obs_id, "V"-prefixed 2-digit version
            pattern = re.compile(
                r"(?P<telescope>[A-Z]+)_"
                r"(?P<instrument>[A-Z]+)_"
                r"(?P<project>[A-Z]+)_"
                r"(?P<obs_type>[A-Z]+)_"
                r"(?P<exp_start>[0-9]{14})_"
                r"(?P<exp_stop>[0-9]{14})_"
                r"(?P<obs_id>[0-9]{11})_"
                r"(?P<detector>[0-9]{2})_"
                r"L(?P<level>[0-9]{1})_"
                r"V(?P<version>[0-9]{2})"
                r".(?P<ext>[a-z]{4})"
            )
            mo = re.fullmatch(pattern, fps_img[0])
            assert mo is not None
            mogd = mo.groupdict()
            mogd.pop("detector")
        # available detectors (parsed from every matched file), sorted ascending
        available_detectors = [
            int(re.fullmatch(pattern, fp)["detector"]) for fp in fps_img
        ]
        available_detectors.sort()
        return CsstMsDataManager(
            ver_sim=ver_sim,
            datatype=datatype,
            available_detectors=available_detectors,
            target_detectors=None,
            dir_l0=dir_l0,
            dir_l1=dir_l1,
            path_aux=path_aux,  # bias dark flat
            log_ppl=log_ppl,
            log_mod=log_mod,
            n_jobs=n_jobs,
            backend=backend,
            device=device,
            **mogd,
            **kwargs,
        )
@staticmethod
def glob_image(dir_l0, pattern="CSST_MSC_*_SCIE_*.fits"):
"""glob files in L0 data directory"""
fps = glob.glob(os.path.join(dir_l0, pattern))
fps = [os.path.basename(fp) for fp in fps]
fps.sort()
print("{} files found with pattern: {}".format(len(fps), pattern))
return fps
def l0_id(self, detector=6):
"""Level0 ID, consistent with DFS."""
return f"{self.obs_id}{detector:02d}"
    def l0_cat(self, detector=6):
        """Return the L0 catalog (.cat) file path for *detector*; raise FileNotFoundError if missing."""
        assert detector in self.available_detectors
        fn = f"{self.instrument}_{self.obs_id}_chip_{detector:02d}_filt_{self.detector2filter[detector]}.cat"
        fp = os.path.join(self.dir_l0, fn)
        if not os.path.exists(fp):
            raise FileNotFoundError(f"File not found: {fp}")
        return fp

    def l0_log(self, detector=6):
        """Return the L0 log file path for *detector*; raise FileNotFoundError if missing."""
        assert detector in self.available_detectors
        fn = f"{self.instrument}_{self.obs_id}_chip_{detector:02d}_filt_{self.detector2filter[detector]}.log"
        fp = os.path.join(self.dir_l0, fn)
        if not os.path.exists(fp):
            raise FileNotFoundError(f"File not found: {fp}")
        return fp
    def l0_detector(self, detector=6):
        """Return the L0 detector-specific science image path; raise FileNotFoundError if missing."""
        assert detector in self.available_detectors
        if self.ver_sim in ["C5.2", "C6.1"]:
            # old scheme: 1-digit version, no "V" prefix
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_"
                f"{self.exp_stop}_{self.obs_id}_{detector:02d}_L{self.level}_{self.version}.{self.ext}"
            )
        else:
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_"
                f"{self.exp_stop}_{self.obs_id}_{detector:02d}_L{self.level}_V{self.version}.{self.ext}"
            )
        fp = os.path.join(self.dir_l0, fn)
        if not os.path.exists(fp):
            raise FileNotFoundError(f"File not found: {fp}")
        return fp

    def l0_crs(self, detector=6):
        """Return the L0 cosmic-ray image path; raise FileNotFoundError if missing."""
        assert detector in self.available_detectors
        # NOTE(review): branches on == "C5.2" only, while l0_detector also treats
        # "C6.1" as old-style -- confirm whether C6.1 CRS files use the "V" prefix.
        if self.ver_sim == "C5.2":
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_CRS_{self.exp_start}_"
                f"{self.exp_stop}_{self.obs_id}_{detector:02d}_L{self.level}_{self.version}.{self.ext}"
            )
        else:
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_CRS_{self.exp_start}_"
                f"{self.exp_stop}_{self.obs_id}_{detector:02d}_L{self.level}_V{self.version}.{self.ext}"
            )
        fp = os.path.join(self.dir_l0, fn)
        if not os.path.exists(fp):
            raise FileNotFoundError(f"File not found: {fp}")
        return fp
def l1_detector(self, detector=6, post="IMG.fits"):
"""generate L1 file path
Parameters
----------
detector:
detector ID
post:
postfix
e.g, {"img.fits", "wht.fits", "flg.fits", "img_L1.fits", "wht_L1.fits", "flg_L1.fits"}
Returns
-------
L1 file path
"""
assert detector in self.available_detectors
if post == ".fits":
# no additional suffix
fn = (
f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_"
f"{self.exp_stop}_{self.obs_id}_{detector:02d}_L1_V{self.version}{post}"
)
else:
fn = (
f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_"
f"{self.exp_stop}_{self.obs_id}_{detector:02d}_L1_V{self.version}_{post}"
)
return os.path.join(self.dir_l1, fn)
    def get_refs(self, detector=6):
        """Get reference (bias/dark/ledflat[/shutter]) file paths for *detector*.

        Tries CRDS first (3 retries); on any failure falls back to local files
        under ``{path_aux}/C6.2_ref_crds``.

        Parameters
        ----------
        detector : int
            Detector number.

        Returns
        -------
        dict
            {'bias': '/ccds_root/references/msc/csst_msc_ms_bias_11_000001.fits',
             'dark': '/ccds_root/references/msc/csst_msc_ms_dark_11_000001.fits',
             'ledflat': '/ccds_root/references/msc/csst_msc_ms_ledflat_11_000001.fits',
             'shutter': '/ccds_root/references/msc/csst_msc_ms_shutter_11_000001.fits'}
        """
        if self.crds.is_available:
            try:
                print("CRDS available, use refs from CRDS ...")
                return self.crds.retry(
                    self.crds.get_refs, 3, file_path=self.l0_detector(detector)
                )
            except BaseException as e:
                # deliberately broad: any CRDS failure falls through to local files
                print("CRDS reference access failed! ", e)
        print("Use refs from local files ...")
        refs = dict()
        # the shutter reference only exists for MBI
        if self.datatype == "mbi":
            ref_types = ["bias", "dark", "ledflat", "shutter"]
        else:
            ref_types = ["bias", "dark", "ledflat"]
        for ref_type in ref_types:
            fp = os.path.join(
                self.path_aux,
                "C6.2_ref_crds",
                "csst_msc_ms_{}_{:02d}_{:06d}.fits".format(ref_type, detector, 1),
            )
            assert os.path.exists(fp), f"File not found: [{fp}]"
            refs[ref_type] = fp
        return refs
def get_bias(self, detector=6):
"""get bias data"""
fp = os.path.join(
self.path_aux,
"C6.2_ref_crds",
"csst_msc_ms_{}_{:02d}_{:06d}.fits".format("bias", detector, 1),
)
if not os.path.exists(fp):
raise FileNotFoundError(fp)
else:
return fp
def get_dark(self, detector=6):
"""get dark data"""
fp = os.path.join(
self.path_aux,
"C6.2_ref_crds",
"csst_msc_ms_{}_{:02d}_{:06d}.fits".format("dark", detector, 1),
)
if not os.path.exists(fp):
raise FileNotFoundError(fp)
else:
return fp
def get_flat(self, detector=6):
"""get flat data"""
fp = os.path.join(
self.path_aux,
"C6.2_ref_crds",
"csst_msc_ms_{}_{:02d}_{:06d}.fits".format("ledflat", detector, 1),
)
if not os.path.exists(fp):
raise FileNotFoundError(fp)
else:
return fp
def get_shutter(self, detector=6):
"""get flat data"""
fp = os.path.join(
self.path_aux,
"C6.2_ref_crds",
"csst_msc_ms_{}_{:02d}_{:06d}.fits".format("shutter", detector, 1),
)
if not os.path.exists(fp):
raise FileNotFoundError(fp)
else:
return fp
# def get_axeconf(self):
# return os.path.join(self.path_aux, "axeconf") # "/home/csstpipeline/L1Pipeline/aux/axeconf"
def l1_file(self, name="", comment=""):
"""L1 file path
Parameters
----------
name : str
file pipeline_id
comment : str
use the function pipeline_id plz
Returns
-------
fp: str
the synthetic file path
"""
fp = os.path.join(self.dir_l1, name)
# record hardcode history
self.hardcode_history.append(dict(hdcd=fp, comment=comment))
return fp
    def get_sls_info(self):
        """Get the target SLS image header (FITS extension 1) and return it."""
        # if self.use_dfs:
        #     raise NotImplementedError()
        # else:
        # single-detector mode only
        assert len(self.target_detectors) == 1
        header = fits.getheader(self.l0_detector(self.target_detectors[0]), ext=1)
        return header

    def get_mbi_info(self):
        """Get all MBI image header info and return as a table."""
        # if self.use_dfs:
        #     raise NotImplementedError()
        # else:
        # NOTE(review): hard-coded NFS path -- only valid on the pipeline cluster
        info = Table.read(
            "/nfsdata/share/csst_simulation_data/Cycle-5-SimuData/slitlessSpectroscopy/t_mbi_l1.fits"
        )
        return info
    @staticmethod
    def quickstart(
        ver_sim="C6.2",
        datatype="mbi",
        dir_l1=".",
        exposure_id=100,
        n_jobs=18,
        backend="multiprocessing",
    ):
        """
        Quick dataset generator for tests on dandelion or PML.

        Parameters
        ----------
        ver_sim : str
            "C6.2"
        datatype : str
            {"mbi", "sls"}
        dir_l1 : str
            Output directory.
        exposure_id : int
            The serial number of the exposure. 20-154 for C5.2.
        n_jobs : int
            The number of jobs.
        backend : str
            The joblib backend.

        Returns
        -------
        CsstMsDataManager
            The Main Survey Data Manager instance.
        """
        # only the C6.2 unit-test exposure is supported
        try:
            assert ver_sim == "C6.2" and exposure_id == 100
        except BaseException:
            raise ValueError("Please use ver_sim = 'C6.2'! and exposure_id = 100")
        assert datatype in ["mbi", "sls"]
        # auto-identify the node from the hostname; only these two are supported
        hostname = os.uname()[1]
        assert hostname in [
            "dandelion",
            "tulip",
        ]
        dir_l0 = "/nfsdata/share/pipeline-unittest/MSC_C6.2_UNITTEST/MSC_0000100"
        path_aux = "/nfsdata/share/pipeline/aux"
        return CsstMsDataManager.from_dir(
            ver_sim=ver_sim,
            datatype=datatype,
            dir_l0=dir_l0,
            dir_l1=dir_l1,
            path_aux=path_aux,
            n_jobs=n_jobs,
            backend=backend,
        )
def __repr__(self):
lines = ""
lines += "<CsstMsDataManager>\n"
lines += f"- Data type = {self.datatype}\n"
lines += f"- Valid detectors = {self.valid_detectors}\n"
lines += f"- Available detectors = {self.available_detectors}\n"
lines += f"- Target detectors = {self.target_detectors}\n"
lines += f"- dir_l0 = {self.dir_l0}\n"
lines += f"- dir_l1 = {self.dir_l1}\n"
lines += f"- dfs_node = {self.dfs_node}\n"
lines += f"- CSST_DFS_GATEWAY = " + os.getenv("CSST_DFS_GATEWAY") + "\n"
return lines
    def remove_files(self, fmt="*.fits"):
        """Remove L1 files conforming to the format."""
        # NOTE(review): shells out to ``rm -rf`` with an unquoted, unescaped
        # path -- dir_l1/fmt must be trusted; consider glob + os.remove instead.
        os.system(f"rm -rf {os.path.join(self.dir_l1, fmt)}")

    def remove_dir(self, dir_name):
        """Remove an L1 (sub-)directory."""
        os.system(f"rm -rf {os.path.join(self.dir_l1, dir_name)}")

    @staticmethod
    def clear_dir(dir_path):
        # wipe the contents of dir_path (not the directory itself)
        os.system(f"rm -rf {dir_path}/*")
    # DFS interfaces
    @property
    def dfs_node(self):
        # NOTE(review): ``self._dfs_node`` is no longer assigned anywhere in
        # this class (the dfs_node constructor argument was removed) --
        # accessing this property raises AttributeError unless the attribute
        # has been set externally.
        return self._dfs_node
@staticmethod
def dfs_is_available():
"""Test if DFS works."""
try:
tbl = CatalogApi().catalog_query(
catalog_name="gaia3",
ra=180,
dec=0,
radius=0.1,
columns=("ra", "dec"),
min_mag=0,
max_mag=30,
obstime=-1,
limit=-1,
)
return len(tbl) > 0
except:
return False
    def get_coord(self):
        """Get the pointing coordinate (RA, Dec) from the first target detector's header."""
        header = fits.getheader(self.l0_detector(detector=self.target_detectors[0]))
        # the header keyword names differ between simulation versions
        if self.ver_sim == "C5.2":
            pointing_ra = header["RA_OBJ"]
            pointing_dec = header["DEC_OBJ"]
        else:
            pointing_ra = header["OBJ_RA"]
            pointing_dec = header["OBJ_DEC"]
        return pointing_ra, pointing_dec
    def dfs_rc_auto(self):
        """Download the reference catalog (Gaia DR3) around the current exposure's pointing."""
        assert self.dfs_is_available()
        assert len(self.target_detectors) >= 1
        pointing_ra, pointing_dec = self.get_coord()
        # 2-degree cone query with the standard astrometric/photometric columns
        refcat = self.dfs_rc_query(
            ra=pointing_ra,
            dec=pointing_dec,
            columns=(
                "ref_epoch",
                "ra",
                "ra_error",
                "dec",
                "dec_error",
                "parallax",
                "parallax_error",
                "pmra",
                "pmra_error",
                "pmdec",
                "pmdec_error",
                "phot_g_mean_mag",
                "source_id",
            ),
            radius=2,
            min_mag=0,
            max_mag=30,
            obstime=-1,
            limit=-1,
        )
        return refcat
    def dfs_rc_query(
        self,
        ra=180,
        dec=0,
        radius=2,
        columns=(
            "ref_epoch",
            "ra",
            "ra_error",
            "dec",
            "dec_error",
            "parallax",
            "parallax_error",
            "pmra",
            "pmra_error",
            "pmdec",
            "pmdec_error",
            "phot_g_mean_mag",
            "source_id",
        ),
        min_mag=0,
        max_mag=30,
        obstime=-1,
        limit=-1,
    ):
        """Query the Reference Catalog (RC, Gaia DR3) from DFS.

        Returns an astropy Table on success; raises ValueError on a non-zero
        DFS return code.

        Ref.
        https://gea.esac.esa.int/archive/documentation/GDR3/Gaia_archive/chap_datamodel/
        sec_dm_main_source_catalogue/ssec_dm_gaia_source.html
        """
        cat = self.dfs_CatApi.catalog_query(
            catalog_name="gaia3",
            ra=ra,
            dec=dec,
            columns=columns,
            radius=radius,
            min_mag=min_mag,
            max_mag=max_mag,
            obstime=obstime,
            limit=limit,
        )
        if cat["code"] == 0:
            self.logger_ppl.info(
                f"Results from DFS CATAPI:\n"
                f" - code = {cat['code']}\n"
                f" - message = {cat['message']}\n"
                f" - totalCount = {cat['totalCount']}\n"
                f" - columns = {cat['columns']}"
            )
            return self.dfs_CatApi.to_table(cat)
        else:
            self.logger_ppl.info(
                f"Results from DFS CATAPI:\n"
                f" - code = {cat['code']}\n"
                f" - message = {cat['message']}"
            )
            raise ValueError("Bad catalog query result!")
    def dfs_l1_push(self):
        """Push MBI/SLS L1 data to DFS. (Placeholder -- not implemented yet.)"""
        # l1api = get_l1api()
        # l1api.write()
        return

    def dfs_l2_push(self):
        """Push SLS spectra to DFS. (Placeholder -- not implemented yet.)"""
        pass
    @staticmethod
    def from_l1id(
        l1_id="1000000001",
        datatype="sls",
        dir_l0="/L1Pipeline/L0",
        dir_l1="/L1Pipeline/L1",
        use_dfs=True,
        dfs_node="pml",
        clear_l1=False,
        dfs_root="/share/dfs",
    ):
        # placeholder: initialization from an L1 ID is not implemented
        pass
    @staticmethod
    def link_L0Data(obs_id="100000100", dfs_root="/dfsroot", dir_l0="/L1Pipeline/L0"):
        """Symlink all DFS L0 files of *obs_id* into *dir_l0* and return the query records."""
        # query for L0 data
        print(f"Query obs_id={obs_id} ...", end="")
        recs = Level0DataApi().find(obs_id=obs_id)
        print("Message: \n", recs)
        print(f"{recs['totalCount']} records obtained!")
        assert recs["code"] == 0
        assert recs["totalCount"] > 0
        # make symbolic links
        print("Making symbolic links ...")
        for rec in recs["data"]:
            os.symlink(
                src=os.path.join(dfs_root, rec.file_path),
                dst=os.path.join(dir_l0, os.path.basename(rec.file_path)),
            )
        # return records
        return recs
    @staticmethod
    def from_dfs(
        obs_id="100000100",
        ver_sim="C6.2",
        datatype="mbi",
        dir_l0="/pipeline/L0",
        dir_l1="/pipeline/L1",
        path_aux="/pipeline/aux",
        clear_l1=False,
        dfs_root="/share/dfs",
        n_jobs=18,
        backend="multiprocessing",
        device="CPU",
        **kwargs,
    ):
        """Initialize CsstMsDataManager from DFS.

        Queries L0 records for *obs_id*, symlinks the DFS files into *dir_l0*,
        then delegates to ``from_dir``. *dir_l0* is always wiped first;
        *dir_l1* only when *clear_l1* is True.
        """
        # (clear and) make directories
        if os.path.exists(dir_l0):
            os.system(f"rm -rf {dir_l0}/*")
        else:
            os.mkdir(dir_l0)
        # if os.path.exists(dir_l1):
        #     os.system(f"rm -rf {dir_l1}/*")
        # else:
        #     os.mkdir(dir_l1)
        # os.chdir(dir_l1)
        if not os.path.exists(dir_l1):
            os.mkdir(dir_l1)
        elif clear_l1:
            os.system(f"rm -rf {dir_l1}/*")
        os.chdir(dir_l1)
        print(f"Query obs_id={obs_id} ...", end="")
        # a throwaway (verbose=False) manager is built just to reach the DFS API
        records = CsstMsDataManager(verbose=False).dfs_L0DataApi.find(obs_id=obs_id)
        print(f"{records['totalCount']} records obtained!")
        tbl = Table([_.__dict__ for _ in records["data"]])
        tbl.sort(["detector_no", "obs_type"])
        print("Making symbolic links ...")
        for i_rec in range(len(tbl)):
            os.symlink(
                src=os.path.join(dfs_root, tbl["file_path"][i_rec]),
                dst=os.path.join(dir_l0, os.path.basename(tbl["file_path"][i_rec])),
            )
        # initialize dm
        dm = CsstMsDataManager.from_dir(
            ver_sim=ver_sim,
            datatype=datatype,
            dir_l0=dir_l0,
            dir_l1=dir_l1,
            path_aux=path_aux,
            n_jobs=n_jobs,
            backend=backend,
            device=device,
            **kwargs,
        )
        # sanity check: the obs_id parsed from file names must match the query
        assert dm.obs_id == obs_id
        return dm
    def dfs_l0_query(self, obs_id: str = "100000100"):
        """Query L0 data from DFS and return a sorted table of records.

        Warns via the pipeline logger for every detector/obs_type pair (among
        the 30 expected detectors) that has no record.

        Raises
        ------
        ValueError
            If DFS returns a non-zero code.
        """
        result = self.dfs_L0DataApi.find(obs_id=str(obs_id))
        print(f"{result['totalCount']} records returned from DFS.")
        if not result["code"] == 0:
            raise ValueError(f"DFS returns non-zero code! ({result['code']})")
        tbl = Table([_.__dict__ for _ in result["data"]])
        tbl.sort(["detector_no", "obs_type"])
        # Check if all 30 detectors are available
        for detector in CP["all"]["detectors"]:
            for obs_type in ["sci", "cosmic_ray"]:
                if (
                    np.sum(
                        (tbl["detector_no"] == f"{detector:02d}")
                        & (tbl["obs_type"] == obs_type)
                    )
                    == 0
                ):
                    self.logger_ppl.warning(
                        f"Record not found for detector {detector:02d} and obs_type {obs_type}"
                    )
        return tbl
    def dfs_l0_check_all(self):
        """Check that all C5.2 L0 exposures (obs_id 100000020-100000154) are complete in DFS.

        Returns
        -------
        bool
            True if every exposure has exactly 60 records (30 detectors x 2 types).
        """
        is_good = True
        for obs_id in range(100000020, 100000155):
            tbl = self.dfs_l0_query(obs_id=f"{obs_id}")
            if len(tbl) == 60:
                self.logger_ppl.info(
                    f"DFS returns {len(tbl)} records for obs_id={obs_id}"
                )
            else:
                is_good = False
                self.logger_ppl.warning(
                    f"DFS returns {len(tbl)} records for obs_id={obs_id}"
                )
        return is_good
    def dfs_l1_query(self, obs_id, detector):
        """Query L1 data from DFS. (Placeholder -- not implemented yet.)"""
        pass
def l1_cleanup(self):
filelist = glob.glob(f"{self.dir_l1}/**")
for file in filelist:
if file not in self.l1_whitelist:
try:
os.remove(file)
except:
pass
class CsstMsFile(dict):
    """Represent one CSST MS file, with its name parsed into components.

    NOTE(review): subclasses ``dict`` but stores the parsed fields as
    *attributes* (via ``__setattr__``), not as dict items -- the mapping
    itself stays empty.
    """

    def __init__(self, filepath, ver_sim="C6.2", dir_out=".", **kwargs):
        super(CsstMsFile, self).__init__()
        self.ver_sim = ver_sim
        self.filepath = filepath
        self.filename = os.path.basename(filepath)
        self.dir_in = os.path.dirname(filepath)
        self.dir_out = dir_out
        self.kwargs = kwargs
        # parse filename, e.g.
        # CSST_MSC_MS_SCIE_20270713222417_20270713222647_10160000066_01_L0_V01.fits
        # NOTE(review): unlike CsstMsDataManager.from_dir, the ``ext`` group
        # here keeps the leading dot and may also swallow "_XXX.fits" postfixes.
        pattern = re.compile(
            r"(?P<telescope>[A-Z]+)_"
            r"(?P<instrument>[A-Z]+)_"
            r"(?P<project>[A-Z]+)_"
            r"(?P<obs_type>[A-Z]+)_"
            r"(?P<exp_start>[0-9]{14})_"
            r"(?P<exp_stop>[0-9]{14})_"
            r"(?P<obs_id>[0-9]{11})_"
            r"(?P<detector>[0-9]{2})_"
            r"L(?P<level>[0-9]{1})_"
            r"V(?P<version>[0-9]{2})"
            r"(?P<ext>[a-zA-Z._]+)"
        )
        mo = re.fullmatch(pattern, self.filename)
        try:
            assert mo is not None
        except AssertionError as ae:
            # print the offending filename before propagating
            print(self.filename)
            raise ae
        mogd = mo.groupdict()
        # mogd.pop("detector")
        # expose each parsed component (and any extra kwargs) as an attribute
        for k, v in mogd.items():
            self.__setattr__(k, v)
        for k, v in kwargs.items():
            self.__setattr__(k, v)

    def fpo(self, post=".fits"):
        # output file *path*: dir_out joined with the L1 version of the name;
        # a post starting with "." is appended without a separating underscore
        if post.startswith("."):
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_{self.exp_stop}_"
                f"{self.obs_id}_{self.detector}_L1_V{self.version}{post}"
            )
        else:
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_{self.exp_stop}_"
                f"{self.obs_id}_{self.detector}_L1_V{self.version}_{post}"
            )
        return os.path.join(self.dir_out, fn)

    def fno(self, post=".fits"):
        # output file *name* only (no directory)
        if post.startswith("."):
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_"
                f"{self.exp_stop}_{self.obs_id}_{self.detector}_L1_V{self.version}{post}"
            )
        else:
            fn = (
                f"{self.telescope}_{self.instrument}_{self.project}_{self.obs_type}_{self.exp_start}_"
                f"{self.exp_stop}_{self.obs_id}_{self.detector}_L1_V{self.version}_{post}"
            )
        return fn

    def fpi(self):
        # input file path (dir_in + original filename)
        return os.path.join(self.dir_in, self.filename)

    @staticmethod
    def from_l1id(id=17796, ver_sim="C6.2", dir_out=".", dfs_root="/dfsroot"):
        """Build a CsstMsFile from a DFS L1 record ID.

        NOTE(review): parameter ``id`` shadows the builtin; kept for API compatibility.
        """
        L1DataApi = Level1DataApi()
        rec = L1DataApi.get(id=id)
        try:
            assert rec["code"] == 0
        except AssertionError as ae:
            # print the failing DFS response before propagating
            print(rec)
            raise ae
        header = rec["data"].header
        header["filter"] = rec["data"].filter
        return CsstMsFile(
            filepath=os.path.join(dfs_root, rec["data"].file_path),
            ver_sim=ver_sim,
            dir_out=dir_out,
            header=header,
        )

    def move_to(self, dir_in="./input", dir_out="./output"):
        # re-anchor this file in a new input/output directory pair
        return CsstMsFile(
            filepath=os.path.join(dir_in, os.path.basename(self.filepath)),
            ver_sim=self.ver_sim,
            dir_out=dir_out,
            **self.kwargs,
        )

    def to_dict(self):
        # minimal serialization of the constructor arguments
        return dict(
            filepath=self.filepath,
            ver_sim=self.ver_sim,
            dir_out=self.dir_out,
            kwargs=self.kwargs,
        )

    def __repr__(self):
        return f'CsstMsFile(filepath="{self.filename}", ver_sim="{self.ver_sim}", dir_out="{self.dir_out}")'
# file = CsstMsFile(
# "L1/MSC/SCI/62173/10160000108/CSST_MSC_MS_SCI_20290206174352_20290206174622_10160000108_21_L1_V01.fits")
# file = CsstMsFile.from_l1id(id=17796, dfs_root="/share/dfs")
# file = CsstMsFile.from_l1id(id=12853, dfs_root="/share/dfs")
# CsstMsFile("CSST_MSC_MS_SCI_20280716184136_20280716184406_10160000099_14_L1_V01_IMG.fits")
"""
Identifier: csst_common/dfs.py
Name: dfs.py
Description: DFS wrapper
Author: Bo Zhang
Created: 2023-07-08
Modified-History:
2023-07-08, Bo Zhang, implemented DFS
2023-07-11, Bo Zhang, updated DFS
2023-12-08, Bo Zhang, tweaks
2023-12-20, Bo Zhang, do nothing in DFS.__init__()
2023-12-28, Bo Zhang, add DFS.rec_to_table and DFS.rec_to_dlist
2025-01-02, Bo Zhang, use csst_dfs_client
"""
from astropy.table import Table
from csst_dfs_client import (
level0,
level1,
catalog,
plan,
)
# from csst_dfs_api.common.catalog import CatalogApi
# from csst_dfs_api.facility import (
# Level0DataApi,
# Level0PrcApi,
# Level1DataApi,
# Level1PrcApi,
# Level2DataApi,
# Level2TypeApi,
# OtherDataApi,
# )
class DFS:
    """Thin namespace wrapper exposing the ``csst_dfs_client`` modules as attributes."""

    def __init__(self):
        # each attribute is the module itself, not an instantiated client
        self.level0 = level0
        self.level1 = level1
        self.catalog = catalog
        self.plan = plan
# --- repository-hosting page footer (web chrome, not part of the source file) ---