From a42941703c07da2225dfa8d17618cbcc19d771c3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 10 Apr 2026 10:24:09 -0600 Subject: [PATCH 01/11] feat: add fair_unified_task_shared memory pool to fix 2x memory allocation When Comet executes a shuffle, it creates two separate native plans (the child plan and the shuffle writer plan) that run concurrently in a pipelined fashion. Previously, each plan got its own memory pool at the full per-task limit, effectively allowing 2x the intended memory to be consumed. The new `fair_unified_task_shared` pool type shares a single CometFairMemoryPool across all native plans within the same Spark task. This ensures the total memory stays within the per-task limit while dynamically distributing memory among operators based on how many register as memory consumers (e.g. if the child plan is a simple scan+filter, the shuffle writer gets 100% of the pool). This is now the default for off-heap mode. Closes #3921 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/scala/org/apache/comet/CometConf.scala | 8 ++++++-- docs/source/user-guide/latest/tuning.md | 13 ++++++++++--- native/core/src/execution/memory_pools/config.rs | 8 +++++++- native/core/src/execution/memory_pools/mod.rs | 16 ++++++++++++++++ 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..223b6f52eb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -647,10 +647,14 @@ object CometConf extends ShimCometConf { .category(CATEGORY_TUNING) .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + - "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " + + "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " + + "`fair_unified_task_shared`. The `fair_unified_task_shared` pool is shared across " + + "all native plans within the same Spark task, ensuring that the total memory " + + "consumption does not exceed the per-task limit even when multiple native plans " + + "(e.g. a shuffle writer and its child plan) execute concurrently. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified") + .createWithDefault("fair_unified_task_shared") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..54e34d5b83 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -58,13 +58,20 @@ Comet implements multiple memory pool implementations. The type of pool can be s The valid pool types are: -- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified` - `greedy_unified` -The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory +The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the +same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple +native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own +pool at the full per-task limit, effectively doubling the memory that can be consumed. + +The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even -when there is sufficient memory in order to leave enough memory for other operators. +when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool +type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index d30126a99a..4f2c03ecbb 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -21,6 +21,7 @@ use crate::errors::{CometError, CometResult}; pub(crate) enum MemoryPoolType { GreedyUnified, FairUnified, + FairUnifiedTaskShared, Greedy, FairSpill, GreedyTaskShared, @@ -34,7 +35,9 @@ impl MemoryPoolType { pub(crate) fn is_task_shared(&self) -> bool { matches!( self, - MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared + MemoryPoolType::GreedyTaskShared + | MemoryPoolType::FairSpillTaskShared + | MemoryPoolType::FairUnifiedTaskShared ) } } @@ -63,6 +66,9 @@ pub(crate) fn parse_memory_pool_config( let memory_pool_config = if off_heap_mode { match memory_pool_type.as_str() { "fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size), + "fair_unified_task_shared" => { + MemoryPoolConfig::new(MemoryPoolType::FairUnifiedTaskShared, pool_size) + } "greedy_unified" => { // the `unified` memory pool interacts with Spark's memory pool to allocate // memory therefore does not need a size to be explicitly set. The pool size diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index d44290d058..cfb708b58f 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -87,6 +87,22 @@ pub(crate) fn create_memory_pool( }); Arc::clone(memory_pool) } + MemoryPoolType::FairUnifiedTaskShared => { + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = Arc::new(TrackConsumersPool::new( + CometFairMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + memory_pool_config.pool_size, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) + } MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => { let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); let per_task_memory_pool = From e213af6238daeeae1546014e9fd5513e2980a62c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 09:18:01 -0600 Subject: [PATCH 02/11] fix: deduplicate task-shared pools in tracing memory accounting When using fair_unified_task_shared, multiple execution contexts on the same thread share a single Arc. The tracing code was summing pool.reserved() for each registered context, double-counting the shared pool and reporting 2x the actual memory reservation. Deduplicate pools by Arc data pointer before summing so each underlying pool is only counted once. --- native/core/src/execution/jni_api.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 93f75bae96..eee80ff2d9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -141,7 +141,12 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { map.remove(&thread_id); return 0; } - return pools.values().map(|p| p.reserved()).sum::(); + let mut seen = std::collections::HashSet::new(); + return pools + .values() + .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) + .map(|p| p.reserved()) + .sum::(); } 0 } @@ -149,7 +154,16 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { fn total_reserved_for_thread(thread_id: u64) -> usize { let map = get_thread_memory_pools().lock(); map.get(&thread_id) - .map(|pools| pools.values().map(|p| p.reserved()).sum::()) + .map(|pools| { + // Deduplicate pools that share the same underlying allocation + // (e.g. task-shared pools registered by multiple execution contexts) + let mut seen = std::collections::HashSet::new(); + pools + .values() + .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) + .map(|p| p.reserved()) + .sum::() + }) .unwrap_or(0) } From fed742817e8bd031ff3559158b0dcbcd536d85ad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 10:04:54 -0600 Subject: [PATCH 03/11] feat: change default memory pool back to fair_unified Make fair_unified_task_shared opt-in rather than the default to simplify review. Update docs to reflect the new default. --- .../main/scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/latest/tuning.md | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 223b6f52eb..b63a563ec7 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -654,7 +654,7 @@ object CometConf extends ShimCometConf { "(e.g. a shuffle writer and its child plan) execute concurrently. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified_task_shared") + .createWithDefault("fair_unified") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 54e34d5b83..517bfe3bb9 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -58,21 +58,21 @@ Comet implements multiple memory pool implementations. The type of pool can be s The valid pool types are: -- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified` +- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified_task_shared` - `greedy_unified` -The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the -same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple -native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own -pool at the full per-task limit, effectively doubling the memory that can be consumed. - The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit. +The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the +same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple +native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own +pool at the full per-task limit, effectively doubling the memory that can be consumed. + The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. From f8b15e544b36b39da137b887b35f35375443387a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 10:06:44 -0600 Subject: [PATCH 04/11] docs: explain dual native plan architecture in tuning guide Add context about how Comet creates two concurrent native plans per Spark task during shuffle and why this matters for pool selection. --- docs/source/user-guide/latest/tuning.md | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 517bfe3bb9..00fdf0203a 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -56,6 +56,11 @@ For more details about Spark off-heap memory mode, please refer to [Spark docume Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`. +When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan +(e.g. scan, filter, join) and the shuffle writer plan. These two plans run concurrently in a pipelined fashion — +the child plan produces batches that the shuffle writer consumes and repartitions. This means both plans hold +memory reservations at the same time, which is important to understand when choosing a memory pool type. + The valid pool types are: - `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) @@ -66,12 +71,14 @@ The `fair_unified` pool prevents operators from using more than an even fraction (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool -type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit. +type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit when two +plans are running concurrently (e.g. during shuffle). The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the -same Spark task. This ensures that the total memory consumption does not exceed the per-task limit even when multiple -native plans (e.g. a shuffle writer and its child plan) execute concurrently. Without this, each plan gets its own -pool at the full per-task limit, effectively doubling the memory that can be consumed. +same Spark task. Because the child plan and shuffle writer each get their own pool with `fair_unified`, both can +independently allocate up to the full per-task memory limit, effectively allowing 2x the intended memory to be +consumed. The `fair_unified_task_shared` pool avoids this by sharing a single pool instance, ensuring that the +combined memory usage of both plans stays within the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. From 394f0387839f0ced4874dff2b56392ec0b787bf4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 22:04:48 -0600 Subject: [PATCH 05/11] change default back --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/latest/tuning.md | 4 ++-- native/Cargo.lock | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b63a563ec7..223b6f52eb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -654,7 +654,7 @@ object CometConf extends ShimCometConf { "(e.g. a shuffle writer and its child plan) execute concurrently. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified") + .createWithDefault("fair_unified_task_shared") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 00fdf0203a..b836861e7b 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -63,8 +63,8 @@ memory reservations at the same time, which is important to understand when choo The valid pool types are: -- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified_task_shared` +- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) +- `fair_unified` - `greedy_unified` The `fair_unified` pool prevents operators from using more than an even fraction of the available memory diff --git a/native/Cargo.lock b/native/Cargo.lock index 480f7ad06d..b5c7f2b0c7 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2070,9 +2070,7 @@ dependencies = [ "num", "rand 0.10.0", "regex", - "serde", "serde_json", - "thiserror 2.0.18", "tokio", "twox-hash", ] @@ -5885,7 +5883,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "memchr", "serde", From a5ed8d3a8abe871193bb75665a632bc809896e5f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 11 Apr 2026 22:07:56 -0600 Subject: [PATCH 06/11] prettier --- docs/source/user-guide/latest/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index b836861e7b..55005e94f6 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -64,7 +64,7 @@ memory reservations at the same time, which is important to understand when choo The valid pool types are: - `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified` +- `fair_unified` - `greedy_unified` The `fair_unified` pool prevents operators from using more than an even fraction of the available memory From a1af8547fa8a2ebf10af188a0595ec07092fab78 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 05:37:42 -0600 Subject: [PATCH 07/11] refactor: address PR review feedback - Clarify terminology: replace "native plans" with "execution contexts" to avoid confusion with Spark plan/stage concepts - Rewrite fair_unified_task_shared config description with concrete example of the 2x memory problem during shuffle - Use filter_map with then() for pool deduplication --- .../scala/org/apache/comet/CometConf.scala | 11 +++++--- docs/source/user-guide/latest/tuning.md | 26 ++++++++++--------- native/core/src/execution/jni_api.rs | 12 ++++++--- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 223b6f52eb..7b3e087689 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -648,10 +648,13 @@ object CometConf extends ShimCometConf { .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " + - "`fair_unified_task_shared`. The `fair_unified_task_shared` pool is shared across " + - "all native plans within the same Spark task, ensuring that the total memory " + - "consumption does not exceed the per-task limit even when multiple native plans " + - "(e.g. a shuffle writer and its child plan) execute concurrently. " + + "`fair_unified_task_shared`. During shuffle, Comet runs two native execution " + + "contexts concurrently in the same Spark task: one for the pre-shuffle operators " + + "(e.g. scan, filter, join) and one for the shuffle writer. With `fair_unified`, " + + "each context gets its own pool, so both can allocate up to the per-task memory " + + "limit independently, potentially using 2x the intended memory. " + + "The `fair_unified_task_shared` pool avoids this by sharing a single pool across " + + "all native execution contexts in the same task. " + s"$TUNING_GUIDE.") .stringConf .createWithDefault("fair_unified_task_shared") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 55005e94f6..252a18da99 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -56,10 +56,12 @@ For more details about Spark off-heap memory mode, please refer to [Spark docume Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`. -When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan -(e.g. scan, filter, join) and the shuffle writer plan. These two plans run concurrently in a pipelined fashion — -the child plan produces batches that the shuffle writer consumes and repartitions. This means both plans hold -memory reservations at the same time, which is important to understand when choosing a memory pool type. +When Comet executes a shuffle, it runs two native execution contexts concurrently within the same +Spark task. For example, consider a stage that scans, filters, and repartitions data: one execution +context runs the scan and filter operators, while a second execution context runs the shuffle writer. +These two contexts execute in a pipelined fashion — the first produces batches that the second +consumes and repartitions. Because both contexts hold memory reservations at the same time, the +choice of memory pool type affects how much memory a single task can consume. The valid pool types are: @@ -71,14 +73,14 @@ The `fair_unified` pool prevents operators from using more than an even fraction (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool -type, each native plan gets its own pool, so the total memory consumption can exceed the per-task limit when two -plans are running concurrently (e.g. during shuffle). - -The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all native plans within the -same Spark task. Because the child plan and shuffle writer each get their own pool with `fair_unified`, both can -independently allocate up to the full per-task memory limit, effectively allowing 2x the intended memory to be -consumed. The `fair_unified_task_shared` pool avoids this by sharing a single pool instance, ensuring that the -combined memory usage of both plans stays within the per-task limit. +type, each execution context gets its own pool, so the total memory consumption can exceed the per-task limit when +two contexts are running concurrently (e.g. during shuffle). + +The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all execution contexts +within the same Spark task. Because the pre-shuffle operators and the shuffle writer each get their own pool +with `fair_unified`, both can independently allocate up to the full per-task memory limit, effectively allowing +2x the intended memory to be consumed. The `fair_unified_task_shared` pool avoids this by sharing a single pool +instance, ensuring that the combined memory usage stays within the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index eee80ff2d9..9ffe46c731 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -144,8 +144,10 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { let mut seen = std::collections::HashSet::new(); return pools .values() - .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) - .map(|p| p.reserved()) + .filter_map(|p| { + let ptr = Arc::as_ptr(p) as *const (); + seen.insert(ptr).then(|| p.reserved()) + }) .sum::(); } 0 @@ -160,8 +162,10 @@ fn total_reserved_for_thread(thread_id: u64) -> usize { let mut seen = std::collections::HashSet::new(); pools .values() - .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize)) - .map(|p| p.reserved()) + .filter_map(|p| { + let ptr = Arc::as_ptr(p) as *const (); + seen.insert(ptr).then(|| p.reserved()) + }) .sum::() }) .unwrap_or(0) From 38ebe1aaf448e7732f5e57e5d75e2299b34e6556 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 07:28:05 -0600 Subject: [PATCH 08/11] feat: add greedy_unified_task_shared memory pool Add greedy_unified_task_shared pool type for completeness alongside fair_unified_task_shared. Both _task_shared variants share a single memory pool across all native execution contexts in the same Spark task, preventing 2x memory consumption during shuffle. --- .../main/scala/org/apache/comet/CometConf.scala | 13 +++++++------ docs/source/user-guide/latest/tuning.md | 7 ++++++- native/core/src/execution/memory_pools/config.rs | 5 +++++ native/core/src/execution/memory_pools/mod.rs | 16 ++++++++++++++++ 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 7b3e087689..f6bb0a66ba 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -647,13 +647,14 @@ object CometConf extends ShimCometConf { .category(CATEGORY_TUNING) .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + - "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " + - "`fair_unified_task_shared`. During shuffle, Comet runs two native execution " + + "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, " + + "`fair_unified_task_shared`, and `greedy_unified_task_shared`. " + + "During shuffle, Comet runs two native execution " + "contexts concurrently in the same Spark task: one for the pre-shuffle operators " + - "(e.g. scan, filter, join) and one for the shuffle writer. With `fair_unified`, " + - "each context gets its own pool, so both can allocate up to the per-task memory " + - "limit independently, potentially using 2x the intended memory. " + - "The `fair_unified_task_shared` pool avoids this by sharing a single pool across " + + "(e.g. scan, filter, join) and one for the shuffle writer. With `fair_unified` or " + + "`greedy_unified`, each context gets its own pool, so both can allocate up to the " + + "per-task memory limit independently, potentially using 2x the intended memory. " + + "The `_task_shared` variants avoid this by sharing a single pool across " + "all native execution contexts in the same task. " + s"$TUNING_GUIDE.") .stringConf diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 252a18da99..3c4b2b07c3 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -67,6 +67,7 @@ The valid pool types are: - `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) - `fair_unified` +- `greedy_unified_task_shared` - `greedy_unified` The `fair_unified` pool prevents operators from using more than an even fraction of the available memory @@ -83,7 +84,11 @@ with `fair_unified`, both can independently allocate up to the full per-task mem instance, ensuring that the combined memory usage stays within the per-task limit. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not -need to spill or have a single spillable operator. +need to spill or have a single spillable operator. Like `fair_unified`, each execution context gets its own pool, +so memory consumption can exceed the per-task limit during shuffle. + +The `greedy_unified_task_shared` pool is the same as `greedy_unified` but is shared across all execution contexts +within the same Spark task, ensuring the combined memory usage stays within the per-task limit. [shuffle]: #shuffle [Advanced Memory Tuning]: #advanced-memory-tuning diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index 4f2c03ecbb..7b2e3e3abd 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -22,6 +22,7 @@ pub(crate) enum MemoryPoolType { GreedyUnified, FairUnified, FairUnifiedTaskShared, + GreedyUnifiedTaskShared, Greedy, FairSpill, GreedyTaskShared, @@ -38,6 +39,7 @@ impl MemoryPoolType { MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared | MemoryPoolType::FairUnifiedTaskShared + | MemoryPoolType::GreedyUnifiedTaskShared ) } } @@ -75,6 +77,9 @@ pub(crate) fn parse_memory_pool_config( // shared with Spark is set by `spark.memory.offHeap.size`. MemoryPoolConfig::new(MemoryPoolType::GreedyUnified, 0) } + "greedy_unified_task_shared" => { + MemoryPoolConfig::new(MemoryPoolType::GreedyUnifiedTaskShared, 0) + } _ => { return Err(CometError::Config(format!( "Unsupported memory pool type for off-heap mode: {memory_pool_type}" diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index cfb708b58f..6def58dd09 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -103,6 +103,22 @@ pub(crate) fn create_memory_pool( per_task_memory_pool.num_plans += 1; Arc::clone(&per_task_memory_pool.memory_pool) } + MemoryPoolType::GreedyUnifiedTaskShared => { + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = Arc::new(TrackConsumersPool::new( + CometUnifiedMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + task_attempt_id, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) + } MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => { let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); let per_task_memory_pool = From 3a403f60543030920e3cd6d0ccd16d729ec783d9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 07:31:31 -0600 Subject: [PATCH 09/11] reorder --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index f6bb0a66ba..753987398e 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -647,7 +647,7 @@ object CometConf extends ShimCometConf { .category(CATEGORY_TUNING) .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + - "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, " + + "off-heap mode. Available pool types are `fair_unified`, `greedy_unified`, " + "`fair_unified_task_shared`, and `greedy_unified_task_shared`. " + "During shuffle, Comet runs two native execution " + "contexts concurrently in the same Spark task: one for the pre-shuffle operators " + From 209d47d23da335cf85441a65a3cc8d7dd68cd3e9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 11:44:47 -0600 Subject: [PATCH 10/11] fix: make unified memory pools task-shared by default Instead of adding new _task_shared pool variants, fix the existing fair_unified and greedy_unified pools to share a single pool instance across all native execution contexts within the same Spark task. This fixes the bug where concurrent execution contexts (e.g. pre-shuffle operators and shuffle writer) could each allocate up to the full per-task memory limit independently. --- .../scala/org/apache/comet/CometConf.scala | 12 +-- docs/source/user-guide/latest/tuning.md | 32 ++------ .../core/src/execution/memory_pools/config.rs | 12 +-- native/core/src/execution/memory_pools/mod.rs | 74 +++++++------------ 4 files changed, 40 insertions(+), 90 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 753987398e..046ccf0b1c 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -647,18 +647,10 @@ object CometConf extends ShimCometConf { .category(CATEGORY_TUNING) .doc( "The type of memory pool to be used for Comet native execution when running Spark in " + - "off-heap mode. Available pool types are `fair_unified`, `greedy_unified`, " + - "`fair_unified_task_shared`, and `greedy_unified_task_shared`. " + - "During shuffle, Comet runs two native execution " + - "contexts concurrently in the same Spark task: one for the pre-shuffle operators " + - "(e.g. scan, filter, join) and one for the shuffle writer. With `fair_unified` or " + - "`greedy_unified`, each context gets its own pool, so both can allocate up to the " + - "per-task memory limit independently, potentially using 2x the intended memory. " + - "The `_task_shared` variants avoid this by sharing a single pool across " + - "all native execution contexts in the same task. " + + "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " + s"$TUNING_GUIDE.") .stringConf - .createWithDefault("fair_unified_task_shared") + .createWithDefault("fair_unified") val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf( "spark.comet.exec.onHeap.memoryPool") diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 3c4b2b07c3..ff9acee1f4 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -56,39 +56,23 @@ For more details about Spark off-heap memory mode, please refer to [Spark docume Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`. -When Comet executes a shuffle, it runs two native execution contexts concurrently within the same -Spark task. For example, consider a stage that scans, filters, and repartitions data: one execution -context runs the scan and filter operators, while a second execution context runs the shuffle writer. -These two contexts execute in a pipelined fashion — the first produces batches that the second -consumes and repartitions. Because both contexts hold memory reservations at the same time, the -choice of memory pool type affects how much memory a single task can consume. - The valid pool types are: -- `fair_unified_task_shared` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified` -- `greedy_unified_task_shared` +- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) - `greedy_unified` +Both pool types are shared across all native execution contexts within the same Spark task. When +Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for +pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined +memory usage stays within the per-task limit. + The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even -when there is sufficient memory in order to leave enough memory for other operators. Note that when using this pool -type, each execution context gets its own pool, so the total memory consumption can exceed the per-task limit when -two contexts are running concurrently (e.g. during shuffle). - -The `fair_unified_task_shared` pool is the same as `fair_unified` but is shared across all execution contexts -within the same Spark task. Because the pre-shuffle operators and the shuffle writer each get their own pool -with `fair_unified`, both can independently allocate up to the full per-task memory limit, effectively allowing -2x the intended memory to be consumed. The `fair_unified_task_shared` pool avoids this by sharing a single pool -instance, ensuring that the combined memory usage stays within the per-task limit. +when there is sufficient memory in order to leave enough memory for other operators. The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not -need to spill or have a single spillable operator. Like `fair_unified`, each execution context gets its own pool, -so memory consumption can exceed the per-task limit during shuffle. - -The `greedy_unified_task_shared` pool is the same as `greedy_unified` but is shared across all execution contexts -within the same Spark task, ensuring the combined memory usage stays within the per-task limit. +need to spill or have a single spillable operator. [shuffle]: #shuffle [Advanced Memory Tuning]: #advanced-memory-tuning diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index 7b2e3e3abd..83d6c14a36 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -21,8 +21,6 @@ use crate::errors::{CometError, CometResult}; pub(crate) enum MemoryPoolType { GreedyUnified, FairUnified, - FairUnifiedTaskShared, - GreedyUnifiedTaskShared, Greedy, FairSpill, GreedyTaskShared, @@ -38,8 +36,8 @@ impl MemoryPoolType { self, MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared - | MemoryPoolType::FairUnifiedTaskShared - | MemoryPoolType::GreedyUnifiedTaskShared + | MemoryPoolType::FairUnified + | MemoryPoolType::GreedyUnified ) } } @@ -68,18 +66,12 @@ pub(crate) fn parse_memory_pool_config( let memory_pool_config = if off_heap_mode { match memory_pool_type.as_str() { "fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size), - "fair_unified_task_shared" => { - MemoryPoolConfig::new(MemoryPoolType::FairUnifiedTaskShared, pool_size) - } "greedy_unified" => { // the `unified` memory pool interacts with Spark's memory pool to allocate // memory therefore does not need a size to be explicitly set. The pool size // shared with Spark is set by `spark.memory.offHeap.size`. MemoryPoolConfig::new(MemoryPoolType::GreedyUnified, 0) } - "greedy_unified_task_shared" => { - MemoryPoolConfig::new(MemoryPoolType::GreedyUnifiedTaskShared, 0) - } _ => { return Err(CometError::Config(format!( "Unsupported memory pool type for off-heap mode: {memory_pool_type}" diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index 6def58dd09..389e348990 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -42,22 +42,36 @@ pub(crate) fn create_memory_pool( const NUM_TRACKED_CONSUMERS: usize = 10; match memory_pool_config.pool_type { MemoryPoolType::GreedyUnified => { - // Set Comet memory pool for native - let memory_pool = - CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id); - Arc::new(TrackConsumersPool::new( - memory_pool, - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )) + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = Arc::new(TrackConsumersPool::new( + CometUnifiedMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + task_attempt_id, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) } MemoryPoolType::FairUnified => { - // Set Comet fair memory pool for native - let memory_pool = - CometFairMemoryPool::new(comet_task_memory_manager, memory_pool_config.pool_size); - Arc::new(TrackConsumersPool::new( - memory_pool, - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )) + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = Arc::new(TrackConsumersPool::new( + CometFairMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + memory_pool_config.pool_size, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) } MemoryPoolType::Greedy => Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(memory_pool_config.pool_size), @@ -87,38 +101,6 @@ pub(crate) fn create_memory_pool( }); Arc::clone(memory_pool) } - MemoryPoolType::FairUnifiedTaskShared => { - let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); - let per_task_memory_pool = - memory_pool_map.entry(task_attempt_id).or_insert_with(|| { - let pool: Arc = Arc::new(TrackConsumersPool::new( - CometFairMemoryPool::new( - Arc::clone(&comet_task_memory_manager), - memory_pool_config.pool_size, - ), - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )); - PerTaskMemoryPool::new(pool) - }); - per_task_memory_pool.num_plans += 1; - Arc::clone(&per_task_memory_pool.memory_pool) - } - MemoryPoolType::GreedyUnifiedTaskShared => { - let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); - let per_task_memory_pool = - memory_pool_map.entry(task_attempt_id).or_insert_with(|| { - let pool: Arc = Arc::new(TrackConsumersPool::new( - CometUnifiedMemoryPool::new( - Arc::clone(&comet_task_memory_manager), - task_attempt_id, - ), - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )); - PerTaskMemoryPool::new(pool) - }); - per_task_memory_pool.num_plans += 1; - Arc::clone(&per_task_memory_pool.memory_pool) - } MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => { let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); let per_task_memory_pool = From d9aca396ac149e9b1c935c86a399b6a042ff00ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 12:11:54 -0600 Subject: [PATCH 11/11] refactor: use import for HashSet instead of qualified path --- native/core/src/execution/jni_api.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 9ffe46c731..ff01abc5a5 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -26,6 +26,8 @@ use crate::{ }, jvm_bridge::JVMClasses, }; +use std::collections::HashSet; + use arrow::array::{Array, RecordBatch, UInt32Array}; use arrow::compute::{take, TakeOptions}; use arrow::datatypes::DataType as ArrowDataType; @@ -141,7 +143,7 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { map.remove(&thread_id); return 0; } - let mut seen = std::collections::HashSet::new(); + let mut seen = HashSet::new(); return pools .values() .filter_map(|p| { @@ -159,7 +161,7 @@ fn total_reserved_for_thread(thread_id: u64) -> usize { .map(|pools| { // Deduplicate pools that share the same underlying allocation // (e.g. task-shared pools registered by multiple execution contexts) - let mut seen = std::collections::HashSet::new(); + let mut seen = HashSet::new(); pools .values() .filter_map(|p| {