Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Fang Yuedong
injection_pipeline
Commits
72ee59d1
Commit
72ee59d1
authored
Apr 22, 2024
by
Fang Yuedong
Browse files
add wrapper for photometry calls
parent
c49fa531
Changes
2
Show whitespace changes
Inline
Side-by-side
measurement_pipeline/L1_pipeline/csst_msc_mbi_photometry/csst_photometry.py
View file @
72ee59d1
...
...
@@ -84,6 +84,10 @@ prog_dir = os.path.dirname(__file__)
config_path
=
os
.
path
.
join
(
prog_dir
,
'data/'
)
__version__
=
"1.4.3"
# [TODO]
zero_points
=
{
'NUV'
:
24.8
,
'u'
:
25.22
,
'g'
:
26.23
,
'r'
:
26.04
,
'i'
:
25.85
,
'z'
:
25.27
,
'y'
:
23.92
}
def
pick_psfstars
(
psffile
,
remove_polution
=
False
,
min_nstar
=
10
,
max_nstar
=
1500
,
class_star
=
0.7
,
match_dist
=
10
,
min_sn
=
20
,
max_elp
=
0.3
,
fwhm_range
=
[
1.5
,
20
]):
...
...
@@ -134,12 +138,15 @@ def pick_psfstars(psffile, remove_polution=False, min_nstar=10, max_nstar=1500,
cat
=
cat
[
index
]
print
(
"selecting isolated stars:"
,
len
(
cat
))
mask
=
(
cat
[
'flags'
]
==
0
)
&
(
cat
[
'nimaflags_iso'
]
<
3
)
&
(
cat
[
'CLASS_STAR'
]
>
class_star
)
&
(
cat
[
'ELLIPTICITY'
]
<
max_elp
)
&
(
# mask = (cat['flags'] == 0) & (cat['nimaflags_iso'] < 3) & (cat['CLASS_STAR'] > class_star) & (cat['ELLIPTICITY'] < max_elp) & (
# cat['SNR_WIN'] > min_sn) & (cat['FWHM_IMAGE'] > fwhm_range[0]) & (cat['FWHM_IMAGE'] < fwhm_range[1])
# [TODO]
mask
=
(
cat
[
'flags'
]
==
0
)
&
(
cat
[
'CLASS_STAR'
]
>
class_star
)
&
(
cat
[
'ELLIPTICITY'
]
<
max_elp
)
&
(
cat
[
'SNR_WIN'
]
>
min_sn
)
&
(
cat
[
'FWHM_IMAGE'
]
>
fwhm_range
[
0
])
&
(
cat
[
'FWHM_IMAGE'
]
<
fwhm_range
[
1
])
cat
=
cat
[
mask
]
if
len
(
cat
)
<
min_nstar
:
print
(
"too few psf stars...
"
)
print
(
"too few psf stars...
, got %d"
%
(
len
(
cat
))
)
hdulist
.
close
()
return
False
...
...
@@ -224,7 +231,8 @@ def get_psf(imgfile, whtfile, flgfile, psffile, psf_size=101, degree=3, variabil
# fitsname = dm.l1_detector(detector=detector, post="")
head
=
fits
.
getheader
(
imgfile
,
0
)
gain
=
head
[
'exptime'
]
# gain = head['exptime']
gain
=
head
[
'EXPTIME'
]
saturate
=
50000.0
/
gain
# fitsname,_=os.path.splitext(imgfile)
...
...
@@ -324,7 +332,9 @@ def photometry(imgfile, whtfile, flgfile, psffile, fluxfile, catfile, skyfile, s
if
zp_name
not
in
head
or
head
[
zp_name
]
<=
0
:
print
(
"No zeropoint in the header or bad zeropoint"
)
return
False
# return False
# [TODO]
# date=head['date-obs']
# ccd=head['ccd_no']
...
...
@@ -335,7 +345,8 @@ def photometry(imgfile, whtfile, flgfile, psffile, fluxfile, catfile, skyfile, s
# flagfile=rootname[:indpos]+'_flg.fits'
# weightfile=rootname[:indpos]+'_wht.fits'
gain
=
head
[
'exptime'
]
# gain = head['exptime']
gain
=
head
[
'EXPTIME'
]
saturate
=
50000.0
/
gain
# types = {'sky': 'BACKGROUND', 'seg': 'SEGMENTATION', 'mod': 'MODELS', 'res': '-MODELS'}
...
...
@@ -655,12 +666,12 @@ def clean_catalog(cat, head, sep=2.0, star_thresh=0.005):
# add IDs
bits
=
6
objid
=
np
.
array
(
cat
[
'ObjID'
])
if
'
ccdchip
'
in
head
:
ccdno
=
int
(
str
.
strip
(
head
[
'
ccdchip
'
])[
3
:])
if
'
CCDCHIP
'
in
head
:
ccdno
=
int
(
str
.
strip
(
head
[
'
CCDCHIP
'
])[
3
:])
else
:
ccdno
=
int
(
str
.
strip
(
head
[
'
chipid
'
]))
ccdno
=
int
(
str
.
strip
(
head
[
'
CHIPID
'
]))
obsid
=
int
(
head
[
'
obsid
'
])
obsid
=
int
(
head
[
'
OBSID
'
])
ids
=
np
.
array
(
cat
[
'ObjID'
],
dtype
=
int
)
+
ccdno
*
\
10
**
bits
+
obsid
*
10
**
(
bits
+
2
)
col
=
Table
.
Column
(
...
...
@@ -670,7 +681,7 @@ def clean_catalog(cat, head, sep=2.0, star_thresh=0.005):
cat
.
add_column
(
col
,
index
=
0
)
col
=
Table
.
Column
(
ids
,
name
=
'ID'
)
cat
.
add_column
(
col
,
index
=
0
)
filt
=
head
[
'
filter
'
]
filt
=
head
[
'
FILTER
'
]
ff
=
np
.
zeros
(
len
(
cat
),
dtype
=
'<U3'
)
ff
[:]
=
filt
col
=
Table
.
Column
(
ff
,
name
=
'Filter'
)
...
...
@@ -728,7 +739,9 @@ def calibrate_fluxadu(fluxadu, head, head_zpt=None, zp_name='ZP'):
c
=
head_zpt
else
:
if
zp_name
not
in
head
or
head
[
zp_name
]
<=
0
:
return
False
# return False
# [TODO]
c
=
zero_points
[
head
[
'FILTER'
]]
else
:
c
=
head
[
zp_name
]
...
...
measurement_pipeline/run_csst_msc_mbi_photometry
0 → 100644
View file @
72ee59d1
"""_summary_
"""
import os
from glob import glob
import mpi4py.MPI as MPI
from L1_pipeline.csst_msc_mbi_photometry.csst_photometry import core_msc_l1_mbi_phot
from typing import Optional
def run_csst_msc_mbi_photometry(image_path,
output_dir,
weight_path: str = None,
flag_path: str = None,
psf_ccds_path: Optional[str] = None,
plot_flag: bool = False,
getpsf_flag: bool = False,):
input_dir = os.path.dirname(image_path)
img_filename = os.path.basename(image_path)
if weight_path is None:
weight_path = os.path.join(
input_dir, img_filename.replace("img", "wht"))
if flag_path is None:
flag_path = os.path.join(input_dir, img_filename.replace("img", "flg"))
psf_local_path = os.path.join(
output_dir, img_filename.replace("img", "psf"))
flux_path = os.path.join(
output_dir, img_filename.replace("img", "flux"))
cat_path = os.path.join(
output_dir, img_filename.replace("img", "cat"))
seg_path = os.path.join(
output_dir, img_filename.replace("img", "seg"))
sky_path = os.path.join(
output_dir, img_filename.replace("img", "sky"))
core_msc_l1_mbi_phot(
image_path=image_path,
weight_path=weight_path,
flag_path=flag_path,
psf_ccds_path=None,
psf_local_path=psf_local_path,
flux_path=flux_path,
cat_path=cat_path,
seg_path=seg_path,
sky_path=sky_path,
plot_flag=plot_flag,
getpsf_flag=getpsf_flag)
def genearte_path_list_for_one_pointing(input_dir,
pointing_label,
chip_label_list=None):
"""_summary_
Args:
input_dir (_type_): _description_
pointing_label (_type_): _description_
chip_label_list (_type_, optional): _description_. Defaults to None.
Returns:
_type_: _description_
"""
pointing_dir = os.path.join(input_dir, pointing_label)
if chip_label_list is None:
image_path_list = glob(
pointing_dir + '/CSST_MSC_MS_SCIE_*_' + '*_img_*')
else:
image_path_list = []
for chip_label in chip_label_list:
image_path = glob(pointing_dir + '/CSST_MSC_MS_SCIE_*_' +
chip_label + '_img_*')[0]
image_path_list.append(image_path)
return image_path_list
def run_pointing_list(input_dir,
pointing_label_list,
output_dir,
chip_label_list=None):
image_path_list = []
output_path_list = []
try:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
except OSError:
pass
for pointing_label in pointing_label_list:
output_pointing_dir = os.path.join(output_dir, pointing_label)
try:
if not os.path.exists(output_pointing_dir):
os.makedirs(output_pointing_dir)
except OSError:
pass
temp_img_path_list = genearte_path_list_for_one_pointing(input_dir=input_dir,
pointing_label=pointing_label,
chip_label_list=chip_label_list)
image_path_list = image_path_list + temp_img_path_list
output_path_list = output_path_list + \
[output_pointing_dir] * len(temp_img_path_list)
comm = MPI.COMM_WORLD
ind_thread = comm.Get_rank()
num_thread = comm.Get_size()
for i in range(len(image_path_list)):
if i % num_thread != ind_thread:
continue
image_path = image_path_list[i]
output_path = output_path_list[i]
run_csst_msc_mbi_photometry(image_path=image_path,
output_dir=output_path)
if __name__ == "__main__":
input_dir = "/public/home/fangyuedong/project/50sqDeg_L1_outputs"
# pointing_label_list = ["MSC_0000000", "MSC_0000001",
# "MSC_0000002", "MSC_0000003", "MSC_0000004", "MSC_0000005"]
# chip_label_list = None
pointing_label_list = ["MSC_0000000"]
chip_label_list = ["08"]
output_dir = "/public/home/fangyuedong/project/test_photometry"
run_pointing_list(input_dir=input_dir,
pointing_label_list=pointing_label_list,
output_dir=output_dir,
chip_label_list=chip_label_list)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment