From e81e91b32dbdd64d2426a3bf7a61af45cbe6d871 Mon Sep 17 00:00:00 2001 From: Robert M1 <50460704+githubrobbi@users.noreply.github.com> Date: Fri, 26 Jun 2026 10:07:13 -0700 Subject: [PATCH] fix(usn): coalesce a backlog of apply ticks into one body rebuild MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each USN apply does a full O(n) index rebuild (children CSR + trigram + ext index, ~600ms on a ~4M-record drive). When apply ticks fire faster than that drains — e.g. a small UFFS_USN_APPLY_INTERVAL_MS on a huge, busy volume — the applier channel backs up and the pipeline latency creeps to seconds, so a freshly renamed/deleted file can read stale until the backlog clears. (Surfaced by the verify harness pinning a 500ms apply interval, below the rebuild cost, on a 3.76M-record C:.) The applier now coalesces a run of consecutive same-letter Apply messages into a single rebuild (coalesce_apply_run): it greedily drains queued same-letter applies via try_recv, appends their changes in FIFO order (create -> delete -> reuse sequences still apply correctly), and stops at the first different-letter Apply / Save / Wrap, carrying it forward intact (Save keeps its cursor + persistence semantics, never merged). This bounds the apply rate to the rebuild rate regardless of the interval — the system self-throttles instead of piling up rebuilds. This is the cheap robustness guard; the real fix for the per-apply O(n) cost is incremental (base + delta + tombstone) index maintenance, tracked separately. scripts/windows/usn-verify.rs: raise POLL_SETTLE 3s -> 6s and the apply pin 500 -> 1500ms (above the rebuild cost) so the test is deterministic on a busy multi-million-record drive. Co-Authored-By: Claude Opus 4.8 --- crates/uffs-daemon/src/cache/journal_sink.rs | 61 +++++++++- .../src/cache/journal_sink/tests.rs | 112 ++++++++++++++++++ scripts/windows/usn-verify.rs | 34 +++--- 3 files changed, 191 insertions(+), 16 deletions(-) diff --git a/crates/uffs-daemon/src/cache/journal_sink.rs b/crates/uffs-daemon/src/cache/journal_sink.rs index 86a078781..c3163399d 100644 --- a/crates/uffs-daemon/src/cache/journal_sink.rs +++ b/crates/uffs-daemon/src/cache/journal_sink.rs @@ -370,7 +370,18 @@ async fn applier_task( idx_weak: Weak, cursor_store: Arc, ) { - while let Some(msg) = rx.recv().await { + // A message pulled off the channel during apply-coalescing that did not + // belong to the merged run (different letter, or a Save/Wrap); processed + // on the next iteration before blocking on `recv`. + let mut carried: Option = None; + loop { + let msg = match carried.take() { + Some(held) => held, + None => match rx.recv().await { + Some(received) => received, + None => break, + }, + }; let Some(idx_strong) = idx_weak.upgrade() else { tracing::debug!( target: "shard.journal", @@ -378,7 +389,25 @@ async fn applier_task( ); return; }; - dispatch_msg(&idx_strong, cursor_store.as_ref(), msg).await; + // Coalesce a run of consecutive same-letter `Apply` messages into a + // single body patch. When apply ticks fire faster than the O(n) + // rebuild drains them (e.g. a tiny `UFFS_USN_APPLY_INTERVAL_MS` on a + // huge, busy drive), the channel backs up with many small applies; + // merging collapses N rebuilds into one and bounds the apply rate to + // the rebuild rate. No change is dropped — FIFO order is preserved by + // appending in receive order. Save / Wrap are never merged (they + // carry their own cursor / reload semantics) and fall through. + let to_dispatch = if let ApplyMsg::Apply { letter, changes } = msg { + let (merged, leftover) = coalesce_apply_run(letter, changes, &mut rx); + carried = leftover; + ApplyMsg::Apply { + letter, + changes: merged, + } + } else { + msg + }; + dispatch_msg(&idx_strong, cursor_store.as_ref(), to_dispatch).await; } tracing::debug!( target: "shard.journal", @@ -386,6 +415,34 @@ async fn applier_task( ); } +/// Greedily merge the run of consecutive same-`letter` [`ApplyMsg::Apply`] +/// messages currently queued behind a first apply into one change vector, +/// collapsing a backlog of apply ticks into a single body rebuild. +/// +/// Drains via non-blocking [`mpsc::UnboundedReceiver::try_recv`], appending +/// each same-letter batch in receive order (FIFO preserved, so create → +/// delete → create-into-reused-FRS sequences still apply correctly). Stops +/// at the first message that is **not** a same-letter `Apply` — a different +/// letter, or a `Save` / `Wrap` — and returns it as the "carried" leftover +/// for the caller to process next, so no message is reordered past a Save or +/// dropped. +fn coalesce_apply_run( + letter: uffs_mft::platform::DriveLetter, + mut changes: Vec, + rx: &mut mpsc::UnboundedReceiver, +) -> (Vec, Option) { + loop { + match rx.try_recv() { + Ok(ApplyMsg::Apply { + letter: next_letter, + changes: more, + }) if next_letter == letter => changes.extend(more), + Ok(other) => return (changes, Some(other)), + Err(_) => return (changes, None), + } + } +} + /// Dispatch a single drained [`ApplyMsg`] to the appropriate /// [`IndexManager`] entry point. /// diff --git a/crates/uffs-daemon/src/cache/journal_sink/tests.rs b/crates/uffs-daemon/src/cache/journal_sink/tests.rs index cfdd3409e..e3c9e27fe 100644 --- a/crates/uffs-daemon/src/cache/journal_sink/tests.rs +++ b/crates/uffs-daemon/src/cache/journal_sink/tests.rs @@ -566,3 +566,115 @@ async fn no_warm_shard_save_does_not_persist_cursor() { cursor_store.store_log(), ); } + +/// Pin: `coalesce_apply_run` merges a run of consecutive same-letter +/// `Apply` messages into one change vector, in FIFO order, draining the +/// channel — collapsing an apply backlog into a single rebuild. +#[tokio::test] +async fn coalesce_merges_consecutive_same_letter_applies() { + let (tx, mut rx) = mpsc::unbounded_channel::(); + // The "first" apply (10) is passed by value; (11), (12) are queued. + tx.send(ApplyMsg::Apply { + letter: uffs_mft::platform::DriveLetter::C, + changes: vec![make_change(11)], + }) + .unwrap(); + tx.send(ApplyMsg::Apply { + letter: uffs_mft::platform::DriveLetter::C, + changes: vec![make_change(12)], + }) + .unwrap(); + + let (merged, leftover) = coalesce_apply_run( + uffs_mft::platform::DriveLetter::C, + vec![make_change(10)], + &mut rx, + ); + + assert_eq!( + merged + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [10, 11, 12], + "merged changes must preserve FIFO order across the coalesced run", + ); + assert!( + leftover.is_none(), + "channel fully drained → no carried message" + ); +} + +/// Pin: coalescing stops at a different-letter `Apply` and returns it as +/// the carried leftover (never merged into the wrong letter's batch). +#[tokio::test] +async fn coalesce_stops_at_other_letter() { + let (tx, mut rx) = mpsc::unbounded_channel::(); + tx.send(ApplyMsg::Apply { + letter: uffs_mft::platform::DriveLetter::C, + changes: vec![make_change(2)], + }) + .unwrap(); + tx.send(ApplyMsg::Apply { + letter: uffs_mft::platform::DriveLetter::D, + changes: vec![make_change(9)], + }) + .unwrap(); + + let (merged, leftover) = coalesce_apply_run( + uffs_mft::platform::DriveLetter::C, + vec![make_change(1)], + &mut rx, + ); + + assert_eq!( + merged + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [1, 2] + ); + let Some(ApplyMsg::Apply { letter, changes }) = leftover else { + panic!("expected carried Apply for 'D'"); + }; + assert_eq!(letter, uffs_mft::platform::DriveLetter::D); + assert_eq!( + changes + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [9] + ); +} + +/// Pin: a `Save` is never merged into an apply run — it carries cursor / +/// persistence semantics, so coalescing stops and returns it intact. +#[tokio::test] +async fn coalesce_does_not_swallow_save() { + let (tx, mut rx) = mpsc::unbounded_channel::(); + tx.send(ApplyMsg::Save { + letter: uffs_mft::platform::DriveLetter::C, + reason: SaveReason::EventsExceeded, + changes: vec![make_change(5)], + cursor: 42, + }) + .unwrap(); + + let (merged, leftover) = coalesce_apply_run( + uffs_mft::platform::DriveLetter::C, + vec![make_change(1)], + &mut rx, + ); + + assert_eq!( + merged + .iter() + .map(|change| change.frs.raw()) + .collect::>(), + [1] + ); + assert!( + matches!(leftover, Some(ApplyMsg::Save { cursor: 42, .. })), + "Save must be carried out intact, not merged into the apply batch", + ); +} diff --git a/scripts/windows/usn-verify.rs b/scripts/windows/usn-verify.rs index c9c2ee746..406616807 100644 --- a/scripts/windows/usn-verify.rs +++ b/scripts/windows/usn-verify.rs @@ -52,20 +52,26 @@ use std::time::Duration; use anyhow::{Context, Result, bail}; -/// Time to let the per-shard USN loop ingest a batch and patch the live -/// body. With [`APPLY_INTERVAL_MS`] pinned to 500 ms below, the apply -/// tick fires on essentially the first poll that sees the new events, so -/// 3 s is a comfortable margin (the body is searchable well under 1 s -/// after the file op in practice). No 5-minute disk-save wait is needed -/// — the apply tick is decoupled from the rare compact-cache save. -const POLL_SETTLE: Duration = Duration::from_secs(3); -/// Apply-cadence override (ms) for the test daemon — pins -/// `UFFS_USN_APPLY_INTERVAL_MS` low so the near-live body patch fires -/// promptly and deterministically within [`POLL_SETTLE`]. The -/// production default is 30 s (tuned so constant FS churn stays -/// background noise); the harness pins it to 500 ms so the short -/// create / rename / delete rounds don't have to wait that out. -const APPLY_INTERVAL_MS: &str = "500"; +/// Time to let the full USN pipeline settle before searching: journal +/// poll → buffer → apply tick → O(n) index rebuild → body swap. On a +/// large, busy volume (a multi-million-record C:) that end-to-end latency +/// is a couple of seconds, not sub-second — the per-apply rebuild alone is +/// ~600 ms on a ~4M-record drive. 6 s gives a comfortable margin so a +/// search never races an in-flight apply (a 3 s settle sat right on the +/// edge and intermittently read pre-rename/-delete state). No 5-minute +/// disk-save wait is needed — the apply tick is decoupled from the rare +/// compact-cache save. +const POLL_SETTLE: Duration = Duration::from_secs(6); +/// Apply-cadence override (ms) for the test daemon (`UFFS_USN_APPLY_INTERVAL_MS`). +/// +/// The production default is 30 s (tuned so constant FS churn stays +/// background noise); the harness pins it lower so the short create / +/// rename / delete rounds don't wait that out. Kept **above** the +/// per-apply rebuild cost (~600 ms on a multi-million-record drive) so +/// apply ticks don't outrun the rebuild and pile up — the daemon also +/// coalesces a backlog of apply ticks into one rebuild, but giving the +/// interval real headroom keeps the test deterministic regardless. +const APPLY_INTERVAL_MS: &str = "1500"; /// Settle time after `--daemon stop` so the socket / PID file clear. const KILL_SETTLE: Duration = Duration::from_secs(2); /// Tracing directive: per-change USN trace + daemon-side debug (backfill,