Commit 732d8334 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

added photometry routine from Hu Zou

parent 9978aeeb
This diff is collapsed.
# author zouhu
import numpy as np
def fluxerr2magerr(flux, fluxerr, asinh=False, filter='u', zp=22.5):
"""
convert flux and flux error to mag and mag error (in pogson or asinh form)
Parameters:
flux, fluxerr in nanamaggie
return mag and magerr
"""
flux = np.array(flux)
fluxerr = np.array(fluxerr)
# f0=1.0e9
f0 = 10 ** (zp / 2.5)
nn = flux.size
mag = np.array(np.ones_like(flux) * 99.0)
magerr = np.array(np.ones_like(flux) * 99.0)
if not asinh:
mask = flux > 0
if mask.any():
mag[mask] = -2.5 * np.log10(flux[mask] / f0)
magerr[mask] = 2.5 / np.log(10.0) * fluxerr[mask] / flux[mask]
else:
bs = {'u': 1.4e-10, 'g': 0.9e-10, 'r': 1.2e-10, 'i': 1.8e-10, 'z': 7.4e-10}
b = bs[filter]
mag = -(2.5 / np.log(10.0)) * (np.arcsinh((flux / f0) / (2.0 * b)) + np.log(b))
magerr = 2.5 / np.log(10.0) * (fluxerr / f0) / (2.0 * b) / np.sqrt(1.0 + ((flux / f0) / (2.0 * b)) ** 2)
return mag, magerr
def magerr2fluxerr(mag, magerr, asinh=False, filter='u', zp=22.5):
"""
convert mag and mag error to flux and flux error (in pogson or asinh form)
Parameters:
mag,magerr ndarray
return
flux, fluxerr in nanamaggie
"""
# f0=1.0e9
f0 = 10 ** (zp / 2.5)
if not asinh:
flux = 10.0 ** (mag / (-2.5)) * f0
fluxerr = flux * magerr * np.log(10.0) / 2.5
else:
bs = {'u': 1.4e-10, 'g': 0.9e-10, 'r': 1.2e-10, 'i': 1.8e-10, 'z': 7.4e-10}
b = bs[filter]
flux = np.sinh(mag / (-2.5 / np.log(10.0)) - np.log(b)) * 2.0 * b * f0
fluxerr = magerr * np.log(10.0) / 2.5 * (2.0 * b) * np.sqrt(1.0 + ((flux / f0) / (2.0 * b)) ** 2) * f0
return flux, fluxerr
def asinhpogson(mag1, magerr1, asinh2pogson=False, filter='u', zp=22.5):
"""
convert magnitude form between asinh and pogson
"""
flux, fluxerr = magerr2fluxerr(mag1, magerr1, asinh=asinh2pogson, filter=filter, zp=zp)
mag2, magerr2 = fluxerr2magerr(flux, fluxerr, asinh=(not asinh2pogson), filter=filter, zp=zp)
return mag2, magerr2
how to use csst photometry pipeline
Package dependencies: latest versions of "SExtrator" and "PSFEx"
1. python3.8 csst_photometry.py csst_image -o output_path
2. use the API in a pipeline
from csst_photometry import do_phot
imagefile='/line17/csst/simulation_new/MSC_MS_210525121500_100000001_01_img.fits'
outdir='./'
do_phot(imagefile,outdir=outdir)
import os
import time
import numpy as np
from csst_photometry import do_phot
ccds = np.arange(6, 26, 1)
indir = '/line17/zouhu/csst/simulation_new'
outdir = '/line17/zouhu/csst/simulation_new/cat_tan/'
for i in range(len(ccds)):
iccdstr = '%2.2d' % ccds[i]
start = time.time()
ifits = os.path.join(indir, 'MSC_MS_210525121500_100000001_' + iccdstr + '_img.fits')
print(ifits)
if not os.path.isfile(ifits): continue
do_phot(ifits, outdir=outdir, stage="phot")
# author zouhu
import astropy.stats as ast
import numpy as np
def rebin_image(a, shape, fun=np.sum):
ashape = a.shape
if a.shape[0] != shape[0] or a.shape[1] != shape[1]:
sh = shape[0], a.shape[0] // shape[0], shape[1], a.shape[1] / shape[1]
return fun(fun(a.reshape(sh), -1), 1)
else:
return a
def sigmaclip_limitsig(data, error=None, sig_limit=None, **kwd):
data = np.array(data)
mdata = ast.sigma_clip(data, **kwd)
if sig_limit is not None:
while (True):
med = np.ma.median(mdata)
sig = np.ma.std(mdata)
if sig < sig_limit: break
index = np.ma.argmax(np.ma.abs(mdata - med))
mdata.mask[index] = True
return mdata
def bin_stats(x, y=None, minbin=None, maxbin=None, nbin=30, bins=None, **kwd):
if minbin is None: minbin = np.min(x)
if maxbin is None: maxbin = np.max(x)
if nbin is None: nbin = 30
if bins is None: bins = np.linspace(minbin, maxbin, nbin + 1)
if y is None: y = x
nbin = len(bins)
yy = np.zeros((5, nbin - 1))
for i in range(nbin - 1):
yy[0, i] = (bins[i] + bins[i + 1]) / 2.0
mask = (x >= bins[i]) & (x < bins[i + 1])
yy[4, i] = mask.sum()
if mask.any():
ymask = sigmaclip_limitsig(y[mask], **kwd)
yy[1, i] = np.ma.median(ymask)
yy[2, i] = np.ma.std(ymask)
yy[3, i] = ymask.mask.sum()
return yy
def binnum_stats(x, y, minbin=None, maxbin=None, binnum=None, binedges=None, **kwd):
x = np.array(x)
y = np.array(y)
nx = len(x)
if len(y) != nx: raise ValueError("y should have same size of x")
index = np.argsort(x)
x = x[index]
y = y[index]
if minbin is None: minbin = np.min(x)
if maxbin is None: maxbin = np.max(x)
mask = (x >= minbin) & (x <= maxbin)
x = x[mask];
y = y[mask]
nx = len(x)
if binnum is None: binnum = nx // 10
binedges = [x[0]]
yy = []
count = 0
while (True):
count += 1
ind0 = (count - 1) * binnum
if ind0 >= nx: break
ind1 = count * binnum
if ind1 > nx: ind1 = nx
binedges.append(x[ind1 - 1])
ibin = (binedges[count] + binedges[count - 1]) / 2.0
slc = slice(ind0, ind1)
ymask = sigmaclip_limitsig(y[slc], **kwd)
yy.append([ibin, np.ma.median(ymask), np.ma.std(ymask), (~ymask.mask).sum()])
return np.array(yy)
def int2inf(data):
# get integer towards positive/negative infinite
data = np.array(data)
intdata = np.floor(data)
mask = data > 0
intdata[mask] = np.ceil(data[mask])
return intdata
def valid_coordinates(coordinates, size=None):
"""
convert tuple, list or np.ndarray of cooridinates to a 2d ndarray
and check the coordinates within an image size
INPUT
coordinates: 2d array for coordinates
size: size of an image
OUTPUT:
coord: reshape coordinates
indcoord: index of coordinates in a size range
"""
if isinstance(coordinates, (list, tuple, np.ndarray)):
coord = np.atleast_2d(coordinates)
if coord.shape[0] != 2 and coord.shape[1] != 2:
raise ValueError("coordinates should have at least one axis with 2 elements")
if coord.shape[1] != 2 and coord.shape[0] == 2:
coord = coord.transpose()
else:
raise TypeError("coordinates should be list or array of (x,y) pixel positions")
if size is None:
return coord
else:
if len(size) != 2:
raise ValueError("size should have 2 elements")
nx, ny = size
x = coord[:, 0]
y = coord[:, 1]
indcoord = np.arange(coord.shape[0])
good = (x >= 0.5) & (x < nx + 0.5) & (y >= 0.5) & (y < ny + 0.5)
if np.any(good):
indcoord = indcoord[good]
else:
raise ValueError('coordinates are not in the image range')
return coord, indcoord
def closest_match(coord1, coord2, min_dist=1.0):
"""
find closest pairs between two sets of coordinates
coord1: coordinates to be matched
coord2: coordinates matched to
min_dist: separation tolerance
output: idx1,idx2
idx1: matched index for coord1
idx2: matched index for coord2
"""
coord1 = valid_coordinates(coord1)
coord2 = valid_coordinates(coord2)
n1 = len(coord1)
n2 = len(coord2)
index1 = []
index2 = []
x2 = coord2[:, 0]
y2 = coord2[:, 1]
for i in range(n1):
ix1, iy1 = coord1[i]
index = np.where((np.abs(x2 - ix1) < min_dist) & (np.abs(y2 - iy1) < min_dist))[0]
nmatch = len(index)
if nmatch < 1:
continue
if nmatch > 1:
x2tmp = x2[index];
y2tmp = y2[index]
dist = ((x2tmp - ix1) ** 2 + (y2tmp - iy1) ** 2)
indsort = np.argsort(dist)
index1.append(i)
index2.append(index[indsort])
else:
index1.append(i)
index2.append(index)
return index1, index2
def weighted_mean(x, sigmax, weight_square=True):
"""
Calculate the mean and estimated errors for a set of data points
DESCRIPTION:
This routine is adapted from Program 5-1, XFIT, from "Data Reduction
and Error Analysis for the Physical Sciences", p. 76, by Philip R.
Bevington, McGraw Hill. This routine computes the weighted mean using
Instrumental weights (w=1/sigma^2).
INPUTS:
x - Array of data points
sigmax - array of standard deviations for data points
weight_square - if True, weight is invariance, else the reciprocal of the error
OUTPUTS:
xmean - weighted mean
sigmam - standard deviation of mean
stdm - standard deviation of data
"""
x = np.atleast_1d(x).copy()
sigmax = np.atleast_1d(sigmax).copy()
if len(x) == 1:
xmean = x
sigmam = sigmax
stdm = sigmax
else:
weight = 1.0 / sigmax ** 2
weight1 = weight
if not weight_square: weight1 = 1.0 / sigmax
wsum = weight.sum()
xmean = (weight1 * x).sum() / weight1.sum()
sigmam = np.sqrt(1.0 / wsum)
stdm = np.sqrt(np.sum((x - xmean) ** 2) / len(x))
return xmean, sigmam, stdm
import random as rd
import subprocess
import sys
from multiprocessing import Process, Manager, Pool
def execute_thread(nobj, nthread, fun_thread, args):
threads = []
if nthread > nobj:
nthread = nobj
step = nobj // nthread
nrest = nobj % nthread
index_threads = [0]
for i in range(nthread):
if i < nrest:
inext = index_threads[i] + step + 1
else:
inext = index_threads[i] + step
index_threads.append(inext)
manager = Manager()
dict_manage = manager.dict()
args = list(args)
for i in range(nthread):
index_slc = slice(index_threads[i], index_threads[i + 1])
args.append(dict_manage)
args.append(index_slc)
args.append(i)
t = Process(target=fun_thread, args=args)
args.pop()
args.pop()
args.pop()
threads.append(t)
for i in range(nthread):
threads[i].start()
for i in range(nthread):
threads[i].join()
return dict_manage
def pool_nthread(fun_thread, nthread, args, rand_index=False, rand_seed=None, **kwd):
"""
create multiprocessing pool and run
"""
if nthread <= 0: nthread = multiprocessing.cpu_count()
nargs = len(args)
if len(args) <= 0: raise ValueError("args should have at least one elements")
nrow = len(args[0])
rangeii = range(nrow)
if rand_index:
rangeii = rd.sample(rangeii, nrow)
pool = Pool(nthread)
res = {}
for ii in rangeii:
argv = [iargv[ii] for iargv in args]
res[ii] = pool.apply_async(fun_thread, argv, kwd)
return res
def cmd_exists(cmd):
"""
check wether a shell command exists
"""
return subprocess.call("type " + cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0
def progressbar(count, total, bar_length=10, barchar='#', prefix='[', suffix=']', numshow=False, onlypercent=False,
onlynum=False):
percent = float(count) / total
if len(barchar) > 1:
barchar = '#'
bar = barchar * int(round(percent * bar_length))
spaces = ' ' * (bar_length - len(bar))
percent = percent * 100
perc = "%3.1f%%" % percent
progbar = prefix + bar + spaces + suffix + perc
if numshow:
numperc = "%d/%d" % (count, total)
progbar = prefix + bar + suffix + numperc
if onlynum:
numperc = "%d/%d" % (count, total)
progbar = prefix + numperc + suffix
if onlypercent:
progbar = prefix + perc + suffix
progbar = "\r" + progbar
if count == total:
progbar = progbar + '\n'
sys.stdout.write(progbar)
sys.stdout.flush()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment