Commit aa4f0df0 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

updated pipeline

parent 7c80839b
# def do_one_exposure():
import glob import glob
import os import os
from csst.msc.backbone import CCD_ID_LIST, VER_SIMS
from csst.msc.calib_flux import CsstProcFluxCalibration
from csst.msc.calib_pos import CsstProcMscPositionCalibration from csst.msc.calib_pos import CsstProcMscPositionCalibration
from csst.msc.data import CsstMscImgData from csst.msc.data import CsstMscImgData
from csst.msc.inst_corr import CsstMscInstrumentProc from csst.msc.inst_corr import CsstMscInstrumentProc
from csst.msc.phot import CsstMscPhotometryProc
HOSTNAME = os.uname()[1] CONFIG_DANDELION = dict(
if HOSTNAME == "tulip": # test and working directory
# on Tulip dir_raw="/home/csstpipeline/L1Pipeline/msc/MSC_0000020",
DIR_TEST = "/share/Cycle-3-SimuData/multipleBandsImaging/CSST_shearOFF/MSC_0000020/" # MSC_MS_210525220000_100000020_06_raw.fits dir_work="/home/csstpipeline/L1Pipeline/msc/work/",
PATH_BIAS = "/share/HDD7/csstpipeline/ref/MSC_CLB_210525200000_100000016_{:02d}_combine.fits" filename_fmt="{}/MSC_MS_*_{:02}_raw.fits",
PATH_DARK = "/share/HDD7/csstpipeline/ref/MSC_CLD_210525202000_100000016_{:02d}_combine.fits"
PATH_FLAT = "/share/HDD7/csstpipeline/ref/MSC_CLF_210525201000_100000016_{:02d}_combine.fits"
DIR_WORK = "/share/HDD7/csstpipeline/msc"
# gaia catalog directory (for position calibration)
DIR_GAIA_CATALOG = ""
elif HOSTNAME == "Dandelion":
# on Dandelion # on Dandelion
DIR_TEST = "/home/csstpipeline/L1Pipeline/msc/MSC_0000020" path_aux="/home/csstpipeline/L1Pipeline/msc/ref/MSC_{}_*_{:02d}_combine.fits",
PATH_BIAS = "/home/csstpipeline/L1Pipeline/msc/ref/MSC_CLB_210525200000_100000016_{:02d}_combine.fits"
PATH_DARK = "/home/csstpipeline/L1Pipeline/msc/ref/MSC_CLD_210525202000_100000016_{:02d}_combine.fits"
PATH_FLAT = "/home/csstpipeline/L1Pipeline/msc/ref/MSC_CLF_210525201000_100000016_{:02d}_combine.fits"
# working directory
DIR_WORK = "/home/csstpipeline/L1Pipeline/msc/work/"
# gaia catalog directory (for position calibration) # gaia catalog directory (for position calibration)
DIR_GAIA_CATALOG = "/home/csstpipeline/L1Pipeline/msc/gaia_dr3/" dir_gaia_catalog="/home/csstpipeline/L1Pipeline/msc/gaia_dr3/",
# version of simulation data
ver_sim="C3",
# only 18 cores available in cloud machine from PMO # only 18 cores available in cloud machine from PMO
NJOBS = 18 n_jobs=18,
# shut down backend multithreading
else: backend_multithreading=False
raise ValueError("Invalid HOSTNAME {}!".format(HOSTNAME)) )
# define CCD ID list
CCD_ID_LIST = [6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25] CONFIG_PMO = dict(
# test and working directory
# prohibit multi-threading in backend dir_raw="/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/NGP_AstrometryON_shearOFF/MSC_0000100",
os.environ["MKL_NUM_THREADS"] = '1' dir_work="/home/user/L1Pipeline/msc/work/",
os.environ["NUMEXPR_NUM_THREADS"] = '1' filename_fmt="{}/CSST_MSC_MS_CRS_*_{:02d}_L0_1.fits",
os.environ["OMP_NUM_THREADS"] = '1' # on PMO
path_aux="/home/user/L1Pipeline/msc/ref/MSC_{}_*_{:02d}_combine.fits",
# Step 1. Correct instrumental effect # gaia catalog directory (for position calibration)
os.chdir(DIR_WORK) dir_gaia_catalog="/home/csstpipeline/L1Pipeline/msc/gaia_dr3/",
# version of simulation data
img_list = [] ver_sim="C5.1",
wht_list = [] # only 18 cores available in cloud machine from PMO
flg_list = [] n_jobs=18,
fn_list = [] # shut down backend multithreading
for i_ccd in CCD_ID_LIST: backend_multithreading=False
)
def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_gaia_catalog="", ver_sim="C5.1", ccd_ids=None,
n_jobs=18, backend_multithreading=False):
# currently C3 and C5.1 are tested
try:
assert ver_sim in VER_SIMS
except AssertionError as ae:
print("Available options for ver_sim: ", VER_SIMS)
raise ae
# shut down backend multi-threading
if not backend_multithreading:
# prohibit multi-threading in backend
os.environ["MKL_NUM_THREADS"] = '1'
os.environ["NUMEXPR_NUM_THREADS"] = '1'
os.environ["OMP_NUM_THREADS"] = '1'
if ccd_ids is None:
ccd_ids = CCD_ID_LIST
# Step 1. Correct instrumental effect
os.chdir(dir_work)
img_list = []
wht_list = []
flg_list = []
fn_list = []
for i_ccd in ccd_ids:
print("processing CCD {}".format(i_ccd)) print("processing CCD {}".format(i_ccd))
fp_raw = glob.glob("{}/MSC_MS_*_{:02}_raw.fits".format(DIR_TEST, i_ccd)) fp_raw = glob.glob(filename_fmt.format(dir_raw, i_ccd))
assert len(fp_raw) == 1 assert len(fp_raw) == 1
fp_raw = fp_raw[0] fp_raw = fp_raw[0]
print(fp_raw)
# fn_raw = os.path.basename(fp_raw)
# read data with CsstMscImgData.read # read data with CsstMscImgData.read
raw = CsstMscImgData.read(fp_raw) raw = CsstMscImgData.read(fp_raw)
# in future, get_* functions grab # in the future, use get_* functions grab
bias = raw.get_bias(PATH_BIAS.format(i_ccd)) bias = raw.get_bias(path_aux.format("CLB", i_ccd))
dark = raw.get_dark(PATH_DARK.format(i_ccd)) dark = raw.get_dark(path_aux.format("CLD", i_ccd))
flat = raw.get_flat(PATH_FLAT.format(i_ccd)) flat = raw.get_flat(path_aux.format("CLF", i_ccd))
# initialize Instrument Processor # initialize Instrument Processor
instProc = CsstMscInstrumentProc() instProc = CsstMscInstrumentProc()
instProc.prepare(n_jobs=2) instProc.prepare(n_jobs=n_jobs)
img, wht, flg = instProc.run(raw, bias, dark, flat) img, wht, flg = instProc.run(raw, bias, dark, flat, ver_sim)
instProc.cleanup() instProc.cleanup()
fp_img = img[0].header["FILENAME"] + '.fits' fp_img = img[0].header["FILENAME"] + '.fits'
# save img, wht, flg to somewhere # save img, wht, flg to somewhere
img.writeto("{}/{}.fits".format(DIR_WORK, img.get_keyword("FILENAME")), overwrite=True) img.writeto("{}/{}.fits".format(dir_work, img.get_keyword("FILENAME")), overwrite=True)
wht.writeto("{}/{}.fits".format(DIR_WORK, wht.get_keyword("FILENAME")), overwrite=True) wht.writeto("{}/{}.fits".format(dir_work, wht.get_keyword("FILENAME")), overwrite=True)
flg.writeto("{}/{}.fits".format(DIR_WORK, flg.get_keyword("FILENAME")), overwrite=True) flg.writeto("{}/{}.fits".format(dir_work, flg.get_keyword("FILENAME")), overwrite=True)
# save header # save header
img[1].header.tofile("{}/{}.head".format(DIR_WORK, img.get_keyword("FILENAME").replace(".fits", "")), img[1].header.tofile(
overwrite=True) "{}/{}.head".format(dir_work, img.get_keyword("FILENAME").replace(".fits", "")), overwrite=True)
# append img, wht, flg list # append img, wht, flg list
img_list.append(img) img_list.append(img)
...@@ -84,38 +107,97 @@ for i_ccd in CCD_ID_LIST: ...@@ -84,38 +107,97 @@ for i_ccd in CCD_ID_LIST:
flg_list.append(flg) flg_list.append(flg)
fn_list.append(fp_img) fn_list.append(fp_img)
# Step 2. Calibrate Position # # parallel step 1
pcProc = CsstProcMscPositionCalibration() # def corr_one_img(i_ccd, dir_raw, dir_work):
pcProc.run(img_list, wht_list, flg_list, fn_list, DIR_GAIA_CATALOG, DIR_WORK, 2.0) # fp_raw = glob.glob("{}/MSC_MS_*_{:02}_raw.fits".format(dir_raw, i_ccd))
# if img_list: # assert len(fp_raw) == 1
# pcProc.run(img_list, wht_list, flg_list, fn_list, DIR_GAIA_CATALOG, DIR_WORK, 2.0) # fp_raw = fp_raw[0]
# else: #
# for i_ccd in CCD_ID_LIST: # # read data with CsstMscImgData.read
# fp_img = "{}/MSC_MS_*_{:02}_img.fits".format(DIR_WORK, i_ccd) # raw = CsstMscImgData.read(fp_raw)
# fp_wht = "{}/MSC_MS_*_{:02}_wht.fits".format(DIR_WORK, i_ccd) #
# fp_flg = "{}/MSC_MS_*_{:02}_flg.fits".format(DIR_WORK, i_ccd) # # in future, get_* functions grab
# img = CsstMscImgData.read(fp_img) # bias = raw.get_bias(PATH_BIAS.format(i_ccd))
# wht = CsstMscImgData.read(fp_wht) # dark = raw.get_dark(PATH_DARK.format(i_ccd))
# flg = CsstMscImgData.read(fp_flg) # flat = raw.get_flat(PATH_FLAT.format(i_ccd))
# img_list.append(img) #
# wht_list.append(wht) # # initialize Instrument Processor
# flg_list.append(flg) # instProc = CsstMscInstrumentProc()
# fn_list.append(fp_img) # instProc.prepare(n_jobs=1, n_threads=1)
# pcProc.run(img_list, wht_list, flg_list, fn_list, DIR_GAIA_CATALOG, DIR_WORK, 2.0) # img, wht, flg = instProc.run(raw, bias, dark, flat)
pcProc.cleanup(img_list, DIR_WORK) # instProc.cleanup()
# fp_img = img[0].header["FILENAME"] + '.fits'
# Step 3. Calibrate Flux #
from csst.msc.calib_flux import CsstProcFluxCalibration # # save img, wht, flg to somewhere
fcProc = CsstProcFluxCalibration() # img.writeto("{}/{}.fits".format(dir_work, img.get_keyword("FILENAME")), overwrite=True)
# fcProc.prepare() # wht.writeto("{}/{}.fits".format(dir_work, wht.get_keyword("FILENAME")), overwrite=True)
fcProc.run(fn_list, img_list, wht_list, flg_list, wcsdir=DIR_WORK, L1dir=DIR_WORK, workdir=DIR_WORK, refdir=DIR_TEST, # flg.writeto("{}/{}.fits".format(dir_work, flg.get_keyword("FILENAME")), overwrite=True)
# # save header
# img[1].header.tofile("{}/{}.head".format(dir_work, img.get_keyword("FILENAME").replace(".fits", "")),
# overwrite=True)
# return OrderedDict(img=img, wht=wht, flg=flg, fp_img=fp_img)
#
#
# result = joblib.Parallel(n_jobs=NJOBS, verbose=5)(
# joblib.delayed(corr_one_img)(i_ccd, dir_raw, dir_work) for i_ccd in CCD_ID_LIST)
# img_list = [_["img"] for _ in result]
# wht_list = [_["wht"] for _ in result]
# flg_list = [_["flg"] for _ in result]
# fn_list = [_["fp_img"] for _ in result]
# Step 2. Calibrate Position
pcProc = CsstProcMscPositionCalibration()
pcProc.run(img_list, wht_list, flg_list, fn_list, dir_gaia_catalog, dir_work, 2.0)
pcProc.cleanup(img_list, dir_work)
# if img_list:
# pcProc.run(img_list, wht_list, flg_list, fn_list, dir_gaia_catalog, dir_work, 2.0)
# else:
# for i_ccd in CCD_ID_LIST:
# fp_img = "{}/MSC_MS_*_{:02}_img.fits".format(dir_work, i_ccd)
# fp_wht = "{}/MSC_MS_*_{:02}_wht.fits".format(dir_work, i_ccd)
# fp_flg = "{}/MSC_MS_*_{:02}_flg.fits".format(dir_work, i_ccd)
# img = CsstMscImgData.read(fp_img)
# wht = CsstMscImgData.read(fp_wht)
# flg = CsstMscImgData.read(fp_flg)
# img_list.append(img)
# wht_list.append(wht)
# flg_list.append(flg)
# fn_list.append(fp_img)
# pcProc.run(img_list, wht_list, flg_list, fn_list, dir_gaia_catalog, dir_work, 2.0)
# Step 3. Calibrate Flux
fcProc = CsstProcFluxCalibration()
# fcProc.prepare()
fcProc.run(
fn_list, img_list, wht_list, flg_list, wcsdir=dir_work, L1dir=dir_work, workdir=dir_work, refdir=dir_raw,
addhead=True, morehead=False, plot=False, nodel=False, update=False, upcat=True) addhead=True, morehead=False, plot=False, nodel=False, update=False, upcat=True)
fcProc.cleanup(fn_list, DIR_WORK) fcProc.cleanup(fn_list, dir_work)
# Step 4. Photometry # Step 4. Photometry
from csst.msc.phot import CsstMscPhotometryProc ptProc = CsstMscPhotometryProc()
ptProc = CsstMscPhotometryProc() ptProc.prepare()
ptProc.prepare() ptProc.run(fn_list, out_dir=dir_work, n_jobs=n_jobs)
ptProc.run(fn_list, out_dir=DIR_WORK, n_jobs=18) ptProc.cleanup()
ptProc.cleanup()
return
if __name__ == "__main__":
# identify where you are
HOSTNAME = os.uname()[1]
# you have to run this pipeline in some well-defined servers
assert HOSTNAME in ["ubuntu", "Dandelion"]
# get config parameters
if HOSTNAME == "ubuntu":
config = CONFIG_PMO
elif HOSTNAME == "Dandelion":
config = CONFIG_DANDELION
else:
raise ValueError("HOSTNAME {} not known!".format(HOSTNAME))
# process this exposure
do_one_exposure(**config)
for k, v in config.items():
eval("{}=config[\"{}\"]".format(k, k))
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment