Track "RSS" vs MemoryPool during SLTs to prevent OOMs #22626
Open
avantgardnerio wants to merge 9 commits into
Introduces a minimal `AccountingAllocator<A: GlobalAlloc>` that forwards every alloc/realloc/dealloc unchanged to an inner allocator (default `System`) and updates a thread-local `isize` byte counter on each call. Reading the counter from any thread is allowed, but the value only reflects the reading thread's own view; sum across threads for a global picture.
Gated by the new `memory-accounting` feature on `datafusion-sqllogictest`. With the feature off, the module is not compiled and the binary's `#[global_allocator]` declaration is absent, so default builds are completely unaffected. With the feature on, the slt binary installs the wrapper as its global allocator.
A unit test verifies that the counter moves on a single-thread alloc/free pair. Future commits will add the sampler and assertion plumbing on top of this counter.
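A minimal sketch of what such a forwarding allocator could look like, using only the standard library. It is not the code from this PR: the `LOCAL_BYTES` counter, the `adjust` helper, and the `thread_bytes` reader are hypothetical names chosen for illustration, and the `#[global_allocator]` installation is shown only as a comment.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;

thread_local! {
    // Net bytes allocated by the current thread (allocations minus frees).
    // A const-initialized Cell<isize> needs no lazy init and has no destructor,
    // so touching it from inside an allocator does not allocate.
    static LOCAL_BYTES: Cell<isize> = const { Cell::new(0) };
}

/// Forwards every call to `inner` and adjusts the calling thread's counter.
pub struct AccountingAllocator<A: GlobalAlloc = System> {
    inner: A,
}

impl<A: GlobalAlloc> AccountingAllocator<A> {
    pub const fn new(inner: A) -> Self {
        Self { inner }
    }
}

// Hypothetical helper: apply a signed byte delta to this thread's counter.
fn adjust(delta: isize) {
    // try_with: silently skip the update if the thread-local is unavailable
    // (for example during thread teardown) instead of panicking.
    let _ = LOCAL_BYTES.try_with(|c| c.set(c.get() + delta));
}

/// Hypothetical reader: the calling thread's own net byte count.
pub fn thread_bytes() -> isize {
    LOCAL_BYTES.with(|c| c.get())
}

unsafe impl<A: GlobalAlloc> GlobalAlloc for AccountingAllocator<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { self.inner.alloc(layout) };
        if !ptr.is_null() {
            adjust(layout.size() as isize);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { self.inner.dealloc(ptr, layout) };
        adjust(-(layout.size() as isize));
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) };
        if !new_ptr.is_null() {
            // Only the size difference counts; a failed realloc leaves the old block alive.
            adjust(new_size as isize - layout.size() as isize);
        }
        new_ptr
    }
}

// Installing it (assumed shape, gated on the feature named in the text):
// #[cfg(feature = "memory-accounting")]
// #[global_allocator]
// static GLOBAL: AccountingAllocator = AccountingAllocator::new(System);
```

Calling `alloc` and `dealloc` directly on an `AccountingAllocator::new(System)` instance moves `thread_bytes()` up and back down by the layout size, which is the single-thread alloc/free check the commit describes.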
Extends AccountingAllocator from a per-thread counter into a budget model:
* Allocations debit, deallocations credit.
* Each thread maintains a local balance; when its magnitude crosses
SETTLE_THRESHOLD (64 KB) it atomically transfers into a global bank
and zeros out. Amortizes the atomic over ~thousands of allocs.
* init_bank(budget) preloads the bank with a starting balance and clears
the overdraft flag.
* bank_balance() reads the bank with one relaxed load. Negative = the
program has allocated more than init_bank seeded.
* is_overdrawn() is a sticky flag set by the settling thread at the
moment its fetch_add drops the bank past zero. Edge-triggered, so
callers don't need a separate sampler to detect transitions.
* clear_overdraft() resets the flag without touching the bank.
* settle_thread_local() force-flushes the current thread's local into
the bank — useful when a test wants an exact snapshot.
Hot-path cost is one branch + thread-local update; the atomic settle
fires roughly once per SETTLE_THRESHOLD bytes of net change per thread.
On a contended workload the bank's fetch_add runs ~1000 times/sec total
across all cores — effectively free compared to a per-op atomic that
would cost 5-10% of total runtime on hashagg-heavy code.
Four unit tests cover the new surface: debit/credit, init, overdraft
edge detection, and auto-settle on threshold crossing.
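The budget model above can be sketched without an allocator at all. The following is an illustrative reconstruction, not the PR's code: `SETTLE_THRESHOLD`, `init_bank`, `bank_balance`, `is_overdrawn`, `clear_overdraft`, and `settle_thread_local` are named in the commit message, while `record`, `debit`, `credit`, `settle`, the statics, the `>=` threshold comparison, and the use of `Relaxed` ordering everywhere are assumptions.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};

const SETTLE_THRESHOLD: isize = 64 * 1024;

static BANK: AtomicIsize = AtomicIsize::new(0);
static OVERDRAWN: AtomicBool = AtomicBool::new(false);

thread_local! {
    // Unsettled balance for this thread: negative = net debits, positive = net credits.
    static LOCAL: Cell<isize> = const { Cell::new(0) };
}

/// Preload the bank and clear the overdraft flag.
pub fn init_bank(budget: isize) {
    BANK.store(budget, Ordering::Relaxed);
    OVERDRAWN.store(false, Ordering::Relaxed);
}

pub fn bank_balance() -> isize {
    BANK.load(Ordering::Relaxed)
}

pub fn is_overdrawn() -> bool {
    OVERDRAWN.load(Ordering::Relaxed)
}

/// Reset the sticky flag without touching the bank.
pub fn clear_overdraft() {
    OVERDRAWN.store(false, Ordering::Relaxed);
}

// Transfer `amount` into the bank; set the flag only on the >= 0 to < 0 edge.
fn settle(amount: isize) {
    if amount == 0 {
        return;
    }
    let before = BANK.fetch_add(amount, Ordering::Relaxed);
    if before >= 0 && before + amount < 0 {
        OVERDRAWN.store(true, Ordering::Relaxed);
    }
}

/// Force-flush the current thread's local balance into the bank.
pub fn settle_thread_local() {
    let pending = LOCAL.with(|c| c.replace(0));
    settle(pending);
}

// Hot path: one thread-local update plus one branch; the atomic is amortized.
fn record(delta: isize) {
    let local = LOCAL.with(|c| {
        let v = c.get() + delta;
        c.set(v);
        v
    });
    if local.abs() >= SETTLE_THRESHOLD {
        settle_thread_local();
    }
}

/// Allocation: takes bytes out of the budget.
pub fn debit(bytes: usize) {
    record(-(bytes as isize));
}

/// Deallocation: returns bytes to the budget.
pub fn credit(bytes: usize) {
    record(bytes as isize);
}
```

With a 100 KB budget, a 1 KB debit stays in the thread-local until `settle_thread_local()` is called, while a 64 KB debit reaches the threshold and settles on its own; a second 64 KB debit takes the bank below zero and sets the sticky flag.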
Adds the enforcement layer to the accounting allocator. Until now an overdraft only set a sticky flag; the program kept running. This commit gives the allocator the authority to panic the in-flight query work that caused the overdraft -- but only work that has explicitly opted in.
The opt-in is a `kill_on_overdraft(future).await` scope built on `tokio::task_local!`. While the wrapped future is being polled, the very next debit that observes the bank below zero panics with a typed `OverdraftPanic` payload. Drops on the unwind do not re-panic because the same task-local holds a `Cell<bool>` re-entry guard that flips to `true` the moment we decide to panic.
Crucially, allocations made outside such a scope (tokio reactor, hyper, anything system) are untouched -- `KILL_GUARD.try_with(...)` returns `Err` outside the scope and the alloc fast path skips the kill check entirely. So the same worker thread can run a datafusion query (killable) and a system task (exempt) back-to-back without leaking state, because the marker lives on the future, not the thread.
Why a task_local rather than a thread_local + RAII guard: futures can migrate between worker threads across `.await` boundaries. A thread_local guard would mark one thread and clean up a different one, leaking the mark. The task_local moves with the future and is unwound by tokio's own machinery on completion / panic / cancellation -- no manual cleanup is even possible.
Why the panic check fires only on debits: dealloc runs from Drop during unwinding; a panic in Drop double-faults and aborts the process. Credit operations therefore skip the kill check entirely -- they can't drive the bank negative anyway.
Two new tests cover the enforcement surface: a wrapped future that overdraws and is caught by downcasting the `OverdraftPanic` payload, and an identical workload outside the scope that overdraws without panicking. All six accounting tests now serialize on a shared mutex since they share the global bank.
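The typed-payload and re-entry-guard mechanics can be illustrated with the standard library alone. This sketch is deliberately a synchronous stand-in: it uses a `thread_local!` and a closure where the PR uses `tokio::task_local!` and a future, so it does not show the cross-thread migration behavior discussed above. `check_kill`, `run_killable`, the `balance` field, and the `Option<bool>` encoding of the guard are hypothetical.

```rust
use std::cell::Cell;
use std::panic::{self, AssertUnwindSafe};

/// Typed panic payload so the catcher can tell an overdraft kill from any other panic.
#[derive(Debug)]
pub struct OverdraftPanic {
    pub balance: isize,
}

thread_local! {
    // None        = not inside a killable scope (exempt work)
    // Some(false) = inside a scope, armed
    // Some(true)  = inside a scope, already decided to panic (re-entry guard)
    // Assumption: the PR keeps this state in a tokio task-local instead.
    static KILL_GUARD: Cell<Option<bool>> = const { Cell::new(None) };
}

/// Called on the debit path only, never from a credit (dealloc) path.
pub fn check_kill(balance: isize) {
    if balance >= 0 {
        return;
    }
    let should_panic = KILL_GUARD.with(|g| {
        if g.get() == Some(false) {
            g.set(Some(true)); // flip first so later debits during unwind stay quiet
            true
        } else {
            false
        }
    });
    if should_panic {
        panic::panic_any(OverdraftPanic { balance });
    }
}

/// Synchronous stand-in for the opt-in scope: run `work` with the guard armed
/// and turn an OverdraftPanic into an Err; other panics are propagated.
pub fn run_killable<T>(work: impl FnOnce() -> T) -> Result<T, OverdraftPanic> {
    let prev = KILL_GUARD.with(|g| g.replace(Some(false)));
    let result = panic::catch_unwind(AssertUnwindSafe(work));
    KILL_GUARD.with(|g| g.set(prev)); // restore whatever scope state was there before
    match result {
        Ok(value) => Ok(value),
        Err(payload) => match payload.downcast::<OverdraftPanic>() {
            Ok(overdraft) => Err(*overdraft),
            Err(other) => panic::resume_unwind(other),
        },
    }
}
```

Inside `run_killable`, a `check_kill` call with a negative balance surfaces as `Err(OverdraftPanic { .. })`; the same call outside any scope returns normally, mirroring the killable versus exempt split described in the commit message.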
Stores INITIAL_BUDGET alongside the bank on init_bank() so reporters can express usage as a positive "bytes allocated since the budget was set" number rather than a deeply-negative bank balance. Needed by the upcoming SLT integration commit: when a query is killed by OverdraftPanic, the handler logs "actual N MB / supposed M MB" where "actual" comes from bytes_used() and "supposed" comes from DataFusion's voluntary MemoryPool.reserved(). The delta is the accounting bug.
The per-statement reset in `run_one` was doing `let budget = bank_balance().max(memory_tracker_limit() as isize); init_bank(budget);`, which was clever for the wrong reasons: it tried to pick the larger of the current drifted bank and the configured tracker limit, conflating "reset" with "init" with "configure". The post-panic reset path used `init_bank(memory_tracker_limit() as isize)`, which dropped the +25% killer overhead each time the bank reset, so the first overdraft permanently shrank the budget for every subsequent statement in the file.
Replace both with `reset_bank()`, which restores `BANK` to whatever value was passed to `init_bank()` and clears the overdraft flag. `INITIAL_BUDGET` remembers that value across the whole run, so per-statement resets land on the same number main set during startup.
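A small sketch of how `init_bank`, `reset_bank`, and `bytes_used` could relate to each other, assuming both values live in atomics. The function names and `INITIAL_BUDGET`/`BANK` come from the commit messages; the `debit` helper (which here hits the bank directly, skipping the thread-local batching), the `isize` return type of `bytes_used`, and the orderings are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering::Relaxed};

static BANK: AtomicIsize = AtomicIsize::new(0);
static INITIAL_BUDGET: AtomicIsize = AtomicIsize::new(0);
static OVERDRAWN: AtomicBool = AtomicBool::new(false);

/// Configure: remember the budget and start the bank at that value.
pub fn init_bank(budget: isize) {
    INITIAL_BUDGET.store(budget, Relaxed);
    BANK.store(budget, Relaxed);
    OVERDRAWN.store(false, Relaxed);
}

/// Reset: go back to the remembered budget, whatever the bank drifted to.
pub fn reset_bank() {
    BANK.store(INITIAL_BUDGET.load(Relaxed), Relaxed);
    OVERDRAWN.store(false, Relaxed);
}

/// Positive "bytes allocated since the budget was set".
pub fn bytes_used() -> isize {
    INITIAL_BUDGET.load(Relaxed) - BANK.load(Relaxed)
}

/// Hypothetical direct debit, used here only to move the bank.
pub fn debit(bytes: isize) {
    let before = BANK.fetch_sub(bytes, Relaxed);
    if before >= 0 && before - bytes < 0 {
        OVERDRAWN.store(true, Relaxed);
    }
}
```

With `init_bank(1280)` followed by `debit(2000)`, the bank reads -720 while `bytes_used()` reads 2000, and `reset_bank()` brings the bank back to 1280 rather than to some smaller limit.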
The kill_on_overdraft_panics_inside_scope test holds the SERIAL Mutex guard across `.await` to serialize against the shared global bank. Clippy's await-holding-lock lint flags this since in general holding a sync Mutex across await can deadlock other tokio tasks, but here the test uses `flavor = "current_thread"` so there's no concurrent task to block, and the guard exists precisely to keep other accounting tests from concurrently mutating the bank for the duration of the body. Annotate with #[expect(...)] (with a reason) rather than #[allow(...)] to satisfy clippy::allow_attributes; if the lint stops firing in the future the expect will turn into an error and we'll revisit.
Old shape inverted DataFusion's intended `with_memory_limit(max, fraction)`
contract:
--memory-tracker-mb 1024
--memory-killer-overhead-pct 25
-> DF pool sized to 1024 MB
-> bank set to 1280 MB
That pretended DF gets 100% of some smaller pie and bolted "overhead"
onto the top, instead of starting from the actual total budget. With
the allocator bank tracking ALL process allocations (DF, hyper, tokio,
gRPC, gossip, etc.), the natural framing is:
--total-memory-mb 1024
--datafusion-memory-fraction 0.6
-> bank set to 1024 MB (the whole pie)
-> DF pool sized to 614 MB (its share)
-> remaining 410 MB is non-DF budget
The bank trips when total RSS exceeds the OS-visible limit, regardless
of which consumer caused the drift. The DF pool trips when DF itself
exceeds its voluntary share. Both reads are meaningful and the relation
between them is now explicit instead of fudged.
CLI:
--total-memory-mb (was --memory-tracker-mb)
--datafusion-memory-fraction f64 default 0.6 (replaces --memory-killer-overhead-pct)
Startup log now prints all three numbers so it's obvious what each
knob is doing:
memory-accounting on: total=1024 MB, datafusion=614 MB (fraction 0.60),
non-datafusion budget=409 MB
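The split arithmetic can be worked through in a few lines. This is a sketch under assumptions, not the PR's implementation: `MemoryBudget`, `split_budget`, and `startup_line` are hypothetical names, and computing in bytes with truncation is one plausible way to arrive at the figures in the startup log. With 1024 MB and a fraction of 0.6, the exact shares are 614.4 MB and 409.6 MB; truncating to whole MB gives 614 and 409, while rounding the remainder gives the 410 quoted earlier.

```rust
const MB: usize = 1024 * 1024;

/// Hypothetical container for the three numbers printed at startup.
pub struct MemoryBudget {
    pub total_bytes: usize,
    pub datafusion_bytes: usize,
    pub non_datafusion_bytes: usize,
}

/// Start from the whole pie and carve out DataFusion's share.
pub fn split_budget(total_mb: usize, datafusion_fraction: f64) -> MemoryBudget {
    assert!((0.0..=1.0).contains(&datafusion_fraction));
    let total_bytes = total_mb * MB;
    // Float-to-integer `as` casts truncate toward zero.
    let datafusion_bytes = (total_bytes as f64 * datafusion_fraction) as usize;
    MemoryBudget {
        total_bytes,
        datafusion_bytes,
        non_datafusion_bytes: total_bytes - datafusion_bytes,
    }
}

/// Format the budget in whole MB (integer division truncates).
pub fn startup_line(budget: &MemoryBudget, datafusion_fraction: f64) -> String {
    format!(
        "memory-accounting on: total={} MB, datafusion={} MB (fraction {:.2}), non-datafusion budget={} MB",
        budget.total_bytes / MB,
        budget.datafusion_bytes / MB,
        datafusion_fraction,
        budget.non_datafusion_bytes / MB,
    )
}
```

The same function applied to a 16 GB total (16384 MB) at 0.6 leaves roughly 6.4 GB for non-DataFusion consumers.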
Wires the new feature into the existing full-corpus SLT run inside
the `verify-benchmark-results` job (which despite its name runs every
.slt file with `INCLUDE_TPCH=true`). Budget is intentionally loose:
16 GB total, 60% to DataFusion, ~6.4 GB to non-DF consumers. The
existing test corpus peaks well under 1 GB, so nothing should fail.
What this validates on every PR:
- `--features memory-accounting` still compiles cleanly
- AccountingAllocator is installed as #[global_allocator]
- init_bank, reset_bank, and bytes_used run on every statement
- Each query goes through kill_on_overdraft + catch_unwind without
introducing measurable overhead or panicking incorrectly
- The per-file MemoryPool gets sized via build_runtime_env
If the integration bit-rots in any future PR, CI catches it. Once
we have confidence in the wiring we can tighten the budget in-place
or graduate this into a dedicated job that exercises tight budgets
to surface actual memory-accounting bugs in DF operators.
Which issue does this PR close?
Rationale for this change
Explained in the issue.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Users should see fewer OOMs.
example output: