# Source code for kitty.model.low_level.field

# Copyright (C) 2016 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
#
# This file is part of Kitty.
#
# Kitty is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Kitty is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Kitty.  If not, see <http://www.gnu.org/licenses/>.

'''
This module is the "Heart" of the data model.
It contains all the basic building blocks for a Template.
Each "field" type is a discrete component in the full Template.
'''
from random import Random
import copy
import os
import logging
from bitstring import Bits
from kitty.core import KittyObject, KittyException, kassert, khash
from kitty.model.low_level.encoder import ENC_STR_DEFAULT, StrEncoder
from kitty.model.low_level.encoder import ENC_INT_DEFAULT, BitFieldEncoder
from kitty.model.low_level.encoder import ENC_BITS_DEFAULT, BitsEncoder
from kitty.model.low_level.encoder import ENC_FLT_DEFAULT, FloatEncoder


# Module-level ``Bits`` instance built with no arguments; it is not referenced
# anywhere else in the visible part of this module.
empty_bits = Bits()


class BaseField(KittyObject):
    '''
    Common base of all fields and containers.
    It keeps the shared state (default/current value and their rendered
    form, mutation index, encoder) and the shared logic.
    This class should never be used directly.
    '''

    # encoder type that the ``encoder`` argument is checked against
    _encoder_type_ = None

    def __init__(self, value, encoder=ENC_BITS_DEFAULT, fuzzable=True, name=None):
        '''
        :param value: default value
        :type encoder: :class:`~kitty.model.low_level.encoder.BaseEncoder`
        :param encoder: encoder for the field
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)
        :raises: :class:`~kitty.core.KittyException` if name contains '/'
        '''
        # '/' is the separator of absolute field names, so it may not
        # appear inside a single name
        if name and '/' in name:
            raise KittyException('Name (%s) includes invalid chars /' % (name))
        super(BaseField, self).__init__(name, logger=logging.getLogger('DataModel'))
        kassert.is_of_types(encoder, self.__class__._encoder_type_)
        # the encoder must be in place before the default value is encoded
        self._encoder = encoder
        self._num_mutations = 0
        self._fuzzable = fuzzable
        self._default_value = value
        self._default_rendered = self._encode_value(self._default_value)
        self._current_value = value
        self._current_rendered = self._default_rendered
        # -1 means "not mutating"
        self._current_index = -1
        self.enclosing = None
        self._initialized = False
        self._hash = None
        self._need_second_pass = False
        self.offset = None
        self._controlled = False

    def set_offset(self, offset):
        '''
        :param offset: absolute offset of this field (in bits)
        '''
        self.offset = offset

    def _mutating(self):
        '''
        :return: True if the mutation index is not at its initial value (-1)
        '''
        return self._current_index != -1

    def set_current_value(self, value):
        '''
        Set the current value of the field and re-encode it

        :param value: value to set
        :return: rendered value
        '''
        self._current_value = value
        rendered = self._encode_value(self._current_value)
        self._current_rendered = rendered
        return self._current_rendered

    def _last_index(self):
        '''
        :return: last mutation index of this field
        '''
        return self.num_mutations() - 1

    def num_mutations(self):
        '''
        :return: number of mutations in this field (0 if it is not fuzzable)
        '''
        if self._fuzzable:
            return self._num_mutations
        return 0

    def _exhausted(self):
        '''
        :return: True if field exhausted, False otherwise
        '''
        return self._current_index >= self._last_index()

    def skip(self, count):
        '''
        Skip up to [count] cases, default behavior is to just mutate [count] times

        :param count: number of cases to skip
        :rtype: int
        :return: number of cases skipped
        '''
        done = 0
        # stop at the first failed mutation or once [count] was reached
        while done < count and self.mutate():
            done += 1
        return done

    def mutate(self):
        '''
        Mutate the field

        :rtype: boolean
        :return: True if the field was mutated, False if it is exhausted
        '''
        self._initialize()
        if not self._exhausted():
            self._current_index += 1
            self._mutate()
            return True
        return False

    def _initialize(self):
        '''
        Run the one-time initialization (``_init``) and store the field hash.
        Does nothing when the field was already initialized.
        '''
        if not self._initialized:
            self._init()
            self._initialized = True
            self._hash = self.hash()

    def _init(self):
        '''
        Initialization hook, the default behavior is to reset the field
        '''
        self.reset()

    def render(self, ctx=None):
        '''
        Render the current value of the field

        :param ctx: not used by this implementation (default: None)
        :rtype: Bits
        :return: rendered value
        '''
        self._initialize()
        if self.is_default():
            return self._current_rendered
        # only re-encode when the field is not in its default form
        self._current_rendered = self._encode_value(self._current_value)
        return self._current_rendered

    def reset(self):
        '''
        Reset the field to its default state
        '''
        self._current_index = -1
        self._current_value = self._default_value
        self._current_rendered = self._default_rendered
        self.offset = None

    def _mutate(self):
        '''
        Perform the actual mutation.
        The default behavior is to do nothing.
        '''
        pass

    def get_structure(self):
        '''
        :rtype: dictionary
        :return: field type, name and mutation counters of the field
        '''
        mutation = {
            'current_index': self._current_index,
            'total_number': self._num_mutations,
        }
        return {
            'field_type': type(self).__name__,
            'mutation': mutation,
            'name': self.name if self.name else '<no name>',
        }

    def get_info(self):
        '''
        :rtype: dictionary
        :return: field information
        '''
        display_name = self.name if self.name else '<no name>'
        rendered_bytes = self._current_rendered.tobytes()
        rendered = {
            'base64': rendered_bytes.encode('base64'),
            'length_in_bits': len(self._current_rendered),
            'length_in_bytes': len(rendered_bytes),
        }
        mutation = {
            'total_number': self._num_mutations,
            'current_index': self._current_index,
            'mutating': self._mutating(),
            'fuzzable': self._fuzzable,
        }
        return {
            'name': display_name,
            'path': display_name,
            'field_type': type(self).__name__,
            'value': {
                'raw': repr(self._current_value),
                'rendered': rendered,
            },
            'mutation': mutation,
        }

    def _encode_value(self, value):
        '''
        :param value: value to encode
        :return: the value, encoded by the encoder of this field
        '''
        return self._encoder.encode(value)

    def resolve_field(self, field):
        '''
        Resolve a field from name

        :param field: name of the field to resolve (or the field itself)
        :rtype: BaseField
        :return: the resolved field
        :raises: KittyException if field could not be resolved
        '''
        if isinstance(field, BaseField):
            return field
        name = field
        if name.startswith('/'):
            return self.resolve_absolute_name(name)
        found = self.scan_for_field(name)
        # not found locally - let the enclosing container (if any) look for it
        if not found and self.enclosing:
            found = self.enclosing.resolve_field(name)
        if found:
            return found
        raise KittyException('Could not resolve field by name (%s)' % name)

    def resolve_absolute_name(self, name):
        '''
        Resolve a field from an absolute name.
        An absolute name is just like unix absolute path,
        starts with '/' and each name component is separated by '/'.

        :param name: absolute name, e.g. "/container/subcontainer/field"
        :return: field with this absolute name
        :raises: KittyException if field could not be resolved
        '''
        # climb to the outermost enclosing object
        node = self
        while node.enclosing:
            node = node.enclosing
        if name == '/':
            return node
        # the first component is the empty string before the leading '/'
        for part in name.split('/')[1:]:
            node = node.get_field_by_name(part)
        return node

    def get_field_by_name(self, name):
        '''
        :param name: name of field to get
        :raises: :class:`~kitty.core.KittyException` if no direct subfield with this name
        '''
        raise KittyException('Basic field (%s) does not contain any fields inside (looked for "%s")' % (self, name))

    def copy(self):
        '''
        :return: a copy of the field (made with ``copy.copy``)
        '''
        return copy.copy(self)

    def scan_for_field(self, field_name):
        '''
        Scan for field with given name

        :param field_name: field name to look for
        :return: this field if its name matches, None otherwise
        '''
        if self.get_name() != field_name:
            return None
        return self

    def get_rendered_fields(self, ctx=None):
        '''
        :param ctx: passed as is to render (default: None)
        :return: ordered list of the fields that will be rendered
        '''
        if len(self.render(ctx)):
            return [self]
        return []

    def is_default(self):
        '''
        Checks if the field is in its default form

        :return: True if field is in default form
        '''
        return not (self._mutating() or self._controlled)

    def __str__(self):
        label = self.get_name()
        parts = [label if label else '<no name>', type(self).__name__]
        if self._default_value:
            parts.append('default:%s' % self._default_value)
        if self._fuzzable:
            if self._mutating():
                parts.append('%s/%s' % (self._current_index, self.num_mutations()))
            parts.append('+')
        return '|'.join(parts)

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        if self._hash is not None:
            return self._hash
        self._initialize()
        self._hash = khash(type(self).__name__, self._default_value, self._fuzzable)
        return self._hash

    def _initialize_default_buffer(self):
        '''
        Set the current value to the default value

        :return: the rendered default value
        '''
        self.set_current_value(self._default_value)
        return self._default_rendered
