Commit b026df97 authored by BO ZHANG

major updates

parent cad9fd06
+unit_test_data_root
.idea/*
*.png
*.DS_Store*
@@ -12,6 +12,6 @@ uninstall:
 	pip uninstall $(PKG) -y
 
 test:
-	coverage run -m pytest . --import-mode=importlib --cov-report=html --cov-report=term-missing
+	coverage run -m pytest . --cov=$(PKG) --import-mode=importlib --cov-report=html --cov-report=term-missing
 	coverage report -m
 	rm -rf .coverage .pytest_cache
@@ -44,42 +44,3 @@ from csst_common.params import CSST_PARAMS as CP
print(CP)
print(CP["mbi"]["detector2filter"])
```
how to use `data_manager`
```python
from csst_common.data_manager import CsstMbiDataManager
dm = CsstMbiDataManager(...)
# access L0 directory
dm.dir_l0
# access L1 directory
dm.dir_l1
# access dir_pcref
dm.dir_pcref
# access path_aux
dm.path_aux
# access ver_sim
dm.ver_sim
# access target detectors
dm.target_detectors
# access available detectors
dm.available_detectors
# define an L1 file (detector-specified)
dm.l1_detector(detector=6)
# define an L1 file (non-detector-specified)
dm.l1_file("flipped_image.fits")
```
a shortcut for the test dataset
```python
from csst_common.data_manager import CsstMbiDataManager
CsstMbiDataManager.quickstart(ver_sim="C5.2", dir_l1=".", exposure_id=100)
```
## algorithm description
The `csst_common` package provides common modules for the CSST pipeline.
- `csst_common.params`
- `csst_common.data_manager`
...@@ -9,10 +9,10 @@ Modified-History: ...@@ -9,10 +9,10 @@ Modified-History:
2022-09-13, Bo Zhang, fixed a bug 2022-09-13, Bo Zhang, fixed a bug
""" """
import os
from .status import CsstResult, CsstStatus from .status import CsstResult, CsstStatus
from .pipeline import Pipeline
from .ccds import CCDS
from .dfs import DFS
from .decorator import parameterized_module_decorator
__version__ = "0.0.1" __version__ = "0.0.2"
PACKAGE_PATH = os.path.dirname(__file__)
import json
import os
import re
import astropy.io.fits as pyfits
from ccds import client
# chipid: [01, 02, 03, 04, 05, 10, 21, 26, 27, 28, 29, 30]
# filter: [GI, GV, GU, GU, GV, GI, GI, GV, GU, GU, GV, GI]
# clabel: [GI-1, GV-1, GU-1, GU-2, GV-2, GI-2, GI-3, GV-3, GU-3, GU-4, GV-4, GI-4]
def get_version():
    """Get the reference version tag from the default CCDS operational context."""
    observatory = client.get_default_observatory()
    operational_context = client.get_default_context(observatory)
    # e.g., a context name like "csst_0012.pmap" yields "0012" (assumed naming)
    ver = re.split(r"[_.]", operational_context)[1]
    return ver
def resave_sensitivity(dir_save, chipid, flt):
    """Split the per-chip SENSITIVITY file into one file per spectral order."""
    ver = get_version()
    with pyfits.open(
        dir_save + "CSST_MSC_MS_SENSITIVITY_" + chipid + "_" + ver + ".fits"
    ) as h:
        for extname, order in zip(
            ["L0ST", "LP1ST", "LM1ST", "LP2ST", "LM2ST"],
            ["0st", "1st", "-1st", "2st", "-2st"],
        ):
            hdu0 = pyfits.PrimaryHDU()
            hdu1 = pyfits.BinTableHDU(h[extname].data)
            hdul = pyfits.HDUList([hdu0, hdu1])
            hdul.writeto(
                dir_save + "CSST_MSC_MS_SENSITIVITY_" + flt + "_" + order + ".fits",
                overwrite=True,
                checksum=True,
            )
def readjson(dir_save, fjsoname):
    """Read a JSON file from `dir_save` and return its content as a dict."""
    with open(dir_save + fjsoname) as f:
        d = json.load(f)
    return d
##################################################################################
##################################################################################
# keys = ['BEAMA', 'MMAG_EXTRACT_A', 'MMAG_MARK_A', 'DYDX_ORDER_A', 'DYDX_A_0', 'DYDX_A_1', 'XOFF_A', 'YOFF_A',
# 'DISP_ORDER_A', 'DLDP_A_0', 'DLDP_A_1', 'BEAMB', 'MMAG_EXTRACT_B', 'MMAG_MARK_B', 'DYDX_ORDER_B', 'DYDX_B_0',
# 'XOFF_B', 'YOFF_B', 'DISP_ORDER_B', 'DLDP_B_0', 'DLDP_B_1', 'BEAMC', 'MMAG_EXTRACT_C', 'MMAG_MARK_C',
# 'DYDX_ORDER_C', 'DYDX_C_0', 'DYDX_C_1', 'XOFF_C', 'YOFF_C', 'DISP_ORDER_C', 'DLDP_C_0', 'DLDP_C_1', 'BEAMD',
# 'MMAG_EXTRACT_D', 'MMAG_MARK_D', 'DYDX_ORDER_D', 'DYDX_D_0', 'DYDX_D_1', 'XOFF_D', 'YOFF_D', 'DISP_ORDER_D',
# 'DLDP_D_0', 'DLDP_D_1', 'BEAME', 'MMAG_EXTRACT_E', 'MMAG_MARK_E', 'DYDX_ORDER_E', 'DYDX_E_0', 'DYDX_E_1',
# 'XOFF_E', 'YOFF_E', 'DISP_ORDER_E', 'DLDP_E_0', 'DLDP_E_1']
GL = [
"GI2",
"GV4",
"GU2",
"GU4",
"GV2",
"GI4",
"GI6",
"GV8",
"GU6",
"GU8",
"GV6",
"GI8",
]
GR = [
"GI1",
"GV3",
"GU1",
"GU3",
"GV1",
"GI3",
"GI5",
"GV7",
"GU5",
"GU7",
"GV5",
"GI7",
]
SEN = ["GI", "GV", "GU", "GU", "GV", "GI", "GI", "GV", "GU", "GU", "GV", "GI"]
def fwriteKEY(fsx, i):
    """Write the global header keywords for chip index `i`."""
fsx.write("INSTRUMENT CSSTSLS" + "\n")
fsx.write("CAMERA " + SEN[i] + "\n")
if SEN[i] == "GI":
fsx.write("WAVELENGTH 6200 10000" + "\n")
elif SEN[i] == "GV":
fsx.write("WAVELENGTH 4000 6200" + "\n")
elif SEN[i] == "GU":
fsx.write("WAVELENGTH 2550 4000" + "\n")
fsx.write("\n" + "SCIENCE_EXT SCI ; Science extension" + "\n")
fsx.write("DQ_EXT DQ ; DQ extension" + "\n")
fsx.write("ERRORS_EXT ERR ; Error extension" + "\n")
fsx.write("FFNAME csstFlat.fits" + "\n")
fsx.write("DQMASK 246 ; 4096 and 512 taken out" + "\n")
fsx.write("\n" + "RDNOISE 5.0" + "\n")
fsx.write("EXPTIME EXPTIME" + "\n")
fsx.write("POBJSIZE 1.0" + "\n")
fsx.write("#SMFACTOR 1.0" + "\n\n")
def fwriteBEAM(
dir_save,
fsx,
i,
GRATINGLR,
BEAMX,
MMAG_EXTRACT_X,
MMAG_MARK_X,
DYDX_ORDER_X,
DYDX_X_0,
DYDX_X_1,
XOFF_X,
YOFF_X,
DISP_ORDER_X,
DLDP_X_0,
DLDP_X_1,
):
"""Write the configuration block for one beam (spectral order) of chip index `i`."""
ver = get_version()
# load the EXTRACT1D reference JSON for every chip, in fixed chip order
d = [
    readjson(dir_save, "CSST_MSC_MS_EXTRACT1D_" + chipid + "_" + ver + ".json")
    for chipid in [
        "01", "02", "03", "04", "05", "10",
        "21", "26", "27", "28", "29", "30",
    ]
]
fsx.write(BEAMX)
for v in d[i][GRATINGLR][BEAMX]:
    fsx.write(" " + str(v))
fsx.write("\n")
fsx.write(MMAG_EXTRACT_X + " " + str(d[i][GRATINGLR][MMAG_EXTRACT_X]) + "\n")
fsx.write(MMAG_MARK_X + " " + str(d[i][GRATINGLR][MMAG_MARK_X]) + "\n")
fsx.write("# " + "\n")
fsx.write("# Trace description " + "\n")
fsx.write("# " + "\n")
fsx.write(DYDX_ORDER_X + " " + str(d[i][GRATINGLR][DYDX_ORDER_X]) + "\n")
fsx.write(DYDX_X_0)
for v in d[i][GRATINGLR][DYDX_X_0]:
    fsx.write(" " + str(v))
fsx.write("\n")
if BEAMX == "BEAMB":
pass
else:
fsx.write(DYDX_X_1), [
fsx.write(" " + str(d[i][GRATINGLR][DYDX_X_1][j]))
for j in range(len(d[i][GRATINGLR][DYDX_X_1]))
], fsx.write("\n")
fsx.write("# " + "\n")
fsx.write("# X and Y Offsets " + "\n")
fsx.write("# " + "\n")
fsx.write(XOFF_X + " " + str(d[i][GRATINGLR][XOFF_X]) + "\n")
fsx.write(YOFF_X + " " + str(d[i][GRATINGLR][YOFF_X]) + "\n")
fsx.write("# " + "\n")
fsx.write("# Dispersion solution " + "\n")
fsx.write("# " + "\n")
fsx.write(DISP_ORDER_X + " " + str(d[i][GRATINGLR][DISP_ORDER_X]) + "\n")
fsx.write(DLDP_X_0)
for v in d[i][GRATINGLR][DLDP_X_0]:
    fsx.write(" " + str(v))
fsx.write("\n")
fsx.write(DLDP_X_1)
for v in d[i][GRATINGLR][DLDP_X_1]:
    fsx.write(" " + str(v))
fsx.write("\n")
fsx.write("# " + "\n")
if BEAMX == "BEAMA":
ordername = "1st"
fsx.write(
"SENSITIVITY_A CSST_MSC_MS_SENSITIVITY_" + SEN[i] + "_1st.fits" + "\n"
)
elif BEAMX == "BEAMB":
ordername = "0st"
fsx.write(
"SENSITIVITY_B CSST_MSC_MS_SENSITIVITY_" + SEN[i] + "_0st.fits" + "\n"
)
elif BEAMX == "BEAMC":
ordername = "-1st"
fsx.write(
"SENSITIVITY_C CSST_MSC_MS_SENSITIVITY_" + SEN[i] + "_-1st.fits" + "\n"
)
elif BEAMX == "BEAMD":
ordername = "2st"
fsx.write(
"SENSITIVITY_D CSST_MSC_MS_SENSITIVITY_" + SEN[i] + "_2st.fits" + "\n"
)
elif BEAMX == "BEAME":
ordername = "-2st"
fsx.write(
"SENSITIVITY_E CSST_MSC_MS_SENSITIVITY_" + SEN[i] + "_-2st.fits" + "\n"
)
fsx.write("# " + "\n" + "\n")
def fsave_conf(dir_save, GLR, GRATINGLR, i):
    """Write the CSST_MSC_MS_<grism>.conf file for chip index `i`."""
    c = dir_save + "CSST_MSC_MS_" + GLR[i] + ".conf"
    fs = open(c, "w")  # truncate and open for writing (replaces the shell "> file" trick)
fwriteKEY(fs, i)
fs.write("# 1 order (BEAM A) *******************" + "\n")
fwriteBEAM(
dir_save,
fs,
i,
GRATINGLR,
"BEAMA",
"MMAG_EXTRACT_A",
"MMAG_MARK_A",
"DYDX_ORDER_A",
"DYDX_A_0",
"DYDX_A_1",
"XOFF_A",
"YOFF_A",
"DISP_ORDER_A",
"DLDP_A_0",
"DLDP_A_1",
)
fs.write("\n# 0 order (BEAM B) *******************" + "\n")
fwriteBEAM(
dir_save,
fs,
i,
GRATINGLR,
"BEAMB",
"MMAG_EXTRACT_B",
"MMAG_MARK_B",
"DYDX_ORDER_B",
"DYDX_B_0",
"DYDX_B_1",
"XOFF_B",
"YOFF_B",
"DISP_ORDER_B",
"DLDP_B_0",
"DLDP_B_1",
)
fs.write("\n# -1 order (BEAM C) *******************" + "\n")
fwriteBEAM(
dir_save,
fs,
i,
GRATINGLR,
"BEAMC",
"MMAG_EXTRACT_C",
"MMAG_MARK_C",
"DYDX_ORDER_C",
"DYDX_C_0",
"DYDX_C_1",
"XOFF_C",
"YOFF_C",
"DISP_ORDER_C",
"DLDP_C_0",
"DLDP_C_1",
)
fs.write("\n# 2 order (BEAM D) *******************" + "\n")
fwriteBEAM(
dir_save,
fs,
i,
GRATINGLR,
"BEAMD",
"MMAG_EXTRACT_D",
"MMAG_MARK_D",
"DYDX_ORDER_D",
"DYDX_D_0",
"DYDX_D_1",
"XOFF_D",
"YOFF_D",
"DISP_ORDER_D",
"DLDP_D_0",
"DLDP_D_1",
)
fs.write("\n# -2 order (BEAM E) *******************" + "\n")
fwriteBEAM(
dir_save,
fs,
i,
GRATINGLR,
"BEAME",
"MMAG_EXTRACT_E",
"MMAG_MARK_E",
"DYDX_ORDER_E",
"DYDX_E_0",
"DYDX_E_1",
"XOFF_E",
"YOFF_E",
"DISP_ORDER_E",
"DLDP_E_0",
"DLDP_E_1",
)
fs.close()
def get_slsconf(dir_save=".", **kwargs):
"""save SLS conf files to `dir_save`"""
# resave the sensitivity.fits
for chipid, flt in zip(["01", "02", "03"], ["GI", "GV", "GU"]):
resave_sensitivity(dir_save, chipid, flt)
# save CSST_MSC_MS_*.conf
for i in range(0, 12):
fsave_conf(dir_save, GL, "GRATINGL", i)
fsave_conf(dir_save, GR, "GRATINGR", i)
# TODO: assert all files are saved correctly, then return
# TODO: return a dict containing filepath mapping
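For reference, a minimal driver for this generator might look like the sketch below. The module path and `conf_dir` are assumptions, and a CCDS server must be reachable because `get_version()` queries the default operational context. Note that `dir_save` must end with a path separator, since file names are joined by plain string concatenation.
```python
# A sketch only; module path and conf_dir are assumptions.
from csst_common.ccds import get_slsconf  # assumed module path

conf_dir = "/pipeline/aux/sls_conf/"  # hypothetical; trailing slash required
get_slsconf(dir_save=conf_dir)
```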
from astropy.wcs import WCS
def transform_coordinate(
ra: float = 180.000,
dec: float = 0.0,
original_epoch: float = 2016.0,
target_epoch: float = 2020.0,
) -> tuple[float, float]:
"""Transform a coordinate from `original_epoch` to `target_epoch`."""
pass
def transform_wcs(
wcs: WCS, original_epoch: float = 2016.0, target_epoch: float = 2020.0
) -> WCS:
"""Transform a wcs from `original_epoch` to `target_epoch`."""
pass
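Both functions above are still stubs. For illustration only, epoch propagation for a source with known proper motion can be sketched with astropy; the proper-motion values below are invented, and a real implementation would take them as inputs.
```python
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time

# A sketch: propagate a coordinate from epoch 2016.0 to 2020.0.
# The proper motions are illustrative values only.
c2016 = SkyCoord(
    ra=180.0 * u.deg,
    dec=0.0 * u.deg,
    pm_ra_cosdec=10.0 * u.mas / u.yr,
    pm_dec=-5.0 * u.mas / u.yr,
    obstime=Time(2016.0, format="jyear"),
)
c2020 = c2016.apply_space_motion(new_obstime=Time(2020.0, format="jyear"))
print(c2020.ra.deg, c2020.dec.deg)
```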
---
pml:
CSST_DFS_API_MODE: "cluster"
CSST_DFS_GATEWAY: "172.24.27.2:30880"
CSST_DFS_APP_ID: "test"
CSST_DFS_APP_TOKEN: "test"
kmust:
CSST_DFS_API_MODE: "cluster"
CSST_DFS_GATEWAY: "222.197.214.168:30880"
CSST_DFS_APP_ID: "1"
CSST_DFS_APP_TOKEN: "1"
tcc:
CSST_DFS_API_MODE: "cluster"
CSST_DFS_GATEWAY: "10.0.0.8:30880"
CSST_DFS_APP_ID: "test"
CSST_DFS_APP_TOKEN: "test"
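Each top-level key above is a deployment site whose entries map directly to environment variables read by the DFS client. A sketch of consuming such a block, assuming it is saved as `data/node_conf.yml` (its loading into `DFS_CONF` is commented out in the params.py hunk below):
```python
import os

import yaml

# Assumed file location; see the params.py hunk below.
with open("data/node_conf.yml") as f:
    conf = yaml.safe_load(f)

# export one site's settings (e.g. "pml") as environment variables
for key, value in conf["pml"].items():
    os.environ[key] = value
```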
"""
Identifier: KSC-SJ4-csst_common/__init__.py
Name: __init__.py
Description: csst_common package
Author: Bo Zhang
Created: 2022-09-13
Modified-History:
2022-09-13, Bo Zhang, created
2022-09-13, Bo Zhang, fixed a bug
2023-12-39, Bo Zhang, deprecated
"""
import os
from collections import namedtuple
from astropy import table
FileRecord = namedtuple("FileRecord", ["filepath", "db", "comment", "existence"])
class FileRecorder(list):
"""
A recorder for ``FileRecord`` entries, inheriting from the built-in ``list``.
This recorder is used to record files generated by functional modules.
In principle, a CSST pipeline module should return a status (CsstStatus)
and a file recorder (``FileRecorder``) after it finishes data processing.
Parameters
----------
*args : any
The arguments passed to ``list()``.
**kwargs : any
The keyword arguments passed to ``list()``.
Methods
-------
add_record(filepath: str = "", db: bool = False, comment: str = "")
Add a file record, each record is a ``collections.namedtuple``.
Users should provide a file path ``filepath``, whether this file should be
written to the database (``db``), and a comment (``comment``).
An existence boolean will be attached to check if the file exists.
to_table()
Convert to ``astropy.table.Table``.
pprint(*args, **kwargs)
Use ``astropy.table.Table.pprint`` to print file records in table format.
pprint_all(*args, **kwargs)
Use ``astropy.table.Table.pprint_all`` to print file records in table format.
Examples
--------
>>> fr = FileRecorder()
>>> for i in range(3):
>>> fr.add_record("test{:03d}.txt".format(i), db=True, comment="Test file {:d}".format(i))
>>> fr.pprint_all()
<FileRecorder length=3>
filepath db comment existence
----------- ---- ----------- ---------
test000.txt True Test file 0 False
test001.txt True Test file 1 False
test002.txt True Test file 2 False
>>> fr.pprint_all()
<FileRecorder length=3>
filepath db comment existence
----------- ---- ----------- ---------
test000.txt True Test file 0 False
test001.txt True Test file 1 False
test002.txt True Test file 2 False
"""
def __init__(self, *args, **kwargs):
super(FileRecorder, self).__init__(*args, **kwargs)
@staticmethod
def FileRecord(filepath: str = "", db: bool = False, comment: str = ""):
return FileRecord(
filepath=filepath,
db=db,
comment=comment,
existence=os.path.exists(filepath),
)
def add_record(self, filepath: str = "", db: bool = False, comment: str = ""):
existence = os.path.exists(filepath)
assert isinstance(filepath, str)
assert isinstance(db, bool)
assert isinstance(comment, str)
super().append(
FileRecord(filepath=filepath, db=db, comment=comment, existence=existence)
)
def to_table(self):
return table.Table([_._asdict() for _ in self])
def pprint(self, *args, **kwargs):
print("<FileRecorder length={}>".format(len(self)))
return self.to_table().pprint(*args, **kwargs)
def pprint_all(self, *args, **kwargs):
print("<FileRecorder length={}>".format(len(self)))
return self.to_table().pprint_all(*args, **kwargs)
def __repr__(self):
t = self.to_table()
lines, outs = t.formatter._pformat_table(
t,
max_lines=-1,
max_width=-1,
show_name=True,
show_unit=None,
show_dtype=False,
align="<",
)
if outs["show_length"]:
lines.append(f"Length = {len(self)} rows")
return "\n".join(lines)
@property
def summary(self):
    if len(self) == 0:
        return "0 alleged, 0 missing, 0 to db"
    else:
        t = self.to_table()
        return (
            f"{len(self)} alleged, "
            f"{len(self) - sum(t['existence'])} missing, "
            f"{sum(t['db'])} to db"
        )
def is_good(self):
"""Check if all alleged files exist."""
return all(self.to_table()["existence"])
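In a pipeline module, the recorder's `summary` and `is_good()` are the typical reporting hooks; a short sketch:
```python
from csst_common.file_recorder import FileRecorder

fr = FileRecorder()
fr.add_record("output_img.fits", db=True, comment="calibrated image")
print(fr.summary)  # counts of alleged / missing / db-flagged records
ok = fr.is_good()  # True only if every recorded file actually exists
```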
from astropy.io import fits


def check_file(file_path: str = "test.fits") -> bool:
    """Check the integrity of a file; not implemented yet."""
    pass


def _check_file_fits() -> bool:
    """Validate checksum for .fits files; not implemented yet (always returns True)."""
    return True
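A possible implementation of the two stubs, assuming the intent is to let astropy verify FITS checksums; the warning-based detection and the `_check_file_fits` signature are assumptions:
```python
import os
import warnings

from astropy.io import fits


def check_file(file_path: str = "test.fits") -> bool:
    """Sketch: True if the file exists and, for FITS, passes checksum checks."""
    if not os.path.exists(file_path):
        return False
    if file_path.endswith(".fits"):
        return _check_file_fits(file_path)
    return True


def _check_file_fits(file_path: str) -> bool:
    """Sketch: astropy warns when an HDU checksum/datasum fails to verify."""
    try:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            with fits.open(file_path, checksum=True) as hdul:
                for hdu in hdul:
                    _ = hdu.data  # force data read so datasums are checked
        return not any("erification failed" in str(w.message) for w in caught)
    except OSError:
        return False
```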
@@ -23,9 +23,9 @@ def get_logger(name: str = "CSST pipeline", filename: Optional[str] = None):
    Parameters
    ----------
    name : str
        The logger name.
    filename : str
        The log file name.

    Returns
    -------
@@ -50,7 +50,7 @@ def get_logger(name: str = "CSST pipeline", filename: Optional[str] = None):
    # logging formatter
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(module)s.py:%(lineno)d - %(message)s"
    )

    # stream handler
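For context, a logger produced by this helper is used like so (a sketch; the handler setup lives in the elided part of the hunk):
```python
from csst_common.logger import get_logger

logger = get_logger(name="module", filename="module.log")
logger.info("processing detector %d", 6)
logger.warning("reference file missing, falling back to defaults")
```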
@@ -9,11 +9,14 @@ Modified-History:
     2022-09-13, Bo Zhang, added CSST_PARAMS
     2022-10-28, Bo Zhang, added DFS_CONF
 """
-import os.path
 import yaml
+
+from . import PACKAGE_PATH
 
-PACKAGE_PATH = os.path.dirname(__file__)
 with open(PACKAGE_PATH + "/data/csst_params.yml") as f:
     CSST_PARAMS = yaml.safe_load(f)
 
-with open(PACKAGE_PATH + "/data/node_conf.yml") as f:
-    DFS_CONF = yaml.safe_load(f)
+# with open(PACKAGE_PATH + "/data/node_conf.yml") as f:
+#     DFS_CONF = yaml.safe_load(f)
@@ -2,10 +2,11 @@ import json
 import os
 import subprocess
 import warnings
+from typing import Any
 
 from astropy import time
 
-from .crds import CRDS
+from .ccds import CCDS
 from .dfs import DFS
 from .logger import get_logger
@@ -13,83 +14,83 @@ from .logger import get_logger
 class Pipeline:
     def __init__(
         self,
-        dir_in="/pipeline/input",
-        dir_out="/pipeline/output",
-        dir_aux="/pipeline/aux",
-        dfs_root="/dfsroot",
-        crds_root="/crdsroot",
-        crds_cache="/pipeline/crds_cache",
-        clean=True,
-        n_jobs_cpu=18,
-        n_jobs_gpu=9,
-        device="CPU",
-        filter_warnings=False,
-        dfs=True,
-        crds=False,
+        dir_input: str = "/pipeline/input",
+        dir_output: str = "/pipeline/output",
+        dir_aux: str = "/pipeline/aux",
+        dfs_root: str = "/dfs_root",
+        ccds_root: str = "/ccds_root",
+        ccds_cache: str = "/pipeline/ccds_cache",
+        filter_warnings: bool = False,
+        dfs: bool = True,
+        ccds: bool = False,
+        **kwargs: Any,
     ):
-        # get pipeline information from environ
-        self.name = os.getenv("PIPELINE", "-")
+        # get pipeline information from env vars
+        self.pipeline_id = os.getenv("PIPELINE_ID", "-")
         self.build = os.getenv("BUILD", "-")
+        self.created = os.getenv("CREATED", "-")
 
         # set directory information
-        self.dir_in = dir_in
-        self.dir_out = dir_out
+        self.dir_input = dir_input
+        self.dir_output = dir_output
         self.dir_aux = dir_aux
         self.dfs_root = dfs_root
-        self.crds_root = crds_root
-        self.crds_cache = crds_cache
-        self.clean = clean
+        self.ccds_root = ccds_root
+        self.ccds_cache = ccds_cache
 
-        # set resource information
-        self.n_jobs_cpu = n_jobs_cpu
-        self.n_jobs_gpu = n_jobs_gpu
-        self.device = device.upper()
+        # additional parameters
+        self.kwargs = kwargs
 
         # set logger
-        self.pipeline_logger = get_logger(name="pipeline", filename=os.path.join(self.dir_out, "pipeline.log"))
-        self.module_logger = get_logger(name="module", filename=os.path.join(self.dir_out, "module.log"))
+        self.pipeline_logger = get_logger(
+            name="pipeline",
+            filename=os.path.join(self.dir_output, "pipeline.log"),
+        )
+        self.module_logger = get_logger(
+            name="module",
+            filename=os.path.join(self.dir_output, "module.log"),
+        )
 
         # change working directory
-        print(f"Change directory to {self.dir_out}")
-        os.chdir(self.dir_out)
+        print(f"Change directory to {self.dir_output}")
+        os.chdir(self.dir_output)
 
-        # clean input/output directory
-        if self.clean:
-            self.clean_directory(self.dir_in)
-            self.clean_directory(self.dir_out)
 
         # Frequently used files
-        self.message = Message(os.path.join(self.dir_out, "msg.txt"))
-        self.time_stamp = TimeStamp(os.path.join(self.dir_out, "time_stamp.txt"))
-        self.exit_code = ExitCode(os.path.join(self.dir_out, "exit_code"))
-        self.error_trace = ErrorTrace(os.path.join(self.dir_out, "error_trace"))
+        self.msg = MessageWriter(os.path.join(self.dir_output, "message.txt"))
+        self.tsr = TimeStampRecorder(os.path.join(self.dir_output, "time_stamp.txt"))
+        # self.exit_code = ExitCode(os.path.join(self.dir_output, "exit_code"))
+        # self.error_trace = ErrorTrace(os.path.join(self.dir_output, "error_trace"))
 
         if dfs:
             self.dfs = DFS(n_try=5)
         else:
             self.dfs = None
 
-        if crds:
-            self.crds = CRDS()
+        if ccds:
+            self.ccds = CCDS()
         else:
-            self.crds = None
+            self.ccds = None
 
         if filter_warnings:
             self.filter_warnings()
-        self.ban_multithreading()
 
-    def ban_multithreading(self):
-        os.environ["OMP_NUM_THREADS"] = "1"
-        os.environ["OPENBLAS_NUM_THREADS"] = "1"
-        os.environ["MKL_NUM_THREADS"] = "1"
-        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
-        os.environ["NUMEXPR_NUM_THREADS"] = "1"
+    def inspect(self):
+        print(f"PIPELINE_ID={self.pipeline_id}")
+        print(f"BUILD={self.build}")
+        print(f"CREATED={self.created}")
 
-    def set_test_env(self):
-        os.environ["CSST_DFS_API_MODE"] = "cluster"
-        os.environ["CSST_DFS_GATEWAY"] = "172.24.27.2:30880"
-        os.environ["CSST_DFS_APP_ID"] = "test"
-        os.environ["CSST_DFS_APP_TOKEN"] = "test"
-        os.environ["CRDS_SERVER_URL"] = "http://172.24.27.2:29000"
+    def clean_output(self):
+        """Clean output directory."""
+        self.clean_directory(self.dir_output)
 
     @staticmethod
     def clean_directory(d):
@@ -112,19 +113,23 @@ class Pipeline:
         warnings.resetwarnings()
-class ErrorTrace:
-    def __init__(self, file_path=""):
-        self.file_path = file_path
-
-    def __repr__(self):
-        return f"< ErrorTrace [{self.file_path}] >"
-
-    def write(self, s: str):
-        with open(self.file_path, "w+") as f:
-            f.write(s)
+# class ErrorTrace:
+#     """Write error trace to file."""
+#
+#     def __init__(self, file_path=""):
+#         self.file_path = file_path
+#
+#     def __repr__(self):
+#         return f"< ErrorTrace [{self.file_path}] >"
+#
+#     def write(self, s: str):
+#         with open(self.file_path, "w+") as f:
+#             f.write(s)
 
 
-class Message:
+class MessageWriter:
+    """Write JSON format messages to file."""
+
     def __init__(self, file_path=""):
         self.file_path = file_path
@@ -147,34 +152,47 @@ class Message:
         return d
 
 
-class ExitCode:
-    def __init__(self, file_path=""):
-        self.file_path = file_path
-
-    def __repr__(self):
-        return f"< ExitCode [{self.file_path}] >"
-
-    def truncate(self):
-        with open(self.file_path, 'w') as file:
-            file.truncate(0)
-
-    def write(self, code=0):
-        with open(self.file_path, "w+") as f:
-            f.write(str(code))
-        print(f"Exit with code {code} (written to '{self.file_path}')")
+# DEPRECATED
+# class ExitCode:
+#     def __init__(self, file_path=""):
+#         self.file_path = file_path
+#
+#     def __repr__(self):
+#         return f"< ExitCode [{self.file_path}] >"
+#
+#     def truncate(self):
+#         with open(self.file_path, "w") as file:
+#             file.truncate(0)
+#
+#     def write(self, code=0):
+#         with open(self.file_path, "w+") as f:
+#             f.write(str(code))
+#         print(f"Exit with code {code} (written to '{self.file_path}')")
 
 
-class TimeStamp:
-    def __init__(self, file_path=""):
+class TimeStampRecorder:
+    def __init__(self, file_path: str = "tsr.txt"):
+        """
+        TimeStampRecorder Class.
+
+        Initialize a TimeStampRecorder object and connect it to `file_path`.
+
+        Parameters
+        ----------
+        file_path : str
+            Time stamp file path.
+        """
         self.file_path = file_path
 
     def __repr__(self):
-        return f"< TimeStamp [{self.file_path}] >"
+        return f"< TimeStampRecorder [{self.file_path}] >"
 
-    def truncate(self):
-        with open(self.file_path, 'w') as file:
+    def empty(self):
+        """Clean time stamp file."""
+        with open(self.file_path, "w") as file:
             file.truncate(0)
 
-    def punch_in(self):
+    def touch(self):
+        """Write a time stamp."""
         with open(self.file_path, "a+") as f:
             f.write(f"{time.Time.now().isot}+00:00\n")
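A construction sketch under the refactored signature; the paths are illustrative and must already exist, since `__init__` changes the working directory to `dir_output`:
```python
from csst_common.pipeline import Pipeline

ppl = Pipeline(
    dir_input="/pipeline/input",
    dir_output="/pipeline/output",
    dfs=False,   # skip the DFS connection
    ccds=False,  # skip the CCDS connection
)
ppl.inspect()    # print PIPELINE_ID / BUILD / CREATED
ppl.tsr.touch()  # append a UTC time stamp to time_stamp.txt
```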
 from ._module_docstr import ModuleHeader
+from ._io import remove_dir, remove_files
 from .tempfile import randfile
import glob
import os
import shutil


def remove_files(fmt="*post.fits"):
    """Remove all files matching the glob pattern `fmt`."""
    for fp in glob.glob(fmt):
        os.remove(fp)


def remove_dir(path=""):
    """Remove the directory `path` if it exists."""
    if os.path.exists(path):
        shutil.rmtree(path)
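Typical cleanup usage of these helpers (the package path `csst_common.utils` is inferred from the `__init__` hunk above):
```python
from csst_common.utils import remove_dir, remove_files

remove_files(fmt="*post.fits")  # delete matching files in the working directory
remove_dir("scratch_tmp")       # hypothetical scratch directory
```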
@@ -15,7 +15,7 @@ import shutil
 import numpy as np
 
-from csst_common import PACKAGE_PATH
+PACKAGE_PATH = os.path.dirname(os.path.dirname(__file__))
 
 with open(PACKAGE_PATH + "/data/module_header.txt", "r") as f:
@@ -50,7 +50,7 @@ class ModuleHeader:
    identifier: str
        the identifier, e.g., SJ4
    author: str
        author name
    description: str
        description of the module
    ignore_init: bool
@@ -24,4 +24,3 @@ def randfile(digits=20, ext=".fits"):
    # use the secrets module to generate a random string of the given length
    random_string = ''.join(secrets.choice(characters) for _ in range(digits)) + ext
    return random_string
import functools
import time
import traceback
from collections import namedtuple
from astropy.time import Time
from csst_common.data_manager import CsstMsDataManager
from csst_common.file_recorder import FileRecorder
from csst_common.status import CsstStatus
__all__ = ["ModuleResult", "l1ppl_module", "module_wrapper"]
# module should return ModuleResult as result
ModuleResult = namedtuple("ModuleResult", ["module", "timecost", "status", "fr", "output"])
def l1ppl_module(func):
@functools.wraps(func)
def call_l1ppl_module(dm: CsstMsDataManager, *args, **kwargs):
dm.logger_ppl.info(f"=====================================================")
t_start = time.time()
dm.logger_ppl.info(f"Starting Module: **{func.__name__}**")
# dm.logger_ppl.info(f"Additional arguments: {args} {kwargs}")
try:
# if the module finishes
status, fr, *output = func(dm, *args, **kwargs)
except Exception as e:
# if the module raises error
exc_info = traceback.format_exc() # traceback info
dm.logger_ppl.error(f"Error occurs! \n{exc_info}")
status = CsstStatus.ERROR # default status if exceptions occur
fr = FileRecorder() # default FileRecorder if exceptions occur
output = [exc_info, ] # default output if exceptions occur
finally:
t_stop = time.time()
t_cost = t_stop - t_start
if status in [CsstStatus.PERFECT, CsstStatus.WARNING, CsstStatus.ERROR]:
# perfect / warning / error
dm.logger_ppl.info(f"Module finished with status: {status} - time cost: {t_cost:.1f} sec")
else:
# invalid status
dm.logger_ppl.error(f"Invalid status: {status}")
# record exception traceback info
dm.logger_ppl.info(
f"ModuleResult: \n"
f" - name: {func.__name__}\n"
f" - status: {status}\n"
f" - additional output: {output}\n"
f" - fr: [{fr.summary}]\n{fr}\n"
)
# write time stamp
dm.write_stamp()
return ModuleResult(func.__name__, t_cost, status, fr, output)
return call_l1ppl_module
def module_wrapper(func):
@functools.wraps(func)
def call_module(logger, tstamp=None, *args, **kwargs):
logger.info(f"=====================================================")
t_start = time.time()
logger.info(f"Starting Module: **{func.__name__}**")
# logger.info(f"Additional arguments: {args} {kwargs}")
try:
# if the module finishes
status, fr, *output = func(*args, **kwargs)
except Exception as e:
# if the module raises error
exc_info = traceback.format_exc() # traceback info
logger.error(f"Error occurs! \n{exc_info}")
status = CsstStatus.ERROR # default status if exceptions occur
fr = FileRecorder() # default FileRecorder if exceptions occur
output = [exc_info, ] # default output if exceptions occur
finally:
t_stop = time.time()
t_cost = t_stop - t_start
if status in [CsstStatus.PERFECT, CsstStatus.WARNING, CsstStatus.ERROR]:
# perfect / warning / error
logger.info(f"Module finished with status: {status} - time cost: {t_cost:.1f} sec")
else:
# invalid status
logger.error(f"Invalid status: {status}")
# record exception traceback info
logger.info(
f"ModuleResult: \n"
f" - name: {func.__name__}\n"
f" - status: {status}\n"
f" - additional output: {output}\n"
f" - fr: [{fr.summary}]\n{fr}\n"
)
# write time stamp
if tstamp is not None:
with open(tstamp, "a+") as f:
f.write(f"{Time.now().isot}+08:00\n")
return ModuleResult(func.__name__, t_cost, status, fr, output)
return call_module
if __name__ == "__main__":

    @l1ppl_module
    def call_add(dm, a, b):
        # a wrapped module must return (status, file_recorder, *extra_output)
        if isinstance(a, float) and isinstance(b, float):
            return CsstStatus.PERFECT, FileRecorder(), a + b
        else:
            return CsstStatus.ERROR, FileRecorder(), (a, b)

    # dm = CsstMsDataManager()
    # print(call_add(dm, 1., 2.))
    # print(call_add(dm, 1., None))
""" # """
Identifier: KSC-SJ4-tests/test_data_manager.py # Identifier: KSC-SJ4-tests/test_data_manager.py
Name: test_data_manager.py # Name: test_data_manager.py
Description: data manager unit test # Description: data manager unit test
Author: Bo Zhang # Author: Bo Zhang
Created: 2022-09-13 # Created: 2022-09-13
Modified-History: # Modified-History:
2022-09-13, Bo Zhang, created # 2022-09-13, Bo Zhang, created
2022-09-29, Bo Zhang, added test for CsstMbiDataManager # 2022-09-29, Bo Zhang, added test for CsstMbiDataManager
2022-10-28, Bo Zhang, deleted unit test for CsstMsDataManager # 2022-10-28, Bo Zhang, deleted unit test for CsstMsDataManager
""" # """
import os # import os
import unittest # import unittest
#
from csst_common.data_manager import CsstMsDataManager # from csst_common.data_manager import CsstMsDataManager
from csst_common.params import CSST_PARAMS as CP # from csst_common.params import CSST_PARAMS as CP
#
dir_unittest = "/nfsdata/share/pipeline-unittest/csst_common" # dir_unittest = "/nfsdata/share/pipeline-unittest/csst_common"
#
#
class TestCsstMsDataManager(unittest.TestCase): # class TestCsstMsDataManager(unittest.TestCase):
def setUp(self) -> None: # def setUp(self) -> None:
self.dm_mbi = CsstMsDataManager.quickstart( # self.dm_mbi = CsstMsDataManager.quickstart(
ver_sim="C6.2", datatype="mbi", dir_l1=dir_unittest, exposure_id=100) # ver_sim="C6.2", datatype="mbi", dir_l1=dir_unittest, exposure_id=100)
self.dm_sls = CsstMsDataManager.quickstart( # self.dm_sls = CsstMsDataManager.quickstart(
ver_sim="C6.2", datatype="sls", dir_l1=dir_unittest, exposure_id=100) # ver_sim="C6.2", datatype="sls", dir_l1=dir_unittest, exposure_id=100)
self.dm_mbi.target_detectors = None # self.dm_mbi.target_detectors = None
self.dm_sls.target_detectors = None # self.dm_sls.target_detectors = None
#
def test_mbi_data_existence(self): # def test_mbi_data_existence(self):
self.assertTrue(self.dm_mbi.target_detectors == CP["mbi"]["detectors"]) # self.assertTrue(self.dm_mbi.target_detectors == CP["mbi"]["detectors"])
self.assertTrue(os.path.exists(self.dm_mbi.l0_detector(6))) # self.assertTrue(os.path.exists(self.dm_mbi.l0_detector(6)))
self.assertTrue(os.path.exists(self.dm_mbi.l0_log(6))) # self.assertTrue(os.path.exists(self.dm_mbi.l0_log(6)))
self.assertTrue(os.path.exists(self.dm_mbi.l0_cat(6))) # self.assertTrue(os.path.exists(self.dm_mbi.l0_cat(6)))
self.assertTrue(os.path.exists(self.dm_mbi.l0_crs(6))) # self.assertTrue(os.path.exists(self.dm_mbi.l0_crs(6)))
self.assertTrue(isinstance(self.dm_mbi.l1_detector(6, post="img.fits"), str)) # self.assertTrue(isinstance(self.dm_mbi.l1_detector(6, post="img.fits"), str))
self.assertTrue(isinstance(self.dm_mbi.l1_file(name="some_file.ext", comment="a demo file"), str)) # self.assertTrue(isinstance(self.dm_mbi.l1_file(pipeline_id="some_file.ext", comment="a demo file"), str))
#
def test_sls_data_existence(self): # def test_sls_data_existence(self):
self.assertTrue(self.dm_sls.target_detectors == CP["sls"]["detectors"]) # self.assertTrue(self.dm_sls.target_detectors == CP["sls"]["detectors"])
self.assertTrue(os.path.exists(self.dm_sls.l0_detector(1))) # self.assertTrue(os.path.exists(self.dm_sls.l0_detector(1)))
self.assertTrue(os.path.exists(self.dm_sls.l0_log(1))) # self.assertTrue(os.path.exists(self.dm_sls.l0_log(1)))
self.assertTrue(os.path.exists(self.dm_sls.l0_cat(1))) # self.assertTrue(os.path.exists(self.dm_sls.l0_cat(1)))
self.assertTrue(os.path.exists(self.dm_sls.l0_crs(1))) # self.assertTrue(os.path.exists(self.dm_sls.l0_crs(1)))
self.assertTrue(isinstance(self.dm_sls.l1_detector(1, post="flt.fits"), str)) # self.assertTrue(isinstance(self.dm_sls.l1_detector(1, post="flt.fits"), str))
self.assertTrue(isinstance(self.dm_sls.l1_file(name="some_file.ext", comment="a demo file"), str)) # self.assertTrue(isinstance(self.dm_sls.l1_file(pipeline_id="some_file.ext", comment="a demo file"), str))
#
# DFS is not always available # # DFS is not always available
# def test_dfs_is_available(self): # # def test_dfs_is_available(self):
# self.assertTrue(self.dm_mbi.dfs_is_available()) # # self.assertTrue(self.dm_mbi.dfs_is_available())