Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/source/user-guide/latest/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ The valid pool types are:
- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `greedy_unified`

The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory
Both pool types are shared across all native execution contexts within the same Spark task. When
Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for
pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined
memory usage stays within the per-task limit.

The `fair_unified` pool prevents operators from using more than an even fraction of the available memory
(i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
when there is sufficient memory in order to leave enough memory for other operators.
Expand Down
4 changes: 1 addition & 3 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 22 additions & 2 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::{
},
jvm_bridge::JVMClasses,
};
use std::collections::HashSet;

use arrow::array::{Array, RecordBatch, UInt32Array};
use arrow::compute::{take, TakeOptions};
use arrow::datatypes::DataType as ArrowDataType;
Expand Down Expand Up @@ -141,15 +143,33 @@ fn unregister_and_total(thread_id: u64, context_id: i64) -> usize {
map.remove(&thread_id);
return 0;
}
return pools.values().map(|p| p.reserved()).sum::<usize>();
let mut seen = HashSet::new();
return pools
.values()
.filter_map(|p| {
let ptr = Arc::as_ptr(p) as *const ();
seen.insert(ptr).then(|| p.reserved())
})
.sum::<usize>();
}
0
}

fn total_reserved_for_thread(thread_id: u64) -> usize {
let map = get_thread_memory_pools().lock();
map.get(&thread_id)
.map(|pools| pools.values().map(|p| p.reserved()).sum::<usize>())
.map(|pools| {
// Deduplicate pools that share the same underlying allocation
// (e.g. task-shared pools registered by multiple execution contexts)
let mut seen = HashSet::new();
pools
.values()
.filter_map(|p| {
let ptr = Arc::as_ptr(p) as *const ();
seen.insert(ptr).then(|| p.reserved())
})
.sum::<usize>()
})
.unwrap_or(0)
}

Expand Down
5 changes: 4 additions & 1 deletion native/core/src/execution/memory_pools/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ impl MemoryPoolType {
pub(crate) fn is_task_shared(&self) -> bool {
matches!(
self,
MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared
MemoryPoolType::GreedyTaskShared
| MemoryPoolType::FairSpillTaskShared
| MemoryPoolType::FairUnified
| MemoryPoolType::GreedyUnified
)
}
}
Expand Down
42 changes: 28 additions & 14 deletions native/core/src/execution/memory_pools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,36 @@ pub(crate) fn create_memory_pool(
const NUM_TRACKED_CONSUMERS: usize = 10;
match memory_pool_config.pool_type {
MemoryPoolType::GreedyUnified => {
// Set Comet memory pool for native
let memory_pool =
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
))
let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
let per_task_memory_pool =
memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
CometUnifiedMemoryPool::new(
Arc::clone(&comet_task_memory_manager),
task_attempt_id,
),
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
));
PerTaskMemoryPool::new(pool)
});
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
MemoryPoolType::FairUnified => {
// Set Comet fair memory pool for native
let memory_pool =
CometFairMemoryPool::new(comet_task_memory_manager, memory_pool_config.pool_size);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
))
let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
let per_task_memory_pool =
memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
CometFairMemoryPool::new(
Arc::clone(&comet_task_memory_manager),
memory_pool_config.pool_size,
),
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
));
PerTaskMemoryPool::new(pool)
});
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
MemoryPoolType::Greedy => Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(memory_pool_config.pool_size),
Expand Down
Loading