fits.py 2.66 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
2
3
from astropy_healpix import HEALPix
from astropy.coordinates import ICRS
from astropy import units as u
Wei Shoulin's avatar
Wei Shoulin committed
4
from astropy.coordinates import SkyCoord, spherical_to_cartesian
Wei Shoulin's avatar
Wei Shoulin committed
5
from astropy.io import fits
Wei Shoulin's avatar
Wei Shoulin committed
6
7
8
from astropy import wcs
import numpy as np
import healpy as hp
Wei Shoulin's avatar
add hp    
Wei Shoulin committed
9

Wei Shoulin's avatar
Wei Shoulin committed
10
def get_header_value(key: str, headers, default_value = None):
Wei Shoulin's avatar
Wei Shoulin committed
11
    try:
Wei Shoulin's avatar
Wei Shoulin committed
12
13
14
15
16
17
18
19
        ret_value = None
        if not isinstance(headers, list):
            headers = [headers]
        for header in headers:
            if key in header:
                ret_value = header[key]
                if type(ret_value) == str:
                    return ret_value.strip()
Wei Shoulin's avatar
Wei Shoulin committed
20
                return ret_value
Wei Shoulin's avatar
Wei Shoulin committed
21
22
        if ret_value is None:
            return default_value
Wei Shoulin's avatar
Wei Shoulin committed
23
    except Exception as e:
Wei Shoulin's avatar
Wei Shoulin committed
24
25
26
        return default_value

def get_healpix_id(ra, dec, nside=32):
Wei Shoulin's avatar
Wei Shoulin committed
27
28
29
30
31
32
    try:
        hp = HEALPix(nside=nside, order='nested', frame=ICRS())
        coord = SkyCoord(ra = ra * u.deg, dec = dec * u.deg, frame='icrs')
        return hp.skycoord_to_healpix(coord)
    except:
        return -1
Wei Shoulin's avatar
h    
Wei Shoulin committed
33
34
35
36

def get_healpix_ids(ra, dec, radius, nside=32):
    hp = HEALPix(nside=nside, order='nested', frame=ICRS())
    coord = SkyCoord(ra = ra * u.deg, dec = dec * u.deg, frame='icrs')
Wei Shoulin's avatar
Wei Shoulin committed
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    return hp.cone_search_skycoord(coord, radius = radius * u.deg)

def get_raw_header(filepath, **keys):
    union = {}
    with fits.open(filepath, **keys) as hdulist:
        for hdu in hdulist:
            for card in hdu.header.cards:
                card.verify('fix')
                key, value = card.keyword, card.value
                if not key:
                    continue
                union[key]= value
    return union

def fits_of_healpix_ids(filepath, nside=256):
    """ Get healpix ids for the fits file

    :param filepath: the file path of fits
    :param nside: default 128, approximate resolution at NSIDE 1024 is 0.057 deg, 12582912 bricks
    :return: a numpy.ndarray (n,)
    """    
    with fits.open(filepath) as hdulist:
Wei Shoulin's avatar
add hp    
Wei Shoulin committed
59
60
61
62
63
        return hdul_of_healpix_ids(hdulist, nside)
    
def hdul_of_healpix_ids(hdulist, nside=256):
    nx = get_header_value('NAXIS1', hdulist, 0)
    ny = get_header_value('NAXIS2', hdulist, 0)
Wei Shoulin's avatar
Wei Shoulin committed
64

Wei Shoulin's avatar
add hp    
Wei Shoulin committed
65
66
67
68
69
70
71
72
73
74
75
    fits_wcs = wcs.WCS(hdulist[1])
    ra0, dec0 = fits_wcs.all_pix2world(0, 0, 1)
    ra0, dec1 = fits_wcs.all_pix2world(0, ny, 1)
    ra1, dec0 = fits_wcs.all_pix2world(nx, 0, 1)
    ra1, dec1 = fits_wcs.all_pix2world(nx, ny, 1)
    ra_poly, dec_poly = np.array([[ra0, ra0, ra1, ra1], [dec0, dec1, dec0, dec1]])
    xyzpoly = spherical_to_cartesian(np.deg2rad(ra_poly), np.deg2rad(dec_poly), 1)
    xyzpoly = tuple(map(tuple, xyzpoly))
    xyzpoly = np.array(xyzpoly).reshape(-1, 3)
    healpixids = hp.query_polygon(nside, xyzpoly.T)
    return healpixids