feat: prototype allocator-level OOM circuit breaker (OomGuard) #4582

Draft
andygrove wants to merge 15 commits into apache:main from andygrove:oom-guard-circuit-breaker

@andygrove
Member

Which issue does this PR close?

Relates to #4576. This is an exploratory prototype of the RSS circuit breaker ("OomGuard") half of that issue, not a complete implementation, so it does not close it.

Rationale for this change

Comet's memory accounting relies on voluntary MemoryPool reservations, which miss allocations made by Arrow buffers, join scratch space, and expression kernels. When real native memory exceeds the container limit, the OS/YARN/Kubernetes kills the entire executor JVM, losing every task and all cached data on it.

This prototype adds an executor-global, allocator-level circuit breaker. It tracks the real bytes the global allocator hands out and, when an armed, over-budget condition is detected on a query-worker thread, fails that single task with a retriable ResourcesExhausted error instead of letting the executor get OOM-killed. The approach adapts the AccountingAllocator from apache/datafusion#22626 (the byte-tracking allocator wrapper) rather than depending on it, since that code lives in DataFusion's test-only sqllogictest crate.

What changes are included in this PR?

Gated behind a new oom-guard cargo feature; the default build is unchanged with zero added per-allocation overhead.

  • native/core/src/execution/memory_pools/oom_guard.rs (new): AccountingAllocator<A> wrapping the inner global allocator; a single process-wide balance with per-thread drift settled at a 64 KiB threshold; arm/disarm/stamp_current_thread/current_balance; a typed OomGuardPanic, raised via panic_any on an armed, stamped thread that crosses the limit, with reentrancy protection so the panic's own boxing allocation does not recurse.
  • native/core/src/lib.rs: under the oom-guard feature, installs the wrapper as #[global_allocator] over jemalloc / mimalloc / system; mutually exclusive cfgs leave the default build untouched.
  • native/core/src/execution/jni_api.rs: stamps tokio worker threads (on_thread_start) and the JNI caller thread; arms the guard from config in createPlan; maps OomGuardPanic to DataFusionError::ResourcesExhausted at both execution boundaries (the spawned/channel path, both producer and consumer, and the busy-poll block_on path).
  • spark/src/main/scala/org/apache/comet/CometConf.scala: registers spark.comet.exec.memoryGuard.enabled (default false) and spark.comet.exec.memoryGuard.size (optional; defaults to the executor off-heap size).
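
A minimal sketch of the byte-tracking idea described for oom_guard.rs: a wrapper allocator that keeps a per-thread drift counter and folds it into one process-wide balance once the drift reaches 64 KiB. This is not the PR's code. The names BALANCE, DRIFT, SETTLE_THRESHOLD, and the body of track are assumptions for illustration, and the wrapper is called directly here rather than installed with #[global_allocator].

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;
use std::sync::atomic::{AtomicIsize, Ordering};

/// Per-thread drift is folded into the shared balance at this size (64 KiB).
const SETTLE_THRESHOLD: isize = 64 * 1024;

/// Process-wide balance of settled bytes (hypothetical name).
static BALANCE: AtomicIsize = AtomicIsize::new(0);

thread_local! {
    /// Bytes this thread has allocated or freed since its last settle.
    static DRIFT: Cell<isize> = const { Cell::new(0) };
}

/// Record a signed byte delta. The shared atomic is touched only when the
/// thread-local drift grows past the threshold, keeping the hot path cheap.
fn track(delta: isize) {
    // try_with avoids a panic if thread-local storage is already torn down.
    let _ = DRIFT.try_with(|d| {
        let drift = d.get() + delta;
        if drift.abs() >= SETTLE_THRESHOLD {
            BALANCE.fetch_add(drift, Ordering::Relaxed);
            d.set(0);
        } else {
            d.set(drift);
        }
    });
}

/// Settled balance only; unsettled per-thread drift is not included.
fn current_balance() -> isize {
    BALANCE.load(Ordering::Relaxed)
}

/// Wrapper that forwards to an inner allocator and counts layout bytes.
pub struct AccountingAllocator<A>(pub A);

unsafe impl<A: GlobalAlloc> GlobalAlloc for AccountingAllocator<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { self.0.alloc(layout) };
        if !ptr.is_null() {
            track(layout.size() as isize);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { self.0.dealloc(ptr, layout) };
        track(-(layout.size() as isize));
    }
}

fn main() {
    let a = AccountingAllocator(System);
    let layout = Layout::from_size_align(1 << 20, 8).unwrap();
    let before = current_balance();
    // A 1 MiB allocation exceeds the threshold, so it settles immediately.
    let p = unsafe { a.alloc(layout) };
    assert!(!p.is_null());
    println!("settled delta: {}", current_balance() - before);
    unsafe { a.dealloc(p, layout) };
}
```

Because small allocations stay in the thread-local counter, the shared balance can lag the true total by up to the threshold per thread; that is the trade-off for avoiding an atomic update on every allocation.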

Known limitations / out of scope for this prototype (candidates for follow-ups):

  • Executor-global granularity only; no per-task attribution or fairness.
  • Only tokio workers and the JNI caller thread are stamped, so allocations on spawn_blocking/IO/other pools are tracked but cannot themselves trip the breaker.
  • Layout-byte accounting only; no periodic resync to real jemalloc/mimalloc resident stats.
  • Does not feed the live balance back into DataFusion's MemoryPool (the "online accounting" half of #4576).
  • Process-global, last-writer-wins limit, armed per createPlan.

Note: commits carry [skip ci] intentionally while this is an early prototype.

How are these changes tested?

  • Rust unit tests in oom_guard.rs cover the decision/settle helpers, and that the breaker trips only on an armed, stamped thread (disarmed never trips, unstamped never trips).
  • An end-to-end Rust test drives a real 64 MiB heap allocation through the installed AccountingAllocator and asserts an OomGuardPanic is raised and caught.
  • Verified the build and clippy -D warnings across the default, oom-guard, and jemalloc,oom-guard feature combinations.
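
As a rough illustration of the "raised and caught" behaviour the tests assert, the sketch below raises a typed payload with panic_any and converts it to an error at an execution boundary with catch_unwind. Only the OomGuardPanic name comes from the PR. Its fields, the reserve and run_guarded helpers, and the TaskError enum (a stand-in for DataFusionError::ResourcesExhausted) are hypothetical.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Typed panic payload; the fields are assumptions for this sketch.
#[derive(Debug)]
struct OomGuardPanic {
    balance: usize,
    limit: usize,
}

/// Stand-in for DataFusionError::ResourcesExhausted.
#[derive(Debug, PartialEq)]
enum TaskError {
    ResourcesExhausted(String),
}

/// Hypothetical allocation site: trips the guard when over the limit.
fn reserve(balance: usize, limit: usize) -> usize {
    if balance > limit {
        panic::panic_any(OomGuardPanic { balance, limit });
    }
    balance
}

/// Run a task and map a guard panic to a retriable error.
/// Any other panic payload is re-raised unchanged.
fn run_guarded<T>(task: impl FnOnce() -> T) -> Result<T, TaskError> {
    match panic::catch_unwind(AssertUnwindSafe(task)) {
        Ok(value) => Ok(value),
        Err(payload) => match payload.downcast::<OomGuardPanic>() {
            Ok(oom) => Err(TaskError::ResourcesExhausted(format!(
                "native memory {} bytes exceeds guard limit {} bytes",
                oom.balance, oom.limit
            ))),
            Err(other) => panic::resume_unwind(other),
        },
    }
}

fn main() {
    println!("{:?}", run_guarded(|| reserve(10, 100)));
    println!("{:?}", run_guarded(|| reserve(200, 100)));
}
```

Downcasting on the payload type keeps unrelated panics distinguishable from guard trips, so only the latter become retriable task failures.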

// exceed isize::MAX on any real platform, so no wrapping or overflow occurs.
let old = layout.size() as isize;
let new = new_size as isize;
track(new - old);

Just fixed this bug in DF. You need to panic before the realloc, otherwise the caller still has the old pointer and tries to free it on unwind and segfaults.

…ard [skip ci]

Account for and enforce the size delta before delegating to the inner
realloc. Panicking after inner.realloc is unsound: realloc may have freed
or moved the old block, leaving the caller to free a dangling old pointer
on unwind and segfault. Enforce while the old pointer is still valid.
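
The ordering can be sketched as follows, assuming a plain helper function around std::alloc::realloc rather than the PR's allocator wrapper. BALANCE, LIMIT, track_and_enforce, and guarded_realloc are hypothetical names, and rolling the balance back when the guard trips is an assumption of this sketch, not something the commit message states.

```rust
use std::alloc::{self, Layout};
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicIsize, Ordering};

/// Tracked bytes and a hypothetical 1 MiB limit.
static BALANCE: AtomicIsize = AtomicIsize::new(0);
const LIMIT: isize = 1 << 20;

#[derive(Debug)]
struct OomGuardPanic;

/// Apply the delta and panic if the new balance is over the limit.
fn track_and_enforce(delta: isize) {
    let new_balance = BALANCE.fetch_add(delta, Ordering::Relaxed) + delta;
    if new_balance > LIMIT {
        // Assumption: undo the delta, since the realloc will not happen.
        BALANCE.fetch_sub(delta, Ordering::Relaxed);
        panic::panic_any(OomGuardPanic);
    }
}

/// Enforce first, then delegate. If enforcement unwinds, `ptr` has not been
/// passed to realloc yet, so the caller can still use and free it.
unsafe fn guarded_realloc(ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
    let delta = new_size as isize - layout.size() as isize;
    track_and_enforce(delta);
    let new_ptr = unsafe { alloc::realloc(ptr, layout, new_size) };
    if new_ptr.is_null() {
        // The inner realloc failed, so the old block is unchanged.
        BALANCE.fetch_sub(delta, Ordering::Relaxed);
    }
    new_ptr
}

fn main() {
    let layout = Layout::from_size_align(64, 8).unwrap();
    let ptr = unsafe { alloc::alloc(layout) };
    assert!(!ptr.is_null());
    unsafe { ptr.write(42) };

    // Growing to 2 MiB exceeds the limit, so the guard panics before realloc.
    let result = panic::catch_unwind(AssertUnwindSafe(|| unsafe {
        guarded_realloc(ptr, layout, 2 << 20)
    }));
    assert!(result.is_err());

    // The old block is still valid: read it and free it with its old layout.
    assert_eq!(unsafe { ptr.read() }, 42);
    unsafe { alloc::dealloc(ptr, layout) };
}
```
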

Gate panic_any behind a compare_exchange on ARMED so at most one thread
fires the guard panic per arm cycle. The relaxed ARMED load on the hot
path is not a serialization point: several threads can read ARMED=true in
the same window and each dispatch a panic, which Rust's unwind ABI can
turn into a process abort ("failed to initiate panic", exit 133). The
guard re-arms on the next createPlan.
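
A small sketch of the one-shot gate described above, assuming ARMED is an AtomicBool. The helper names arm, try_claim_trip, and count_winners are hypothetical, and the memory orderings are one plausible choice rather than the PR's. Many threads may pass the relaxed load, but only the thread whose compare_exchange flips ARMED from true to false would go on to call panic_any.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

static ARMED: AtomicBool = AtomicBool::new(false);

/// Re-arm the guard (in the PR this happens on createPlan).
fn arm() {
    ARMED.store(true, Ordering::Release);
}

/// Returns true for at most one caller per arm cycle.
fn try_claim_trip() -> bool {
    // Cheap hot-path check; several threads can observe `true` here.
    if !ARMED.load(Ordering::Relaxed) {
        return false;
    }
    // Serialization point: exactly one thread wins the true -> false swap.
    ARMED
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
}

/// Arm once, race `threads` callers, and count how many claimed the trip.
fn count_winners(threads: usize) -> usize {
    arm();
    let handles: Vec<_> = (0..threads)
        .map(|_| thread::spawn(try_claim_trip))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|won| *won)
        .count()
}

fn main() {
    println!("winners: {}", count_winners(8));
}
```

The guard stays disarmed after the winning swap, so later over-budget threads fall through without panicking until the next arm call.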