Source code for kitty.model.high_level.base

# Copyright (C) 2016 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
#
# This file is part of Kitty.
#
# Kitty is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Kitty is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Kitty.  If not, see <http://www.gnu.org/licenses/>.

from kitty.core import KittyObject, khash


class _RootNode(object):
    def get_name(self):
        return 'Start'

    def hash(self):
        return khash(self.get_name())


[docs]class Connection(object): ''' A connection between to messages, '''
[docs] def __init__(self, src, dst, callback): ''' :param src: the source node :param dst: the destination node :param callback: the function to call after sending src and before sending dst ''' self.src = src self.dst = dst self.callback = callback
def __repr__(self): return '%s -> %s' % (self.src, self.dst)
[docs]class BaseModel(KittyObject): ''' This class defines the API that is required to be implemented by any top- level model .. note:: This class should not be instantiated directly. '''
[docs] def __init__(self, name='BaseModel'): ''' :param name: name of the model (default: BaseModel) ''' super(BaseModel, self).__init__(name) self._num_mutations = 0 self._sequence = None self._current_index = -1 self._ready = False self._notification_handler = None self.ROOT_NODE = _RootNode()
[docs] def get_template_info(self): ''' :return: dictionary of information regarding the current template .. note:: by default, we return None, as the current template might not be interesting. Take a look at GraphModel for actual implementation. ''' return {}
[docs] def current_index(self): ''' :return: current mutation index ''' return self._current_index
def _get_ready(self): ''' Make sure model is ready for running ''' self.not_implemented('_get_ready') def _mutate(self): ''' Perform the actual mutation ''' self.not_implemented('_mutate')
[docs] def skip(self, count): ''' :param count: number of cases to skip :return: number of cases skipped ''' self._get_ready() skipped = 0 for i in range(count): if self.mutate(): skipped += 1 else: break return skipped
[docs] def mutate(self): ''' Mutate to next state :return: True if mutated, False if not ''' self._get_ready() if self._is_last_index(): return False self._current_index += 1 self._mutate() return True
[docs] def get_model_info(self): ''' :rtype: dict :return: model information ''' self.not_implemented('get_info')
[docs] def get_test_info(self): ''' :rtype: dict :return: test information ''' self._get_ready() res = { 'sequence': { 'current': self.get_sequence_str() }, 'mutation': { 'current_index': self._current_index, 'total_number': self.num_mutations() } } return res
[docs] def get_sequence(self): ''' :rtype: [Connection] :return: Sequence of current case ''' self._get_ready() assert self._sequence return self._sequence[:]
[docs] def get_sequence_str(self): ''' :return: string representation of the sequence ''' sequence = self.get_sequence() return '->'.join(e.dst.name for e in sequence)
[docs] def last_index(self): ''' :return: the last valid mutation index ''' return self.num_mutations() - 1
def _is_last_index(self): ''' :return: is current index the last index ''' return self._current_index == self.last_index()
[docs] def num_mutations(self): ''' :return: number of mutations in the model ''' self._get_ready() return self._num_mutations
[docs] def hash(self): ''' :return: a hash of the model object (used for notifying change in the model) ''' return khash(type(self).__name__)
[docs] def get_stages(self): ''' :return: dictionary of information regarding the stages in the fuzzing session .. note:: structure: { current: ['stage1', 'stage2', 'stage3'], 'stages': {'source1': ['dest1', 'dest2'], 'source2': ['dest1', 'dest3']}} ''' sequence = self.get_sequence() return { 'current': [e.dst.get_name() for e in sequence], 'stages': {e.src.get_name(): [e.dst.get_name()] for e in sequence} }
[docs] def set_notification_handler(self, handler): ''' :param handler: handler for notifications from the data model .. note:: The handler should support the following APIs: - handle_stage_changed(data_model) ''' self._notification_handler = handler