Commit e27dacbb authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

simplify Pipeline

parent aadd7bc2
Pipeline #11633 passed with stage
"""
Identifier: csst_common/dfs.py
Name: dfs.py
Description: DFS wrapper
Author: Bo Zhang
Created: 2023-07-08
Modified-History:
2023-12-22, Bowei Zhao, implemented convert_slsconf
2023-12-22, Bo Zhang, tweaks
Details:
chipid: [01, 02, 03, 04, 05, 10, 21, 26, 27, 28, 29, 30]
filter: [GI, GV, GU, GU, GV, GI, GI, GV, GU, GU, GV, GI]
clabel: [GI-1, GV-1, GU-1, GU-2, GV-2, GI-2, GI-3, GV-3, GU-3, GU-4, GV-4, GI-4]
keys = ['BEAMA', 'MMAG_EXTRACT_A', 'MMAG_MARK_A', 'DYDX_ORDER_A', 'DYDX_A_0', 'DYDX_A_1', 'XOFF_A', 'YOFF_A',
'DISP_ORDER_A', 'DLDP_A_0', 'DLDP_A_1', 'BEAMB', 'MMAG_EXTRACT_B', 'MMAG_MARK_B', 'DYDX_ORDER_B', 'DYDX_B_0',
'XOFF_B', 'YOFF_B', 'DISP_ORDER_B', 'DLDP_B_0', 'DLDP_B_1', 'BEAMC', 'MMAG_EXTRACT_C', 'MMAG_MARK_C',
'DYDX_ORDER_C', 'DYDX_C_0', 'DYDX_C_1', 'XOFF_C', 'YOFF_C', 'DISP_ORDER_C', 'DLDP_C_0', 'DLDP_C_1', 'BEAMD',
'MMAG_EXTRACT_D', 'MMAG_MARK_D', 'DYDX_ORDER_D', 'DYDX_D_0', 'DYDX_D_1', 'XOFF_D', 'YOFF_D', 'DISP_ORDER_D',
'DLDP_D_0', 'DLDP_D_1', 'BEAME', 'MMAG_EXTRACT_E', 'MMAG_MARK_E', 'DYDX_ORDER_E', 'DYDX_E_0', 'DYDX_E_1',
'XOFF_E', 'YOFF_E', 'DISP_ORDER_E', 'DLDP_E_0', 'DLDP_E_1']
"""
import json
import os
from astropy.io import fits
def convert_slsconf(
    extract1d_json_path: str,
    sensitivity_fits_path: str,
    dir_output: str,
) -> dict:
    """
    Convert slitless-spectroscopy reference files into aXe-style conf files.

    Splits the multi-order sensitivity FITS file into one file per spectral
    order and renders the EXTRACT1D JSON configuration into one ``.conf``
    file per grating direction (left / right).

    Parameters
    ----------
    extract1d_json_path : str
        Path to the ``CSST_MSC_MS_EXTRACT1D_*.json`` reference file.
    sensitivity_fits_path : str
        Path to the multi-order sensitivity FITS reference file.
    dir_output : str
        Directory in which the generated files are written.

    Returns
    -------
    dict
        Mapping from ``"sensitivity_{order}"`` / ``"conf_{direction}"``
        keys to the corresponding output file paths.
    """
    extract1d_name = os.path.basename(extract1d_json_path)
    sensitivity_name = os.path.basename(sensitivity_fits_path)
    slsconf = dict()
    sensitivity_order_names = []
    # save one CSST_MSC_MS_SENSITIVITY_{CHIPID}_{VERSION}_{ORDER}.fits per order
    for order in ["0st", "+1st", "-1st", "+2st", "-2st"]:
        sensitivity_order_name = sensitivity_name.replace(".fits", f"_{order}.fits")
        sensitivity_order_path = os.path.join(dir_output, sensitivity_order_name)
        resave_sensitivity(sensitivity_fits_path, sensitivity_order_path)
        sensitivity_order_names.append(sensitivity_order_name)
        slsconf[f"sensitivity_{order}"] = sensitivity_order_path
    # save one CSST_MSC_MS_EXTRACT1D_{CHIPID}_{VERSION}_{DIRECTION}.conf per direction
    for direction, GRATINGLR in zip(["left", "right"], ["GRATINGL", "GRATINGR"]):
        conf_name = extract1d_name.replace(".json", f"_{direction}.conf")
        conf_path = os.path.join(dir_output, conf_name)
        fsave_conf(extract1d_json_path, conf_path, sensitivity_order_names, GRATINGLR)
        slsconf[f"conf_{direction}"] = conf_path
    return slsconf
def readjson(file_json_path):
    """Load the JSON file at `file_json_path` and return its content."""
    with open(file_json_path) as fp:
        return json.load(fp)
def fwriteKEY(fsx, flt):
    """
    Write the global header keywords of an aXe-style conf file.

    Parameters
    ----------
    fsx : file-like
        Open writable text stream for the conf file.
    flt : str
        Filter name ("GI", "GV" or "GU"); selects the WAVELENGTH range.
        Any other value writes no WAVELENGTH line.
    """
    # filter -> wavelength range (Angstrom) written after the CAMERA line
    wavelength_ranges = {
        "GI": "6200 10000",
        "GV": "4000 6200",
        "GU": "2550 4000",
    }
    fsx.write("INSTRUMENT CSSTSLS\n")
    fsx.write("CAMERA " + flt + "\n")
    if flt in wavelength_ranges:
        fsx.write("WAVELENGTH " + wavelength_ranges[flt] + "\n")
    for line in (
        "\nSCIENCE_EXT SCI ; Science extension",
        "DQ_EXT DQ ; DQ extension",
        "ERRORS_EXT ERR ; Error extension",
        "FFNAME csstFlat.fits",
        "DQMASK 246 ; 4096 and 512 taken out",
        "\nRDNOISE 5.0",
        "EXPTIME EXPTIME",
        "POBJSIZE 1.0",
    ):
        fsx.write(line + "\n")
    fsx.write("#SMFACTOR 1.0\n\n")
def fwriteBEAM(
    fsx,
    extract1d_json_path,
    sensitivity_order_name,
    GRATINGLR,
    BEAMX,
    SENSITIVITY_X,
    MMAG_EXTRACT_X,
    MMAG_MARK_X,
    DYDX_ORDER_X,
    DYDX_X_0,
    DYDX_X_1,
    XOFF_X,
    YOFF_X,
    DISP_ORDER_X,
    DLDP_X_0,
    DLDP_X_1,
):
    """
    Write one beam (spectral order) section of an aXe-style conf file.

    Reads the grating parameters from the EXTRACT1D JSON file and writes
    them to `fsx` as ``KEY value(s)`` lines, in the fixed aXe section order
    (beam geometry, trace description, offsets, dispersion, sensitivity).

    Parameters
    ----------
    fsx : file-like
        Open writable text stream for the conf file.
    extract1d_json_path : str
        Path to the EXTRACT1D JSON reference file.
    sensitivity_order_name : str
        File name of this order's sensitivity FITS file.
    GRATINGLR : str
        Top-level JSON key, "GRATINGL" or "GRATINGR".
    BEAMX, SENSITIVITY_X, MMAG_EXTRACT_X, MMAG_MARK_X, DYDX_ORDER_X,
    DYDX_X_0, DYDX_X_1, XOFF_X, YOFF_X, DISP_ORDER_X, DLDP_X_0, DLDP_X_1 : str
        Conf keyword names for this beam (same names are used as keys into
        the JSON file under `GRATINGLR`).
    """
    beam = readjson(extract1d_json_path)[GRATINGLR]

    def write_vector(key):
        # "<key> v0 v1 ...\n" from the list stored under `key`
        fsx.write(key)
        for value in beam[key]:
            fsx.write(" " + str(value))
        fsx.write("\n")

    def write_scalar(key):
        # "<key> <value>\n"
        fsx.write(key + " " + str(beam[key]) + "\n")

    write_vector(BEAMX)
    write_scalar(MMAG_EXTRACT_X)
    write_scalar(MMAG_MARK_X)
    fsx.write("# \n")
    fsx.write("# Trace description \n")
    fsx.write("# \n")
    write_scalar(DYDX_ORDER_X)
    write_vector(DYDX_X_0)
    # BEAM B (the "0 order" beam per fsave_conf) carries no DYDX_*_1 coefficients
    if BEAMX != "BEAMB":
        write_vector(DYDX_X_1)
    fsx.write("# \n")
    fsx.write("# X and Y Offsets \n")
    fsx.write("# \n")
    write_scalar(XOFF_X)
    write_scalar(YOFF_X)
    fsx.write("# \n")
    fsx.write("# Dispersion solution \n")
    fsx.write("# \n")
    write_scalar(DISP_ORDER_X)
    write_vector(DLDP_X_0)
    write_vector(DLDP_X_1)
    fsx.write("# \n")
    fsx.write(SENSITIVITY_X + " " + sensitivity_order_name + "\n")
    fsx.write("# \n\n")
def fsave_conf(
    extract1d_json_path, extract1d_conf_path, sensitivity_order_names, GRATINGLR
):
    """
    Render the EXTRACT1D JSON reference file into an aXe-style conf file.

    Writes the global header (via `fwriteKEY`) followed by one section per
    beam A-E (via `fwriteBEAM`).

    Parameters
    ----------
    extract1d_json_path : str
        Path to the EXTRACT1D JSON reference file (must contain "FILTER").
    extract1d_conf_path : str
        Output conf file path; overwritten if it exists.
    sensitivity_order_names : list
        Per-order sensitivity file names, indexed 0-4 for beams A-E.
    GRATINGLR : str
        Grating key, "GRATINGL" or "GRATINGR".
    """
    flt = readjson(extract1d_json_path)["FILTER"]
    # (section header, beam letter) for each of the five beams, in file order.
    # NOTE(review): sensitivity_order_names is built in convert_slsconf in the
    # order [0st, +1st, -1st, +2st, -2st], yet index 0 is given to BEAM A
    # ("1 order") and index 1 to BEAM B ("0 order") — confirm this
    # index <-> beam mapping is intended.
    beams = [
        ("# 1 order (BEAM A) *******************\n", "A"),
        ("\n# 0 order (BEAM B) *******************\n", "B"),
        ("\n# -1 order (BEAM C) *******************\n", "C"),
        ("\n# 2 order (BEAM D) *******************\n", "D"),
        ("\n# -2 order (BEAM E) *******************\n", "E"),
    ]
    # open in "w" mode: truncates the file, replacing the former
    # `os.system("> " + path)` shell call, and guarantees the handle is
    # closed even if a writer raises
    with open(extract1d_conf_path, "w") as fs:
        fwriteKEY(fs, flt)
        for i, (header, x) in enumerate(beams):
            fs.write(header)
            fwriteBEAM(
                fs,
                extract1d_json_path,
                sensitivity_order_names[i],
                GRATINGLR,
                f"BEAM{x}",
                f"SENSITIVITY_{x}",
                f"MMAG_EXTRACT_{x}",
                f"MMAG_MARK_{x}",
                f"DYDX_ORDER_{x}",
                f"DYDX_{x}_0",
                f"DYDX_{x}_1",
                f"XOFF_{x}",
                f"YOFF_{x}",
                f"DISP_ORDER_{x}",
                f"DLDP_{x}_0",
                f"DLDP_{x}_1",
            )
