# Copyright (C) 2016 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
#
# This file is part of Kitty.
#
# Kitty is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Kitty is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Kitty. If not, see <http://www.gnu.org/licenses/>.
'''
RPC implementation, based on jsonrpc
https://json-rpc.readthedocs.io/
'''
import requests
import json
import six
import traceback
from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
if six.PY3:
import codecs
# Message sent together with JSONRPC_NO_RESULT when the called method
# returned None.
JSONRPC_NO_RESULT_STR = u'No result from JSON-RPC method.'
# Error codes placed in the 'code' field of an error response.
# Sent when reading/parsing the incoming request raised an exception.
JSONRPC_PARSE_ERROR = -32700
# Sent when the requested method does not exist on the target object.
JSONRPC_METHOD_NOT_FOUND = -32601
# Sent when the called method raised an exception.
JSONRPC_INTERNAL_ERROR = -32603
# Sent when the called method returned None; RpcClient maps this code back
# to a None return value instead of raising.
JSONRPC_NO_RESULT = -32000
def encode_string(data, encoding='hex'):
    '''
    Encode a string so it can be carried safely inside a JSON document.

    :param data: string to encode (on Python 3: str or bytes)
    :param encoding: codec name to use (default: 'hex')
    :return: the encoded data as a text string
    '''
    if six.PY2:
        # Python 2 strings support binary codecs such as 'hex' directly.
        return data.encode(encoding)
    # Python 3: binary codecs only accept bytes, so text is turned into its
    # UTF-8 representation first.
    raw = data.encode('utf-8') if isinstance(data, str) else data
    encoded = codecs.encode(raw, encoding)
    return encoded.decode('ascii')
def decode_string(data, encoding='hex'):
    '''
    Decode a string that was encoded with :func:`encode_string`.

    :param data: string to decode (on Python 3: str or bytes)
    :param encoding: codec name to use (default: 'hex')
    :return: decoded string (on Python 3 this is whatever ``codecs.decode``
        yields for the codec, i.e. bytes for the default 'hex')
    '''
    if six.PY2:
        return data.decode(encoding)
    # codecs.decode() needs a bytes-like object for binary codecs. Text is
    # converted first; bytes (which decode_data also forwards here) are
    # passed through unchanged instead of failing on a missing .encode().
    if isinstance(data, str):
        data = data.encode('ascii')
    return codecs.decode(data, encoding)
def encode_data(data):
    '''
    Recursively encode a value so it can be sent in a JSON-RPC message.

    Strings (and bytes) are passed through encode_string, numbers, booleans
    and None are returned untouched, lists are encoded element by element
    and dictionaries have their values encoded (keys are left as they are).

    :param data: data to encode - list, dict, string, bool, int, float or
        None (and nested combinations)
    :return: encoded object of the same type
    :raises ValueError: if data (or a nested item) has an unsupported type
    '''
    if isinstance(data, (six.string_types, bytes)):
        return encode_string(data)
    # Scalars that JSON can carry natively need no encoding.
    if data is None or isinstance(data, (six.integer_types, bool, float)):
        return data
    if isinstance(data, list):
        return [encode_data(item) for item in data]
    if isinstance(data, dict):
        return dict((key, encode_data(value)) for key, value in data.items())
    raise ValueError('Cannot encode data of type %s' % type(data))
def decode_data(data):
    '''
    Recursively decode a value received in a JSON-RPC message.

    Strings (and bytes) are passed through decode_string, numbers, booleans
    and None are returned untouched, lists are decoded element by element
    and dictionaries have their values decoded (keys are left as they are).

    :param data: data to decode - list, dict, string, bool, int, float or
        None (and nested combinations)
    :return: decoded object of the same type
    :raises ValueError: if data (or a nested item) has an unsupported type
    '''
    if isinstance(data, (six.string_types, bytes)):
        return decode_string(data)
    # Scalars were never encoded, so they come back as they are.
    if data is None or isinstance(data, (six.integer_types, bool, float)):
        return data
    if isinstance(data, list):
        return [decode_data(item) for item in data]
    if isinstance(data, dict):
        return dict((key, decode_data(value)) for key, value in data.items())
    raise ValueError('Cannot decode data of type %s' % type(data))
class RpcClient(object):
    '''
    JSON-RPC client.

    Any attribute that is not found on the instance is turned into a
    function that performs a JSON-RPC call with that name, e.g.
    ``client.do_something(a=1)`` sends a request for method "do_something".
    '''

    def __init__(self, host, port):
        '''
        :param host: host name or address of the RPC server
        :param port: port of the RPC server (integer)
        '''
        self.cache = {}
        self.url = 'http://%s:%d' % (host, port)
        self.headers = {'content-type': 'application/json'}
        self.uid = 0

    def __getattr__(self, key):
        '''
        Return a function with that name, which performs json rpc request.
        Generated functions are cached, so the same object is returned for
        repeated lookups of the same name.

        :param key: name of the function
        :return: function with that name
        :raises AttributeError: for special (dunder) names, or when the
            instance was not initialized yet
        '''
        # Special-method probes (made by copy/pickle/hasattr checks etc.)
        # must fail normally instead of silently becoming remote calls.
        if key.startswith('__') and key.endswith('__'):
            raise AttributeError(key)
        # Go through __dict__ directly: when 'cache' itself is missing
        # (instance created without __init__), reading self.cache would
        # re-enter __getattr__ and recurse without bound.
        try:
            cache = self.__dict__['cache']
        except KeyError:
            raise AttributeError(key)
        if key not in cache:
            cache[key] = self._generate_rpc_method(key)
        return cache[key]

    def get_unique_msg_id(self):
        '''
        :return: a unique message id (a counter that starts at 0 and is
            incremented on every call)
        '''
        uid = self.uid
        self.uid += 1
        return uid

    def _generate_rpc_method(self, method):
        '''
        Generate a function that performs rpc call

        :param method: method name
        :return: rpc function
        '''
        def _(**kwargs):
            '''
            Perform the call - always use named arguments.

            :return: decoded result of the call, or None if the server
                reported that the method had no result
            :raises Exception: if the server responded with any other error
            '''
            msg_id = self.get_unique_msg_id()
            params = encode_data(kwargs)
            payload = {
                'method': method,
                'params': params,
                'jsonrpc': '2.0',
                'id': msg_id
            }
            response = requests.post(self.url, data=json.dumps(payload), headers=self.headers).json()
            if 'error' in response:
                # "No result" is how the server reports a method that
                # returned None - it is not a failure.
                if response['error']['code'] == JSONRPC_NO_RESULT:
                    return None
                raise Exception('Got error from RPC server when called "%s" error: %s' % (method, response['error']))
            if 'result' in response:
                return decode_data(response['result'])
            # Neither 'error' nor 'result' in the response.
            return None
        return _

    def stop_remote_server(self):
        '''
        Stop the remote server (after responding to this message)
        '''
        # Resolved through __getattr__: names starting with '_meta_' are
        # sent to the server like any other method name.
        self._meta_stop_server()
class RpcHttpServer(HTTPServer):
    '''
    HTTPServer that additionally carries the objects on which request
    handlers look up the RPC methods.
    '''

    def __init__(self, server_address, handler, impl, meta):
        '''
        :param server_address: address of the server
        :param handler: handler for requests
        :param impl: reference to the implementation object
        :param meta: reference to the object that serves methods requested
            with the '_meta_' prefix
        '''
        # Explicit base-class call, kept as in the original code.
        HTTPServer.__init__(self, server_address, handler)
        self.impl = impl
        self.meta = meta

    def log_message(self, fmt, *args):
        '''
        Swallow log messages - nothing is logged.
        '''
        return None
class RpcHandler(BaseHTTPRequestHandler):
    '''
    HTTP request handler that serves JSON-RPC calls sent with POST.

    The method is looked up on ``self.server.impl``, or on
    ``self.server.meta`` when the method name starts with '_meta_'.
    '''

    def log_message(self, fmt, *args):
        '''
        Override default log and do nothing
        '''
        return

    def _parse_request(self):
        '''
        Parse the request body into req_method, req_params, req_rpc_version
        and req_id. Any exception raised here is handled by do_POST.
        '''
        # Defaults first, so that an error response can still be built when
        # parsing fails part-way.
        self.req_method = 'unknown'
        self.req_params = {}
        self.req_rpc_version = '2.0'
        self.req_id = 0
        # headers.get() is available on both the Python 2 and the Python 3
        # header objects; getheader() exists on Python 2 only.
        length = int(self.headers.get('content-length'))
        self.data = self.rfile.read(length)
        # The body is read as bytes; decode explicitly so json.loads gets
        # text regardless of the Python version.
        data_dict = json.loads(self.data.decode('utf-8'))
        self.req_method = data_dict['method']
        self.req_params = decode_data(data_dict['params'])
        self.req_rpc_version = data_dict['jsonrpc']
        self.req_id = data_dict['id']

    def do_POST(self):
        '''
        Handle POST requests: parse the request, call the requested method
        and send back either its result or an error response.
        '''
        try:
            self._parse_request()
        except Exception as ex1:
            print(traceback.format_exc())
            self.error_response(JSONRPC_PARSE_ERROR, 'exception when parsing jsonrpc request [%s]' % (ex1))
            return
        try:
            if self.req_method.startswith('_meta_'):
                # Strip the prefix only - a later '_meta_' inside the name
                # is part of the method name.
                self.req_method = self.req_method[len('_meta_'):]
                instance = self.server.meta
            else:
                instance = self.server.impl
            method = getattr(instance, self.req_method)
        except AttributeError:
            self.error_response(JSONRPC_METHOD_NOT_FOUND, 'no method named "%s"' % self.req_method)
            return
        try:
            res = method(**self.req_params)
        except Exception as ex1:
            self.error_response(JSONRPC_INTERNAL_ERROR, 'exception in call "%s(%s)" -> %s' % (self.req_method, self.req_params, ex1))
            return
        if res is None:
            self.error_response(JSONRPC_NO_RESULT, JSONRPC_NO_RESULT_STR)
            return
        try:
            self.valid_response(res)
        except (TypeError, ValueError) as ex1:
            # The result could not be encoded/serialized. This is detected
            # before anything is written to the client, so an error
            # response can still be sent instead of dropping the request.
            self.error_response(JSONRPC_INTERNAL_ERROR, 'cannot encode result of call "%s" -> %s' % (self.req_method, ex1))

    def error_response(self, code, msg):
        '''
        Send an error response

        :param code: error code
        :param msg: error message
        '''
        self.send_result({
            'error': {
                'code': code,
                'message': msg
            }
        })

    def valid_response(self, result):
        '''
        Send a valid response with the result

        :param result: the result of the call
        :raises ValueError: if the result cannot be encoded
        '''
        self.send_result({
            'result': encode_data(result)
        })

    def send_result(self, additional_dict):
        '''
        Send a result to the RPC client

        :param additional_dict: the dictionary with the response
        '''
        response = {
            'jsonrpc': self.req_rpc_version,
            'id': self.req_id,
        }
        response.update(additional_dict)
        # Serialize before any header is sent, so a serialization failure
        # leaves the connection untouched. wfile expects bytes, and
        # Content-length must be the length of those bytes.
        body = json.dumps(response).encode('utf-8')
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.send_header("Content-length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
class RpcServer(object):
    '''
    JSON-RPC server that exposes the methods of an implementation object
    over HTTP, handling one request at a time.
    '''

    # Server life-cycle states.
    _STATE_IDLE = 1          # not serving (before start() / after it ended)
    _STATE_RUN = 2           # inside the serving loop
    _STATE_SHOULD_STOP = 3   # stop requested, loop exits after current request

    def __init__(self, host, port, impl):
        '''
        :param host: listening address
        :param port: listening port
        :param impl: implementation class
        '''
        self.host = host
        self.port = port
        # The server object itself is passed as the "meta" object, so a
        # client can reach methods such as stop_server via '_meta_' names.
        self.server = RpcHttpServer((host, port), RpcHandler, impl, self)
        self.impl = impl
        # Not read by this class; kept as it is part of the instance state.
        self.running = True
        self.state = RpcServer._STATE_IDLE

    def start(self):
        '''
        Serving loop - handles requests one by one until the state is
        changed (see stop_server), then closes the server.
        '''
        print('Waiting for a client to connect to url http://%s:%d/' % (self.host, self.port))
        self.state = RpcServer._STATE_RUN
        try:
            while self.state == RpcServer._STATE_RUN:
                self.server.handle_request()
        finally:
            # Also reached when handle_request() raises (e.g. on
            # KeyboardInterrupt): release the socket and report the server
            # as not running instead of leaving it stuck in the RUN state.
            self.server.server_close()
            self.state = RpcServer._STATE_IDLE

    def stop_server(self):
        '''
        Mark the server state to be stopped.
        No further action needed when called from remote RPC client (stop_remote_server),
        but requires another request if called directly
        '''
        self.state = RpcServer._STATE_SHOULD_STOP

    def is_running(self):
        '''
        Check if the server is currently running

        :return: whether the server is currently running
        '''
        return self.state != RpcServer._STATE_IDLE