Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions crates/uffs-daemon/src/cache/journal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,22 +370,79 @@ async fn applier_task(
idx_weak: Weak<IndexManager>,
cursor_store: Arc<dyn CursorStore>,
) {
while let Some(msg) = rx.recv().await {
// A message pulled off the channel during apply-coalescing that did not
// belong to the merged run (different letter, or a Save/Wrap); processed
// on the next iteration before blocking on `recv`.
let mut carried: Option<ApplyMsg> = None;
loop {
let msg = match carried.take() {
Some(held) => held,
None => match rx.recv().await {
Some(received) => received,
None => break,
},
};
let Some(idx_strong) = idx_weak.upgrade() else {
tracing::debug!(
target: "shard.journal",
"IndexManager dropped; exiting applier task",
);
return;
};
dispatch_msg(&idx_strong, cursor_store.as_ref(), msg).await;
// Coalesce a run of consecutive same-letter `Apply` messages into a
// single body patch. When apply ticks fire faster than the O(n)
// rebuild drains them (e.g. a tiny `UFFS_USN_APPLY_INTERVAL_MS` on a
// huge, busy drive), the channel backs up with many small applies;
// merging collapses N rebuilds into one and bounds the apply rate to
// the rebuild rate. No change is dropped — FIFO order is preserved by
// appending in receive order. Save / Wrap are never merged (they
// carry their own cursor / reload semantics) and fall through.
let to_dispatch = if let ApplyMsg::Apply { letter, changes } = msg {
let (merged, leftover) = coalesce_apply_run(letter, changes, &mut rx);
carried = leftover;
ApplyMsg::Apply {
letter,
changes: merged,
}
} else {
msg
};
dispatch_msg(&idx_strong, cursor_store.as_ref(), to_dispatch).await;
}
tracing::debug!(
target: "shard.journal",
"Sink channel closed; applier task exiting",
);
}

/// Fold every same-`letter` [`ApplyMsg::Apply`] already waiting in `rx` into
/// `changes`, so a backlog of apply ticks costs one body rebuild instead of
/// one rebuild per tick.
///
/// The queue is polled with the non-blocking
/// [`mpsc::UnboundedReceiver::try_recv`]; this function never awaits. Each
/// matching batch is appended in the order it was received, which keeps the
/// merged vector in FIFO order (so create → delete → create-into-reused-FRS
/// sequences still apply correctly).
///
/// Returns the merged change vector plus the message that ended the run:
///
/// * `Some(msg)` — the first queued message that was **not** an `Apply` for
///   `letter` (another letter's `Apply`, or a `Save` / `Wrap`). It is handed
///   back untouched so the caller can process it next; nothing is reordered
///   past it and nothing is dropped.
/// * `None` — `try_recv` returned an error, i.e. there was nothing further to
///   pull from the channel right now.
fn coalesce_apply_run(
    letter: uffs_mft::platform::DriveLetter,
    mut changes: Vec<FileChange>,
    rx: &mut mpsc::UnboundedReceiver<ApplyMsg>,
) -> (Vec<FileChange>, Option<ApplyMsg>) {
    while let Ok(queued) = rx.try_recv() {
        match queued {
            // Same drive, plain apply: absorb its batch and keep draining.
            ApplyMsg::Apply {
                letter: queued_letter,
                changes: batch,
            } if queued_letter == letter => changes.extend(batch),
            // Anything else terminates the run and is carried out intact.
            stopper => return (changes, Some(stopper)),
        }
    }
    // `try_recv` errored: the run ends with no carried message.
    (changes, None)
}

/// Dispatch a single drained [`ApplyMsg`] to the appropriate
/// [`IndexManager`] entry point.
///
Expand Down
112 changes: 112 additions & 0 deletions crates/uffs-daemon/src/cache/journal_sink/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,115 @@ async fn no_warm_shard_save_does_not_persist_cursor() {
cursor_store.store_log(),
);
}

/// Pin: given a seed batch and two more same-letter `Apply` messages already
/// queued, `coalesce_apply_run` returns a single change vector holding all
/// three batches in FIFO order and reports no carried message once the
/// channel has been drained.
#[tokio::test]
async fn coalesce_merges_consecutive_same_letter_applies() {
    use uffs_mft::platform::DriveLetter;

    let (tx, mut rx) = mpsc::unbounded_channel::<ApplyMsg>();
    // Change 10 is the seed batch handed over by value below; 11 and 12 sit
    // in the channel behind it.
    for queued_frs in [11, 12] {
        tx.send(ApplyMsg::Apply {
            letter: DriveLetter::C,
            changes: vec![make_change(queued_frs)],
        })
        .unwrap();
    }

    let seed = vec![make_change(10)];
    let (merged, leftover) = coalesce_apply_run(DriveLetter::C, seed, &mut rx);

    let merged_frs: Vec<_> = merged.iter().map(|change| change.frs.raw()).collect();
    assert_eq!(
        merged_frs,
        [10, 11, 12],
        "merged changes must preserve FIFO order across the coalesced run",
    );
    assert!(
        leftover.is_none(),
        "channel fully drained → no carried message"
    );
}

/// Pin: an `Apply` for a different drive letter ends the coalesced run. The
/// same-letter batch queued before it is merged; the other-letter message is
/// returned as the carried leftover with its own changes untouched.
#[tokio::test]
async fn coalesce_stops_at_other_letter() {
    use uffs_mft::platform::DriveLetter;

    let (tx, mut rx) = mpsc::unbounded_channel::<ApplyMsg>();
    // Queue order: one more batch for C, then a batch for D.
    tx.send(ApplyMsg::Apply {
        letter: DriveLetter::C,
        changes: vec![make_change(2)],
    })
    .unwrap();
    tx.send(ApplyMsg::Apply {
        letter: DriveLetter::D,
        changes: vec![make_change(9)],
    })
    .unwrap();

    let seed = vec![make_change(1)];
    let (merged, leftover) = coalesce_apply_run(DriveLetter::C, seed, &mut rx);

    let merged_frs: Vec<_> = merged.iter().map(|change| change.frs.raw()).collect();
    assert_eq!(merged_frs, [1, 2]);

    match leftover {
        Some(ApplyMsg::Apply { letter, changes }) => {
            assert_eq!(letter, DriveLetter::D);
            let carried_frs: Vec<_> = changes.iter().map(|change| change.frs.raw()).collect();
            assert_eq!(carried_frs, [9]);
        }
        _ => panic!("expected carried Apply for 'D'"),
    }
}

/// Pin: a queued `Save` for the same letter is not folded into the apply
/// batch. The merged vector contains only the seed change, and the `Save`
/// comes back as the carried leftover with its cursor value intact.
#[tokio::test]
async fn coalesce_does_not_swallow_save() {
    use uffs_mft::platform::DriveLetter;

    let (tx, mut rx) = mpsc::unbounded_channel::<ApplyMsg>();
    let queued_save = ApplyMsg::Save {
        letter: DriveLetter::C,
        reason: SaveReason::EventsExceeded,
        changes: vec![make_change(5)],
        cursor: 42,
    };
    tx.send(queued_save).unwrap();

    let seed = vec![make_change(1)];
    let (merged, leftover) = coalesce_apply_run(DriveLetter::C, seed, &mut rx);

    let merged_frs: Vec<_> = merged.iter().map(|change| change.frs.raw()).collect();
    assert_eq!(merged_frs, [1]);

    let save_carried = matches!(leftover, Some(ApplyMsg::Save { cursor: 42, .. }));
    assert!(
        save_carried,
        "Save must be carried out intact, not merged into the apply batch",
    );
}
34 changes: 20 additions & 14 deletions scripts/windows/usn-verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,26 @@ use std::time::Duration;

use anyhow::{Context, Result, bail};

/// Time to let the per-shard USN loop ingest a batch and patch the live
/// body. With [`APPLY_INTERVAL_MS`] pinned to 500 ms below, the apply
/// tick fires on essentially the first poll that sees the new events, so
/// 3 s is a comfortable margin (the body is searchable well under 1 s
/// after the file op in practice). No 5-minute disk-save wait is needed
/// — the apply tick is decoupled from the rare compact-cache save.
const POLL_SETTLE: Duration = Duration::from_secs(3);
/// Apply-cadence override (ms) for the test daemon — pins
/// `UFFS_USN_APPLY_INTERVAL_MS` low so the near-live body patch fires
/// promptly and deterministically within [`POLL_SETTLE`]. The
/// production default is 30 s (tuned so constant FS churn stays
/// background noise); the harness pins it to 500 ms so the short
/// create / rename / delete rounds don't have to wait that out.
const APPLY_INTERVAL_MS: &str = "500";
/// Time to let the full USN pipeline settle before searching: journal
/// poll → buffer → apply tick → O(n) index rebuild → body swap. On a
/// large, busy volume (a multi-million-record C:) that end-to-end latency
/// is a couple of seconds, not sub-second — the per-apply rebuild alone is
/// ~600 ms on a ~4M-record drive. 6 s gives a comfortable margin so a
/// search never races an in-flight apply (a 3 s settle sat right on the
/// edge and intermittently read pre-rename/-delete state). No 5-minute
/// disk-save wait is needed — the apply tick is decoupled from the rare
/// compact-cache save.
const POLL_SETTLE: Duration = Duration::from_secs(6);
/// Apply-cadence override (ms) for the test daemon (`UFFS_USN_APPLY_INTERVAL_MS`).
///
/// The production default is 30 s (tuned so constant FS churn stays
/// background noise); the harness pins it lower so the short create /
/// rename / delete rounds don't wait that out. Kept **above** the
/// per-apply rebuild cost (~600 ms on a multi-million-record drive) so
/// apply ticks don't outrun the rebuild and pile up — the daemon also
/// coalesces a backlog of apply ticks into one rebuild, but giving the
/// interval real headroom keeps the test deterministic regardless.
const APPLY_INTERVAL_MS: &str = "1500";
/// Settle time after `--daemon stop` so the socket / PID file clear.
const KILL_SETTLE: Duration = Duration::from_secs(2);
/// Tracing directive: per-change USN trace + daemon-side debug (backfill,
Expand Down
Loading