Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,10 @@ path = "fuzz_targets/reconstruct_key.rs"
test = false
doc = false
bench = false

[[bin]]
name = "grow_hashtable"
path = "fuzz_targets/grow_hashtable.rs"
test = false
doc = false
bench = false
236 changes: 236 additions & 0 deletions fuzz/fuzz_targets/grow_hashtable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
#![no_main]

use std::{
collections::{BTreeMap, BTreeSet},
path::Path,
};

use arbitrary::Arbitrary;
use libfuzzer_sys::fuzz_target;
use nomt::{
grow_hashtable, hasher::Blake3Hasher, trie::KeyPath, validate_hashtable, KeyReadWrite, Nomt,
Options, SessionParams, Value,
};

// Fuzz entry point: replays a generated `Run` against a fresh on-disk database while mirroring
// every change in an in-memory model, and asserts after each operation that reads from the
// database agree with the model.
fuzz_target!(|run: Run| {
// Every run starts from a new database inside its own temporary directory.
let tempdir = tempfile::tempdir().unwrap();
let path = tempdir.path().join("db");
// Bucket count handed to `options` for every open; replaced by the requested count after a grow.
let mut buckets = run.initial_buckets;
// Held in an `Option` so an op can take and drop the handle before the files are used directly.
let mut db = Some(Nomt::<Blake3Hasher>::open(options(&path, buckets)).unwrap());
// Expected key/value contents of the database.
let mut model = BTreeMap::<KeyPath, Value>::new();
// Every key written by a commit so far; `assert_model` reads back exactly these keys.
let mut touched = BTreeSet::<KeyPath>::new();
// Copy of the model taken after each non-empty commit and each rollback; index 0 is the empty
// starting state.
let mut snapshots = vec![model.clone()];

for op in run.ops {
match op {
Op::Commit(changes) => {
let Some(nomt) = db.as_ref() else {
unreachable!("database should be open before commit")
};
// `commit` returns false for an empty change set, in which case nothing was committed and
// no snapshot is recorded.
if commit(nomt, changes, &mut model, &mut touched) {
snapshots.push(model.clone());
}
assert_model(nomt, &model, &touched);
}
Op::Grow(extra) => {
let Some(nomt) = db.take() else {
unreachable!("database should be open before grow")
};
// Remember the root, then close the handle before `grow_hashtable` runs on the path.
let root = nomt.root();
drop(nomt);

// `extra` is a u8, so `extra as u32 * 512` is at most 255 * 512 and cannot overflow; the
// additions saturate.
let requested = buckets.saturating_add(1).saturating_add(extra as u32 * 512);
grow_hashtable(&options(&path, requested)).unwrap();
buckets = requested;

// The validator must report exactly the requested capacity.
let utilization = validate_hashtable(&options(&path, buckets)).unwrap();
assert_eq!(utilization.capacity, buckets as usize);

// Growing must change neither the root nor any value that was written.
let nomt = Nomt::<Blake3Hasher>::open(options(&path, buckets)).unwrap();
assert_eq!(nomt.root(), root);
assert_model(&nomt, &model, &touched);
db = Some(nomt);
}
Op::Rollback(raw_n) => {
// Only the initial snapshot exists, so there is nothing to roll back to.
if snapshots.len() <= 1 {
continue;
}

// `n` lies in 1..=snapshots.len() - 1; `target` is the snapshot `n` steps before the latest.
let n = raw_n as usize % (snapshots.len() - 1) + 1;
let target = snapshots.len() - 1 - n;
let Some(nomt) = db.as_ref() else {
unreachable!("database should be open before rollback")
};
nomt.rollback(n).unwrap();
model = snapshots[target].clone();
// NOTE(review): pushing (rather than truncating) assumes a rollback itself counts as one
// more step that a later rollback can undo. If `Nomt::rollback` instead shortens the
// rollback log, a later `n` could exceed what is logged -- confirm against `Nomt::rollback`.
snapshots.push(model.clone());
assert_model(nomt, &model, &touched);
}
Op::Reopen => {
// Close the current handle first, then open the same path again with the same options.
drop(db.take());
let nomt = Nomt::<Blake3Hasher>::open(options(&path, buckets)).unwrap();
assert_model(&nomt, &model, &touched);
db = Some(nomt);
}
Op::Validate => {
let Some(nomt) = db.take() else {
unreachable!("database should be open before validate")
};
// Remember the root, then close the handle before the validator runs on the path.
let root = nomt.root();
drop(nomt);

validate_hashtable(&options(&path, buckets)).unwrap();

// After validating, the reopened database must have the same root and contents.
let nomt = Nomt::<Blake3Hasher>::open(options(&path, buckets)).unwrap();
assert_eq!(nomt.root(), root);
assert_model(&nomt, &model, &touched);
db = Some(nomt);
}
}
}
});

