diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..ff9acee1f4 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -61,7 +61,12 @@ The valid pool types are: - `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) - `greedy_unified` -The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory +Both pool types are shared across all native execution contexts within the same Spark task. When +Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for +pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined +memory usage stays within the per-task limit. + +The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. 
diff --git a/native/Cargo.lock b/native/Cargo.lock index 480f7ad06d..b5c7f2b0c7 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2070,9 +2070,7 @@ dependencies = [ "num", "rand 0.10.0", "regex", - "serde", "serde_json", - "thiserror 2.0.18", "tokio", "twox-hash", ] @@ -5885,7 +5883,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "memchr", "serde", diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 93f75bae96..ff01abc5a5 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -26,6 +26,8 @@ use crate::{ }, jvm_bridge::JVMClasses, }; +use std::collections::HashSet; + use arrow::array::{Array, RecordBatch, UInt32Array}; use arrow::compute::{take, TakeOptions}; use arrow::datatypes::DataType as ArrowDataType; @@ -141,7 +143,14 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize { map.remove(&thread_id); return 0; } - return pools.values().map(|p| p.reserved()).sum::<usize>(); + let mut seen = HashSet::new(); + return pools + .values() + .filter_map(|p| { + let ptr = Arc::as_ptr(p) as *const (); + seen.insert(ptr).then(|| p.reserved()) + }) + .sum::<usize>(); } 0 } @@ -149,7 +158,18 @@ fn total_reserved_for_thread(thread_id: u64) -> usize { let map = get_thread_memory_pools().lock(); map.get(&thread_id) - .map(|pools| pools.values().map(|p| p.reserved()).sum::<usize>()) + .map(|pools| { + // Deduplicate pools that share the same underlying allocation + // (e.g. 
task-shared pools registered by multiple execution contexts) + let mut seen = HashSet::new(); + pools + .values() + .filter_map(|p| { + let ptr = Arc::as_ptr(p) as *const (); + seen.insert(ptr).then(|| p.reserved()) + }) + .sum::<usize>() + }) .unwrap_or(0) } diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index d30126a99a..83d6c14a36 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -34,7 +34,10 @@ impl MemoryPoolType { pub(crate) fn is_task_shared(&self) -> bool { matches!( self, - MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared + MemoryPoolType::GreedyTaskShared + | MemoryPoolType::FairSpillTaskShared + | MemoryPoolType::FairUnified + | MemoryPoolType::GreedyUnified ) } } diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index d44290d058..389e348990 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -42,22 +42,36 @@ pub(crate) fn create_memory_pool( const NUM_TRACKED_CONSUMERS: usize = 10; match memory_pool_config.pool_type { MemoryPoolType::GreedyUnified => { - // Set Comet memory pool for native - let memory_pool = - CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id); - Arc::new(TrackConsumersPool::new( - memory_pool, - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )) + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new( + CometUnifiedMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + task_attempt_id, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) } 
MemoryPoolType::FairUnified => { - // Set Comet fair memory pool for native - let memory_pool = - CometFairMemoryPool::new(comet_task_memory_manager, memory_pool_config.pool_size); - Arc::new(TrackConsumersPool::new( - memory_pool, - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )) + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new( + CometFairMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + memory_pool_config.pool_size, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) } MemoryPoolType::Greedy => Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(memory_pool_config.pool_size),