class _MultiListAccessor(object):
    '''
    Wrapper for multiple lists to be accessed as a single list
    Allows skipping of indices
    '''

    def __init__(self):
        self._lists = []
        # accumulated length of all added lists
        self._size = 0
        self._to_skip = set()

    def add_list(self, l):
        '''
        :param l: list to append to the end of the combined list
        '''
        self._size += len(l)
        self._lists.append(l)

    def skip_index(self, idx):
        '''
        :param idx: index to skip from now on
        '''
        self._to_skip.add(idx)

    def size(self):
        '''
        :return: number of accessible (not skipped) elements
        '''
        return self._size - len(self._to_skip)

    def get(self, idx):
        '''
        :param idx: index in the combined list, not counting skipped indices
        :return: the element at this index
        :raises: :class:`~kitty.core.KittyException` if idx is out of range
        '''
        if not (0 <= idx < self.size()):
            raise KittyException('index out of range: %d list length: %d' % (idx, self.size()))
        # advance idx over the skipped indices; every advance may pass
        # over more skipped indices, so repeat until idx stops moving
        prev = -1
        while prev != idx:
            passed = sum(1 for s in self._to_skip if prev < s <= idx)
            prev, idx = idx, idx + passed
        # locate the list that holds the resulting index
        for current in self._lists:
            if idx < len(current):
                return current[idx]
            idx -= len(current)