def resave_sensitivity(sensitivity_fits_path, sensitivity_order_path):
    """
    Extract a single-order sensitivity table into its own FITS file.

    The spectral order is inferred from the ``_{order}.fits`` suffix of
    `sensitivity_order_path` (as generated by `convert_slsconf`), and only
    the matching extension of the input file is re-saved.

    Parameters
    ----------
    sensitivity_fits_path : str
        Path to the multi-order sensitivity FITS file with extensions
        L0st / LP1st / LM1st / LP2st / LM2st.
    sensitivity_order_path : str
        Output path ending with ``_{order}.fits``, where order is one of
        "0st", "+1st", "-1st", "+2st", "-2st".

    Raises
    ------
    ValueError
        If the order tag cannot be inferred from `sensitivity_order_path`.
    """
    # file-name order tag -> extension name in the multi-order input file
    order_to_extname = {
        "0st": "L0st",
        "+1st": "LP1st",
        "-1st": "LM1st",
        "+2st": "LP2st",
        "-2st": "LM2st",
    }
    basename = os.path.basename(sensitivity_order_path)
    extname = next(
        (
            name
            for order, name in order_to_extname.items()
            if basename.endswith(f"_{order}.fits")
        ),
        None,
    )
    if extname is None:
        raise ValueError(f"Cannot infer spectral order from file name: {basename}")
    # BUGFIX: the previous implementation looped over all five extensions and
    # wrote each to the same output path, so every per-order file ended up
    # holding the last ("LM2st") table. Only the matching extension is saved now.
    with fits.open(sensitivity_fits_path) as h:
        hdul = fits.HDUList([fits.PrimaryHDU(), fits.BinTableHDU(h[extname].data)])
        hdul.writeto(sensitivity_order_path, overwrite=True, checksum=True)
        hdul.close()
