from astropy_healpix import HEALPix from astropy.coordinates import ICRS from astropy import units as u from astropy.coordinates import SkyCoord from astropy.io import fits from csst_dfs_commons.models.constants import PI, DEFAULT_NSIDE def get_header_value(key: str, headers, default_value = None): try: ret_value = None if not isinstance(headers, list): headers = [headers] for header in headers: if key in header: ret_value = header[key] if type(ret_value) == str: return ret_value.strip() return ret_value if ret_value is None: return default_value except Exception as e: return default_value def get_healpix_id(ra, dec, nside=256): try: hp = HEALPix(nside=nside, order='nested', frame=ICRS()) coord = SkyCoord(ra = ra * u.deg, dec = dec * u.deg, frame='icrs') return hp.skycoord_to_healpix(coord) except: return -1 def get_healpix_ids(ra, dec, radius, nside=256): hp = HEALPix(nside=nside, order='nested', frame=ICRS()) coord = SkyCoord(ra = ra * u.deg, dec = dec * u.deg, frame='icrs') return hp.cone_search_skycoord(coord, radius = radius * u.deg) def get_raw_header(filepath, **keys): union = {} with fits.open(filepath, **keys) as hdulist: for hdu in hdulist: for card in hdu.header.cards: card.verify('fix') key, value = card.keyword, card.value if not key: continue union[key]= value return union def heapix_sql_condition(ra, dec, radius, ra_col = 'ra', dec_col = 'dec', nside = DEFAULT_NSIDE): """ 生成用于筛选特定天区范围内数据的SQL条件语句(大圆公式)。 Args: ra (float): 天区中心的赤经值,单位为度。 dec (float): 天区中心的赤纬值,单位为度。 radius (float): 天区的半径,单位为度。 ra_col (str, optional): 数据表中赤经值的列名,默认为'ra'。 dec_col (str, optional): 数据表中赤纬值的列名,默认为'dec'。 nside (int, optional): Healpix像素化的nside参数,默认为DEFAULT_NSIDE。 Returns: str: 用于筛选特定天区范围内数据的SQL条件语句。 """ arcDec = (PI / 180) * dec whereSql = f"abs((180./pi()) * ACOS(SIN(pi() * {dec_col}/180) * SIN({arcDec}) + COS(pi() * {dec_col}/180) * COS({arcDec}) * COS(pi()* ({ra_col} - {ra})/180))) < {radius}" heapix_ids = get_healpix_ids(ra, dec, radius, nside) nside_column = "NS%dHIdx" % (nside,) whereZoneSql = "%s in (%s)" % \ (nside_column, ','.join([str(i) for i in heapix_ids])) return f"{whereSql} and {whereZoneSql}" def catalog_heapix_sql_condition(ra, dec, radius, ra_col = 'ra', dec_col = 'dec', nside = DEFAULT_NSIDE): x2 = f"cos(pi()*{dec_col}/180.0)*cos(pi()*{ra_col}/180.0)" y2 = f"cos(pi()*{dec_col}/180.0)*sin(pi()*{ra_col}/180.0)" z2 = f"sin(pi()*{dec_col}/180.0)" x1 = f"cos(pi()*{dec}/180.0)*cos(pi()*{ra}/180.0)" y1 = f"cos(pi()*{dec}/180.0)*sin(pi()*{ra}/180.0)" z1 = f"sin(pi()*{dec}/180.0)" distance = f"pow(({x2}-{x1}),2)+pow(({y2}-{y1}),2)+pow(({z2}-{z1}),2)" heapix_ids = get_healpix_ids(ra, dec, radius, nside) nside_column = "NS%dHIdx" % (nside,) whereZoneSql = "%s in (%s)" % \ (nside_column, ','.join([str(i) for i in heapix_ids])) whereSql = f"{distance} <= 4*pow(pi()*{radius}/180.0/2, 2) and {whereZoneSql}" return whereSql def dfs_heapix_sql_condition(ra, dec, radius, ra_col = 'ra', dec_col = 'dec', brick_col = "brick_id", nside = DEFAULT_NSIDE): x2 = f"cos(pi()*{dec_col}/180.0)*cos(pi()*{ra_col}/180.0)" y2 = f"cos(pi()*{dec_col}/180.0)*sin(pi()*{ra_col}/180.0)" z2 = f"sin(pi()*{dec_col}/180.0)" x1 = f"cos(pi()*{dec}/180.0)*cos(pi()*{ra}/180.0)" y1 = f"cos(pi()*{dec}/180.0)*sin(pi()*{ra}/180.0)" z1 = f"sin(pi()*{dec}/180.0)" distance = f"pow(({x2}-{x1}),2)+pow(({y2}-{y1}),2)+pow(({z2}-{z1}),2)" heapix_ids = get_healpix_ids(ra, dec, radius, nside) whereZoneSql = "%s in (%s)" % \ (brick_col, ','.join([str(i) for i in heapix_ids])) whereSql = f"{distance} <= 4*pow(pi()*{radius}/180.0/2, 2) and {whereZoneSql}" return whereSql