Newer
Older
from csst_dfs_commons.models.common import from_proto_model_list, Gaia3Record
from .service import ServiceProxy
from .constants import *
from .utils import get_auth_headers
self.stub = ephem_pb2_grpc.EphemSearchSrvStub(ServiceProxy().channel())
def gaia3_query(self, ra: float, dec: float, radius: float, min_mag: float, max_mag: float, obstime: int, limit: int):
''' retrieval GAIA DR 3
args:
ra: in deg
dec: in deg
radius: in deg
min_mag: minimal magnitude
max_mag: maximal magnitude
obstime: seconds
limit: limits returns the number of records
resps, _ = self.stub.Gaia3Search.with_call(ephem_pb2.EphemSearchRequest(
ra = ra,
dec = dec,
radius = radius,
minMag = min_mag,
maxMag = max_mag,
obstime = obstime,
limit = limit
),metadata = get_auth_headers())
records = []
totalCount = 0
for resp in resps:
if resp.success:
data = from_proto_model_list(Gaia3Record, resp.records)
records.extend(data)
totalCount = resp.totalCount
else:
return Result.error(message = str(resp.error.detail))
return Result.ok_data(data = records).append("totalCount", totalCount)
return Result.error(message="%s:%s" % (e.code().value, e.details()))