import os
import grpc
import datetime

from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_proto_model_list
from csst_dfs_commons.models.facility import Level1Record
from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE
from csst_dfs_proto.facility.level1 import level1_pb2, level1_pb2_grpc

from ..common.service import grpc_channel
from ..common.utils import *

class Level1DataApi(object):
    """
    Level1 Data Operation Class

    Client-side wrapper around the ``Level1Srv`` gRPC service. Every public
    method takes keyword arguments only and returns a
    ``csst_dfs_commons.models.Result``; gRPC failures are caught and reported
    as an error ``Result`` rather than raised.

    NOTE(review): ``self.stub`` starts as ``None``; the ``grpc_channel``
    decorator (defined in ``..common.service``, not visible here) is presumably
    what instantiates ``self.stub_class`` before each call - confirm.
    """
    def __init__(self):
        self.stub_class = level1_pb2_grpc.Level1SrvStub
        self.stub = None

    @staticmethod
    def _rpc_error(e):
        """Convert a ``grpc.RpcError`` into an error ``Result``."""
        # NOTE(review): grpc.StatusCode values are (int, str) tuples, so the
        # message starts with the tuple repr. Kept as-is because callers may
        # already depend on this exact text.
        return Result.error(message="%s:%s" % (e.code().value, e.details()))

    @staticmethod
    def _records_result(resp, with_total_count=False):
        """Build a ``Result`` from a response that carries ``records``.

        When ``with_total_count`` is true, ``resp.totalCount`` is attached to
        the successful result under the key ``"totalCount"``.
        """
        if not resp.success:
            return Result.error(message = str(resp.error.detail))
        result = Result.ok_data(data=from_proto_model_list(Level1Record, resp.records))
        if with_total_count:
            result = result.append("totalCount", resp.totalCount)
        return result

    @staticmethod
    def _status_result(resp):
        """Build a data-less ``Result`` from a response with only a success flag."""
        if resp.success:
            return Result.ok_data()
        return Result.error(message = str(resp.error.detail))

    @grpc_channel
    def find(self, **kwargs):
        """ retrieve level1 records from database

        parameter kwargs:
            obs_id: [str]
            level0_id: [str]
            module_id: [str]
            data_type: [str]
            create_time : (start, end),
            qc1_status : [int], default 1024
            prc_status : [int], default 1024
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit
            pipeline_id: [str], default ""
            detector_no: [str], default ""
            filter: [str], default ""
            object_name: [str], default ""
            ra_cen, dec_cen, radius_cen: sent to the server as strings inside
                ``other_conditions`` (presumably a cone search - confirm
                against the server implementation)

        return: csst_dfs_commons.models.Result, with "totalCount" appended
        """
        # Look the (start, end) pair up once instead of once per bound.
        create_time = get_parameter(kwargs, "create_time", [None, None])
        try:
            resp, _ =  self.stub.Find.with_call(level1_pb2.FindLevel1Req(
                obs_id = get_parameter(kwargs, "obs_id"),
                level0_id = get_parameter(kwargs, "level0_id"),
                module_id = get_parameter(kwargs, "module_id"),
                data_type = get_parameter(kwargs, "data_type"),
                create_time_start = create_time[0],
                create_time_end = create_time[1],
                qc1_status = get_parameter(kwargs, "qc1_status", 1024),
                prc_status = get_parameter(kwargs, "prc_status", 1024),
                filename = get_parameter(kwargs, "filename"),
                limit = get_parameter(kwargs, "limit", 0),
                pipeline_id = get_parameter(kwargs, "pipeline_id", ""),
                detector_no = get_parameter(kwargs, "detector_no", ""),
                filter = get_parameter(kwargs, "filter", ""),
                object_name = get_parameter(kwargs, "object_name", ""),
                other_conditions = {
                    "ra_cen": str(get_parameter(kwargs, "ra_cen", '')),
                    "dec_cen": str(get_parameter(kwargs, "dec_cen", '')),
                    "radius_cen": str(get_parameter(kwargs, "radius_cen", ''))
                }
            ),metadata = get_auth_headers())
        except grpc.RpcError as e:
            return self._rpc_error(e)
        return self._records_result(resp, with_total_count=True)

    @grpc_channel
    def find_by_brick_ids(self, **kwargs):
        """ retrieve level1 records by brick_ids like [1,2,3,4]

        :param kwargs: Parameter dictionary, key items support:
            brick_ids: [list]

        return: csst_dfs_commons.models.Result
        """
        try:
            resp, _ =  self.stub.FindByBrickIds.with_call(level1_pb2.FindByBrickIdsReq(
                brick_ids = get_parameter(kwargs, "brick_ids", [])
            ),metadata = get_auth_headers())
        except grpc.RpcError as e:
            return self._rpc_error(e)
        return self._records_result(resp)

    @grpc_channel
    def find_by_ids(self, **kwargs):
        """ retrieve level1 records by internal level1 ids like [1,2,3,4]

        :param kwargs: Parameter dictionary, key items support:
            ids: [list]

        return: csst_dfs_commons.models.Result
        """
        try:
            resp, _ =  self.stub.FindByIds.with_call(level1_pb2.FindByIdsReq(
                ids = get_parameter(kwargs, "ids", [])
            ),metadata = get_auth_headers())
        except grpc.RpcError as e:
            return self._rpc_error(e)
        return self._records_result(resp)

    @grpc_channel
    def sls_find_by_qc1_status(self, **kwargs):
        """ retrieve level1 records of module 'SLS' by QC1 status,
        ordered by create_time ascending

        parameter kwargs:
            qc1_status: [int], default -1
            limit: limits returns the number of records,default 1

        return: csst_dfs_commons.models.Result, with "totalCount" appended
        """
        try:
            resp, _ =  self.stub.FindByQc1Status.with_call(level1_pb2.FindLevel1Req(
                level0_id = None,
                data_type = None,
                create_time_start = None,
                create_time_end = None,
                qc1_status = get_parameter(kwargs, "qc1_status", -1),
                prc_status = None,
                limit = get_parameter(kwargs, "limit", 1),
                other_conditions = {"orderBy":"create_time asc", "module_id": 'SLS'}
            ),metadata = get_auth_headers())
        except grpc.RpcError as e:
            return self._rpc_error(e)
        return self._records_result(resp, with_total_count=True)

    @grpc_channel
    def get(self, **kwargs):
        """  fetch a record from database

        parameter kwargs:
            id : [int]
            level0_id : [str]
            data_type : [str]

        return csst_dfs_commons.models.Result; an error Result with message
        "data not found" when the response carries no record (or id == 0)
        """
        try:
            resp, _ =  self.stub.Get.with_call(level1_pb2.GetLevel1Req(
                id = get_parameter(kwargs, "id"),
                level0_id = get_parameter(kwargs, "level0_id"),
                data_type = get_parameter(kwargs, "data_type")
            ),metadata = get_auth_headers())
        except grpc.RpcError as e:
            return self._rpc_error(e)

        # An unset protobuf record has id == 0, which is treated as "no match".
        if resp.record is None or resp.record.id == 0:
            return Result.error(message="data not found")
        return Result.ok_data(data = Level1Record().from_proto_model(resp.record))

    @grpc_channel
    def update_proc_status(self, **kwargs):
        """ update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_commons.models.Result
        """
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp,_ = self.stub.UpdateProcStatus.with_call(
                level1_pb2.UpdateProcStatusReq(id=fits_id, status=status),
                metadata = get_auth_headers()
            )
        except grpc.RpcError as e:
            return self._rpc_error(e)
        return self._status_result(resp)

    @grpc_channel
    def update_qc1_status(self, **kwargs):
        """ update the status of QC1

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_commons.models.Result
        """
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            resp,_ = self.stub.UpdateQc1Status.with_call(
                level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status),
                metadata = get_auth_headers()
            )
        except grpc.RpcError as e:
            return self._rpc_error(e)
        return self._status_result(resp)

    @grpc_channel
    def write(self, **kwargs):
        """ insert a level1 record into database and upload its file

        The file at ``file_path`` is streamed to the server in chunks of
        ``UPLOAD_CHUNK_SIZE`` bytes; every chunk carries the record metadata.

        parameter kwargs:
            level0_id : [str]
            module_id : [str]
            data_type : [str]
            cor_sci_id : [int]
            prc_params : [str]
            filename : [str], defaults to the base name of file_path
            file_path : [str], required, must be an existing regular file
            qc1_status : [int]
            prc_status : [int]
            prc_time : [str], defaults to the current local time
            pipeline_id : [str]
            pmapname : [str]
            build : [int]
            refs : [dict]

        return csst_dfs_commons.models.Result
        """
        # The module-level ``import datetime`` binds the *module*, which has no
        # ``now()``; the original only worked if the star import from
        # ``..common.utils`` happened to re-export the ``datetime`` class.
        # Import the class explicitly so the default never depends on that.
        from datetime import datetime as _datetime

        rec = level1_pb2.Level1Record(
            id = 0,
            level0_id = get_parameter(kwargs, "level0_id", ""),
            module_id = get_parameter(kwargs, "module_id", ""),
            data_type = get_parameter(kwargs, "data_type", ""),
            cor_sci_id = get_parameter(kwargs, "cor_sci_id", 0),
            prc_params = get_parameter(kwargs, "prc_params", ""),
            filename = get_parameter(kwargs, "filename", ""),
            file_path = get_parameter(kwargs, "file_path", ""),
            qc1_status = get_parameter(kwargs, "qc1_status", 0),
            prc_status = get_parameter(kwargs, "prc_status", 0),
            prc_time = get_parameter(kwargs, "prc_time", format_datetime(_datetime.now())),
            pipeline_id = get_parameter(kwargs, "pipeline_id", ""),
            build = get_parameter(kwargs, "build", 0),
            pmapname = get_parameter(kwargs, "pmapname", ""),
            refs = get_parameter(kwargs, "refs", {})
        )

        def stream(rec):
            # Lazily read the file so large uploads are never fully in memory.
            with open(rec.file_path, 'rb') as f:
                for data in iter(lambda: f.read(UPLOAD_CHUNK_SIZE), b""):
                    yield level1_pb2.WriteLevel1Req(record = rec, data = data)

        if not rec.file_path:
            return Result.error(message="file_path is blank")
        # isfile (not exists): a directory would pass an existence check and
        # then fail inside the request iterator with an opaque RPC error.
        if not os.path.isfile(rec.file_path):
            return Result.error(message="the file [%s] not existed" % (rec.file_path, ))
        if not rec.filename:
            rec.filename = os.path.basename(rec.file_path)

        try:
            resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers())
        except grpc.RpcError as e:
            return self._rpc_error(e)

        if resp.success:
            return Result.ok_data(data=Level1Record().from_proto_model(resp.record))
        return Result.error(message = str(resp.error.detail))