Source code for kitty.targets.server

# Copyright (C) 2016 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
#
# This file is part of Kitty.
#
# Kitty is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Kitty is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Kitty.  If not, see <http://www.gnu.org/licenses/>.

import time
import traceback
from kitty.targets.base import BaseTarget
from kitty.data.report import Report


[docs]class ServerTarget(BaseTarget): ''' This class represents a target when fuzzing a server. Its main job, beside using the Controller and Monitors, is to send and receive data from/to the target. '''
[docs] def __init__(self, name, logger=None, expect_response=False): ''' :param name: name of the target :param logger: logger for this object (default: None) :param expect_response: should wait for response from the victim (default: False) ''' super(ServerTarget, self).__init__(name, logger) self.expect_response = expect_response self.send_failure = False self.receive_failure = False self.transmission_count = 0 self.transmission_report = None
[docs] def set_expect_response(self, expect_response): ''' :param expect_response: should wait for response from the victim (default: False) ''' self.expect_response = expect_response
def _send_to_target(self, payload): self.not_implemented('send_to_target') def _receive_from_target(self): self.not_implemented('receive_from_target')
[docs] def pre_test(self, test_num): ''' Called before each test :param test_num: the test number ''' super(ServerTarget, self).pre_test(test_num) self.send_failure = False self.receive_failure = False self.transmission_count = 0
[docs] def transmit(self, payload): ''' Transmit single payload, and receive response, if expected. The actual implementation of the send/receive should be in ``_send_to_target`` and ``_receive_from_target``. :type payload: str :param payload: payload to send :rtype: str :return: the response (if received) ''' response = None trans_report_name = 'transmission_0x%04x' % self.transmission_count trans_report = Report(trans_report_name) self.transmission_report = trans_report self.report.add(trans_report_name, trans_report) try: trans_report.add('request (hex)', payload.encode('hex')) trans_report.add('request (raw)', '%s' % payload) trans_report.add('request length', len(payload)) trans_report.add('request time', time.time()) request = payload.encode('hex') request = request if len(request) < 100 else (request[:100] + ' ...') self.logger.info('request(%d): %s' % (len(payload), request)) self._send_to_target(payload) trans_report.success() if self.expect_response: try: response = self._receive_from_target() trans_report.add('response time', time.time()) trans_report.add('response (hex)', response.encode('hex')) trans_report.add('response (raw)', '%s' % response) trans_report.add('response length', len(response)) printed_response = response.encode('hex') printed_response = printed_response if len(printed_response) < 100 else (printed_response[:100] + ' ...') self.logger.info('response(%d): %s' % (len(response), printed_response)) except Exception as ex2: trans_report.failed('failed to receive response: %s' % ex2) trans_report.add('traceback', traceback.format_exc()) self.logger.error('target.transmit - failure in receive (exception: %s)' % ex2) self.logger.error(traceback.format_exc()) self.receive_failure = True else: response = '' except Exception as ex1: trans_report.failed('failed to send payload: %s' % ex1) trans_report.add('traceback', traceback.format_exc()) self.logger.error('target.transmit - failure in send (exception: %s)' % ex1) self.logger.error(traceback.format_exc()) self.send_failure = True self.transmission_count += 1 return response
[docs] def post_test(self, test_num): ''' Called after each test :param test_num: the test number ''' super(ServerTarget, self).post_test(test_num) if self.send_failure: self.report.failed('send failure') elif self.expect_response and self.receive_failure: self.report.failed('receive failure')