from astropy_healpix import HEALPix from astropy.coordinates import ICRS from astropy import units as u from astropy.coordinates import SkyCoord from astropy.io import fits from csst_dfs_commons.models.constants import PI, DEFAULT_NSIDE def get_header_value(key: str, headers, default_value = None): try: ret_value = None if not isinstance(headers, list): headers = [headers] for header in headers: if key in header: ret_value = header[key] if type(ret_value) == str: return ret_value.strip() return ret_value if ret_value is None: return default_value except Exception as e: return default_value def get_healpix_id(ra, dec, nside=256): try: hp = HEALPix(nside=nside, order='nested', frame=ICRS()) coord = SkyCoord(ra = ra * u.deg, dec = dec * u.deg, frame='icrs') return hp.skycoord_to_healpix(coord) except: return -1 def get_healpix_ids(ra, dec, radius, nside=256): hp = HEALPix(nside=nside, order='nested', frame=ICRS()) coord = SkyCoord(ra = ra * u.deg, dec = dec * u.deg, frame='icrs') return hp.cone_search_skycoord(coord, radius = radius * u.deg) def get_raw_header(filepath, **keys): union = {} with fits.open(filepath, **keys) as hdulist: for hdu in hdulist: for card in hdu.header.cards: card.verify('fix') key, value = card.keyword, card.value if not key: continue union[key]= value return union def heapix_sql_condition(ra, dec, radius, ra_col = 'ra', dec_col = 'dec', nside = DEFAULT_NSIDE): arcDec = (PI / 180) * dec whereSql = f"abs((180./{PI}) * ACOS(SIN({PI} * {dec_col}/180) * SIN({arcDec}) + COS({PI} * {dec_col}/180) * COS({arcDec}) * COS(({PI}/180) * ({ra_col} - {ra})))) < {radius}" heapix_ids = get_healpix_ids(ra, dec, radius, nside) nside_column = "NS%dHIdx" % (nside,) whereZoneSql = "%s in (%s)" % \ (nside_column, ','.join([str(i) for i in heapix_ids])) return whereZoneSql, whereSql def catalog_heapix_sql_condition(ra, dec, radius, ra_col = 'ra', dec_col = 'dec', nside = DEFAULT_NSIDE): x2 = f"cos({dec_col})*cos({ra_col})" y2 = f"cos({dec_col})*sin({ra_col})" z2 = f"sin({dec_col})" x1 = f"cos({dec})*cos({ra})" y1 = f"cos({dec})*sin({ra})" z1 = f"sin({dec})" distance = f"pow(({x2}-{x1}),2)+pow(({y2}-{y1}),2)+pow(({z2}-{z1}),2)" heapix_ids = get_healpix_ids(ra, dec, radius, nside) nside_column = "NS%dHIdx" % (nside,) whereZoneSql = "%s in (%s)" % \ (nside_column, ','.join([str(i) for i in heapix_ids])) whereSql = f"{distance} <= 4*pow({radius}/2, 2) and {whereZoneSql}" return whereSql def level2_heapix_sql_condition(ra, dec, radius, ra_col = 'ra', dec_col = 'dec', nside = DEFAULT_NSIDE): x2 = f"cos({dec_col})*cos({ra_col})" y2 = f"cos({dec_col})*sin({ra_col})" z2 = f"sin({dec_col})" x1 = f"cos({dec})*cos({ra})" y1 = f"cos({dec})*sin({ra})" z1 = f"sin({dec})" distance = f"pow(({x2}-{x1}),2)+pow(({y2}-{y1}),2)+pow(({z2}-{z1}),2)" heapix_ids = get_healpix_ids(ra, dec, radius, nside) nside_column = "brick_id" whereZoneSql = "%s in (%s)" % \ (nside_column, ','.join([str(i) for i in heapix_ids])) whereSql = f"{distance} <= 4*pow({radius}/2, 2) and {whereZoneSql}" return whereSql