class _LibraryField(BaseField):
    '''
    Base class for a field with mutations from a library.
    there are two libraries for each instance:

    1. Shared library between all instances
    2. Instance library with mutations that are specific for this instance
    '''

    def __init__(self, value, encoder, fuzzable=True, name=None):
        '''
        :param value: default value
        :param encoder: encoder for the field
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)
        '''
        super(_LibraryField, self).__init__(value, encoder, fuzzable, name)
        self._lib = None
        # build the mutation library right away
        self._initialize()

    def skip(self, count):
        '''
        skip up to [count] cases

        :param count: number of cases to skip
        :rtype: int
        :return: number of cases skipped
        '''
        self._initialize()
        if self._exhausted():
            return 0
        skipped = min(count, self._last_index() - self._current_index)
        self._current_index += skipped
        return skipped

    def _mutate(self):
        # library entries are (value, description)
        entry = self._lib.get(self._current_index)
        self._current_value = entry[0]

    def _init(self):
        accessor = _MultiListAccessor()
        # library that depend on the value of the field
        accessor.add_list(self._get_local_lib())
        # library that is always added
        accessor.add_list(self._wrap_get_class_lib())
        # libraries that depend on the tags of the field
        for tagged in self._get_tagged_libs():
            accessor.add_list(tagged)
        self._lib = accessor
        self._filter_lib()
        self._num_mutations = self._lib.size()

    def get_info(self):
        '''
        :rtype: dictionary
        :return: field information, with the description of the current
            mutation when there is one
        '''
        info = super(_LibraryField, self).get_info()
        idx = self._current_index
        if 0 <= idx < self._lib.size():
            entry = self._lib.get(idx)
            if entry:
                info['mutation']['description'] = entry[1]
        return info

    def _filter_lib(self):
        '''
        Hook that is called after the library was built, does nothing by default
        '''
        pass

    def _get_local_lib(self):
        '''
        Get library that depend on the value of the instance

        :rtype: list
        :return: list of local lib
        '''
        self.not_implemented('_get_local_lib')

    def _wrap_get_class_lib(self):
        '''
        :return: the class library, it is stored on the class after it is built
        '''
        cls = self.__class__
        if not cls.lib:
            cls.lib = self._get_class_lib()
        return cls.lib

    def _get_class_lib(self):
        '''
        Get library that is always relevant for this field

        :rtype: list
        :return: list of class lib
        '''
        self.not_implemented('_get_class_lib')

    def _get_tagged_libs(self):
        '''
        Get libraries that are relevant for the tags of the current instance
        '''
        return []
class Static(BaseField):
    '''
    A field that never mutates, used for the constant parts of the model.
    '''

    _encoder_type_ = StrEncoder

    def __init__(self, value, encoder=ENC_STR_DEFAULT, name=None):
        '''
        :type value: str
        :param value: default value
        :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
        :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
        :param name: name of the object (default: None)

        :example:

            ::

                Static('this will never change')
        '''
        # a static field is always created as not fuzzable
        super(Static, self).__init__(value, encoder, False, name)
def gen_power_list(val, min_power=0, max_power=10, mutation_desc=''):
    '''
    Build a list of (val * 2**power, description) pairs.
    The powers start at max_power and go down in steps of 3,
    min_power itself is not included.

    :param val: value to multiply (e.g. string to repeat)
    :param min_power: lower bound of the powers, exclusive (default: 0)
    :param max_power: first (biggest) power (default: 10)
    :param mutation_desc: description attached to each entry (default: '')
    :rtype: list
    :return: list of (value, description) tuples
    '''
    result = []
    power = max_power
    while power > min_power:
        result.append((val * (2 ** power), mutation_desc))
        power -= 3
    return result
class String(_LibraryField):
    '''
    Represent a string, the mutations target common string-related vulnerabilities
    '''

    _encoder_type_ = StrEncoder
    lib = None

    def __init__(self, value, max_size=None, encoder=ENC_STR_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: str
        :param value: default value
        :param max_size: maximal size of the string before encoding (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
        :param encoder: encoder for the field
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)

        :example:

            ::

                String('this is the default value', max_size=5)
        '''
        # must be set before the parent constructor runs, since the parent
        # constructor builds the library and _filter_lib reads it
        self._max_size = max_size
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        super(String, self).__init__(value=value, encoder=encoder, fuzzable=fuzzable, name=name)

    def _get_local_lib(self):
        default = self._default_value
        half = len(default) / 2
        lib = [(default * times, 'duplicate value %s times' % times) for times in [2, 10, 100]]
        lib.extend([
            (default + '\xfe', 'value with utf8 escape char'),
            ('\x00' + default, 'null before value'),
            (default[:half] + '\x00' + default[half:], 'null in middle of value'),
            (default + '\x00', 'null after value'),
        ])
        return lib

    def _add_command_injection_strings_unix(self, lib):
        '''
        .. note::

            The following mutations will probably not be detected unless
            there's a monitor that checks if the file /tmp/KITTY was
            created in the post test
        '''
        lib.extend([
            ('|touch /tmp/KITTY', 'command injection - pipe'),
            (';touch /tmp/KITTY;', 'command injection - additional command'),
            (';ls>/tmp/KITTY', 'command injection - additional command'),
            ('";ls>/tmp/KITTY;ls>/dev/null"', 'command injection - escape double qoutes'),
            ('\';ls>/tmp/KITTY;ls>\'/dev/null\'', 'command injection - escape qoutes'),
            ('`ls>/tmp/KITTY`', 'command injection - backtick'),
            (' `ls>/tmp/KITTY`', 'command injection - backtick with space'),
        ])

    def _add_sql_injection_strings(self, lib):
        '''
        .. note::

            The following mutations will probably not be detected unless
            some entity will check the response from the target on the
            validity of the DB.
        '''
        entries = [
            ("' or 1=1 --", 'mssql - bypass check'),
            ("\'; desc users; --", 'mysql - mssql - get desc users'),
            ("' or username is not NULL or username = '", 'mysql - mssql - bypass check'),
            ("1 union all select 1,2,3,4,5,6,name from sysobjects where xtype = 'u' --", 'mysql - mssql - get sysobjects'),
            ("` or `1`=`1", 'oracle - bypass check'),
            ("' or '1'='1", 'oracle - bypass check'),
        ]
        lib.extend([(payload, 'sqli - %s' % desc) for (payload, desc) in entries])

    def _add_buffer_overflow_strings(self, lib):
        lib.extend([('A' * size, 'overflow - %d chars' % (size)) for size in [2, 10, 100, 1000, 5000, 10000]])

    def _add_path_traversal_strings(self, lib):
        '''
        .. note::

            The following mutations will probably not be detected unless
            there's a monitor that checks if the file /tmp/KITTY was
            created in the post test
        '''
        entries = [
            ('/etc/passwd', 'absolute - /etc/passwd'),
            ('./../../../../../../../../../../../../../../etc/passwd', 'relative - /etc/passwd'),
            ('../../../../../../../../../../../../../../etc/passwd', 'relative - /etc/passwd'),
            ('/proc/cpuinfo', 'absolute path - /proc/cpuinfo'),
            ('./../../../../../../../../../../../../../../proc/cpuinfo', 'relative - /proc/cpuinfo'),
            ('../../../../../../../../../../../../../../proc/cpuinfo', 'relative - /proc/cpuinfo'),
            ('~/.profile', 'absolute - home'),
            ('$HOME/.profile', 'environment - home'),
            ('/../../../../../../../../../../../../boot.ini', 'relative - boot.ini'),
        ]
        lib.extend([(path, 'path traversal - %s' % desc) for (path, desc) in entries])

    def _get_class_lib(self):
        # the order of the entries is the order of the mutations
        lib = [('', 'empty string')]
        # format strings
        for fmt in ['%s', '%%s', '"%s"', '%n', '%%n', '"%n"']:
            lib.extend(gen_power_list(fmt, max_power=10, mutation_desc='format string'))
        # taken from fuzzdb
        lib.append(('%.16705u%2\\$hn', 'format string'))
        for line_break in ['\r\n', '\n']:
            lib.extend(gen_power_list(line_break, max_power=10, mutation_desc='line breaks'))
        for binary in ['\xde\xad\xbe\xef']:
            lib.extend(gen_power_list(binary, max_power=10, mutation_desc='binary values'))
        # *nix command injection
        self._add_command_injection_strings_unix(lib)
        # windows command injection
        # TODO: need to learn a bit about windows cmd line injection...
        lib.extend([
            ('|notepad', 'windows - should be replaced'),
            (';notepad;', 'windows - should be replaced'),
            ('\nnotepad\n', 'windows - should be replaced'),
        ])
        self._add_sql_injection_strings(lib)
        self._add_buffer_overflow_strings(lib)
        self._add_path_traversal_strings(lib)
        lib.extend(gen_power_list('/\\', max_power=9))
        lib.extend(gen_power_list('/.', max_power=9))
        lib.extend([
            ('!@#$%%^#$%#$@#$%$$@#$%^^**(()', 'some weird chars'),
            ('%01%02%03%04%0a%0d%0aADSF', 'binary variants'),
            ('%01%02%03@%04%0a%0d%0aADSF', 'binary variants'),
            ('/%00/', 'null variant'),
            ('%00/', 'null variant'),
            ('%00', 'null variant'),
            ('%u0000', 'utf16 null'),
        ])
        lib.extend(gen_power_list('%\xfe\xf0%\x01\xff', max_power=5))
        lib.extend(self._add_strings_from_file('./kitty_strings.txt'))
        return lib

    def _filter_lib(self):
        '''
        Skip every library entry that is longer than max_size (if it was set)
        '''
        if self._max_size is None:
            return
        # walk from the last index down to 0, so the indices that were
        # already skipped are always above the index that is checked
        for idx in reversed(range(self._lib.size())):
            candidate = self._lib.get(idx)[0]
            if len(candidate) > self._max_size:
                self._lib.skip_index(idx)
        self._num_mutations = self._lib.size()

    def _add_strings_from_file(self, file_name):
        '''
        :param file_name: path of a file with one string per line
        :rtype: list
        :return: list of (line, description) for each line that was read,
            empty if the file does not exist
        '''
        res = []
        if not os.path.exists(file_name):
            self.logger.info('No strings file [%s]' % file_name)
            return res
        try:
            with open(file_name, 'rb') as f:
                for line in f:
                    # drop the line terminator, keep the rest as is
                    if line.endswith('\n'):
                        line = line[:-1]
                    res.append((line, 'from file %s' % (file_name)))
        except Exception as e:
            self.logger.warning('Could not read strings from file %s: %s' % (file_name, e))
        return res

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        base_hash = super(String, self).hash()
        return khash(base_hash, self._max_size)
class Delimiter(String):
    '''
    Represent a text delimiter, the mutations target common delimiter-related vulnerabilities
    '''

    _encoder_type_ = StrEncoder
    lib = None

    def __init__(self, value, max_size=None, encoder=ENC_STR_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: str
        :param value: default value
        :param max_size: maximal size of the string before encoding (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
        :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)

        :example:

            ::

                Delimiter('=', max_size=30, encoder=ENC_STR_BASE64)
        '''
        super(Delimiter, self).__init__(
            value=value,
            max_size=max_size,
            encoder=encoder,
            fuzzable=fuzzable,
            name=name
        )

    def _get_class_lib(self):
        single_chars = ' \t!@#$%^&*()-_+=:;\'"/\\?<>.,\r\n'
        lib = []
        # repetitions of each single delimiter character
        for char in single_chars:
            lib += gen_power_list(char, max_power=2)
        # repetitions of multi-character line breaks
        lib += gen_power_list('\r\n', max_power=3)
        lib += gen_power_list('\t\r\n', max_power=3)
        lib.append(('', 'empty delimiter'))
        return lib
class Float(_LibraryField):
    '''
    Represent a floating point number.
    The mutations target edge cases and invalid floating point numbers.
    '''

    _encoder_type_ = FloatEncoder
    lib = None

    def __init__(self, value, encoder=ENC_FLT_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: float
        :param value: default value
        :type encoder: :class:`~kitty.model.low_level.encoder.FloatEncoder`
        :param encoder: encoder for the field (default: ENC_FLT_DEFAULT)
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)

        :example:

            ::

                Float(0.3)
        '''
        super(Float, self).__init__(value=value, encoder=encoder, fuzzable=fuzzable, name=name)

    def _get_local_lib(self):
        # there are no mutations that depend on the default value
        return []

    def _get_class_lib(self):
        return [
            (float('NaN'), 'positive NaN'),
            (float('-NaN'), 'negative NaN'),
            (float('inf'), 'positive infinity'),
            (float('-inf'), 'negative infinity'),
            (float(0.), 'positive zero'),
            (float(-0.), 'negative zero'),
            #
            # what else?
            #
        ]
def _calc_bitfield_bounds(self, value, minv, maxv):
    '''
    Calculate and store the value bounds of a bit field.
    It is assigned as the ``_calc_bounds`` method of the bit field classes,
    and reads ``self._length`` and ``self._signed``.

    :param value: default value of the field
    :param minv: minimal allowed value requested by the user (or None)
    :param maxv: maximal allowed value requested by the user (or None)
    :raises: :class:`~kitty.core.KittyException` if the length is not
        positive, if the requested bounds do not fit in the field or if
        the default value is out of the bounds
    '''
    if self._length <= 0:
        raise KittyException('length (%d) <= 0' % (self._length))
    max_possible = 2 ** self._length - 1
    if self._signed:
        # two's complement minimum, e.g. -128 for 8 bits
        self._min_value = ~(max_possible >> 1)
    else:
        self._min_value = 0
    self._max_value = max_possible + self._min_value
    # NOTE(review): the diff is of the full range of the field, it is not
    # narrowed when minv / maxv are given - confirm that this is intended
    self._max_min_diff = max_possible
    if maxv is not None:
        if maxv > self._max_value:
            raise KittyException('max_value is too big %d > %d' % (maxv, self._max_value))
        self._max_value = maxv
    if minv is not None:
        if minv < self._min_value:
            raise KittyException('min_value is too small %d < %d' % (minv, self._min_value))
        self._min_value = minv
    if self._min_value > self._max_value:
        raise KittyException('min_value (%d) > max_value (%d)' % (self._min_value, self._max_value))
    if (value < self._min_value) or (value > self._max_value):
        raise KittyException('default value (%d) not in range (min=%d, max=%d)' % (value, self._min_value, self._max_value))


class _FullRangeBitField(BaseField):
    '''
    Represents a fixed-length sequence of bits,
    the mutations are the entire range between min_value and max_value
    '''

    _encoder_type_ = BitFieldEncoder
    lib = None

    def __init__(self, value, length, signed=False, min_value=None, max_value=None, encoder=ENC_INT_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: int
        :param value: default value
        :type length: positive int
        :param length: length of field in bits
        :param signed: are the values signed (default: False)
        :param min_value: minimal allowed value (default: None)
        :param max_value: maximal allowed value (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.BitFieldEncoder`
        :param encoder: encoder for the field
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)
        '''
        # length and signed are read by _encode_value, which is called by
        # the parent constructor, so they must be set first
        self._length = length
        self._signed = signed
        self._min_value = None
        self._max_value = None
        self._max_min_diff = None
        self._calc_bounds(value, min_value, max_value)
        super(_FullRangeBitField, self).__init__(value=value, encoder=encoder, fuzzable=fuzzable, name=name)
        self._initialize()

    _calc_bounds = _calc_bitfield_bounds

    def _init(self):
        # one mutation for each value in [min_value, max_value]
        self._num_mutations = self._max_value - self._min_value + 1

    def _mutate(self):
        self._current_value = self._min_value + self._current_index

    def _encode_value(self, value):
        return self._encoder.encode(value, self._length, self._signed)

    def skip(self, count):
        '''
        Skip up to [count] cases

        :param count: number of cases to skip
        :rtype: int
        :return: number of cases skipped
        '''
        self._initialize()
        skipped = 0
        if not self._exhausted():
            skipped = min(count, self.num_mutations() - self._current_index - 1)
            self._current_index += skipped
        return skipped

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        hashed = super(_FullRangeBitField, self).hash()
        return khash(hashed, self._length, self._signed, self._min_value, self._max_value)


class _LibraryBitField(_LibraryField):
    '''
    Represents a fixed-length sequence of bits,
    the mutations target common integer related vulnerabilities
    '''

    _encoder_type_ = BitFieldEncoder
    lib = None

    def __init__(self, value, length, signed=False, min_value=None, max_value=None, encoder=ENC_INT_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: int
        :param value: default value
        :type length: positive int
        :param length: length of field in bits
        :param signed: are the values signed (default: False)
        :param min_value: minimal allowed value (default: None)
        :param max_value: maximal allowed value (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.BitFieldEncoder`
        :param encoder: encoder for the field
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)

        :examples:

            This field will generate mutations within the range of
            -16384 and 1000, will have the default value 123 and the
            value will be represented as 2's complement binary value.

            ::

                BitField(123, length=15, signed=True, max_value=1000)

            This field will generate mutations within the range of
            0 and 255, will have a default value of 17, and will be
            encoded as a decimal number.

            ::

                UInt8(17, encoder=ENC_INT_DEC)
        '''
        # these are read by _encode_value and by the library functions,
        # which are called from the parent constructor
        self._length = length
        self._signed = signed
        self._min_value = None
        self._max_value = None
        self._max_min_diff = None
        self._calc_bounds(value, min_value, max_value)
        super(_LibraryBitField, self).__init__(value=value, encoder=encoder, fuzzable=fuzzable, name=name)

    _calc_bounds = _calc_bitfield_bounds

    def _get_local_lib(self):
        '''
        :rtype: list
        :return: list of (function, description), each function flips a
            single bit of the default value
        '''
        # the library entries are functions of the instance and not
        # values; they are evaluated in _filter_lib and in _mutate
        return [
            ((lambda x, i=i: x._default_value ^ (1 << i)), 'xor bit %d of value' % (i))
            for i in range(self._length - 1, 0, -3)
        ]

    def _get_class_lib(self):
        '''
        If the range is from a to b, we try a few numbers around the arrows

        ::

            a                                           b
            +-------------------------------------------+
            ^          ^          ^          ^          ^

        :rtype: list
        :return: list of (function, description), the functions receive
            the instance, since the class library is shared between instances
        '''
        lib = []
        num_sections = 4
        for i in range(3):
            lib.append(((lambda x, i=i: x._min_value + i), 'off by %d from min value' % i))
            lib.append(((lambda x, i=i: x._max_value - i), 'off by -%d from max value' % i))
            for s in range(1, num_sections):
                # floor division keeps the result an integer
                lib.append(
                    (
                        (lambda x, i=i, s=s: x._max_value - (x._max_min_diff // num_sections) * s + i),
                        'off by %d from section %d' % (i, s)
                    )
                )
                lib.append(
                    (
                        (lambda x, i=i, s=s: x._max_value - (x._max_min_diff // num_sections) * s - i),
                        'off by %d from section %d' % (i, s)
                    )
                )
        # off-by-N
        for i in range(1, 3):
            lib.append(((lambda x, i=i: x._default_value + i), 'off by %d from value' % i))
            lib.append(((lambda x, i=i: x._default_value - i), 'off by %d from value' % -i))
        # user supplied integers; values out of range are dropped by _filter_lib
        lib.extend(self._add_ints_from_file('./kitty_integers.txt'))
        return lib

    def _mutate(self):
        func = self._lib.get(self._current_index)[0]
        self._current_value = func(self)

    def _encode_value(self, value):
        return self._encoder.encode(value, self._length, self._signed)

    def _filter_lib(self):
        '''
        Skip library entries that evaluate to a value that is out of the
        bounds of the field, or to a value that another entry already gives
        '''
        seen = set()
        # walk from the last index down to 0, so the indices that were
        # already skipped are always above the index that is checked
        for i in reversed(range(self._lib.size())):
            func = self._lib.get(i)[0]
            res = func(self)
            if res in seen:
                self._lib.skip_index(i)
            elif (res < self._min_value) or (res > self._max_value):
                self._lib.skip_index(i)
            else:
                seen.add(res)
        self._num_mutations = self._lib.size()

    def _add_ints_from_file(self, file_name):
        '''
        Read integers from a file, one integer per line.
        The integers are parsed with ``int(line, 0)``, so prefixes such
        as ``0x`` are accepted. Blank lines are ignored.

        :param file_name: path of the integers file
        :rtype: list
        :return: list of (function, description), each function returns
            one of the integers. The list is empty if the file does not exist.
        '''
        res = []
        if os.path.exists(file_name):
            try:
                with open(file_name, 'rb') as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue

                        # the value is bound as a default argument, so each
                        # function keeps its own integer
                        def func(_, i=int(line, 0)):
                            return i
                        res.append((func, 'from file %s' % (file_name)))
            except Exception as e:
                # best effort - keep whatever was read until the failure
                self.logger.warning('Could not read integers from file %s: %s' % (file_name, e))
        else:
            self.logger.info('No integers file [%s]' % file_name)
        return res

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        hashed = super(_LibraryBitField, self).hash()
        return khash(hashed, self._length, self._signed, self._min_value, self._max_value)
def BitField(value, length, signed=False, min_value=None, max_value=None, encoder=ENC_INT_DEFAULT, fuzzable=True, name=None, full_range=False):
    '''
    Returns an instance of some BitField class:
    a full range bit field if full_range is set, a library based
    bit field otherwise. All other arguments are passed to its constructor.

    .. note::

        Since BitField is frequently used in binary format, multiple aliases were created for it. See aliases.py for more details.
    '''
    field_class = _FullRangeBitField if full_range else _LibraryBitField
    return field_class(value, length, signed, min_value, max_value, encoder, fuzzable, name)
class Group(_LibraryField):
    '''
    A field with fixed set of possible mutations
    '''

    _encoder_type_ = StrEncoder
    lib = None

    def __init__(self, values, encoder=ENC_STR_DEFAULT, fuzzable=True, name=None):
        '''
        :type values: list of strings
        :param values: possible values for the field, the first one is the default value
        :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
        :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)

        :example:

            This field will generate exactly 3 mutations: 'GET', 'PUT' and 'POST'

            ::

                Group(['GET', 'PUT', 'POST'], name='http methods')
        '''
        # must be set before the parent constructor, which builds the library
        self._values = values
        default_value = values[0]
        super(Group, self).__init__(default_value, encoder, fuzzable, name)

    def _get_local_lib(self):
        # one mutation for each value, with an empty description
        lib = []
        for value in self._values:
            lib.append((value, ''))
        return lib

    def _get_class_lib(self):
        # all of the mutations come from the values of the instance
        return []

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        base_hash = super(Group, self).hash()
        return khash(base_hash, frozenset(self._values))
class Dynamic(BaseField):
    '''
    A field that gets its value from the fuzzer at runtime
    '''

    _encoder_type_ = StrEncoder

    def __init__(self, key, default_value, length=None, encoder=ENC_STR_DEFAULT, fuzzable=False, name=None):
        '''
        :type key: str
        :param key: key for the data in the session_data dictionary
        :type default_value: str
        :param default_value: default value of the field
        :param length: length of the field in bytes. must be set if fuzzable=True (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
        :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
        :param fuzzable: is field fuzzable (default: False)
        :param name: name of the object (default: None)

        :examples:

            ::

                Dynamic(key='session id', default_value='\\x01\\x02\\x03\\x04')
                Dynamic(key='session id', default_value='\\x01\\x02\\x03\\x04', length=4, fuzzable=True)
        '''
        self._key = key
        super(Dynamic, self).__init__(value=default_value, encoder=encoder, fuzzable=fuzzable, name=name)
        self._length = length
        if self._fuzzable:
            kassert.is_int(self._length)
            # one mutation for each bit of the field
            self._num_mutations = 8 * self._length
        self._last_value = default_value

    def render(self, ctx=None):
        '''
        Render the field. When the field is mutating, a single bit
        (chosen by the mutation index) of the rendered value is flipped.

        :param ctx: not used by this implementation (default: None)
        :return: rendered value
        '''
        self._initialize()
        if not self._mutating():
            return self._current_rendered
        bit_count = self._length * 8
        flip_mask = Bits(uint=1 << self._current_index, length=bit_count)
        self._current_rendered = self._current_rendered ^ flip_mask
        return self._current_rendered

    def skip(self, count):
        '''
        Skip up to [count] cases

        :param count: number of cases to skip
        :rtype: int
        :return: number of cases skipped
        '''
        self._initialize()
        if self._exhausted():
            return 0
        remaining = self.num_mutations() - self._current_index - 1
        skipped = min(count, remaining)
        self._current_index += skipped
        return skipped

    def set_session_data(self, session_data):
        '''
        Set the current value from the session data, if it has the key of this field

        :param session_data: dictionary of session data
        :return: True if the value was set, False if the key is not in session_data
        '''
        if self._key not in session_data:
            return False
        self.set_current_value(session_data[self._key])
        return True

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        base_hash = super(Dynamic, self).hash()
        return khash(base_hash, self._key, self._length)

    def is_default(self):
        '''
        Checks if the field is in its default form

        :return: always False
        '''
        return False
class RandomBits(BaseField):
    '''
    A random sequence of bits.
    The length of the sequence is between *min_length* and *max_length*,
    and decided either randomly (if *step* is *None*)
    or starts from *min_length* and increased by *step* bits (if *step* has a value).
    '''

    _encoder_type_ = BitsEncoder

    def __init__(
            self, value, min_length, max_length, unused_bits=0, seed=1235, num_mutations=25,
            step=None, encoder=ENC_BITS_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: str
        :param value: default value, the last *unused_bits* will be removed from the value
        :param min_length: minimal length of the field (in bits)
        :param max_length: maximal length of the field (in bits)
        :param unused_bits: how many bits from the value are not used (default: 0)
        :param seed: seed for the random number generator, to allow consistency between runs (default: 1235)
        :param num_mutations: number of mutations to perform (if step is None) (default:25)
        :type step: int
        :param step: step between lengths of each mutation (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.BitsEncoder`
        :param encoder: encoder for the field (default: ENC_BITS_DEFAULT)
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)
        :raises: :class:`~kitty.core.KittyException` if unused_bits is not
            in 0-7, if the lengths are invalid or if step is negative

        :examples:

            ::

                RandomBits(value='1234', min_length=0, max_length=75, unused_bits=0, step=15)
                RandomBits(value='1234', min_length=0, max_length=75, unused_bits=3, num_mutations=80)
        '''
        if unused_bits not in range(8):
            raise KittyException('unused bits (%d) is not between 0-7' % unused_bits)
        value = Bits(bytes=value)
        if unused_bits:
            value = value[:-unused_bits]
        super(RandomBits, self).__init__(value=value, encoder=encoder, fuzzable=fuzzable, name=name)
        self._validate_lengths(min_length, max_length)
        self._min_length = min_length
        self._max_length = max_length
        self._num_mutations = num_mutations
        self._step = step
        self._random = Random()
        self._seed = seed
        self._random.seed(self._seed)
        if self._step:
            if self._step < 0:
                raise KittyException('step (%d) < 0' % (step))
            # with a step, the number of mutations is derived from the
            # range of lengths; floor division keeps it an integer
            self._num_mutations = (self._max_length - self._min_length) // self._step

    def _validate_lengths(self, min_length, max_length):
        '''
        :param min_length: minimal length of the field (in bits)
        :param max_length: maximal length of the field (in bits)
        :raises: :class:`~kitty.core.KittyException` if min_length > max_length,
            min_length < 0 or max_length <= 0
        '''
        kassert.is_int(min_length)
        kassert.is_int(max_length)
        if min_length > max_length:
            raise KittyException('min_length(%d) > max_length(%d)' % (min_length, max_length))
        elif min_length < 0:
            raise KittyException('min_length(%d) < 0' % (min_length))
        elif max_length <= 0:
            raise KittyException('max_length(%d) <= 0' % (max_length))

    def reset(self):
        '''
        Reset the field to its default state and re-seed the random
        number generator, so the same mutations are generated again
        '''
        super(RandomBits, self).reset()
        self._random.seed(self._seed)

    def _mutate(self):
        if self._step:
            length = self._min_length + self._step * self._current_index
        else:
            length = self._random.randint(self._min_length, self._max_length)
        # generate enough whole bytes to cover [length] bits and cut the rest
        current_bytes = ''.join(chr(self._random.randint(0, 255)) for _ in range(length // 8 + 1))
        self._current_value = Bits(bytes=current_bytes)[:length]

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        hashed = super(RandomBits, self).hash()
        return khash(hashed, self._min_length, self._max_length, self._num_mutations, self._step, self._seed)
class RandomBytes(BaseField):
    '''
    A random sequence of bytes
    The length of the sequence is between *min_length* and *max_length*,
    and decided either randomly (if *step* is *None*)
    or starts from *min_length* and increased by *step* bytes (if *step* has a value).
    '''

    _encoder_type_ = StrEncoder

    def __init__(self, value, min_length, max_length, seed=1234, num_mutations=25, step=None, encoder=ENC_STR_DEFAULT, fuzzable=True, name=None):
        '''
        :type value: str
        :param value: default value
        :param min_length: minimal length of the field (in bytes)
        :param max_length: maximal length of the field (in bytes)
        :param seed: seed for the random number generator, to allow consistency between runs (default: 1234)
        :param num_mutations: number of mutations to perform (if step is None) (default:25)
        :type step: int
        :param step: step between lengths of each mutation (default: None)
        :type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
        :param encoder: encoder for the field (default: ENC_STR_DEFAULT)
        :param fuzzable: is field fuzzable (default: True)
        :param name: name of the object (default: None)
        :raises: :class:`~kitty.core.KittyException` if the lengths are
            invalid or if step is negative

        :examples:

            ::

                RandomBytes(value='1234', min_length=0, max_length=75, step=15)
                RandomBytes(value='1234', min_length=0, max_length=75, num_mutations=80)
        '''
        super(RandomBytes, self).__init__(value=value, encoder=encoder, fuzzable=fuzzable, name=name)
        self._validate_lengths(min_length, max_length)
        self._min_length = min_length
        self._max_length = max_length
        self._num_mutations = num_mutations
        self._step = step
        self._random = Random()
        self._seed = seed
        self._random.seed(self._seed)
        if self._step:
            if self._step < 0:
                raise KittyException('step (%d) < 0' % (step))
            # with a step, the number of mutations is derived from the
            # range of lengths; floor division keeps it an integer
            self._num_mutations = (self._max_length - self._min_length) // self._step

    def _validate_lengths(self, min_length, max_length):
        '''
        :param min_length: minimal length of the field (in bytes)
        :param max_length: maximal length of the field (in bytes)
        :raises: :class:`~kitty.core.KittyException` if min_length > max_length,
            min_length < 0 or max_length <= 0
        '''
        kassert.is_int(min_length)
        kassert.is_int(max_length)
        if min_length > max_length:
            raise KittyException('min_length(%d) > max_length(%d)' % (min_length, max_length))
        elif min_length < 0:
            raise KittyException('min_length(%d) < 0' % (min_length))
        elif max_length <= 0:
            raise KittyException('max_length(%d) <= 0' % (max_length))

    def reset(self):
        '''
        Reset the field to its default state and re-seed the random
        number generator, so the same mutations are generated again
        '''
        super(RandomBytes, self).reset()
        self._random.seed(self._seed)

    def _mutate(self):
        if self._step:
            length = self._min_length + self._step * self._current_index
        else:
            length = self._random.randint(self._min_length, self._max_length)
        # one random byte for each position
        self._current_value = ''.join(chr(self._random.randint(0, 255)) for _ in range(length))

    def hash(self):
        '''
        :rtype: int
        :return: hash of the field
        '''
        hashed = super(RandomBytes, self).hash()
        return khash(hashed, self._min_length, self._max_length, self._num_mutations, self._step, self._seed)