From a43fc1b8114ce413d7ad213fa3180603500133b9 Mon Sep 17 00:00:00 2001 From: Theodore Bugnet Date: Thu, 4 Jun 2026 20:10:44 +0100 Subject: [PATCH 1/2] Initial implementation --- nomt/src/bitbox/ht_file.rs | 27 ++- nomt/src/bitbox/meta_map.rs | 6 +- nomt/src/bitbox/mod.rs | 19 ++- nomt/src/bitbox/rehash.rs | 315 +++++++++++++++++++++++++++++++++++ nomt/src/lib.rs | 16 ++ nomt/src/store/mod.rs | 22 ++- nomt/tests/grow_hashtable.rs | 83 +++++++++ 7 files changed, 475 insertions(+), 13 deletions(-) create mode 100644 nomt/src/bitbox/rehash.rs create mode 100644 nomt/tests/grow_hashtable.rs diff --git a/nomt/src/bitbox/ht_file.rs b/nomt/src/bitbox/ht_file.rs index 8b0a15d31b..16f6bb72a6 100644 --- a/nomt/src/bitbox/ht_file.rs +++ b/nomt/src/bitbox/ht_file.rs @@ -17,6 +17,12 @@ pub struct HTOffsets { } impl HTOffsets { + pub(super) fn new(num_pages: u32) -> Self { + Self { + data_page_offset: num_meta_byte_pages(num_pages) as u64, + } + } + /// Returns the page number of the `ix`th item in the data section of the store. pub fn data_page_index(&self, ix: u64) -> u64 { self.data_page_offset + ix @@ -28,12 +34,12 @@ impl HTOffsets { } } -fn expected_file_len(num_pages: u32) -> u64 { +pub(super) fn expected_file_len(num_pages: u32) -> u64 { (num_meta_byte_pages(num_pages) + num_pages) as u64 * PAGE_SIZE as u64 } -fn num_meta_byte_pages(num_pages: u32) -> u32 { - (num_pages + 4095) / PAGE_SIZE as u32 +pub(super) fn num_meta_byte_pages(num_pages: u32) -> u32 { + ((num_pages as u64 + PAGE_SIZE as u64 - 1) / PAGE_SIZE as u64) as u32 } /// Opens the HT file, checks its length and reads the meta map. 
@@ -46,16 +52,15 @@ pub fn open( anyhow::bail!("Store corrupted; unexpected file length"); } - let num_meta_byte_pages = num_meta_byte_pages(num_pages); - let mut meta_bytes = Vec::with_capacity(num_meta_byte_pages as usize * PAGE_SIZE); - for pn in 0..num_meta_byte_pages { + let meta_byte_pages = num_meta_byte_pages(num_pages); + let mut meta_bytes = Vec::with_capacity(meta_byte_pages as usize * PAGE_SIZE); + for pn in 0..meta_byte_pages { let extra_meta_page = io::read_page(page_pool, ht_fd, pn as u64)?; meta_bytes.extend_from_slice(&*extra_meta_page); } - let data_page_offset = num_meta_byte_pages as u64; Ok(( - HTOffsets { data_page_offset }, + HTOffsets::new(num_pages), MetaMap::from_bytes(meta_bytes, num_pages as usize), )) } @@ -89,7 +94,11 @@ pub fn create(path: PathBuf, num_pages: u32, preallocate: bool) -> std::io::Resu /// and may silently fall back to regular allocation. /// /// After this call, if successful, the file size is set to `len` bytes. -fn resize_and_prealloc(ht_file: &File, len: u64, preallocate: bool) -> std::io::Result<()> { +pub(super) fn resize_and_prealloc( + ht_file: &File, + len: u64, + preallocate: bool, +) -> std::io::Result<()> { if !preallocate { // If not preallocating, just set the file size and return. 
ht_file.set_len(len)?; diff --git a/nomt/src/bitbox/meta_map.rs b/nomt/src/bitbox/meta_map.rs index 7d12580f42..eaebed763b 100644 --- a/nomt/src/bitbox/meta_map.rs +++ b/nomt/src/bitbox/meta_map.rs @@ -24,7 +24,7 @@ impl MetaMap { } pub fn full_count(&self) -> usize { - self.bitvec + self.bitvec[..self.buckets] .iter() .filter(|&&byte| byte & FULL_MASK != 0) .count() @@ -38,6 +38,10 @@ impl MetaMap { self.bitvec[bucket] = full_entry(hash); } + pub(super) fn is_full(&self, bucket: usize) -> bool { + self.bitvec[bucket] & FULL_MASK != 0 + } + pub fn set_tombstone(&mut self, bucket: usize) { self.bitvec[bucket] = TOMBSTONE; } diff --git a/nomt/src/bitbox/mod.rs b/nomt/src/bitbox/mod.rs index ec4d1704bf..342c49a960 100644 --- a/nomt/src/bitbox/mod.rs +++ b/nomt/src/bitbox/mod.rs @@ -27,9 +27,12 @@ pub use wal::WalBlobBuilder; mod ht_file; mod meta_map; +mod rehash; mod wal; pub(crate) mod writeout; +pub(crate) use rehash::{finish_pending_rehash, grow_hashtable}; + /// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate /// is too high. 
#[derive(fmt::Debug)] @@ -651,7 +654,15 @@ fn allocate_bucket( meta_map: &mut MetaMap, seed: &[u8; 16], ) -> Option { - let mut probe_seq = ProbeSequence::new(page_id, &meta_map, seed); + allocate_bucket_raw(page_id.encode(), meta_map, seed) +} + +fn allocate_bucket_raw( + page_id: [u8; 32], + meta_map: &mut MetaMap, + seed: &[u8; 16], +) -> Option { + let mut probe_seq = ProbeSequence::new_raw(page_id, &meta_map, seed); let mut i = 0; loop { @@ -695,7 +706,11 @@ enum ProbeResult { impl ProbeSequence { fn new(page_id: &PageId, meta_map: &MetaMap, seed: &[u8; 16]) -> Self { - let hash = hash_page_id(page_id, seed); + Self::new_raw(page_id.encode(), meta_map, seed) + } + + fn new_raw(page_id: [u8; 32], meta_map: &MetaMap, seed: &[u8; 16]) -> Self { + let hash = hash_raw_page_id(page_id, seed); Self { hash, bucket: hash % meta_map.len() as u64, diff --git a/nomt/src/bitbox/rehash.rs b/nomt/src/bitbox/rehash.rs new file mode 100644 index 0000000000..ae66c65fe1 --- /dev/null +++ b/nomt/src/bitbox/rehash.rs @@ -0,0 +1,315 @@ +use std::{ + fs::{File, OpenOptions}, + io::{Read as _, Write as _}, + os::unix::fs::FileExt as _, + path::Path, +}; + +use anyhow::Context as _; + +use crate::{ + io::{self, PagePool, PAGE_SIZE}, + store::meta::Meta, +}; + +use super::{allocate_bucket_raw, ht_file, meta_map::MetaMap, recover}; + +const TMP_HT_FILE: &str = "ht.rehashing"; +const MARKER_FILE: &str = "ht.rehashing-marker"; +const MARKER_MAGIC: [u8; 8] = *b"NOMTRH1\0"; +const MARKER_LEN: usize = 16; + +struct RehashMarker { + old_num_pages: u32, + new_num_pages: u32, +} + +impl RehashMarker { + fn encode(&self) -> [u8; MARKER_LEN] { + let mut out = [0u8; MARKER_LEN]; + out[..8].copy_from_slice(&MARKER_MAGIC); + out[8..12].copy_from_slice(&self.old_num_pages.to_le_bytes()); + out[12..16].copy_from_slice(&self.new_num_pages.to_le_bytes()); + out + } + + fn decode(bytes: &[u8]) -> anyhow::Result { + if bytes.len() != MARKER_LEN { + anyhow::bail!("invalid rehash marker length: {}", 
bytes.len()); + } + if bytes[..8] != MARKER_MAGIC { + anyhow::bail!("invalid rehash marker magic"); + } + + let old_num_pages = u32::from_le_bytes(bytes[8..12].try_into().unwrap()); + let new_num_pages = u32::from_le_bytes(bytes[12..16].try_into().unwrap()); + if new_num_pages <= old_num_pages { + anyhow::bail!( + "invalid rehash marker bucket counts: old={}, new={}", + old_num_pages, + new_num_pages + ); + } + + Ok(Self { + old_num_pages, + new_num_pages, + }) + } +} + +pub(crate) fn finish_pending_rehash(path: &Path, page_pool: &PagePool) -> anyhow::Result<()> { + let Some(marker) = read_marker(path)? else { + return Ok(()); + }; + + let meta_path = path.join("meta"); + let meta_fd = OpenOptions::new() + .read(true) + .write(true) + .open(&meta_path) + .with_context(|| format!("failed to open {}", meta_path.display()))?; + let mut meta = Meta::read(page_pool, &meta_fd) + .with_context(|| format!("failed to read {}", meta_path.display()))?; + meta.validate()?; + + let ht_path = path.join("ht"); + let ht_len = ht_path + .metadata() + .with_context(|| format!("failed to stat {}", ht_path.display()))? + .len(); + let old_len = ht_file::expected_file_len(marker.old_num_pages); + let new_len = ht_file::expected_file_len(marker.new_num_pages); + + match meta.bitbox_num_pages { + n if n == marker.new_num_pages => { + if ht_len != new_len { + anyhow::bail!( + "pending rehash marker says meta is updated, but ht length is {}; expected {}", + ht_len, + new_len + ); + } + } + n if n == marker.old_num_pages && ht_len == new_len => { + meta.bitbox_num_pages = marker.new_num_pages; + Meta::write(page_pool, &meta_fd, &meta) + .context("failed to finish pending rehash metadata update")?; + } + n if n == marker.old_num_pages && ht_len == old_len => { + // The marker was persisted before the replacement rename completed. Keep the old table + // and discard the fully rebuilt temporary table, if it exists. 
+ } + n => { + anyhow::bail!( + "pending rehash marker is inconsistent with meta/ht: meta buckets={}, old={}, new={}, ht_len={}", + n, + marker.old_num_pages, + marker.new_num_pages, + ht_len + ); + } + } + + remove_file_if_exists(&path.join(TMP_HT_FILE))?; + remove_file_if_exists(&path.join(MARKER_FILE))?; + sync_dir(path)?; + Ok(()) +} + +pub(crate) fn grow_hashtable( + path: &Path, + page_pool: &PagePool, + new_num_pages: u32, + preallocate: bool, +) -> anyhow::Result<()> { + if new_num_pages == 0 { + anyhow::bail!("hashtable bucket count must be greater than zero"); + } + + finish_pending_rehash(path, page_pool)?; + + let meta_path = path.join("meta"); + let meta_fd = OpenOptions::new() + .read(true) + .write(true) + .open(&meta_path) + .with_context(|| format!("failed to open {}", meta_path.display()))?; + let mut meta = Meta::read(page_pool, &meta_fd) + .with_context(|| format!("failed to read {}", meta_path.display()))?; + meta.validate()?; + + if new_num_pages < meta.bitbox_num_pages { + anyhow::bail!( + "cannot shrink hashtable from {} to {} buckets", + meta.bitbox_num_pages, + new_num_pages + ); + } + if new_num_pages == meta.bitbox_num_pages { + return Ok(()); + } + + remove_file_if_exists(&path.join(TMP_HT_FILE))?; + rehash_to_tmp(path, page_pool, &meta, new_num_pages, preallocate)?; + + write_marker( + path, + RehashMarker { + old_num_pages: meta.bitbox_num_pages, + new_num_pages, + }, + )?; + + std::fs::rename(path.join(TMP_HT_FILE), path.join("ht")) + .context("failed to replace hashtable file")?; + sync_dir(path)?; + + meta.bitbox_num_pages = new_num_pages; + Meta::write(page_pool, &meta_fd, &meta).context("failed to update NOMT metadata")?; + + remove_file_if_exists(&path.join(MARKER_FILE))?; + sync_dir(path)?; + Ok(()) +} + +fn rehash_to_tmp( + path: &Path, + page_pool: &PagePool, + meta: &Meta, + new_num_pages: u32, + preallocate: bool, +) -> anyhow::Result<()> { + let ht_path = path.join("ht"); + let ht_fd = OpenOptions::new() + .read(true) + 
.write(true) + .open(&ht_path) + .with_context(|| format!("failed to open {}", ht_path.display()))?; + let wal_path = path.join("wal"); + let wal_fd = OpenOptions::new() + .read(true) + .write(true) + .open(&wal_path) + .with_context(|| format!("failed to open {}", wal_path.display()))?; + + let (old_offsets, mut old_meta_map) = ht_file::open(meta.bitbox_num_pages, page_pool, &ht_fd)?; + if wal_fd.metadata()?.len() > 0 { + recover( + meta.sync_seqn, + &ht_fd, + &wal_fd, + page_pool, + &old_offsets, + &mut old_meta_map, + meta.bitbox_seed, + )?; + } + + let new_meta_bytes = + vec![0u8; ht_file::num_meta_byte_pages(new_num_pages) as usize * PAGE_SIZE]; + let mut new_meta_map = MetaMap::from_bytes(new_meta_bytes, new_num_pages as usize); + if old_meta_map.full_count() > new_meta_map.len() { + anyhow::bail!( + "new hashtable has {} buckets but old table has {} occupied buckets", + new_meta_map.len(), + old_meta_map.full_count() + ); + } + + let tmp_path = path.join(TMP_HT_FILE); + let tmp_fd = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&tmp_path) + .with_context(|| format!("failed to create {}", tmp_path.display()))?; + ht_file::resize_and_prealloc( + &tmp_fd, + ht_file::expected_file_len(new_num_pages), + preallocate, + ) + .with_context(|| format!("failed to resize {}", tmp_path.display()))?; + + let new_offsets = ht_file::HTOffsets::new(new_num_pages); + for old_bucket in 0..old_meta_map.len() { + if !old_meta_map.is_full(old_bucket) { + continue; + } + + let old_pn = old_offsets.data_page_index(old_bucket as u64); + let page = io::read_page(page_pool, &ht_fd, old_pn) + .with_context(|| format!("failed to read old hashtable bucket {}", old_bucket))?; + let page_id_bytes: [u8; 32] = page[PAGE_SIZE - 32..].try_into().unwrap(); + let Some(new_bucket) = + allocate_bucket_raw(page_id_bytes, &mut new_meta_map, &meta.bitbox_seed) + else { + anyhow::bail!( + "failed to allocate bucket while rehashing old bucket {}", + old_bucket + ); + }; + 
let new_pn = new_offsets.data_page_index(new_bucket.0); + tmp_fd + .write_all_at(&page, new_pn * PAGE_SIZE as u64) + .with_context(|| format!("failed to write new hashtable bucket {}", new_bucket.0))?; + } + + for meta_page_ix in 0..ht_file::num_meta_byte_pages(new_num_pages) as usize { + tmp_fd + .write_all_at( + new_meta_map.page_slice(meta_page_ix), + meta_page_ix as u64 * PAGE_SIZE as u64, + ) + .with_context(|| format!("failed to write new meta-map page {}", meta_page_ix))?; + } + + tmp_fd + .sync_all() + .with_context(|| format!("failed to sync {}", tmp_path.display()))?; + sync_dir(path)?; + Ok(()) +} + +fn read_marker(path: &Path) -> anyhow::Result> { + let marker_path = path.join(MARKER_FILE); + let mut fd = match File::open(&marker_path) { + Ok(fd) => fd, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(e) => { + return Err(e).with_context(|| format!("failed to open {}", marker_path.display())) + } + }; + + let mut bytes = Vec::new(); + fd.read_to_end(&mut bytes) + .with_context(|| format!("failed to read {}", marker_path.display()))?; + Ok(Some(RehashMarker::decode(&bytes)?)) +} + +fn write_marker(path: &Path, marker: RehashMarker) -> anyhow::Result<()> { + let marker_path = path.join(MARKER_FILE); + let mut fd = OpenOptions::new() + .write(true) + .create_new(true) + .open(&marker_path) + .with_context(|| format!("failed to create {}", marker_path.display()))?; + fd.write_all(&marker.encode()) + .with_context(|| format!("failed to write {}", marker_path.display()))?; + fd.sync_all() + .with_context(|| format!("failed to sync {}", marker_path.display()))?; + sync_dir(path)?; + Ok(()) +} + +fn remove_file_if_exists(path: &Path) -> std::io::Result<()> { + match std::fs::remove_file(path) { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } +} + +fn sync_dir(path: &Path) -> std::io::Result<()> { + File::open(path)?.sync_all() +} diff --git a/nomt/src/lib.rs 
b/nomt/src/lib.rs index e19a66e172..e2ccfb1396 100644 --- a/nomt/src/lib.rs +++ b/nomt/src/lib.rs @@ -951,6 +951,22 @@ pub fn check_iou_permissions() -> IoUringPermission { crate::io::check_iou_permissions() } +/// Grow the Bitbox hash table of an offline NOMT database. +/// +/// This function takes the database directory lock, recovers any pending Bitbox write-ahead log, +/// rebuilds the hash table with the bucket count configured by [`Options::hashtable_buckets`], and +/// updates the on-disk metadata. The database must not be open in the current process while this +/// runs. +/// +/// Only the [`Options`] path, hashtable bucket count, and preallocation setting are used. +pub fn grow_hashtable(options: &Options) -> anyhow::Result<()> { + store::grow_hashtable( + &options.path, + options.bitbox_num_pages, + options.preallocate_ht, + ) +} + #[cfg(test)] mod tests { use crate::hasher::Blake3Hasher; diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs index 345aec3322..2c2be3e78b 100644 --- a/nomt/src/store/mod.rs +++ b/nomt/src/store/mod.rs @@ -27,7 +27,7 @@ pub use self::page_loader::{PageLoad, PageLoader}; pub use bitbox::{BucketIndex, HashTableUtilization, SharedMaybeBucketIndex}; mod flock; -mod meta; +pub(crate) mod meta; mod page_loader; mod sync; @@ -66,6 +66,7 @@ impl Store { options.read(true); db_dir_fd = options.open(&o.path)?; flock = flock::Flock::lock(&o.path, ".lock")?; + bitbox::finish_pending_rehash(&o.path, &page_pool)?; } let db_dir_fd = Arc::new(db_dir_fd); @@ -318,6 +319,25 @@ impl Drop for Shared { } } +pub(crate) fn grow_hashtable( + path: &std::path::Path, + hashtable_buckets: u32, + preallocate_ht: bool, +) -> anyhow::Result<()> { + let page_pool = PagePool::new(); + + let db_dir_fd = { + let mut options = OpenOptions::new(); + options.read(true); + options.open(path)? 
+ }; + let _flock = flock::Flock::lock(path, ".lock")?; + + bitbox::grow_hashtable(path, &page_pool, hashtable_buckets, preallocate_ht)?; + db_dir_fd.sync_all()?; + Ok(()) +} + /// An atomic transaction on raw key/value pairs to be applied against the store /// with [`Store::commit`]. pub struct ValueTransaction { diff --git a/nomt/tests/grow_hashtable.rs b/nomt/tests/grow_hashtable.rs new file mode 100644 index 0000000000..bed9ffeea8 --- /dev/null +++ b/nomt/tests/grow_hashtable.rs @@ -0,0 +1,83 @@ +use std::path::Path; + +use nomt::{ + grow_hashtable, hasher::Blake3Hasher, trie::KeyPath, KeyReadWrite, Nomt, Options, Root, + SessionParams, WitnessMode, +}; + +fn options(path: &Path, buckets: u32, rollback: bool) -> Options { + let mut options = Options::new(); + options.path(path); + options.bitbox_seed([0; 16]); + options.hashtable_buckets(buckets); + options.io_workers(1); + options.rollback(rollback); + options.preallocate_ht(false); + options +} + +fn key(i: u64) -> KeyPath { + let mut input = [0u8; 32]; + input[24..].copy_from_slice(&i.to_be_bytes()); + *blake3::hash(&input).as_bytes() +} + +fn value(i: u64) -> Vec { + let mut value = Vec::with_capacity(16); + value.extend_from_slice(&i.to_le_bytes()); + value.extend_from_slice(&(i * 7).to_le_bytes()); + value +} + +fn commit_range(nomt: &Nomt, range: std::ops::Range) -> Root { + let session = + nomt.begin_session(SessionParams::default().witness_mode(WitnessMode::read_write())); + let mut operations = range + .map(|i| (key(i), KeyReadWrite::Write(Some(value(i))))) + .collect::>(); + operations.sort_by_key(|(key, _)| *key); + for (key, _) in &operations { + session.warm_up(*key); + } + + let finished = session.finish(operations).unwrap(); + let root = finished.root(); + finished.commit(nomt).unwrap(); + root +} + +#[test] +fn grow_hashtable_preserves_data_and_rollback() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path(); + + let nomt = Nomt::::open(options(path, 4096, true)).unwrap(); 
+ let root_1 = commit_range(&nomt, 0..40); + let root_2 = commit_range(&nomt, 40..80); + assert_eq!(nomt.root(), root_2); + drop(nomt); + + grow_hashtable(&options(path, 8192, true)).unwrap(); + + let nomt = Nomt::::open(options(path, 4096, true)).unwrap(); + assert_eq!(nomt.root(), root_2); + assert_eq!(nomt.hash_table_utilization().capacity, 8192); + assert!(nomt.hash_table_utilization().occupied > 0); + + for i in 0..80 { + assert_eq!(nomt.read(key(i)).unwrap(), Some(value(i))); + } + + nomt.rollback(1).unwrap(); + assert_eq!(nomt.root(), root_1); + for i in 0..40 { + assert_eq!(nomt.read(key(i)).unwrap(), Some(value(i))); + } + for i in 40..80 { + assert_eq!(nomt.read(key(i)).unwrap(), None); + } + + let root_3 = commit_range(&nomt, 100..110); + assert_eq!(nomt.root(), root_3); + assert_eq!(nomt.hash_table_utilization().capacity, 8192); +} From c85fb01f526b9eee742bafbdc89c4a58313b249b Mon Sep 17 00:00:00 2001 From: Theodore Bugnet Date: Fri, 5 Jun 2026 13:41:36 +0100 Subject: [PATCH 2/2] Add growth support to fuzzing and the torture framework --- fuzz/Cargo.toml | 7 + fuzz/fuzz_targets/grow_hashtable.rs | 236 ++++++++++++++++++++++++++++ nomt/src/bitbox/meta_map.rs | 6 + nomt/src/bitbox/mod.rs | 2 + nomt/src/bitbox/rehash.rs | 168 ++++++++++++++++++++ nomt/src/bitbox/validate.rs | 153 ++++++++++++++++++ nomt/src/lib.rs | 10 ++ nomt/src/store/mod.rs | 16 ++ nomt/tests/grow_hashtable.rs | 7 +- torture/src/agent.rs | 97 +++++++++++- torture/src/message.rs | 33 ++++ torture/src/supervisor/comms.rs | 15 ++ torture/src/supervisor/config.rs | 20 +++ torture/src/supervisor/swarm.rs | 6 + torture/src/supervisor/workload.rs | 143 +++++++++++++++++ 15 files changed, 915 insertions(+), 4 deletions(-) create mode 100644 fuzz/fuzz_targets/grow_hashtable.rs create mode 100644 nomt/src/bitbox/validate.rs diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index cd804fc2a6..805d37341d 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -58,3 +58,10 @@ path = 
"fuzz_targets/reconstruct_key.rs" test = false doc = false bench = false + +[[bin]] +name = "grow_hashtable" +path = "fuzz_targets/grow_hashtable.rs" +test = false +doc = false +bench = false diff --git a/fuzz/fuzz_targets/grow_hashtable.rs b/fuzz/fuzz_targets/grow_hashtable.rs new file mode 100644 index 0000000000..a199e75ec3 --- /dev/null +++ b/fuzz/fuzz_targets/grow_hashtable.rs @@ -0,0 +1,236 @@ +#![no_main] + +use std::{ + collections::{BTreeMap, BTreeSet}, + path::Path, +}; + +use arbitrary::Arbitrary; +use libfuzzer_sys::fuzz_target; +use nomt::{ + grow_hashtable, hasher::Blake3Hasher, trie::KeyPath, validate_hashtable, KeyReadWrite, Nomt, + Options, SessionParams, Value, +}; + +fuzz_target!(|run: Run| { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().join("db"); + let mut buckets = run.initial_buckets; + let mut db = Some(Nomt::::open(options(&path, buckets)).unwrap()); + let mut model = BTreeMap::::new(); + let mut touched = BTreeSet::::new(); + let mut snapshots = vec![model.clone()]; + + for op in run.ops { + match op { + Op::Commit(changes) => { + let Some(nomt) = db.as_ref() else { + unreachable!("database should be open before commit") + }; + if commit(nomt, changes, &mut model, &mut touched) { + snapshots.push(model.clone()); + } + assert_model(nomt, &model, &touched); + } + Op::Grow(extra) => { + let Some(nomt) = db.take() else { + unreachable!("database should be open before grow") + }; + let root = nomt.root(); + drop(nomt); + + let requested = buckets.saturating_add(1).saturating_add(extra as u32 * 512); + grow_hashtable(&options(&path, requested)).unwrap(); + buckets = requested; + + let utilization = validate_hashtable(&options(&path, buckets)).unwrap(); + assert_eq!(utilization.capacity, buckets as usize); + + let nomt = Nomt::::open(options(&path, buckets)).unwrap(); + assert_eq!(nomt.root(), root); + assert_model(&nomt, &model, &touched); + db = Some(nomt); + } + Op::Rollback(raw_n) => { + if snapshots.len() <= 1 { 
+ continue; + } + + let n = raw_n as usize % (snapshots.len() - 1) + 1; + let target = snapshots.len() - 1 - n; + let Some(nomt) = db.as_ref() else { + unreachable!("database should be open before rollback") + }; + nomt.rollback(n).unwrap(); + model = snapshots[target].clone(); + snapshots.push(model.clone()); + assert_model(nomt, &model, &touched); + } + Op::Reopen => { + drop(db.take()); + let nomt = Nomt::::open(options(&path, buckets)).unwrap(); + assert_model(&nomt, &model, &touched); + db = Some(nomt); + } + Op::Validate => { + let Some(nomt) = db.take() else { + unreachable!("database should be open before validate") + }; + let root = nomt.root(); + drop(nomt); + + validate_hashtable(&options(&path, buckets)).unwrap(); + + let nomt = Nomt::::open(options(&path, buckets)).unwrap(); + assert_eq!(nomt.root(), root); + assert_model(&nomt, &model, &touched); + db = Some(nomt); + } + } + } +}); + +fn options(path: &Path, buckets: u32) -> Options { + let mut options = Options::new(); + options.path(path); + options.bitbox_seed([0; 16]); + options.hashtable_buckets(buckets); + options.io_workers(1); + options.rollback(true); + options.max_rollback_log_len(128); + options.preallocate_ht(false); + options +} + +fn commit( + nomt: &Nomt, + changes: Vec, + model: &mut BTreeMap, + touched: &mut BTreeSet, +) -> bool { + let mut dedup = BTreeMap::>::new(); + for change in changes { + dedup.insert(change.key, change.value); + } + if dedup.is_empty() { + return false; + } + + let session = nomt.begin_session(SessionParams::default()); + let operations = dedup + .iter() + .map(|(key, value)| (*key, KeyReadWrite::Write(value.clone()))) + .collect::>(); + for (key, _) in &operations { + session.warm_up(*key); + } + session.finish(operations).unwrap().commit(nomt).unwrap(); + + for (key, value) in dedup { + touched.insert(key); + if let Some(value) = value { + model.insert(key, value); + } else { + model.remove(&key); + } + } + + true +} + +fn assert_model( + nomt: &Nomt, + 
model: &BTreeMap, + touched: &BTreeSet, +) { + for key in touched { + assert_eq!(nomt.read(*key).unwrap().as_ref(), model.get(key)); + } +} + +#[derive(Debug)] +struct Run { + initial_buckets: u32, + ops: Vec, +} + +#[derive(Debug)] +enum Op { + Commit(Vec), + Grow(u8), + Rollback(u8), + Reopen, + Validate, +} + +#[derive(Debug)] +struct Change { + key: KeyPath, + value: Option, +} + +impl<'a> Arbitrary<'a> for Run { + fn arbitrary(input: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { + let initial_buckets = *input.choose(&[4096u32, 8192])?; + let op_count = input.int_in_range(0..=16)?; + let mut ops = Vec::with_capacity(op_count); + for _ in 0..op_count { + ops.push(Op::arbitrary(input)?); + } + + Ok(Self { + initial_buckets, + ops, + }) + } +} + +impl<'a> Arbitrary<'a> for Op { + fn arbitrary(input: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { + Ok(match input.int_in_range(0..=9)? { + 0..=4 => { + let len = input.int_in_range(0..=8)?; + let mut changes = Vec::with_capacity(len); + for _ in 0..len { + changes.push(Change::arbitrary(input)?); + } + Self::Commit(changes) + } + 5..=6 => Self::Grow(input.arbitrary()?), + 7 => Self::Rollback(input.arbitrary()?), + 8 => Self::Reopen, + 9 => Self::Validate, + _ => unreachable!(), + }) + } +} + +impl<'a> Arbitrary<'a> for Change { + fn arbitrary(input: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { + let mut key = [0; 32]; + input.fill_buffer(&mut key)?; + + let value = if input.ratio(1, 4)? { + None + } else { + Some(arbitrary_value(input)?) + }; + + Ok(Self { key, value }) + } +} + +fn arbitrary_value(input: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result { + let len = match input.int_in_range(0..=7)? 
{ + 0 => 0, + 1 => 1, + 2 => input.int_in_range(2..=32)?, + 3 => input.int_in_range(33..=256)?, + 4 => input.int_in_range(257..=1333)?, + 5 => input.int_in_range(1334..=2048)?, + _ => input.int_in_range(2049..=4096)?, + }; + let mut value = vec![0; len]; + input.fill_buffer(&mut value)?; + Ok(value) +} diff --git a/nomt/src/bitbox/meta_map.rs b/nomt/src/bitbox/meta_map.rs index eaebed763b..7f51349c18 100644 --- a/nomt/src/bitbox/meta_map.rs +++ b/nomt/src/bitbox/meta_map.rs @@ -42,6 +42,12 @@ impl MetaMap { self.bitvec[bucket] & FULL_MASK != 0 } + pub(super) fn padding_is_empty(&self) -> bool { + self.bitvec[self.buckets..] + .iter() + .all(|&byte| byte == EMPTY) + } + pub fn set_tombstone(&mut self, bucket: usize) { self.bitvec[bucket] = TOMBSTONE; } diff --git a/nomt/src/bitbox/mod.rs b/nomt/src/bitbox/mod.rs index 342c49a960..d499e3e2d8 100644 --- a/nomt/src/bitbox/mod.rs +++ b/nomt/src/bitbox/mod.rs @@ -28,10 +28,12 @@ pub use wal::WalBlobBuilder; mod ht_file; mod meta_map; mod rehash; +mod validate; mod wal; pub(crate) mod writeout; pub(crate) use rehash::{finish_pending_rehash, grow_hashtable}; +pub(crate) use validate::validate_hashtable; /// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate /// is too high. 
diff --git a/nomt/src/bitbox/rehash.rs b/nomt/src/bitbox/rehash.rs index ae66c65fe1..4fa393850d 100644 --- a/nomt/src/bitbox/rehash.rs +++ b/nomt/src/bitbox/rehash.rs @@ -313,3 +313,171 @@ fn remove_file_if_exists(path: &Path) -> std::io::Result<()> { fn sync_dir(path: &Path) -> std::io::Result<()> { File::open(path)?.sync_all() } + +#[cfg(test)] +mod tests { + use std::{fs::OpenOptions, path::Path}; + + use crate::{ + bitbox::validate_hashtable, hasher::Blake3Hasher, io::PagePool, store::meta::Meta, + trie::KeyPath, KeyReadWrite, Nomt, Options, SessionParams, + }; + + use super::{ + grow_hashtable, rehash_to_tmp, sync_dir, write_marker, RehashMarker, MARKER_FILE, + TMP_HT_FILE, + }; + + const OLD_BUCKETS: u32 = 4096; + const NEW_BUCKETS: u32 = 8192; + + fn options(path: &Path, buckets: u32) -> Options { + let mut options = Options::new(); + options.path(path); + options.bitbox_seed([0; 16]); + options.hashtable_buckets(buckets); + options.io_workers(1); + options.preallocate_ht(false); + options + } + + fn key(i: u64) -> KeyPath { + let mut input = [0u8; 32]; + input[24..].copy_from_slice(&i.to_be_bytes()); + *blake3::hash(&input).as_bytes() + } + + fn value(i: u64) -> Vec { + let mut value = Vec::with_capacity(16); + value.extend_from_slice(&i.to_le_bytes()); + value.extend_from_slice(&(i * 11).to_le_bytes()); + value + } + + fn commit_range(nomt: &Nomt, range: std::ops::Range) { + let session = nomt.begin_session(SessionParams::default()); + let mut operations = range + .map(|i| (key(i), KeyReadWrite::Write(Some(value(i))))) + .collect::>(); + operations.sort_by_key(|(key, _)| *key); + for (key, _) in &operations { + session.warm_up(*key); + } + + session.finish(operations).unwrap().commit(nomt).unwrap(); + } + + fn create_db(path: &Path) { + let nomt = Nomt::::open(options(path, OLD_BUCKETS)).unwrap(); + commit_range(&nomt, 0..40); + drop(nomt); + } + + fn read_meta(path: &Path, page_pool: &PagePool) -> Meta { + let meta_fd = OpenOptions::new() + .read(true) + 
.write(true) + .open(path.join("meta")) + .unwrap(); + Meta::read(page_pool, &meta_fd).unwrap() + } + + fn write_meta(path: &Path, page_pool: &PagePool, meta: &Meta) { + let meta_fd = OpenOptions::new() + .read(true) + .write(true) + .open(path.join("meta")) + .unwrap(); + Meta::write(page_pool, &meta_fd, meta).unwrap(); + } + + fn prepare_tmp_and_marker(path: &Path, page_pool: &PagePool) { + let meta = read_meta(path, page_pool); + rehash_to_tmp(path, page_pool, &meta, NEW_BUCKETS, false).unwrap(); + write_marker( + path, + RehashMarker { + old_num_pages: OLD_BUCKETS, + new_num_pages: NEW_BUCKETS, + }, + ) + .unwrap(); + } + + fn assert_open_db(path: &Path, expected_buckets: usize) { + let nomt = Nomt::::open(options(path, OLD_BUCKETS)).unwrap(); + assert_eq!(nomt.hash_table_utilization().capacity, expected_buckets); + for i in 0..40 { + assert_eq!(nomt.read(key(i)).unwrap(), Some(value(i))); + } + } + + #[test] + fn stale_tmp_without_marker_is_replaced_by_next_grow() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path(); + create_db(path); + + let page_pool = PagePool::new(); + let meta = read_meta(path, &page_pool); + rehash_to_tmp(path, &page_pool, &meta, NEW_BUCKETS, false).unwrap(); + assert!(path.join(TMP_HT_FILE).exists()); + + grow_hashtable(path, &page_pool, NEW_BUCKETS, false).unwrap(); + assert!(!path.join(TMP_HT_FILE).exists()); + assert!(!path.join(MARKER_FILE).exists()); + validate_hashtable(path, &page_pool).unwrap(); + assert_open_db(path, NEW_BUCKETS as usize); + } + + #[test] + fn pending_rehash_before_rename_keeps_old_table() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path(); + create_db(path); + + let page_pool = PagePool::new(); + prepare_tmp_and_marker(path, &page_pool); + + assert_open_db(path, OLD_BUCKETS as usize); + assert!(!path.join(TMP_HT_FILE).exists()); + assert!(!path.join(MARKER_FILE).exists()); + validate_hashtable(path, &page_pool).unwrap(); + } + + #[test] + fn 
pending_rehash_after_rename_updates_meta() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path(); + create_db(path); + + let page_pool = PagePool::new(); + prepare_tmp_and_marker(path, &page_pool); + std::fs::rename(path.join(TMP_HT_FILE), path.join("ht")).unwrap(); + sync_dir(path).unwrap(); + + assert_open_db(path, NEW_BUCKETS as usize); + assert!(!path.join(MARKER_FILE).exists()); + validate_hashtable(path, &page_pool).unwrap(); + } + + #[test] + fn pending_rehash_after_meta_update_removes_marker() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path(); + create_db(path); + + let page_pool = PagePool::new(); + prepare_tmp_and_marker(path, &page_pool); + std::fs::rename(path.join(TMP_HT_FILE), path.join("ht")).unwrap(); + sync_dir(path).unwrap(); + + let mut meta = read_meta(path, &page_pool); + meta.bitbox_num_pages = NEW_BUCKETS; + write_meta(path, &page_pool, &meta); + + assert_open_db(path, NEW_BUCKETS as usize); + assert!(!path.join(MARKER_FILE).exists()); + validate_hashtable(path, &page_pool).unwrap(); + } +} diff --git a/nomt/src/bitbox/validate.rs b/nomt/src/bitbox/validate.rs new file mode 100644 index 0000000000..cc00b66cbe --- /dev/null +++ b/nomt/src/bitbox/validate.rs @@ -0,0 +1,153 @@ +//! Validation utilities for the on-disk Bitbox hash table. + +use std::{collections::HashSet, fs::OpenOptions, path::Path}; + +use anyhow::Context as _; + +use crate::{ + io::{self, PagePool, PAGE_SIZE}, + store::meta::Meta, +}; + +use super::{ + finish_pending_rehash, hash_raw_page_id, ht_file, meta_map::MetaMap, recover, + HashTableUtilization, +}; + +const MAX_VALIDATION_PROBES: usize = 100_000; + +/// Validate the on-disk hash table layout. 
+pub(crate) fn validate_hashtable(
+    path: &Path,
+    page_pool: &PagePool,
+) -> anyhow::Result<HashTableUtilization> {
+    finish_pending_rehash(path, page_pool)?; // settle any pending rehash before reading meta/ht
+
+    let meta_path = path.join("meta");
+    let meta_fd = OpenOptions::new()
+        .read(true)
+        .write(true)
+        .open(&meta_path)
+        .with_context(|| format!("failed to open {}", meta_path.display()))?;
+    let meta = Meta::read(page_pool, &meta_fd)
+        .with_context(|| format!("failed to read {}", meta_path.display()))?;
+    meta.validate()?;
+
+    let ht_path = path.join("ht");
+    let ht_fd = OpenOptions::new()
+        .read(true)
+        .write(true)
+        .open(&ht_path)
+        .with_context(|| format!("failed to open {}", ht_path.display()))?;
+    let wal_path = path.join("wal");
+    let wal_fd = OpenOptions::new()
+        .read(true)
+        .write(true)
+        .open(&wal_path)
+        .with_context(|| format!("failed to open {}", wal_path.display()))?;
+
+    let (offsets, mut meta_map) = ht_file::open(meta.bitbox_num_pages, page_pool, &ht_fd)?;
+    if meta_map.len() == 0 {
+        anyhow::bail!("hash table has zero buckets"); // also guards the `% len` in the probe check
+    }
+
+    if wal_fd.metadata()?.len() > 0 { // non-empty WAL: run recovery before scanning buckets
+        recover(
+            meta.sync_seqn,
+            &ht_fd,
+            &wal_fd,
+            page_pool,
+            &offsets,
+            &mut meta_map,
+            meta.bitbox_seed,
+        )?;
+    }
+
+    if !meta_map.padding_is_empty() {
+        anyhow::bail!("hash table meta-map has non-empty padding past logical buckets");
+    }
+
+    let mut labels = HashSet::new(); // every label seen so far; its size is the occupied count
+    for bucket in 0..meta_map.len() {
+        if !meta_map.is_full(bucket) {
+            continue;
+        }
+
+        let pn = offsets.data_page_index(bucket as u64);
+        let page = io::read_page(page_pool, &ht_fd, pn)
+            .with_context(|| format!("failed to read hash table bucket {}", bucket))?;
+        let page_label: [u8; 32] = page[PAGE_SIZE - 32..].try_into().unwrap(); // final 32 bytes (assumes a PAGE_SIZE-byte page)
+        if !labels.insert(page_label) { // `insert` returns false when the label was already seen
+            anyhow::bail!(
+                "duplicate page label {} in hash table bucket {}",
+                hex_label(page_label),
+                bucket
+            );
+        }
+
+        let hash = hash_raw_page_id(page_label, &meta.bitbox_seed);
+        if meta_map.hint_not_match(bucket, hash) {
+            anyhow::bail!(
+                "hash table bucket {} has page label {} but mismatched meta hint",
+                bucket,
+                hex_label(page_label)
+            );
+        }
+
+        ensure_bucket_reachable(&meta_map, page_label, bucket, &meta.bitbox_seed)?;
+    }
+
+    Ok(HashTableUtilization {
+        capacity: meta_map.len(),
+        occupied: labels.len(),
+    })
+}
+
+fn ensure_bucket_reachable(
+    meta_map: &MetaMap,
+    page_label: [u8; 32],
+    expected_bucket: usize,
+    seed: &[u8; 16],
+) -> anyhow::Result<()> {
+    let hash = hash_raw_page_id(page_label, seed);
+    let mut bucket = hash % meta_map.len() as u64; // caller has already rejected a zero-length map
+    let mut step = 0u64; // added offsets grow 0, 1, 2, ... on successive probes
+    let max_probes = meta_map.len().saturating_mul(2).max(MAX_VALIDATION_PROBES); // the larger of 2x buckets and the constant
+
+    for _ in 0..max_probes {
+        bucket += step;
+        step += 1;
+        bucket %= meta_map.len() as u64;
+
+        let bucket = bucket as usize;
+        if bucket == expected_bucket {
+            return Ok(());
+        }
+
+        if meta_map.hint_empty(bucket) { // an empty slot before the target means the page is unreachable
+            anyhow::bail!(
+                "hash table bucket {} for page label {} is unreachable; probe hit empty bucket {} first",
+                expected_bucket,
+                hex_label(page_label),
+                bucket,
+            );
+        }
+    }
+
+    anyhow::bail!(
+        "hash table bucket {} for page label {} was not reached within {} probes",
+        expected_bucket,
+        hex_label(page_label),
+        max_probes,
+    );
+}
+
+fn hex_label(label: [u8; 32]) -> String {
+    const HEX: &[u8; 16] = b"0123456789abcdef"; // lowercase digit table indexed by nibble
+    let mut out = String::with_capacity(64); // two hex characters per input byte
+    for byte in label {
+        out.push(HEX[(byte >> 4) as usize] as char);
+        out.push(HEX[(byte & 0x0f) as usize] as char);
+    }
+    out
+}
diff --git a/nomt/src/lib.rs b/nomt/src/lib.rs
index e2ccfb1396..bab4adad00 100644
--- a/nomt/src/lib.rs
+++ b/nomt/src/lib.rs
@@ -967,6 +967,16 @@ pub fn grow_hashtable(options: &Options) -> anyhow::Result<()> {
     )
 }
 
+/// Validate the Bitbox hash table layout of an offline NOMT database.
+///
+/// This function takes the database directory lock, completes any pending Bitbox WAL or rehash
+/// recovery, and then scans the on-disk hash table for layout inconsistencies. The database must
+/// not be open in the current process while this runs.
+#[doc(hidden)]
+pub fn validate_hashtable(options: &Options) -> anyhow::Result<HashTableUtilization> {
+    store::validate_hashtable(&options.path) // only `path` is read from the options here
+}
+
 #[cfg(test)]
 mod tests {
     use crate::hasher::Blake3Hasher;
diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs
index 2c2be3e78b..7cb81a6886 100644
--- a/nomt/src/store/mod.rs
+++ b/nomt/src/store/mod.rs
@@ -338,6 +338,22 @@ pub(crate) fn grow_hashtable(
     Ok(())
 }
 
+/// Validate the Bitbox hash table of an offline database while holding the directory lock.
+pub(crate) fn validate_hashtable(path: &std::path::Path) -> anyhow::Result<HashTableUtilization> {
+    let page_pool = PagePool::new();
+
+    let db_dir_fd = {
+        let mut options = OpenOptions::new();
+        options.read(true);
+        options.open(path)?
+    };
+    let _flock = flock::Flock::lock(path, ".lock")?; // named binding keeps the lock until return
+
+    let utilization = bitbox::validate_hashtable(path, &page_pool)?;
+    db_dir_fd.sync_all()?; // NOTE(review): presumably persists directory changes made by recovery - confirm
+    Ok(utilization)
+}
+
 /// An atomic transaction on raw key/value pairs to be applied against the store
 /// with [`Store::commit`].
 pub struct ValueTransaction {
diff --git a/nomt/tests/grow_hashtable.rs b/nomt/tests/grow_hashtable.rs
index bed9ffeea8..9490586017 100644
--- a/nomt/tests/grow_hashtable.rs
+++ b/nomt/tests/grow_hashtable.rs
@@ -1,8 +1,8 @@
 use std::path::Path;
 
 use nomt::{
-    grow_hashtable, hasher::Blake3Hasher, trie::KeyPath, KeyReadWrite, Nomt, Options, Root,
-    SessionParams, WitnessMode,
+    grow_hashtable, hasher::Blake3Hasher, trie::KeyPath, validate_hashtable, KeyReadWrite, Nomt,
+    Options, Root, SessionParams, WitnessMode,
 };
 
 fn options(path: &Path, buckets: u32, rollback: bool) -> Options {
@@ -58,6 +58,9 @@ fn grow_hashtable_preserves_data_and_rollback() {
     drop(nomt);
 
     grow_hashtable(&options(path, 8192, true)).unwrap();
+    let utilization = validate_hashtable(&options(path, 8192, true)).unwrap(); // validate after the grow, before reopening
+    assert_eq!(utilization.capacity, 8192);
+    assert!(utilization.occupied > 0);
 
     let nomt = Nomt::<Blake3Hasher>::open(options(path, 4096, true)).unwrap();
     assert_eq!(nomt.root(), root_2);
diff --git a/torture/src/agent.rs
b/torture/src/agent.rs index f20c03682c..2220d5bffd 100644 --- a/torture/src/agent.rs +++ b/torture/src/agent.rs @@ -22,8 +22,9 @@ use tracing::trace; use crate::message::Key; use crate::{ message::{ - self, CommitPayload, Envelope, InitOutcome, KeyValueChange, OpenOutcome, OpenPayload, - Outcome, RollbackPayload, ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE, + self, CommitPayload, Envelope, GrowHashtablePayload, InitOutcome, KeyValueChange, + OpenOutcome, OpenPayload, Outcome, RollbackPayload, ToAgent, ToSupervisor, + MAX_ENVELOPE_SIZE, }, panic::panic_to_err, }; @@ -126,6 +127,47 @@ pub async fn run(input: UnixStream) -> Result<()> { }) .await?; } + ToAgent::GrowHashtable(GrowHashtablePayload { + hashtable_buckets, + preallocate_ht, + should_crash: Some(crash_delay), + }) => { + stream + .send(Envelope { + reqno, + message: ToSupervisor::Ack, + }) + .await?; + + let workdir = workdir.clone(); + let task = async move { + let start = std::time::Instant::now(); + let _ = agent + .grow_hashtable(&workdir, hashtable_buckets, preallocate_ht) + .await; + tracing::info!("hash table grow took {:?}", start.elapsed()); + }; + + crash_task(task, crash_delay, "hash table grow").await; + unreachable!(); + } + ToAgent::GrowHashtable(GrowHashtablePayload { + hashtable_buckets, + preallocate_ht, + should_crash: None, + }) => { + let start = std::time::Instant::now(); + let outcome = agent + .grow_hashtable(&workdir, hashtable_buckets, preallocate_ht) + .await; + tracing::info!("hash table grow took {}ms", start.elapsed().as_millis()); + stream + .send(Envelope { + reqno, + message: ToSupervisor::GrowHashtableResponse { outcome }, + }) + .await?; + } ToAgent::Query(key) => { let value = agent.query(key)?; stream @@ -144,6 +186,15 @@ pub async fn run(input: UnixStream) -> Result<()> { }) .await?; } + ToAgent::QueryHashTableUtilization => { + let utilization = agent.hash_table_utilization()?; + stream + .send(Envelope { + reqno, + message: 
ToSupervisor::HashTableUtilizationResponse(utilization), + }) + .await?; + } ToAgent::GracefulShutdown => { stream .send(Envelope { @@ -405,6 +456,38 @@ impl Agent { rollback_outcome } + /// Grow the Bitbox hash table offline. + async fn grow_hashtable( + &mut self, + workdir: &Path, + hashtable_buckets: u32, + preallocate_ht: bool, + ) -> Outcome { + self.session.take(); + self.nomt.take(); + + let mut options = nomt::Options::new(); + options.path(workdir.join("nomt_db")); + options.hashtable_buckets(hashtable_buckets); + options.preallocate_ht(preallocate_ht); + + let grow_result = block_in_place( + || { + nomt::grow_hashtable(&options)?; + nomt::validate_hashtable(&options)?; + Ok(()) + }, + "Panic growing hash table", + ); + let grow_outcome = classify_result(grow_result); + + if !matches!(grow_outcome, Outcome::Success) { + trace!("unsuccessful hash table grow: {:?}", grow_outcome); + } + + grow_outcome + } + fn query(&mut self, key: message::Key) -> Result> { // UNWRAP: `nomt` is always `Some` except recreation. let nomt = self.nomt.as_ref().unwrap(); @@ -417,6 +500,16 @@ impl Agent { let nomt = self.nomt.as_ref().unwrap(); nomt.sync_seqn() } + + fn hash_table_utilization(&mut self) -> Result { + // UNWRAP: `nomt` is always `Some` except recreation. + let nomt = self.nomt.as_ref().unwrap(); + let utilization = nomt.hash_table_utilization(); + Ok(message::HashTableUtilization { + capacity: utilization.capacity, + occupied: utilization.occupied, + }) + } } /// Runs the provided blocking function on the current thread without diff --git a/torture/src/message.rs b/torture/src/message.rs index 76a3498de6..01fd06a132 100644 --- a/torture/src/message.rs +++ b/torture/src/message.rs @@ -108,6 +108,27 @@ pub struct RollbackPayload { pub should_crash: Option, } +/// The parameters for the [`ToAgent::GrowHashtable`] message. +#[derive(Debug, Serialize, Deserialize)] +pub struct GrowHashtablePayload { + /// The requested number of buckets after growth. 
+ pub hashtable_buckets: u32, + /// Whether to preallocate the replacement hash table file. + pub preallocate_ht: bool, + /// If Some the supervisor expects growth to crash, + /// the crash should happen after the specified amount of time. + pub should_crash: Option, +} + +/// The hash table utilization reported by an agent. +#[derive(Debug, Serialize, Deserialize)] +pub struct HashTableUtilization { + /// The maximum number of buckets in the hash table. + pub capacity: usize, + /// The number of occupied buckets in the hash table. + pub occupied: usize, +} + /// The maximum size of an envelope, in the serialized form. pub const MAX_ENVELOPE_SIZE: usize = 128 * 1024 * 1024; @@ -138,11 +159,16 @@ pub enum ToAgent { /// The supervisor sends this message to the child process to indicate that the child should /// perform a rollback. Rollback(RollbackPayload), + /// The supervisor sends this message to the child process to grow the Bitbox hash table + /// offline. The agent drops its open database handle before running this operation. + GrowHashtable(GrowHashtablePayload), /// The supervisor sends this message to the child process to query the value of a given key. Query(Key), /// The supervisor sends this message to the child process to query the current sequence number /// of the database. QuerySyncSeqn, + /// The supervisor sends this message to the child process to query hash table utilization. + QueryHashTableUtilization, /// The supervisor sends this message to the child process to indicate that the child should /// do a clean shutdown. GracefulShutdown, @@ -202,8 +228,15 @@ pub enum ToSupervisor { /// The outcome of the rollback. outcome: Outcome, }, + /// The response to a completed hash table growth request. + GrowHashtableResponse { + /// The outcome of the growth. + outcome: Outcome, + }, /// The response to a query for a key-value pair. QueryValue(Option), /// The response to a query for the current sequence number of the database. 
SyncSeqn(u32), + /// The response to a query for hash table utilization. + HashTableUtilizationResponse(HashTableUtilization), } diff --git a/torture/src/supervisor/comms.rs b/torture/src/supervisor/comms.rs index e805d14f27..a4ec44c783 100644 --- a/torture/src/supervisor/comms.rs +++ b/torture/src/supervisor/comms.rs @@ -106,6 +106,21 @@ impl RequestResponse { resp => bail!("unexpected response: {:?}", resp), } } + + /// Requests the current hash table utilization from the agent. + pub async fn send_query_hash_table_utilization( + &self, + ) -> anyhow::Result { + match self + .send_request(crate::message::ToAgent::QueryHashTableUtilization) + .await? + { + crate::message::ToSupervisor::HashTableUtilizationResponse(utilization) => { + Ok(utilization) + } + resp => bail!("unexpected response: {:?}", resp), + } + } } /// A task that handles inbound messages and dispatches them to the corresponding request listener. diff --git a/torture/src/supervisor/config.rs b/torture/src/supervisor/config.rs index 8f9aeb5bc2..b23982bd67 100644 --- a/torture/src/supervisor/config.rs +++ b/torture/src/supervisor/config.rs @@ -100,6 +100,12 @@ pub struct WorkloadConfiguration { pub max_rollback_commits: usize, /// When executing a rollback this is the probability of causing it to crash. pub rollback_crash: f64, + /// When executing a workload iteration, this is the probability of growing the hash table. + pub grow_hashtable: f64, + /// The maximum hash table bucket count that growth can request. + pub max_hashtable_buckets: u32, + /// When growing the hash table this is the probability of causing it to crash. + pub grow_hashtable_crash: f64, /// Whether trickfs will be used or not. /// /// If false, enospc_on/off and latency_on/off will all be 0. 
@@ -150,6 +156,9 @@ impl WorkloadConfiguration { rollback: 0.0, commit_crash: 0.0, rollback_crash: 0.0, + grow_hashtable: 0.0, + max_hashtable_buckets: 0, + grow_hashtable_crash: 0.0, trickfs, enospc_on: 0.0, enospc_off: 0.0, @@ -189,6 +198,7 @@ impl WorkloadConfiguration { let hashtable_size = (avail_bytes as f64 * hashtable_ratio) as usize; let hashtable_buckets = (hashtable_size / 4096) as u32; config.hashtable_buckets = hashtable_buckets; + config.max_hashtable_buckets = hashtable_buckets; // Do not use the entire space left by the hashtable // for the beatree and rollbacks, instead, leave some room for estimation error. @@ -319,6 +329,16 @@ impl WorkloadConfiguration { self.max_rollback_commits = rng.random_range(2..100); } SwarmFeatures::RollbackCrash => self.rollback_crash = rng.random_range(0.01..1.00), + SwarmFeatures::GrowHashtable => { + self.grow_hashtable = rng.random_range(0.01..0.20); + self.max_hashtable_buckets = self + .hashtable_buckets + .saturating_mul(2) + .max(self.hashtable_buckets.saturating_add(1)); + } + SwarmFeatures::GrowHashtableCrash => { + self.grow_hashtable_crash = rng.random_range(0.01..1.00) + } SwarmFeatures::CommitCrash => self.commit_crash = rng.random_range(0.01..1.00), SwarmFeatures::PrepopulatePageCache => self.prepopulate_page_cache = true, SwarmFeatures::NewKeys => self.new_key = rng.random_range(0.01..=1.00), diff --git a/torture/src/supervisor/swarm.rs b/torture/src/supervisor/swarm.rs index 324302474b..b508c79fc8 100644 --- a/torture/src/supervisor/swarm.rs +++ b/torture/src/supervisor/swarm.rs @@ -26,6 +26,10 @@ pub enum SwarmFeatures { Rollback, /// Whether rollback crash should be exercised. RollbackCrash, + /// Whether the Bitbox hash table should be grown offline during the workload. + GrowHashtable, + /// Whether offline hash table growth should be crashed. + GrowHashtableCrash, /// Whether commit crash should be exercised. CommitCrash, /// Whether to prepopulate the upper levels of the page cache on startup. 
@@ -49,6 +53,8 @@ pub fn new_features_set(rng: &mut rand_pcg::Pcg64) -> Vec<SwarmFeatures> {
         SwarmFeatures::Read,
         SwarmFeatures::Rollback,
         SwarmFeatures::RollbackCrash,
+        SwarmFeatures::GrowHashtable,
+        SwarmFeatures::GrowHashtableCrash,
         SwarmFeatures::CommitCrash,
         SwarmFeatures::PrepopulatePageCache,
         SwarmFeatures::NewKeys,
diff --git a/torture/src/supervisor/workload.rs b/torture/src/supervisor/workload.rs
index 2fc23edbe9..d3e75e5f0d 100644
--- a/torture/src/supervisor/workload.rs
+++ b/torture/src/supervisor/workload.rs
@@ -353,6 +353,15 @@ impl Workload {
             self.schedule_rollback(should_crash).await?
         }
 
+        if self.scheduled_rollback.is_none()
+            && self.config.hashtable_buckets < self.config.max_hashtable_buckets
+            && self.rng.random_bool(self.config.grow_hashtable)
+        {
+            let should_crash = self.rng.random_bool(self.config.grow_hashtable_crash);
+            self.exercise_grow_hashtable(should_crash).await?;
+            return Ok(()); // a grow iteration replaces the commit for this round
+        }
+
         let should_crash = self.rng.random_bool(self.config.commit_crash);
         self.exercise_commit(should_crash).await?;
 
@@ -620,6 +629,140 @@ impl Workload {
         Ok(())
     }
 
+    async fn exercise_grow_hashtable(&mut self, should_crash: bool) -> anyhow::Result<()> {
+        let Some(new_hashtable_buckets) = self.next_hashtable_bucket_count() else {
+            return Ok(()); // no room left to grow: nothing to exercise
+        };
+        let old_hashtable_buckets = self.config.hashtable_buckets;
+        let should_crash = if should_crash {
+            trace!(
+                "exercising hash table grow crash from {} to {} buckets",
+                old_hashtable_buckets,
+                new_hashtable_buckets
+            );
+            Some(Duration::from_millis(10)) // crash delay forwarded to the agent in the payload
+        } else {
+            trace!(
+                "exercising hash table grow from {} to {} buckets",
+                old_hashtable_buckets,
+                new_hashtable_buckets
+            );
+            None
+        };
+
+        let grow_outcome = self
+            .rr()
+            .send_request(crate::message::ToAgent::GrowHashtable(
+                crate::message::GrowHashtablePayload {
+                    hashtable_buckets: new_hashtable_buckets,
+                    // The torture workload already sizes the initial table aggressively relative
+                    // to its resource assignment. Keep grow sparse here so this exercises rehash
+                    // correctness instead of mostly testing duplicate preallocation headroom.
+                    preallocate_ht: false,
+                    should_crash,
+                },
+            ))
+            .await?;
+
+        if should_crash.is_some() {
+            let ToSupervisor::Ack = grow_outcome else { // crash path: only an Ack is expected back
+                return Err(anyhow::anyhow!(
+                    "Hash table grow crash did not execute successfully"
+                ));
+            };
+
+            self.wait_for_crash().await?;
+            self.spawn_new_agent().await?;
+        } else {
+            let ToSupervisor::GrowHashtableResponse { outcome } = grow_outcome else {
+                return Err(anyhow::anyhow!(
+                    "Hash table grow did not execute successfully"
+                ));
+            };
+
+            self.ensure_grow_outcome_validity(&outcome).await?;
+            self.ensure_agent_open_db().await?;
+        }
+
+        self.reconcile_hashtable_growth(old_hashtable_buckets, new_hashtable_buckets)
+            .await?;
+        self.ensure_snapshot_validity().await?;
+        Ok(())
+    }
+
+    async fn ensure_grow_outcome_validity(
+        &mut self,
+        outcome: &crate::message::Outcome,
+    ) -> Result<()> {
+        match outcome {
+            crate::message::Outcome::Success => {
+                if self.enabled_enospc {
+                    return Err(anyhow::anyhow!(
+                        "Hash table grow should have failed with ENOSPC"
+                    ));
+                }
+            }
+            crate::message::Outcome::StorageFull => {
+                if !self.enabled_enospc {
+                    return Err(anyhow::anyhow!("Hash table grow should have succeeded"));
+                }
+
+                self.enabled_enospc = false; // expected failure observed: switch ENOSPC injection off
+                self.trick_handle
+                    .as_ref()
+                    .unwrap()
+                    .set_trigger_enospc(false);
+            }
+            crate::message::Outcome::UnknownFailure(err) => {
+                return Err(anyhow::anyhow!(
+                    "Hash table grow failed due to unknown reasons: {}",
+                    err
+                ));
+            }
+        }
+        Ok(())
+    }
+
+    async fn reconcile_hashtable_growth(
+        &mut self,
+        old_hashtable_buckets: u32,
+        new_hashtable_buckets: u32,
+    ) -> anyhow::Result<()> {
+        let utilization = self.rr().send_query_hash_table_utilization().await?;
+        if utilization.capacity == new_hashtable_buckets as usize { // the grow took effect
+            self.config.hashtable_buckets = new_hashtable_buckets;
+        } else if utilization.capacity == old_hashtable_buckets as usize { // the table kept its old size
+            self.config.hashtable_buckets = old_hashtable_buckets;
+        } else {
+            return Err(anyhow::anyhow!(
+                "Unexpected hash table capacity after grow: old={}, requested={}, found={}",
+                old_hashtable_buckets,
+                new_hashtable_buckets,
+                utilization.capacity,
+            ));
+        }
+
+        trace!(
+            "hash table utilization after grow: {}/{}",
+            utilization.occupied,
+            utilization.capacity
+        );
+        Ok(())
+    }
+
+    fn next_hashtable_bucket_count(&mut self) -> Option<u32> {
+        let current = self.config.hashtable_buckets;
+        let max = self.config.max_hashtable_buckets;
+        if current == 0 || current >= max {
+            return None;
+        }
+
+        let remaining = max - current; // cannot underflow: current < max was checked above
+        let max_delta = (current / 8).max(1).min(remaining); // at most current/8 (min 1), never past max
+        let delta = self.rng.random_range(1..=max_delta);
+        Some(current + delta)
+    }
+
     async fn wait_for_crash(&mut self) -> anyhow::Result<()> {
         // Wait until the agent dies. We give it a grace period of 5 seconds.
         //