......@@ -8,12 +8,11 @@ Modified-History:
2023-12-15, Bo Zhang, import CI/CD related objects to cicd.py
2023-12-20, Bo Zhang, update module header
"""
from .utils import retry
from .pipeline import Pipeline, ModuleResult
from .file import File
from .time import now
from .dfs import DFS
from .ccds import CCDS
__all__ = [
"Pipeline",
......@@ -21,6 +20,4 @@ __all__ = [
"File",
"retry",
"now",
"DFS",
"CCDS",
]
......@@ -21,11 +21,12 @@ from typing import Callable, NamedTuple, Optional, Any, Union
from astropy.time import Time, TimeDelta
from astropy.io import fits
from .utils import retry
import csst_dfs_client as dfs1
import csst_fs as dfs2
from .ccds import CCDS
from .dfs import DFS
from .utils import retry
from .file import File
from .io import reformat_header
from .logger import get_logger
from .status import CsstStatus, CsstResult
......@@ -40,125 +41,89 @@ def print_directory_tree(directory="."):
print(f"{subindent}{file}")
class Pipeline:
"""
CSST pipeline configuration class.
It is used for each pipeline to initialize environment.
Parameters
----------
dir_input : str
Input directory.
dir_output : str
Output directory.
dir_temp : str
Temp directory.
dir_aux : str
Aux path.
dfs_root : str
DFS root path.
ccds_root : str
CCDS root path.
ccds_cache : str
CCDS cache path.
filter_warnings : bool
If `True`, filter warnings.
dfs : bool
If `True`, initialize DFS.
ccds : bool
If `True`, initialize CCDS.
**kwargs : Any
Additional keyword arguments.
"""
def __init__(
self,
dir_input: str = "/pipeline/input",
dir_output: str = "/pipeline/output",
dir_temp: str = "/pipeline/temp",
dir_aux: str = "/pipeline/aux",
dfs_root: str = "/dfs_root",
ccds_root: str = "/ccds_root",
ccds_cache: str = "/pipeline/ccds_cache",
pipeline_log: str = "pipeline.log",
module_log: str = "module.log",
filter_warnings: bool = False,
dfs: bool = True,
ccds: bool = False,
clean_output_before_run: bool = True,
**kwargs: Any,
):
# record start time
self.t_start = Time.now()
EXIT_CODES = {
"success": 0,
"ccds_reference_file_error": 10,
"reference_catalog_error": 11,
"input_data_error": 20,
"input_data_not_unique": 21,
"input_data_invalid": 22,
"data_product_invalid": 30,
"data_product_ingestion_error": 31,
}
1
# get pipeline information from env vars
self.pipeline_id: str = os.getenv("PIPELINE_ID", "-")
self.build: int = int(os.getenv("BUILD", "0"))
self.created: str = os.getenv("CREATED", "-")
self.verbose: bool = bool(os.getenv("VERBOSE", ""))
# set directory information
self.dir_input: str = dir_input
self.dir_output: str = dir_output
self.dir_temp: str = dir_temp
self.dir_aux: str = dir_aux
self.dfs_root: str = dfs_root
self.ccds_root: str = ccds_root
self.ccds_cache: str = ccds_cache
class Pipeline:
if clean_output_before_run:
self.clean_output()
def __init__(self, msg: str = "", **env_vars: Any):
# record start time
self.t_start = Time.now()
# additional parameters
self.kwargs: dict = kwargs
# set message
self.msg = msg
self.msg_dict = self.json2dict(self.msg)
# set environment variables
for k, v in env_vars.items():
os.environ[k] = str(v)
# get settings from environment variables
self.settings: dict = {
# DFS & CCDS directories
"DFS_ROOT": os.getenv("DFS_ROOT", "/dfs_root"),
"CCDS_ROOT": os.getenv("CCDS_ROOT", "/ccds_root"),
"CCDS_CACHE": os.getenv("CCDS_CACHE", "/pipeline/ccds_cache"),
# working directories
"DIR_INPUT": os.getenv("DIR_INPUT", "/pipeline/input"),
"DIR_OUTPUT": os.getenv("DIR_OUTPUT", "/pipeline/output"),
"DIR_TEMP": os.getenv("DIR_TEMP", "/pipeline/temp"),
"DIR_AUX": os.getenv("DIR_AUX", "/pipeline/aux"),
"LOG_FILE": os.getenv("LOG_FILE", "pipeline.log"),
# docker image information
"DOCKER_IMAGE": os.getenv("DOCKER_IMAGE", "-"),
"BUILD": os.getenv("BUILD", "-"),
"CREATED": os.getenv("CREATED", "-"),
# additional settings
# verbose mode
"VERBOSE": os.getenv("VERBOSE", "false").lower() == "true",
# ignore warnings
"IGNORE_WARNINGS": os.getenv("IGNORE_WARNINGS", "true").lower() == "true",
# support OSS, False by default
"USE_OSS": (os.getenv("USE_OSS", "false")).lower() == "true",
}
# set attributes
for k, v in self.settings.items():
setattr(self, k.lower(), v)
# set logger
self.logger = get_logger(
name="pipeline logger",
filename=os.path.join(self.dir_output, pipeline_log),
filename=str(os.path.join(self.dir_output, self.log_file)),
)
# filter warnings
if self.settings["IGNORE_WARNINGS"]:
self.filter_warnings("ignore")
# change working directory
print(f"Change directory to {self.dir_output}")
os.chdir(self.dir_output)
# Frequently used files
self.message = Message(os.path.join(self.dir_output, "message.txt"))
self.timestamp = Timestamp(os.path.join(self.dir_output, "timestamp.txt"))
# self.exit_code = ExitCode(os.path.join(self.dir_output, "exit_code"))
# self.error_trace = ErrorTrace(os.path.join(self.dir_output, "error_trace"))
# DFS1, DFS2 & CCDS
self.dfs1 = dfs1
self.dfs2 = dfs2
self.ccds = CCDS(ccds_root=self.ccds_root, ccds_cache=self.ccds_cache)
if dfs:
self.dfs: Union[DFS | None] = DFS()
else:
self.dfs: Union[DFS | None] = None
if ccds:
self.ccds: Union[CCDS | None] = CCDS(
ccds_root=ccds_root, ccds_cache=ccds_cache
)
else:
self.ccds: Union[CCDS | None] = None
# exit code
self.EXIT_CODES = EXIT_CODES
if filter_warnings:
self.filter_warnings()
@staticmethod
def dict2json(d: dict):
"""Convert `dict` to JSON format string."""
return json.dumps(d, ensure_ascii=False)
def info(self):
"""List environment variables such as `PIPELINE_ID`, etc."""
print(f"PIPELINE_ID={self.pipeline_id}")
print(f"BUILD={self.build}")
print(f"CREATED={self.created}")
print(f"VERBOSE={self.verbose}")
@staticmethod
def json2dict(m: str):
"""Convert JSON format string to `dict`."""
return json.loads(m)
@property
def info_header(self) -> fits.Header:
"""Summarize pipeline info into a `astropy.io.fits.Header`."""
h = fits.Header()
h.set("PIPELINE", self.pipeline_id, comment="pipeline ID")
h.set("BUILD", self.build, comment="pipeline build number")
h.set("CREATED", self.pipeline_id, comment="pipeline build time")
return reformat_header(h, strip=False, comment="Pipeline info")
def summarize(self):
"""Summarize this run."""
t_stop: Time = Time.now()
......@@ -188,9 +153,9 @@ class Pipeline:
return Time.now().isot
@staticmethod
def filter_warnings():
def filter_warnings(level: str = "ignore"):
# Suppress all warnings
warnings.filterwarnings("ignore")
warnings.filterwarnings(level)
@staticmethod
def reset_warnings(self):
......@@ -221,6 +186,7 @@ class Pipeline:
def download_oss_file(self, oss_file_path: str) -> None:
"""Download an OSS file from OSS to local path."""
# TODO
# self.dfs.download_file(oss_file_path, local_path)
pass
......@@ -292,104 +258,6 @@ class Pipeline:
return f"{self.pipeline_id}-{self.build}"
# class ErrorTrace:
# """Write error trace to file."""
#
# def __init__(self, file_path=""):
# self.file_path = file_path
#
# def __repr__(self):
# return f"< ErrorTrace [{self.file_path}] >"
#
# def write(self, s: str):
# with open(self.file_path, "w+") as f:
# f.write(s)
class Message:
    """Serialize a list of dict messages to a text file, one compact JSON per line."""

    def __init__(self, file_path: str = ""):
        # target file for write()
        self.file_path = file_path

    def __repr__(self):
        return f"< Message [{self.file_path}] >"

    def write(self, dlist: list[dict]):
        """Write all messages in `dlist` to the file, one per line."""
        serialized = "".join(self.dict2msg(record) + "\n" for record in dlist)
        with open(self.file_path, "w+") as fp:
            fp.write(serialized)

    def preview(self, dlist: list[dict], n: int = 10) -> None:
        """Print a summary and the top `n` messages of `dlist`."""
        print(f"No. of messages = {len(dlist)}")
        print(f"=========== Top {n} ===========")
        body = "".join(self.dict2msg(record) + "\n" for record in dlist[:n])
        print(body)
        print("")

    @staticmethod
    def dict2msg(d: dict):
        """Convert `dict` to a compact JSON string (all spaces removed)."""
        return json.dumps(d).replace(" ", "")

    @staticmethod
    def msg2dict(m: str):
        """Convert JSON format string to `dict`."""
        return json.loads(m)
# DEPRECATED
# class ExitCode:
# def __init__(self, file_path=""):
# self.file_path = file_path
#
# def __repr__(self):
# return f"< ExitCode [{self.file_path}] >"
#
# def truncate(self):
# with open(self.file_path, "w") as file:
# file.truncate(0)
#
# def write(self, code=0):
# with open(self.file_path, "w+") as f:
# f.write(str(code))
# print(f"Exit with code {code} (written to '{self.file_path}')")
class Timestamp:
    """Maintain a text file of time stamps, one ISOT string per line."""

    def __init__(self, file_path: str = "timestamp.txt"):
        """
        Initialize a Timestamp object and connect it to `file_path`.

        Parameters
        ----------
        file_path : str
            Time stamp file path.
        """
        self.file_path = file_path

    def __repr__(self):
        return f"< Timestamp [{self.file_path}] >"

    def empty(self):
        """Truncate the time stamp file to zero length."""
        with open(self.file_path, "w") as fp:
            fp.truncate(0)

    def record(self):
        """Append the current time (`Time.now().isot` with a "+00:00" suffix)."""
        with open(self.file_path, "a+") as fp:
            fp.write(f"{Time.now().isot}+00:00\n")
class ModuleResult(NamedTuple):
module: str
cost: float
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment