diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index df066992fb979..cb2372958e735 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -42,6 +42,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::scalar::ScalarValue; use async_trait::async_trait; +use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use parquet::basic::ConvertedType; use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::reader::FileReader; @@ -546,15 +547,17 @@ impl TableFunctionImpl for MetadataCacheFunc { for (path, entry) in cached_entries { path_arr.push(path.to_string()); file_modified_arr - .push(Some(entry.object_meta.last_modified.timestamp_millis())); - file_size_bytes_arr.push(entry.object_meta.size); - e_tag_arr.push(entry.object_meta.e_tag); - version_arr.push(entry.object_meta.version); + .push(Some(entry.value.meta.last_modified.timestamp_millis())); + file_size_bytes_arr.push(entry.value.meta.size); + e_tag_arr.push(entry.value.meta.e_tag); + version_arr.push(entry.value.meta.version); metadata_size_bytes.push(entry.size_bytes as u64); hits_arr.push(entry.hits as u64); let mut extra = entry - .extra + .value + .file_metadata + .extra_info() .iter() .map(|(k, v)| format!("{k}={v}")) .collect::>(); @@ -667,14 +670,22 @@ impl TableFunctionImpl for StatisticsCacheFunc { table_arr .push(path.table.map_or_else(|| "".to_string(), |t| t.to_string())); file_modified_arr - .push(Some(entry.object_meta.last_modified.timestamp_millis())); - file_size_bytes_arr.push(entry.object_meta.size); - e_tag_arr.push(entry.object_meta.e_tag); - version_arr.push(entry.object_meta.version); - num_rows_arr.push(entry.num_rows.to_string()); - num_columns_arr.push(entry.num_columns as u64); - table_size_bytes_arr.push(entry.table_size_bytes.to_string()); - statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64); + .push(Some(entry.value.meta.last_modified.timestamp_millis())); + file_size_bytes_arr.push(entry.value.meta.size); + e_tag_arr.push(entry.value.meta.e_tag); + version_arr.push(entry.value.meta.version); + num_rows_arr.push(entry.value.statistics.num_rows.to_string()); + num_columns_arr + .push(entry.value.statistics.column_statistics.len() as u64); + table_size_bytes_arr + .push(entry.value.statistics.total_byte_size.to_string()); + statistics_size_bytes_arr.push( + entry + .value + .statistics + .heap_size(&mut DFHeapSizeCtx::default()) + as u64, + ); } } @@ -827,14 +838,14 @@ impl TableFunctionImpl for ListFilesCacheFunc { .map(|t| t.duration_since(now).as_millis() as i64), ); - for meta in entry.metas.files.iter() { + for meta in entry.value.files.iter() { file_path_arr.push(meta.location.to_string()); file_modified_arr.push(meta.last_modified.timestamp_millis()); file_size_bytes_arr.push(meta.size); etag_arr.push(meta.e_tag.clone()); version_arr.push(meta.version.clone()); } - current_offset += entry.metas.files.len() as i32; + current_offset += entry.value.files.len() as i32; offsets.push(current_offset); } } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 935bf0a9744dd..5a65cd3407485 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -441,9 +441,10 @@ mod tests { use std::time::Duration; use super::*; + use datafusion::execution::cache::default_cache::DefaultCache; use datafusion::{ common::test_util::batches_to_string, - execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig}, + execution::cache::cache_manager::CacheManagerConfig, prelude::{ParquetReadOptions, col, lit, split_part}, }; use insta::assert_snapshot; @@ -700,7 +701,7 @@ mod tests { #[tokio::test] async fn test_list_files_cache() -> Result<(), DataFusionError> { - let list_files_cache = Arc::new(DefaultListFilesCache::new( + let list_files_cache = Arc::new(DefaultCache::new_with_ttl( 1024, Some(Duration::from_secs(1)), )); diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index dd3675bd2b39d..5e2507a79984c 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -34,8 +34,8 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics, }; -use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; +use datafusion_execution::cache::cache_manager::TableScopedPath; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; @@ -186,7 +186,7 @@ pub struct ListingTable { /// The SQL definition for this table, if any definition: Option, /// Cache for collected file statistics - collected_statistics: Option>, + collected_statistics: Option>, /// Constraints applied to this table constraints: Constraints, /// Column default expressions for columns that are not physically present in the data files @@ -259,7 +259,7 @@ impl ListingTable { /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics /// multiple times in the same session. /// - pub fn with_cache(mut self, cache: Option>) -> Self { + pub fn with_cache(mut self, cache: Option>) -> Self { self.collected_statistics = cache; self } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 349d941cc2bda..9ad8dc4f092d0 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -229,9 +229,9 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, test_util::parquet_test_data, }; - use datafusion_execution::cache::CacheAccessor; - use datafusion_execution::cache::cache_manager::CacheManagerConfig; - use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; + use datafusion_execution::cache::cache_manager::{ + CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + }; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use glob::Pattern; @@ -242,6 +242,8 @@ mod tests { use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{DFSchema, TableReference}; + use datafusion_execution::cache::Cache; + use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_expr::registry::ExtensionTypeRegistryRef; #[tokio::test] @@ -484,7 +486,8 @@ mod tests { .to_string(); // Test with collect_statistics enabled - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); + let file_statistics_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); let cache_config = CacheManagerConfig::default() .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() @@ -514,7 +517,8 @@ mod tests { ); // Test with collect_statistics disabled - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); + let file_statistics_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); let cache_config = CacheManagerConfig::default() .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b2ad5c7d7ada0..9176f0defc68d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -76,8 +76,8 @@ use datafusion_common::{ }; pub use datafusion_execution::TaskContext; use datafusion_execution::cache::cache_manager::{ - DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, - DEFAULT_METADATA_CACHE_LIMIT, + DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, + DEFAULT_LIST_FILES_CACHE_TTL, DEFAULT_METADATA_CACHE_LIMIT, }; pub use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::{ @@ -103,7 +103,6 @@ use datafusion_session::SessionStore; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_execution::cache::file_statistics_cache::DEFAULT_FILE_STATISTICS_MEMORY_LIMIT; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 3e3b90a348b04..7c8e1d2ce0cc0 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -29,11 +29,11 @@ use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::SessionContext; use datafusion_common::DFSchema; use datafusion_common::stats::Precision; -use datafusion_execution::cache::DefaultListFilesCache; use datafusion_execution::cache::cache_manager::{ - CacheManagerConfig, FileStatisticsCache, + CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, FileStatisticsCache, ListFilesCache, }; -use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; +use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{Expr, col, lit}; @@ -238,7 +238,7 @@ async fn list_files_with_session_level_cache() { async fn get_listing_table( table_path: &ListingTableUrl, - static_cache: Option>, + static_cache: Option>, opt: &ListingOptions, ) -> ListingTable { let schema = opt @@ -256,14 +256,13 @@ async fn get_listing_table( .with_cache(static_cache) } -fn get_cache_runtime_state() -> ( - Arc, - Arc, - SessionState, -) { +fn get_cache_runtime_state() +-> (Arc, Arc, SessionState) { let cache_config = CacheManagerConfig::default(); - let file_static_cache = Arc::new(DefaultFileStatisticsCache::default()); - let list_file_cache = Arc::new(DefaultListFilesCache::default()); + let file_static_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); + let list_file_cache = + Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT)); let cache_config = cache_config .with_file_statistics_cache(Some(file_static_cache.clone())) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 2db1e1ce12f72..a9f57a0793463 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -23,9 +23,11 @@ use std::time::Duration; use datafusion::execution::context::SessionContext; use datafusion::execution::context::TaskContext; use datafusion::prelude::SessionConfig; -use datafusion_execution::cache::DefaultListFilesCache; -use datafusion_execution::cache::cache_manager::CacheManagerConfig; -use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; +use datafusion_execution::cache::cache_manager::{ + CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, +}; +use datafusion_execution::cache::default_cache::DefaultCache; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_plan::common::collect; @@ -260,7 +262,8 @@ async fn test_test_metadata_cache_limit() { #[tokio::test] async fn test_list_files_cache_limit() { - let list_files_cache = Arc::new(DefaultListFilesCache::default()); + let list_files_cache = + Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT)); let rt = RuntimeEnvBuilder::new() .with_cache_manager( @@ -303,7 +306,8 @@ async fn test_list_files_cache_limit() { #[tokio::test] async fn test_list_files_cache_ttl() { - let list_files_cache = Arc::new(DefaultListFilesCache::default()); + let list_files_cache = + Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT)); let rt = RuntimeEnvBuilder::new() .with_cache_manager( @@ -347,7 +351,8 @@ async fn test_list_files_cache_ttl() { #[tokio::test] async fn test_file_statistics_cache_limit() { - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); + let file_statistics_cache = + Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT)); let rt = RuntimeEnvBuilder::new() .with_cache_manager( diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index fe81504e320d7..734ec6b536f69 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -626,7 +626,7 @@ pub async fn fetch_parquet_metadata( object_meta: &ObjectMeta, size_hint: Option, decryption_properties: Option<&FileDecryptionProperties>, - file_metadata_cache: Option>, + file_metadata_cache: Option>, ) -> Result> { let decryption_properties = decryption_properties.cloned().map(Arc::new); DFParquetMetadata::new(store, object_meta) @@ -650,7 +650,7 @@ pub async fn fetch_statistics( file: &ObjectMeta, metadata_size_hint: Option, decryption_properties: Option<&FileDecryptionProperties>, - file_metadata_cache: Option>, + file_metadata_cache: Option>, ) -> Result { let decryption_properties = decryption_properties.cloned().map(Arc::new); DFParquetMetadata::new(store, file) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index d3831766a42ab..1618050a8daae 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ - ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, + ColumnStatistics, DataFusionError, HashMap, Result, ScalarValue, Statistics, }; use datafusion_execution::cache::cache_manager::{ CachedFileMetadataEntry, FileMetadata, FileMetadataCache, @@ -48,7 +48,6 @@ use parquet::file::metadata::{ use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; /// Minimum fraction of row groups that must report NDV statistics for the @@ -69,7 +68,7 @@ pub struct DFParquetMetadata<'a> { object_meta: &'a ObjectMeta, metadata_size_hint: Option, decryption_properties: Option>, - file_metadata_cache: Option>, + file_metadata_cache: Option>, /// timeunit to coerce INT96 timestamps to pub coerce_int96: Option, /// Optional timezone applied to INT96-coerced timestamps. @@ -107,7 +106,7 @@ impl<'a> DFParquetMetadata<'a> { /// set file metadata cache pub fn with_file_metadata_cache( mut self, - file_metadata_cache: Option>, + file_metadata_cache: Option>, ) -> Self { self.file_metadata_cache = file_metadata_cache; self diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 482bf8dced4f8..f1d7c82b26d13 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -21,6 +21,7 @@ use crate::ParquetFileMetrics; use crate::metadata::DFParquetMetadata; use bytes::Bytes; +use datafusion_common::HashMap; use datafusion_datasource::PartitionedFile; use datafusion_execution::cache::cache_manager::FileMetadata; use datafusion_execution::cache::cache_manager::FileMetadataCache; @@ -32,7 +33,6 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::file::metadata::ParquetMetaData; use std::any::Any; -use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -182,13 +182,13 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { #[derive(Debug)] pub struct CachedParquetFileReaderFactory { store: Arc, - metadata_cache: Arc, + metadata_cache: Arc, } impl CachedParquetFileReaderFactory { pub fn new( store: Arc, - metadata_cache: Arc, + metadata_cache: Arc, ) -> Self { Self { store, @@ -241,7 +241,7 @@ pub struct CachedParquetFileReader { store: Arc, pub inner: ParquetObjectReader, partitioned_file: PartitionedFile, - metadata_cache: Arc, + metadata_cache: Arc, metadata_size_hint: Option, } @@ -251,7 +251,7 @@ impl CachedParquetFileReader { store: Arc, inner: ParquetObjectReader, partitioned_file: PartitionedFile, - metadata_cache: Arc, + metadata_cache: Arc, metadata_size_hint: Option, ) -> Self { Self { diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 4bf99fc325e2c..7985a29e4fd94 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use datafusion_common::{DataFusionError, Result, TableReference}; -use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::CachedFileList; +use datafusion_execution::cache::cache_manager::TableScopedPath; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 08a8dc9fd9cda..4183d966cfaa5 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -15,31 +15,31 @@ // specific language governing permissions and limitations // under the License. -use crate::cache::CacheAccessor; -use crate::cache::DefaultListFilesCache; -use crate::cache::file_statistics_cache::{ - DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache, - DefaultFilesMetadataCache, -}; -use crate::cache::list_files_cache::ListFilesEntry; -use crate::cache::list_files_cache::TableScopedPath; -use datafusion_common::TableReference; +use crate::cache::default_cache::DefaultCache; +pub use crate::cache::{Cache, CacheValue, TableScopedPath}; +use datafusion_common::HashMap; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; -use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use object_store::ObjectMeta; use object_store::path::Path; use std::any::Any; -use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; -pub use super::list_files_cache::{ - DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL, -}; +pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB + +pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB + +pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M + +pub type FileStatisticsCache = dyn Cache; +pub type ListFilesCache = dyn Cache; +pub type FileMetadataCache = dyn Cache; /// Cached metadata for a file, including statistics and ordering. /// @@ -78,36 +78,10 @@ impl CachedFileMetadata { } } -/// A cache for file statistics and orderings. -/// -/// This cache stores [`CachedFileMetadata`] which includes: -/// - File metadata for validation (size, last_modified) -/// - Statistics for the file -/// - Ordering information for the file -/// -/// If enabled via [`CacheManagerConfig::with_file_statistics_cache`] this -/// cache avoids inferring the same file statistics repeatedly during the -/// session lifetime. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// See [`crate::runtime_env::RuntimeEnv`] for more details -pub trait FileStatisticsCache: - CacheAccessor -{ - /// Cache memory limit in bytes. - fn cache_limit(&self) -> usize; - - /// Updates the cache with a new memory limit in bytes. - fn update_cache_limit(&self, limit: usize); - - /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; - - fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; +impl CacheValue for CachedFileMetadata { + fn size(&self) -> usize { + DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) + } } impl DFHeapSize for CachedFileMetadata { @@ -122,23 +96,6 @@ impl DFHeapSize for CachedFileMetadata { } } -/// Represents information about a cached statistics entry. -/// This is used to expose the statistics cache contents to outside modules. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct FileStatisticsCacheEntry { - pub object_meta: ObjectMeta, - /// Number of table rows. - pub num_rows: Precision, - /// Number of table columns. - pub num_columns: usize, - /// Total table size, in bytes. - pub table_size_bytes: Precision, - /// Size of the statistics entry, in bytes. - pub statistics_size_bytes: usize, - /// Whether ordering information is cached for this file. - pub has_ordering: bool, -} - /// Cached file listing. /// /// TTL expiration is handled internally by the cache implementation. @@ -181,6 +138,32 @@ impl CachedFileList { } } +impl CacheValue for CachedFileList { + fn size(&self) -> usize { + self.files.capacity() * size_of::() + + self + .files + .iter() + .map(meta_heap_bytes) + .reduce(|acc, b| acc + b) + .unwrap_or(0) + } +} + +/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. +pub fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { + let mut size = object_meta.location.as_ref().len(); + + if let Some(e) = &object_meta.e_tag { + size += e.len(); + } + if let Some(v) = &object_meta.version { + size += v.len(); + } + + size +} + impl Deref for CachedFileList { type Target = Arc>; fn deref(&self) -> &Self::Target { @@ -194,38 +177,6 @@ impl From> for CachedFileList { } } -/// Cache for storing the [`ObjectMeta`]s that result from listing a path -/// -/// Listing a path means doing an object store "list" operation or `ls` -/// command on the local filesystem. This operation can be expensive, -/// especially when done over remote object stores. -/// -/// The cache key is always the table's base path, ensuring a stable cache key. -/// The cached value is a [`CachedFileList`] containing the files and a timestamp. -/// -/// Partition filtering is done after retrieval using [`CachedFileList::files_matching_prefix`]. -/// -/// See [`crate::runtime_env::RuntimeEnv`] for more details. -pub trait ListFilesCache: CacheAccessor { - /// Returns the cache's memory limit in bytes. - fn cache_limit(&self) -> usize; - - /// Returns the TTL (time-to-live) for cache entries, if configured. - fn cache_ttl(&self) -> Option; - - /// Updates the cache with a new memory limit in bytes. - fn update_cache_limit(&self, limit: usize); - - /// Updates the cache with a new TTL (time-to-live). - fn update_cache_ttl(&self, ttl: Option); - - /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; - - /// Drop all entries for the given table reference. - fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; -} - /// Generic file-embedded metadata used with [`FileMetadataCache`]. /// /// For example, Parquet footers and page metadata can be represented @@ -240,7 +191,7 @@ pub trait FileMetadata: Any + Send + Sync { /// Returns the size of the metadata in bytes. fn memory_size(&self) -> usize; - /// Returns extra information about this entry (used by [`FileMetadataCache::list_entries`]). + /// Returns extra information about this entry fn extra_info(&self) -> HashMap; } @@ -253,6 +204,12 @@ pub struct CachedFileMetadataEntry { pub file_metadata: Arc, } +impl CacheValue for CachedFileMetadataEntry { + fn size(&self) -> usize { + self.file_metadata.memory_size() + } +} + impl CachedFileMetadataEntry { /// Create a new cached file metadata entry. pub fn new(meta: ObjectMeta, file_metadata: Arc) -> Self { @@ -278,37 +235,6 @@ impl Debug for CachedFileMetadataEntry { } } -/// Cache for file-embedded metadata. -/// -/// This cache stores per-file metadata in the form of [`CachedFileMetadataEntry`], -/// which includes the [`ObjectMeta`] for validation. -/// -/// For example, the built in [`ListingTable`] uses this cache to avoid parsing -/// Parquet footers multiple times for the same file. -/// -/// DataFusion provides a default implementation, [`DefaultFilesMetadataCache`], -/// and users can also provide their own implementations to implement custom -/// caching strategies. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// See [`crate::runtime_env::RuntimeEnv`] for more details. -/// -/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html -pub trait FileMetadataCache: CacheAccessor { - /// Returns the cache's memory limit in bytes. - fn cache_limit(&self) -> usize; - - /// Updates the cache with a new memory limit in bytes. - fn update_cache_limit(&self, limit: usize); - - /// Retrieves the information about the entries currently cached. - fn list_entries(&self) -> HashMap; -} - #[derive(Debug, Clone, PartialEq, Eq)] /// Represents information about a cached metadata entry. /// This is used to expose the metadata cache contents to outside modules. @@ -322,24 +248,6 @@ pub struct FileMetadataCacheEntry { pub extra: HashMap, } -impl Debug for dyn FileStatisticsCache { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache name: {} with length: {}", self.name(), self.len()) - } -} - -impl Debug for dyn ListFilesCache { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache name: {} with length: {}", self.name(), self.len()) - } -} - -impl Debug for dyn FileMetadataCache { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache name: {} with length: {}", self.name(), self.len()) - } -} - /// Manages various caches used in DataFusion. /// /// Following DataFusion design principles, DataFusion provides default cache @@ -349,28 +257,30 @@ impl Debug for dyn FileMetadataCache { /// See [`CacheManagerConfig`] for configuration options. #[derive(Debug)] pub struct CacheManager { - file_statistic_cache: Option>, - list_files_cache: Option>, - file_metadata_cache: Arc, + file_statistic_cache: Option>, + list_files_cache: Option>, + file_metadata_cache: Arc, } impl CacheManager { pub fn try_new(config: &CacheManagerConfig) -> Result> { - let file_statistic_cache = match &config.file_statistics_cache { - Some(fsc) if config.file_statistics_cache_limit > 0 => { - fsc.update_cache_limit(config.file_statistics_cache_limit); - Some(Arc::clone(fsc)) - } - None if config.file_statistics_cache_limit > 0 => { - let fsc: Arc = Arc::new( - DefaultFileStatisticsCache::new(config.file_statistics_cache_limit), - ); - Some(fsc) - } - _ => None, - }; - - let list_files_cache = match &config.list_files_cache { + let file_statistic_cache: Option> = + match &config.file_statistics_cache { + Some(fsc) if config.file_statistics_cache_limit > 0 => { + fsc.update_cache_limit(config.file_statistics_cache_limit); + Some(Arc::clone(fsc)) + } + None if config.file_statistics_cache_limit > 0 => Some(Arc::new( + DefaultCache::::new( + config.file_statistics_cache_limit, + ) + .with_name("DefaultFileStatisticsCache"), + )), + _ => None, + }; + + let list_files_cache: Option> = match &config.list_files_cache + { Some(lfc) if config.list_files_cache_limit > 0 => { // the cache memory limit or ttl might have changed, ensure they are updated lfc.update_cache_limit(config.list_files_cache_limit); @@ -380,13 +290,13 @@ impl CacheManager { } Some(Arc::clone(lfc)) } - None if config.list_files_cache_limit > 0 => { - let lfc: Arc = Arc::new(DefaultListFilesCache::new( + None if config.list_files_cache_limit > 0 => Some(Arc::new( + DefaultCache::::new_with_ttl( config.list_files_cache_limit, config.list_files_cache_ttl, - )); - Some(lfc) - } + ) + .with_name("DefaultListFilesCache"), + )), _ => None, }; @@ -394,9 +304,7 @@ impl CacheManager { .file_metadata_cache .as_ref() .map(Arc::clone) - .unwrap_or_else(|| { - Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit)) - }); + .unwrap_or_else(|| Arc::new(DefaultCache::new(config.metadata_cache_limit))); // the cache memory limit might have changed, ensure the limit is updated file_metadata_cache.update_cache_limit(config.metadata_cache_limit); @@ -409,7 +317,7 @@ impl CacheManager { } /// Get the file statistics cache. - pub fn get_file_statistic_cache(&self) -> Option> { + pub fn get_file_statistic_cache(&self) -> Option> { self.file_statistic_cache.clone() } @@ -421,7 +329,7 @@ impl CacheManager { } /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path. - pub fn get_list_files_cache(&self) -> Option> { + pub fn get_list_files_cache(&self) -> Option> { self.list_files_cache.clone() } @@ -438,7 +346,7 @@ impl CacheManager { } /// Get the file embedded metadata cache. - pub fn get_file_metadata_cache(&self) -> Arc { + pub fn get_file_metadata_cache(&self) -> Arc { Arc::clone(&self.file_metadata_cache) } @@ -448,14 +356,12 @@ impl CacheManager { } } -pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M - #[derive(Clone)] pub struct CacheManagerConfig { /// Enable caching of file statistics when listing files. /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session. /// Default is enabled. Currently only Parquet files are supported. - pub file_statistics_cache: Option>, + pub file_statistics_cache: Option>, /// Limit of the file statistics cache, in bytes. Default: 20MiB. pub file_statistics_cache_limit: usize, /// Enable caching of file metadata when listing files. @@ -465,7 +371,7 @@ pub struct CacheManagerConfig { /// Note that if this option is enabled, DataFusion will not see any updates to the underlying /// storage for at least `list_files_cache_ttl` duration. /// Default is enabled. - pub list_files_cache: Option>, + pub list_files_cache: Option>, /// Limit of the `list_files_cache`, in bytes. Default: 1MiB. pub list_files_cache_limit: usize, /// The duration the list files cache will consider an entry valid after insertion. Note that @@ -474,8 +380,8 @@ pub struct CacheManagerConfig { pub list_files_cache_ttl: Option, /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a /// data file (e.g., Parquet footer and page metadata). - /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. - pub file_metadata_cache: Option>, + /// If not provided, the [`CacheManager`] will create it. + pub file_metadata_cache: Option>, /// Limit of the file-embedded metadata cache, in bytes. pub metadata_cache_limit: usize, } @@ -498,7 +404,7 @@ impl CacheManagerConfig { /// Set the cache for file statistics. pub fn with_file_statistics_cache( mut self, - cache: Option>, + cache: Option>, ) -> Self { self.file_statistics_cache = cache; self @@ -513,10 +419,7 @@ impl CacheManagerConfig { /// Set the cache for listing files. /// /// Default is `None` (disabled). - pub fn with_list_files_cache( - mut self, - cache: Option>, - ) -> Self { + pub fn with_list_files_cache(mut self, cache: Option>) -> Self { self.list_files_cache = cache; self } @@ -538,11 +441,9 @@ impl CacheManagerConfig { } /// Sets the cache for file-embedded metadata. - /// - /// Default is a [`DefaultFilesMetadataCache`]. pub fn with_file_metadata_cache( mut self, - cache: Option>, + cache: Option>, ) -> Self { self.file_metadata_cache = cache; self @@ -566,7 +467,7 @@ mod tests { fn test_ttl_preserved_when_not_set_in_config() { // Create a cache with TTL = 1 second let list_file_cache = - DefaultListFilesCache::new(1024, Some(Duration::from_secs(1))); + DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); // Verify the cache has TTL set initially assert_eq!( @@ -603,7 +504,7 @@ mod tests { fn test_ttl_overridden_when_set_in_config() { // Create a cache with TTL = 1 second let list_file_cache = - DefaultListFilesCache::new(1024, Some(Duration::from_secs(1))); + DefaultCache::new_with_ttl(1024, Some(Duration::from_secs(1))); // Put cache in config WITH a different TTL set let config = CacheManagerConfig::default() diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs new file mode 100644 index 0000000000000..4a1c5d6ef8cc5 --- /dev/null +++ b/datafusion/execution/src/cache/default_cache.rs @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use datafusion_common::TableReference; +use datafusion_common::instant::Instant; +use datafusion_common::{HashMap, Result}; + +use crate::cache::lru_queue::LruQueue; +use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue}; + +/// Source of the current time used by a [`DefaultCache`] when applying TTLs. +pub trait TimeProvider: Send + Sync { + /// Return the current instant. + fn now(&self) -> Instant; +} + +/// [`TimeProvider`] backed by [`Instant::now`]. +/// +/// This is the default time source used by [`DefaultCache`] +#[derive(Debug, Default)] +pub struct SystemTimeProvider; + +impl TimeProvider for SystemTimeProvider { + fn now(&self) -> Instant { + Instant::now() + } +} + +#[derive(Clone)] +struct ValueEntry { + value: V, + expires: Option, +} + +struct DefaultCacheState { + lru_queue: LruQueue>, + hits: HashMap, + memory_limit: usize, + memory_used: usize, + ttl: Option, +} + +impl DefaultCacheState { + fn new(memory_limit: usize, ttl: Option) -> Self { + Self { + lru_queue: LruQueue::new(), + hits: HashMap::new(), + memory_limit, + memory_used: 0, + ttl, + } + } + + fn get(&mut self, key: &K, now: Instant) -> Option { + let entry = self.lru_queue.get(key)?; + if let Some(exp) = entry.expires + && now > exp + { + self.remove(key); + return None; + } + let value = entry.value.clone(); + *self.hits.entry(key.clone()).or_insert(0) += 1; + Some(value) + } + + fn contains_key(&mut self, key: &K, now: Instant) -> bool { + let Some(entry) = self.lru_queue.peek(key) else { + return false; + }; + match entry.expires { + Some(exp) if now > exp => { + self.remove(key); + false + } + _ => true, + } + } + + fn put(&mut self, key: &K, value: V, now: Instant) -> Option { + let value_size = value.size(); + + if value_size == 0 { + return None; + } + + let key_size = key.size(); + let total_size = key_size + value_size; + + if total_size > self.memory_limit { + // Remove potential stale entry + let result = self.remove(key); + if let Some(stale_entry) = &result { + self.memory_used -= key_size; + self.memory_used -= stale_entry.size(); + } + return result; + } + + let expires = self.ttl.map(|ttl| now + ttl); + let entry = ValueEntry { value, expires }; + + self.memory_used += total_size; + self.hits.insert(key.clone(), 0); + let old = self.lru_queue.put(key.clone(), entry); + if let Some(old_entry) = &old { + self.memory_used -= key_size; + self.memory_used -= old_entry.value.size(); + } + + self.evict_entries(); + + old.map(|v| v.value) + } + + fn remove(&mut self, key: &K) -> Option { + let entry = self.lru_queue.remove(key)?; + self.memory_used -= key.size(); + self.memory_used -= entry.value.size(); + self.hits.remove(key); + Some(entry.value) + } + + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + let Some((evicted_key, evicted)) = self.lru_queue.pop() else { + // cache is empty while memory_used > memory_limit, cannot happen + log::error!( + "DefaultCache memory accounting bug: memory_used={} but cache is empty", + self.memory_used + ); + debug_assert!(false, "memory_used > limit with empty cache"); + self.memory_used = 0; + return; + }; + self.memory_used -= evicted_key.size(); + self.memory_used -= evicted.value.size(); + self.hits.remove(&evicted_key); + } + } + + fn clear(&mut self) { + self.lru_queue.clear(); + self.hits.clear(); + self.memory_used = 0; + } +} + +/// In-memory [`Cache`] with an LRU eviction policy, byte-based memory limit, +/// and optional per-entry TTL. +/// +/// Entries are evicted in least-recently-used order whenever an insert would +/// push `memory_used` above `memory_limit`. Inserts whose own size exceeds the +/// limit are rejected (and any prior entry under the same key is removed). +/// When a TTL is configured, the expiration is stamped onto each entry at +/// insertion time and checked lazily on access. Entries with size 0 are rejected. +pub struct DefaultCache { + state: Mutex>, + time_provider: Arc, + name: String, +} + +impl DefaultCache { + /// Create a cache with the given memory budget in bytes and no TTL. + pub fn new(memory_limit: usize) -> Self { + Self::new_with_ttl(memory_limit, None) + } + + /// Create a cache with the given memory budget in bytes and an optional + /// TTL applied to every newly inserted entry. + pub fn new_with_ttl(memory_limit: usize, ttl: Option) -> Self { + Self { + state: Mutex::new(DefaultCacheState::new(memory_limit, ttl)), + time_provider: Arc::new(SystemTimeProvider), + name: "DefaultCache".to_string(), + } + } + + /// Override the cache name. + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = name.into(); + self + } + + /// Override the time source used to stamp and check TTLs. + pub fn with_time_provider(mut self, provider: Arc) -> Self { + self.time_provider = provider; + self + } + + /// Number of bytes currently accounted for by live entries. + pub fn memory_used(&self) -> usize { + self.state.lock().unwrap().memory_used + } +} + +impl Cache for DefaultCache { + fn get(&self, key: &K) -> Option { + let now = self.time_provider.now(); + let mut state = self.state.lock().unwrap(); + state.get(key, now) + } + + fn put(&self, key: &K, value: V) -> Option { + let now = self.time_provider.now(); + let mut state = self.state.lock().unwrap(); + state.put(key, value, now) + } + + fn remove(&self, k: &K) -> Option { + let mut state = self.state.lock().unwrap(); + state.remove(k) + } + + fn contains_key(&self, k: &K) -> bool { + let now = self.time_provider.now(); + let mut state = self.state.lock().unwrap(); + state.contains_key(k, now) + } + + fn len(&self) -> usize { + self.state.lock().unwrap().lru_queue.len() + } + + fn clear(&self) { + let mut state = self.state.lock().unwrap(); + state.clear(); + } + + fn name(&self) -> String { + self.name.clone() + } + fn cache_limit(&self) -> usize { + self.state.lock().unwrap().memory_limit + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } + + fn cache_ttl(&self) -> Option { + self.state.lock().unwrap().ttl + } + + fn update_cache_ttl(&self, ttl: Option) { + let mut state = self.state.lock().unwrap(); + state.ttl = ttl; + } + + fn drop_table_entries(&self, table_ref: &Option) -> Result<()> { + let mut state = self.state.lock().unwrap(); + let to_remove: Vec = state + .lru_queue + .list_entries() + .keys() + .flat_map(|k| { + let matches = match (k.table_ref(), table_ref.as_ref()) { + (Some(a), Some(b)) => a == b, + (None, None) => true, + _ => false, + }; + matches.then(|| (*k).clone()) + }) + .collect(); + for k in &to_remove { + state.remove(k); + } + Ok(()) + } + + fn list_entries(&self) -> HashMap> { + let state = self.state.lock().unwrap(); + state + .lru_queue + .list_entries() + .into_iter() + .map(|(k, entry)| { + let hits = state.hits.get(k).copied().unwrap_or(0); + let info = CacheEntryInfo { + value: entry.value.clone(), + size_bytes: entry.value.size(), + hits, + expires: entry.expires, + }; + (k.clone(), info) + }) + .collect() + } +} diff --git a/datafusion/execution/src/cache/file_metadata_cache.rs b/datafusion/execution/src/cache/file_metadata_cache.rs index 5e899d7dd9f8b..e5c1e01e48baf 100644 --- a/datafusion/execution/src/cache/file_metadata_cache.rs +++ b/datafusion/execution/src/cache/file_metadata_cache.rs @@ -15,237 +15,14 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashMap, sync::Mutex}; - -use object_store::path::Path; - -use crate::cache::{ - CacheAccessor, - cache_manager::{CachedFileMetadataEntry, FileMetadataCache, FileMetadataCacheEntry}, - lru_queue::LruQueue, -}; - -/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. -struct DefaultFilesMetadataCacheState { - lru_queue: LruQueue, - memory_limit: usize, - memory_used: usize, - cache_hits: HashMap, -} - -impl DefaultFilesMetadataCacheState { - fn new(memory_limit: usize) -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit, - memory_used: 0, - cache_hits: HashMap::new(), - } - } - - /// Returns the respective entry from the cache, if it exists. - /// If the entry exists, it becomes the most recently used. - fn get(&mut self, k: &Path) -> Option { - self.lru_queue.get(k).cloned().inspect(|_| { - *self.cache_hits.entry(k.clone()).or_insert(0) += 1; - }) - } - - /// Checks if the metadata is currently cached. - /// The LRU queue is not updated. - fn contains_key(&self, k: &Path) -> bool { - self.lru_queue.peek(k).is_some() - } - - /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. - /// If the key is already in the cache, the previous metadata is returned. - /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted. - fn put( - &mut self, - key: Path, - value: CachedFileMetadataEntry, - ) -> Option { - let value_size = value.file_metadata.memory_size(); - - // no point in trying to add this value to the cache if it cannot fit entirely - if value_size > self.memory_limit { - return None; - } - - self.cache_hits.insert(key.clone(), 0); - // if the key is already in the cache, the old value is removed - let old_value = self.lru_queue.put(key, value); - self.memory_used += value_size; - if let Some(ref old_entry) = old_value { - self.memory_used -= old_entry.file_metadata.memory_size(); - } - - self.evict_entries(); - - old_value - } - - /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. - fn evict_entries(&mut self) { - while self.memory_used > self.memory_limit { - if let Some(removed) = self.lru_queue.pop() { - self.memory_used -= removed.1.file_metadata.memory_size(); - } else { - // cache is empty while memory_used > memory_limit, cannot happen - debug_assert!( - false, - "cache is empty while memory_used > memory_limit, cannot happen" - ); - return; - } - } - } - - /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &Path) -> Option { - if let Some(old_entry) = self.lru_queue.remove(k) { - self.memory_used -= old_entry.file_metadata.memory_size(); - self.cache_hits.remove(k); - Some(old_entry) - } else { - None - } - } - - /// Returns the number of entries currently cached. - fn len(&self) -> usize { - self.lru_queue.len() - } - - /// Removes all entries from the cache. - fn clear(&mut self) { - self.lru_queue.clear(); - self.memory_used = 0; - self.cache_hits.clear(); - } -} - -/// Default implementation of [`FileMetadataCache`] -/// -/// Collected file embedded metadata cache. -/// -/// The metadata for each file is validated by comparing the cached [`ObjectMeta`] -/// (size and last_modified) against the current file state using `cached.is_valid_for(¤t_meta)`. -/// -/// # Internal details -/// -/// The `memory_limit` controls the maximum size of the cache, which uses a -/// Least Recently Used eviction algorithm. When adding a new entry, if the total -/// size of the cached entries exceeds `memory_limit`, the least recently used entries -/// are evicted until the total size is lower than `memory_limit`. -/// -/// [`ObjectMeta`]: object_store::ObjectMeta -pub struct DefaultFilesMetadataCache { - // the state is wrapped in a Mutex to ensure the operations are atomic - state: Mutex, -} - -impl DefaultFilesMetadataCache { - /// Create a new instance of [`DefaultFilesMetadataCache`]. - /// - /// # Arguments - /// `memory_limit`: the maximum size of the cache, in bytes - // - pub fn new(memory_limit: usize) -> Self { - Self { - state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)), - } - } - - /// Returns the size of the cached memory, in bytes. - pub fn memory_used(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_used - } -} - -impl CacheAccessor for DefaultFilesMetadataCache { - fn get(&self, key: &Path) -> Option { - let mut state = self.state.lock().unwrap(); - state.get(key) - } - - fn put( - &self, - key: &Path, - value: CachedFileMetadataEntry, - ) -> Option { - let mut state = self.state.lock().unwrap(); - state.put(key.clone(), value) - } - - fn remove(&self, k: &Path) -> Option { - let mut state = self.state.lock().unwrap(); - state.remove(k) - } - - fn contains_key(&self, k: &Path) -> bool { - let state = self.state.lock().unwrap(); - state.contains_key(k) - } - - fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.len() - } - - fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.clear(); - } - - fn name(&self) -> String { - "DefaultFilesMetadataCache".to_string() - } -} - -impl FileMetadataCache for DefaultFilesMetadataCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); - - for (path, entry) in state.lru_queue.list_entries() { - entries.insert( - path.clone(), - FileMetadataCacheEntry { - object_meta: entry.meta.clone(), - size_bytes: entry.file_metadata.memory_size(), - hits: *state.cache_hits.get(path).expect("entry must exist"), - extra: entry.file_metadata.extra_info(), - }, - ); - } - - entries - } -} - #[cfg(test)] mod tests { - use std::collections::HashMap; use std::sync::Arc; - use crate::cache::CacheAccessor; - use crate::cache::cache_manager::{ - CachedFileMetadataEntry, FileMetadata, FileMetadataCache, FileMetadataCacheEntry, - }; - use crate::cache::file_metadata_cache::DefaultFilesMetadataCache; + use crate::cache::cache_manager::{CachedFileMetadataEntry, FileMetadata}; + use crate::cache::default_cache::DefaultCache; + use crate::cache::{Cache, CacheEntryInfo}; + use datafusion_common::HashMap; use object_store::ObjectMeta; use object_store::path::Path; @@ -267,6 +44,12 @@ mod tests { } } + impl PartialEq for CachedFileMetadataEntry { + fn eq(&self, other: &Self) -> bool { + self.meta == other.meta + } + } + fn create_test_object_meta(path: &str, size: usize) -> ObjectMeta { ObjectMeta { location: Path::from(path), @@ -289,7 +72,7 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let cache = DefaultFilesMetadataCache::new(1024 * 1024); + let cache = DefaultCache::new(1024 * 1024); // Cache miss assert!(cache.get(&object_meta.location).is_none()); @@ -354,19 +137,20 @@ mod tests { e_tag: None, version: None, }; - let metadata: Arc = Arc::new(TestFileMetadata { - metadata: "a".repeat(size), - }); + let metadata = "a".repeat(size); + let metadata: Arc = Arc::new(TestFileMetadata { metadata }); (object_meta, metadata) } #[test] fn test_default_file_metadata_cache_with_limit() { - let cache = DefaultFilesMetadataCache::new(1000); - let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); - let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500); - let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); + // Create a cache with 1000 bytes capacity + 4 keys each key 2 bytes + let cache = DefaultCache::new(1000 + 4 * 2); + + let (object_meta1, metadata1) = generate_test_metadata_with_size("01", 100); + let (object_meta2, metadata2) = generate_test_metadata_with_size("02", 500); + let (object_meta3, metadata3) = generate_test_metadata_with_size("03", 300); cache.put( &object_meta1.location, @@ -383,67 +167,67 @@ mod tests { // all entries will fit assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 900); + assert_eq!(cache.memory_used(), 906); assert!(cache.contains_key(&object_meta1.location)); assert!(cache.contains_key(&object_meta2.location)); assert!(cache.contains_key(&object_meta3.location)); // add a new entry which will remove the least recently used ("1") - let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200); + let (object_meta4, metadata4) = generate_test_metadata_with_size("04", 200); cache.put( &object_meta4.location, CachedFileMetadataEntry::new(object_meta4.clone(), metadata4), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1006); assert!(!cache.contains_key(&object_meta1.location)); assert!(cache.contains_key(&object_meta4.location)); // get entry "2", which will move it to the top of the queue, and add a new one which will // remove the new least recently used ("3") let _ = cache.get(&object_meta2.location); - let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100); + let (object_meta5, metadata5) = generate_test_metadata_with_size("05", 100); cache.put( &object_meta5.location, CachedFileMetadataEntry::new(object_meta5.clone(), metadata5), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 800); + assert_eq!(cache.memory_used(), 806); assert!(!cache.contains_key(&object_meta3.location)); assert!(cache.contains_key(&object_meta5.location)); // new entry which will not be able to fit in the 1000 bytes allocated - let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200); + let (object_meta6, metadata6) = generate_test_metadata_with_size("06", 1200); cache.put( &object_meta6.location, CachedFileMetadataEntry::new(object_meta6.clone(), metadata6), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 800); + assert_eq!(cache.memory_used(), 806); assert!(!cache.contains_key(&object_meta6.location)); // new entry which is able to fit without removing any entry - let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200); + let (object_meta7, metadata7) = generate_test_metadata_with_size("07", 200); cache.put( &object_meta7.location, CachedFileMetadataEntry::new(object_meta7.clone(), metadata7), ); assert_eq!(cache.len(), 4); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1008); assert!(cache.contains_key(&object_meta7.location)); // new entry which will remove all other entries - let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999); + let (object_meta8, metadata8) = generate_test_metadata_with_size("08", 999); cache.put( &object_meta8.location, CachedFileMetadataEntry::new(object_meta8.clone(), metadata8), ); assert_eq!(cache.len(), 1); - assert_eq!(cache.memory_used(), 999); + assert_eq!(cache.memory_used(), 1001); assert!(cache.contains_key(&object_meta8.location)); // when updating an entry, the previous ones are not unnecessarily removed - let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300); + let (object_meta9, metadata9) = generate_test_metadata_with_size("09", 300); let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200); let (object_meta11_v1, metadata11_v1) = generate_test_metadata_with_size("11", 400); @@ -459,7 +243,7 @@ mod tests { &object_meta11_v1.location, CachedFileMetadataEntry::new(object_meta11_v1.clone(), metadata11_v1), ); - assert_eq!(cache.memory_used(), 900); + assert_eq!(cache.memory_used(), 906); assert_eq!(cache.len(), 3); let (object_meta11_v2, metadata11_v2) = generate_test_metadata_with_size("11", 500); @@ -467,20 +251,20 @@ mod tests { &object_meta11_v2.location, CachedFileMetadataEntry::new(object_meta11_v2.clone(), metadata11_v2), ); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1006); assert_eq!(cache.len(), 3); assert!(cache.contains_key(&object_meta9.location)); assert!(cache.contains_key(&object_meta10.location)); assert!(cache.contains_key(&object_meta11_v2.location)); - // when updating an entry that now exceeds the limit, the LRU ("9") needs to be removed + // when updating an entry that now exceeds the limit, the LRU ("09") needs to be removed let (object_meta11_v3, metadata11_v3) = - generate_test_metadata_with_size("11", 501); + generate_test_metadata_with_size("11", 510); cache.put( &object_meta11_v3.location, CachedFileMetadataEntry::new(object_meta11_v3.clone(), metadata11_v3), ); - assert_eq!(cache.memory_used(), 701); + assert_eq!(cache.memory_used(), 714); assert_eq!(cache.len(), 2); assert!(cache.contains_key(&object_meta10.location)); assert!(cache.contains_key(&object_meta11_v3.location)); @@ -488,7 +272,7 @@ mod tests { // manually removing an entry that is not the LRU cache.remove(&object_meta11_v3.location); assert_eq!(cache.len(), 1); - assert_eq!(cache.memory_used(), 200); + assert_eq!(cache.memory_used(), 202); assert!(cache.contains_key(&object_meta10.location)); assert!(!cache.contains_key(&object_meta11_v3.location)); @@ -514,10 +298,10 @@ mod tests { CachedFileMetadataEntry::new(object_meta14.clone(), metadata14), ); assert_eq!(cache.len(), 3); - assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.memory_used(), 1006); cache.update_cache_limit(600); assert_eq!(cache.len(), 1); - assert_eq!(cache.memory_used(), 500); + assert_eq!(cache.memory_used(), 502); assert!(!cache.contains_key(&object_meta12.location)); assert!(!cache.contains_key(&object_meta13.location)); assert!(cache.contains_key(&object_meta14.location)); @@ -525,61 +309,53 @@ mod tests { #[test] fn test_default_file_metadata_cache_entries_info() { - let cache = DefaultFilesMetadataCache::new(1000); + // Create a cache with 1000 bytes + 4 bytes for 4 keys each key 1 byte + let cache = DefaultCache::new(1000 + 4); + let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 200); let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); // initial entries, all will have hits = 0 - cache.put( - &object_meta1.location, - CachedFileMetadataEntry::new(object_meta1.clone(), metadata1), - ); - cache.put( - &object_meta2.location, - CachedFileMetadataEntry::new(object_meta2.clone(), metadata2), - ); - cache.put( - &object_meta3.location, - CachedFileMetadataEntry::new(object_meta3.clone(), metadata3), - ); + let entry_1 = CachedFileMetadataEntry::new(object_meta1.clone(), metadata1); + let entry_2 = CachedFileMetadataEntry::new(object_meta2.clone(), metadata2); + let entry_3 = CachedFileMetadataEntry::new(object_meta3.clone(), metadata3); + + // Build a cache which fits exactly these 3 entries + + cache.put(&object_meta1.location, entry_1.clone()); + cache.put(&object_meta2.location, entry_2.clone()); + cache.put(&object_meta3.location, entry_3.clone()); + let entries = cache.list_entries(); + assert_eq!( - cache.list_entries(), + entries, HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 100, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("2"), - FileMetadataCacheEntry { - object_meta: object_meta2.clone(), + CacheEntryInfo { + value: entry_2.clone(), size_bytes: 200, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -592,38 +368,29 @@ mod tests { HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 100, hits: 1, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("2"), - FileMetadataCacheEntry { - object_meta: object_meta2.clone(), + CacheEntryInfo { + value: entry_2.clone(), size_bytes: 200, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -631,47 +398,36 @@ mod tests { // new entry, will evict "2" let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 600); - cache.put( - &object_meta4.location, - CachedFileMetadataEntry::new(object_meta4.clone(), metadata4), - ); + let entry_4 = CachedFileMetadataEntry::new(object_meta4.clone(), metadata4); + cache.put(&object_meta4.location, entry_4.clone()); assert_eq!( cache.list_entries(), HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 100, hits: 1, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("4"), - FileMetadataCacheEntry { - object_meta: object_meta4.clone(), + CacheEntryInfo { + value: entry_4.clone(), size_bytes: 600, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -679,47 +435,37 @@ mod tests { // replace entry "1" let (object_meta1_new, metadata1_new) = generate_test_metadata_with_size("1", 50); - cache.put( - &object_meta1_new.location, - CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new), - ); + let entry_1 = + CachedFileMetadataEntry::new(object_meta1_new.clone(), metadata1_new); + cache.put(&object_meta1_new.location, entry_1.clone()); assert_eq!( cache.list_entries(), HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1_new.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 50, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("4"), - FileMetadataCacheEntry { - object_meta: object_meta4.clone(), + CacheEntryInfo { + value: entry_4.clone(), size_bytes: 600, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) @@ -732,26 +478,20 @@ mod tests { HashMap::from([ ( Path::from("1"), - FileMetadataCacheEntry { - object_meta: object_meta1_new.clone(), + CacheEntryInfo { + value: entry_1.clone(), size_bytes: 50, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ), ( Path::from("3"), - FileMetadataCacheEntry { - object_meta: object_meta3.clone(), + CacheEntryInfo { + value: entry_3.clone(), size_bytes: 300, hits: 0, - extra: HashMap::from([( - "extra_info".to_owned(), - "abc".to_owned() - )]), + expires: None, } ) ]) diff --git a/datafusion/execution/src/cache/file_statistics_cache.rs b/datafusion/execution/src/cache/file_statistics_cache.rs index 12f0bb1b8af88..58705413acc37 100644 --- a/datafusion/execution/src/cache/file_statistics_cache.rs +++ b/datafusion/execution/src/cache/file_statistics_cache.rs @@ -15,267 +15,20 @@ // specific language governing permissions and limitations // under the License. -use crate::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, -}; -use crate::cache::{CacheAccessor, TableScopedPath}; -use std::collections::HashMap; -use std::sync::Mutex; - -pub use crate::cache::DefaultFilesMetadataCache; -use crate::cache::lru_queue::LruQueue; -use datafusion_common::TableReference; -use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; - -/// Default implementation of [`FileStatisticsCache`] -/// -/// Stores cached file metadata (statistics and orderings) for files. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// # Internal details -/// -/// The `memory_limit` controls the maximum size of the cache, which uses a -/// Least Recently Used eviction algorithm. When adding a new entry, if the total -/// size of the cached entries exceeds `memory_limit`, the least recently used entries -/// are evicted until the total size is lower than `memory_limit`. -/// -/// -/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache -#[derive(Default)] -pub struct DefaultFileStatisticsCache { - state: Mutex, -} - -impl DefaultFileStatisticsCache { - pub fn new(memory_limit: usize) -> Self { - Self { - state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), - } - } - - /// Returns the size of the cached memory, in bytes. - pub fn memory_used(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_used - } -} - -struct DefaultFileStatisticsCacheState { - lru_queue: LruQueue, - memory_limit: usize, - memory_used: usize, -} - -pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB - -impl Default for DefaultFileStatisticsCacheState { - fn default() -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, - memory_used: 0, - } - } -} - -impl DefaultFileStatisticsCacheState { - fn new(memory_limit: usize) -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit, - memory_used: 0, - } - } - fn get(&mut self, key: &TableScopedPath) -> Option { - self.lru_queue.get(key).cloned() - } - - fn put( - &mut self, - key: &TableScopedPath, - value: CachedFileMetadata, - ) -> Option { - let mut ctx = DFHeapSizeCtx::default(); - let key_size = key.heap_size(&mut ctx); - let entry_size = value.heap_size(&mut ctx); - - if entry_size + key_size > self.memory_limit { - // Remove potential stale entry - return self.remove(key); - } - - self.memory_used += entry_size; - self.memory_used += key_size; - - let old_value = self.lru_queue.put(key.clone(), value); - if let Some(old_entry) = &old_value { - let mut ctx = DFHeapSizeCtx::default(); - self.memory_used -= old_entry.heap_size(&mut ctx); - self.memory_used -= key_size; - } - - self.evict_entries(); - - old_value - } - - fn remove(&mut self, k: &TableScopedPath) -> Option { - if let Some(old_entry) = self.lru_queue.remove(k) { - let mut ctx = DFHeapSizeCtx::default(); - self.memory_used -= k.heap_size(&mut ctx); - self.memory_used -= old_entry.heap_size(&mut ctx); - Some(old_entry) - } else { - None - } - } - - fn contains_key(&self, k: &TableScopedPath) -> bool { - self.lru_queue.contains_key(k) - } - - fn len(&self) -> usize { - self.lru_queue.len() - } - - fn clear(&mut self) { - self.lru_queue.clear(); - self.memory_used = 0; - } - - fn evict_entries(&mut self) { - while self.memory_used > self.memory_limit { - if let Some(removed) = self.lru_queue.pop() { - let mut ctx = DFHeapSizeCtx::default(); - self.memory_used -= removed.0.heap_size(&mut ctx); - self.memory_used -= removed.1.heap_size(&mut ctx); - } else { - // cache is empty while memory_used > memory_limit, cannot happen - log::error!( - "File statistics cache memory accounting bug: memory_used={} but cache is empty. \ - Please report this to the Apache DataFusion developers.", - self.memory_used - ); - debug_assert!( - false, - "memory_used={} but cache is empty", - self.memory_used - ); - self.memory_used = 0; - return; - } - } - } -} -impl CacheAccessor for DefaultFileStatisticsCache { - fn get(&self, key: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - state.get(key) - } - - fn put( - &self, - key: &TableScopedPath, - value: CachedFileMetadata, - ) -> Option { - let mut state = self.state.lock().unwrap(); - state.put(key, value) - } - - fn remove(&self, key: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - state.remove(key) - } - - fn contains_key(&self, k: &TableScopedPath) -> bool { - let state = self.state.lock().unwrap(); - state.contains_key(k) - } - - fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.len() - } - - fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.clear(); - } - - fn name(&self) -> String { - "DefaultFileStatisticsCache".to_string() - } -} - -impl FileStatisticsCache for DefaultFileStatisticsCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let mut entries = HashMap::::new(); - let mut ctx = DFHeapSizeCtx::default(); - for entry in self.state.lock().unwrap().lru_queue.list_entries() { - let path = entry.0.clone(); - let cached = entry.1; - entries.insert( - path, - FileStatisticsCacheEntry { - object_meta: cached.meta.clone(), - num_rows: cached.statistics.num_rows, - num_columns: cached.statistics.column_statistics.len(), - table_size_bytes: cached.statistics.total_byte_size, - statistics_size_bytes: cached.statistics.heap_size(&mut ctx), - has_ordering: cached.ordering.is_some(), - }, - ); - } - - entries - } - - fn drop_table_entries( - &self, - table_ref: &Option, - ) -> datafusion_common::Result<()> { - let mut state = self.state.lock().unwrap(); - let mut table_paths = vec![]; - for (path, _) in state.lru_queue.list_entries() { - if path.table == *table_ref { - table_paths.push(path.clone()); - } - } - for path in table_paths { - state.remove(&path); - } - Ok(()) - } -} - #[cfg(test)] mod tests { - use super::*; use crate::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, + CachedFileMetadata, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, }; + use crate::cache::default_cache::DefaultCache; + use crate::cache::{Cache, CacheEntryInfo, TableScopedPath}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; - use datafusion_common::heap_size::DFHeapSizeCtx; + use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::stats::Precision; - use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; + use datafusion_common::{ColumnStatistics, HashMap, ScalarValue, Statistics}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; @@ -298,7 +51,7 @@ mod tests { #[test] fn test_statistics_cache() { let meta = create_test_meta("test", 1024); - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new( "test_column", @@ -358,7 +111,7 @@ mod tests { }; let entry = entries.get(&path_3).unwrap(); - assert_eq!(entry.object_meta.size, 2048); // Should be updated value + assert_eq!(entry.value.meta.size, 2048); // Should be updated value } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -414,7 +167,7 @@ mod tests { #[test] fn test_ordering_cache() { let meta = create_test_meta("test.parquet", 100); - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); @@ -448,12 +201,12 @@ mod tests { // Verify list_entries shows has_ordering = true let entries = cache.list_entries(); assert_eq!(entries.len(), 1); - assert!(entries.get(&path).unwrap().has_ordering); + assert!(entries.get(&path).unwrap().value.ordering.is_some()); } #[test] fn test_cache_invalidation_on_file_modification() { - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let path = TableScopedPath { path: Path::from("test.parquet"), table: None, @@ -492,7 +245,7 @@ mod tests { #[test] fn test_ordering_cache_invalidation_on_file_modification() { - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let path = TableScopedPath { path: Path::from("test.parquet"), table: None, @@ -557,12 +310,12 @@ mod tests { #[test] fn test_list_entries() { - let cache = DefaultFileStatisticsCache::default(); + let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); let meta1 = create_test_meta("test1.parquet", 100); - let cached_value = CachedFileMetadata::new( + let cached_value_1 = CachedFileMetadata::new( meta1.clone(), Arc::new(Statistics::new_unknown(&schema)), None, @@ -573,9 +326,9 @@ mod tests { table: None, }; - cache.put(&path_1, cached_value); + cache.put(&path_1, cached_value_1.clone()); let meta2 = create_test_meta("test2.parquet", 200); - let cached_value = CachedFileMetadata::new( + let cached_value_2 = CachedFileMetadata::new( meta2.clone(), Arc::new(Statistics::new_unknown(&schema)), Some(ordering()), @@ -586,7 +339,7 @@ mod tests { table: None, }; - cache.put(&path_2, cached_value); + cache.put(&path_2, cached_value_2.clone()); let entries = cache.list_entries(); assert_eq!( @@ -594,24 +347,20 @@ mod tests { HashMap::from([ ( path_1, - FileStatisticsCacheEntry { - object_meta: meta1, - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 360, - has_ordering: false, + CacheEntryInfo { + value: cached_value_1, + hits: 0, + size_bytes: 373, + expires: None, } ), ( path_2, - FileStatisticsCacheEntry { - object_meta: meta2, - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 360, - has_ordering: true, + CacheEntryInfo { + value: cached_value_2, + hits: 0, + size_bytes: 373, + expires: None, } ), ]) @@ -632,7 +381,7 @@ mod tests { + value_2.heap_size(&mut ctx); // create a cache with a limit which fits exactly 2 entries - let cache = DefaultFileStatisticsCache::new(limit_for_2_entries); + let cache = DefaultCache::new(limit_for_2_entries); let path_1 = TableScopedPath { path: meta_1.location.clone(), table: None, @@ -699,7 +448,7 @@ mod tests { let limit_less_than_the_entry = value.heap_size(&mut ctx) - 1; // create a cache with a size less than the entry - let cache = DefaultFileStatisticsCache::new(limit_less_than_the_entry); + let cache = DefaultCache::new(limit_less_than_the_entry); let path_1 = TableScopedPath { path: meta.location.clone(), diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index a3cdf7c5e9110..5676ab9c33573 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -15,393 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::cache::{ - CacheAccessor, - cache_manager::{CachedFileList, ListFilesCache}, - lru_queue::LruQueue, -}; - -use std::fmt::{Debug, Display, Formatter}; -use std::mem::size_of; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - time::Duration, -}; - -use datafusion_common::TableReference; -use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; -use datafusion_common::instant::Instant; -use object_store::{ObjectMeta, path::Path}; - -pub trait TimeProvider: Send + Sync + 'static { - fn now(&self) -> Instant; -} - -#[derive(Debug, Default)] -pub struct SystemTimeProvider; - -impl TimeProvider for SystemTimeProvider { - fn now(&self) -> Instant { - Instant::now() - } -} - -/// Default implementation of [`ListFilesCache`] -/// -/// Caches file metadata for file listing operations. -/// -/// # Internal details -/// -/// The `memory_limit` parameter controls the maximum size of the cache, which uses a Least -/// Recently Used eviction algorithm. When adding a new entry, if the total number of entries in -/// the cache exceeds `memory_limit`, the least recently used entries are evicted until the total -/// size is lower than the `memory_limit`. -/// -/// # Cache API -/// -/// Uses `get` and `put` methods for cache operations. TTL validation is handled internally - -/// expired entries return `None` from `get`. -pub struct DefaultListFilesCache { - state: Mutex, - time_provider: Arc, -} - -impl Default for DefaultListFilesCache { - fn default() -> Self { - Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None) - } -} - -impl DefaultListFilesCache { - /// Creates a new instance of [`DefaultListFilesCache`]. - /// - /// # Arguments - /// * `memory_limit` - The maximum size of the cache, in bytes. - /// * `ttl` - The TTL (time-to-live) of entries in the cache. - pub fn new(memory_limit: usize, ttl: Option) -> Self { - Self { - state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), - time_provider: Arc::new(SystemTimeProvider), - } - } - - #[cfg(test)] - pub(crate) fn with_time_provider(mut self, provider: Arc) -> Self { - self.time_provider = provider; - self - } -} - -#[derive(Clone, PartialEq, Debug)] -pub struct ListFilesEntry { - pub metas: CachedFileList, - pub size_bytes: usize, - pub expires: Option, -} - -impl ListFilesEntry { - fn try_new( - cached_file_list: CachedFileList, - ttl: Option, - now: Instant, - ) -> Option { - let size_bytes = (cached_file_list.files.capacity() * size_of::()) - + cached_file_list - .files - .iter() - .map(meta_heap_bytes) - .reduce(|acc, b| acc + b)?; - - Some(Self { - metas: cached_file_list, - size_bytes, - expires: ttl.map(|t| now + t), - }) - } -} - -/// Calculates the number of bytes an [`ObjectMeta`] occupies in the heap. -fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize { - let mut size = object_meta.location.as_ref().len(); - - if let Some(e) = &object_meta.e_tag { - size += e.len(); - } - if let Some(v) = &object_meta.version { - size += v.len(); - } - - size -} - -/// The default memory limit for the [`DefaultListFilesCache`] -pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB - -/// The default cache TTL for the [`DefaultListFilesCache`] -pub const DEFAULT_LIST_FILES_CACHE_TTL: Option = None; // Infinite - -/// Key for [`DefaultListFilesCache`] -/// -/// Each entry is scoped to its use within a specific table so that the cache -/// can differentiate between identical paths in different tables, and -/// table-level cache invalidation. -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub struct TableScopedPath { - pub table: Option, - pub path: Path, -} - -/// Handles the inner state of the [`DefaultListFilesCache`] struct. -pub struct DefaultListFilesCacheState { - lru_queue: LruQueue, - memory_limit: usize, - memory_used: usize, - ttl: Option, -} - -impl Default for DefaultListFilesCacheState { - fn default() -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, - memory_used: 0, - ttl: DEFAULT_LIST_FILES_CACHE_TTL, - } - } -} - -impl DFHeapSize for TableScopedPath { - fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) - } -} - -impl Display for TableScopedPath { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(table) = &self.table { - write!(f, "{}, {}", self.path, table) - } else { - write!(f, "{}", self.path) - } - } -} - -impl DefaultListFilesCacheState { - fn new(memory_limit: usize, ttl: Option) -> Self { - Self { - lru_queue: LruQueue::new(), - memory_limit, - memory_used: 0, - ttl, - } - } - - /// Gets an entry from the cache, checking for expiration. - /// - /// Returns the cached file list if it exists and hasn't expired. - /// If the entry has expired, it is removed from the cache. - fn get(&mut self, key: &TableScopedPath, now: Instant) -> Option { - let entry = self.lru_queue.get(key)?; - - // Check expiration - if let Some(exp) = entry.expires - && now > exp - { - self.remove(key); - return None; - } - - Some(entry.metas.clone()) - } - - /// Checks if the respective entry is currently cached. - /// - /// If the entry has expired by `now` it is removed from the cache. - /// - /// The LRU queue is not updated. - fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool { - let Some(entry) = self.lru_queue.peek(k) else { - return false; - }; - - match entry.expires { - Some(exp) if now > exp => { - self.remove(k); - false - } - _ => true, - } - } - - /// Adds a new key-value pair to cache expiring at `now` + the TTL. - /// - /// This means that LRU entries might be evicted if required. - /// If the key is already in the cache, the previous entry is returned. - /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. - fn put( - &mut self, - key: &TableScopedPath, - value: CachedFileList, - now: Instant, - ) -> Option { - let entry = ListFilesEntry::try_new(value, self.ttl, now)?; - let entry_size = entry.size_bytes; - - // no point in trying to add this value to the cache if it cannot fit entirely - if entry_size > self.memory_limit { - return None; - } - - // if the key is already in the cache, the old value is removed - let old_value = self.lru_queue.put(key.clone(), entry); - self.memory_used += entry_size; - - if let Some(entry) = &old_value { - self.memory_used -= entry.size_bytes; - } - - self.evict_entries(); - - old_value.map(|v| v.metas) - } - - /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. - fn evict_entries(&mut self) { - while self.memory_used > self.memory_limit { - if let Some(removed) = self.lru_queue.pop() { - self.memory_used -= removed.1.size_bytes; - } else { - // cache is empty while memory_used > memory_limit, cannot happen - debug_assert!( - false, - "cache is empty while memory_used > memory_limit, cannot happen" - ); - return; - } - } - } - - /// Removes an entry from the cache and returns it, if it exists. - fn remove(&mut self, k: &TableScopedPath) -> Option { - if let Some(entry) = self.lru_queue.remove(k) { - self.memory_used -= entry.size_bytes; - Some(entry.metas) - } else { - None - } - } - - /// Returns the number of entries currently cached. - fn len(&self) -> usize { - self.lru_queue.len() - } - - /// Removes all entries from the cache. - fn clear(&mut self) { - self.lru_queue.clear(); - self.memory_used = 0; - } -} - -impl CacheAccessor for DefaultListFilesCache { - fn get(&self, key: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - let now = self.time_provider.now(); - state.get(key, now) - } - - fn put( - &self, - key: &TableScopedPath, - value: CachedFileList, - ) -> Option { - let mut state = self.state.lock().unwrap(); - let now = self.time_provider.now(); - state.put(key, value, now) - } - - fn remove(&self, k: &TableScopedPath) -> Option { - let mut state = self.state.lock().unwrap(); - state.remove(k) - } - - fn contains_key(&self, k: &TableScopedPath) -> bool { - let mut state = self.state.lock().unwrap(); - let now = self.time_provider.now(); - state.contains_key(k, now) - } - - fn len(&self) -> usize { - let state = self.state.lock().unwrap(); - state.len() - } - - fn clear(&self) { - let mut state = self.state.lock().unwrap(); - state.clear(); - } - - fn name(&self) -> String { - String::from("DefaultListFilesCache") - } -} - -impl ListFilesCache for DefaultListFilesCache { - fn cache_limit(&self) -> usize { - let state = self.state.lock().unwrap(); - state.memory_limit - } - - fn cache_ttl(&self) -> Option { - let state = self.state.lock().unwrap(); - state.ttl - } - - fn update_cache_limit(&self, limit: usize) { - let mut state = self.state.lock().unwrap(); - state.memory_limit = limit; - state.evict_entries(); - } - - fn update_cache_ttl(&self, ttl: Option) { - let mut state = self.state.lock().unwrap(); - state.ttl = ttl; - state.evict_entries(); - } - - fn list_entries(&self) -> HashMap { - let state = self.state.lock().unwrap(); - let mut entries = HashMap::::new(); - for (path, entry) in state.lru_queue.list_entries() { - entries.insert(path.clone(), entry.clone()); - } - entries - } - - fn drop_table_entries( - &self, - table_ref: &Option, - ) -> datafusion_common::Result<()> { - let mut state = self.state.lock().unwrap(); - let mut table_paths = vec![]; - for (path, _) in state.lru_queue.list_entries() { - if path.table == *table_ref { - table_paths.push(path.clone()); - } - } - for path in table_paths { - state.remove(&path); - } - Ok(()) - } -} - #[cfg(test)] mod tests { - use super::*; + use crate::cache::cache_manager::{ + CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, meta_heap_bytes, + }; + use crate::cache::default_cache::{DefaultCache, TimeProvider}; + use crate::cache::{Cache, CacheEntryInfo, CacheKey, CacheValue, TableScopedPath}; use chrono::DateTime; + use datafusion_common::HashMap; + use datafusion_common::TableReference; + use datafusion_common::instant::Instant; + use object_store::{ObjectMeta, path::Path}; + use std::sync::{Arc, Mutex}; use std::thread; + use std::time::Duration; struct MockTimeProvider { base: Instant, @@ -448,26 +76,27 @@ mod tests { } } - /// Helper function to create a CachedFileList with at least meta_size bytes + /// Helper function to create a TableScopedPath and a CachedFileList with at least meta_size bytes fn create_test_list_files_entry( path: &str, count: usize, meta_size: usize, - ) -> (Path, CachedFileList, usize) { + table: Option, + ) -> (TableScopedPath, CachedFileList) { + let key = TableScopedPath { + table, + path: Path::from(path), + }; let metas: Vec = (0..count) .map(|i| create_test_object_meta(&format!("file{i}"), meta_size)) .collect(); - - // Calculate actual size using the same logic as ListFilesEntry::try_new - let size = (metas.capacity() * size_of::()) - + metas.iter().map(meta_heap_bytes).sum::(); - - (Path::from(path), CachedFileList::new(metas), size) + let value = CachedFileList::new(metas); + (key, value) } #[test] fn test_basic_operations() { - let cache = DefaultListFilesCache::default(); + let cache = DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT); let table_ref = Some(TableReference::from("table")); let path = Path::from("test_path"); let key = TableScopedPath { @@ -499,16 +128,9 @@ mod tests { assert_eq!(cache.len(), 0); // Put multiple entries - let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, size2) = create_test_list_files_entry("path2", 3, 50); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref, - path: path2, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 2, 50, table_ref.clone()); + let (key2, value2) = create_test_list_files_entry("path2", 3, 50, table_ref); cache.put(&key1, value1.clone()); cache.put(&key2, value2.clone()); assert_eq!(cache.len(), 2); @@ -519,17 +141,19 @@ mod tests { HashMap::from([ ( key1.clone(), - ListFilesEntry { - metas: value1, - size_bytes: size1, + CacheEntryInfo { + value: value1.clone(), + size_bytes: value1.size(), + hits: 0, expires: None, } ), ( key2.clone(), - ListFilesEntry { - metas: value2, - size_bytes: size2, + CacheEntryInfo { + value: value2.clone(), + size_bytes: value2.size(), + hits: 0, expires: None, } ) @@ -545,26 +169,18 @@ mod tests { #[test] fn test_lru_eviction_basic() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); - // Set cache limit to exactly fit all three entries - let cache = DefaultListFilesCache::new(size * 3, None); + let entry_size = key1.size() + value1.size(); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref.clone(), - path: path3, - }; + // Set cache limit to exactly fit all 3 entries + let cache = DefaultCache::new(entry_size * 3); // All three entries should fit cache.put(&key1, value1); @@ -576,11 +192,7 @@ mod tests { assert!(cache.contains_key(&key3)); // Adding a new entry should evict path1 (LRU) - let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - let key4 = TableScopedPath { - table: table_ref, - path: path4, - }; + let (key4, value4) = create_test_list_files_entry("path4", 1, 100, table_ref); cache.put(&key4, value4); assert_eq!(cache.len(), 3); @@ -592,26 +204,16 @@ mod tests { #[test] fn test_lru_ordering_after_access() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); // Set cache limit to fit exactly three entries - let cache = DefaultListFilesCache::new(size * 3, None); - - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref.clone(), - path: path3, - }; + let cache = DefaultCache::new((key1.size() + value1.size()) * 3); cache.put(&key1, value1); cache.put(&key2, value2); @@ -623,11 +225,7 @@ mod tests { let _ = cache.get(&key1); // Adding a new entry should evict path2 (the LRU) - let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100); - let key4 = TableScopedPath { - table: table_ref, - path: path4, - }; + let (key4, value4) = create_test_list_files_entry("path4", 1, 100, table_ref); cache.put(&key4, value4); assert_eq!(cache.len(), 3); @@ -639,32 +237,23 @@ mod tests { #[test] fn test_reject_too_large() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); // Set cache limit to fit both entries - let cache = DefaultListFilesCache::new(size * 2, None); + let cache = DefaultCache::new((key1.size() + value1.size()) * 2); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; cache.put(&key1, value1); cache.put(&key2, value2); assert_eq!(cache.len(), 2); // Try to add an entry that's too large to fit in the cache // The entry is not stored (too large) - let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000); - let key_large = TableScopedPath { - table: table_ref, - path: path_large, - }; + let (key_large, value_large) = + create_test_list_files_entry("large", 1, 1000, table_ref); cache.put(&key_large, value_large); // Large entry should not be added @@ -676,37 +265,27 @@ mod tests { #[test] fn test_multiple_evictions() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); + + let entry_size = key1.size() + value1.size(); // Set cache limit for exactly 3 entries - let cache = DefaultListFilesCache::new(size * 3, None); + let cache = DefaultCache::new(entry_size * 3); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref.clone(), - path: path3, - }; cache.put(&key1, value1); cache.put(&key2, value2); cache.put(&key3, value3); assert_eq!(cache.len(), 3); // Add a large entry that requires evicting 2 entries - let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200); - let key_large = TableScopedPath { - table: table_ref, - path: path_large, - }; + let (key_large, value_large) = + create_test_list_files_entry("large", 1, 200, table_ref); cache.put(&key_large, value_large); // path1 and path2 should be evicted (both LRU), path3 and path_large remain @@ -719,25 +298,17 @@ mod tests { #[test] fn test_cache_limit_resize() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3) = create_test_list_files_entry("path3", 1, 100, table_ref); - let cache = DefaultListFilesCache::new(size * 3, None); + let entry_size = key1.size() + value1.size(); + + let cache = DefaultCache::new(entry_size * 3); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref, - path: path3, - }; // Add three entries cache.put(&key1, value1); cache.put(&key2, value2); @@ -745,7 +316,7 @@ mod tests { assert_eq!(cache.len(), 3); // Resize cache to only fit one entry - cache.update_cache_limit(size); + cache.update_cache_limit(entry_size); // Should keep only the most recent entry (path3, the MRU) assert_eq!(cache.len(), 1); @@ -757,25 +328,18 @@ mod tests { #[test] fn test_entry_update_with_size_change() { - let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100); + let table_ref = Some(TableReference::from("table")); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref.clone()); + let (key3, value3_v1) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); - let cache = DefaultListFilesCache::new(size * 3, None); + let entry_size = key1.size() + value1.size(); + + let cache = DefaultCache::new(entry_size * 3); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref, - path: path3, - }; // Add three entries cache.put(&key1, value1); cache.put(&key2, value2.clone()); @@ -783,7 +347,8 @@ mod tests { assert_eq!(cache.len(), 3); // Update path3 with same size - should not cause eviction - let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100); + let (_, value3_v2) = + create_test_list_files_entry("path3", 1, 100, table_ref.clone()); cache.put(&key3, value3_v2); assert_eq!(cache.len(), 3); @@ -792,7 +357,7 @@ mod tests { assert!(cache.contains_key(&key3)); // Update path3 with larger size that requires evicting path1 (LRU) - let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200); + let (_, value3_v3) = create_test_list_files_entry("path3", 1, 200, table_ref); cache.put(&key3, value3_v3.clone()); assert_eq!(cache.len(), 2); @@ -806,17 +371,19 @@ mod tests { HashMap::from([ ( key2, - ListFilesEntry { - metas: value2, - size_bytes: size2, + CacheEntryInfo { + value: value2.clone(), + size_bytes: value2.size(), + hits: 0, expires: None, } ), ( key3, - ListFilesEntry { - metas: value3_v3, - size_bytes: size3_v3, + CacheEntryInfo { + value: value3_v3.clone(), + size_bytes: value3_v3.size(), + hits: 0, expires: None, } ) @@ -829,21 +396,13 @@ mod tests { let ttl = Duration::from_millis(100); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = DefaultListFilesCache::new(10000, Some(ttl)) + let cache = DefaultCache::new_with_ttl(10000, Some(ttl)) .with_time_provider(Arc::clone(&mock_time) as Arc); - let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50); - let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref, - path: path2, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 2, 50, table_ref.clone()); + let (key2, value2) = create_test_list_files_entry("path2", 2, 50, table_ref); cache.put(&key1, value1.clone()); cache.put(&key2, value2.clone()); @@ -856,17 +415,19 @@ mod tests { HashMap::from([ ( key1.clone(), - ListFilesEntry { - metas: value1, - size_bytes: size1, + CacheEntryInfo { + value: value1.clone(), + size_bytes: value1.size(), + hits: 1, expires: mock_time.now().checked_add(ttl), } ), ( key2.clone(), - ListFilesEntry { - metas: value2, - size_bytes: size2, + CacheEntryInfo { + value: value2.clone(), + size_bytes: value2.size(), + hits: 1, expires: mock_time.now().checked_add(ttl), } ) @@ -887,26 +448,16 @@ mod tests { let ttl = Duration::from_millis(200); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = DefaultListFilesCache::new(1000, Some(ttl)) + let cache = DefaultCache::new_with_ttl(1100, Some(ttl)) .with_time_provider(Arc::clone(&mock_time) as Arc); - let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); - let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - let key3 = TableScopedPath { - table: table_ref, - path: path3, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 1, 400, table_ref.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 400, table_ref.clone()); + + let (key3, value3) = create_test_list_files_entry("path3", 1, 400, table_ref); cache.put(&key1, value1); mock_time.inc(Duration::from_millis(50)); cache.put(&key2, value2); @@ -927,14 +478,10 @@ mod tests { #[test] fn test_ttl_expiration_in_get() { let ttl = Duration::from_millis(100); - let cache = DefaultListFilesCache::new(10000, Some(ttl)); + let cache = DefaultCache::new_with_ttl(10000, Some(ttl)); - let (path, value, _) = create_test_list_files_entry("path", 2, 50); let table_ref = Some(TableReference::from("table")); - let key = TableScopedPath { - table: table_ref, - path, - }; + let (key, value) = create_test_list_files_entry("path", 2, 50, table_ref); // Cache the entry cache.put(&key, value.clone()); @@ -995,81 +542,45 @@ mod tests { assert_eq!(meta_heap_bytes(&meta4), 4 + 3 + 3); // location (4) + e_tag (3) + version (3) } - #[test] - fn test_entry_creation() { - // Test with empty vector - let empty_list = CachedFileList::new(vec![]); - let now = Instant::now(); - let entry = ListFilesEntry::try_new(empty_list, None, now); - assert!(entry.is_none()); - - // Validate entry size - let metas: Vec = (0..5) - .map(|i| create_test_object_meta(&format!("file{i}"), 30)) - .collect(); - let cached_list = CachedFileList::new(metas); - let entry = ListFilesEntry::try_new(cached_list, None, now).unwrap(); - assert_eq!(entry.metas.files.len(), 5); - // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes - let expected_size = (entry.metas.files.capacity() * size_of::()) - + (entry.metas.files.len() * 30); - assert_eq!(entry.size_bytes, expected_size); - - // Test with TTL - let meta = create_test_object_meta("file", 50); - let ttl = Duration::from_secs(10); - let cached_list = CachedFileList::new(vec![meta]); - let entry = ListFilesEntry::try_new(cached_list, Some(ttl), now).unwrap(); - assert!(entry.expires.unwrap() > now); - } - #[test] fn test_memory_tracking() { - let cache = DefaultListFilesCache::new(1000, None); + let cache = DefaultCache::new(1000); // Verify cache starts with 0 memory used { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, 0); + assert_eq!(cache.memory_used(), 0); } // Add entry and verify memory tracking - let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100); let table_ref = Some(TableReference::from("table")); - let key1 = TableScopedPath { - table: table_ref.clone(), - path: path1, - }; - cache.put(&key1, value1); + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref.clone()); + cache.put(&key1, value1.clone()); + let entry_size_1 = key1.size() + value1.size(); { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, size1); + assert_eq!(cache.memory_used(), entry_size_1); } // Add another entry - let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200); - let key2 = TableScopedPath { - table: table_ref.clone(), - path: path2, - }; - cache.put(&key2, value2); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 200, table_ref.clone()); + cache.put(&key2, value2.clone()); + let entry_size_2 = key2.size() + value2.size(); + { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, size1 + size2); + assert_eq!(cache.memory_used(), entry_size_1 + entry_size_2); } // Remove first entry and verify memory decreases cache.remove(&key1); { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, size2); + assert_eq!(cache.memory_used(), entry_size_2); } // Clear and verify memory is 0 cache.clear(); { - let state = cache.state.lock().unwrap(); - assert_eq!(state.memory_used, 0); + assert_eq!(cache.memory_used(), 0); } } @@ -1090,7 +601,7 @@ mod tests { #[test] fn test_prefix_filtering() { - let cache = DefaultListFilesCache::new(100000, None); + let cache = DefaultCache::new(100000); // Create files for a partitioned table let table_base = Path::from("my_table"); @@ -1138,7 +649,7 @@ mod tests { #[test] fn test_prefix_no_matching_files() { - let cache = DefaultListFilesCache::new(100000, None); + let cache = DefaultCache::new(100000); let table_base = Path::from("my_table"); let files = vec![ @@ -1162,7 +673,7 @@ mod tests { #[test] fn test_nested_partitions() { - let cache = DefaultListFilesCache::new(100000, None); + let cache = DefaultCache::new(100000); let table_base = Path::from("events"); let files = vec![ @@ -1201,27 +712,16 @@ mod tests { #[test] fn test_drop_table_entries() { - let cache = DefaultListFilesCache::default(); - - let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100); - let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100); - let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100); + let cache = DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT); let table_ref1 = Some(TableReference::from("table1")); - let key1 = TableScopedPath { - table: table_ref1.clone(), - path: path1, - }; - let key2 = TableScopedPath { - table: table_ref1.clone(), - path: path2, - }; - let table_ref2 = Some(TableReference::from("table2")); - let key3 = TableScopedPath { - table: table_ref2.clone(), - path: path3, - }; + let (key1, value1) = + create_test_list_files_entry("path1", 1, 100, table_ref1.clone()); + let (key2, value2) = + create_test_list_files_entry("path2", 1, 100, table_ref1.clone()); + let (key3, value3) = + create_test_list_files_entry("path3", 1, 100, table_ref2.clone()); cache.put(&key1, value1); cache.put(&key2, value2); diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 76bd660e6c7d5..4bb57cc8a2afa 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -19,13 +19,17 @@ pub mod cache_manager; pub mod file_statistics_cache; pub mod lru_queue; +pub mod default_cache; mod file_metadata_cache; mod list_files_cache; -pub use file_metadata_cache::DefaultFilesMetadataCache; -pub use list_files_cache::DefaultListFilesCache; -pub use list_files_cache::ListFilesEntry; -pub use list_files_cache::TableScopedPath; +use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; +use datafusion_common::instant::Instant; +use datafusion_common::{HashMap, TableReference}; +use object_store::path::Path; +use std::fmt::{Debug, Display, Formatter}; +use std::hash::Hash; +use std::time::Duration; /// Base trait for cache implementations with common operations. /// @@ -46,7 +50,7 @@ pub use list_files_cache::TableScopedPath; /// 1. Call `get(key)` to check for cached value /// 2. If `Some(cached)`, validate with `cached.is_valid_for(¤t_meta)` /// 3. If invalid or missing, compute new value and call `put(key, new_value)` -pub trait CacheAccessor: Send + Sync { +pub trait Cache: Send + Sync { /// Get a cached entry if it exists. /// /// Returns the cached value without any validation. The caller should @@ -77,4 +81,102 @@ pub trait CacheAccessor: Send + Sync { /// Return the cache name. fn name(&self) -> String; + + /// Current memory budget, in bytes. + fn cache_limit(&self) -> usize; + + /// Change the memory budget in bytes. + fn update_cache_limit(&self, limit: usize); + + /// Time-to-live applied to newly inserted entries, or `None` if entries + /// never expire on their own. + fn cache_ttl(&self) -> Option; + + /// Change the TTL applied to subsequent inserts. + fn update_cache_ttl(&self, _ttl: Option); + + /// Invalidate every entry associated with `table_ref`. + fn drop_table_entries( + &self, + table_ref: &Option, + ) -> datafusion_common::Result<()>; + + /// Snapshot of all current entries with per-entry metadata (size, hits, + /// expiration) for diagnostics and observability. + fn list_entries(&self) -> HashMap>; +} + +/// Key type for entries stored in a [`Cache`]. +pub trait CacheKey: Clone + Eq + Hash + Send + Sync + Debug { + /// Size of the key in bytes, used for cache memory accounting. + fn size(&self) -> usize; + + /// Table this key is associated with, or `None` if the key is not + /// table-scoped. + fn table_ref(&self) -> Option<&TableReference>; +} + +/// Value type for entries stored in a [`Cache`]. +pub trait CacheValue: Clone + Send + Sync { + /// Size of the value in bytes used for cache memory accounting. + fn size(&self) -> usize; +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CacheEntryInfo { + pub value: V, + pub size_bytes: usize, + pub hits: usize, + pub expires: Option, +} + +impl Debug for dyn Cache { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Cache name: {} with length: {}", self.name(), self.len()) + } +} + +impl CacheKey for Path { + fn size(&self) -> usize { + self.as_ref().heap_size(&mut DFHeapSizeCtx::default()) + } + + fn table_ref(&self) -> Option<&TableReference> { + None + } +} + +impl CacheKey for TableScopedPath { + fn size(&self) -> usize { + DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) + } + + fn table_ref(&self) -> Option<&TableReference> { + self.table.as_ref() + } +} + +/// Each entry is scoped to its use within a specific table so that the cache +/// can differentiate between identical paths in different tables, and +/// table-level cache invalidation. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct TableScopedPath { + pub table: Option, + pub path: Path, +} + +impl DFHeapSize for TableScopedPath { + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) + } +} + +impl Display for TableScopedPath { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(table) = &self.table { + write!(f, "{}, {}", self.path, table) + } else { + write!(f, "{}", self.path) + } + } }