/// Builds the `Options` used by every open, grow and validate call in this target.
///
/// Only `path` and `buckets` vary between calls. The seed is fixed to all zeroes, one I/O worker
/// is used, rollback is enabled with a log length of 128, and hash-table preallocation is off.
fn options(path: &Path, buckets: u32) -> Options {
    let mut opts = Options::new();

    // Per-call settings.
    opts.path(path);
    opts.bitbox_seed([0; 16]);
    opts.hashtable_buckets(buckets);

    // Settings that are the same for every call.
    opts.io_workers(1);
    opts.rollback(true);
    opts.max_rollback_log_len(128);
    opts.preallocate_ht(false);

    opts
}

fn commit(
nomt: &Nomt<Blake3Hasher>,
changes: Vec<Change>,
model: &mut BTreeMap<KeyPath, Value>,
touched: &mut BTreeSet<KeyPath>,
) -> bool {
let mut dedup = BTreeMap::<KeyPath, Option<Value>>::new();
for change in changes {
dedup.insert(change.key, change.value);
}
if dedup.is_empty() {
return false;
}

let session = nomt.begin_session(SessionParams::default());
let operations = dedup
.iter()
.map(|(key, value)| (*key, KeyReadWrite::Write(value.clone())))
.collect::<Vec<_>>();
for (key, _) in &operations {
session.warm_up(*key);
}
session.finish(operations).unwrap().commit(nomt).unwrap();

for (key, value) in dedup {
touched.insert(key);
if let Some(value) = value {
model.insert(key, value);
} else {
model.remove(&key);
}
}

true
}

/// Reads every key in `touched` from `nomt` and asserts that the result equals the model's entry
/// for that key (`None` when the model has no entry).
///
/// # Panics
///
/// Panics if a read fails or if any value differs from the model.
fn assert_model(
    nomt: &Nomt<Blake3Hasher>,
    model: &BTreeMap<KeyPath, Value>,
    touched: &BTreeSet<KeyPath>,
) {
    touched.iter().for_each(|key| {
        let stored = nomt.read(*key).unwrap();
        assert_eq!(stored.as_ref(), model.get(key));
    });
}

/// One complete fuzz input: the initial hash-table size plus the operations to replay.
#[derive(Debug)]
struct Run {
// Bucket count the database is first opened with.
initial_buckets: u32,
// Operations applied in order by the fuzz target.
ops: Vec<Op>,
}

/// A single step of a fuzz run.
#[derive(Debug)]
enum Op {
// Write the given changes in one commit; an empty list commits nothing.
Commit(Vec<Change>),
// Close the database, grow the hash table by `1 + value * 512` buckets, validate and reopen.
Grow(u8),
// Roll back a number of steps derived from the value modulo the snapshots taken so far.
Rollback(u8),
// Close the database and open it again with the same options.
Reopen,
// Close the database, run the hash-table validator and reopen.
Validate,
}

/// A single key write inside a commit.
#[derive(Debug)]
struct Change {
// Key to write.
key: KeyPath,
// New value; `None` removes the key from the model when the change is applied.
value: Option<Value>,
}

/// Generates a run with an initial size of 4096 or 8192 buckets and between 0 and 16 operations.
impl<'a> Arbitrary<'a> for Run {
    fn arbitrary(input: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let initial_buckets = *input.choose(&[4096u32, 8192])?;

        // Draw the operation count first, then that many operations; the first failure aborts.
        let op_count: usize = input.int_in_range(0..=16)?;
        let ops = (0..op_count)
            .map(|_| Op::arbitrary(input))
            .collect::<arbitrary::Result<Vec<_>>>()?;

        Ok(Self {
            initial_buckets,
            ops,
        })
    }
}

/// Generates one operation from a selector in `0..=9`: five values map to `Commit`, two to
/// `Grow`, and one each to `Rollback`, `Reopen` and `Validate`.
impl<'a> Arbitrary<'a> for Op {
    fn arbitrary(input: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let op = match input.int_in_range(0..=9)? {
            0..=4 => {
                // A commit carries between 0 and 8 changes.
                let len: usize = input.int_in_range(0..=8)?;
                let changes = (0..len)
                    .map(|_| Change::arbitrary(input))
                    .collect::<arbitrary::Result<Vec<_>>>()?;
                Self::Commit(changes)
            }
            5 | 6 => Self::Grow(input.arbitrary()?),
            7 => Self::Rollback(input.arbitrary()?),
            8 => Self::Reopen,
            9 => Self::Validate,
            _ => unreachable!(),
        };
        Ok(op)
    }
}

/// Generates a change with a 32-byte key taken from the input and, with ratio 1 in 4, no value
/// (`None`); otherwise a value produced by `arbitrary_value`.
impl<'a> Arbitrary<'a> for Change {
    fn arbitrary(input: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let mut key = [0; 32];
        input.fill_buffer(&mut key)?;

        // The ratio is drawn before any value bytes, and a value is only drawn when it is kept.
        let without_value = input.ratio(1, 4)?;
        let value = (!without_value)
            .then(|| arbitrary_value(input))
            .transpose()?;

        Ok(Self { key, value })
    }
}

/// Generates a value whose length falls into one of several size classes.
///
/// A selector in `0..=7` picks the class: exactly 0 bytes, exactly 1 byte, or a length drawn from
/// `2..=32`, `33..=256`, `257..=1333`, `1334..=2048` or `2049..=4096` (selectors 6 and 7 both map
/// to the last range). The value's bytes are then filled from the input.
fn arbitrary_value(input: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Value> {
    let len: usize = match input.int_in_range(0..=7)? {
        // The two fixed lengths draw nothing further from the input.
        0 => 0,
        1 => 1,
        class => {
            let (shortest, longest) = match class {
                2 => (2, 32),
                3 => (33, 256),
                4 => (257, 1333),
                5 => (1334, 2048),
                _ => (2049, 4096),
            };
            input.int_in_range(shortest..=longest)?
        }
    };

    let mut value = vec![0; len];
    input.fill_buffer(&mut value)?;
    Ok(value)
}
27 changes: 18 additions & 9 deletions nomt/src/bitbox/ht_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ pub struct HTOffsets {
}

impl HTOffsets {
/// Builds the offsets for a hash table with `num_pages` data pages; the data section starts at
/// the page index equal to the number of meta-byte pages.
pub(super) fn new(num_pages: u32) -> Self {
    let data_page_offset = num_meta_byte_pages(num_pages) as u64;
    Self { data_page_offset }
}

/// Returns the page number of the `ix`th item in the data section of the store.
pub fn data_page_index(&self, ix: u64) -> u64 {
self.data_page_offset + ix
Expand All @@ -28,12 +34,12 @@ impl HTOffsets {
}
}

fn expected_file_len(num_pages: u32) -> u64 {
pub(super) fn expected_file_len(num_pages: u32) -> u64 {
(num_meta_byte_pages(num_pages) + num_pages) as u64 * PAGE_SIZE as u64
}

fn num_meta_byte_pages(num_pages: u32) -> u32 {
(num_pages + 4095) / PAGE_SIZE as u32
pub(super) fn num_meta_byte_pages(num_pages: u32) -> u32 {
((num_pages as u64 + PAGE_SIZE as u64 - 1) / PAGE_SIZE as u64) as u32
}

/// Opens the HT file, checks its length and reads the meta map.
Expand All @@ -46,16 +52,15 @@ pub fn open(
anyhow::bail!("Store corrupted; unexpected file length");
}

let num_meta_byte_pages = num_meta_byte_pages(num_pages);
let mut meta_bytes = Vec::with_capacity(num_meta_byte_pages as usize * PAGE_SIZE);
for pn in 0..num_meta_byte_pages {
let meta_byte_pages = num_meta_byte_pages(num_pages);
let mut meta_bytes = Vec::with_capacity(meta_byte_pages as usize * PAGE_SIZE);
for pn in 0..meta_byte_pages {
let extra_meta_page = io::read_page(page_pool, ht_fd, pn as u64)?;
meta_bytes.extend_from_slice(&*extra_meta_page);
}

let data_page_offset = num_meta_byte_pages as u64;
Ok((
HTOffsets { data_page_offset },
HTOffsets::new(num_pages),
MetaMap::from_bytes(meta_bytes, num_pages as usize),
))
}
Expand Down Expand Up @@ -89,7 +94,11 @@ pub fn create(path: PathBuf, num_pages: u32, preallocate: bool) -> std::io::Resu
/// and may silently fall back to regular allocation.
///
/// After this call, if successful, the file size is set to `len` bytes.
fn resize_and_prealloc(ht_file: &File, len: u64, preallocate: bool) -> std::io::Result<()> {
pub(super) fn resize_and_prealloc(
ht_file: &File,
len: u64,
preallocate: bool,
) -> std::io::Result<()> {
if !preallocate {
// If not preallocating, just set the file size and return.
ht_file.set_len(len)?;
Expand Down
12 changes: 11 additions & 1 deletion nomt/src/bitbox/meta_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl MetaMap {
}

pub fn full_count(&self) -> usize {
self.bitvec
self.bitvec[..self.buckets]
.iter()
.filter(|&&byte| byte & FULL_MASK != 0)
.count()
Expand All @@ -38,6 +38,16 @@ impl MetaMap {
self.bitvec[bucket] = full_entry(hash);
}

/// Returns `true` when the meta byte of `bucket` has a `FULL_MASK` bit set.
///
/// Panics if `bucket` is out of range for the underlying byte vector.
pub(super) fn is_full(&self, bucket: usize) -> bool {
    let meta_byte = self.bitvec[bucket];
    (meta_byte & FULL_MASK) != 0
}

/// Returns `true` when every meta byte after the first `self.buckets` entries equals `EMPTY`
/// (trivially true when there are no such bytes).
pub(super) fn padding_is_empty(&self) -> bool {
    let padding = &self.bitvec[self.buckets..];
    !padding.iter().any(|&byte| byte != EMPTY)
}

/// Overwrites the meta byte of `bucket` with `TOMBSTONE`.
///
/// Panics if `bucket` is out of range for the underlying byte vector.
pub fn set_tombstone(&mut self, bucket: usize) {
self.bitvec[bucket] = TOMBSTONE;
}
Expand Down
21 changes: 19 additions & 2 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ pub use wal::WalBlobBuilder;

mod ht_file;
mod meta_map;
mod rehash;
mod validate;
mod wal;
pub(crate) mod writeout;

pub(crate) use rehash::{finish_pending_rehash, grow_hashtable};
pub(crate) use validate::validate_hashtable;

/// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate
/// is too high.
#[derive(fmt::Debug)]
Expand Down Expand Up @@ -651,7 +656,15 @@ fn allocate_bucket(
meta_map: &mut MetaMap,
seed: &[u8; 16],
) -> Option<BucketIndex> {
let mut probe_seq = ProbeSequence::new(page_id, &meta_map, seed);
allocate_bucket_raw(page_id.encode(), meta_map, seed)
}

fn allocate_bucket_raw(
page_id: [u8; 32],
meta_map: &mut MetaMap,
seed: &[u8; 16],
) -> Option<BucketIndex> {
let mut probe_seq = ProbeSequence::new_raw(page_id, &meta_map, seed);

let mut i = 0;
loop {
Expand Down Expand Up @@ -695,7 +708,11 @@ enum ProbeResult {

impl ProbeSequence {
fn new(page_id: &PageId, meta_map: &MetaMap, seed: &[u8; 16]) -> Self {
let hash = hash_page_id(page_id, seed);
Self::new_raw(page_id.encode(), meta_map, seed)
}

fn new_raw(page_id: [u8; 32], meta_map: &MetaMap, seed: &[u8; 16]) -> Self {
let hash = hash_raw_page_id(page_id, seed);
Self {
hash,
bucket: hash % meta_map.len() as u64,
Expand Down
Loading