Commit 0f21e235 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

adopted dm in pipeline module

parent 22a45f02
...@@ -7,12 +7,13 @@ from csst.msc.calib_pos import CsstProcMscPositionCalibration ...@@ -7,12 +7,13 @@ from csst.msc.calib_pos import CsstProcMscPositionCalibration
from csst.msc.data import CsstMscImgData from csst.msc.data import CsstMscImgData
from csst.msc.inst_corr import CsstMscInstrumentProc from csst.msc.inst_corr import CsstMscInstrumentProc
from csst.msc.phot import CsstMscPhotometryProc from csst.msc.phot import CsstMscPhotometryProc
from csst.msc.data_manager import CsstMscDataManager
CONFIG_DANDELION = dict( CONFIG_DANDELION = dict(
# test and working directory # test and working directory
dir_raw="/home/csstpipeline/L1Pipeline/msc/MSC_0000020", dir_l0="/home/csstpipeline/L1Pipeline/msc/MSC_0000020",
dir_work="/home/csstpipeline/L1Pipeline/msc/work/", dir_l1="/home/csstpipeline/L1Pipeline/msc/work/",
filename_fmt="{}/MSC_MS_*_{:02}_raw.fits",
# on Dandelion # on Dandelion
path_aux="/home/csstpipeline/L1Pipeline/msc/ref/MSC_{}_*_{:02d}_combine.fits", path_aux="/home/csstpipeline/L1Pipeline/msc/ref/MSC_{}_*_{:02d}_combine.fits",
# gaia catalog directory (for position calibration) # gaia catalog directory (for position calibration)
...@@ -28,13 +29,12 @@ CONFIG_DANDELION = dict( ...@@ -28,13 +29,12 @@ CONFIG_DANDELION = dict(
CONFIG_PMO = dict( CONFIG_PMO = dict(
# test and working directory # test and working directory
dir_raw="/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/NGP_AstrometryON_shearOFF/MSC_0000100", dir_l0="/share/simudata/CSSOSDataProductsSims/data/CSSTSimImage_C5/NGP_AstrometryON_shearOFF/MSC_0000100",
dir_work="/home/user/L1Pipeline/msc/work/", dir_l1="/home/user/L1Pipeline/msc/work/",
filename_fmt='{}/CSST_MSC_MS_SCI_*_{:02d}_L0_1.fits',
# on PMO # on PMO
path_aux="/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits", path_aux="/data/sim_data/MSC_0000100/ref/MSC_{}_*_{:02d}_combine.fits",
# gaia catalog directory (for position calibration) # gaia catalog directory (for position calibration)
dir_gaia_catalog="/home/user/L1Pipeline/msc/gaia_dr3/", dir_pcref="/home/user/L1Pipeline/msc/gaia_dr3/",
# version of simulation data # version of simulation data
ver_sim="C5.1", ver_sim="C5.1",
# only 18 cores available in cloud machine from PMO # only 18 cores available in cloud machine from PMO
...@@ -44,7 +44,7 @@ CONFIG_PMO = dict( ...@@ -44,7 +44,7 @@ CONFIG_PMO = dict(
) )
def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_gaia_catalog="", ver_sim="C5.1", ccd_ids=None, def do_one_exposure(ver_sim="C5.1", dir_l0="", dir_l1="", dir_pcref="", path_aux="", ccd_ids=None,
n_jobs=18, backend_multithreading=False): n_jobs=18, backend_multithreading=False):
# currently C3 and C5.1 are tested # currently C3 and C5.1 are tested
...@@ -61,30 +61,29 @@ def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_g ...@@ -61,30 +61,29 @@ def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_g
os.environ["NUMEXPR_NUM_THREADS"] = '1' os.environ["NUMEXPR_NUM_THREADS"] = '1'
os.environ["OMP_NUM_THREADS"] = '1' os.environ["OMP_NUM_THREADS"] = '1'
if ccd_ids is None: # set paths
ccd_ids = CCD_ID_LIST dm = CsstMscDataManager(ver_sim=ver_sim, dir_l0=dir_l0, dir_l1=dir_l1, dir_pcref=dir_pcref, path_aux=path_aux)
# request for ccd_ids
ccd_ids = dm.get_ccd_ids(ccd_ids)
# Step 1. Correct instrumental effect # Step 1. Correct instrumental effect
os.chdir(dir_work) os.chdir(dir_l1)
img_list = [] img_list = []
wht_list = [] wht_list = []
flg_list = [] flg_list = []
fn_list = [] fn_list = []
for i_ccd in ccd_ids: for this_ccd_id in ccd_ids:
print("processing CCD {}".format(i_ccd)) print("processing CCD {}".format(this_ccd_id))
fp_raw = glob.glob(filename_fmt.format(dir_raw, i_ccd)) fp_raw = dm.l0_sci(ccd_id=this_ccd_id)
assert len(fp_raw) == 1
fp_raw = fp_raw[0]
print(fp_raw)
# fn_raw = os.path.basename(fp_raw)
# read data with CsstMscImgData.read # read data with CsstMscImgData.read
raw = CsstMscImgData.read(fp_raw) raw = CsstMscImgData.read(fp_raw)
# in the future, use get_* functions grab # in the future, use get_* functions grab
bias = raw.get_bias(path_aux.format("CLB", i_ccd)) bias = dm.get_bias(this_ccd_id)
dark = raw.get_dark(path_aux.format("CLD", i_ccd)) dark = dm.get_dark(this_ccd_id)
flat = raw.get_flat(path_aux.format("CLF", i_ccd)) flat = dm.get_flat(this_ccd_id)
# initialize Instrument Processor # initialize Instrument Processor
instProc = CsstMscInstrumentProc() instProc = CsstMscInstrumentProc()
...@@ -94,12 +93,11 @@ def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_g ...@@ -94,12 +93,11 @@ def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_g
fp_img = img[0].header["FILENAME"] + '.fits' fp_img = img[0].header["FILENAME"] + '.fits'
# save img, wht, flg to somewhere # save img, wht, flg to somewhere
img.writeto("{}/{}.fits".format(dir_work, img.get_keyword("FILENAME")), overwrite=True) img.writeto(dm.l1_sci(ccd_id=this_ccd_id, suffix="img", ext="fits"), overwrite=True)
wht.writeto("{}/{}.fits".format(dir_work, wht.get_keyword("FILENAME")), overwrite=True) wht.writeto(dm.l1_sci(ccd_id=this_ccd_id, suffix="wht", ext="fits"), overwrite=True)
flg.writeto("{}/{}.fits".format(dir_work, flg.get_keyword("FILENAME")), overwrite=True) flg.writeto(dm.l1_sci(ccd_id=this_ccd_id, suffix="flg", ext="fits"), overwrite=True)
# save header # save header
img[1].header.tofile( img[1].header.tofile(dm.l1_sci(ccd_id=this_ccd_id, suffix="img", ext="head"), overwrite=True)
"{}/{}.head".format(dir_work, img.get_keyword("FILENAME").replace(".fits", "")), overwrite=True)
# append img, wht, flg list # append img, wht, flg list
img_list.append(img) img_list.append(img)
...@@ -107,77 +105,45 @@ def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_g ...@@ -107,77 +105,45 @@ def do_one_exposure(dir_raw="", dir_work="", filename_fmt="", path_aux="", dir_g
flg_list.append(flg) flg_list.append(flg)
fn_list.append(fp_img) fn_list.append(fp_img)
# # parallel step 1
# def corr_one_img(i_ccd, dir_l0, dir_l1):
# fp_raw = glob.glob("{}/MSC_MS_*_{:02}_raw.fits".format(dir_l0, i_ccd))
# assert len(fp_raw) == 1
# fp_raw = fp_raw[0]
#
# # read data with CsstMscImgData.read
# raw = CsstMscImgData.read(fp_raw)
#
# # in future, get_* functions grab
# bias = raw.get_bias(PATH_BIAS.format(i_ccd))
# dark = raw.get_dark(PATH_DARK.format(i_ccd))
# flat = raw.get_flat(PATH_FLAT.format(i_ccd))
#
# # initialize Instrument Processor
# instProc = CsstMscInstrumentProc()
# instProc.prepare(n_jobs=1, n_threads=1)
# img, wht, flg = instProc.run(raw, bias, dark, flat)
# instProc.cleanup()
# fp_img = img[0].header["FILENAME"] + '.fits'
#
# # save img, wht, flg to somewhere
# img.writeto("{}/{}.fits".format(dir_l1, img.get_keyword("FILENAME")), overwrite=True)
# wht.writeto("{}/{}.fits".format(dir_l1, wht.get_keyword("FILENAME")), overwrite=True)
# flg.writeto("{}/{}.fits".format(dir_l1, flg.get_keyword("FILENAME")), overwrite=True)
# # save header
# img[1].header.tofile("{}/{}.head".format(dir_l1, img.get_keyword("FILENAME").replace(".fits", "")),
# overwrite=True)
# return OrderedDict(img=img, wht=wht, flg=flg, fp_img=fp_img)
#
#
# result = joblib.Parallel(n_jobs=NJOBS, verbose=5)(
# joblib.delayed(corr_one_img)(i_ccd, dir_l0, dir_l1) for i_ccd in CCD_ID_LIST)
# img_list = [_["img"] for _ in result]
# wht_list = [_["wht"] for _ in result]
# flg_list = [_["flg"] for _ in result]
# fn_list = [_["fp_img"] for _ in result]
# Step 2. Calibrate Position # Step 2. Calibrate Position
pcProc = CsstProcMscPositionCalibration() pcProc = CsstProcMscPositionCalibration()
pcProc.run(img_list, wht_list, flg_list, fn_list, dir_gaia_catalog, dir_work, 2.0) pcProc.run(img_list, wht_list, flg_list, fn_list, dir_pcref, dir_l1, 2.0)
pcProc.cleanup(img_list, dir_work) pcProc.cleanup(img_list, dir_l1)
# if img_list: """
# pcProc.run(img_list, wht_list, flg_list, fn_list, dir_gaia_catalog, dir_l1, 2.0) pcProc = CsstProcMscPositionCalibration()
# else: pcProc.prepare(dm)
# for i_ccd in CCD_ID_LIST: pcProc.run(2.0)
# fp_img = "{}/MSC_MS_*_{:02}_img.fits".format(dir_l1, i_ccd) pcProc.cleanup()
# fp_wht = "{}/MSC_MS_*_{:02}_wht.fits".format(dir_l1, i_ccd)
# fp_flg = "{}/MSC_MS_*_{:02}_flg.fits".format(dir_l1, i_ccd) get these parameters from dm.l1_sci(*):
# img = CsstMscImgData.read(fp_img) img_list, wht_list, flg_list, fn_list, dir_pcref, dir_l1
# wht = CsstMscImgData.read(fp_wht) img_list, dir_l1
# flg = CsstMscImgData.read(fp_flg) """
# img_list.append(img)
# wht_list.append(wht)
# flg_list.append(flg)
# fn_list.append(fp_img)
# pcProc.run(img_list, wht_list, flg_list, fn_list, dir_gaia_catalog, dir_l1, 2.0)
# Step 3. Calibrate Flux # Step 3. Calibrate Flux
fcProc = CsstProcFluxCalibration() fcProc = CsstProcFluxCalibration()
# fcProc.prepare() # fcProc.prepare()
fcProc.run( fcProc.run(
fn_list, img_list, wht_list, flg_list, wcsdir=dir_work, L1dir=dir_work, workdir=dir_work, refdir=dir_raw, fn_list, img_list, wht_list, flg_list, wcsdir=dir_l1, L1dir=dir_l1, workdir=dir_l1, refdir=dir_l0,
addhead=True, morehead=False, plot=False, nodel=False, update=False, upcat=True) addhead=True, morehead=False, plot=False, nodel=False, update=False, upcat=True)
fcProc.cleanup(fn_list, dir_work) fcProc.cleanup(fn_list, dir_l1)
"""
fcProc = CsstProcFluxCalibration()
fcProc.prepare(dm)
fcProc.run(addhead=True, morehead=False, plot=False, nodel=False, update=False, upcat=True)
fcProc.cleanup()
get these parameters from dm.l1_sci(*):
fn_list, img_list, wht_list, flg_list, wcsdir=dir_l1, L1dir=dir_l1, workdir=dir_l1, refdir=dir_l0,
fn_list, dir_l1
"""
# Step 4. Photometry # Step 4. Photometry
ptProc = CsstMscPhotometryProc() # ptProc = CsstMscPhotometryProc()
ptProc.prepare() # ptProc.prepare()
ptProc.run(fn_list, out_dir=dir_work, n_jobs=n_jobs) # ptProc.run(fn_list, out_dir=dir_l1, n_jobs=n_jobs)
ptProc.cleanup() # ptProc.cleanup()
return return
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment