# Provenance: recovered from the patch "Add FuzzedDataProvider utility for
# structured fuzzing" (commit 5bef75d09502db9f4fb842c1c53474460ac93f4c,
# Adam Korczynski, 2026-04-10), which creates this module as
# fuzzeddataprovider.py.
#
# NOTE(review): the copy of the patch this was recovered from had its line
# breaks collapsed and a span of text missing between the end of ConsumeFloat
# and the first comparison in ConsumeIntInRange. The tail of ConsumeFloat,
# ConsumeRegularFloat, ConsumeUnicode, _consume_back and the prologue of
# ConsumeIntInRange below are reconstructions (each one is marked); confirm
# them against the original commit. Methods that were defined only inside the
# missing span are not restored here.
"""Pure-Python FuzzedDataProvider matching the atheris API.

This is a drop-in replacement for atheris.FuzzedDataProvider that requires
no native compilation. It matches atheris's consumption semantics:
  - ConsumeBytes/ConsumeInt/ConsumeFloat/ConsumeUnicode consume from the FRONT
  - ConsumeIntInRange/ConsumeBool/PickValueInList consume from the BACK

Reference: https://github.com/google/atheris
"""

import struct
import sys


class FuzzedDataProvider:
    """Carve typed values out of a fuzzer-supplied byte string.

    The unread input is the window ``data[front:back]``. Front-consuming
    methods advance ``front``; back-consuming methods retreat ``back``. A
    request for more bytes than remain is silently truncated, so running out
    of input yields short/zero results instead of an exception.
    """

    def __init__(self, data):
        """Wrap *data* for consumption.

        Args:
            data: the fuzz input, as ``bytes`` or ``bytearray``.

        Raises:
            TypeError: if *data* is neither ``bytes`` nor ``bytearray``.
        """
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError("data must be bytes or bytearray")
        # Snapshot as immutable bytes so later mutation of a caller-owned
        # bytearray cannot change what is consumed.
        self._data = bytes(data)
        self._front = 0
        self._back = len(self._data)

    def remaining_bytes(self) -> int:
        """Return the number of bytes not yet consumed from either end."""
        return max(0, self._back - self._front)

    def buffer(self) -> bytes:
        """Return the unconsumed bytes without consuming them."""
        return self._data[self._front : self._back]

    # -- Front-consuming methods (ConsumeBytes, ConsumeInt, etc.) --

    def _consume_front(self, n):
        """Remove and return up to *n* bytes from the front of the window."""
        n = min(n, self.remaining_bytes())
        result = self._data[self._front : self._front + n]
        self._front += n
        return result

    def ConsumeBytes(self, count) -> bytes:
        """Consume up to *count* bytes from the front.

        A negative *count* is treated as 0. Fewer bytes are returned when the
        input runs out.
        """
        count = max(0, int(count))
        return self._consume_front(count)

    def ConsumeInt(self, byte_count) -> int:
        """Consume a signed little-endian integer from the front.

        Up to *byte_count* bytes are read. The sign bit is the top bit of the
        bytes actually obtained, so a truncated read is sign-extended from its
        own width. Returns 0 when no bytes are available.
        """
        byte_count = max(0, int(byte_count))
        raw = self._consume_front(byte_count)
        if not raw:
            return 0
        return int.from_bytes(raw, "little", signed=True)

    def ConsumeUInt(self, byte_count) -> int:
        """Consume an unsigned little-endian integer from the front.

        Up to *byte_count* bytes are read. Returns 0 when no bytes are
        available.
        """
        byte_count = max(0, int(byte_count))
        raw = self._consume_front(byte_count)
        if not raw:
            return 0
        return int.from_bytes(raw, "little")

    def ConsumeFloat(self) -> float:
        """Consume 8 bytes from the front and reinterpret them as a double.

        A short read is zero-padded on the right to 8 bytes. The result may
        be NaN or infinite, since every bit pattern is accepted.
        """
        raw = self._consume_front(8)
        if len(raw) < 8:
            raw = raw + b"\x00" * (8 - len(raw))
        # NOTE(review): the format string was cut off in the recovered text;
        # little-endian double ("<d") is assumed from the 8-byte read and the
        # little-endian convention used by ConsumeInt/ConsumeUInt.
        return struct.unpack("<d", raw)[0]

    def ConsumeRegularFloat(self) -> float:
        """Consume a finite float (never NaN or infinity).

        NOTE(review): reconstructed -- the original body was in the missing
        span. This follows LLVM's ConsumeFloatingPoint, which draws from the
        full finite range via ConsumeFloatInRange (and therefore consumes
        from the BACK); confirm against the original commit.
        """
        return self.ConsumeFloatInRange(-sys.float_info.max, sys.float_info.max)

    def ConsumeUnicode(self, count) -> str:
        """Consume a string of at most *count* code points from the front.

        NOTE(review): reconstructed -- the original body was in the missing
        span. This is modelled on atheris: the first byte selects the
        encoding (bit 0 clear: 7-bit ASCII; else bit 1 clear: 16-bit units;
        else 32-bit units), and the following bytes supply the code points.
        Lone surrogates may appear in the result. Confirm against the
        original commit.
        """
        count = max(0, int(count))
        if count == 0 or self.remaining_bytes() == 0:
            return ""
        spec = self._consume_front(1)[0]
        if (spec & 1) == 0:
            width = 1
        elif (spec & 2) == 0:
            width = 2
        else:
            width = 4
        nbytes = min(count * width, self.remaining_bytes())
        nbytes -= nbytes % width  # never read a partial code unit
        raw = self._consume_front(nbytes)

        chars = []
        for start in range(0, len(raw), width):
            cp = int.from_bytes(raw[start : start + width], "little")
            if width == 1:
                cp &= 0x7F
            elif width == 4:
                # Fold arbitrary 32-bit values into the valid code point
                # range 0..0x10FFFF so chr() cannot raise.
                cp &= 0x1FFFFF
                if cp >= 0x110000:
                    cp -= 0x110000
            chars.append(chr(cp))
        return "".join(chars)

    # -- Back-consuming methods (ConsumeIntInRange, ConsumeBool, etc.) --

    def _consume_back(self, n):
        """Remove and return up to *n* bytes from the back of the window.

        The bytes are returned in memory order (not reversed).

        NOTE(review): reconstructed -- the original body was in the missing
        span. Memory-order return is what the comment in ConsumeIntInRange
        states; confirm the rest against the original commit.
        """
        n = min(n, self.remaining_bytes())
        result = self._data[self._back - n : self._back]
        self._back -= n
        return result

    def ConsumeIntInRange(self, lo, hi) -> int:
        """Consume an integer in the closed range [lo, hi] from the back.

        Reversed bounds are swapped. No bytes are consumed when the range has
        a single value. Returns *lo* when the input is exhausted.
        """
        # NOTE(review): the signature and the int() coercion were in the
        # missing span and are reconstructed from the call sites and from
        # ConsumeFloatInRange's matching float() coercion.
        lo, hi = int(lo), int(hi)
        if lo > hi:
            lo, hi = hi, lo
        if lo == hi:
            return lo
        rng = hi - lo
        # Match LLVM: consume ceil(bits_needed/8) bytes from back
        # LLVM loops while offset < sizeof(T)*8 && (range >> offset) > 0
        nbytes = (rng.bit_length() + 7) // 8
        raw = self._consume_back(nbytes)
        if not raw:
            return lo
        # LLVM accumulates result = (result << 8) | next_byte_from_back, so
        # the last byte in memory becomes the most significant. With
        # _consume_back returning memory order, that is exactly a
        # little-endian decode.
        val = int.from_bytes(raw, "little")
        return lo + (val % (rng + 1))

    # Alias for LLVM naming compatibility
    ConsumeIntegralInRange = ConsumeIntInRange

    def ConsumeBool(self) -> bool:
        """Consume one byte from the back and return its low bit as a bool.

        Returns False when the input is exhausted.
        """
        # Matches LLVM: 1 & ConsumeIntegral<uint8_t>()
        # ConsumeIntegral<uint8_t>() = ConsumeIntegralInRange(0, 255)
        return (self.ConsumeIntInRange(0, 255) & 1) == 1

    def ConsumeProbability(self) -> float:
        """Consume up to 8 bytes from the back and return a float in [0, 1].

        Returns 0.0 when the input is exhausted.
        """
        # Matches LLVM: ConsumeIntegral<uint64_t>() / UINT64_MAX
        # When range == UINT64_MAX, no modulo is applied (special case)
        raw = self._consume_back(8)
        if not raw:
            return 0.0
        val = int.from_bytes(raw, "little")
        return val / float((1 << 64) - 1)

    def ConsumeFloatInRange(self, lo, hi) -> float:
        """Consume a float in the closed range [lo, hi] from the back.

        Reversed bounds are swapped. Bounds are expected to be finite.

        When ``hi - lo`` would overflow a double (for example the full finite
        range), the span is halved and one extra bool is consumed to pick the
        lower or upper half, as LLVM's ConsumeFloatingPointInRange does.
        Otherwise exactly one probability (up to 8 bytes) is consumed.
        """
        lo, hi = float(lo), float(hi)
        if lo > hi:
            lo, hi = hi, lo
        base = lo
        if lo < 0.0 < hi and hi > lo + sys.float_info.max:
            # hi - lo is not representable: it would become inf, and
            # inf * p would yield inf or nan instead of a value in range.
            span = hi / 2.0 - lo / 2.0
            if self.ConsumeBool():
                base += span
        else:
            span = hi - lo
        return base + span * self.ConsumeProbability()

    def PickValueInList(self, lst):
        """Return one element of *lst*, chosen by bytes from the back.

        Raises:
            ValueError: if *lst* is empty.
        """
        if not lst:
            raise ValueError("list must not be empty")
        idx = self.ConsumeIntInRange(0, len(lst) - 1)
        return lst[idx]

    # -- List methods --
    # Each returns a list of *count* values (negative counts give an empty
    # list) produced by repeated calls to the matching scalar method.

    def ConsumeIntList(self, count, byte_count) -> list:
        """Return *count* results of ``ConsumeInt(byte_count)``."""
        count = max(0, int(count))
        return [self.ConsumeInt(byte_count) for _ in range(count)]

    def ConsumeIntListInRange(self, count, lo, hi) -> list:
        """Return *count* results of ``ConsumeIntInRange(lo, hi)``."""
        count = max(0, int(count))
        return [self.ConsumeIntInRange(lo, hi) for _ in range(count)]

    def ConsumeFloatList(self, count) -> list:
        """Return *count* results of ``ConsumeFloat()``."""
        count = max(0, int(count))
        return [self.ConsumeFloat() for _ in range(count)]

    def ConsumeFloatListInRange(self, count, lo, hi) -> list:
        """Return *count* results of ``ConsumeFloatInRange(lo, hi)``."""
        count = max(0, int(count))
        return [self.ConsumeFloatInRange(lo, hi) for _ in range(count)]

    def ConsumeProbabilityList(self, count) -> list:
        """Return *count* results of ``ConsumeProbability()``."""
        count = max(0, int(count))
        return [self.ConsumeProbability() for _ in range(count)]

    def ConsumeRegularFloatList(self, count) -> list:
        """Return *count* results of ``ConsumeRegularFloat()``."""
        count = max(0, int(count))
        return [self.ConsumeRegularFloat() for _ in range(count)]

    # -- Arbitrary value --

    # Type tags selected by ConsumeRandomValue; they must stay contiguous
    # because the tag is drawn from the range [_ANY_TYPE_INT, _ANY_TYPE_NONE].
    _ANY_TYPE_INT = 0
    _ANY_TYPE_FLOAT = 1
    _ANY_TYPE_BOOL = 2
    _ANY_TYPE_BYTES = 3
    _ANY_TYPE_STRING = 4
    _ANY_TYPE_NONE = 5

    def ConsumeRandomValue(self):
        """Return a value of a randomly chosen primitive type.

        A type tag is drawn from the back first; the value is then one of a
        4-byte signed int, a float, a bool, bytes or a string of length
        0..64, or None.
        """
        t = self.ConsumeIntInRange(self._ANY_TYPE_INT, self._ANY_TYPE_NONE)
        if t == self._ANY_TYPE_INT:
            return self.ConsumeInt(4)
        elif t == self._ANY_TYPE_FLOAT:
            return self.ConsumeFloat()
        elif t == self._ANY_TYPE_BOOL:
            return self.ConsumeBool()
        elif t == self._ANY_TYPE_BYTES:
            return self.ConsumeBytes(self.ConsumeIntInRange(0, 64))
        elif t == self._ANY_TYPE_STRING:
            return self.ConsumeUnicode(self.ConsumeIntInRange(0, 64))
        else:
            return None