Commit bb631b05 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add CsstPlanObsid CsstPlanObsgroup

parent 8373a725
from ._dfs import DFS, dfs
from .dag import CsstDAGs
from ._csst import csst, CsstPlanObsid, CsstPlanObsgroup
from .dict import DotDict
from .instrument import csst
from .observation import CsstPlanObsid, CsstPlanObsgroup
class DotDict(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 递归转换嵌套字典为 DotDict
for key, value in self.items():
if isinstance(value, dict) and not isinstance(value, DotDict):
self[key] = DotDict(value)
def __getattr__(self, key):
"""属性访问优先级:1. 内置属性 → 2. 键值 → 3. 报错"""
try:
# 优先返回内置属性(如 keys, items 等方法)
return object.__getattribute__(self, key)
except AttributeError:
if key in self:
# 其次返回键值(若存在)
return self[key]
# 属性不存在时抛出标准异常
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{key}'"
)
def __setattr__(self, key, value):
"""属性设置规则:下划线开头为内置属性,否则为键值"""
if key.startswith("_"):
# 内置属性直接存储(如 _internal_var)
object.__setattr__(self, key, value)
else:
# 键值处理:自动转换嵌套结构
if isinstance(value, dict) and not isinstance(value, DotDict):
value = DotDict(value)
self[key] = value # 存储为字典键值
def __delattr__(self, key):
"""删除逻辑:区分内置属性和键值"""
if key.startswith("_"):
object.__delattr__(self, key)
else:
if key in self:
del self[key]
else:
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{key}'"
)
# dd = DotDict({"a": {"b": {"c": 1}}})
class CaseInsensitiveDict(dict):
def __init__(self, *args, **kwargs):
self._key_map = {}
super().__init__(*args, **kwargs)
# 初始化时处理传入的键
for key in list(self.keys()):
self._key_map[key.lower()] = key
def __setitem__(self, key, value):
key_lower = key.lower() if isinstance(key, str) else key
self._key_map[key_lower] = key # 记录原始键
super().__setitem__(key_lower, value)
def __getitem__(self, key):
key_lower = key.lower() if isinstance(key, str) else key
return super().__getitem__(key_lower)
def __delitem__(self, key):
key_lower = key.lower() if isinstance(key, str) else key
del self._key_map[key_lower]
super().__delitem__(key_lower)
def __contains__(self, key):
key_lower = key.lower() if isinstance(key, str) else key
return super().__contains__(key_lower)
def get(self, key, default=None):
key_lower = key.lower() if isinstance(key, str) else key
return super().get(key_lower, default)
def keys(self):
return self._key_map.values() # 返回原始键
def items(self):
for key_lower, value in super().items():
yield self._key_map.get(key_lower, key_lower), value
class CaseInsensitiveDict(dict):
def __init__(self, *args, **kwargs):
self._key_map = {}
super().__init__(*args, **kwargs)
# 初始化时处理传入的键
for key in list(self.keys()):
self._key_map[key.lower()] = key
def __setitem__(self, key, value):
key_lower = key.lower() if isinstance(key, str) else key
self._key_map[key_lower] = key # 记录原始键
super().__setitem__(key_lower, value)
def __getitem__(self, key):
key_lower = key.lower() if isinstance(key, str) else key
return super().__getitem__(key_lower)
def __delitem__(self, key):
key_lower = key.lower() if isinstance(key, str) else key
del self._key_map[key_lower]
super().__delitem__(key_lower)
def __contains__(self, key):
key_lower = key.lower() if isinstance(key, str) else key
return super().__contains__(key_lower)
def get(self, key, default=None):
key_lower = key.lower() if isinstance(key, str) else key
return super().get(key_lower, default)
def keys(self):
return self._key_map.values() # 返回原始键
def items(self):
for key_lower, value in super().items():
yield (self._key_map.get(key_lower, key_lower), value)
from typing import Optional
from .dict import DotDict
class Detector:
def __init__(self, name: str = "01", is_effective: bool = True):
self.name = name
def __init__(
self,
name: str = "01",
is_effective: bool = True,
):
self.name: str = name
self.is_effective: bool = is_effective
def __repr__(self):
return f"<Detector: {self.name}, is_effective: {self.is_effective}>"
class SimpleInstrument(CaseInsensitiveDict):
def __init__(
self,
name: str,
detectors: list[Detector],
):
self.name = name
self._detectors = detectors
super().__init__()
for d in detectors:
self[d.name] = d
# d = Detector(name="01", is_effective=True)
class SimpleInstrument(DotDict):
def __init__(self, name: str, detectors: list[Detector]):
self._name = name
super().__init__(**{d.name: d for d in detectors})
@property
def name(self):
return self._name
@property
def detectors(self):
return self._detectors
return list(self.values())
@property
def detector_names(self):
return [d.name for d in self._detectors]
return list(self.keys())
@property
def effective_detectors(self):
return [d for d in self._detectors if d.is_effective]
def n_detector(self):
return len(self.detectors)
@property
def effective_detector_names(self):
return [d.name for d in self._detectors if d.is_effective]
def effective_detectors(self):
return [d for d in self.detectors if d.is_effective]
@property
def n_detector(self):
return len(self.detectors)
def effective_detector_names(self):
return [d.name for d in self.detectors if d.is_effective]
@property
def n_effective_detector(self):
return len(self.effective_detectors)
def __repr__(self):
return f"<SimpleInstrument: {self.name}, {self.n_detector} detectors, {self.n_effective_detector} effective>"
return (
f"<SimpleInstrument: {self.name}, "
f"{self.n_detector} detectors, "
f"{self.n_effective_detector} effective>"
)
def __getitem__(self, item):
return self[item.upper()]
# detectors = [Detector(name=f"{_}") for _ in range(10)]
# si = SimpleInstrument(
# name="SomeInstrument",
# detectors=detectors,
# )
class ComplexInstrument(CaseInsensitiveDict):
class ComplexInstrument(DotDict):
def __init__(
self,
name: str,
instruments: list[SimpleInstrument],
):
self.name = name
self.instruments = instruments
super().__init__()
for i in self.instruments:
self[i.name] = i
super().__init__(**{i.name: i for i in instruments})
self._name = name
@property
def name(self):
return self._name
@property
def instruments(self):
return list(self.values())
@property
def n_instrument(self):
......@@ -133,17 +116,22 @@ class ComplexInstrument(CaseInsensitiveDict):
return f"[ComplexInstrument: {self.name}, {str_instruments}]"
class Telescope(CaseInsensitiveDict):
class Telescope(DotDict):
def __init__(
self,
name: str,
instruments: list[SimpleInstrument | ComplexInstrument],
):
self.name = name
self.instruments = instruments
super().__init__()
for i in self.instruments:
self[i.name] = i
super().__init__(**{i.name: i for i in instruments})
self._name = name
@property
def name(self):
return self._name
@property
def instruments(self):
return list(self.values())
@property
def n_instrument(self):
......@@ -153,7 +141,7 @@ class Telescope(CaseInsensitiveDict):
mbi = SimpleInstrument(
name="MBI",
detectors=[
Detector(name=_)
Detector(name=_, is_effective=True)
for _ in [
"06",
"07",
......@@ -181,7 +169,7 @@ mbi = SimpleInstrument(
sls = SimpleInstrument(
name="SLS",
detectors=[
Detector(name=_)
Detector(name=_, is_effective=True)
for _ in [
"01",
"02",
......@@ -206,22 +194,25 @@ ir = SimpleInstrument(
mci = SimpleInstrument(
name="MCI",
detectors=[Detector(name=_) for _ in ["C1", "C2", "C3"]],
detectors=[Detector(name=_, is_effective=True) for _ in ["C1", "C2", "C3"]],
)
ifs = SimpleInstrument(
name="IFS",
detectors=[Detector(name=_) for _ in ["B", "R"]],
detectors=[Detector(name=_, is_effective=True) for _ in ["B", "R"]],
)
cpic = SimpleInstrument(
name="CPIC",
detectors=[Detector(name=_) for _ in ["VIS", "NIR"]],
detectors=[
Detector(name="VIS", is_effective=True),
Detector(name="NIR", is_effective=False),
],
)
hstdm = SimpleInstrument(
name="HSTDM",
detectors=[Detector(name=_) for _ in ["sis01", "sis02"]],
detectors=[Detector(name=_) for _ in ["SIS1", "SIS2"]],
)
msc = ComplexInstrument(
......@@ -233,3 +224,15 @@ csst = Telescope(
name="CSST",
instruments=[msc, mci, ifs, cpic, hstdm],
)
# csst
# csst.MSC
# csst.MSC.MBI
# csst.MSC.SLS
# csst.MSC.IR
# csst.MCI
# csst.IFS
# csst.CPIC
# csst.HSTDM
# csst.HSTDM.effective_detectors
# csst.HSTDM.n_effective_detector
from .dict import DotDict
from .instrument import csst
from astropy.table import Table
import numpy as np
# from csst_dag.dict import DotDict
# from csst_dag.instrument import csst
class CsstPlanObsid(DotDict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@property
def detectors(self):
if self.instrument in ("MSC", "MCI", "IFS", "CPIC"):
return csst[self.instrument].effective_detectors
elif self.instrument == "HSTDM":
if self.params.detector == "SIS12":
return csst[self.instrument].effective_detectors
else:
return [csst[self.instrument][self.params.detector]]
else:
raise ValueError(f"Unknown instrument: {self.instrument}")
@property
def n_detector(self):
return len(self.detectors)
@property
def n_file_expected(self):
if self.instrument in ("MSC", "MCI", "IFS", "CPIC"):
return self.n_detector
elif self.instrument == "HSTDM":
return self.n_detector * self.params.num_epec_frame
else:
raise ValueError(f"Unknown instrument: {self.instrument}")
@staticmethod
def from_plan(plan_data: dict) -> "CsstPlanObsid":
return CsstPlanObsid(**plan_data)
class CsstPlanObsgroup(DotDict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# construct plan table
self._plan_table = Table(list(self.values()))
# assert unique obs_group
u_keys = np.unique(
self._plan_table["dataset", "instrument", "obs_type", "obs_group"]
)
assert len(u_keys) == 1, "Multiple `instruments/obs_types/datasets` found."
# no duplicated obs_id's
u_obsid = np.unique(self._plan_table["obs_id"])
assert len(u_obsid) == len(
self
), f"n_obsid {len(u_obsid)} != n_plan {len(self)}"
# assign parameters
self._dataset = str(u_keys[0]["dataset"])
self._instrument = str(u_keys[0]["instrument"])
self._obs_type = str(u_keys[0]["obs_type"])
self._obs_group = str(u_keys[0]["obs_group"])
@property
def dataset(self):
return self._dataset
@property
def instrument(self):
return self._instrument
@property
def obs_type(self):
return self._obs_type
@property
def obs_group(self):
return self._obs_group
@property
def obs_id_list(self):
return list(self.keys())
@property
def n_file_expected(self):
return sum([self[_].n_file_expected for _ in self.obs_id_list])
def __repr__(self):
return (
f"<CsstPlanObsgroup '{self.obs_group}' (n_obs_id={len(self)}): "
f"instrument='{self.instrument}' "
f"dataset='{self.dataset}' "
f"obs_type='{self.obs_type}'>"
)
@staticmethod
def from_plan(plan_data: list[dict]) -> "CsstPlanObsgroup":
return CsstPlanObsgroup(
**{_["obs_id"]: CsstPlanObsid.from_plan(_) for _ in plan_data}
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment