Commit 4ddd430b authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

adapt for input pmapname

parent b771e96c
Pipeline #8909 passed with stage
......@@ -18,9 +18,8 @@ from .slsconf import convert_slsconf
class CCDS:
def __init__(
self,
ccds_root="/ccds_root",
ccds_cache="/pipeline/ccds_cache",
pmapname=None,
ccds_root: str = "/ccds_root",
ccds_cache: str = "/pipeline/ccds_cache",
):
print("[CCDS] Setting CCDS root path ... ", end="")
self.ccds_root = ccds_root
......@@ -36,32 +35,24 @@ class CCDS:
os.environ["CCDS_OBSERVATORY"] = "csst"
print("Done")
print("Query for observatory ... ", end="")
print("[CCDS] Query for observatory ... ", end="")
self.observatory = client.get_default_observatory()
print(self.observatory)
assert (
self.observatory == "csst"
), f"observatory [{self.observatory}] is not `csst`!"
self.operational_context = client.get_default_context(self.observatory)
self.pmapname = self.operational_context = client.get_default_context(
self.observatory
)
print(f"[CCDS] Query for operational_context = {self.operational_context}... ")
self.pmapname = pmapname if pmapname is not None else self.operational_context
print(f"[CCDS] Setting pmapname = {self.pmapname}... ")
try:
# dump mappings
client.dump_mappings(self.pmapname)
self.is_available = True
except BaseException as e:
print(f"[CCDS] Failed to get pmapname={self.pmapname}:", e.args)
self.is_available = False
print(f"[CCDS] operational_context = {self.operational_context}")
print(f"[CCDS] pmapname = {self.pmapname}")
self.is_available = False
def __repr__(self):
return (
f"[CCDS] url='{os.getenv('CCDS_SERVER_URL', default='')}' >\n"
f" - is_available={self.is_available}\n"
# f" - is_available={self.is_available}\n"
f" - observatory='{self.observatory}'\n"
f" - operational_context='{self.operational_context}'\n"
f" - ccds_root='{self.ccds_root}'\n"
......@@ -70,8 +61,23 @@ class CCDS:
def get_refs(
self,
file_path="/dfsroot/L0/MSC/SCIE/62030/10160000105/MS/CSST_MSC_MS_SCIE_20280916072059_20280916072329_10160000105_10_L0_V01.fits",
file_path: str = "./test.fits",
pmapname: str = "",
):
# Set pmapname
if pmapname:
print(f"[CCDS] Setting pmapname = {pmapname}... ")
self.pmapname = pmapname
try:
# dump mappings
client.dump_mappings(self.pmapname)
self.is_available = True
except BaseException as e:
print(f"[CCDS] Failed to get pmapname = {self.pmapname}:", e.args)
self.is_available = False
# print(f"[CCDS] operational_context = {self.operational_context}")
print(f"[CCDS] pmapname = {self.pmapname}")
# print("get min header")
hdrs = client.api.get_minimum_header(
self.pmapname, file_path, ignore_cache=False
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment