Source code for kitty.interfaces.web

# Copyright (C) 2016 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
#
# This file is part of Kitty.
#
# Kitty is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Kitty is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Kitty.  If not, see <http://www.gnu.org/licenses/>.

import BaseHTTPServer
import json
import datetime
import time
from urlparse import urlparse, parse_qs
import os

from kitty.interfaces.base import EmptyInterface
from kitty.core.threading_utils import FuncThread


class _WebInterfaceServer(BaseHTTPServer.HTTPServer):
    '''
    http://docs.python.org/lib/module-BaseHTTPServer.html
    '''

    def __init__(self, server_address, handler, interface):
        '''
        :param server_address: address of the server
        :param handler: handler for requests
        :param interface: reference to the interface object
        '''
        BaseHTTPServer.HTTPServer.__init__(self, server_address, handler)
        # kitty.interfaces... interface object
        self.interface = interface
        self.RequestHandlerClass.logger = interface.logger
        self.RequestHandlerClass.dataman = interface.dataman

    @classmethod
    def log_message(cls, dummy1, *dummy2):
        '''
        Used to silence the direct logging to stdout/stderr
        '''
        # self.logger.info(dummy1, *dummy2)
        return


class _WebInterfaceHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    '''
    Our HTTP request handler
    '''
    def __init__(self, request, client_address, server):
        '''
        :param request: the request from the client
        :param client_address: client address
        :param server: HTTP server
        '''
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(
            self, request, client_address, server)
        self.dataman = None

    @classmethod
    def log_message(cls, dummy1, *dummy2):
        '''
        Used to silence the direct logging to stdout/stderr
        '''
        # self.logger.info(dummy1, *dummy2)
        return

    def _pause_fuzzer(self):
        self.server.interface.pause()

    def _resume_fuzzer(self):
        self.server.interface.resume()

    def _handle_image_request(self, content_type='image/jpeg'):
        path = self.path
        filename = path.split('/')[-1]
        try:
            this_dir = os.path.dirname(os.path.realpath(__file__))
            file_path = os.path.join(this_dir, 'web', 'images', filename)
            imgf = open(file_path, 'rb')
            buff = imgf.read()
            self.send_response(200)
            self.send_header('Content-type', content_type)
            self.send_header('Content-Length', '%d' % len(buff))
            self.send_header('Cache-Control', 'public, max-age=999936000')
            self.end_headers()
            return buff
        except Exception:
            self.send_response(401)
            self.end_headers()
            return ''

    def _handle_favicon_request(self):
        return self._handle_image_request(content_type='image/x-icon')

    def do_GET(self):
        '''
        handle GET request
        '''
        self._my_handle()

    def do_POST(self):
        '''
        handle POST request
        '''
        self._my_handle()

    def _handle_text_request(self, filename=None, content_type=None):
        try:
            parsed = urlparse(self.path)
            path = parsed.path.lower()
            if filename is None:
                filename = path.split('/')[-1]
            if content_type is None:
                extension = filename.split('.')[-1]
                if extension == 'js':
                    extension = 'javascript'
                content_type = 'text/' + extension
            this_dir = os.path.dirname(os.path.realpath(__file__))
            file_path = os.path.join(this_dir, 'web', 'static', filename)
            with open(file_path, 'rb') as the_file:
                buff = the_file.read()
            self.send_response(200)
            self.send_header('Content-type', content_type)
            self.send_header('Content-Length', '%d' % len(buff))
            self.end_headers()
        except Exception:
            buff = None
        return buff

    def _handle_index(self):
        return self._handle_text_request('index.html', 'text/html')

    def _get_eta(self, info):
        end_index = info.end_index
        current_index = info.current_index
        start_index = info.start_index
        if current_index is None:
            tests_left = 0
            tests_passed = end_index - start_index
        else:
            tests_left = end_index - current_index
            tests_passed = current_index - start_index
        current_time = time.time()
        time_passed = current_time - info.start_time
        if tests_passed == 0:
            return 'unknown'
        else:
            average_test_time = time_passed / tests_passed
            eta = average_test_time * tests_left
            return str(datetime.timedelta(seconds=int(eta)))

    def _get_stats(self):
        is_paused = self.server.interface.is_paused()
        session_info = self.dataman.get_session_info()
        eta_s = self._get_eta(session_info)
        stats = session_info.as_dict()
        stats['fuzzer_name'] = self.dataman.get('fuzzer_name')
        stats['session_file_name'] = self.dataman.get('session_file_name')
        stats['log_file_name'] = self.dataman.get('log_file_name')
        report_list = self.dataman.get_report_list()
        resp_dict = {
            'paused': is_paused,
            'eta': eta_s,
            'stats': stats,
            'current_test': self.dataman.get('test_info'),
            'reports_extended': report_list,
        }
        return json.dumps(resp_dict)

    def _handle_api_request(self):
        parsed = urlparse(self.path)
        path = parsed.path.lower()[5:]

        response = None
        data_type = 'text/json'
        if path == 'stats.json':
            response = self._get_stats()
        elif path == 'template_info.json':
            response = json.dumps(self.dataman.get('template_info'))
        elif path == 'stages.json':
            response = json.dumps(self.dataman.get('stages'))
        elif path.startswith('report'):
            response = self._get_report()
        elif path == 'action/pause':
            response = ''
            self._pause_fuzzer()
        elif path == 'action/resume':
            response = ''
            self._resume_fuzzer()
        if response is not None:
            self.send_response(200)
            self.send_header('Content-type', data_type)
            self.end_headers()
        return response

    def _get_report(self):
        report = None
        encoding = 'base64'

        parsed = urlparse(self.path)
        query = parse_qs(parsed.query)
        if 'report_id' in query:
            try:
                report_id_string = query['report_id'][0]
                key = int(report_id_string)
                report = self.dataman.get_report_by_id(key)
                if report:
                    response = {
                        'encoding': encoding,
                        'report': report.to_dict(encoding)
                    }
                else:
                    raise 'No report with id %d' % key
            except Exception as ex:
                response = {'error': 'Failed to get report %s' % ex}
        else:
            response = {'error': 'No report_id provided'}
        return json.dumps(response)

    def _my_handle(self):

        exact_endpoints = {
            '/': self._handle_index,
            '/index.html': self._handle_index,
            '/favicon.ico': self._handle_favicon_request
        }

        endpoints = {
            '/api/': self._handle_api_request,
            '/static/': self._handle_text_request,
            '/css/': self._handle_text_request,
            '/js/': self._handle_text_request,
            '/images/': self._handle_image_request
        }
        response = None

        for endpoint in exact_endpoints:
            if self.path.lower() == endpoint:
                response = exact_endpoints[endpoint]()
                break

        if response is None:
            for endpoint in endpoints:
                if self.path.lower().startswith(endpoint):
                    response = endpoints[endpoint]()
                    break

        if response is not None:
            self.wfile.write(response)
        else:
            self.send_response(401)
            self.send_header('Content-type', 'text/html')
            self.end_headers()


[docs]class WebInterface(EmptyInterface): ''' Web UI for the fuzzer '''
[docs] def __init__(self, host='127.0.0.1', port=26000): ''' :param host: listening address :param port: listening port ''' super(WebInterface, self).__init__('WebInterface') self._host = host self._port = port self._web_thread = FuncThread(self._server_func)
def _start(self): self._web_thread.start()
[docs] def get_description(self): ''' :return: description string (with listening host:port) ''' return '%(description)s listening on %(host)s:%(port)s' % { 'description': super(WebInterface, self).get_description(), 'host': self._host, 'port': self._port }
def _server_func(self): server = _WebInterfaceServer((self._host, self._port), _WebInterfaceHandler, self) self._server = server server.allow_reuse_address = True server.serve_forever() def _stop(self, timeout=None): self._server.shutdown() self._server.server_close()