diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 3d79f7f1e28..af04ad918bb 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -8,7 +8,7 @@ use spacetimedb_fs_utils::compression::Zstd; pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats}; use crate::{ - commit::Commit, + commit::{self, Commit}, error, index::{IndexFile, IndexFileMut}, segment::{self, FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, @@ -375,16 +375,24 @@ pub fn resume_segment_writer( if reader.sealed() { Ok(ResumedSegment::Sealed(meta)) } else { - let Metadata { - header: _, - tx_range, - size_in_bytes, - max_epoch, - max_commit_offset: _, - max_commit: _, - } = meta; let mut writer = repo.open_segment_writer(offset)?; + // Ensure that the segment's size is exactly what we determined. + // + // When `Metadata` encounters EOF, it could be that there actually are + // trailing bytes in the segment, but fewer than the commit header + // length. This is difficult to detect due to the use of `read_exact`, + // so ensure we remove any trailing bytes. + // + // To be extra cautious, check that fewer than the header length + // bytes are left over before truncating the segment. This is an assert + // because a violation would be a bug in `Metadata::extract`. + assert!( + writer.segment_len()? < meta.size_in_bytes + commit::Header::LEN as u64, + "{repo}: trailing bytes exceed commit header length in segment {offset}" + ); + writer.ftruncate(meta.tx_range.end, meta.size_in_bytes)?; // Ensure we have enough space for this segment. + // // The segment could have been created without the `fallocate` feature // enabled, so we call this here again to ensure writes can't fail due // to ENOSPC. 
@@ -394,15 +402,15 @@ pub fn resume_segment_writer( Ok(ResumedSegment::Resumed(Writer { commit: Commit { - min_tx_offset: tx_range.end, + min_tx_offset: meta.tx_range.end, n: 0, records: Vec::new(), - epoch: max_epoch, + epoch: meta.max_epoch, }, inner: io::BufWriter::new(writer), - min_tx_offset: tx_range.start, - bytes_written: size_in_bytes, + min_tx_offset: meta.tx_range.start, + bytes_written: meta.size_in_bytes, offset_index_head: create_offset_index_writer(repo, offset, opts), })) diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 752ab885836..ff149443c13 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,7 +1,9 @@ +use std::io::Write; + use log::info; -use spacetimedb_commitlog::repo::Repo; +use spacetimedb_commitlog::repo::{Repo, SegmentLen}; use spacetimedb_commitlog::tests::helpers::enable_logging; -use spacetimedb_commitlog::{commitlog, payload, repo, Commitlog, Options}; +use spacetimedb_commitlog::{commit, commitlog, payload, repo, Commitlog, Options}; use spacetimedb_paths::server::CommitLogDir; use spacetimedb_paths::FromPathUnchecked; use tempfile::tempdir; @@ -196,3 +198,60 @@ fn resume_empty_segment() { } } } + +/// Tests that resuming a segment that has fewer trailing bytes than a +/// commit header causes those trailing bytes to be removed. +/// +/// Regression test for https://github.com/clockworklabs/SpacetimeDB/pull/5116 +#[test] +fn resume_small_trailing_garbage() { + enable_logging(); + + let root = tempdir().unwrap(); + let path = CommitLogDir::from_path_unchecked(root.path()); + + let repo = repo::Fs::new(path, None).unwrap(); + // Write some data. 
+ { + let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap(); + for (i, payload) in compressible_payloads().take(1024).enumerate() { + clog.commit([(i as u64, payload)]).unwrap(); + clog.flush().unwrap(); + clog.sync(); + } + } + + // Add some extra bytes, one fewer than the commit header length. + let last_segment_size = { + let segments = repo.existing_offsets().unwrap(); + let mut last_segment = repo.open_segment_writer(segments.last().copied().unwrap()).unwrap(); + last_segment.write_all(&[67u8; commit::Header::LEN - 1]).unwrap(); + last_segment.flush().unwrap(); + last_segment.sync_all().unwrap(); + last_segment.segment_len().unwrap() + }; + { + let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap(); + + // The extra bytes should have been truncated away. + let segments = repo.existing_offsets().unwrap(); + let mut last_segment = repo.open_segment_writer(segments.last().copied().unwrap()).unwrap(); + assert_eq!( + last_segment.segment_len().unwrap(), + last_segment_size - (commit::Header::LEN - 1) as u64 + ); + + // Add some more data. + for (i, payload) in compressible_payloads() + .take(1024) + .enumerate() + .map(|(offset, payload)| (offset + 1024, payload)) + { + clog.commit([(i as u64, payload)]).unwrap(); + clog.flush().unwrap(); + clog.sync(); + } + + assert_eq!(2048, clog.commits_from(0).map(Result::unwrap).count()); + } +}