diff --git a/.github/scripts/run-sql-bench.sh b/.github/scripts/run-sql-bench.sh index 96c2e2f19dc..4c8ec687d42 100755 --- a/.github/scripts/run-sql-bench.sh +++ b/.github/scripts/run-sql-bench.sh @@ -107,6 +107,7 @@ if [[ -n "$df_formats" ]]; then # shellcheck disable=SC2086 target/release_debug/datafusion-bench "$subcommand" \ -d gh-json \ + --show-session-metrics \ --formats "$df_formats" \ $opts \ -o df-results.json @@ -118,6 +119,7 @@ if [[ -n "$ddb_formats" ]]; then # shellcheck disable=SC2086 target/release_debug/duckdb-bench "$subcommand" \ -d gh-json \ + --show-session-metrics \ --formats "$ddb_formats" \ $opts \ --delete-duckdb-database \ @@ -131,6 +133,7 @@ if ! $is_remote && [[ "$has_lance" == "true" ]] && [[ -f "target/release_debug/l # shellcheck disable=SC2086 target/release_debug/lance-bench "$subcommand" \ -d gh-json \ + --show-session-metrics \ $opts \ -o lance-results.json diff --git a/AGENTS.md b/AGENTS.md index e6bf3b12c5d..30af7143968 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -10,6 +10,9 @@ think you've finished work. * run `cargo xtask public-api` to re-generate the public API lock files. Please do this every time you reach a stopping point or think you've finished work. +* exception: for trivial, isolated changes that cannot affect formatting or linting (for example constant-only edits + with no import/signature/control-flow changes), skip full-workspace `fmt`/`clippy`/`public-api` sweeps unless + explicitly requested; prefer targeted validation (for example crate-level `cargo check`/tests) only. * you can try running `cargo fix --lib --allow-dirty --allow-staged && cargo clippy --fix --lib --allow-dirty --allow-staged` to automatically many fix minor errors. @@ -64,4 +67,4 @@ ## Commits -* All commits must be signed of by the committers in the form `Signed-off-by: "COMMITTER" `. \ No newline at end of file +* All commits must be signed off by the committers in the form `Signed-off-by: "COMMITTER" `. 
diff --git a/Cargo.lock b/Cargo.lock index 9778e8403bc..cd1288cb763 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10184,6 +10184,7 @@ dependencies = [ "vortex-error", "vortex-flatbuffers", "vortex-mask", + "vortex-metrics", "vortex-proto", "vortex-session", "vortex-utils", diff --git a/bench-orchestrator/README.md b/bench-orchestrator/README.md index 3d9d7ac99c6..f24d242e593 100644 --- a/bench-orchestrator/README.md +++ b/bench-orchestrator/README.md @@ -52,6 +52,7 @@ vx-bench run [options] - `--iterations, -i`: Iterations per query (default: 5) - `--label, -l`: Label for this run (useful for later reference) - `--track-memory`: Enable memory usage tracking +- `--allocator`: Host allocator mode: `pooled`, `default`, or `ab` (run both in one run) - `--build/--no-build`: Build binaries before running (default: build) ### `compare` - Compare Results @@ -71,9 +72,9 @@ vx-bench compare [options] - `--format`: Filter results to a specific format - `--threshold`: Significance threshold (default: 0.10 = 10%) -**Within-run comparison** (`--run`): Compares different engine:format combinations within a single run. Output shows one row per query, with columns for each engine:format combo. +**Within-run comparison** (`--run`): Compares different engine:format (or engine:format:allocator) combinations within a single run. Output shows one row per query, with columns for each combo. -**Multi-run comparison** (`--runs`): Compares the same benchmarks across multiple runs. Output shows one row per (query, engine, format) combination, with columns for each run. +**Multi-run comparison** (`--runs`): Compares the same benchmarks across multiple runs. Output shows one row per (query, engine, format[, allocator]) combination, with columns for each run. 
### `list` - List Benchmark Runs diff --git a/bench-orchestrator/bench_orchestrator/cli.py b/bench-orchestrator/bench_orchestrator/cli.py index eba7521fe9f..5a5f3c3cd63 100644 --- a/bench-orchestrator/bench_orchestrator/cli.py +++ b/bench-orchestrator/bench_orchestrator/cli.py @@ -4,7 +4,7 @@ """CLI for benchmark orchestration.""" from datetime import datetime, timedelta -from typing import Annotated +from typing import Annotated, Literal import pandas as pd import typer @@ -76,12 +76,22 @@ def run( iterations: Annotated[int, typer.Option("--iterations", "-i", help="Iterations per query")] = 5, label: Annotated[str | None, typer.Option("--label", "-l", help="Label for this run")] = None, track_memory: Annotated[bool, typer.Option("--track-memory", help="Track memory usage")] = False, + show_session_metrics: Annotated[ + bool, typer.Option("--show-session-metrics", help="Print session metrics after each engine run") + ] = False, samply: Annotated[bool, typer.Option("--samply", help="Record a profile using samply")] = False, sample_rate: Annotated[int, typer.Option("--sample-rate", help="Sample rate to run samply with")] = None, tracing: Annotated[bool, typer.Option("--tracing", help="Record a trace for use with perfetto")] = False, build: Annotated[bool, typer.Option("--build/--no-build", help="Build binaries before running")] = True, verbose: Annotated[bool, typer.Option("--verbose", "-v", help="Log underlying commands")] = False, options: Annotated[list[str] | None, typer.Option("--opt", help="Engine or benchmark specific options")] = None, + allocator: Annotated[ + str, + typer.Option( + "--allocator", + help="Host allocator mode: pooled, default, or ab (run both and compare)", + ), + ] = "pooled", ) -> None: """Run benchmarks with specified configuration.""" engines = parse_engines(engine) @@ -89,6 +99,25 @@ def run( query_list = parse_queries(queries) exclude_list = parse_queries(exclude_queries) bench_opts = {} + allocator_mode_value = 
allocator.strip().lower() + if allocator_mode_value not in {"pooled", "default", "ab"}: + console.print("[red]--allocator must be one of: pooled, default, ab[/red]") + raise typer.Exit(1) + + allocator_mode: Literal["pooled", "default", "ab"] + if allocator_mode_value == "default": + allocator_mode = "default" + elif allocator_mode_value == "pooled": + allocator_mode = "pooled" + else: + allocator_mode = "ab" + + allocator_variants: list[Literal["pooled", "default"]] + if allocator_mode == "ab": + allocator_variants = ["default", "pooled"] + else: + allocator_variants = [allocator_mode] + # Build options dict if options: for opt in options: @@ -108,6 +137,7 @@ def run( label=label, options=bench_opts, track_memory=track_memory, + allocator_mode=allocator_mode, ) # Validate configuration @@ -135,6 +165,7 @@ def run( console.print(f" Engines: {', '.join(e.value for e in engines)}") console.print(f" Formats: {', '.join(f.value for f in formats)}") console.print(f" Iterations: {iterations}") + console.print(f" Allocator: {allocator_mode}") if label: console.print(f" Label: {label}") console.print() @@ -153,20 +184,27 @@ def run( executor = BenchmarkExecutor(binary_paths[eng], eng, verbose=verbose) try: - results = executor.run( - benchmark=benchmark, - formats=engine_formats, - queries=query_list, - exclude_queries=exclude_list, - iterations=iterations, - options=bench_opts, - track_memory=track_memory, - samply=samply, - sample_rate=sample_rate, - tracing=tracing, - on_result=ctx.write_raw_json, - ) - console.print(f"[green]{eng.value}: {len(results)} results[/green]") + total_results = 0 + for allocator_variant in allocator_variants: + results = executor.run( + benchmark=benchmark, + formats=engine_formats, + queries=query_list, + exclude_queries=exclude_list, + iterations=iterations, + options=bench_opts, + track_memory=track_memory, + show_session_metrics=show_session_metrics, + samply=samply, + sample_rate=sample_rate, + tracing=tracing, + 
allocator_mode=allocator_variant, + on_result=ctx.write_raw_json, + ) + total_results += len(results) + console.print(f"[green]{eng.value} ({allocator_variant}): {len(results)} results[/green]") + if allocator_mode == "ab": + console.print(f"[green]{eng.value}: {total_results} total results[/green]") except RuntimeError as e: console.print(f"[red]{eng.value} failed: {e}[/red]") @@ -321,7 +359,10 @@ def compare( console.print(f"[red]{e}[/red]") raise typer.Exit(1) - table = pivot_comparison_table(pivot, threshold, row_keys=["query", "engine", "format"]) + row_keys = ["query", "engine", "format"] + if "allocator" in pivot.df.columns: + row_keys.append("allocator") + table = pivot_comparison_table(pivot, threshold, row_keys=row_keys) console.print(table) return diff --git a/bench-orchestrator/bench_orchestrator/comparison/analyzer.py b/bench-orchestrator/bench_orchestrator/comparison/analyzer.py index 7ac62f188ff..7b43196c6c6 100644 --- a/bench-orchestrator/bench_orchestrator/comparison/analyzer.py +++ b/bench-orchestrator/bench_orchestrator/comparison/analyzer.py @@ -61,6 +61,7 @@ def extract_target_fields(df: pd.DataFrame) -> pd.DataFrame: if isinstance(first_target, dict): df["engine"] = df["target"].apply(lambda t: t.get("engine", "") if isinstance(t, dict) else "") df["format"] = df["target"].apply(lambda t: t.get("format", "") if isinstance(t, dict) else "") + df["allocator"] = df["target"].apply(lambda t: t.get("allocator", "") if isinstance(t, dict) else "") # Extract query number from name if present if "name" in df.columns: @@ -196,9 +197,15 @@ def compare_within_run( if filter_format is not None and "format" in df.columns: df = df[df["format"] == filter_format] - # Find unique engine:format combinations - combos_df = df.groupby(["engine", "format"]).size().reset_index()[["engine", "format"]] - columns = [f"{row['engine']}:{row['format']}" for _, row in combos_df.iterrows()] + has_allocator = "allocator" in df.columns and df["allocator"].nunique(dropna=True) 
> 1 + + # Find unique engine:format[:allocator] combinations. + combo_keys = ["engine", "format"] + (["allocator"] if has_allocator else []) + combos_df = df.groupby(combo_keys).size().reset_index()[combo_keys] + if has_allocator: + columns = [f"{row['engine']}:{row['format']}:{row['allocator']}" for _, row in combos_df.iterrows()] + else: + columns = [f"{row['engine']}:{row['format']}" for _, row in combos_df.iterrows()] if len(columns) < 2: raise ValueError("Need at least 2 engine:format combinations to compare") @@ -208,10 +215,17 @@ def compare_within_run( baseline_engine = combos_df.iloc[0]["engine"] baseline_format = combos_df.iloc[0]["format"] - baseline_key = f"{baseline_engine}:{baseline_format}" + if has_allocator: + baseline_allocator = combos_df.iloc[0]["allocator"] + baseline_key = f"{baseline_engine}:{baseline_format}:{baseline_allocator}" + else: + baseline_key = f"{baseline_engine}:{baseline_format}" - # Create engine:format column - df["combo"] = df["engine"] + ":" + df["format"] + # Create combo column. + if has_allocator: + df["combo"] = df["engine"] + ":" + df["format"] + ":" + df["allocator"] + else: + df["combo"] = df["engine"] + ":" + df["format"] # Pivot to get queries as rows, combos as columns pivot = df.pivot_table(index="query", columns="combo", values="value", aggfunc="mean") @@ -268,8 +282,11 @@ def compare_runs( # Combine all runs combined: pd.DataFrame = pd.concat(processed, ignore_index=True) - # Pivot to get (query, engine, format) as rows, runs as columns - pivot = combined.pivot_table(index=["query", "engine", "format"], columns="run", values="value", aggfunc="mean") + # Pivot to get (query, engine, format[, allocator]) as rows, runs as columns. 
+ index = ["query", "engine", "format"] + if "allocator" in combined.columns and combined["allocator"].nunique(dropna=True) > 1: + index.append("allocator") + pivot = combined.pivot_table(index=index, columns="run", values="value", aggfunc="mean") # Deduplicate labels while preserving order (two runs can share a label). unique_labels = list(dict.fromkeys(labels)) diff --git a/bench-orchestrator/bench_orchestrator/config.py b/bench-orchestrator/bench_orchestrator/config.py index 5a99f7262e8..98918c61131 100644 --- a/bench-orchestrator/bench_orchestrator/config.py +++ b/bench-orchestrator/bench_orchestrator/config.py @@ -6,6 +6,7 @@ from dataclasses import dataclass, field from enum import Enum from pathlib import Path +from typing import Literal class Engine(Enum): @@ -78,6 +79,7 @@ class RunConfig: label: str | None = None options: dict[str, str] = field(default_factory=dict) track_memory: bool = False + allocator_mode: Literal["pooled", "default", "ab"] = "pooled" def validate(self) -> list[str]: """Validate the configuration and return any warnings.""" diff --git a/bench-orchestrator/bench_orchestrator/runner/executor.py b/bench-orchestrator/bench_orchestrator/runner/executor.py index a80080d8d5a..feafa520e1d 100644 --- a/bench-orchestrator/bench_orchestrator/runner/executor.py +++ b/bench-orchestrator/bench_orchestrator/runner/executor.py @@ -3,10 +3,12 @@ """Benchmark binary execution.""" +import json +import os import subprocess from collections.abc import Callable from pathlib import Path -from typing import final +from typing import Literal, final from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn @@ -34,9 +36,11 @@ def run( iterations: int = 5, options: dict[str, str] | None = None, track_memory: bool = False, + show_session_metrics: bool = False, samply: bool = False, sample_rate: int | None = None, tracing: bool = False, + allocator_mode: Literal["pooled", "default"] = "pooled", on_result: Callable[[str], None] 
| None = None, ) -> list[str]: """ @@ -50,6 +54,8 @@ def run( iterations: Number of runs per query options: Additional options (e.g., scale_factor) track_memory: Enable memory tracking + show_session_metrics: Print session metrics at the end of the benchmark process + allocator_mode: Host allocator mode ("pooled" or "default") on_result: Callback for each result line (for streaming) Returns: @@ -73,6 +79,8 @@ def run( cmd.extend(["--exclude-queries", ",".join(map(str, exclude_queries))]) if track_memory: cmd.append("--track-memory") + if show_session_metrics: + cmd.append("--show-session-metrics") if tracing: cmd.append("--tracing") if options: @@ -83,7 +91,7 @@ def run( cmd = ["--"] + cmd cmd_prefix = ["samply", "record"] if sample_rate: - cmd = cmd_prefix + ["--rate", sample_rate] + cmd + cmd = cmd_prefix + ["--rate", str(sample_rate)] + cmd else: cmd = cmd_prefix + cmd @@ -94,6 +102,9 @@ def run( if self.verbose: console.print(f"[dim]$ {' '.join(cmd)}[/dim]") + env = os.environ.copy() + env["VORTEX_HOST_ALLOCATOR"] = allocator_mode + results = [] with Progress( @@ -108,11 +119,20 @@ def run( cmd, stdout=subprocess.PIPE, text=True, + env=env, ) for line in iter(process.stdout.readline, ""): line = line.strip() if line: + if line.startswith("{"): + try: + payload = json.loads(line) + if isinstance(payload.get("target"), dict): + payload["target"]["allocator"] = allocator_mode + line = json.dumps(payload) + except json.JSONDecodeError: + pass results.append(line) if on_result: on_result(line) diff --git a/bench-orchestrator/bench_orchestrator/storage/store.py b/bench-orchestrator/bench_orchestrator/storage/store.py index 1703af1ad6f..c1f544f527f 100644 --- a/bench-orchestrator/bench_orchestrator/storage/store.py +++ b/bench-orchestrator/bench_orchestrator/storage/store.py @@ -139,7 +139,10 @@ def create_run(self, config: RunConfig, build_config: BuildConfig) -> Iterator[R timestamp=datetime.now(), label=config.label, benchmark=config.benchmark.value, - 
dataset_config=config.options, + dataset_config={ + **config.options, + "allocator_mode": config.allocator_mode, + }, engines=[e.value for e in config.engines], formats=[f.value for f in config.formats], queries=config.queries or [], diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index 8d85dd8f4eb..ac914454ea9 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -26,6 +26,7 @@ use datafusion_physical_plan::collect; use futures::StreamExt; use parking_lot::Mutex; use tokio::fs::File; +use vortex::array::memory::MemorySessionExt; use vortex::scan::DataSourceRef; use vortex_bench::Benchmark; use vortex_bench::BenchmarkArg; @@ -97,6 +98,9 @@ struct Args { #[arg(long, default_value_t = false)] explain: bool, + #[arg(long, default_value_t = false)] + show_session_metrics: bool, + #[arg(long, value_delimiter = ',', value_parser = value_parser!(Format))] formats: Vec, @@ -227,6 +231,10 @@ async fn main() -> anyhow::Result<()> { runner.export_to(&args.display_format, writer)?; } + if args.show_session_metrics { + vortex_bench::print_session_metrics(); + } + Ok(()) } @@ -307,6 +315,7 @@ async fn register_v2_tables( let fs: vortex::io::filesystem::FileSystemRef = Arc::new(ObjectStoreFileSystem::new( Arc::clone(&store), SESSION.handle(), + SESSION.allocator(), )); let base_prefix = benchmark_base.path().trim_start_matches('/').to_string(); let fs = fs.with_prefix(base_prefix); diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 7ab8f1ac7ab..9a1ec92a86e 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -81,6 +81,9 @@ struct Args { to keep all work on the same threads" )] reuse: bool, + + #[arg(long, default_value_t = false)] + show_session_metrics: bool, } fn main() -> anyhow::Result<()> { @@ -191,5 +194,9 @@ fn main() -> anyhow::Result<()> { runner.export_to(&args.display_format, writer)?; } + if 
args.show_session_metrics { + vortex_bench::print_session_metrics(); + } + Ok(()) } diff --git a/benchmarks/lance-bench/src/main.rs b/benchmarks/lance-bench/src/main.rs index 73fa8426ffe..2cc0bb056dd 100644 --- a/benchmarks/lance-bench/src/main.rs +++ b/benchmarks/lance-bench/src/main.rs @@ -65,6 +65,9 @@ struct Args { #[arg(long, default_value_t = false)] track_memory: bool, + #[arg(long, default_value_t = false)] + show_session_metrics: bool, + #[arg(long = "opt", value_delimiter = ',', value_parser = value_parser!(Opt))] options: Vec, } @@ -124,6 +127,10 @@ async fn main() -> anyhow::Result<()> { let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?; runner.export_to(&args.display_format, writer)?; + if args.show_session_metrics { + vortex_bench::print_session_metrics(); + } + Ok(()) } diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index dbdfd5a2cd9..9585e96bebe 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -72,6 +72,7 @@ vortex-buffer = { workspace = true, features = ["arrow"] } vortex-error = { workspace = true, features = ["flatbuffers"] } vortex-flatbuffers = { workspace = true, features = ["array", "dtype"] } vortex-mask = { workspace = true } +vortex-metrics = { workspace = true } vortex-proto = { workspace = true, features = ["dtype", "expr", "scalar"] } vortex-session = { workspace = true } vortex-utils = { workspace = true, features = ["dyn-traits"] } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 885e2add71e..25b9248d98c 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13298,6 +13298,26 @@ impl core::fmt::Debug for vortex_array::memory::MemorySession pub fn vortex_array::memory::MemorySession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub struct vortex_array::memory::PooledHostAllocator + +impl vortex_array::memory::PooledHostAllocator + +pub fn 
vortex_array::memory::PooledHostAllocator::max_bytes_per_thread(&self) -> usize + +pub fn vortex_array::memory::PooledHostAllocator::new(max_bytes_per_thread: usize, metrics_registry: alloc::sync::Arc) -> Self + +impl core::default::Default for vortex_array::memory::PooledHostAllocator + +pub fn vortex_array::memory::PooledHostAllocator::default() -> Self + +impl core::fmt::Debug for vortex_array::memory::PooledHostAllocator + +pub fn vortex_array::memory::PooledHostAllocator::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::memory::HostAllocator for vortex_array::memory::PooledHostAllocator + +pub fn vortex_array::memory::PooledHostAllocator::allocate(&self, len: usize, requested_alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult + pub struct vortex_array::memory::WritableHostBuffer impl vortex_array::memory::WritableHostBuffer @@ -13330,6 +13350,10 @@ impl vortex_array::memory::HostAllocator for vortex_array::memory::DefaultHostAl pub fn vortex_array::memory::DefaultHostAllocator::allocate(&self, len: usize, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult +impl vortex_array::memory::HostAllocator for vortex_array::memory::PooledHostAllocator + +pub fn vortex_array::memory::PooledHostAllocator::allocate(&self, len: usize, requested_alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult + pub trait vortex_array::memory::HostAllocatorExt: vortex_array::memory::HostAllocator pub fn vortex_array::memory::HostAllocatorExt::allocate_typed(&self, len: usize) -> vortex_error::VortexResult @@ -13366,6 +13390,8 @@ pub fn S::memory(&self) -> vortex_session::Ref<'_, vortex_array::memory::MemoryS pub fn S::memory_mut(&self) -> vortex_session::RefMut<'_, vortex_array::memory::MemorySession> +pub fn vortex_array::memory::default_pooled_allocator_metrics_snapshot() -> alloc::vec::Vec + pub type vortex_array::memory::HostAllocatorRef = alloc::sync::Arc pub mod 
vortex_array::normalize diff --git a/vortex-array/src/memory.rs b/vortex-array/src/memory/mod.rs similarity index 91% rename from vortex-array/src/memory.rs rename to vortex-array/src/memory/mod.rs index 47030b18131..96548d29035 100644 --- a/vortex-array/src/memory.rs +++ b/vortex-array/src/memory/mod.rs @@ -19,6 +19,13 @@ use vortex_session::Ref; use vortex_session::RefMut; use vortex_session::SessionExt; +mod pool; + +pub use pool::PooledHostAllocator; +pub use pool::default_pooled_allocator_metrics_snapshot; + +const VORTEX_HOST_ALLOCATOR_ENV: &str = "VORTEX_HOST_ALLOCATOR"; + /// Mutable host buffer contract used by [`WritableHostBuffer`]. pub trait HostBufferMut: Send + 'static { /// Returns the logical byte length of the buffer. @@ -195,7 +202,14 @@ impl MemorySession { impl Default for MemorySession { fn default() -> Self { - Self::new(Arc::new(DefaultHostAllocator)) + Self::new(default_host_allocator()) + } +} + +fn default_host_allocator() -> HostAllocatorRef { + match std::env::var(VORTEX_HOST_ALLOCATOR_ENV).as_deref() { + Ok("default") => Arc::new(DefaultHostAllocator), + _ => Arc::new(PooledHostAllocator::default()), } } @@ -225,21 +239,34 @@ pub struct DefaultHostAllocator; impl HostAllocator for DefaultHostAllocator { fn allocate(&self, len: usize, alignment: Alignment) -> VortexResult { - let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment); - // SAFETY: We fully initialize this slice before freezing it. - unsafe { buffer.set_len(len) }; - Ok(WritableHostBuffer::new(Box::new( - DefaultWritableHostBuffer { buffer, alignment }, - ))) + allocate_unpooled(len, alignment) } } +pub(super) fn allocate_unpooled( + len: usize, + alignment: Alignment, +) -> VortexResult { + let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment); + // SAFETY: We fully initialize this slice before freezing it. 
+ unsafe { buffer.set_len(len) }; + Ok(WritableHostBuffer::new(Box::new( + DefaultWritableHostBuffer::new(buffer, alignment), + ))) +} + #[derive(Debug)] struct DefaultWritableHostBuffer { buffer: ByteBufferMut, alignment: Alignment, } +impl DefaultWritableHostBuffer { + fn new(buffer: ByteBufferMut, alignment: Alignment) -> Self { + Self { buffer, alignment } + } +} + #[derive(Debug)] struct HostBufferOwner { buffer: ByteBufferMut, diff --git a/vortex-array/src/memory/pool.rs b/vortex-array/src/memory/pool.rs new file mode 100644 index 00000000000..218e26d5a22 --- /dev/null +++ b/vortex-array/src/memory/pool.rs @@ -0,0 +1,726 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::cell::RefCell; +use std::sync::Arc; +use std::sync::OnceLock; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use bytes::Bytes; +use vortex_buffer::Alignment; +use vortex_buffer::ByteBuffer; +use vortex_buffer::ByteBufferMut; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_metrics::Counter; +use vortex_metrics::DefaultMetricsRegistry; +use vortex_metrics::Gauge; +use vortex_metrics::Histogram; +use vortex_metrics::Label; +use vortex_metrics::Metric; +use vortex_metrics::MetricBuilder; +use vortex_metrics::MetricsRegistry; +use vortex_utils::aliases::hash_map::HashMap; + +use super::HostAllocator; +use super::HostBufferMut; +use super::WritableHostBuffer; +use super::allocate_unpooled; + +const DEFAULT_MAX_BYTES_PER_THREAD: usize = 64 * 1024 * 1024; +const POOL_ALIGNMENT_BYTES: usize = 4 * 1024; + +// (bucket_size_bytes, max_entries_per_thread) +const POOL_BUCKETS: &[(usize, usize)] = &[ + (4 * 1024, 256), + (16 * 1024, 128), + (64 * 1024, 64), + (128 * 1024, 32), + (256 * 1024, 16), + (512 * 1024, 12), + (1024 * 1024, 12), + (2 * 1024 * 1024, 6), + (4 * 1024 * 1024, 3), + (8 * 1024 * 1024, 1), + (16 * 1024 * 1024, 1), +]; + +static NEXT_POOLED_ALLOCATOR_ID: AtomicU64 = 
AtomicU64::new(1); + +thread_local! { + static POOLED_HOST_ALLOCATOR_POOLS: RefCell> = + RefCell::new(HashMap::new()); +} + +fn default_pooled_metrics_registry() -> DefaultMetricsRegistry { + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY + .get_or_init(DefaultMetricsRegistry::default) + .clone() +} + +/// Returns a snapshot of metrics recorded by default-constructed pooled host allocators. +pub fn default_pooled_allocator_metrics_snapshot() -> Vec { + default_pooled_metrics_registry().snapshot() +} + +/// A pooled host allocator with thread-local buckets and metric instrumentation. +#[derive(Debug)] +pub struct PooledHostAllocator { + id: u64, + max_bytes_per_thread: usize, + metrics: Arc, +} + +impl PooledHostAllocator { + /// Create a pooled allocator. + /// + /// `max_bytes_per_thread` controls the maximum total capacity retained in the thread-local + /// pool for this allocator. Set it to `0` to disable pooling while still recording metrics. + pub fn new(max_bytes_per_thread: usize, metrics_registry: Arc) -> Self { + let id = NEXT_POOLED_ALLOCATOR_ID.fetch_add(1, Ordering::Relaxed); + let labels = vec![ + Label::new("allocator", "pooled_host"), + Label::new("allocator_id", id.to_string()), + ]; + + Self { + id, + max_bytes_per_thread, + metrics: Arc::new(PooledAllocatorMetrics::new( + metrics_registry.as_ref(), + labels, + )), + } + } + + /// Maximum retained bytes per thread for this allocator. 
+ pub fn max_bytes_per_thread(&self) -> usize { + self.max_bytes_per_thread + } +} + +impl Default for PooledHostAllocator { + fn default() -> Self { + Self::new( + DEFAULT_MAX_BYTES_PER_THREAD, + Arc::new(default_pooled_metrics_registry()), + ) + } +} + +impl HostAllocator for PooledHostAllocator { + fn allocate( + &self, + len: usize, + requested_alignment: Alignment, + ) -> VortexResult { + self.metrics.alloc_requests.add(1); + self.metrics.request_bytes.update(len as f64); + + if self.max_bytes_per_thread == 0 { + self.metrics.bypass_disabled.add(1); + return allocate_unpooled(len, requested_alignment); + } + + let pool_alignment = pooled_alignment(); + if !pool_alignment.is_aligned_to(requested_alignment) { + self.metrics.bypass_alignment.add(1); + return allocate_unpooled(len, requested_alignment); + } + + let Some(bucket_idx) = bucket_index_for_len(len) else { + self.metrics.bypass_size.add(1); + self.metrics.request_unbucketed_bytes.update(len as f64); + return allocate_unpooled(len, requested_alignment); + }; + + if let Some(bucket_metrics) = self.metrics.bucket(bucket_idx) { + bucket_metrics.requests.add(1); + } + + let (bucket_size, _) = POOL_BUCKETS[bucket_idx]; + if bucket_size > self.max_bytes_per_thread { + self.metrics.bypass_size.add(1); + return allocate_unpooled(len, requested_alignment); + } + + let ( + pooled, + retained_bytes, + retained_buffers, + bucket_retained_bytes, + bucket_retained_buffers, + ) = with_allocator_pool(self.id, |pool| { + let pooled = pool.take_buffer(bucket_idx); + ( + pooled, + pool.retained_bytes, + pool.buffer_count(), + pool.bucket_retained_bytes(bucket_idx), + pool.bucket_len(bucket_idx), + ) + }); + + self.metrics.retained_bytes.set(retained_bytes as f64); + self.metrics.retained_buffers.set(retained_buffers as f64); + self.metrics.set_bucket_retained( + bucket_idx, + bucket_retained_bytes, + bucket_retained_buffers, + ); + self.metrics.bucket_bytes.update(bucket_size as f64); + + let mut buffer = if let 
Some(buffer) = pooled { + if buffer.capacity() >= len { + self.metrics.hits.add(1); + if let Some(bucket_metrics) = self.metrics.bucket(bucket_idx) { + bucket_metrics.hits.add(1); + } + buffer + } else { + self.metrics.drops.add(1); + self.metrics + .add_bucket_drop(bucket_idx, DropReason::InvalidCapacity); + self.metrics.misses.add(1); + if let Some(bucket_metrics) = self.metrics.bucket(bucket_idx) { + bucket_metrics.misses.add(1); + } + ByteBufferMut::with_capacity_aligned(bucket_size, pool_alignment) + } + } else { + self.metrics.misses.add(1); + if let Some(bucket_metrics) = self.metrics.bucket(bucket_idx) { + bucket_metrics.misses.add(1); + } + ByteBufferMut::with_capacity_aligned(bucket_size, pool_alignment) + }; + + // SAFETY: We fully initialize this slice before freezing it. + unsafe { buffer.set_len(len) }; + + Ok(WritableHostBuffer::new(Box::new( + PooledWritableHostBuffer::new( + buffer, + requested_alignment, + PooledReturn { + allocator_id: self.id, + bucket_idx, + max_bytes_per_thread: self.max_bytes_per_thread, + metrics: Arc::clone(&self.metrics), + }, + ), + ))) + } +} + +#[derive(Debug)] +struct PooledAllocatorMetrics { + alloc_requests: Counter, + hits: Counter, + misses: Counter, + puts: Counter, + drops: Counter, + bypass_alignment: Counter, + bypass_size: Counter, + bypass_disabled: Counter, + request_bytes: Histogram, + request_unbucketed_bytes: Histogram, + bucket_bytes: Histogram, + retained_bytes: Gauge, + retained_buffers: Gauge, + bucket_metrics: Vec, +} + +impl PooledAllocatorMetrics { + fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec