From 2cc9baaca16778e08d80e30e5b3bc83f2df4b703 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Thu, 21 May 2026 11:11:48 +0200 Subject: [PATCH 01/13] Replace file-metadata, list-files, statistics cache with the generic cache --- datafusion-cli/src/functions.rs | 41 +- datafusion-cli/src/main.rs | 9 +- datafusion/catalog-listing/src/table.rs | 6 +- .../src/datasource/listing_table_factory.rs | 12 +- datafusion/core/src/execution/context/mod.rs | 5 +- .../core/tests/parquet/file_statistics.rs | 21 +- datafusion/core/tests/sql/runtime_config.rs | 17 +- .../datasource-parquet/src/file_format.rs | 4 +- datafusion/datasource-parquet/src/metadata.rs | 7 +- datafusion/datasource-parquet/src/reader.rs | 10 +- datafusion/datasource/src/url.rs | 2 +- .../execution/src/cache/cache_manager.rs | 284 +++---- .../execution/src/cache/default_cache.rs | 311 +++++++ .../src/cache/file_metadata_cache.rs | 462 +++------- .../src/cache/file_statistics_cache.rs | 307 +------ .../execution/src/cache/list_files_cache.rs | 799 ++++-------------- datafusion/execution/src/cache/mod.rs | 92 +- 17 files changed, 866 insertions(+), 1523 deletions(-) create mode 100644 datafusion/execution/src/cache/default_cache.rs diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index df066992fb979..cb2372958e735 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -42,6 +42,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::scalar::ScalarValue; use async_trait::async_trait; +use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use parquet::basic::ConvertedType; use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::reader::FileReader; @@ -546,15 +547,17 @@ impl TableFunctionImpl for MetadataCacheFunc { for (path, entry) in cached_entries { path_arr.push(path.to_string()); file_modified_arr - .push(Some(entry.object_meta.last_modified.timestamp_millis())); - 
file_size_bytes_arr.push(entry.object_meta.size); - e_tag_arr.push(entry.object_meta.e_tag); - version_arr.push(entry.object_meta.version); + .push(Some(entry.value.meta.last_modified.timestamp_millis())); + file_size_bytes_arr.push(entry.value.meta.size); + e_tag_arr.push(entry.value.meta.e_tag); + version_arr.push(entry.value.meta.version); metadata_size_bytes.push(entry.size_bytes as u64); hits_arr.push(entry.hits as u64); let mut extra = entry - .extra + .value + .file_metadata + .extra_info() .iter() .map(|(k, v)| format!("{k}={v}")) .collect::>(); @@ -667,14 +670,22 @@ impl TableFunctionImpl for StatisticsCacheFunc { table_arr .push(path.table.map_or_else(|| "".to_string(), |t| t.to_string())); file_modified_arr - .push(Some(entry.object_meta.last_modified.timestamp_millis())); - file_size_bytes_arr.push(entry.object_meta.size); - e_tag_arr.push(entry.object_meta.e_tag); - version_arr.push(entry.object_meta.version); - num_rows_arr.push(entry.num_rows.to_string()); - num_columns_arr.push(entry.num_columns as u64); - table_size_bytes_arr.push(entry.table_size_bytes.to_string()); - statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64); + .push(Some(entry.value.meta.last_modified.timestamp_millis())); + file_size_bytes_arr.push(entry.value.meta.size); + e_tag_arr.push(entry.value.meta.e_tag); + version_arr.push(entry.value.meta.version); + num_rows_arr.push(entry.value.statistics.num_rows.to_string()); + num_columns_arr + .push(entry.value.statistics.column_statistics.len() as u64); + table_size_bytes_arr + .push(entry.value.statistics.total_byte_size.to_string()); + statistics_size_bytes_arr.push( + entry + .value + .statistics + .heap_size(&mut DFHeapSizeCtx::default()) + as u64, + ); } } @@ -827,14 +838,14 @@ impl TableFunctionImpl for ListFilesCacheFunc { .map(|t| t.duration_since(now).as_millis() as i64), ); - for meta in entry.metas.files.iter() { + for meta in entry.value.files.iter() { file_path_arr.push(meta.location.to_string()); 
file_modified_arr.push(meta.last_modified.timestamp_millis()); file_size_bytes_arr.push(meta.size); etag_arr.push(meta.e_tag.clone()); version_arr.push(meta.version.clone()); } - current_offset += entry.metas.files.len() as i32; + current_offset += entry.value.files.len() as i32; offsets.push(current_offset); } } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 935bf0a9744dd..e51d09c4b646d 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -441,9 +441,10 @@ mod tests { use std::time::Duration; use super::*; + use datafusion::execution::cache::default_cache::DefaultCache; use datafusion::{ common::test_util::batches_to_string, - execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig}, + execution::cache::cache_manager::CacheManagerConfig, prelude::{ParquetReadOptions, col, lit, split_part}, }; use insta::assert_snapshot; @@ -700,10 +701,8 @@ mod tests { #[tokio::test] async fn test_list_files_cache() -> Result<(), DataFusionError> { - let list_files_cache = Arc::new(DefaultListFilesCache::new( - 1024, - Some(Duration::from_secs(1)), - )); + let list_files_cache = + Arc::new(DefaultCache::with_ttl(1024, Some(Duration::from_secs(1)))); let rt = RuntimeEnvBuilder::new() .with_cache_manager( diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index dd3675bd2b39d..5e2507a79984c 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -34,8 +34,8 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics, }; -use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; +use datafusion_execution::cache::cache_manager::TableScopedPath; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; use 
datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; @@ -186,7 +186,7 @@ pub struct ListingTable { /// The SQL definition for this table, if any definition: Option, /// Cache for collected file statistics - collected_statistics: Option>, + collected_statistics: Option>, /// Constraints applied to this table constraints: Constraints, /// Column default expressions for columns that are not physically present in the data files @@ -259,7 +259,7 @@ impl ListingTable { /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics /// multiple times in the same session. /// - pub fn with_cache(mut self, cache: Option>) -> Self { + pub fn with_cache(mut self, cache: Option>) -> Self { self.collected_statistics = cache; self } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 349d941cc2bda..3c0bd3f630240 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -230,8 +230,9 @@ mod tests { test_util::parquet_test_data, }; use datafusion_execution::cache::CacheAccessor; - use datafusion_execution::cache::cache_manager::CacheManagerConfig; - use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; + use datafusion_execution::cache::cache_manager::{ + CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + }; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use glob::Pattern; @@ -242,6 +243,7 @@ mod tests { use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{DFSchema, TableReference}; + use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_expr::registry::ExtensionTypeRegistryRef; #[tokio::test] @@ -484,7 +486,8 @@ mod tests { .to_string(); // Test with collect_statistics enabled - let file_statistics_cache = 
Arc::new(DefaultFileStatisticsCache::default()); + let file_statistics_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); let cache_config = CacheManagerConfig::default() .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() @@ -514,7 +517,8 @@ mod tests { ); // Test with collect_statistics disabled - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); + let file_statistics_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); let cache_config = CacheManagerConfig::default() .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b2ad5c7d7ada0..9176f0defc68d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -76,8 +76,8 @@ use datafusion_common::{ }; pub use datafusion_execution::TaskContext; use datafusion_execution::cache::cache_manager::{ - DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, - DEFAULT_METADATA_CACHE_LIMIT, + DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, + DEFAULT_LIST_FILES_CACHE_TTL, DEFAULT_METADATA_CACHE_LIMIT, }; pub use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::{ @@ -103,7 +103,6 @@ use datafusion_session::SessionStore; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_execution::cache::file_statistics_cache::DEFAULT_FILE_STATISTICS_MEMORY_LIMIT; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 3e3b90a348b04..7c8e1d2ce0cc0 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -29,11 
+29,11 @@ use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::SessionContext; use datafusion_common::DFSchema; use datafusion_common::stats::Precision; -use datafusion_execution::cache::DefaultListFilesCache; use datafusion_execution::cache::cache_manager::{ - CacheManagerConfig, FileStatisticsCache, + CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, FileStatisticsCache, ListFilesCache, }; -use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; +use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{Expr, col, lit}; @@ -238,7 +238,7 @@ async fn list_files_with_session_level_cache() { async fn get_listing_table( table_path: &ListingTableUrl, - static_cache: Option>, + static_cache: Option>, opt: &ListingOptions, ) -> ListingTable { let schema = opt @@ -256,14 +256,13 @@ async fn get_listing_table( .with_cache(static_cache) } -fn get_cache_runtime_state() -> ( - Arc, - Arc, - SessionState, -) { +fn get_cache_runtime_state() +-> (Arc, Arc, SessionState) { let cache_config = CacheManagerConfig::default(); - let file_static_cache = Arc::new(DefaultFileStatisticsCache::default()); - let list_file_cache = Arc::new(DefaultListFilesCache::default()); + let file_static_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); + let list_file_cache = + Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT)); let cache_config = cache_config .with_file_statistics_cache(Some(file_static_cache.clone())) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 2db1e1ce12f72..a9f57a0793463 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -23,9 +23,11 @@ use std::time::Duration; use 
datafusion::execution::context::SessionContext; use datafusion::execution::context::TaskContext; use datafusion::prelude::SessionConfig; -use datafusion_execution::cache::DefaultListFilesCache; -use datafusion_execution::cache::cache_manager::CacheManagerConfig; -use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; +use datafusion_execution::cache::cache_manager::{ + CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, +}; +use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_plan::common::collect; @@ -260,7 +262,8 @@ async fn test_test_metadata_cache_limit() { #[tokio::test] async fn test_list_files_cache_limit() { - let list_files_cache = Arc::new(DefaultListFilesCache::default()); + let list_files_cache = + Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT)); let rt = RuntimeEnvBuilder::new() .with_cache_manager( @@ -303,7 +306,8 @@ async fn test_list_files_cache_limit() { #[tokio::test] async fn test_list_files_cache_ttl() { - let list_files_cache = Arc::new(DefaultListFilesCache::default()); + let list_files_cache = + Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT)); let rt = RuntimeEnvBuilder::new() .with_cache_manager( @@ -347,7 +351,8 @@ async fn test_list_files_cache_ttl() { #[tokio::test] async fn test_file_statistics_cache_limit() { - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); + let file_statistics_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); let rt = RuntimeEnvBuilder::new() .with_cache_manager( diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index fe81504e320d7..734ec6b536f69 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -626,7 +626,7 @@ pub async fn 
fetch_parquet_metadata( object_meta: &ObjectMeta, size_hint: Option, decryption_properties: Option<&FileDecryptionProperties>, - file_metadata_cache: Option>, + file_metadata_cache: Option>, ) -> Result> { let decryption_properties = decryption_properties.cloned().map(Arc::new); DFParquetMetadata::new(store, object_meta) @@ -650,7 +650,7 @@ pub async fn fetch_statistics( file: &ObjectMeta, metadata_size_hint: Option, decryption_properties: Option<&FileDecryptionProperties>, - file_metadata_cache: Option>, + file_metadata_cache: Option>, ) -> Result { let decryption_properties = decryption_properties.cloned().map(Arc::new); DFParquetMetadata::new(store, file) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index d3831766a42ab..1618050a8daae 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ - ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, + ColumnStatistics, DataFusionError, HashMap, Result, ScalarValue, Statistics, }; use datafusion_execution::cache::cache_manager::{ CachedFileMetadataEntry, FileMetadata, FileMetadataCache, @@ -48,7 +48,6 @@ use parquet::file::metadata::{ use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; /// Minimum fraction of row groups that must report NDV statistics for the @@ -69,7 +68,7 @@ pub struct DFParquetMetadata<'a> { object_meta: &'a ObjectMeta, metadata_size_hint: Option, decryption_properties: Option>, - file_metadata_cache: Option>, + file_metadata_cache: Option>, /// timeunit to coerce INT96 timestamps to pub coerce_int96: Option, /// Optional timezone applied 
to INT96-coerced timestamps. @@ -107,7 +106,7 @@ impl<'a> DFParquetMetadata<'a> { /// set file metadata cache pub fn with_file_metadata_cache( mut self, - file_metadata_cache: Option>, + file_metadata_cache: Option>, ) -> Self { self.file_metadata_cache = file_metadata_cache; self diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 482bf8dced4f8..f1d7c82b26d13 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -21,6 +21,7 @@ use crate::ParquetFileMetrics; use crate::metadata::DFParquetMetadata; use bytes::Bytes; +use datafusion_common::HashMap; use datafusion_datasource::PartitionedFile; use datafusion_execution::cache::cache_manager::FileMetadata; use datafusion_execution::cache::cache_manager::FileMetadataCache; @@ -32,7 +33,6 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::file::metadata::ParquetMetaData; use std::any::Any; -use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -182,13 +182,13 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { #[derive(Debug)] pub struct CachedParquetFileReaderFactory { store: Arc, - metadata_cache: Arc, + metadata_cache: Arc, } impl CachedParquetFileReaderFactory { pub fn new( store: Arc, - metadata_cache: Arc, + metadata_cache: Arc, ) -> Self { Self { store, @@ -241,7 +241,7 @@ pub struct CachedParquetFileReader { store: Arc, pub inner: ParquetObjectReader, partitioned_file: PartitionedFile, - metadata_cache: Arc, + metadata_cache: Arc, metadata_size_hint: Option, } @@ -251,7 +251,7 @@ impl CachedParquetFileReader { store: Arc, inner: ParquetObjectReader, partitioned_file: PartitionedFile, - metadata_cache: Arc, + metadata_cache: Arc, metadata_size_hint: Option, ) -> Self { Self { diff --git a/datafusion/datasource/src/url.rs 
b/datafusion/datasource/src/url.rs index 4bf99fc325e2c..7985a29e4fd94 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use datafusion_common::{DataFusionError, Result, TableReference}; -use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::CachedFileList; +use datafusion_execution::cache::cache_manager::TableScopedPath; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 08a8dc9fd9cda..ddd9c57e22f5d 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -15,31 +15,59 @@ // specific language governing permissions and limitations // under the License. -use crate::cache::CacheAccessor; -use crate::cache::DefaultListFilesCache; -use crate::cache::file_statistics_cache::{ - DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache, - DefaultFilesMetadataCache, -}; -use crate::cache::list_files_cache::ListFilesEntry; -use crate::cache::list_files_cache::TableScopedPath; -use datafusion_common::TableReference; +use crate::cache::default_cache::DefaultCache; +use crate::cache::{Cache, Value}; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::stats::Precision; +use datafusion_common::{HashMap, TableReference}; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use object_store::ObjectMeta; use object_store::path::Path; use std::any::Any; -use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; -pub use super::list_files_cache::{ - DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, -}; +/// Calculates the number of 
bytes an [`ObjectMeta`] occupies in the heap. +pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { + let mut size = object_meta.location.as_ref().len(); + + if let Some(e) = &object_meta.e_tag { + size += e.len(); + } + if let Some(v) = &object_meta.version { + size += v.len(); + } + + size +} + +/// Each entry is scoped to its use within a specific table so that the cache +/// can differentiate between identical paths in different tables, and +/// table-level cache invalidation. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct TableScopedPath { + pub table: Option, + pub path: Path, +} + +impl DFHeapSize for TableScopedPath { + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) + } +} + +impl Display for TableScopedPath { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(table) = &self.table { + write!(f, "{}, {}", self.path, table) + } else { + write!(f, "{}", self.path) + } + } +} /// Cached metadata for a file, including statistics and ordering. /// @@ -78,37 +106,23 @@ impl CachedFileMetadata { } } -/// A cache for file statistics and orderings. -/// -/// This cache stores [`CachedFileMetadata`] which includes: -/// - File metadata for validation (size, last_modified) -/// - Statistics for the file -/// - Ordering information for the file -/// -/// If enabled via [`CacheManagerConfig::with_file_statistics_cache`] this -/// cache avoids inferring the same file statistics repeatedly during the -/// session lifetime. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// See [`crate::runtime_env::RuntimeEnv`] for more details -pub trait FileStatisticsCache: - CacheAccessor -{ - /// Cache memory limit in bytes. 
- fn cache_limit(&self) -> usize; +impl Value for CachedFileMetadata { + fn size(&self) -> usize { + DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) + } +} - /// Updates the cache with a new memory limit in bytes. - fn update_cache_limit(&self, limit: usize); +pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB - /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; +pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite - fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; -} +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB + +pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M + +pub type FileStatisticsCache = dyn Cache; +pub type ListFilesCache = dyn Cache; +pub type FileMetadataCache = dyn Cache; impl DFHeapSize for CachedFileMetadata { fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { @@ -181,6 +195,18 @@ impl CachedFileList { } } +impl Value for CachedFileList { + fn size(&self) -> usize { + self.files.capacity() * size_of::() + + self + .files + .iter() + .map(meta_heap_bytes) + .reduce(|acc, b| acc + b) + .unwrap_or(0) + } +} + impl Deref for CachedFileList { type Target = Arc>; fn deref(&self) -> &Self::Target { @@ -194,38 +220,6 @@ impl From> for CachedFileList { } } -/// Cache for storing the [`ObjectMeta`]s that result from listing a path -/// -/// Listing a path means doing an object store "list" operation or `ls` -/// command on the local filesystem. This operation can be expensive, -/// especially when done over remote object stores. -/// -/// The cache key is always the table's base path, ensuring a stable cache key. -/// The cached value is a [`CachedFileList`] containing the files and a timestamp. -/// -/// Partition filtering is done after retrieval using [`CachedFileList::files_matching_prefix`]. -/// -/// See [`crate::runtime_env::RuntimeEnv`] for more details. 
-pub trait ListFilesCache: CacheAccessor { - /// Returns the cache's memory limit in bytes. - fn cache_limit(&self) -> usize; - - /// Returns the TTL (time-to-live) for cache entries, if configured. - fn cache_ttl(&self) -> Option; - - /// Updates the cache with a new memory limit in bytes. - fn update_cache_limit(&self, limit: usize); - - /// Updates the cache with a new TTL (time-to-live). - fn update_cache_ttl(&self, ttl: Option); - - /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; - - /// Drop all entries for the given table reference. - fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; -} - /// Generic file-embedded metadata used with [`FileMetadataCache`]. /// /// For example, Parquet footers and page metadata can be represented @@ -240,7 +234,7 @@ pub trait FileMetadata: Any + Send + Sync { /// Returns the size of the metadata in bytes. fn memory_size(&self) -> usize; - /// Returns extra information about this entry (used by [`FileMetadataCache::list_entries`]). + /// Returns extra information about this entry fn extra_info(&self) -> HashMap; } @@ -253,6 +247,12 @@ pub struct CachedFileMetadataEntry { pub file_metadata: Arc, } +impl Value for CachedFileMetadataEntry { + fn size(&self) -> usize { + self.file_metadata.memory_size() + } +} + impl CachedFileMetadataEntry { /// Create a new cached file metadata entry. pub fn new(meta: ObjectMeta, file_metadata: Arc) -> Self { @@ -278,37 +278,6 @@ impl Debug for CachedFileMetadataEntry { } } -/// Cache for file-embedded metadata. -/// -/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`], -/// which includes the [`ObjectMeta`] for validation. -/// -/// For example, the built in [`ListingTable`] uses this cache to avoid parsing -/// Parquet footers multiple times for the same file. 
-/// -/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`], -/// and users can also provide their own implementations to implement custom -/// caching strategies. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// See [`crate::runtime_env::RuntimeEnv`] for more details. -/// -/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html -pub trait FileMetadataCache: CacheAccessor { - /// Returns the cache's memory limit in bytes. - fn cache_limit(&self) -> usize; - - /// Updates the cache with a new memory limit in bytes. - fn update_cache_limit(&self, limit: usize); - - /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; -} - #[derive(Debug, Clone, PartialEq, Eq)] /// Represents information about a cached metadata entry. /// This is used to expose the metadata cache contents to outside modules. @@ -322,24 +291,6 @@ pub struct FileMetadataCacheEntry { pub extra: HashMap, } -impl Debug for dyn FileStatisticsCache { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache name: {} with length: {}", self.name(), self.len()) - } -} - -impl Debug for dyn ListFilesCache { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache name: {} with length: {}", self.name(), self.len()) - } -} - -impl Debug for dyn FileMetadataCache { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache name: {} with length: {}", self.name(), self.len()) - } -} - /// Manages various caches used in DataFusion. 
/// /// Following DataFusion design principles, DataFusion provides default cache @@ -349,28 +300,30 @@ impl Debug for dyn FileMetadataCache { /// See [`CacheManagerConfig`] for configuration options. #[derive(Debug)] pub struct CacheManager { - file_statistic_cache: Option>, - list_files_cache: Option>, - file_metadata_cache: Arc, + file_statistic_cache: Option>, + list_files_cache: Option>, + file_metadata_cache: Arc, } impl CacheManager { pub fn try_new(config: &CacheManagerConfig) -> Result> { - let file_statistic_cache = match &config.file_statistics_cache { - Some(fsc) if config.file_statistics_cache_limit > 0 => { - fsc.update_cache_limit(config.file_statistics_cache_limit); - Some(Arc::clone(fsc)) - } - None if config.file_statistics_cache_limit > 0 => { - let fsc: Arc = Arc::new( - DefaultFileStatisticsCache::new(config.file_statistics_cache_limit), - ); - Some(fsc) - } - _ => None, - }; - - let list_files_cache = match &config.list_files_cache { + let file_statistic_cache: Option> = + match &config.file_statistics_cache { + Some(fsc) if config.file_statistics_cache_limit > 0 => { + fsc.update_cache_limit(config.file_statistics_cache_limit); + Some(Arc::clone(fsc)) + } + None if config.file_statistics_cache_limit > 0 => Some(Arc::new( + DefaultCache::::new( + config.file_statistics_cache_limit, + ) + .with_name("DefaultFileStatisticsCache"), + )), + _ => None, + }; + + let list_files_cache: Option> = match &config.list_files_cache + { Some(lfc) if config.list_files_cache_limit > 0 => { // the cache memory limit or ttl might have changed, ensure they are updated lfc.update_cache_limit(config.list_files_cache_limit); @@ -380,13 +333,13 @@ impl CacheManager { } Some(Arc::clone(lfc)) } - None if config.list_files_cache_limit > 0 => { - let lfc: Arc = Arc::new(DefaultListFilesCache::new( + None if config.list_files_cache_limit > 0 => Some(Arc::new( + DefaultCache::::with_ttl( config.list_files_cache_limit, config.list_files_cache_ttl, - )); - Some(lfc) - } + ) 
+ .with_name("DefaultListFilesCache"), + )), _ => None, }; @@ -394,9 +347,7 @@ impl CacheManager { .file_metadata_cache .as_ref() .map(Arc::clone) - .unwrap_or_else(|| { - Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit)) - }); + .unwrap_or_else(|| Arc::new(DefaultCache::new(config.metadata_cache_limit))); // the cache memory limit might have changed, ensure the limit is updated file_metadata_cache.update_cache_limit(config.metadata_cache_limit); @@ -409,7 +360,7 @@ impl CacheManager { } /// Get the file statistics cache. - pub fn get_file_statistic_cache(&self) -> Option> { + pub fn get_file_statistic_cache(&self) -> Option> { self.file_statistic_cache.clone() } @@ -421,7 +372,7 @@ impl CacheManager { } /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path. - pub fn get_list_files_cache(&self) -> Option> { + pub fn get_list_files_cache(&self) -> Option> { self.list_files_cache.clone() } @@ -438,7 +389,7 @@ impl CacheManager { } /// Get the file embedded metadata cache. - pub fn get_file_metadata_cache(&self) -> Arc { + pub fn get_file_metadata_cache(&self) -> Arc { Arc::clone(&self.file_metadata_cache) } @@ -448,14 +399,12 @@ impl CacheManager { } } -pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M - #[derive(Clone)] pub struct CacheManagerConfig { /// Enable caching of file statistics when listing files. /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session. /// Default is enabled. Currently only Parquet files are supported. - pub file_statistics_cache: Option>, + pub file_statistics_cache: Option>, /// Limit of the file statistics cache, in bytes. Default: 20MiB. pub file_statistics_cache_limit: usize, /// Enable caching of file metadata when listing files. 
@@ -465,7 +414,7 @@ pub struct CacheManagerConfig { /// Note that if this option is enabled, DataFusion will not see any updates to the underlying /// storage for at least `list_files_cache_ttl` duration. /// Default is enabled. - pub list_files_cache: Option>, + pub list_files_cache: Option>, /// Limit of the `list_files_cache`, in bytes. Default: 1MiB. pub list_files_cache_limit: usize, /// The duration the list files cache will consider an entry valid after insertion. Note that @@ -474,8 +423,8 @@ pub struct CacheManagerConfig { pub list_files_cache_ttl: Option, /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a /// data file (e.g., Parquet footer and page metadata). - /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. - pub file_metadata_cache: Option>, + /// If not provided, the [`CacheManager`] will create it. + pub file_metadata_cache: Option>, /// Limit of the file-embedded metadata cache, in bytes. pub metadata_cache_limit: usize, } @@ -498,7 +447,7 @@ impl CacheManagerConfig { /// Set the cache for file statistics. pub fn with_file_statistics_cache( mut self, - cache: Option>, + cache: Option>, ) -> Self { self.file_statistics_cache = cache; self @@ -513,10 +462,7 @@ impl CacheManagerConfig { /// Set the cache for listing files. /// /// Default is `None` (disabled). - pub fn with_list_files_cache( - mut self, - cache: Option>, - ) -> Self { + pub fn with_list_files_cache(mut self, cache: Option>) -> Self { self.list_files_cache = cache; self } @@ -538,11 +484,9 @@ impl CacheManagerConfig { } /// Sets the cache for file-embedded metadata. - /// - /// Default is a [`DefaultFilesMetadataCache`]. 
pub fn with_file_metadata_cache( mut self, - cache: Option>, + cache: Option>, ) -> Self { self.file_metadata_cache = cache; self @@ -565,8 +509,7 @@ mod tests { #[test] fn test_ttl_preserved_when_not_set_in_config() { // Create a cache with TTL = 1 second - let list_file_cache = - DefaultListFilesCache::new(1024, Some(Duration::from_secs(1))); + let list_file_cache = DefaultCache::with_ttl(1024, Some(Duration::from_secs(1))); // Verify the cache has TTL set initially assert_eq!( @@ -602,8 +545,7 @@ mod tests { #[test] fn test_ttl_overridden_when_set_in_config() { // Create a cache with TTL = 1 second - let list_file_cache = - DefaultListFilesCache::new(1024, Some(Duration::from_secs(1))); + let list_file_cache = DefaultCache::with_ttl(1024, Some(Duration::from_secs(1))); // Put cache in config WITH a different TTL set let config = CacheManagerConfig::default() diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs new file mode 100644 index 0000000000000..4574c38717b10 --- /dev/null +++ b/datafusion/execution/src/cache/default_cache.rs @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use datafusion_common::TableReference; +use datafusion_common::instant::Instant; +use datafusion_common::{HashMap, Result}; + +use crate::cache::lru_queue::LruQueue; +use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value}; + +/// Source of the current time used by a [`DefaultCache`] when applying TTLs. +pub trait TimeProvider: Send + Sync { + /// Return the current instant. + fn now(&self) -> Instant; +} + +/// [`TimeProvider`] backed by [`Instant::now`]. +/// +/// This is the default time source used by [`DefaultCache`] +#[derive(Debug, Default)] +pub struct SystemTimeProvider; + +impl TimeProvider for SystemTimeProvider { + fn now(&self) -> Instant { + Instant::now() + } +} + +#[derive(Clone)] +struct ValueEntry { + value: V, + expires: Option, +} + +struct DefaultCacheState { + lru_queue: LruQueue>, + hits: HashMap, + memory_limit: usize, + memory_used: usize, + ttl: Option, +} + +impl DefaultCacheState { + fn new(memory_limit: usize, ttl: Option) -> Self { + Self { + lru_queue: LruQueue::new(), + hits: HashMap::new(), + memory_limit, + memory_used: 0, + ttl, + } + } + + fn get(&mut self, key: &K, now: Instant) -> Option { + let entry = self.lru_queue.get(key)?; + if let Some(exp) = entry.expires + && now > exp + { + self.remove(key); + return None; + } + let value = entry.value.clone(); + *self.hits.entry(key.clone()).or_insert(0) += 1; + Some(value) + } + + fn contains_key(&mut self, key: &K, now: Instant) -> bool { + let Some(entry) = self.lru_queue.peek(key) else { + return false; + }; + match entry.expires { + Some(exp) if now > exp => { + self.remove(key); + false + } + _ => true, + } + } + + fn put(&mut self, key: &K, value: V, now: Instant) -> Option { + let value_size = value.size(); + + if value_size == 0 { + return None; + } + + let key_size = key.size(); + let total_size = key_size + value_size; + + if total_size > self.memory_limit { + // Remove potential stale entry + let 
result = self.remove(key); + if let Some(stale_entry) = &result { + self.memory_used -= key_size; + self.memory_used -= stale_entry.size(); + } + return result; + } + + let expires = self.ttl.map(|ttl| now + ttl); + let entry = ValueEntry { value, expires }; + + self.memory_used += total_size; + self.hits.insert(key.clone(), 0); + let old = self.lru_queue.put(key.clone(), entry); + if let Some(old_entry) = &old { + self.memory_used -= key_size; + self.memory_used -= old_entry.value.size(); + } + + self.evict_entries(); + + old.map(|v| v.value) + } + + fn remove(&mut self, key: &K) -> Option { + let entry = self.lru_queue.remove(key)?; + self.memory_used -= key.size(); + self.memory_used -= entry.value.size(); + self.hits.remove(key); + Some(entry.value) + } + + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + let Some((evicted_key, evicted)) = self.lru_queue.pop() else { + // cache is empty while memory_used > memory_limit, cannot happen + log::error!( + "DefaultCache memory accounting bug: memory_used={} but cache is empty", + self.memory_used + ); + debug_assert!(false, "memory_used > limit with empty cache"); + self.memory_used = 0; + return; + }; + self.memory_used -= evicted_key.size(); + self.memory_used -= evicted.value.size(); + self.hits.remove(&evicted_key); + } + } + + fn clear(&mut self) { + self.lru_queue.clear(); + self.hits.clear(); + self.memory_used = 0; + } +} + +/// In-memory [`Cache`] with an LRU eviction policy, byte-based memory limit, +/// and optional per-entry TTL. +/// +/// Entries are evicted in least-recently-used order whenever an insert would +/// push `memory_used` above `memory_limit`. Inserts whose own size exceeds the +/// limit are rejected (and any prior entry under the same key is removed). +/// When a TTL is configured, the expiration is stamped onto each entry at +/// insertion time and checked lazily on access. 
+pub struct DefaultCache { + state: Mutex>, + time_provider: Arc, + name: String, +} + +impl DefaultCache { + /// Create a cache with the given memory budget in bytes and no TTL. + pub fn new(memory_limit: usize) -> Self { + Self::with_ttl(memory_limit, None) + } + + /// Create a cache with the given memory budget in bytes and an optional + /// TTL applied to every newly inserted entry. + pub fn with_ttl(memory_limit: usize, ttl: Option) -> Self { + Self { + state: Mutex::new(DefaultCacheState::new(memory_limit, ttl)), + time_provider: Arc::new(SystemTimeProvider), + name: "DefaultCache".to_string(), + } + } + + /// Override the cache name reported by [`CacheAccessor::name`]. + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = name.into(); + self + } + + /// Override the time source used to stamp and check TTLs. + pub fn with_time_provider(mut self, provider: Arc) -> Self { + self.time_provider = provider; + self + } + + /// Number of bytes currently accounted for by live entries. 
+ pub fn memory_used(&self) -> usize { + self.state.lock().unwrap().memory_used + } +} + +impl CacheAccessor for DefaultCache { + fn get(&self, key: &K) -> Option { + let now = self.time_provider.now(); + let mut state = self.state.lock().unwrap(); + state.get(key, now) + } + + fn put(&self, key: &K, value: V) -> Option { + let now = self.time_provider.now(); + let mut state = self.state.lock().unwrap(); + state.put(key, value, now) + } + + fn remove(&self, k: &K) -> Option { + let mut state = self.state.lock().unwrap(); + state.remove(k) + } + + fn contains_key(&self, k: &K) -> bool { + let now = self.time_provider.now(); + let mut state = self.state.lock().unwrap(); + state.contains_key(k, now) + } + + fn len(&self) -> usize { + self.state.lock().unwrap().lru_queue.len() + } + + fn clear(&self) { + let mut state = self.state.lock().unwrap(); + state.clear(); + } + + fn name(&self) -> String { + self.name.clone() + } +} + +impl Cache for DefaultCache { + fn cache_limit(&self) -> usize { + self.state.lock().unwrap().memory_limit + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } + + fn cache_ttl(&self) -> Option { + self.state.lock().unwrap().ttl + } + + fn update_cache_ttl(&self, ttl: Option) { + let mut state = self.state.lock().unwrap(); + state.ttl = ttl; + } + + fn drop_table_entries(&self, table_ref: &Option) -> Result<()> { + let mut state = self.state.lock().unwrap(); + let to_remove: Vec = state + .lru_queue + .list_entries() + .keys() + .flat_map(|k| { + let matches = match (k.table_ref(), table_ref.as_ref()) { + (Some(a), Some(b)) => a == b, + (None, None) => true, + _ => false, + }; + matches.then(|| (*k).clone()) + }) + .collect(); + for k in &to_remove { + state.remove(k); + } + Ok(()) + } + + fn list_entries(&self) -> HashMap> { + let state = self.state.lock().unwrap(); + state + .lru_queue + .list_entries() + .into_iter() + .map(|(k, entry)| { 
+ let hits = state.hits.get(k).copied().unwrap_or(0); + let info = CacheEntryInfo { + value: entry.value.clone(), + size_bytes: entry.value.size(), + hits, + expires: entry.expires, + }; + (k.clone(), info) + }) + .collect() + } +} diff --git a/datafusion/execution/src/cache/file_metadata_cache.rs b/datafusion/execution/src/cache/file_metadata_cache.rs index 5e899d7dd9f8b..da95a25496bc8 100644 --- a/datafusion/execution/src/cache/file_metadata_cache.rs +++ b/datafusion/execution/src/cache/file_metadata_cache.rs @@ -15,237 +15,14 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashMap, sync::Mutex}; - -use object_store::path::Path; - -use crate::cache::{ - CacheAccessor, - cache_manager::{CachedFileMetadataEntry, FileMetadataCache, FileMetadataCacheEntry}, - lru_queue::LruQueue, -}; - -/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. -struct DefaultFilesMetadataCacheState { - lru_queue: LruQueue, - memory_limit: usize, - memory_used: usize, - cache_hits: HashMap, -} - -impl DefaultFilesMetadataCacheState { - fn new(memory_limit: usize) -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit, - memory_used: 0, - cache_hits: HashMap::new(), - } - } - - /// Returns the respective entry from the cache, if it exists. - /// If the entry exists, it becomes the most recently used. - fn get(&mut self, k: &Path) -> Option { - self.lru_queue.get(k).cloned().inspect(|_| { - *self.cache_hits.entry(k.clone()).or_insert(0) += 1; - }) - } - - /// Checks if the metadata is currently cached. - /// The LRU queue is not updated. - fn contains_key(&self, k: &Path) -> bool { - self.lru_queue.peek(k).is_some() - } - - /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. - /// If the key is already in the cache, the previous metadata is returned. - /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted. 
- fn put( - &mut self, - key: Path, - value: CachedFileMetadataEntry, - ) -> Option { - let value_size = value.file_metadata.memory_size(); - - // no point in trying to add this value to the cache if it cannot fit entirely - if value_size > self.memory_limit { - return None; - } - - self.cache_hits.insert(key.clone(), 0); - // if the key is already in the cache, the old value is removed - let old_value = self.lru_queue.put(key, value); - self.memory_used += value_size; - if let Some(ref old_entry) = old_value { - self.memory_used -= old_entry.file_metadata.memory_size(); - } - - self.evict_entries(); - - old_value - } - - /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. - fn evict_entries(&mut self) { - while self.memory_used > self.memory_limit { - if let Some(removed) = self.lru_queue.pop() { - self.memory_used -= removed.1.file_metadata.memory_size(); - } else { - // cache is empty while memory_used > memory_limit, cannot happen - debug_assert!( - false, - "cache is empty while memory_used > memory_limit, cannot happen" - ); - return; - } - } - } - - /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &Path) -> Option { - if let Some(old_entry) = self.lru_queue.remove(k) { - self.memory_used -= old_entry.file_metadata.memory_size(); - self.cache_hits.remove(k); - Some(old_entry) - } else { - None - } - } - - /// Returns the number of entries currently cached. - fn len(&self) -> usize { - self.lru_queue.len() - } - - /// Removes all entries from the cache. - fn clear(&mut self) { - self.lru_queue.clear(); - self.memory_used = 0; - self.cache_hits.clear(); - } -} - -/// Default implementation of [`FileMetadataCache`] -/// -/// Collected file embedded metadata cache. -/// -/// The metadata for each file is validated by comparing the cached [`ObjectMeta`] -/// (size and last_modified) against the current file state using `cached.is_valid_for(¤t_meta)`. 
-/// -/// # Internal details -/// -/// The `memory_limit` controls the maximum size of the cache, which uses a -/// Least Recently Used eviction algorithm. When adding a new entry, if the total -/// size of the cached entries exceeds `memory_limit`, the least recently used entries -/// are evicted until the total size is lower than `memory_limit`. -/// -/// [`ObjectMeta`]: object_store::ObjectMeta -pub struct DefaultFilesMetadataCache { - // the state is wrapped in a Mutex to ensure the operations are atomic - state: Mutex, -} - -impl DefaultFilesMetadataCache { - /// Create a new instance of [`DefaultFilesMetadataCache`]. - /// - /// # Arguments - /// `memory_limit`: the maximum size of the cache, in bytes - // - pub fn new(memory_limit: usize) -> Self { - Self { - state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)), - } - } - - /// Returns the size of the cached memory, in bytes. - pub fn memory_used(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_used - } -} - -impl CacheAccessor for DefaultFilesMetadataCache { - fn get(&self, key: &Path) -> Option { - let mut state = self.state.lock().unwrap(); - state.get(key) - } - - fn put( - &self, - key: &Path, - value: CachedFileMetadataEntry, - ) -> Option { - let mut state = self.state.lock().unwrap(); - state.put(key.clone(), value) - } - - fn remove(&self, k: &Path) -> Option { - let mut state = self.state.lock().unwrap(); - state.remove(k) - } - - fn contains_key(&self, k: &Path) -> bool { - let state = self.state.lock().unwrap(); - state.contains_key(k) - } - - fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.len() - } - - fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.clear(); - } - - fn name(&self) -> String { - "DefaultFilesMetadataCache".to_string() - } -} - -impl FileMetadataCache for DefaultFilesMetadataCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - 
fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); - - for (path, entry) in state.lru_queue.list_entries() { - entries.insert( - path.clone(), - FileMetadataCacheEntry { - object_meta: entry.meta.clone(), - size_bytes: entry.file_metadata.memory_size(), - hits: *state.cache_hits.get(path).expect("entry must exist"), - extra: entry.file_metadata.extra_info(), - }, - ); - } - - entries - } -} - #[cfg(test)] mod tests { - use std::collections::HashMap; use std::sync::Arc; - use crate::cache::CacheAccessor; - use crate::cache::cache_manager::{ - CachedFileMetadataEntry, FileMetadata, FileMetadataCache, FileMetadataCacheEntry, - }; - use crate::cache::file_metadata_cache::DefaultFilesMetadataCache; + use crate::cache::cache_manager::{CachedFileMetadataEntry, FileMetadata}; + use crate::cache::default_cache::DefaultCache; + use crate::cache::{Cache, CacheAccessor, CacheEntryInfo}; + use datafusion_common::HashMap; use object_store::ObjectMeta; use object_store::path::Path; @@ -267,6 +44,12 @@ mod tests { } } + impl PartialEq for CachedFileMetadataEntry { + fn eq(&self, other: &Self) -> bool { + self.meta == other.meta + } + } + fn create_test_object_meta(path: &str, size: usize) -> ObjectMeta { ObjectMeta { location: Path::from(path), @@ -289,7 +72,7 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let cache = DefaultFilesMetadataCache::new(1024 * 1024); + let cache = DefaultCache::new(1024 * 1024); // Cache miss assert!(cache.get(&object_meta.location).is_none()); @@ -354,19 +137,20 @@ mod tests { e_tag: None, version: None, }; - let metadata: Arc = Arc::new(TestFileMetadata { - metadata: "a".repeat(size), - }); + let metadata = "a".repeat(size); + let metadata: Arc = Arc::new(TestFileMetadata { metadata }); (object_meta, metadata) } 
#[test] fn test_default_file_metadata_cache_with_limit() { - let cache = DefaultFilesMetadataCache::new(1000); - let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); - let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500); - let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); + // Create a cache with 1000 bytes capacity + 4 keys each key 2 bytes + let cache = DefaultCache::new(1000 + 4 * 2); + + let (object_meta1, metadata1) = generate_test_metadata_with_size("01", 100); + let (object_meta2, metadata2) = generate_test_metadata_with_size("02", 500); + let (object_meta3, metadata3) = generate_test_metadata_with_size("03", 300); cache.put( &object_meta1.location, @@ -383,67 +167,67 @@ mod tests { // all entries will fit assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 900); + assert_eq!(cache.memory_used(), 906); assert!(cache.contains_key(&object_meta1.location)); assert!(cache.contains_key(&object_meta2.location)); assert!(cache.contains_key(&object_meta3.location)); // add a new entry which will remove the least recently used ("1") - let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200); + let (object_meta4, metadata4) = generate_test_metadata_with_size("04", 200); cache.put( &object_meta4.location, CachedFileMetadataEntry::new(object_meta4.clone(), metadata4), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1006); assert!(!cache.contains_key(&object_meta1.location)); assert!(cache.contains_key(&object_meta4.location)); // get entry "2", which will move it to the top of the queue, and add a new one which will // remove the new least recently used ("3") let _ = cache.get(&object_meta2.location); - let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100); + let (object_meta5, metadata5) = generate_test_metadata_with_size("05", 100); cache.put( &object_meta5.location, 
CachedFileMetadataEntry::new(object_meta5.clone(), metadata5), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 800); + assert_eq!(cache.memory_used(), 806); assert!(!cache.contains_key(&object_meta3.location)); assert!(cache.contains_key(&object_meta5.location)); // new entry which will not be able to fit in the 1000 bytes allocated - let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200); + let (object_meta6, metadata6) = generate_test_metadata_with_size("06", 1200); cache.put( &object_meta6.location, CachedFileMetadataEntry::new(object_meta6.clone(), metadata6), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 800); + assert_eq!(cache.memory_used(), 806); assert!(!cache.contains_key(&object_meta6.location)); // new entry which is able to fit without removing any entry - let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200); + let (object_meta7, metadata7) = generate_test_metadata_with_size("07", 200); cache.put( &object_meta7.location, CachedFileMetadataEntry::new(object_meta7.clone(), metadata7), ); assert_eq!(cache.len(), 4); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1008); assert!(cache.contains_key(&object_meta7.location)); // new entry which will remove all other entries - let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999); + let (object_meta8, metadata8) = generate_test_metadata_with_size("08", 999); cache.put( &object_meta8.location, CachedFileMetadataEntry::new(object_meta8.clone(), metadata8), ); assert_eq!(cache.len(), 1); - assert_eq!(cache.memory_used(), 999); + assert_eq!(cache.memory_used(), 1001); assert!(cache.contains_key(&object_meta8.location)); // when updating an entry, the previous ones are not unnecessarily removed - let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300); + let (object_meta9, metadata9) = generate_test_metadata_with_size("09", 300); let (object_meta10, metadata10) = 
generate_test_metadata_with_size("10", 200); let (object_meta11_v1, metadata11_v1) = generate_test_metadata_with_size("11", 400); @@ -459,7 +243,7 @@ mod tests { &object_meta11_v1.location, CachedFileMetadataEntry::new(object_meta11_v1.clone(), metadata11_v1), ); - assert_eq!(cache.memory_used(), 900); + assert_eq!(cache.memory_used(), 906); assert_eq!(cache.len(), 3); let (object_meta11_v2, metadata11_v2) = generate_test_metadata_with_size("11", 500); @@ -467,20 +251,20 @@ mod tests { &object_meta11_v2.location, CachedFileMetadataEntry::new(object_meta11_v2.clone(), metadata11_v2), ); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1006); assert_eq!(cache.len(), 3); assert!(cache.contains_key(&object_meta9.location)); assert!(cache.contains_key(&object_meta10.location)); assert!(cache.contains_key(&object_meta11_v2.location)); - // when updating an entry that now exceeds the limit, the LRU ("9") needs to be removed + // when updating an entry that now exceeds the limit, the LRU ("09") needs to be removed let (object_meta11_v3, metadata11_v3) = - generate_test_metadata_with_size("11", 501); + generate_test_metadata_with_size("11", 510); cache.put( &object_meta11_v3.location, CachedFileMetadataEntry::new(object_meta11_v3.clone(), metadata11_v3), ); - assert_eq!(cache.memory_used(), 701); + assert_eq!(cache.memory_used(), 714); assert_eq!(cache.len(), 2); assert!(cache.contains_key(&object_meta10.location)); assert!(cache.contains_key(&object_meta11_v3.location)); @@ -488,7 +272,7 @@ mod tests { // manually removing an entry that is not the LRU cache.remove(&object_meta11_v3.location); assert_eq!(cache.len(), 1); - assert_eq!(cache.memory_used(), 200); + assert_eq!(cache.memory_used(), 202); assert!(cache.contains_key(&object_meta10.location)); assert!(!cache.contains_key(&object_meta11_v3.location)); @@ -514,10 +298,10 @@ mod tests { CachedFileMetadataEntry::new(object_meta14.clone(), metadata14), ); assert_eq!(cache.len(), 3); - 
assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1006); cache.update_cache_limit(600); assert_eq!(cache.len(), 1); - assert_eq!(cache.memory_used(), 500); + assert_eq!(cache.memory_used(), 502); assert!(!cache.contains_key(&object_meta12.location)); assert!(!cache.contains_key(&object_meta13.location)); assert!(cache.contains_key(&object_meta14.location)); @@ -525,61 +309,53 @@ mod tests { #[test] fn test_default_file_metadata_cache_entries_info() { - let cache = DefaultFilesMetadataCache::new(1000); + // Create a cache with 1000 bytes + 4 bytes for 4 keys each key 1 byte + let cache = DefaultCache::new(1000 + 4); + let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 200); let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); // initial entries, all will have hits = 0 - cache.put( - &object_meta1.location, - CachedFileMetadataEntry::new(object_meta1.clone(), metadata1), - ); - cache.put( - &object_meta2.location, - CachedFileMetadataEntry::new(object_meta2.clone(), metadata2), - ); - cache.put( - &object_meta3.location, - CachedFileMetadataEntry::new(object_meta3.clone(), metadata3), - ); + let entry_1 = CachedFileMetadataEntry::new(object_meta1.clone(), metadata1); + let entry_2 = CachedFileMetadataEntry::new(object_meta2.clone(), metadata2); + let entry_3 = CachedFileMetadataEntry::new(object_meta3.clone(), metadata3); + + // Build a cache which fits exactly these 3 entries + + cache.put(&object_meta1.location, entry_1.clone()); + cache.put(&object_meta2.location, entry_2.clone()); + cache.put(&object_meta3.location, entry_3.clone()); + let entries = cache.list_entries(); + assert_eq!( - cache.list_entries(), + entries, HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 100, hits: 0, - extra: HashMap::from([( - 
"extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("2"), - FileMetadataCacheEntry { - object_meta: object_meta2.clone(), + CacheEntryInfo { + value: entry_2.clone(), size_bytes: 200, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -592,38 +368,29 @@ mod tests { HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 100, hits: 1, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("2"), - FileMetadataCacheEntry { - object_meta: object_meta2.clone(), + CacheEntryInfo { + value: entry_2.clone(), size_bytes: 200, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -631,47 +398,36 @@ mod tests { // new entry, will evict "2" let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600); - cache.put( - &object_meta4.location, - CachedFileMetadataEntry::new(object_meta4.clone(), metadata4), - ); + let entry_4 = CachedFileMetadataEntry::new(object_meta4.clone(), metadata4); + cache.put(&object_meta4.location, entry_4.clone()); assert_eq!( cache.list_entries(), HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 100, hits: 1, - extra: 
HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("4"), - FileMetadataCacheEntry { - object_meta: object_meta4.clone(), + CacheEntryInfo { + value: entry_4.clone(), size_bytes: 600, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -679,47 +435,37 @@ mod tests { // replace entry "1" let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50); - cache.put( - &object_meta1_new.location, - CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new), - ); + let entry_1 = + CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new); + cache.put(&object_meta1_new.location, entry_1.clone()); assert_eq!( cache.list_entries(), HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1_new.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 50, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("4"), - FileMetadataCacheEntry { - object_meta: object_meta4.clone(), + CacheEntryInfo { + value: entry_4.clone(), size_bytes: 600, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -732,26 +478,20 @@ mod tests { HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1_new.clone(), + CacheEntryInfo { + value: 
entry_1.clone(), size_bytes: 50, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) diff --git a/datafusion/execution/src/cache/file_statistics_cache.rs b/datafusion/execution/src/cache/file_statistics_cache.rs index 12f0bb1b8af88..48b21a6440b55 100644 --- a/datafusion/execution/src/cache/file_statistics_cache.rs +++ b/datafusion/execution/src/cache/file_statistics_cache.rs @@ -15,267 +15,20 @@ // specific language governing permissions and limitations // under the License. -use crate::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, -}; -use crate::cache::{CacheAccessor, TableScopedPath}; -use std::collections::HashMap; -use std::sync::Mutex; - -pub use crate::cache::DefaultFilesMetadataCache; -use crate::cache::lru_queue::LruQueue; -use datafusion_common::TableReference; -use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; - -/// Default implementation of [`FileStatisticsCache`] -/// -/// Stores cached file metadata (statistics and orderings) for files. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// # Internal details -/// -/// The `memory_limit` controls the maximum size of the cache, which uses a -/// Least Recently Used eviction algorithm. When adding a new entry, if the total -/// size of the cached entries exceeds `memory_limit`, the least recently used entries -/// are evicted until the total size is lower than `memory_limit`. 
-/// -/// -/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache -#[derive(Default)] -pub struct DefaultFileStatisticsCache { - state: Mutex, -} - -impl DefaultFileStatisticsCache { - pub fn new(memory_limit: usize) -> Self { - Self { - state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), - } - } - - /// Returns the size of the cached memory, in bytes. - pub fn memory_used(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_used - } -} - -struct DefaultFileStatisticsCacheState { - lru_queue: LruQueue, - memory_limit: usize, - memory_used: usize, -} - -pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB - -impl Default for DefaultFileStatisticsCacheState { - fn default() -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, - memory_used: 0, - } - } -} - -impl DefaultFileStatisticsCacheState { - fn new(memory_limit: usize) -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit, - memory_used: 0, - } - } - fn get(&mut self, key: &TableScopedPath) -> Option { - self.lru_queue.get(key).cloned() - } - - fn put( - &mut self, - key: &TableScopedPath, - value: CachedFileMetadata, - ) -> Option { - let mut ctx = DFHeapSizeCtx::default(); - let key_size = key.heap_size(&mut ctx); - let entry_size = value.heap_size(&mut ctx); - - if entry_size + key_size > self.memory_limit { - // Remove potential stale entry - return self.remove(key); - } - - self.memory_used += entry_size; - self.memory_used += key_size; - - let old_value = self.lru_queue.put(key.clone(), value); - if let Some(old_entry) = &old_value { - let mut ctx = DFHeapSizeCtx::default(); - self.memory_used -= old_entry.heap_size(&mut ctx); - self.memory_used -= key_size; - } - - self.evict_entries(); - - old_value - } - - fn remove(&mut self, k: &TableScopedPath) -> Option { - if let Some(old_entry) = self.lru_queue.remove(k) { - let mut ctx = 
DFHeapSizeCtx::default(); - self.memory_used -= k.heap_size(&mut ctx); - self.memory_used -= old_entry.heap_size(&mut ctx); - Some(old_entry) - } else { - None - } - } - - fn contains_key(&self, k: &TableScopedPath) -> bool { - self.lru_queue.contains_key(k) - } - - fn len(&self) -> usize { - self.lru_queue.len() - } - - fn clear(&mut self) { - self.lru_queue.clear(); - self.memory_used = 0; - } - - fn evict_entries(&mut self) { - while self.memory_used > self.memory_limit { - if let Some(removed) = self.lru_queue.pop() { - let mut ctx = DFHeapSizeCtx::default(); - self.memory_used -= removed.0.heap_size(&mut ctx); - self.memory_used -= removed.1.heap_size(&mut ctx); - } else { - // cache is empty while memory_used > memory_limit, cannot happen - log::error!( - "File statistics cache memory accounting bug: memory_used={} but cache is empty. \ - Please report this to the Apache DataFusion developers.", - self.memory_used - ); - debug_assert!( - false, - "memory_used={} but cache is empty", - self.memory_used - ); - self.memory_used = 0; - return; - } - } - } -} -impl CacheAccessor for DefaultFileStatisticsCache { - fn get(&self, key: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - state.get(key) - } - - fn put( - &self, - key: &TableScopedPath, - value: CachedFileMetadata, - ) -> Option { - let mut state = self.state.lock().unwrap(); - state.put(key, value) - } - - fn remove(&self, key: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - state.remove(key) - } - - fn contains_key(&self, k: &TableScopedPath) -> bool { - let state = self.state.lock().unwrap(); - state.contains_key(k) - } - - fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.len() - } - - fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.clear(); - } - - fn name(&self) -> String { - "DefaultFileStatisticsCache".to_string() - } -} - -impl FileStatisticsCache for DefaultFileStatisticsCache { - fn 
cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let mut entries = HashMap::::new(); - let mut ctx = DFHeapSizeCtx::default(); - for entry in self.state.lock().unwrap().lru_queue.list_entries() { - let path = entry.0.clone(); - let cached = entry.1; - entries.insert( - path, - FileStatisticsCacheEntry { - object_meta: cached.meta.clone(), - num_rows: cached.statistics.num_rows, - num_columns: cached.statistics.column_statistics.len(), - table_size_bytes: cached.statistics.total_byte_size, - statistics_size_bytes: cached.statistics.heap_size(&mut ctx), - has_ordering: cached.ordering.is_some(), - }, - ); - } - - entries - } - - fn drop_table_entries( - &self, - table_ref: &Option, - ) -> datafusion_common::Result<()> { - let mut state = self.state.lock().unwrap(); - let mut table_paths = vec![]; - for (path, _) in state.lru_queue.list_entries() { - if path.table == *table_ref { - table_paths.push(path.clone()); - } - } - for path in table_paths { - state.remove(&path); - } - Ok(()) - } -} - #[cfg(test)] mod tests { - use super::*; use crate::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, + CachedFileMetadata, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, TableScopedPath, }; + use crate::cache::default_cache::DefaultCache; + use crate::cache::{Cache, CacheAccessor, CacheEntryInfo}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; - use datafusion_common::heap_size::DFHeapSizeCtx; + use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::stats::Precision; - use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; + 
use datafusion_common::{ColumnStatistics, HashMap, ScalarValue, Statistics}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; @@ -298,7 +51,7 @@ mod tests { #[test] fn test_statistics_cache() { let meta = create_test_meta("test", 1024); - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new( "test_column", @@ -358,7 +111,7 @@ mod tests { }; let entry = entries.get(&path_3).unwrap(); - assert_eq!(entry.object_meta.size, 2048); // Should be updated value + assert_eq!(entry.value.meta.size, 2048); // Should be updated value } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -414,7 +167,7 @@ mod tests { #[test] fn test_ordering_cache() { let meta = create_test_meta("test.parquet", 100); - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); @@ -448,12 +201,12 @@ mod tests { // Verify list_entries shows has_ordering = true let entries = cache.list_entries(); assert_eq!(entries.len(), 1); - assert!(entries.get(&path).unwrap().has_ordering); + assert!(entries.get(&path).unwrap().value.ordering.is_some()); } #[test] fn test_cache_invalidation_on_file_modification() { - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let path = TableScopedPath { path: Path::from("test.parquet"), table: None, @@ -492,7 +245,7 @@ mod tests { #[test] fn test_ordering_cache_invalidation_on_file_modification() { - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let path = TableScopedPath { path: Path::from("test.parquet"), table: None, @@ -557,12 +310,12 @@ mod tests { 
#[test] fn test_list_entries() { - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let meta1 = create_test_meta("test1.parquet", 100); - let cached_value = CachedFileMetadata::new( + let cached_value_1 = CachedFileMetadata::new( meta1.clone(), Arc::new(Statistics::new_unknown(&schema)), None, @@ -573,9 +326,9 @@ mod tests { table: None, }; - cache.put(&path_1, cached_value); + cache.put(&path_1, cached_value_1.clone()); let meta2 = create_test_meta("test2.parquet", 200); - let cached_value = CachedFileMetadata::new( + let cached_value_2 = CachedFileMetadata::new( meta2.clone(), Arc::new(Statistics::new_unknown(&schema)), Some(ordering()), @@ -586,7 +339,7 @@ mod tests { table: None, }; - cache.put(&path_2, cached_value); + cache.put(&path_2, cached_value_2.clone()); let entries = cache.list_entries(); assert_eq!( @@ -594,24 +347,20 @@ mod tests { HashMap::from([ ( path_1, - FileStatisticsCacheEntry { - object_meta: meta1, - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 360, - has_ordering: false, + CacheEntryInfo { + value: cached_value_1, + hits: 0, + size_bytes: 373, + expires: None, } ), ( path_2, - FileStatisticsCacheEntry { - object_meta: meta2, - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 360, - has_ordering: true, + CacheEntryInfo { + value: cached_value_2, + hits: 0, + size_bytes: 373, + expires: None, } ), ]) @@ -632,7 +381,7 @@ mod tests { + value_2.heap_size(&mut ctx); // create a cache with a limit which fits exactly 2 entries - let cache = DefaultFileStatisticsCache::new(limit_for_2_entries); + let cache = DefaultCache::new(limit_for_2_entries); let path_1 = TableScopedPath { path: meta_1.location.clone(), table: None, @@ -699,7 +448,7 @@ mod tests { let 
limit_less_than_the_entry = value.heap_size(&mut ctx) - 1; // create a cache with a size less than the entry - let cache = DefaultFileStatisticsCache::new(limit_less_than_the_entry); + let cache = DefaultCache::new(limit_less_than_the_entry); let path_1 = TableScopedPath { path: meta.location.clone(), diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index a3cdf7c5e9110..c82b80b973ec9 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -15,393 +15,22 @@ // specific language governing permissions and limitations // under the License. -use crate::cache::{ - CacheAccessor, - cache_manager::{CachedFileList, ListFilesCache}, - lru_queue::LruQueue, -}; - -use std::fmt::{Debug, Display, Formatter}; -use std::mem::size_of; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - time::Duration, -}; - -use datafusion_common::TableReference; -use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; -use datafusion_common::instant::Instant; -use object_store::{ObjectMeta, path::Path}; - -pub trait TimeProvider: Send + Sync + 'static { - fn now(&self) -> Instant; -} - -#[derive(Debug, Default)] -pub struct SystemTimeProvider; - -impl TimeProvider for SystemTimeProvider { - fn now(&self) -> Instant { - Instant::now() - } -} - -/// Default implementation of [`ListFilesCache`] -/// -/// Caches file metadata for file listing operations. -/// -/// # Internal details -/// -/// The `memory_limit` parameter controls the maximum size of the cache, which uses a Least -/// Recently Used eviction algorithm. When adding a new entry, if the total number of entries in -/// the cache exceeds `memory_limit`, the least recently used entries are evicted until the total -/// size is lower than the `memory_limit`. -/// -/// # Cache API -/// -/// Uses `get` and `put` methods for cache operations. 
TTL validation is handled internally - -/// expired entries return `None` from `get`. -pub struct DefaultListFilesCache { - state: Mutex, - time_provider: Arc, -} - -impl Default for DefaultListFilesCache { - fn default() -> Self { - Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None) - } -} - -impl DefaultListFilesCache { - /// Creates a new instance of [`DefaultListFilesCache`]. - /// - /// # Arguments - /// * `memory_limit` - The maximum size of the cache, in bytes. - /// * `ttl` - The TTL (time-to-live) of entries in the cache. - pub fn new(memory_limit: usize, ttl: Option) -> Self { - Self { - state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), - time_provider: Arc::new(SystemTimeProvider), - } - } - - #[cfg(test)] - pub(crate) fn with_time_provider(mut self, provider: Arc) -> Self { - self.time_provider = provider; - self - } -} - -#[derive(Clone, PartialEq, Debug)] -pub struct ListFilesEntry { - pub metas: CachedFileList, - pub size_bytes: usize, - pub expires: Option, -} - -impl ListFilesEntry { - fn try_new( - cached_file_list: CachedFileList, - ttl: Option, - now: Instant, - ) -> Option { - let size_bytes = (cached_file_list.files.capacity() * size_of::()) - + cached_file_list - .files - .iter() - .map(meta_heap_bytes) - .reduce(|acc, b| acc + b)?; - - Some(Self { - metas: cached_file_list, - size_bytes, - expires: ttl.map(|t| now + t), - }) - } -} - -/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. 
-fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { - let mut size = object_meta.location.as_ref().len(); - - if let Some(e) = &object_meta.e_tag { - size += e.len(); - } - if let Some(v) = &object_meta.version { - size += v.len(); - } - - size -} - -/// The default memory limit for the [`DefaultListFilesCache`] -pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB - -/// The default cache TTL for the [`DefaultListFilesCache`] -pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite - -/// Key for [`DefaultListFilesCache`] -/// -/// Each entry is scoped to its use within a specific table so that the cache -/// can differentiate between identical paths in different tables, and -/// table-level cache invalidation. -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub struct TableScopedPath { - pub table: Option, - pub path: Path, -} - -/// Handles the inner state of the [`DefaultListFilesCache`] struct. -pub struct DefaultListFilesCacheState { - lru_queue: LruQueue, - memory_limit: usize, - memory_used: usize, - ttl: Option, -} - -impl Default for DefaultListFilesCacheState { - fn default() -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, - memory_used: 0, - ttl: DEFAULT_LIST_FILES_CACHE_TTL, - } - } -} - -impl DFHeapSize for TableScopedPath { - fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) - } -} - -impl Display for TableScopedPath { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(table) = &self.table { - write!(f, "{}, {}", self.path, table) - } else { - write!(f, "{}", self.path) - } - } -} - -impl DefaultListFilesCacheState { - fn new(memory_limit: usize, ttl: Option) -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit, - memory_used: 0, - ttl, - } - } - - /// Gets an entry from the cache, checking for expiration. 
- /// - /// Returns the cached file list if it exists and hasn't expired. - /// If the entry has expired, it is removed from the cache. - fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option { - let entry = self.lru_queue.get(key)?; - - // Check expiration - if let Some(exp) = entry.expires - && now > exp - { - self.remove(key); - return None; - } - - Some(entry.metas.clone()) - } - - /// Checks if the respective entry is currently cached. - /// - /// If the entry has expired by `now` it is removed from the cache. - /// - /// The LRU queue is not updated. - fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool { - let Some(entry) = self.lru_queue.peek(k) else { - return false; - }; - - match entry.expires { - Some(exp) if now > exp => { - self.remove(k); - false - } - _ => true, - } - } - - /// Adds a new key-value pair to cache expiring at `now` + the TTL. - /// - /// This means that LRU entries might be evicted if required. - /// If the key is already in the cache, the previous entry is returned. - /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. - fn put( - &mut self, - key: &TableScopedPath, - value: CachedFileList, - now: Instant, - ) -> Option { - let entry = ListFilesEntry::try_new(value, self.ttl, now)?; - let entry_size = entry.size_bytes; - - // no point in trying to add this value to the cache if it cannot fit entirely - if entry_size > self.memory_limit { - return None; - } - - // if the key is already in the cache, the old value is removed - let old_value = self.lru_queue.put(key.clone(), entry); - self.memory_used += entry_size; - - if let Some(entry) = &old_value { - self.memory_used -= entry.size_bytes; - } - - self.evict_entries(); - - old_value.map(|v| v.metas) - } - - /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. 
- fn evict_entries(&mut self) { - while self.memory_used > self.memory_limit { - if let Some(removed) = self.lru_queue.pop() { - self.memory_used -= removed.1.size_bytes; - } else { - // cache is empty while memory_used > memory_limit, cannot happen - debug_assert!( - false, - "cache is empty while memory_used > memory_limit, cannot happen" - ); - return; - } - } - } - - /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &TableScopedPath) -> Option { - if let Some(entry) = self.lru_queue.remove(k) { - self.memory_used -= entry.size_bytes; - Some(entry.metas) - } else { - None - } - } - - /// Returns the number of entries currently cached. - fn len(&self) -> usize { - self.lru_queue.len() - } - - /// Removes all entries from the cache. - fn clear(&mut self) { - self.lru_queue.clear(); - self.memory_used = 0; - } -} - -impl CacheAccessor for DefaultListFilesCache { - fn get(&self, key: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - let now = self.time_provider.now(); - state.get(key, now) - } - - fn put( - &self, - key: &TableScopedPath, - value: CachedFileList, - ) -> Option { - let mut state = self.state.lock().unwrap(); - let now = self.time_provider.now(); - state.put(key, value, now) - } - - fn remove(&self, k: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - state.remove(k) - } - - fn contains_key(&self, k: &TableScopedPath) -> bool { - let mut state = self.state.lock().unwrap(); - let now = self.time_provider.now(); - state.contains_key(k, now) - } - - fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.len() - } - - fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.clear(); - } - - fn name(&self) -> String { - String::from("DefaultListFilesCache") - } -} - -impl ListFilesCache for DefaultListFilesCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn 
cache_ttl(&self) -> Option { - let state = self.state.lock().unwrap(); - state.ttl - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn update_cache_ttl(&self, ttl: Option) { - let mut state = self.state.lock().unwrap(); - state.ttl = ttl; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); - for (path, entry) in state.lru_queue.list_entries() { - entries.insert(path.clone(), entry.clone()); - } - entries - } - - fn drop_table_entries( - &self, - table_ref: &Option, - ) -> datafusion_common::Result<()> { - let mut state = self.state.lock().unwrap(); - let mut table_paths = vec![]; - for (path, _) in state.lru_queue.list_entries() { - if path.table == *table_ref { - table_paths.push(path.clone()); - } - } - for path in table_paths { - state.remove(&path); - } - Ok(()) - } -} - #[cfg(test)] mod tests { - use super::*; + use crate::cache::cache_manager::{ + CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, TableScopedPath, + meta_heap_bytes, + }; + use crate::cache::default_cache::{DefaultCache, TimeProvider}; + use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value}; use chrono::DateTime; + use datafusion_common::HashMap; + use datafusion_common::TableReference; + use datafusion_common::instant::Instant; + use object_store::{ObjectMeta, path::Path}; + use std::sync::{Arc, Mutex}; use std::thread; + use std::time::Duration; struct MockTimeProvider { base: Instant, @@ -448,26 +77,27 @@ mod tests { } } - /// Helper function to create a CachedFileList with at least meta_size bytes + /// Helper function to create a TableScopedPath and a CachedFileList with at least meta_size bytes fn create_test_list_files_entry( path: &str, count: usize, meta_size: usize, - ) -> (Path, CachedFileList, usize) { + table: Option, + ) -> (TableScopedPath, 
CachedFileList) { + let key = TableScopedPath { + table, + path: Path::from(path), + }; let metas: Vec = (0..count) .map(|i| create_test_object_meta(&format!("file{i}"), meta_size)) .collect(); - - // Calculate actual size using the same logic as ListFilesEntry::try_new - let size = (metas.capacity() * size_of::()) - + metas.iter().map(meta_heap_bytes).sum::(); - - (Path::from(path), CachedFileList::new(metas), size) + let value = CachedFileList::new(metas); + (key, value) } #[test] fn test_basic_operations() { - let cache = DefaultListFilesCache::default(); + let cache = DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT); let table_ref = Some(TableReference::from("table")); let path = Path::from("test_path"); let key = TableScopedPath { @@ -499,16 +129,9 @@ mod tests { assert_eq!(cache.len(), 0); // Put multiple entries - let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref, - path: path2, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 2, 50, table_ref.clone()); + let (key2, value2) = create_test_list_files_entry("path2", 3, 50, table_ref); cache.put(&key1, value1.clone()); cache.put(&key2, value2.clone()); assert_eq!(cache.len(), 2); @@ -519,17 +142,19 @@ mod tests { HashMap::from([ ( key1.clone(), - ListFilesEntry { - metas: value1, - size_bytes: size1, + CacheEntryInfo { + value: value1.clone(), + size_bytes: value1.size(), + hits: 0, expires: None, } ), ( key2.clone(), - ListFilesEntry { - metas: value2, - size_bytes: size2, + CacheEntryInfo { + value: value2.clone(), + size_bytes: value2.size(), + hits: 0, expires: None, } ) @@ -545,26 +170,18 @@ mod tests { #[test] fn test_lru_eviction_basic() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = 
create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); - // Set cache limit to exactly fit all three entries - let cache = DefaultListFilesCache::new(size * 3, None); + let entry_size = key1.size() + value1.size(); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref.clone(), - path: path3, - }; + // Set cache limit to exactly fit all 3 entries + let cache = DefaultCache::new(entry_size * 3); // All three entries should fit cache.put(&key1, value1); @@ -576,11 +193,7 @@ mod tests { assert!(cache.contains_key(&key3)); // Adding a new entry should evict path1 (LRU) - let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - let key4 = TableScopedPath { - table: table_ref, - path: path4, - }; + let (key4, value4) = create_test_list_files_entry("path4", 1, 100, table_ref); cache.put(&key4, value4); assert_eq!(cache.len(), 3); @@ -592,26 +205,16 @@ mod tests { #[test] fn test_lru_ordering_after_access() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + 
let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); // Set cache limit to fit exactly three entries - let cache = DefaultListFilesCache::new(size * 3, None); - - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref.clone(), - path: path3, - }; + let cache = DefaultCache::new((key1.size() + value1.size()) * 3); cache.put(&key1, value1); cache.put(&key2, value2); @@ -623,11 +226,7 @@ mod tests { let _ = cache.get(&key1); // Adding a new entry should evict path2 (the LRU) - let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - let key4 = TableScopedPath { - table: table_ref, - path: path4, - }; + let (key4, value4) = create_test_list_files_entry("path4", 1, 100, table_ref); cache.put(&key4, value4); assert_eq!(cache.len(), 3); @@ -639,32 +238,23 @@ mod tests { #[test] fn test_reject_too_large() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); // Set cache limit to fit both entries - let cache = DefaultListFilesCache::new(size * 2, None); + let cache = DefaultCache::new((key1.size() + value1.size()) * 2); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; cache.put(&key1, value1); cache.put(&key2, value2); assert_eq!(cache.len(), 2); // Try to add an entry that's too large to fit in the cache // The entry is not 
stored (too large) - let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); - let key_large = TableScopedPath { - table: table_ref, - path: path_large, - }; + let (key_large, value_large) = + create_test_list_files_entry("large", 1, 1000, table_ref); cache.put(&key_large, value_large); // Large entry should not be added @@ -676,37 +266,27 @@ mod tests { #[test] fn test_multiple_evictions() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); + + let entry_size = key1.size() + value1.size(); // Set cache limit for exactly 3 entries - let cache = DefaultListFilesCache::new(size * 3, None); + let cache = DefaultCache::new(entry_size * 3); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref.clone(), - path: path3, - }; cache.put(&key1, value1); cache.put(&key2, value2); cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Add a large entry that requires evicting 2 entries - let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200); - let key_large = TableScopedPath { - table: table_ref, - path: path_large, - }; + let (key_large, value_large) = + create_test_list_files_entry("large", 1, 200, table_ref); cache.put(&key_large, value_large); // path1 and path2 should be evicted (both LRU), path3 and path_large remain @@ 
-719,25 +299,17 @@ mod tests { #[test] fn test_cache_limit_resize() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = create_test_list_files_entry("path3", 1, 100, table_ref); - let cache = DefaultListFilesCache::new(size * 3, None); + let entry_size = key1.size() + value1.size(); + + let cache = DefaultCache::new(entry_size * 3); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref, - path: path3, - }; // Add three entries cache.put(&key1, value1); cache.put(&key2, value2); @@ -745,7 +317,7 @@ mod tests { assert_eq!(cache.len(), 3); // Resize cache to only fit one entry - cache.update_cache_limit(size); + cache.update_cache_limit(entry_size); // Should keep only the most recent entry (path3, the MRU) assert_eq!(cache.len(), 1); @@ -757,25 +329,18 @@ mod tests { #[test] fn test_entry_update_with_size_change() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3_v1) = + 
create_test_list_files_entry("path3", 1, 100, table_ref.clone()); - let cache = DefaultListFilesCache::new(size * 3, None); + let entry_size = key1.size() + value1.size(); + + let cache = DefaultCache::new(entry_size * 3); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref, - path: path3, - }; // Add three entries cache.put(&key1, value1); cache.put(&key2, value2.clone()); @@ -783,7 +348,8 @@ mod tests { assert_eq!(cache.len(), 3); // Update path3 with same size - should not cause eviction - let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100); + let (_, value3_v2) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); cache.put(&key3, value3_v2); assert_eq!(cache.len(), 3); @@ -792,7 +358,7 @@ mod tests { assert!(cache.contains_key(&key3)); // Update path3 with larger size that requires evicting path1 (LRU) - let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200); + let (_, value3_v3) = create_test_list_files_entry("path3", 1, 200, table_ref); cache.put(&key3, value3_v3.clone()); assert_eq!(cache.len(), 2); @@ -806,17 +372,19 @@ mod tests { HashMap::from([ ( key2, - ListFilesEntry { - metas: value2, - size_bytes: size2, + CacheEntryInfo { + value: value2.clone(), + size_bytes: value2.size(), + hits: 0, expires: None, } ), ( key3, - ListFilesEntry { - metas: value3_v3, - size_bytes: size3_v3, + CacheEntryInfo { + value: value3_v3.clone(), + size_bytes: value3_v3.size(), + hits: 0, expires: None, } ) @@ -829,21 +397,13 @@ mod tests { let ttl = Duration::from_millis(100); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = DefaultListFilesCache::new(10000, Some(ttl)) + let cache = DefaultCache::with_ttl(10000, Some(ttl)) .with_time_provider(Arc::clone(&mock_time) as Arc); - let (path1, value1, 
size1) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref, - path: path2, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 2, 50, table_ref.clone()); + let (key2, value2) = create_test_list_files_entry("path2", 2, 50, table_ref); cache.put(&key1, value1.clone()); cache.put(&key2, value2.clone()); @@ -856,17 +416,19 @@ mod tests { HashMap::from([ ( key1.clone(), - ListFilesEntry { - metas: value1, - size_bytes: size1, + CacheEntryInfo { + value: value1.clone(), + size_bytes: value1.size(), + hits: 1, expires: mock_time.now().checked_add(ttl), } ), ( key2.clone(), - ListFilesEntry { - metas: value2, - size_bytes: size2, + CacheEntryInfo { + value: value2.clone(), + size_bytes: value2.size(), + hits: 1, expires: mock_time.now().checked_add(ttl), } ) @@ -887,26 +449,16 @@ mod tests { let ttl = Duration::from_millis(200); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = DefaultListFilesCache::new(1000, Some(ttl)) + let cache = DefaultCache::with_ttl(1100, Some(ttl)) .with_time_provider(Arc::clone(&mock_time) as Arc); - let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref, - path: path3, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 1, 400, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 400, 
table_ref.clone()); + + let (key3, value3) = create_test_list_files_entry("path3", 1, 400, table_ref); cache.put(&key1, value1); mock_time.inc(Duration::from_millis(50)); cache.put(&key2, value2); @@ -927,14 +479,10 @@ mod tests { #[test] fn test_ttl_expiration_in_get() { let ttl = Duration::from_millis(100); - let cache = DefaultListFilesCache::new(10000, Some(ttl)); + let cache = DefaultCache::with_ttl(1000, Some(ttl)); - let (path, value, _) = create_test_list_files_entry("path", 2, 50); let table_ref = Some(TableReference::from("table")); - let key = TableScopedPath { - table: table_ref, - path, - }; + let (key, value) = create_test_list_files_entry("path", 2, 50, table_ref); // Cache the entry cache.put(&key, value.clone()); @@ -995,81 +543,45 @@ mod tests { assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); // location (4) + e_tag (3) + version (3) } - #[test] - fn test_entry_creation() { - // Test with empty vector - let empty_list = CachedFileList::new(vec![]); - let now = Instant::now(); - let entry = ListFilesEntry::try_new(empty_list, None, now); - assert!(entry.is_none()); - - // Validate entry size - let metas: Vec = (0..5) - .map(|i| create_test_object_meta(&format!("file{i}"), 30)) - .collect(); - let cached_list = CachedFileList::new(metas); - let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap(); - assert_eq!(entry.metas.files.len(), 5); - // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes - let expected_size = (entry.metas.files.capacity() * size_of::()) - + (entry.metas.files.len() * 30); - assert_eq!(entry.size_bytes, expected_size); - - // Test with TTL - let meta = create_test_object_meta("file", 50); - let ttl = Duration::from_secs(10); - let cached_list = CachedFileList::new(vec![meta]); - let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap(); - assert!(entry.expires.unwrap() > now); - } - #[test] fn test_memory_tracking() { - let cache = DefaultListFilesCache::new(1000, None); + let 
cache = DefaultCache::new(1000); // Verify cache starts with 0 memory used { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, 0); + assert_eq!(cache.memory_used(), 0); } // Add entry and verify memory tracking - let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100); let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - cache.put(&key1, value1); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + cache.put(&key1, value1.clone()); + let entry_size_1 = key1.size() + value1.size(); { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, size1); + assert_eq!(cache.memory_used(), entry_size_1); } // Add another entry - let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200); - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - cache.put(&key2, value2); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 200, table_ref.clone()); + cache.put(&key2, value2.clone()); + let entry_size_2 = key2.size() + value2.size(); + { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, size1 + size2); + assert_eq!(cache.memory_used(), entry_size_1 + entry_size_2); } // Remove first entry and verify memory decreases cache.remove(&key1); { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, size2); + assert_eq!(cache.memory_used(), entry_size_2); } // Clear and verify memory is 0 cache.clear(); { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, 0); + assert_eq!(cache.memory_used(), 0); } } @@ -1090,7 +602,7 @@ mod tests { #[test] fn test_prefix_filtering() { - let cache = DefaultListFilesCache::new(100000, None); + let cache = DefaultCache::new(100000); // Create files for a partitioned table let table_base = Path::from("my_table"); @@ -1138,7 +650,7 @@ mod tests { #[test] fn 
test_prefix_no_matching_files() { - let cache = DefaultListFilesCache::new(100000, None); + let cache = DefaultCache::new(100000); let table_base = Path::from("my_table"); let files = vec![ @@ -1162,7 +674,7 @@ mod tests { #[test] fn test_nested_partitions() { - let cache = DefaultListFilesCache::new(100000, None); + let cache = DefaultCache::new(100000); let table_base = Path::from("events"); let files = vec![ @@ -1201,27 +713,16 @@ mod tests { #[test] fn test_drop_table_entries() { - let cache = DefaultListFilesCache::default(); - - let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let cache = DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT); let table_ref1 = Some(TableReference::from("table1")); - let key1 = TableScopedPath { - table: table_ref1.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref1.clone(), - path: path2, - }; - let table_ref2 = Some(TableReference::from("table2")); - let key3 = TableScopedPath { - table: table_ref2.clone(), - path: path3, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref1.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref1.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref2.clone()); cache.put(&key1, value1); cache.put(&key2, value2); diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 76bd660e6c7d5..25d04434cf3b6 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -19,13 +19,17 @@ pub mod cache_manager; pub mod file_statistics_cache; pub mod lru_queue; +pub mod default_cache; mod file_metadata_cache; mod list_files_cache; -pub use file_metadata_cache::DefaultFilesMetadataCache; -pub use 
list_files_cache::DefaultListFilesCache; -pub use list_files_cache::ListFilesEntry; -pub use list_files_cache::TableScopedPath; +use crate::cache::cache_manager::TableScopedPath; +use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; +use datafusion_common::instant::Instant; +use datafusion_common::{HashMap, TableReference}; +use std::fmt::Debug; +use std::hash::Hash; +use std::time::Duration; /// Base trait for cache implementations with common operations. /// @@ -78,3 +82,83 @@ pub trait CacheAccessor: Send + Sync { /// Return the cache name. fn name(&self) -> String; } + +/// A managed cache with capacity, expiration, and introspection policies. +/// +/// Keys must implement [`Key`] and values must implement [`Value`] so +/// the implementation can account for heap usage when enforcing the cache limit. +/// +pub trait Cache: CacheAccessor { + /// Current memory budget, in bytes. + fn cache_limit(&self) -> usize; + + /// Change the memory budget in bytes. + fn update_cache_limit(&self, limit: usize); + + /// Time-to-live applied to newly inserted entries, or `None` if entries + /// never expire on their own. + fn cache_ttl(&self) -> Option; + + /// Change the TTL applied to subsequent inserts. + fn update_cache_ttl(&self, _ttl: Option); + + /// Invalidate every entry associated with `table_ref`. + fn drop_table_entries( + &self, + _table_ref: &Option, + ) -> datafusion_common::Result<()>; + + /// Snapshot of all current entries with per-entry metadata (size, hits, + /// expiration) for diagnostics and observability. + fn list_entries(&self) -> HashMap>; +} + +/// Key type for entries stored in a [`Cache`]. +pub trait Key: Clone + Eq + Hash + Send + Sync + Debug { + /// Size of the key in bytes, used for cache memory accounting. + fn size(&self) -> usize; + + /// Table this key is associated with, or `None` if the key is not + /// table-scoped. + fn table_ref(&self) -> Option<&TableReference>; +} + +/// Value type for entries stored in a [`Cache`]. 
+pub trait Value: Clone + Send + Sync { + /// Size of the value in bytes used for cache memory accounting. + fn size(&self) -> usize; +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CacheEntryInfo { + pub value: V, + pub size_bytes: usize, + pub hits: usize, + pub expires: Option, +} + +impl Debug for dyn Cache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Cache name: {} with length: {}", self.name(), self.len()) + } +} + +impl Key for object_store::path::Path { + fn size(&self) -> usize { + self.as_ref().heap_size(&mut DFHeapSizeCtx::default()) + } + + fn table_ref(&self) -> Option<&TableReference> { + None + } +} + +impl Key for TableScopedPath { + fn size(&self) -> usize { + DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) + } + + fn table_ref(&self) -> Option<&TableReference> { + self.table.as_ref() + } +} From d903b2ee8b247b84b36e85cc6fa8ab32374a65d0 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 09:48:06 +0200 Subject: [PATCH 02/13] Rename Key, Value to CacheKey, CacheValue --- datafusion/execution/src/cache/cache_manager.rs | 8 ++++---- datafusion/execution/src/cache/default_cache.rs | 16 ++++++++-------- .../execution/src/cache/list_files_cache.rs | 2 +- datafusion/execution/src/cache/mod.rs | 16 +++++++--------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index ddd9c57e22f5d..ccb3d42c721fa 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,7 +16,7 @@ // under the License. 
use crate::cache::default_cache::DefaultCache; -use crate::cache::{Cache, Value}; +use crate::cache::{Cache, CacheValue}; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::stats::Precision; use datafusion_common::{HashMap, TableReference}; @@ -106,7 +106,7 @@ impl CachedFileMetadata { } } -impl Value for CachedFileMetadata { +impl CacheValue for CachedFileMetadata { fn size(&self) -> usize { DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) } @@ -195,7 +195,7 @@ impl CachedFileList { } } -impl Value for CachedFileList { +impl CacheValue for CachedFileList { fn size(&self) -> usize { self.files.capacity() * size_of::() + self @@ -247,7 +247,7 @@ pub struct CachedFileMetadataEntry { pub file_metadata: Arc, } -impl Value for CachedFileMetadataEntry { +impl CacheValue for CachedFileMetadataEntry { fn size(&self) -> usize { self.file_metadata.memory_size() } diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index 4574c38717b10..dd5c55587d07e 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -23,7 +23,7 @@ use datafusion_common::instant::Instant; use datafusion_common::{HashMap, Result}; use crate::cache::lru_queue::LruQueue; -use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value}; +use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, CacheKey, CacheValue}; /// Source of the current time used by a [`DefaultCache`] when applying TTLs. 
pub trait TimeProvider: Send + Sync { @@ -44,12 +44,12 @@ impl TimeProvider for SystemTimeProvider { } #[derive(Clone)] -struct ValueEntry { +struct ValueEntry { value: V, expires: Option, } -struct DefaultCacheState { +struct DefaultCacheState { lru_queue: LruQueue>, hits: HashMap, memory_limit: usize, @@ -57,7 +57,7 @@ struct DefaultCacheState { ttl: Option, } -impl DefaultCacheState { +impl DefaultCacheState { fn new(memory_limit: usize, ttl: Option) -> Self { Self { lru_queue: LruQueue::new(), @@ -171,13 +171,13 @@ impl DefaultCacheState { /// limit are rejected (and any prior entry under the same key is removed). /// When a TTL is configured, the expiration is stamped onto each entry at /// insertion time and checked lazily on access. -pub struct DefaultCache { +pub struct DefaultCache { state: Mutex>, time_provider: Arc, name: String, } -impl DefaultCache { +impl DefaultCache { /// Create a cache with the given memory budget in bytes and no TTL. pub fn new(memory_limit: usize) -> Self { Self::with_ttl(memory_limit, None) @@ -211,7 +211,7 @@ impl DefaultCache { } } -impl CacheAccessor for DefaultCache { +impl CacheAccessor for DefaultCache { fn get(&self, key: &K) -> Option { let now = self.time_provider.now(); let mut state = self.state.lock().unwrap(); @@ -249,7 +249,7 @@ impl CacheAccessor for DefaultCache { } } -impl Cache for DefaultCache { +impl Cache for DefaultCache { fn cache_limit(&self) -> usize { self.state.lock().unwrap().memory_limit } diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index c82b80b973ec9..8f77968fa0713 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -22,7 +22,7 @@ mod tests { meta_heap_bytes, }; use crate::cache::default_cache::{DefaultCache, TimeProvider}; - use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, Key, Value}; + use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, 
CacheKey, CacheValue}; use chrono::DateTime; use datafusion_common::HashMap; use datafusion_common::TableReference; diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 25d04434cf3b6..8fff88445d1ee 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -85,10 +85,8 @@ pub trait CacheAccessor: Send + Sync { /// A managed cache with capacity, expiration, and introspection policies. /// -/// Keys must implement [`Key`] and values must implement [`Value`] so -/// the implementation can account for heap usage when enforcing the cache limit. -/// -pub trait Cache: CacheAccessor { + +pub trait Cache: CacheAccessor { /// Current memory budget, in bytes. fn cache_limit(&self) -> usize; @@ -114,7 +112,7 @@ pub trait Cache: CacheAccessor { } /// Key type for entries stored in a [`Cache`]. -pub trait Key: Clone + Eq + Hash + Send + Sync + Debug { +pub trait CacheKey: Clone + Eq + Hash + Send + Sync + Debug { /// Size of the key in bytes, used for cache memory accounting. fn size(&self) -> usize; @@ -124,7 +122,7 @@ pub trait Key: Clone + Eq + Hash + Send + Sync + Debug { } /// Value type for entries stored in a [`Cache`]. -pub trait Value: Clone + Send + Sync { +pub trait CacheValue: Clone + Send + Sync { /// Size of the value in bytes used for cache memory accounting. 
fn size(&self) -> usize; } @@ -137,13 +135,13 @@ pub struct CacheEntryInfo { pub expires: Option, } -impl Debug for dyn Cache { +impl Debug for dyn Cache { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Cache name: {} with length: {}", self.name(), self.len()) } } -impl Key for object_store::path::Path { +impl CacheKey for object_store::path::Path { fn size(&self) -> usize { self.as_ref().heap_size(&mut DFHeapSizeCtx::default()) } @@ -153,7 +151,7 @@ impl Key for object_store::path::Path { } } -impl Key for TableScopedPath { +impl CacheKey for TableScopedPath { fn size(&self) -> usize { DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) } From 6d27917ec1b1a3fce4002bb85a6559fd8d11c781 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 10:44:52 +0200 Subject: [PATCH 03/13] Merge CacheAccessor trait into Cache trait --- datafusion/execution/src/cache/default_cache.rs | 8 +++----- datafusion/execution/src/cache/file_metadata_cache.rs | 2 +- datafusion/execution/src/cache/file_statistics_cache.rs | 2 +- datafusion/execution/src/cache/list_files_cache.rs | 2 +- datafusion/execution/src/cache/mod.rs | 9 ++------- 5 files changed, 8 insertions(+), 15 deletions(-) diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index dd5c55587d07e..8ffcfcd281266 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -23,7 +23,7 @@ use datafusion_common::instant::Instant; use datafusion_common::{HashMap, Result}; use crate::cache::lru_queue::LruQueue; -use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, CacheKey, CacheValue}; +use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue}; /// Source of the current time used by a [`DefaultCache`] when applying TTLs. 
pub trait TimeProvider: Send + Sync { @@ -211,7 +211,8 @@ impl DefaultCache { } } -impl CacheAccessor for DefaultCache { +impl Cache for DefaultCache { + fn get(&self, key: &K) -> Option { let now = self.time_provider.now(); let mut state = self.state.lock().unwrap(); @@ -247,9 +248,6 @@ impl CacheAccessor for DefaultCache { fn name(&self) -> String { self.name.clone() } -} - -impl Cache for DefaultCache { fn cache_limit(&self) -> usize { self.state.lock().unwrap().memory_limit } diff --git a/datafusion/execution/src/cache/file_metadata_cache.rs b/datafusion/execution/src/cache/file_metadata_cache.rs index da95a25496bc8..e5c1e01e48baf 100644 --- a/datafusion/execution/src/cache/file_metadata_cache.rs +++ b/datafusion/execution/src/cache/file_metadata_cache.rs @@ -21,7 +21,7 @@ mod tests { use crate::cache::cache_manager::{CachedFileMetadataEntry, FileMetadata}; use crate::cache::default_cache::DefaultCache; - use crate::cache::{Cache, CacheAccessor, CacheEntryInfo}; + use crate::cache::{Cache, CacheEntryInfo}; use datafusion_common::HashMap; use object_store::ObjectMeta; use object_store::path::Path; diff --git a/datafusion/execution/src/cache/file_statistics_cache.rs b/datafusion/execution/src/cache/file_statistics_cache.rs index 48b21a6440b55..daa338461e90c 100644 --- a/datafusion/execution/src/cache/file_statistics_cache.rs +++ b/datafusion/execution/src/cache/file_statistics_cache.rs @@ -21,7 +21,7 @@ mod tests { CachedFileMetadata, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, TableScopedPath, }; use crate::cache::default_cache::DefaultCache; - use crate::cache::{Cache, CacheAccessor, CacheEntryInfo}; + use crate::cache::{Cache, CacheEntryInfo}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 8f77968fa0713..31055f812ff60 100644 --- 
a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -22,7 +22,7 @@ mod tests { meta_heap_bytes, }; use crate::cache::default_cache::{DefaultCache, TimeProvider}; - use crate::cache::{Cache, CacheAccessor, CacheEntryInfo, CacheKey, CacheValue}; + use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue}; use chrono::DateTime; use datafusion_common::HashMap; use datafusion_common::TableReference; diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 8fff88445d1ee..90eda01257fd9 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -50,7 +50,7 @@ use std::time::Duration; /// 1. Call `get(key)` to check for cached value /// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)` /// 3. If invalid or missing, compute new value and call `put(key, new_value)` -pub trait CacheAccessor: Send + Sync { +pub trait Cache: Send + Sync { /// Get a cached entry if it exists. /// /// Returns the cached value without any validation. The caller should @@ -81,12 +81,7 @@ pub trait CacheAccessor: Send + Sync { /// Return the cache name. fn name(&self) -> String; -} - -/// A managed cache with capacity, expiration, and introspection policies. -/// -pub trait Cache: CacheAccessor { /// Current memory budget, in bytes. fn cache_limit(&self) -> usize; @@ -103,7 +98,7 @@ pub trait Cache: CacheAccessor { /// Invalidate every entry associated with `table_ref`. 
fn drop_table_entries( &self, - _table_ref: &Option, + table_ref: &Option, ) -> datafusion_common::Result<()>; /// Snapshot of all current entries with per-entry metadata (size, hits, From 20273d3ab9add5677b14566710701c24488b3468 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 10:46:51 +0200 Subject: [PATCH 04/13] Remove FileStatisticsCacheEntry --- datafusion/execution/src/cache/cache_manager.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index ccb3d42c721fa..980df2de138dc 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -136,23 +136,6 @@ impl DFHeapSize for CachedFileMetadata { } } -/// Represents information about a cached statistics entry. -/// This is used to expose the statistics cache contents to outside modules. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct FileStatisticsCacheEntry { - pub object_meta: ObjectMeta, - /// Number of table rows. - pub num_rows: Precision, - /// Number of table columns. - pub num_columns: usize, - /// Total table size, in bytes. - pub table_size_bytes: Precision, - /// Size of the statistics entry, in bytes. - pub statistics_size_bytes: usize, - /// Whether ordering information is cached for this file. - pub has_ordering: bool, -} - /// Cached file listing. /// /// TTL expiration is handled internally by the cache implementation. 
From 410bc2ce088345d86f73152271aa6cadb70de84b Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 11:10:07 +0200 Subject: [PATCH 05/13] Refactor cache_manager --- .../execution/src/cache/cache_manager.rs | 56 +++++-------------- .../src/cache/file_statistics_cache.rs | 4 +- .../execution/src/cache/list_files_cache.rs | 4 +- datafusion/execution/src/cache/mod.rs | 33 +++++++++-- 4 files changed, 48 insertions(+), 49 deletions(-) diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 980df2de138dc..a478e73aeeaf8 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,20 +16,31 @@ // under the License. use crate::cache::default_cache::DefaultCache; -use crate::cache::{Cache, CacheValue}; +pub use crate::cache::{Cache, CacheValue, TableScopedPath}; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; -use datafusion_common::stats::Precision; -use datafusion_common::{HashMap, TableReference}; +use datafusion_common::HashMap; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use object_store::ObjectMeta; use object_store::path::Path; use std::any::Any; -use std::fmt::{Debug, Display, Formatter}; +use std::fmt::{Debug, Formatter}; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; +pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB + +pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M + +pub type FileStatisticsCache = dyn Cache; +pub type ListFilesCache = dyn Cache; +pub type FileMetadataCache = dyn Cache; + /// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. 
pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { let mut size = object_meta.location.as_ref().len(); @@ -44,31 +55,6 @@ pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { size } -/// Each entry is scoped to its use within a specific table so that the cache -/// can differentiate between identical paths in different tables, and -/// table-level cache invalidation. -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub struct TableScopedPath { - pub table: Option, - pub path: Path, -} - -impl DFHeapSize for TableScopedPath { - fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) - } -} - -impl Display for TableScopedPath { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(table) = &self.table { - write!(f, "{}, {}", self.path, table) - } else { - write!(f, "{}", self.path) - } - } -} - /// Cached metadata for a file, including statistics and ordering. /// /// This struct embeds the [`ObjectMeta`] used for cache validation, @@ -112,18 +98,6 @@ impl CacheValue for CachedFileMetadata { } } -pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB - -pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite - -pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB - -pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M - -pub type FileStatisticsCache = dyn Cache; -pub type ListFilesCache = dyn Cache; -pub type FileMetadataCache = dyn Cache; - impl DFHeapSize for CachedFileMetadata { fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { self.meta.size.heap_size(ctx) diff --git a/datafusion/execution/src/cache/file_statistics_cache.rs b/datafusion/execution/src/cache/file_statistics_cache.rs index daa338461e90c..58705413acc37 100644 --- a/datafusion/execution/src/cache/file_statistics_cache.rs +++ b/datafusion/execution/src/cache/file_statistics_cache.rs @@ -18,10 +18,10 @@ #[cfg(test)] 
mod tests { use crate::cache::cache_manager::{ - CachedFileMetadata, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, TableScopedPath, + CachedFileMetadata, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, }; use crate::cache::default_cache::DefaultCache; - use crate::cache::{Cache, CacheEntryInfo}; + use crate::cache::{Cache, CacheEntryInfo, TableScopedPath}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 31055f812ff60..4f66e2dfbc5a1 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -18,11 +18,11 @@ #[cfg(test)] mod tests { use crate::cache::cache_manager::{ - CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, TableScopedPath, + CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, meta_heap_bytes, }; use crate::cache::default_cache::{DefaultCache, TimeProvider}; - use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue}; + use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue, TableScopedPath}; use chrono::DateTime; use datafusion_common::HashMap; use datafusion_common::TableReference; diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 90eda01257fd9..30e09369f53f8 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -23,13 +23,13 @@ pub mod default_cache; mod file_metadata_cache; mod list_files_cache; -use crate::cache::cache_manager::TableScopedPath; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::instant::Instant; use datafusion_common::{HashMap, TableReference}; -use std::fmt::Debug; +use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; use std::time::Duration; +use object_store::path::Path; /// Base trait for cache 
implementations with common operations. /// @@ -131,12 +131,12 @@ pub struct CacheEntryInfo { } impl Debug for dyn Cache { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Cache name: {} with length: {}", self.name(), self.len()) } } -impl CacheKey for object_store::path::Path { +impl CacheKey for Path { fn size(&self) -> usize { self.as_ref().heap_size(&mut DFHeapSizeCtx::default()) } @@ -155,3 +155,28 @@ impl CacheKey for TableScopedPath { self.table.as_ref() } } + +/// Each entry is scoped to its use within a specific table so that the cache +/// can differentiate between identical paths in different tables, and +/// table-level cache invalidation. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct TableScopedPath { + pub table: Option, + pub path: Path, +} + +impl DFHeapSize for TableScopedPath { + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) + } +} + +impl Display for TableScopedPath { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(table) = &self.table { + write!(f, "{}, {}", self.path, table) + } else { + write!(f, "{}", self.path) + } + } +} From 0448804a7e582a03080d70e89bf109774458d707 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 11:11:08 +0200 Subject: [PATCH 06/13] fmt --- datafusion/execution/src/cache/cache_manager.rs | 2 +- datafusion/execution/src/cache/default_cache.rs | 1 - datafusion/execution/src/cache/list_files_cache.rs | 3 +-- datafusion/execution/src/cache/mod.rs | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index a478e73aeeaf8..29bb5073d5657 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -17,8 +17,8 @@ use 
crate::cache::default_cache::DefaultCache; pub use crate::cache::{Cache, CacheValue, TableScopedPath}; -use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::HashMap; +use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use object_store::ObjectMeta; diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index 8ffcfcd281266..4b410213d4011 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -212,7 +212,6 @@ impl DefaultCache { } impl Cache for DefaultCache { - fn get(&self, key: &K) -> Option { let now = self.time_provider.now(); let mut state = self.state.lock().unwrap(); diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 4f66e2dfbc5a1..a3340adc72162 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -18,8 +18,7 @@ #[cfg(test)] mod tests { use crate::cache::cache_manager::{ - CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, - meta_heap_bytes, + CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, meta_heap_bytes, }; use crate::cache::default_cache::{DefaultCache, TimeProvider}; use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue, TableScopedPath}; diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 30e09369f53f8..4bb57cc8a2afa 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -26,10 +26,10 @@ mod list_files_cache; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::instant::Instant; use datafusion_common::{HashMap, TableReference}; +use object_store::path::Path; use std::fmt::{Debug, Display, Formatter}; use 
std::hash::Hash; use std::time::Duration; -use object_store::path::Path; /// Base trait for cache implementations with common operations. /// From 3d08699b2935136d9f95ced6aa5aea055c8f3ac5 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 17:11:19 +0200 Subject: [PATCH 07/13] Move meta_heap_bytes to CachedFileList --- .../execution/src/cache/cache_manager.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 29bb5073d5657..bd65998e3fad6 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -41,20 +41,6 @@ pub type FileStatisticsCache = dyn Cache; pub type ListFilesCache = dyn Cache; pub type FileMetadataCache = dyn Cache; -/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. -pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { - let mut size = object_meta.location.as_ref().len(); - - if let Some(e) = &object_meta.e_tag { - size += e.len(); - } - if let Some(v) = &object_meta.version { - size += v.len(); - } - - size -} - /// Cached metadata for a file, including statistics and ordering. /// /// This struct embeds the [`ObjectMeta`] used for cache validation, @@ -164,6 +150,20 @@ impl CacheValue for CachedFileList { } } +/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. 
+pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { + let mut size = object_meta.location.as_ref().len(); + + if let Some(e) = &object_meta.e_tag { + size += e.len(); + } + if let Some(v) = &object_meta.version { + size += v.len(); + } + + size +} + impl Deref for CachedFileList { type Target = Arc>; fn deref(&self) -> &Self::Target { From 27111ebcd518e1bd0410937d3e5148bdda608296 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 17:15:45 +0200 Subject: [PATCH 08/13] Rename with_ttl to new_with_ttl --- datafusion-cli/src/main.rs | 2 +- datafusion/execution/src/cache/cache_manager.rs | 6 +++--- datafusion/execution/src/cache/default_cache.rs | 4 ++-- datafusion/execution/src/cache/list_files_cache.rs | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index e51d09c4b646d..c83491895c39c 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -702,7 +702,7 @@ mod tests { #[tokio::test] async fn test_list_files_cache() -> Result<(), DataFusionError> { let list_files_cache = - Arc::new(DefaultCache::with_ttl(1024, Some(Duration::from_secs(1)))); + Arc::new(DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1)))); let rt = RuntimeEnvBuilder::new() .with_cache_manager( diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index bd65998e3fad6..969f20e11f365 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -291,7 +291,7 @@ impl CacheManager { Some(Arc::clone(lfc)) } None if config.list_files_cache_limit > 0 => Some(Arc::new( - DefaultCache::::with_ttl( + DefaultCache::::new_with_ttl( config.list_files_cache_limit, config.list_files_cache_ttl, ) @@ -466,7 +466,7 @@ mod tests { #[test] fn test_ttl_preserved_when_not_set_in_config() { // Create a cache with TTL = 1 second - let list_file_cache = DefaultCache::with_ttl(1024, 
Some(Duration::from_secs(1))); + let list_file_cache = DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); // Verify the cache has TTL set initially assert_eq!( @@ -502,7 +502,7 @@ mod tests { #[test] fn test_ttl_overridden_when_set_in_config() { // Create a cache with TTL = 1 second - let list_file_cache = DefaultCache::with_ttl(1024, Some(Duration::from_secs(1))); + let list_file_cache = DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); // Put cache in config WITH a different TTL set let config = CacheManagerConfig::default() diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index 4b410213d4011..d2e1c47f7c901 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -180,12 +180,12 @@ pub struct DefaultCache { impl DefaultCache { /// Create a cache with the given memory budget in bytes and no TTL. pub fn new(memory_limit: usize) -> Self { - Self::with_ttl(memory_limit, None) + Self::new_with_ttl(memory_limit, None) } /// Create a cache with the given memory budget in bytes and an optional /// TTL applied to every newly inserted entry. 
- pub fn with_ttl(memory_limit: usize, ttl: Option) -> Self { + pub fn new_with_ttl(memory_limit: usize, ttl: Option) -> Self { Self { state: Mutex::new(DefaultCacheState::new(memory_limit, ttl)), time_provider: Arc::new(SystemTimeProvider), diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index a3340adc72162..ae604f8e50b54 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -396,7 +396,7 @@ mod tests { let ttl = Duration::from_millis(100); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = DefaultCache::with_ttl(10000, Some(ttl)) + let cache = DefaultCache::new_with_ttl(10000, Some(ttl)) .with_time_provider(Arc::clone(&mock_time) as Arc); let table_ref = Some(TableReference::from("table")); @@ -448,7 +448,7 @@ mod tests { let ttl = Duration::from_millis(200); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = DefaultCache::with_ttl(1100, Some(ttl)) + let cache = DefaultCache::new_with_ttl(1100, Some(ttl)) .with_time_provider(Arc::clone(&mock_time) as Arc); let table_ref = Some(TableReference::from("table")); @@ -478,7 +478,7 @@ mod tests { #[test] fn test_ttl_expiration_in_get() { let ttl = Duration::from_millis(100); - let cache = DefaultCache::with_ttl(1000, Some(ttl)); + let cache = DefaultCache::new_with_ttl(1000, Some(ttl)); let table_ref = Some(TableReference::from("table")); let (key, value) = create_test_list_files_entry("path", 2, 50, table_ref); From 01d6fd83d9ade7de45d03efb7eb97d056738b2d7 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 17:16:08 +0200 Subject: [PATCH 09/13] Set memory limit to original value in test --- datafusion/execution/src/cache/list_files_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 
ae604f8e50b54..5676ab9c33573 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -478,7 +478,7 @@ mod tests { #[test] fn test_ttl_expiration_in_get() { let ttl = Duration::from_millis(100); - let cache = DefaultCache::new_with_ttl(1000, Some(ttl)); + let cache = DefaultCache::new_with_ttl(10000, Some(ttl)); let table_ref = Some(TableReference::from("table")); let (key, value) = create_test_list_files_entry("path", 2, 50, table_ref); From 8b1e10d0ed4c4be5f5052f90f7b01637ec992ac1 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 17:17:35 +0200 Subject: [PATCH 10/13] fmt --- datafusion-cli/src/main.rs | 6 ++++-- datafusion/execution/src/cache/cache_manager.rs | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index c83491895c39c..5a65cd3407485 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -701,8 +701,10 @@ mod tests { #[tokio::test] async fn test_list_files_cache() -> Result<(), DataFusionError> { - let list_files_cache = - Arc::new(DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1)))); + let list_files_cache = Arc::new(DefaultCache::new_with_ttl( + 1024, + Some(Duration::from_secs(1)), + )); let rt = RuntimeEnvBuilder::new() .with_cache_manager( diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 969f20e11f365..4183d966cfaa5 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -466,7 +466,8 @@ mod tests { #[test] fn test_ttl_preserved_when_not_set_in_config() { // Create a cache with TTL = 1 second - let list_file_cache = DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); + let list_file_cache = + DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); // Verify the cache has TTL set initially assert_eq!( @@ -502,7 
+503,8 @@ mod tests { #[test] fn test_ttl_overridden_when_set_in_config() { // Create a cache with TTL = 1 second - let list_file_cache = DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); + let list_file_cache = + DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); // Put cache in config WITH a different TTL set let config = CacheManagerConfig::default() From bba67199dc7109e88554d02465fe654f2ba855d4 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 17:31:15 +0200 Subject: [PATCH 11/13] fix --- datafusion/core/src/datasource/listing_table_factory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 3c0bd3f630240..9ad8dc4f092d0 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -229,7 +229,6 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, test_util::parquet_test_data, }; - use datafusion_execution::cache::CacheAccessor; use datafusion_execution::cache::cache_manager::{ CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, }; @@ -243,6 +242,7 @@ mod tests { use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{DFSchema, TableReference}; + use datafusion_execution::cache::Cache; use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_expr::registry::ExtensionTypeRegistryRef; From b0f25c1f50952cfa83f8f1ac4534ef89b93e9769 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 17:55:12 +0200 Subject: [PATCH 12/13] fixup! 
fix --- datafusion/execution/src/cache/default_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index d2e1c47f7c901..3b05a99927c97 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -193,7 +193,7 @@ impl DefaultCache { } } - /// Override the cache name reported by [`CacheAccessor::name`]. + /// Override the cache name. pub fn with_name(mut self, name: impl Into) -> Self { self.name = name.into(); self From f1fa1cdc961c08b49383d3143c0e43ab6da3d546 Mon Sep 17 00:00:00 2001 From: Michael Kleen Date: Sun, 31 May 2026 18:35:26 +0200 Subject: [PATCH 13/13] Add comment to default cache that empty entries are rejected --- datafusion/execution/src/cache/default_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index 3b05a99927c97..4a1c5d6ef8cc5 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -170,7 +170,7 @@ impl DefaultCacheState { /// push `memory_used` above `memory_limit`. Inserts whose own size exceeds the /// limit are rejected (and any prior entry under the same key is removed). /// When a TTL is configured, the expiration is stamped onto each entry at -/// insertion time and checked lazily on access. +/// insertion time and checked lazily on access. Entries with size 0 are rejected. pub struct DefaultCache { state: Mutex>, time_provider: Arc,