Commit 6ebbd274 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

leve0

parent 644b034b
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.facility.level0 import level0_pb2, level0_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level0DataApi(object):
def __init__(self):
self.stub = level0_pb2_grpc.Level0SrvStub(ServiceProxy().channel())
def find(self, **kwargs):
''' retrieve level0 records from database
parameter kwargs:
obs_id: [int]
detector_no: [str]
obs_type: [str]
exp_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(level0_pb2.FindLevel0DataReq(
obs_id = get_parameter(kwargs, "obs_id"),
detector_no = get_parameter(kwargs, "detector_no"),
obs_type = get_parameter(kwargs, "obs_type"),
exp_time_start = get_parameter(kwargs, "exp_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "exp_time", [None, None])[1],
qc0_status = get_parameter(kwargs, "qc0_status"),
prc_status = get_parameter(kwargs, "prc_status"),
file_name = get_parameter(kwargs, "file_name"),
limit = get_parameter(kwargs, "limit", 0),
other_conditions = {"test":"cnlab.test"}
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = resp.message)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
fits_id : [int]
return csst_dfs_common.models.Result
'''
try:
fits_id = get_parameter(kwargs, "fits_id")
resp, _ = self.stub.Get.with_call(level0_pb2.GetLevel0DataReq(
id = fits_id
),metadata = get_auth_headers())
if resp.record is None:
return Result.error(message=f"fits_id:{fits_id} not found")
return Result.ok_data(data=resp.record)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def read(self, **kwargs):
''' yield bytes of fits file
parameter kwargs:
fits_id = [int],
file_path = [str],
chunk_size = [int] default 20480
yield bytes of fits file
'''
try:
streams = self.stub.Read.with_call(level0_pb2.ReadLevel0DatasReq(
id = get_parameter(kwargs, "fits_id"),
file_path = get_parameter(kwargs, "file_path"),
chunk_size = get_parameter(kwargs, "chunk_size", 20480)
),metadata = get_auth_headers())
for stream in streams:
yield stream.data
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
fits_id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateProcStatus.with_call(
level0_pb2.UpdateProcStatusReq(id=fits_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_qc0_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
fits_id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateQc0Status.with_call(
level0_pb2.UpdateQc0StatusReq(id=fits_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.facility.level0prc import level0prc_pb2, level0prc_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
class Level0PrcApi(object):
def __init__(self):
self.stub = level0prc_pb2_grpc.Level0PrcSrvStub(ServiceProxy().channel())
def find(self, **kwargs):
''' retrieve level0 procedure records from database
parameter kwargs:
level0_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(level0prc_pb2.FindLevel0PrcReq(
level0_id = get_parameter(kwargs, "level0_id"),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
prc_status = get_parameter(kwargs, "prc_status"),
other_conditions = {"test":"cnlab.test"}
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = resp.message)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
obs_id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateProcStatus.with_call(
level0prc_pb2.UpdateProcStatusReq(id=id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' insert a level0 procedure record into database
parameter kwargs:
level0_id : [int]
pipeline_id : [str]
prc_module : [str]
params_id : [str]
prc_status : [int]
prc_time : [str]
file_path : [str]
return csst_dfs_common.models.Result
'''
rec = level0prc_pb2.Level0PrcRecord(
id = 0,
level0_id = get_parameter(kwargs, "level0_id"),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_id = get_parameter(kwargs, "params_id"),
prc_status = get_parameter(kwargs, "prc_status"),
prc_time = get_parameter(kwargs, "prc_time"),
file_path = get_parameter(kwargs, "file_path")
)
req = level0prc_pb2.WriteLevel0PrcReq(record = rec)
try:
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
import grpc
from csst_dfs_commons.models import Result
from csst_dfs_proto.facility.observation import observation_pb2, observation_pb2_grpc
from ..common.service import ServiceProxy
from ..common.utils import *
from ..common.constants import UPLOAD_CHUNK_SIZE
class ObservationApi(object):
def __init__(self):
self.stub = observation_pb2_grpc.ObservationSrvStub(ServiceProxy().channel())
def find(self, **kwargs):
''' retrieve exposure records from database
parameter kwargs:
module_id: [str]
obs_type: [str]
exp_time : (start, end),
qc0_status : [int],
prc_status : [int],
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
resp, _ = self.stub.Find.with_call(observation_pb2.FindObservationReq(
module_id = get_parameter(kwargs, "module_id"),
obs_type = get_parameter(kwargs, "obs_type"),
exp_time_start = get_parameter(kwargs, "exp_time", [None, None])[0],
exp_time_end = get_parameter(kwargs, "exp_time", [None, None])[1],
qc0_status = get_parameter(kwargs, "qc0_status"),
prc_status = get_parameter(kwargs, "prc_status"),
limit = get_parameter(kwargs, "limit", 0),
other_conditions = {"test":"cnlab.test"}
),metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.records).append("totalCount", resp.totalCount)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
obs_id : [int]
return csst_dfs_common.models.Result
'''
try:
obs_id = get_parameter(kwargs, "obs_id")
resp, _ = self.stub.Get.with_call(observation_pb2.GetObservationReq(
obs_id = obs_id
),metadata = get_auth_headers())
if resp.observation is None:
return Result.error(message=f"obs_id:{obs_id} not found")
return Result.ok_data(data=resp.observation)
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
obs_id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
obs_id = get_parameter(kwargs, "obs_id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateProcStatus.with_call(
observation_pb2.UpdateProcStatusReq(obs_id=obs_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def update_qc0_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
obs_id : [int],
status : [int]
'''
obs_id = get_parameter(kwargs, "obs_id")
status = get_parameter(kwargs, "status")
try:
resp,_ = self.stub.UpdateQc0Status.with_call(
observation_pb2.UpdateQc0StatusReq(obs_id=obs_id, status=status),
metadata = get_auth_headers()
)
if resp.success:
return Result.ok_data()
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
def write(self, **kwargs):
''' insert a observational record into database
parameter kwargs:
file_path : [str]
'''
exp_begin_time = get_parameter(kwargs, "exp_begin_time")
exp_end_time = get_parameter(kwargs, "exp_end_time")
module_id = get_parameter(kwargs, "module_id")
obs_type = get_parameter(kwargs, "obs_type")
facility_status_id = get_parameter(kwargs, "facility_status_id")
module_status_id = get_parameter(kwargs, "module_status_id")
req = observation_pb2.WriteObservationReq(
exp_begin_time=exp_begin_time,
exp_end_time=exp_end_time,
module_id=module_id,
obs_type=obs_type,
facility_status_id=facility_status_id,
module_status_id=module_status_id)
try:
resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers())
if resp.success:
return Result.ok_data(data=resp.record)
else:
return Result.error(message = str(resp.error.detail))
except grpc.RpcError as e:
return Result.error(message="%s:%s" % (e.code().value, e.details))
......@@ -10,5 +10,5 @@ class CommonEphemTestCase(unittest.TestCase):
self.api = CatalogApi()
def test_gaia3_query(self):
recs = self.api.gaia3_query(ra=38.444, dec=55, radius=1, min_mag=-1, max_mag=-1, obstime = -1, limit = 2)
recs = self.api.gaia3_query(ra=222.1, dec=40, radius=0.5, min_mag=-1, max_mag=-1, obstime = -1, limit = 2)
print('find:', recs)
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.level0 import Level0DataApi
class Level0DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0DataApi()
def test_find(self):
recs = self.api.find(obs_id = 9, obs_type = 'sci', limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(fits_id = 100)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(fits_id = 100, status = 6)
print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(fits_id = 100, status = 7)
print('update_qc0_status:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.level0prc import Level0PrcApi
class Level0PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0PrcApi()
def test_find(self):
recs = self.api.find(level0_id=134)
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 8, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(level0_id=134,
pipeline_id = "P1",
prc_module = "QC0",
params_id = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_cluster.facility.observation import ObservationApi
class FacilityObservationTestCase(unittest.TestCase):
def setUp(self):
self.api = ObservationApi()
def test_find(self):
recs = self.api.find(module_id="MSC",limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(obs_id=9)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(obs_id = 9, status = 3, )
print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(obs_id = 9, status = 3, )
print('update_qc0_status:', rec)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment