from ..common.service import grpc_channel from ..common.utils import * from ..common.constants import * class OtherDataApi(object): def __init__(self): self.stub = None @grpc_channel def find(self, **kwargs): return find_req("OtherDataServicer.Find", kwargs) @grpc_channel def get(self, **kwargs): return get_req("OtherDataServicer.Get", kwargs) @grpc_channel def write(self, **kwargs): try: file_path = get_parameter(kwargs, "file_path", "") filename = get_parameter(kwargs, "filename", "") if not file_path: return Result.error(message="file_path is blank") if not os.path.exists(file_path): return Result.error(message="the file [%s] not existed" % (file_path, )) if not filename: filename = os.path.basename(file_path) conditions = {} conditions.update(kwargs) conditions["filename"] = filename with open(file_path, 'rb') as f: byte_stream = io.BytesIO(f.read()) return write_stream_req("OtherDataServicer.Write", byte_stream, kwargs) except Exception as e: return Result.error(message="%s" % (e,))