Commit af5a87db authored by Shoulin Wei's avatar Shoulin Wei
Browse files

init

parents
# Python temp files
**/__pycache__/
**/*.py~
**/*.pyc
# Python virtualenv folder
**/venv/
# IDE Project files
**/.idea/
**/.vscode/
**/*.swp
# MacOS folder attributes files
**/*.DS_Store
# Files created during testing
/_build/
test_reports.xml
**/.cache
**/.pytest_cache/
# Compiled python modules.
**/.pyc
# Setuptools distribution folder.
**/build/
**/dist/
# Code coverage
**/.coverage
# log
*.log
# Python egg metadata, regenerated from source files by setuptools.
**/*.egg-info
**/*.rdb
**/.eggs
**/_version.py
# CSST local APIs library
## Introduction
This package provides APIs to access csst's files and databases in your localized environment.
## Installation
This library can be installed with the following command:
```bash
python setup.py install
```
## Configuration
set enviroment variable
CSST_LOCAL_FILE_ROOT = [a local file directory] #default: /opt/temp/csst
__version_info__ = (1, 0, 0)
__version__ = '.'.join(map(str, __version_info__))
\ No newline at end of file
from .catalog import CatalogApi
\ No newline at end of file
from ..common.client_config import ClientConfigurator
from ..common.service import ServiceProxy
from csst_dfs_proto.common.ephem import ephem_pb2, ephem_pb2_grpc
from ..common.constants import *
import grpc
class CatalogApi(object):
def __init__(self):
self.proxy = ServiceProxy(ClientConfigurator().gatewayCfg)
self.stub = self.proxy.insecure(ephem_pb2_grpc.EphemSearchSrvStub)
def gaia3_query(self, ra: float, dec: float, radius: float, min_mag: float, max_mag: float, obstime: int, limit: int):
''' retrieval GAIA DR 3
args:
ra: in deg
dec: in deg
radius: in deg
min_mag: minimal magnitude
max_mag: maximal magnitude
obstime: seconds
limit: limits returns the number of records
return: a dict as {success: true, totalCount: 100, records:[.....]}
'''
try:
resp = self.stub.Gaia3Search(ephem_pb2.EphemSearchRequest(
ra = ra,
dec = dec,
radius = radius,
minMag = min_mag,
maxMag = max_mag,
obstime = obstime,
limit = limit
))
return resp
except grpc.RpcError as identifier:
raise Exception("Rpc Error")
import os
from .config.grpc_loader import GrpcLoader
from .config.configurator import Configurator
from .utils import singleton
def configOptions(opts):
opts.address = os.getenv('CSST_DFS_CONFIG_SERVER', "localhost:9600")
@singleton
class ClientConfigurator(object):
def __init__(self) -> None:
self.configurator = Configurator(GrpcLoader(configOptions))
self.globalCfg = self.configurator.Global()
self.gatewayCfg = self.configurator.Gateway()
globalConfig = ClientConfigurator()
import time
class ChangeSet(object):
def __init__(self):
self._data = bytes([])
self._checksum = ""
self._format = ""
self._source = ""
self._timestamp = time.time()
@property
def data(self):
return self._data
@data.setter
def data(self, value):
self._data = value
@property
def checksum(self):
return self._checksum
@checksum.setter
def checksum(self, value):
self._checksum = value
@property
def format(self):
return self._format
@format.setter
def format(self, value):
self._format = value
@property
def source(self):
return self._source
@source.setter
def source(self, value):
self._source = value
@property
def timestamp(self):
return self._timestamp
@timestamp.setter
def timestamp(self, value):
self._timestamp = value
import orjson
from .model import Server as ServerCfg, Global as GlobalCfg, Gateway as GatewayCfg
class Configurator(object):
def __init__(self, loader):
self.loader = loader
change_set = self.loader.Read()
self.config = orjson.loads(change_set.data)
def Server(self, srv_name):
return self.Any(srv_name, ServerCfg)
def Gateway(self, path = "gateway"):
return self.Any(path, GatewayCfg)
def Global(self, path = "global"):
return self.Any(path, GlobalCfg)
def Any(self, path, ref_object):
paths = path.split(".")
cfg = self.config
for p in paths:
cfg = cfg[p]
if isinstance(cfg, dict):
return ref_object(**cfg)
else:
return ref_object(cfg)
\ No newline at end of file
import grpc
from .grpc_options import GrpcOptions
from pymicro.config.grpc.proto import grpc_pb2, grpc_pb2_grpc
from .changeset import ChangeSet
class GrpcLoader(object):
def __init__(self, *args, **kwargs):
self.options = GrpcOptions()
for o in args:
o(self.options)
self._connect()
def Read(self):
request = grpc_pb2.ReadRequest(path = self.options.path)
response = self.stub.Read(request)
return self._to_changset(response.change_set)
def _connect(self):
self.channel = grpc.insecure_channel(self.options.address)
self.stub = grpc_pb2_grpc.SourceStub(self.channel)
def _to_changset(self, cs):
changeset = ChangeSet()
changeset.data = cs.data
changeset.checksum = cs.checksum
changeset.source = cs.source
changeset.format = cs.format
changeset.timestamp = cs.timestamp
return changeset
from pymicro.config.options import Options
class GrpcOptions(Options):
def __init__(self):
self.address = "localhost:9600"
self.path = "micro"
\ No newline at end of file
import dataclasses
@dataclasses.dataclass
class Server:
name: str=""
version: str=""
address: str=""
port: int=-1
@dataclasses.dataclass
class Global:
fitsFileRootDir: str=""
fileExternalPrefix: str=""
@dataclasses.dataclass
class Gateway:
enabled: bool=True
url: str=""
class Options(object):
def __init__(self):
pass
\ No newline at end of file
syntax = "proto3";
service Source {
rpc Read(ReadRequest) returns (ReadResponse) {};
rpc Watch(WatchRequest) returns (stream WatchResponse) {};
}
message ChangeSet {
bytes data = 1;
string checksum = 2;
string format = 3;
string source = 4;
int64 timestamp = 5;
}
message ReadRequest {
string path = 1;
}
message ReadResponse {
ChangeSet change_set = 1;
}
message WatchRequest {
string path = 1;
}
message WatchResponse {
ChangeSet change_set = 1;
}
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: grpc.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='grpc.proto',
package='',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\ngrpc.proto\"^\n\tChangeSet\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x03 \x01(\t\x12\x0e\n\x06source\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\x03\"\x1b\n\x0bReadRequest\x12\x0c\n\x04path\x18\x01 \x01(\t\".\n\x0cReadResponse\x12\x1e\n\nchange_set\x18\x01 \x01(\x0b\x32\n.ChangeSet\"\x1c\n\x0cWatchRequest\x12\x0c\n\x04path\x18\x01 \x01(\t\"/\n\rWatchResponse\x12\x1e\n\nchange_set\x18\x01 \x01(\x0b\x32\n.ChangeSet2[\n\x06Source\x12%\n\x04Read\x12\x0c.ReadRequest\x1a\r.ReadResponse\"\x00\x12*\n\x05Watch\x12\r.WatchRequest\x1a\x0e.WatchResponse\"\x00\x30\x01\x62\x06proto3')
)
_CHANGESET = _descriptor.Descriptor(
name='ChangeSet',
full_name='ChangeSet',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='data', full_name='ChangeSet.data', index=0,
number=1, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='checksum', full_name='ChangeSet.checksum', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='format', full_name='ChangeSet.format', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='source', full_name='ChangeSet.source', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='timestamp', full_name='ChangeSet.timestamp', index=4,
number=5, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=14,
serialized_end=108,
)
_READREQUEST = _descriptor.Descriptor(
name='ReadRequest',
full_name='ReadRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='path', full_name='ReadRequest.path', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=110,
serialized_end=137,
)
_READRESPONSE = _descriptor.Descriptor(
name='ReadResponse',
full_name='ReadResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='change_set', full_name='ReadResponse.change_set', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=139,
serialized_end=185,
)
_WATCHREQUEST = _descriptor.Descriptor(
name='WatchRequest',
full_name='WatchRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='path', full_name='WatchRequest.path', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=187,
serialized_end=215,
)
_WATCHRESPONSE = _descriptor.Descriptor(
name='WatchResponse',
full_name='WatchResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='change_set', full_name='WatchResponse.change_set', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=217,
serialized_end=264,
)
_READRESPONSE.fields_by_name['change_set'].message_type = _CHANGESET
_WATCHRESPONSE.fields_by_name['change_set'].message_type = _CHANGESET
DESCRIPTOR.message_types_by_name['ChangeSet'] = _CHANGESET
DESCRIPTOR.message_types_by_name['ReadRequest'] = _READREQUEST
DESCRIPTOR.message_types_by_name['ReadResponse'] = _READRESPONSE
DESCRIPTOR.message_types_by_name['WatchRequest'] = _WATCHREQUEST
DESCRIPTOR.message_types_by_name['WatchResponse'] = _WATCHRESPONSE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
ChangeSet = _reflection.GeneratedProtocolMessageType('ChangeSet', (_message.Message,), {
'DESCRIPTOR' : _CHANGESET,
'__module__' : 'grpc_pb2'
# @@protoc_insertion_point(class_scope:ChangeSet)
})
_sym_db.RegisterMessage(ChangeSet)
ReadRequest = _reflection.GeneratedProtocolMessageType('ReadRequest', (_message.Message,), {
'DESCRIPTOR' : _READREQUEST,
'__module__' : 'grpc_pb2'
# @@protoc_insertion_point(class_scope:ReadRequest)
})
_sym_db.RegisterMessage(ReadRequest)
ReadResponse = _reflection.GeneratedProtocolMessageType('ReadResponse', (_message.Message,), {
'DESCRIPTOR' : _READRESPONSE,
'__module__' : 'grpc_pb2'
# @@protoc_insertion_point(class_scope:ReadResponse)
})
_sym_db.RegisterMessage(ReadResponse)
WatchRequest = _reflection.GeneratedProtocolMessageType('WatchRequest', (_message.Message,), {
'DESCRIPTOR' : _WATCHREQUEST,
'__module__' : 'grpc_pb2'
# @@protoc_insertion_point(class_scope:WatchRequest)
})
_sym_db.RegisterMessage(WatchRequest)
WatchResponse = _reflection.GeneratedProtocolMessageType('WatchResponse', (_message.Message,), {
'DESCRIPTOR' : _WATCHRESPONSE,
'__module__' : 'grpc_pb2'
# @@protoc_insertion_point(class_scope:WatchResponse)
})
_sym_db.RegisterMessage(WatchResponse)
_SOURCE = _descriptor.ServiceDescriptor(
name='Source',
full_name='Source',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=266,
serialized_end=357,
methods=[
_descriptor.MethodDescriptor(
name='Read',
full_name='Source.Read',
index=0,
containing_service=None,
input_type=_READREQUEST,
output_type=_READRESPONSE,
serialized_options=None,
),
_descriptor.MethodDescriptor(
name='Watch',
full_name='Source.Watch',
index=1,
containing_service=None,
input_type=_WATCHREQUEST,
output_type=_WATCHRESPONSE,
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_SOURCE)
DESCRIPTOR.services_by_name['Source'] = _SOURCE
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from . import grpc_pb2 as grpc__pb2
class SourceStub(object):
# missing associated documentation comment in .proto file
pass
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Read = channel.unary_unary(
'/Source/Read',
request_serializer=grpc__pb2.ReadRequest.SerializeToString,
response_deserializer=grpc__pb2.ReadResponse.FromString,
)
self.Watch = channel.unary_stream(
'/Source/Watch',
request_serializer=grpc__pb2.WatchRequest.SerializeToString,
response_deserializer=grpc__pb2.WatchResponse.FromString,
)
class SourceServicer(object):
# missing associated documentation comment in .proto file
pass
def Read(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Watch(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_SourceServicer_to_server(servicer, server):
rpc_method_handlers = {
'Read': grpc.unary_unary_rpc_method_handler(
servicer.Read,
request_deserializer=grpc__pb2.ReadRequest.FromString,
response_serializer=grpc__pb2.ReadResponse.SerializeToString,
),
'Watch': grpc.unary_stream_rpc_method_handler(
servicer.Watch,
request_deserializer=grpc__pb2.WatchRequest.FromString,
response_serializer=grpc__pb2.WatchResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'Source', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
UPLOAD_CHUNK_SIZE = 1024*1024*4
FITS_SRV = "net.cnlab.csst.srv.fits."
DB_SRV = "net.cnlab.csst.srv.db."
EPHEM_SRV = "net.cnlab.csst.srv.ephem."
\ No newline at end of file
import grpc
import random
class _Service:
def __init__(self, data):
self._data = data
@property
def name(self):
raise NotImplementedError()
@property
def version(self):
raise NotImplementedError()
@property
def metadata(self):
raise NotImplementedError()
@property
def id(self):
raise NotImplementedError()
@property
def address(self):
raise NotImplementedError()
@property
def port(self):
raise NotImplementedError()
@property
def node_metadata(self):
raise NotImplementedError()
def __str__(self):
return '{}:{}'.format(self.name, self.address)
class Service(_Service):
def __init__(self, data):
self._data = data
@property
def name(self):
return self._data['name']
@property
def version(self):
return self._data['version']
@property
def metadata(self):
return self._data['metadata']
@property
def _node(self):
return random.choice(self._data['nodes'])
@property
def id(self):
return self._node['id']
@property
def address(self):
return self._node['address']
@property
def node_metadata(self):
return self._node['metadata']
class ServiceProxy:
def __init__(self, gatewayCfg = None):
self.gatewayCfg = gatewayCfg
def insecure(self, stubCls):
channel = grpc.insecure_channel(self.gatewayCfg.url)
return stubCls(channel)
\ No newline at end of file
from datetime import datetime
import time
def format_datetime(dt):
return dt.strftime('%Y-%m-%d %H:%M:%S')
def format_date(dt):
return dt.strftime('%Y-%m-%d')
def format_time_ms(float_time):
local_time = time.localtime(float_time)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (float_time - int(float_time)) * 1000
return "%s.%03d" % (data_head, data_secs)
def get_parameter(kwargs, key, default=None):
""" Get a specified named value for this (calling) function
The parameter is searched for in kwargs
:param kwargs: Parameter dictionary
:param key: Key e.g. 'max_workers'
:param default: Default value
:return: result
"""
if kwargs is None:
return default
value = default
if key in kwargs.keys():
value = kwargs[key]
return value
def to_int(s, default_value = 0):
try:
return int(s)
except:
return default_value
def singleton(cls):
_instance = {}
def inner():
if cls not in _instance:
_instance[cls] = cls()
return _instance[cls]
return inner
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment