Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use spacetimedb_fs_utils::compression::Zstd;
pub use spacetimedb_fs_utils::compression::{CompressOnce, CompressionStats};

use crate::{
commit::Commit,
commit::{self, Commit},
error,
index::{IndexFile, IndexFileMut},
segment::{self, FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer},
Expand Down Expand Up @@ -375,16 +375,24 @@ pub fn resume_segment_writer<R: Repo>(
if reader.sealed() {
Ok(ResumedSegment::Sealed(meta))
} else {
let Metadata {
header: _,
tx_range,
size_in_bytes,
max_epoch,
max_commit_offset: _,
max_commit: _,
} = meta;
let mut writer = repo.open_segment_writer(offset)?;
// Ensure that the segment's size is exactly what we determined.
//
// When `Metadata` encounters EOF, it could be that there actually are
// trailing bytes in the segment, but less than the commit header
// length. This is difficult to detect due to the use of `read_exact`,
// so ensure we remove any trailing bytes.
//
// To be extra cautious, check that fewer than the commit header length
// bytes are left over before truncating the segment. This is an assert
// because a violation would be a bug in `Metadata::extract`.
assert!(
writer.segment_len()? < meta.size_in_bytes + commit::Header::LEN as u64,
"{repo}: trailing bytes exceed commit header length in segment {offset}"
);
writer.ftruncate(meta.tx_range.end, meta.size_in_bytes)?;
// Ensure we have enough space for this segment.
//
// The segment could have been created without the `fallocate` feature
// enabled, so we call this here again to ensure writes can't fail due
// to ENOSPC.
Expand All @@ -394,15 +402,15 @@ pub fn resume_segment_writer<R: Repo>(

Ok(ResumedSegment::Resumed(Writer {
commit: Commit {
min_tx_offset: tx_range.end,
min_tx_offset: meta.tx_range.end,
n: 0,
records: Vec::new(),
epoch: max_epoch,
epoch: meta.max_epoch,
},
inner: io::BufWriter::new(writer),

min_tx_offset: tx_range.start,
bytes_written: size_in_bytes,
min_tx_offset: meta.tx_range.start,
bytes_written: meta.size_in_bytes,

offset_index_head: create_offset_index_writer(repo, offset, opts),
}))
Expand Down
63 changes: 61 additions & 2 deletions crates/commitlog/tests/random_payload/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::io::Write;

use log::info;
use spacetimedb_commitlog::repo::Repo;
use spacetimedb_commitlog::repo::{Repo, SegmentLen};
use spacetimedb_commitlog::tests::helpers::enable_logging;
use spacetimedb_commitlog::{commitlog, payload, repo, Commitlog, Options};
use spacetimedb_commitlog::{commit, commitlog, payload, repo, Commitlog, Options};
use spacetimedb_paths::server::CommitLogDir;
use spacetimedb_paths::FromPathUnchecked;
use tempfile::tempdir;
Expand Down Expand Up @@ -196,3 +198,60 @@ fn resume_empty_segment() {
}
}
}

/// Regression test for https://github.com/clockworklabs/SpacetimeDB/pull/5116
///
/// A segment may end in a few stray bytes that are too short to form a commit
/// header. Resuming such a segment must drop those bytes, and the log must
/// still accept new commits and read back in full afterwards.
#[test]
fn resume_small_trailing_garbage() {
    // One byte short of a full commit header.
    const GARBAGE_LEN: usize = commit::Header::LEN - 1;

    enable_logging();

    let root = tempdir().unwrap();
    let repo = repo::Fs::new(CommitLogDir::from_path_unchecked(root.path()), None).unwrap();

    // Populate the log with an initial batch of commits.
    {
        let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap();
        for (tx_offset, payload) in (0u64..).zip(compressible_payloads().take(1024)) {
            clog.commit([(tx_offset, payload)]).unwrap();
            clog.flush().unwrap();
            clog.sync();
        }
    }

    // Write the garbage to the newest segment and remember the resulting length.
    let len_with_garbage = {
        let offsets = repo.existing_offsets().unwrap();
        let newest = *offsets.last().unwrap();
        let mut segment = repo.open_segment_writer(newest).unwrap();
        segment.write_all(&[67u8; GARBAGE_LEN]).unwrap();
        segment.flush().unwrap();
        segment.sync_all().unwrap();
        segment.segment_len().unwrap()
    };

    {
        let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap();

        // Reopening the log must have cut the garbage off again.
        let offsets = repo.existing_offsets().unwrap();
        let newest = *offsets.last().unwrap();
        let mut segment = repo.open_segment_writer(newest).unwrap();
        assert_eq!(
            segment.segment_len().unwrap(),
            len_with_garbage - GARBAGE_LEN as u64
        );

        // The log must keep accepting commits where the first batch left off.
        for (tx_offset, payload) in (1024u64..).zip(compressible_payloads().take(1024)) {
            clog.commit([(tx_offset, payload)]).unwrap();
            clog.flush().unwrap();
            clog.sync();
        }

        // Both batches must be readable from the start.
        assert_eq!(2048, clog.commits_from(0).map(Result::unwrap).count());
    }
}
Loading