From 89cd3d2ac6546f6e5d74348bd51055794c398023 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 19 May 2026 02:28:24 +0800 Subject: [PATCH 1/6] [fix](filecache) Exclude warmup reads from hit ratio metrics ### What problem does this PR solve? Issue Number: N/A Related PR: N/A Problem Summary: File cache hit ratio metrics are derived from global file cache read bytes, but warmup reads from manual warmup, periodic warmup, event-driven warmup, and rebalance-triggered warmup used to update the same counters as query reads. This polluted the query hit ratio. Mixed hit/miss reads could also be attributed to one source for the whole request. This change skips warmup updates to global file cache read metrics while preserving per-IOContext profile stats, records local/remote/peer bytes by actual returned bytes, and avoids updating metrics for failed reads. It also fixes direct-read partial continuation and no-warmup miss-only hit ratio refresh. After rebase, the warmup metrics UT exposed a separate ASAN issue because the test snapshot helper triggered all metric hooks, including a stale StorageEngine hook that captured a destroyed engine. The test now snapshots FileCacheMetrics directly, and StorageEngine deregisters its hook on destruction. ### Release note File cache hit ratio metrics now exclude warmup reads. ### Check List (For Author) - Test: Regression test / Unit Test - Unit Test: DORIS_TOOLCHAIN=clang DISABLE_BE_JAVA_EXTENSIONS=ON ENABLE_INJECTION_POINT=ON ENABLE_CACHE_LOCK_DEBUG=0 ENABLE_PCH=0 sh run-be-ut.sh --run --filter=BlockFileCacheTest.cached_remote_file_reader_warmup_does_not_update_global_metrics - Unit Test: DORIS_TOOLCHAIN=clang DISABLE_BE_JAVA_EXTENSIONS=ON ENABLE_INJECTION_POINT=ON ENABLE_CACHE_LOCK_DEBUG=0 ENABLE_PCH=0 sh run-be-ut.sh --run --filter='BlockFileCacheTest.cached_remote_file_reader*' - Behavior changed: Yes. Warmup reads no longer contribute to global file cache read metrics used for query hit ratio; per-IOContext profile stats are preserved. - Does this need documentation: No --- be/src/io/cache/block_file_cache.cpp | 6 +- be/src/io/cache/block_file_cache_profile.cpp | 37 ++ be/src/io/cache/block_file_cache_profile.h | 7 +- be/src/io/cache/cached_remote_file_reader.cpp | 88 ++-- be/src/io/cache/file_cache_common.h | 3 + be/src/storage/storage_engine.cpp | 1 + ...block_file_cache_profile_reporter_test.cpp | 139 ++++++ be/test/io/cache/block_file_cache_test.cpp | 365 +++++++++++++- ...le_cache_warmup_read_metrics_docker.groovy | 457 ++++++++++++++++++ 9 files changed, 1042 insertions(+), 61 deletions(-) create mode 100644 be/test/io/cache/block_file_cache_profile_reporter_test.cpp create mode 100644 regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 3ff1526c32fc5b..78cf530ec2d982 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -2000,16 +2000,16 @@ void BlockFileCache::run_background_monitor() { (double)_num_read_blocks_1h->get_value()); } - if (_no_warmup_num_hit_blocks->get_value() > 0) { + if (_no_warmup_num_read_blocks->get_value() > 0) { _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() / (double)_no_warmup_num_read_blocks->get_value()); } - if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) { + if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) { _no_warmup_hit_ratio_5m->set_value( (double)_no_warmup_num_hit_blocks_5m->get_value() / (double)_no_warmup_num_read_blocks_5m->get_value()); } - if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) { + if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) { _no_warmup_hit_ratio_1h->set_value( (double)_no_warmup_num_hit_blocks_1h->get_value() / (double)_no_warmup_num_read_blocks_1h->get_value()); diff --git a/be/src/io/cache/block_file_cache_profile.cpp b/be/src/io/cache/block_file_cache_profile.cpp index 692174dbbcb71a..8f9c167c9989e6 100644 --- a/be/src/io/cache/block_file_cache_profile.cpp +++ b/be/src/io/cache/block_file_cache_profile.cpp @@ -65,6 +65,43 @@ void FileCacheMetrics::update_metrics_callback() { stats->num_io_bytes_read_from_peer); } +FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& current, + const FileCacheStatistics& previous) { + FileCacheStatistics diff; +#define SUBTRACT_FIELD(field) diff.field = current.field - previous.field + SUBTRACT_FIELD(num_local_io_total); + SUBTRACT_FIELD(num_remote_io_total); + SUBTRACT_FIELD(num_peer_io_total); + SUBTRACT_FIELD(local_io_timer); + SUBTRACT_FIELD(bytes_read_from_local); + SUBTRACT_FIELD(bytes_read_from_remote); + SUBTRACT_FIELD(bytes_read_from_peer); + SUBTRACT_FIELD(remote_io_timer); + SUBTRACT_FIELD(peer_io_timer); + SUBTRACT_FIELD(remote_wait_timer); + SUBTRACT_FIELD(write_cache_io_timer); + SUBTRACT_FIELD(bytes_write_into_cache); + SUBTRACT_FIELD(num_skip_cache_io_total); + SUBTRACT_FIELD(read_cache_file_directly_timer); + SUBTRACT_FIELD(cache_get_or_set_timer); + SUBTRACT_FIELD(lock_wait_timer); + SUBTRACT_FIELD(get_timer); + SUBTRACT_FIELD(set_timer); + + SUBTRACT_FIELD(inverted_index_num_local_io_total); + SUBTRACT_FIELD(inverted_index_num_remote_io_total); + SUBTRACT_FIELD(inverted_index_num_peer_io_total); + SUBTRACT_FIELD(inverted_index_bytes_read_from_local); + SUBTRACT_FIELD(inverted_index_bytes_read_from_remote); + SUBTRACT_FIELD(inverted_index_bytes_read_from_peer); + SUBTRACT_FIELD(inverted_index_local_io_timer); + SUBTRACT_FIELD(inverted_index_remote_io_timer); + SUBTRACT_FIELD(inverted_index_peer_io_timer); + SUBTRACT_FIELD(inverted_index_io_timer); +#undef SUBTRACT_FIELD + return diff; +} + FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { static const char* cache_profile = "FileCache"; ADD_TIMER_WITH_LEVEL(profile, cache_profile, 2); diff --git a/be/src/io/cache/block_file_cache_profile.h b/be/src/io/cache/block_file_cache_profile.h index 5fdb82fbd61a4d..6c95e49791c054 100644 --- a/be/src/io/cache/block_file_cache_profile.h +++ b/be/src/io/cache/block_file_cache_profile.h @@ -52,9 +52,9 @@ class FileCacheMetrics { } void update(FileCacheStatistics* stats); + std::shared_ptr report(); private: - std::shared_ptr report(); void register_entity(); void update_metrics_callback(); @@ -64,6 +64,9 @@ class FileCacheMetrics { std::shared_ptr _statistics; }; +FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& current, + const FileCacheStatistics& previous); + struct FileCacheProfileReporter { RuntimeProfile::Counter* num_local_io_total = nullptr; RuntimeProfile::Counter* num_remote_io_total = nullptr; @@ -100,4 +103,4 @@ struct FileCacheProfileReporter { }; } // namespace io -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index 47b534d2dc9e42..7c297737d60a44 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -297,6 +297,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* ReadStatistics stats; stats.bytes_read += bytes_req; + bool read_success = false; MonotonicStopWatch read_at_sw; read_at_sw.start(); auto defer_func = [&](int*) { @@ -315,10 +316,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* if (is_dryrun) { return; } - // update stats increment in this reading procedure for file cache metrics - FileCacheStatistics fcache_stats_increment; - _update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index); - io::FileCacheMetrics::instance().update(&fcache_stats_increment); + if (!read_success) { + return; + } + if (!io_ctx->is_warmup) { + // update stats increment in this reading procedure for file cache metrics + FileCacheStatistics fcache_stats_increment; + _update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index); + io::FileCacheMetrics::instance().update(&fcache_stats_increment); + } if (io_ctx->file_cache_stats) { // update stats in io_ctx, for query profile _update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index); @@ -355,6 +361,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* .ok()) { //TODO: maybe read failed because block evict, should handle error break; } + stats.bytes_read_from_local += reserve_bytes; } _cache->add_need_update_lru_block(iter->second); need_read_size -= reserve_bytes; @@ -367,6 +374,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* stats.hit_cache = true; g_read_cache_direct_whole_num << 1; g_read_cache_direct_whole_bytes << bytes_req; + read_success = true; return Status::OK(); } else { g_read_cache_direct_partial_num << 1; @@ -442,6 +450,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* // Determine read type and execute remote read RETURN_IF_ERROR( _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx)); + bool empty_blocks_from_peer_cache = stats.from_peer_cache; { SCOPED_CONCURRENCY_COUNT( @@ -476,20 +485,25 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* size_t copy_size = copy_right_offset - copy_left_offset + 1; memcpy(dst, src, copy_size); indirect_read_bytes += copy_size; + if (empty_blocks_from_peer_cache) { + stats.bytes_read_from_peer += copy_size; + } else { + stats.bytes_read_from_remote += copy_size; + } } } - size_t current_offset = offset; + size_t current_offset = offset + already_read; size_t end_offset = offset + bytes_req - 1; bool need_self_heal = false; - *bytes_read = 0; + *bytes_read = already_read; for (auto& block : holder.file_blocks) { if (current_offset > end_offset) { break; } size_t left = block->range().left; size_t right = block->range().right; - if (right < offset) { + if (right < current_offset) { continue; } size_t read_size = @@ -536,7 +550,10 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* ConcurrencyStatsManager::instance().cached_remote_reader_local_read); st = block->read(Slice(result.data + (current_offset - offset), read_size), file_offset); - indirect_read_bytes += read_size; + if (st.ok()) { + indirect_read_bytes += read_size; + stats.bytes_read_from_local += read_size; + } } } if (!st || block_state != FileBlock::State::DOWNLOADED) { @@ -567,6 +584,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* &nest_bytes_read)); indirect_read_bytes += read_size; DCHECK(nest_bytes_read == read_size); + stats.bytes_read_from_remote += nest_bytes_read; } } } @@ -580,6 +598,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* g_read_cache_indirect_total_bytes << *bytes_read; DCHECK(*bytes_read == bytes_req); + if (!is_dryrun) { + DCHECK_EQ(stats.bytes_read_from_local + stats.bytes_read_from_remote + + stats.bytes_read_from_peer, + bytes_req); + } + read_success = true; return Status::OK(); } @@ -589,19 +613,19 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, if (statis == nullptr) { return; } - if (read_stats.hit_cache) { + if (read_stats.bytes_read_from_local > 0) { statis->num_local_io_total++; - statis->bytes_read_from_local += read_stats.bytes_read; - } else { - if (read_stats.from_peer_cache) { - statis->num_peer_io_total++; - statis->bytes_read_from_peer += read_stats.bytes_read; - statis->peer_io_timer += read_stats.peer_read_timer; - } else { - statis->num_remote_io_total++; - statis->bytes_read_from_remote += read_stats.bytes_read; - statis->remote_io_timer += read_stats.remote_read_timer; - } + statis->bytes_read_from_local += read_stats.bytes_read_from_local; + } + if (read_stats.bytes_read_from_remote > 0) { + statis->num_remote_io_total++; + statis->bytes_read_from_remote += read_stats.bytes_read_from_remote; + statis->remote_io_timer += read_stats.remote_read_timer; + } + if (read_stats.bytes_read_from_peer > 0) { + statis->num_peer_io_total++; + statis->bytes_read_from_peer += read_stats.bytes_read_from_peer; + statis->peer_io_timer += read_stats.peer_read_timer; } statis->remote_wait_timer += read_stats.remote_wait_timer; statis->local_io_timer += read_stats.local_read_timer; @@ -616,19 +640,19 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, statis->set_timer += read_stats.set_timer; if (is_inverted_index) { - if (read_stats.hit_cache) { + if (read_stats.bytes_read_from_local > 0) { statis->inverted_index_num_local_io_total++; - statis->inverted_index_bytes_read_from_local += read_stats.bytes_read; - } else { - if (read_stats.from_peer_cache) { - statis->inverted_index_num_peer_io_total++; - statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read; - statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; - } else { - statis->inverted_index_num_remote_io_total++; - statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; - statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; - } + statis->inverted_index_bytes_read_from_local += read_stats.bytes_read_from_local; + } + if (read_stats.bytes_read_from_remote > 0) { + statis->inverted_index_num_remote_io_total++; + statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read_from_remote; + statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; + } + if (read_stats.bytes_read_from_peer > 0) { + statis->inverted_index_num_peer_io_total++; + statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read_from_peer; + statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; } statis->inverted_index_local_io_timer += read_stats.local_read_timer; } diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 43510293af8cfd..8fde90abc0a7b2 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -71,6 +71,9 @@ struct ReadStatistics { bool from_peer_cache = false; bool skip_cache = false; int64_t bytes_read = 0; + int64_t bytes_read_from_local = 0; + int64_t bytes_read_from_remote = 0; + int64_t bytes_read_from_peer = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; int64_t peer_read_timer = 0; diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index b7b16a6ba1d3b9..e2b25bd7f31b2e 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -257,6 +257,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) } StorageEngine::~StorageEngine() { + DEREGISTER_HOOK_METRIC(unused_rowsets_count); stop(); } diff --git a/be/test/io/cache/block_file_cache_profile_reporter_test.cpp b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp new file mode 100644 index 00000000000000..e74ad758ac1db3 --- /dev/null +++ b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "io/cache/block_file_cache_profile.h" + +namespace doris { +namespace { + +io::FileCacheStatistics make_file_cache_stats(int64_t multiplier) { + io::FileCacheStatistics stats; + stats.num_local_io_total = multiplier; + stats.num_remote_io_total = multiplier * 2; + stats.num_peer_io_total = multiplier * 3; + stats.local_io_timer = multiplier * 4; + stats.bytes_read_from_local = multiplier * 5; + stats.bytes_read_from_remote = multiplier * 6; + stats.bytes_read_from_peer = multiplier * 7; + stats.remote_io_timer = multiplier * 8; + stats.peer_io_timer = multiplier * 9; + stats.remote_wait_timer = multiplier * 10; + stats.write_cache_io_timer = multiplier * 11; + stats.bytes_write_into_cache = multiplier * 12; + stats.num_skip_cache_io_total = multiplier * 13; + stats.read_cache_file_directly_timer = multiplier * 14; + stats.cache_get_or_set_timer = multiplier * 15; + stats.lock_wait_timer = multiplier * 16; + stats.get_timer = multiplier * 17; + stats.set_timer = multiplier * 18; + stats.inverted_index_num_local_io_total = multiplier * 19; + stats.inverted_index_num_remote_io_total = multiplier * 20; + stats.inverted_index_num_peer_io_total = multiplier * 21; + stats.inverted_index_bytes_read_from_local = multiplier * 22; + stats.inverted_index_bytes_read_from_remote = multiplier * 23; + stats.inverted_index_bytes_read_from_peer = multiplier * 24; + stats.inverted_index_local_io_timer = multiplier * 25; + stats.inverted_index_remote_io_timer = multiplier * 26; + stats.inverted_index_peer_io_timer = multiplier * 27; + stats.inverted_index_io_timer = multiplier * 28; + return stats; +} + +void expect_file_cache_stats_eq(const io::FileCacheStatistics& actual, + const io::FileCacheStatistics& expected) { + EXPECT_EQ(actual.num_local_io_total, expected.num_local_io_total); + EXPECT_EQ(actual.num_remote_io_total, expected.num_remote_io_total); + EXPECT_EQ(actual.num_peer_io_total, expected.num_peer_io_total); + EXPECT_EQ(actual.local_io_timer, expected.local_io_timer); + EXPECT_EQ(actual.bytes_read_from_local, expected.bytes_read_from_local); + EXPECT_EQ(actual.bytes_read_from_remote, expected.bytes_read_from_remote); + EXPECT_EQ(actual.bytes_read_from_peer, expected.bytes_read_from_peer); + EXPECT_EQ(actual.remote_io_timer, expected.remote_io_timer); + EXPECT_EQ(actual.peer_io_timer, expected.peer_io_timer); + EXPECT_EQ(actual.remote_wait_timer, expected.remote_wait_timer); + EXPECT_EQ(actual.write_cache_io_timer, expected.write_cache_io_timer); + EXPECT_EQ(actual.bytes_write_into_cache, expected.bytes_write_into_cache); + EXPECT_EQ(actual.num_skip_cache_io_total, expected.num_skip_cache_io_total); + EXPECT_EQ(actual.read_cache_file_directly_timer, expected.read_cache_file_directly_timer); + EXPECT_EQ(actual.cache_get_or_set_timer, expected.cache_get_or_set_timer); + EXPECT_EQ(actual.lock_wait_timer, expected.lock_wait_timer); + EXPECT_EQ(actual.get_timer, expected.get_timer); + EXPECT_EQ(actual.set_timer, expected.set_timer); + EXPECT_EQ(actual.inverted_index_num_local_io_total, expected.inverted_index_num_local_io_total); + EXPECT_EQ(actual.inverted_index_num_remote_io_total, + expected.inverted_index_num_remote_io_total); + EXPECT_EQ(actual.inverted_index_num_peer_io_total, expected.inverted_index_num_peer_io_total); + EXPECT_EQ(actual.inverted_index_bytes_read_from_local, + expected.inverted_index_bytes_read_from_local); + EXPECT_EQ(actual.inverted_index_bytes_read_from_remote, + expected.inverted_index_bytes_read_from_remote); + EXPECT_EQ(actual.inverted_index_bytes_read_from_peer, + expected.inverted_index_bytes_read_from_peer); + EXPECT_EQ(actual.inverted_index_local_io_timer, expected.inverted_index_local_io_timer); + EXPECT_EQ(actual.inverted_index_remote_io_timer, expected.inverted_index_remote_io_timer); + EXPECT_EQ(actual.inverted_index_peer_io_timer, expected.inverted_index_peer_io_timer); + EXPECT_EQ(actual.inverted_index_io_timer, expected.inverted_index_io_timer); +} + +} // namespace + +TEST(FileCacheProfileReporterTest, DiffReturnsFullStatsWhenPreviousIsZero) { + const auto current = make_file_cache_stats(3); + + expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, {}), current); +} + +TEST(FileCacheProfileReporterTest, DiffReturnsOnlyIncrementalDelta) { + expect_file_cache_stats_eq( + io::diff_file_cache_statistics(make_file_cache_stats(5), make_file_cache_stats(3)), + make_file_cache_stats(2)); +} + +TEST(FileCacheProfileReporterTest, DiffReturnsZeroWithoutNewData) { + const auto current = make_file_cache_stats(4); + + expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, current), + make_file_cache_stats(0)); +} + +TEST(FileCacheProfileReporterTest, ReporterAggregatesDeltaReportsToExactFinalTotals) { + auto profile = std::make_unique("test_profile"); + io::FileCacheProfileReporter reporter(profile.get()); + + const auto after_first_report = make_file_cache_stats(4); + const auto after_second_report = make_file_cache_stats(7); + const auto first_delta = io::diff_file_cache_statistics(after_first_report, {}); + reporter.update(&first_delta); + + const auto second_delta = + io::diff_file_cache_statistics(after_second_report, after_first_report); + reporter.update(&second_delta); + + EXPECT_EQ(profile->get_counter("BytesScannedFromCache")->value(), + after_second_report.bytes_read_from_local); + EXPECT_EQ(profile->get_counter("BytesScannedFromRemote")->value(), + after_second_report.bytes_read_from_remote); + EXPECT_EQ(profile->get_counter("BytesWriteIntoCache")->value(), + after_second_report.bytes_write_into_cache); + EXPECT_EQ(profile->get_counter("CacheGetOrSetTimer")->value(), + after_second_report.cache_get_or_set_timer); + EXPECT_EQ(profile->get_counter("LockWaitTimer")->value(), after_second_report.lock_wait_timer); +} + +} // namespace doris diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 9f5563efb3d1f5..111e5824c6f985 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -19,6 +19,7 @@ // and modified by Doris #include "io/cache/block_file_cache_test_common.h" +#include "io/fs/buffered_reader.h" #include "storage/olap_define.h" namespace doris::io { @@ -43,6 +44,114 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test"; std::string cache_base_path = caches_dir / "cache1" / ""; std::string tmp_file = caches_dir / "tmp_file"; +io::FileCacheSettings cached_remote_reader_cache_settings() { + io::FileCacheSettings settings; + settings.query_queue_size = 6291456; + settings.query_queue_elements = 6; + settings.index_queue_size = 1048576; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1048576; + settings.disposable_queue_elements = 1; + settings.capacity = 8388608; + settings.max_file_block_size = 1048576; + settings.max_query_cache_size = 0; + return settings; +} + +void reset_file_cache_factory_for_test() { + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + +Status create_cached_remote_reader_cache(const std::string& cache_path, BlockFileCache** cache) { + reset_file_cache_factory_for_test(); + if (fs::exists(cache_path)) { + fs::remove_all(cache_path); + } + fs::create_directories(cache_path); + + RETURN_IF_ERROR(FileCacheFactory::instance()->create_file_cache( + cache_path, cached_remote_reader_cache_settings())); + *cache = FileCacheFactory::instance()->_path_to_cache[cache_path]; + DORIS_CHECK(*cache != nullptr); + for (int i = 0; i < 100; i++) { + if ((*cache)->get_async_open_success()) { + return Status::OK(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return Status::TimedOut("file cache async open timeout for path {}", cache_path); +} + +void cleanup_cached_remote_reader_cache(const std::string& cache_path) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_path)) { + fs::remove_all(cache_path); + } + reset_file_cache_factory_for_test(); +} + +struct FileCacheMetricSnapshot { + int64_t total = 0; + int64_t local = 0; + int64_t remote = 0; + int64_t peer = 0; +}; + +FileCacheMetricSnapshot get_file_cache_metric_snapshot() { + FileCacheStatistics empty_stats; + FileCacheMetrics::instance().update(&empty_stats); + std::shared_ptr stats = FileCacheMetrics::instance().report(); + auto local = stats->num_io_bytes_read_from_cache.load(); + auto remote = stats->num_io_bytes_read_from_remote.load(); + auto peer = stats->num_io_bytes_read_from_peer.load(); + return {.total = local + remote + peer, .local = local, .remote = remote, .peer = peer}; +} + +void expect_file_cache_metric_delta(const FileCacheMetricSnapshot& before, + const FileCacheMetricSnapshot& after, int64_t local, + int64_t remote, int64_t peer) { + EXPECT_EQ(after.local - before.local, local); + EXPECT_EQ(after.remote - before.remote, remote); + EXPECT_EQ(after.peer - before.peer, peer); + EXPECT_EQ(after.total - before.total, local + remote + peer); +} + +class FailAfterOffsetFileReader : public FileReader { +public: + FailAfterOffsetFileReader(FileReaderSPtr reader, size_t fail_offset) + : _reader(std::move(reader)), _fail_offset(fail_offset) {} + ~FailAfterOffsetFileReader() override = default; + + void set_fail(bool fail) { _fail = fail; } + + Status close() override { return _reader->close(); } + + const Path& path() const override { return _reader->path(); } + + size_t size() const override { return _reader->size(); } + + bool closed() const override { return _reader->closed(); } + + int64_t mtime() const override { return _reader->mtime(); } + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) override { + if (_fail && offset >= _fail_offset) { + *bytes_read = 0; + return Status::IOError("inject remote read failure at offset {}", offset); + } + return _reader->read_at(offset, result, bytes_read, io_ctx); + } + +private: + FileReaderSPtr _reader; + size_t _fail_offset; + bool _fail = false; +}; + void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr file_block, const io::FileBlock::Range& expected_range, io::FileBlock::State expected_state) { auto range = file_block->range(); @@ -3625,6 +3734,136 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) { extern bvar::Adder g_read_cache_self_heal_on_not_found; +TEST_F(BlockFileCacheTest, cached_remote_file_reader_warmup_does_not_update_global_metrics) { + bool origin_enable_direct_read = config::enable_read_cache_file_directly; + config::enable_read_cache_file_directly = false; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; + + std::string cache_base_path = caches_dir / "cached_remote_reader_warmup_metrics" / ""; + BlockFileCache* cache = nullptr; + ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, &cache).ok()); + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + opts.tablet_id = 10086; + CachedRemoteFileReader reader(local_reader, opts); + + std::string buffer(64_kb, '\0'); + size_t bytes_read = 0; + + auto before_normal_read = get_file_cache_metric_snapshot(); + FileCacheStatistics normal_stats; + IOContext normal_ctx; + normal_ctx.file_cache_stats = &normal_stats; + ASSERT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &normal_ctx).ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(normal_stats.bytes_read_from_local, 0); + EXPECT_EQ(normal_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(normal_stats.bytes_read_from_peer, 0); + EXPECT_EQ(normal_stats.num_local_io_total, 0); + EXPECT_EQ(normal_stats.num_remote_io_total, 1); + auto after_normal_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_normal_read, after_normal_read, 0, 64_kb, 0); + + auto before_warmup_miss = get_file_cache_metric_snapshot(); + FileCacheStatistics warmup_miss_stats; + IOContext warmup_miss_ctx; + warmup_miss_ctx.file_cache_stats = &warmup_miss_stats; + warmup_miss_ctx.is_warmup = true; + buffer.assign(64_kb, '\0'); + ASSERT_TRUE( + reader.read_at(2_mb, Slice(buffer.data(), buffer.size()), &bytes_read, &warmup_miss_ctx) + .ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(std::string(64_kb, '2'), buffer); + EXPECT_EQ(warmup_miss_stats.bytes_read_from_local, 0); + EXPECT_EQ(warmup_miss_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(warmup_miss_stats.bytes_read_from_peer, 0); + EXPECT_EQ(warmup_miss_stats.num_remote_io_total, 1); + auto after_warmup_miss = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_warmup_miss, after_warmup_miss, 0, 0, 0); + + auto before_warmup_hit = get_file_cache_metric_snapshot(); + FileCacheStatistics warmup_hit_stats; + IOContext warmup_hit_ctx; + warmup_hit_ctx.file_cache_stats = &warmup_hit_stats; + warmup_hit_ctx.is_warmup = true; + buffer.assign(64_kb, '\0'); + ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &warmup_hit_ctx) + .ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(warmup_hit_stats.bytes_read_from_local, 64_kb); + EXPECT_EQ(warmup_hit_stats.bytes_read_from_remote, 0); + EXPECT_EQ(warmup_hit_stats.bytes_read_from_peer, 0); + EXPECT_EQ(warmup_hit_stats.num_local_io_total, 1); + auto after_warmup_hit = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_warmup_hit, after_warmup_hit, 0, 0, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); +} + +TEST_F(BlockFileCacheTest, cached_remote_file_reader_mixed_hit_miss_stats_are_split_by_bytes) { + bool origin_enable_direct_read = config::enable_read_cache_file_directly; + config::enable_read_cache_file_directly = false; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; + + std::string cache_base_path = caches_dir / "cached_remote_reader_mixed_hit_miss_stats" / ""; + BlockFileCache* cache = nullptr; + ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, &cache).ok()); + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + opts.tablet_id = 10086; + CachedRemoteFileReader reader(local_reader, opts); + + std::string buffer(64_kb, '\0'); + size_t bytes_read = 0; + FileCacheStatistics prime_stats; + IOContext prime_ctx; + prime_ctx.file_cache_stats = &prime_stats; + ASSERT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &prime_ctx).ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb); + + auto before_mixed_read = get_file_cache_metric_snapshot(); + FileCacheStatistics mixed_stats; + IOContext mixed_ctx; + mixed_ctx.file_cache_stats = &mixed_stats; + buffer.assign(64_kb, '\0'); + ASSERT_TRUE(reader.read_at(1_mb - 32_kb, Slice(buffer.data(), buffer.size()), &bytes_read, + &mixed_ctx) + .ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(buffer.substr(0, 32_kb), std::string(32_kb, '0')); + EXPECT_EQ(buffer.substr(32_kb), std::string(32_kb, '1')); + EXPECT_EQ(mixed_stats.bytes_read_from_local, 32_kb); + EXPECT_EQ(mixed_stats.bytes_read_from_remote, 32_kb); + EXPECT_EQ(mixed_stats.bytes_read_from_peer, 0); + EXPECT_EQ(mixed_stats.num_local_io_total, 1); + EXPECT_EQ(mixed_stats.num_remote_io_total, 1); + EXPECT_EQ(mixed_stats.num_peer_io_total, 0); + auto after_mixed_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_mixed_read, after_mixed_read, 32_kb, 32_kb, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); +} + TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not_found) { bool origin_enable_direct_read = config::enable_read_cache_file_directly; config::enable_read_cache_file_directly = false; @@ -3650,6 +3889,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not settings.max_query_cache_size = 0; ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path]; + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; for (int i = 0; i < 100; i++) { if (cache->get_async_open_success()) { break; @@ -3659,11 +3899,12 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not FileReaderSPtr local_reader; ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + auto memory_reader = std::make_shared(local_reader); io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; opts.tablet_id = 10086; - CachedRemoteFileReader reader(local_reader, opts); + CachedRemoteFileReader reader(memory_reader, opts); uint64_t before_self_heal = g_read_cache_self_heal_on_not_found.get_value(); @@ -3674,25 +3915,39 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not size_t bytes_read = 0; ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(stats.bytes_read_from_local, 0); + EXPECT_EQ(stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(stats.bytes_read_from_peer, 0); auto key = io::BlockFileCache::hash("tmp_file"); - { - io::CacheContext inspect_ctx; - ReadStatistics inspect_stats; - inspect_ctx.stats = &inspect_stats; - inspect_ctx.cache_type = io::FileCacheType::NORMAL; - auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx); - auto inspect_blocks = fromHolder(inspect_holder); - ASSERT_EQ(inspect_blocks.size(), 1); - ASSERT_EQ(inspect_blocks[0]->state(), io::FileBlock::State::DOWNLOADED); - std::string cache_file = inspect_blocks[0]->get_cache_file(); - ASSERT_TRUE(fs::exists(cache_file)); - ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok()); - ASSERT_FALSE(fs::exists(cache_file)); - } - ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + auto* sp = SyncPoint::get_instance(); + sp->enable_processing(); + Defer defer {[&] { + sp->clear_call_back("LocalFileReader::read_at_impl"); + sp->disable_processing(); + }}; + sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) { + std::pair* pair = try_any_cast*>(values.back()); + pair->first = Status::NotFound("inject cache file not found"); + pair->second = true; + }); + + auto before_fallback_read = get_file_cache_metric_snapshot(); + FileCacheStatistics fallback_stats; + IOContext fallback_ctx; + fallback_ctx.file_cache_stats = &fallback_stats; + ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &fallback_ctx) + .ok()); EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(fallback_stats.bytes_read_from_local, 0); + EXPECT_EQ(fallback_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(fallback_stats.bytes_read_from_peer, 0); + EXPECT_EQ(fallback_stats.num_local_io_total, 0); + EXPECT_EQ(fallback_stats.num_remote_io_total, 1); + EXPECT_EQ(fallback_stats.num_peer_io_total, 0); + auto after_fallback_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_fallback_read, after_fallback_read, 0, 64_kb, 0); bool self_healed = false; for (int i = 0; i < 100; ++i) { @@ -3713,13 +3968,6 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not EXPECT_TRUE(reader.close().ok()); EXPECT_TRUE(reader.closed()); - std::this_thread::sleep_for(std::chrono::seconds(1)); - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - FileCacheFactory::instance()->_caches.clear(); - FileCacheFactory::instance()->_path_to_cache.clear(); - FileCacheFactory::instance()->_capacity = 0; } TEST_F(BlockFileCacheTest, cached_remote_file_reader_no_self_heal_on_non_not_found_error) { @@ -3810,6 +4058,60 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_no_self_heal_on_non_not_fou FileCacheFactory::instance()->_capacity = 0; } +TEST_F(BlockFileCacheTest, cached_remote_file_reader_failed_read_does_not_update_metrics) { + bool origin_enable_direct_read = config::enable_read_cache_file_directly; + config::enable_read_cache_file_directly = true; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; + + std::string cache_base_path = caches_dir / "cached_remote_reader_failed_read_metrics" / ""; + BlockFileCache* cache = nullptr; + ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, &cache).ok()); + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + auto remote_reader = std::make_shared(local_reader); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + opts.tablet_id = 10086; + auto failing_remote_reader = + std::make_shared(remote_reader, static_cast(1_mb)); + CachedRemoteFileReader reader(failing_remote_reader, opts); + + std::string buffer(64_kb, '\0'); + size_t bytes_read = 0; + FileCacheStatistics prime_stats; + IOContext prime_ctx; + prime_ctx.file_cache_stats = &prime_stats; + ASSERT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &prime_ctx).ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(cache->_cur_cache_size, 1_mb); + + failing_remote_reader->set_fail(true); + + auto before_failed_read = get_file_cache_metric_snapshot(); + FileCacheStatistics failed_stats; + IOContext failed_ctx; + failed_ctx.file_cache_stats = &failed_stats; + buffer.assign(64_kb, '\0'); + auto st = reader.read_at(1_mb - 100, Slice(buffer.data(), buffer.size()), &bytes_read, + &failed_ctx); + ASSERT_FALSE(st.ok()); + auto after_failed_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_failed_read, after_failed_read, 0, 0, 0); + EXPECT_EQ(buffer.substr(0, 100), std::string(100, '0')); + EXPECT_EQ(failed_stats.bytes_read_from_local, 0); + EXPECT_EQ(failed_stats.bytes_read_from_remote, 0); + EXPECT_EQ(failed_stats.bytes_read_from_peer, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); +} + TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) { std::string cache_base_path = caches_dir / "cached_remote_file_reader_init" / ""; if (fs::exists(cache_base_path)) { @@ -7908,7 +8210,10 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) { config::enable_evict_file_cache_in_advance = false; config::file_cache_enter_disk_resource_limit_mode_percent = 99; + bool origin_enable_direct_read = config::enable_read_cache_file_directly; config::enable_read_cache_file_directly = true; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -7975,10 +8280,22 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) { 64_kb); // try to read first two blocks + FileCacheStatistics partial_stats; + IOContext partial_io_ctx; + partial_io_ctx.file_cache_stats = &partial_stats; + auto before_partial_read = get_file_cache_metric_snapshot(); ASSERT_TRUE(reader->read_at(1048576 - 100, Slice(buffer.data(), buffer.size()), &bytes_read, - &io_ctx) + &partial_io_ctx) .ok()); std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_EQ(partial_stats.bytes_read_from_local, 100); + EXPECT_EQ(partial_stats.bytes_read_from_remote, 64_kb - 100); + EXPECT_EQ(partial_stats.bytes_read_from_peer, 0); + EXPECT_EQ(partial_stats.num_local_io_total, 1); + EXPECT_EQ(partial_stats.num_remote_io_total, 1); + EXPECT_EQ(partial_stats.num_peer_io_total, 0); + auto after_partial_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_partial_read, after_partial_read, 100, 64_kb - 100, 0); EXPECT_EQ(cache->_cur_cache_size, 2097152); EXPECT_EQ(g_read_cache_direct_partial_num.get_value() - org_g_read_cache_direct_partial_num, 1); EXPECT_EQ(g_read_cache_direct_partial_bytes.get_value() - org_g_read_cache_direct_partial_bytes, diff --git a/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy new file mode 100644 index 00000000000000..e2fc6d6fe65205 --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy @@ -0,0 +1,457 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper +import java.net.ServerSocket + +suite('test_file_cache_warmup_read_metrics_docker', 'docker') { + if (!isCloudMode()) { + return + } + + def allocatePort = { + def socket = new ServerSocket(0) + def port = socket.localPort + socket.close() + return port + } + def minioPort = allocatePort() + def minioBucket = "test-bucket" + def minioContainer = "doris-file-cache-warmup-metrics-minio-${System.currentTimeMillis()}" + + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'fetch_cluster_cache_hotspot_interval_ms=1000', + 'heartbeat_interval_second=1', + 'cloud_warm_up_for_rebalance_type=async_warmup', + 'cloud_pre_heating_time_limit_sec=180', + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'file_cache_enter_disk_resource_limit_mode_percent=99', + 'enable_evict_file_cache_in_advance=false', + 'file_cache_background_monitor_interval_ms=1000', + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'disable_segment_cache=true', + 'disable_storage_page_cache=true', + 'enable_cache_read_from_peer=false', + ] + options.cloudStoreConfigs += [ + 'DORIS_CLOUD_USER=minio', + 'DORIS_CLOUD_AK=minioadmin', + 'DORIS_CLOUD_SK=minioadmin', + "DORIS_CLOUD_BUCKET=${minioBucket}", + "DORIS_CLOUD_ENDPOINT=host.docker.internal:${minioPort}", + "DORIS_CLOUD_EXTERNAL_ENDPOINT=host.docker.internal:${minioPort}", + 'DORIS_CLOUD_REGION=us-east-1', + 'DORIS_CLOUD_PROVIDER=S3', + ] + options.extraHosts += [ + 'host.docker.internal:host-gateway', + "${minioBucket}.host.docker.internal:host-gateway", + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + def computeCluster = "compute_cluster" + def sourceCluster = "metrics_warmup_source" + def targetCluster = "metrics_warmup_target" + def payload = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + + def waitForCondition = { condition, timeoutMs, message -> + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < timeoutMs) { + if (condition()) { + return + } + sleep(1000) + } + if (!condition()) { + throw new RuntimeException(message) + } + } + + def startMinio = { + cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true""" + cmd """ + docker run -d --name ${minioContainer} \ + -p ${minioPort}:9000 \ + -e MINIO_ROOT_USER=minioadmin \ + -e MINIO_ROOT_PASSWORD=minioadmin \ + -e MINIO_DOMAIN=host.docker.internal \ + minio/minio:RELEASE.2024-11-07T00-52-20Z \ + server /data --console-address ':9001' + """ + waitForCondition({ + try { + new URL("http://127.0.0.1:${minioPort}/minio/health/ready").text + return true + } catch (Throwable ignored) { + return false + } + }, 60000, "MinIO did not become ready") + cmd """ + docker exec ${minioContainer} sh -c \ + 'mc alias set local http://localhost:9000 minioadmin minioadmin && mc mb -p local/${minioBucket}' + """ + } + + def stopMinio = { + cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true""" + } + + def getClusterBackends = { clusterName -> + def backends = sql """SHOW BACKENDS""" + def clusterBackends = backends.findAll { + it[19].contains("""\"compute_group_name\" : \"${clusterName}\"""") + } + assertTrue(clusterBackends.size() > 0, "No backend found for cluster ${clusterName}") + return clusterBackends + } + + def getPromMetric = { ip, httpPort, name -> + def metrics = new URL("http://${ip}:${httpPort}/metrics").text + def matcher = metrics =~ ~"(?m)^${name}\\s+([0-9]+(?:\\.[0-9]+)?)" + if (matcher.find()) { + return new BigDecimal(matcher[0][1]).longValue() + } + logger.info("${name} not found in /metrics for ${ip}:${httpPort}, treat it as 0") + return 0L + } + + def getBrpcMetric = { ip, brpcPort, name -> + def metrics = new URL("http://${ip}:${brpcPort}/brpc_metrics").text + def matcher = metrics =~ ~"(?m)^.*${name}\\s+([0-9]+)" + if (matcher.find()) { + return matcher[0][1] as long + } + logger.info("${name} not found in /brpc_metrics for ${ip}:${brpcPort}, treat it as 0") + return 0L + } + + def getBackendReadMetrics = { be -> + def ip = be[1] + def httpPort = be[4] + return [ + total : getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_total"), + local : getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_from_cache"), + remote: getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_from_remote"), + peer : getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_from_peer"), + ] + } + + def getClusterReadMetrics = { clusterName -> + def result = [total: 0L, local: 0L, remote: 0L, peer: 0L] + getClusterBackends(clusterName).each { be -> + def metrics = getBackendReadMetrics(be) + result.total += metrics.total + result.local += metrics.local + result.remote += metrics.remote + result.peer += metrics.peer + } + return result + } + + def getMetricDelta = { before, after -> + return [ + total : after.total - before.total, + local : after.local - before.local, + remote: after.remote - before.remote, + peer : after.peer - before.peer, + ] + } + + def assertMetricInvariant = { metrics -> + assertEquals(metrics.total, metrics.local + metrics.remote + metrics.peer) + } + + def assertNoReadMetricDelta = { before, after, label -> + def delta = getMetricDelta(before, after) + logger.info("${label} read metric delta: ${delta}") + assertEquals(0L, delta.total) + assertEquals(0L, delta.local) + assertEquals(0L, delta.remote) + assertEquals(0L, delta.peer) + } + + def clearFileCache = { ip, httpPort -> + def response = new URL("http://${ip}:${httpPort}/api/file_cache?op=clear&sync=true").text + def json = new JsonSlurper().parseText(response) + if (json.status != "OK") { + throw new RuntimeException("Clear cache on ${ip}:${httpPort} failed: ${json.status}") + } + } + + def clearFileCacheOnCluster = { clusterName -> + getClusterBackends(clusterName).each { be -> + clearFileCache(be[1], be[4]) + } + sleep(5000) + } + + def getBackendCacheSize = { be -> + return getBrpcMetric(be[1], be[5], "cache_cache_size") + } + + def getClusterCacheSizeSum = { clusterName -> + long sum = 0 + getClusterBackends(clusterName).each { be -> + sum += getBackendCacheSize(be) + } + return sum + } + + def getClusterBrpcMetricSum = { clusterName, metricName -> + long sum = 0 + getClusterBackends(clusterName).each { be -> + sum += getBrpcMetric(be[1], be[5], metricName) + } + return sum + } + + def createTable = { table, buckets -> + sql """DROP TABLE IF EXISTS ${table}""" + sql """ + CREATE TABLE ${table} ( + k1 INT, + v1 VARCHAR(4096) + ) + DUPLICATE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS ${buckets} + PROPERTIES ( + "replication_num" = "1", + "file_cache_ttl_seconds" = "3600" + ) + """ + } + + def loadRows = { table, rowStart, rowCount -> + int batchSize = 100 + for (int begin = 0; begin < rowCount; begin += batchSize) { + int end = Math.min(rowCount, begin + batchSize) + def values = (begin.. + int key = rowStart + idx + return "(${key}, '${payload}_${key}_${payload}')" + }.join(",") + sql """INSERT INTO ${table} VALUES ${values}""" + } + sql """sync""" + } + + def queryTable = { table -> + def result = sql """SELECT SUM(LENGTH(v1)) FROM ${table} WHERE k1 >= 0""" + assertTrue((result[0][0] as String).toLong() > 0) + } + + def waitWarmupFinished = { jobId -> + waitForCondition({ + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + if (jobInfo.size() == 0) { + return false + } + def state = jobInfo[0][3].toString() + if (state == "CANCELLED") { + throw new RuntimeException("Warm up job ${jobId} was cancelled") + } + return state == "FINISHED" + }, 120000, "Warm up job ${jobId} did not finish") + } + + def cancelWarmupJob = { jobId -> + sql """CANCEL WARM UP JOB WHERE ID = ${jobId}""" + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals("CANCELLED", jobInfo[0][3]) + } + + try { + startMinio() + + docker(options) { + cluster.addBackend(1, sourceCluster) + cluster.addBackend(1, targetCluster) + + sql """SET enable_file_cache = true""" + sql """SET enable_sql_cache = false""" + sql """SET enable_query_cache = false""" + + sql """use @${computeCluster}""" + def manualTable = "test_file_cache_metrics_manual" + createTable(manualTable, 1) + loadRows(manualTable, 1, 300) + clearFileCacheOnCluster(computeCluster) + + def beforeQueryMiss = getClusterReadMetrics(computeCluster) + queryTable(manualTable) + waitForCondition({ + def delta = getMetricDelta(beforeQueryMiss, getClusterReadMetrics(computeCluster)) + return delta.total > 0 && delta.remote > 0 + }, 30000, "Normal query miss did not increase global remote read metrics") + def queryMissDelta = getMetricDelta(beforeQueryMiss, getClusterReadMetrics(computeCluster)) + logger.info("query miss read metric delta: ${queryMissDelta}") + assertMetricInvariant(queryMissDelta) + assertTrue(queryMissDelta.remote > 0) + + clearFileCacheOnCluster(computeCluster) + def beforeManualWarmup = getClusterReadMetrics(computeCluster) + def manualJob = sql """WARM UP CLUSTER ${computeCluster} WITH TABLE ${manualTable}""" + waitWarmupFinished(manualJob[0][0]) + waitForCondition({ + return getClusterCacheSizeSum(computeCluster) > 0 + }, 30000, "Manual table warm up did not populate file cache") + def afterManualWarmup = getClusterReadMetrics(computeCluster) + assertNoReadMetricDelta(beforeManualWarmup, afterManualWarmup, "manual table warm up") + + def beforeQueryHit = getClusterReadMetrics(computeCluster) + queryTable(manualTable) + waitForCondition({ + def delta = getMetricDelta(beforeQueryHit, getClusterReadMetrics(computeCluster)) + return delta.total > 0 && delta.local > 0 + }, 30000, "Query after warm up did not increase global local read metrics") + def queryHitDelta = getMetricDelta(beforeQueryHit, getClusterReadMetrics(computeCluster)) + logger.info("query hit read metric delta: ${queryHitDelta}") + assertMetricInvariant(queryHitDelta) + assertTrue(queryHitDelta.local > 0) + + sql """use @${sourceCluster}""" + sql """TRUNCATE TABLE __internal_schema.cloud_cache_hotspot""" + def periodicTable = "test_file_cache_metrics_periodic" + createTable(periodicTable, 1) + loadRows(periodicTable, 1000, 300) + clearFileCacheOnCluster(sourceCluster) + clearFileCacheOnCluster(targetCluster) + + def beforePeriodicWarmup = getClusterReadMetrics(targetCluster) + def periodicJob = sql """ + WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster} + PROPERTIES ( + "sync_mode" = "periodic", + "sync_interval_sec" = "1" + ) + """ + for (int i = 0; i < 120; i++) { + queryTable(periodicTable) + } + waitForCondition({ + return getClusterCacheSizeSum(sourceCluster) > 0 && + getClusterCacheSizeSum(targetCluster) > 0 + }, 90000, "Periodic warm up did not populate target file cache") + def afterPeriodicWarmup = getClusterReadMetrics(targetCluster) + assertNoReadMetricDelta(beforePeriodicWarmup, afterPeriodicWarmup, + "periodic cluster warm up") + cancelWarmupJob(periodicJob[0][0]) + + def eventTable = "test_file_cache_metrics_event" + createTable(eventTable, 1) + clearFileCacheOnCluster(sourceCluster) + clearFileCacheOnCluster(targetCluster) + def beforeEventWarmup = getClusterReadMetrics(targetCluster) + def eventJob = sql """ + WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster} + PROPERTIES ( + "sync_mode" = "event_driven", + "sync_event" = "load" + ) + """ + waitForCondition({ + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${eventJob[0][0]}""" + return jobInfo.size() > 0 && jobInfo[0][3].toString() in ["RUNNING", "PENDING"] + }, 30000, "Event-driven warm up job did not enter running state") + sleep(15000) + def beforeEventSubmitted = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_submitted_segment_num") + def beforeEventFinished = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_finished_segment_num") + def beforeEventFailed = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_failed_segment_num") + loadRows(eventTable, 2000, 300) + waitForCondition({ + def submitted = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_submitted_segment_num") + def finished = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_finished_segment_num") + return submitted > beforeEventSubmitted && finished >= submitted + }, 90000, "Event-driven warm up did not finish target segment downloads") + def afterEventSubmitted = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_submitted_segment_num") + def afterEventFinished = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_finished_segment_num") + def afterEventFailed = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_failed_segment_num") + logger.info("event-driven warm up segment metrics: submitted ${beforeEventSubmitted}" + + " -> ${afterEventSubmitted}, finished ${beforeEventFinished}" + + " -> ${afterEventFinished}, failed ${beforeEventFailed}" + + " -> ${afterEventFailed}") + assertEquals(beforeEventFailed, afterEventFailed) + def afterEventWarmup = getClusterReadMetrics(targetCluster) + assertNoReadMetricDelta(beforeEventWarmup, afterEventWarmup, + "event-driven cluster warm up") + cancelWarmupJob(eventJob[0][0]) + + sql """use @${computeCluster}""" + def rebalanceTable = "test_file_cache_metrics_rebalance" + createTable(rebalanceTable, 8) + loadRows(rebalanceTable, 3000, 500) + clearFileCacheOnCluster(computeCluster) + def beforeRebalanceSourceQuery = getClusterReadMetrics(computeCluster) + queryTable(rebalanceTable) + waitForCondition({ + def delta = getMetricDelta(beforeRebalanceSourceQuery, + getClusterReadMetrics(computeCluster)) + return delta.total > 0 && delta.remote > 0 + }, 30000, "Rebalance source query did not populate source file cache") + + def beforeBackendIds = getClusterBackends(computeCluster).collect { it[0].toString() } as Set + def beforeRebalanceWarmup = getClusterReadMetrics(computeCluster) + cluster.addBackend(1, computeCluster) + def newBackendHolder = [:] + waitForCondition({ + def added = getClusterBackends(computeCluster).find { + !beforeBackendIds.contains(it[0].toString()) + } + if (added != null) { + newBackendHolder.be = added + return true + } + return false + }, 30000, "New backend was not added to ${computeCluster}") + + waitForCondition({ + def distribution = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM ${rebalanceTable}""" + return distribution.any { + it.BackendId.toString() == newBackendHolder.be[0].toString() && + Integer.valueOf(it.ReplicaNum.toString()) > 0 + } + }, 180000, "Rebalance did not move any replica to the new backend") + + waitForCondition({ + return getBackendCacheSize(newBackendHolder.be) > 0 + }, 180000, "Rebalance warm up did not populate file cache on the new backend") + + def afterRebalanceWarmup = getClusterReadMetrics(computeCluster) + assertNoReadMetricDelta(beforeRebalanceWarmup, afterRebalanceWarmup, + "rebalance warm up") + } + } finally { + stopMinio() + } +} From 4a53caa7ad9cea04cc37d10cc46e6b2edd540e1e Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 21 May 2026 23:13:22 +0800 Subject: [PATCH 2/6] [improvement](filecache) Adapt file cache queue consumption Problem Summary: File cache background consumers used fixed intervals and batch sizes for LRU recorder log replay and _need_update_lru_blocks updates. When producers outpaced those consumers, backlog growth was hard to observe and could increase memory pressure. This change adds queue length metrics for LRU recorder log queues, exposes queue-size accessors, supports bounded LRU log replay, and makes both background consumers adapt their interval and batch size according to backlog watermarks. It also slices block LRU update work into smaller lock-hold batches and skips LRU log recording when tail-record retention is disabled. None - Test: Unit Test - `CCACHE_DISABLE=1 DORIS_TOOLCHAIN=clang DISABLE_BE_JAVA_EXTENSIONS=ON ENABLE_INJECTION_POINT=ON ENABLE_CACHE_LOCK_DEBUG=0 ENABLE_PCH=0 EXTRA_CXX_FLAGS='-Wno-error=deprecated-literal-operator' sh run-be-ut.sh --run --filter=BlockFileCacheTest.test_lru_log_replay_bound_and_disable_record` - `build-support/check-format.sh` - `git diff --check` - Tried `build-support/run-clang-tidy.sh --base origin/master --build-dir be/ut_build_ASAN`; it was blocked by pre-existing/file-level diagnostics and system header lookup errors before producing a clean result. - Behavior changed: Yes. File cache background queue consumers can increase consume frequency and batch size when backlog crosses configured watermarks. - Does this need documentation: No --- be/src/common/config.cpp | 10 ++ be/src/common/config.h | 10 ++ be/src/io/cache/block_file_cache.cpp | 168 ++++++++++++++++-- be/src/io/cache/block_file_cache.h | 8 + be/src/io/cache/lru_queue_recorder.cpp | 22 ++- be/src/io/cache/lru_queue_recorder.h | 5 +- .../cache/block_file_cache_test_lru_dump.cpp | 70 +++++++- 7 files changed, 264 insertions(+), 29 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7b6db1636234aa..1e41d5b5a4384e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1246,6 +1246,16 @@ DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000"); DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000"); DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000"); DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000"); +DEFINE_mBool(enable_file_cache_adaptive_queue_consume, "true"); +DEFINE_mInt64(file_cache_lru_log_replay_adaptive_low_watermark, "100000"); +DEFINE_mInt64(file_cache_lru_log_replay_adaptive_high_watermark, "20000000"); +DEFINE_mInt64(file_cache_lru_log_replay_adaptive_min_interval_ms, "50"); +DEFINE_mInt64(file_cache_lru_log_replay_adaptive_max_batch_per_type, "100000"); +DEFINE_mInt64(file_cache_block_lru_update_adaptive_low_watermark, "10000"); +DEFINE_mInt64(file_cache_block_lru_update_adaptive_high_watermark, "100000"); +DEFINE_mInt64(file_cache_block_lru_update_adaptive_min_interval_ms, "200"); +DEFINE_mInt64(file_cache_block_lru_update_adaptive_max_batch, "10000"); +DEFINE_mInt64(file_cache_block_lru_update_lock_slice_batch, "1000"); DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false"); DEFINE_mBool(file_cache_enable_only_warm_up_idx, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 427282a4452bc4..a7adfcd7babba2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1290,6 +1290,16 @@ DECLARE_mInt64(file_cache_background_lru_dump_interval_ms); DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold); DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num); DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms); +DECLARE_mBool(enable_file_cache_adaptive_queue_consume); +DECLARE_mInt64(file_cache_lru_log_replay_adaptive_low_watermark); +DECLARE_mInt64(file_cache_lru_log_replay_adaptive_high_watermark); +DECLARE_mInt64(file_cache_lru_log_replay_adaptive_min_interval_ms); +DECLARE_mInt64(file_cache_lru_log_replay_adaptive_max_batch_per_type); +DECLARE_mInt64(file_cache_block_lru_update_adaptive_low_watermark); +DECLARE_mInt64(file_cache_block_lru_update_adaptive_high_watermark); +DECLARE_mInt64(file_cache_block_lru_update_adaptive_min_interval_ms); +DECLARE_mInt64(file_cache_block_lru_update_adaptive_max_batch); +DECLARE_mInt64(file_cache_block_lru_update_lock_slice_batch); DECLARE_mBool(enable_evaluate_shadow_queue_diff); DECLARE_mBool(file_cache_enable_only_warm_up_idx); diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 78cf530ec2d982..c216638ccdc390 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -138,6 +138,92 @@ size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const { return std::hash {}(ptr)&kShardMask; } +namespace { + +struct QueueConsumePlan { + int64_t interval_ms = 0; + size_t batch_limit = 0; +}; + +int64_t positive_or_default(int64_t value, int64_t default_value) { + return value > 0 ? value : default_value; +} + +size_t positive_size_or_default(int64_t value, size_t default_value) { + return value > 0 ? static_cast(value) : default_value; +} + +QueueConsumePlan build_queue_consume_plan(size_t backlog, int64_t base_interval_ms, + size_t base_batch, size_t low_watermark, + size_t high_watermark, int64_t min_interval_ms, + size_t max_batch) { + QueueConsumePlan plan; + plan.interval_ms = positive_or_default(base_interval_ms, 1); + plan.batch_limit = base_batch; + + if (backlog < low_watermark) { + return plan; + } + + const int64_t min_interval = positive_or_default(min_interval_ms, 1); + const size_t capped_max_batch = std::max(max_batch, 1); + if (backlog < high_watermark) { + plan.interval_ms = std::max(min_interval, plan.interval_ms / 2); + plan.batch_limit = std::min(capped_max_batch, std::max(base_batch * 2, 1)); + return plan; + } + + plan.interval_ms = min_interval; + plan.batch_limit = capped_max_batch; + return plan; +} + +QueueConsumePlan build_lru_log_replay_plan(size_t backlog) { + if (!config::enable_file_cache_adaptive_queue_consume) { + return {positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1), + 0}; + } + const auto base_interval = + positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1); + const auto max_batch = positive_size_or_default( + config::file_cache_lru_log_replay_adaptive_max_batch_per_type, 1); + if (backlog < static_cast(std::max( + config::file_cache_lru_log_replay_adaptive_low_watermark, 0))) { + return {base_interval, 0}; + } + return build_queue_consume_plan( + backlog, base_interval, std::max(max_batch / 4, 1), + static_cast( + std::max(config::file_cache_lru_log_replay_adaptive_low_watermark, 0)), + static_cast(std::max( + config::file_cache_lru_log_replay_adaptive_high_watermark, 0)), + config::file_cache_lru_log_replay_adaptive_min_interval_ms, max_batch); +} + +QueueConsumePlan build_block_lru_update_plan(size_t backlog) { + const auto base_interval = + positive_or_default(config::file_cache_background_block_lru_update_interval_ms, 1); + const size_t base_batch = + std::max(config::file_cache_background_block_lru_update_qps_limit, 0) * + static_cast(base_interval) / 1000; + if (base_batch == 0) { + return {base_interval, 0}; + } + if (!config::enable_file_cache_adaptive_queue_consume) { + return {base_interval, base_batch}; + } + return build_queue_consume_plan( + backlog, base_interval, base_batch, + static_cast(std::max( + config::file_cache_block_lru_update_adaptive_low_watermark, 0)), + static_cast(std::max( + config::file_cache_block_lru_update_adaptive_high_watermark, 0)), + config::file_cache_block_lru_update_adaptive_min_interval_ms, + positive_size_or_default(config::file_cache_block_lru_update_adaptive_max_batch, 1)); +} + +} // namespace + BlockFileCache::BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings) : _cache_base_path(cache_base_path), @@ -348,6 +434,16 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_recycle_keys_length"); _need_update_lru_blocks_length_recorder = std::make_shared( _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length"); + _lru_recorder_ttl_log_queue_length_recorder = std::make_shared( + _cache_base_path.c_str(), "file_cache_lru_recorder_ttl_log_queue_length"); + _lru_recorder_index_log_queue_length_recorder = std::make_shared( + _cache_base_path.c_str(), "file_cache_lru_recorder_index_log_queue_length"); + _lru_recorder_normal_log_queue_length_recorder = std::make_shared( + _cache_base_path.c_str(), "file_cache_lru_recorder_normal_log_queue_length"); + _lru_recorder_disposable_log_queue_length_recorder = std::make_shared( + _cache_base_path.c_str(), "file_cache_lru_recorder_disposable_log_queue_length"); + _lru_recorder_total_log_queue_length_recorder = std::make_shared( + _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_length"); _update_lru_blocks_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us"); _ttl_gc_latency_us = std::make_shared(_cache_base_path.c_str(), @@ -382,6 +478,11 @@ UInt128Wrapper BlockFileCache::hash(const std::string& path) { return UInt128Wrapper(value); } +bool BlockFileCache::is_memory_storage() const { + DCHECK(_storage != nullptr); + return _storage->get_type() == FileCacheStorageType::MEMORY; +} + BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( const TUniqueId& query_id, int file_cache_query_limit_percent) { SCOPED_CACHE_LOCK(_mutex, this); @@ -2090,35 +2191,46 @@ void BlockFileCache::run_background_block_lru_update() { Thread::set_self_name("run_background_block_lru_update"); std::vector batch; while (!_close) { - int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms; - size_t batch_limit = - config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000; + size_t backlog = _need_update_lru_blocks.size(); + QueueConsumePlan plan = build_block_lru_update_plan(backlog); { std::unique_lock close_lock(_close_mtx); - _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms)); if (_close) { break; } } batch.clear(); - batch.reserve(batch_limit); - size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch); + batch.reserve(plan.batch_limit); + size_t drained = _need_update_lru_blocks.drain(plan.batch_limit, &batch); if (drained == 0) { *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); continue; } int64_t duration_ns = 0; - { - SCOPED_CACHE_LOCK(_mutex, this); - SCOPED_RAW_TIMER(&duration_ns); - for (auto& block : batch) { - update_block_lru(block, cache_lock); + const size_t slice_batch = positive_size_or_default( + config::file_cache_block_lru_update_lock_slice_batch, drained); + for (size_t begin = 0; begin < batch.size(); begin += slice_batch) { + const size_t end = std::min(begin + slice_batch, batch.size()); + { + SCOPED_CACHE_LOCK(_mutex, this); + SCOPED_RAW_TIMER(&duration_ns); + for (size_t i = begin; i < end; ++i) { + update_block_lru(batch[i], cache_lock); + } } } *_update_lru_blocks_latency_us << (duration_ns / 1000); *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + if (backlog >= static_cast(std::max( + config::file_cache_block_lru_update_adaptive_high_watermark, 0))) { + LOG_EVERY_N(WARNING, 60) + << "need_update_lru_blocks backlog is high, backlog=" << backlog + << " drained=" << drained << " interval_ms=" << plan.interval_ms + << " batch_limit=" << plan.batch_limit; + } } } @@ -2314,19 +2426,29 @@ void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { void BlockFileCache::run_background_lru_log_replay() { Thread::set_self_name("run_background_lru_log_replay"); while (!_close) { - int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms; + size_t backlog = _lru_recorder->get_total_lru_log_queue_size(); + QueueConsumePlan plan = build_lru_log_replay_plan(backlog); { std::unique_lock close_lock(_close_mtx); - _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms)); if (_close) { break; } } - _lru_recorder->replay_queue_event(FileCacheType::TTL); - _lru_recorder->replay_queue_event(FileCacheType::INDEX); - _lru_recorder->replay_queue_event(FileCacheType::NORMAL); - _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE); + record_lru_recorder_log_queue_length(); + size_t drained = 0; + drained += _lru_recorder->replay_queue_event(FileCacheType::TTL, plan.batch_limit); + drained += _lru_recorder->replay_queue_event(FileCacheType::INDEX, plan.batch_limit); + drained += _lru_recorder->replay_queue_event(FileCacheType::NORMAL, plan.batch_limit); + drained += _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE, plan.batch_limit); + record_lru_recorder_log_queue_length(); + if (backlog >= static_cast(std::max( + config::file_cache_lru_log_replay_adaptive_high_watermark, 0))) { + LOG_EVERY_N(WARNING, 60) + << "lru recorder backlog is high, backlog=" << backlog << " drained=" << drained + << " interval_ms=" << plan.interval_ms << " batch_limit=" << plan.batch_limit; + } if (config::enable_evaluate_shadow_queue_diff) { SCOPED_CACHE_LOCK(_mutex, this); @@ -2338,6 +2460,18 @@ void BlockFileCache::run_background_lru_log_replay() { } } +void BlockFileCache::record_lru_recorder_log_queue_length() { + const auto ttl = _lru_recorder->get_lru_log_queue_size(FileCacheType::TTL); + const auto index = _lru_recorder->get_lru_log_queue_size(FileCacheType::INDEX); + const auto normal = _lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL); + const auto disposable = _lru_recorder->get_lru_log_queue_size(FileCacheType::DISPOSABLE); + *_lru_recorder_ttl_log_queue_length_recorder << ttl; + *_lru_recorder_index_log_queue_length_recorder << index; + *_lru_recorder_normal_log_queue_length_recorder << normal; + *_lru_recorder_disposable_log_queue_length_recorder << disposable; + *_lru_recorder_total_log_queue_length_recorder << (ttl + index + normal + disposable); +} + void BlockFileCache::dump_lru_queues(bool force) { std::unique_lock dump_lock(_dump_lru_queues_mtx); if (config::file_cache_background_lru_dump_tail_record_num > 0 && diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index bc7ec7ce4c27a1..9d6f99809326cb 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -286,6 +286,8 @@ class BlockFileCache { [[nodiscard]] bool get_async_open_success() const { return _async_open_done; } + [[nodiscard]] bool is_memory_storage() const; + BlockFileCache& operator=(const BlockFileCache&) = delete; BlockFileCache(const BlockFileCache&) = delete; @@ -470,6 +472,7 @@ class BlockFileCache { void restore_lru_queues_from_disk(std::lock_guard& cache_lock); void run_background_evict_in_advance(); void run_background_block_lru_update(); + void record_lru_recorder_log_queue_length(); bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type, std::vector other_cache_types, @@ -613,6 +616,11 @@ class BlockFileCache { std::shared_ptr _recycle_keys_length_recorder; std::shared_ptr _update_lru_blocks_latency_us; std::shared_ptr _need_update_lru_blocks_length_recorder; + std::shared_ptr _lru_recorder_ttl_log_queue_length_recorder; + std::shared_ptr _lru_recorder_index_log_queue_length_recorder; + std::shared_ptr _lru_recorder_normal_log_queue_length_recorder; + std::shared_ptr _lru_recorder_disposable_log_queue_length_recorder; + std::shared_ptr _lru_recorder_total_log_queue_length_recorder; std::shared_ptr _ttl_gc_latency_us; std::shared_ptr _shadow_queue_levenshtein_distance; diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 9907e58cb2a607..20a363f346b044 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -17,6 +17,7 @@ #include "io/cache/lru_queue_recorder.h" +#include "common/config.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_cache_common.h" @@ -25,19 +26,24 @@ namespace doris::io { void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType log_type, const UInt128Wrapper hash, const size_t offset, const size_t size) { + if (_mgr->is_memory_storage() || config::file_cache_background_lru_dump_tail_record_num <= 0) { + return; + } CacheLRULogQueue& log_queue = get_lru_log_queue(type); log_queue.enqueue(std::make_unique(log_type, hash, offset, size)); ++(_lru_queue_update_cnt_from_last_dump[type]); } -void LRUQueueRecorder::replay_queue_event(FileCacheType type) { +size_t LRUQueueRecorder::replay_queue_event(FileCacheType type, size_t max_events) { // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension CacheLRULogQueue& log_queue = get_lru_log_queue(type); LRUQueue& shadow_queue = get_shadow_queue(type); std::lock_guard lru_log_lock(_mutex_lru_log); std::unique_ptr log; - while (log_queue.try_dequeue(log)) { + size_t replayed = 0; + while ((max_events == 0 || replayed < max_events) && log_queue.try_dequeue(log)) { + ++replayed; try { switch (log->type) { case CacheLRULogType::ADD: { @@ -79,6 +85,7 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { LOG(WARNING) << "Failed to replay queue event: " << e.what(); } } + return replayed; } // we evaluate the diff between two queue by calculate how many operation is @@ -137,4 +144,15 @@ void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType t _lru_queue_update_cnt_from_last_dump[type] = 0; } +size_t LRUQueueRecorder::get_lru_log_queue_size(FileCacheType type) { + return get_lru_log_queue(type).size_approx(); +} + +size_t LRUQueueRecorder::get_total_lru_log_queue_size() { + return get_lru_log_queue_size(FileCacheType::TTL) + + get_lru_log_queue_size(FileCacheType::INDEX) + + get_lru_log_queue_size(FileCacheType::NORMAL) + + get_lru_log_queue_size(FileCacheType::DISPOSABLE); +} + } // end of namespace doris::io diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index 5bd68b70d555f9..99fb40ea3d7523 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -57,16 +57,17 @@ class LRUQueueRecorder { } void record_queue_event(FileCacheType type, CacheLRULogType log_type, const UInt128Wrapper hash, const size_t offset, const size_t size); - void replay_queue_event(FileCacheType type); + size_t replay_queue_event(FileCacheType type, size_t max_events = 0); void evaluate_queue_diff(LRUQueue& base, std::string name, std::lock_guard& cache_lock); size_t get_lru_queue_update_cnt_from_last_dump(FileCacheType type); void reset_lru_queue_update_cnt_from_last_dump(FileCacheType type); + size_t get_lru_log_queue_size(FileCacheType type); + size_t get_total_lru_log_queue_size(); CacheLRULogQueue& get_lru_log_queue(FileCacheType type); LRUQueue& get_shadow_queue(FileCacheType type); -public: std::mutex _mutex_lru_log; private: diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp b/be/test/io/cache/block_file_cache_test_lru_dump.cpp index a5d06b5abbc3a2..99253f50d34ed5 100644 --- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp +++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp @@ -19,14 +19,20 @@ // and modified by Doris #include "io/cache/block_file_cache_test_common.h" +#include "util/defer_op.h" namespace doris::io { TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_tail_record_num {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; config::enable_evict_file_cache_in_advance = false; config::file_cache_enter_disk_resource_limit_mode_percent = 99; config::file_cache_background_lru_dump_interval_ms = 3000; config::file_cache_background_lru_dump_update_cnt_threshold = 0; + config::file_cache_background_lru_dump_tail_record_num = 5000000; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -156,10 +162,11 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000); // all queue are filled, let's check the lru log records - ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::TTL), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::INDEX), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::DISPOSABLE), 5); + ASSERT_EQ(cache._lru_recorder->get_total_lru_log_queue_size(), 20); // then check the log replay std::this_thread::sleep_for(std::chrono::milliseconds( @@ -175,10 +182,10 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { context2); // move index queue 3rd element to the end cache.remove_if_cached(key3); // remove all element from ttl queue } - ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 1); - ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0); - ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::TTL), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::INDEX), 1); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::DISPOSABLE), 0); std::this_thread::sleep_for(std::chrono::milliseconds( 2 * config::file_cache_background_lru_log_replay_interval_ms)); @@ -401,11 +408,58 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { } } +TEST_F(BlockFileCacheTest, test_lru_log_replay_bound_and_disable_record) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_tail_record_num {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; + + io::FileCacheSettings settings; + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 5000000; + settings.query_queue_elements = 50000; + settings.index_queue_size = 5000000; + settings.index_queue_elements = 50000; + settings.disposable_queue_size = 5000000; + settings.disposable_queue_elements = 50000; + settings.capacity = 20000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + config::file_cache_background_lru_dump_tail_record_num = 100; + io::BlockFileCache cache(cache_base_path, settings); + auto hash = io::BlockFileCache::hash("bounded-replay-key"); + + for (size_t i = 0; i < 5; ++i) { + cache._lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, + i * 100, 100); + } + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 5); + ASSERT_EQ(cache._lru_recorder->replay_queue_event(FileCacheType::NORMAL, 2), 2); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 3); + ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 2); + + ASSERT_EQ(cache._lru_recorder->replay_queue_event(FileCacheType::NORMAL), 3); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); + ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 5); + + config::file_cache_background_lru_dump_tail_record_num = 0; + cache._lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 600, + 100); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); +} + TEST_F(BlockFileCacheTest, test_lru_duplicate_queue_entry_restore) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_tail_record_num {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; config::enable_evict_file_cache_in_advance = false; config::file_cache_enter_disk_resource_limit_mode_percent = 99; config::file_cache_background_lru_dump_interval_ms = 3000; config::file_cache_background_lru_dump_update_cnt_threshold = 0; + config::file_cache_background_lru_dump_tail_record_num = 5000000; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } From 97a715b6d34747dea06b7c43a4c66fb192458afa Mon Sep 17 00:00:00 2001 From: zhengyu Date: Mon, 1 Jun 2026 22:36:11 +0800 Subject: [PATCH 3/6] simplify metrics and configs Signed-off-by: zhengyu --- be/src/common/config.cpp | 10 --- be/src/common/config.h | 10 --- be/src/io/cache/block_file_cache.cpp | 98 ++++++++++------------------ be/src/io/cache/block_file_cache.h | 10 +-- 4 files changed, 38 insertions(+), 90 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1e41d5b5a4384e..7b6db1636234aa 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1246,16 +1246,6 @@ DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000"); DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000"); DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000"); DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000"); -DEFINE_mBool(enable_file_cache_adaptive_queue_consume, "true"); -DEFINE_mInt64(file_cache_lru_log_replay_adaptive_low_watermark, "100000"); -DEFINE_mInt64(file_cache_lru_log_replay_adaptive_high_watermark, "20000000"); -DEFINE_mInt64(file_cache_lru_log_replay_adaptive_min_interval_ms, "50"); -DEFINE_mInt64(file_cache_lru_log_replay_adaptive_max_batch_per_type, "100000"); -DEFINE_mInt64(file_cache_block_lru_update_adaptive_low_watermark, "10000"); -DEFINE_mInt64(file_cache_block_lru_update_adaptive_high_watermark, "100000"); -DEFINE_mInt64(file_cache_block_lru_update_adaptive_min_interval_ms, "200"); -DEFINE_mInt64(file_cache_block_lru_update_adaptive_max_batch, "10000"); -DEFINE_mInt64(file_cache_block_lru_update_lock_slice_batch, "1000"); DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false"); DEFINE_mBool(file_cache_enable_only_warm_up_idx, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a7adfcd7babba2..427282a4452bc4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1290,16 +1290,6 @@ DECLARE_mInt64(file_cache_background_lru_dump_interval_ms); DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold); DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num); DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms); -DECLARE_mBool(enable_file_cache_adaptive_queue_consume); -DECLARE_mInt64(file_cache_lru_log_replay_adaptive_low_watermark); -DECLARE_mInt64(file_cache_lru_log_replay_adaptive_high_watermark); -DECLARE_mInt64(file_cache_lru_log_replay_adaptive_min_interval_ms); -DECLARE_mInt64(file_cache_lru_log_replay_adaptive_max_batch_per_type); -DECLARE_mInt64(file_cache_block_lru_update_adaptive_low_watermark); -DECLARE_mInt64(file_cache_block_lru_update_adaptive_high_watermark); -DECLARE_mInt64(file_cache_block_lru_update_adaptive_min_interval_ms); -DECLARE_mInt64(file_cache_block_lru_update_adaptive_max_batch); -DECLARE_mInt64(file_cache_block_lru_update_lock_slice_batch); DECLARE_mBool(enable_evaluate_shadow_queue_diff); DECLARE_mBool(file_cache_enable_only_warm_up_idx); diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index c216638ccdc390..9ad0e1424933a1 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -145,14 +145,21 @@ struct QueueConsumePlan { size_t batch_limit = 0; }; +constexpr size_t kLruLogReplayAdaptiveLowWatermark = 50'000; +constexpr size_t kLruLogReplayAdaptiveHighWatermark = 300'000; +constexpr int64_t kLruLogReplayAdaptiveMinIntervalMs = 100; +constexpr size_t kLruLogReplayAdaptiveMaxBatchPerType = 25'000; + +constexpr size_t kBlockLruUpdateAdaptiveLowWatermark = 10'000; +constexpr size_t kBlockLruUpdateAdaptiveHighWatermark = 50'000; +constexpr int64_t kBlockLruUpdateAdaptiveMinIntervalMs = 500; +constexpr size_t kBlockLruUpdateAdaptiveMaxBatch = 10'000; +constexpr size_t kBlockLruUpdateLockSliceBatch = 500; + int64_t positive_or_default(int64_t value, int64_t default_value) { return value > 0 ? value : default_value; } -size_t positive_size_or_default(int64_t value, size_t default_value) { - return value > 0 ? static_cast(value) : default_value; -} - QueueConsumePlan build_queue_consume_plan(size_t backlog, int64_t base_interval_ms, size_t base_batch, size_t low_watermark, size_t high_watermark, int64_t min_interval_ms, @@ -179,25 +186,15 @@ QueueConsumePlan build_queue_consume_plan(size_t backlog, int64_t base_interval_ } QueueConsumePlan build_lru_log_replay_plan(size_t backlog) { - if (!config::enable_file_cache_adaptive_queue_consume) { - return {positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1), - 0}; - } const auto base_interval = positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1); - const auto max_batch = positive_size_or_default( - config::file_cache_lru_log_replay_adaptive_max_batch_per_type, 1); - if (backlog < static_cast(std::max( - config::file_cache_lru_log_replay_adaptive_low_watermark, 0))) { + if (backlog < kLruLogReplayAdaptiveLowWatermark) { return {base_interval, 0}; } return build_queue_consume_plan( - backlog, base_interval, std::max(max_batch / 4, 1), - static_cast( - std::max(config::file_cache_lru_log_replay_adaptive_low_watermark, 0)), - static_cast(std::max( - config::file_cache_lru_log_replay_adaptive_high_watermark, 0)), - config::file_cache_lru_log_replay_adaptive_min_interval_ms, max_batch); + backlog, base_interval, kLruLogReplayAdaptiveMaxBatchPerType / 4, + kLruLogReplayAdaptiveLowWatermark, kLruLogReplayAdaptiveHighWatermark, + kLruLogReplayAdaptiveMinIntervalMs, kLruLogReplayAdaptiveMaxBatchPerType); } QueueConsumePlan build_block_lru_update_plan(size_t backlog) { @@ -209,17 +206,10 @@ QueueConsumePlan build_block_lru_update_plan(size_t backlog) { if (base_batch == 0) { return {base_interval, 0}; } - if (!config::enable_file_cache_adaptive_queue_consume) { - return {base_interval, base_batch}; - } return build_queue_consume_plan( - backlog, base_interval, base_batch, - static_cast(std::max( - config::file_cache_block_lru_update_adaptive_low_watermark, 0)), - static_cast(std::max( - config::file_cache_block_lru_update_adaptive_high_watermark, 0)), - config::file_cache_block_lru_update_adaptive_min_interval_ms, - positive_size_or_default(config::file_cache_block_lru_update_adaptive_max_batch, 1)); + backlog, base_interval, base_batch, kBlockLruUpdateAdaptiveLowWatermark, + kBlockLruUpdateAdaptiveHighWatermark, kBlockLruUpdateAdaptiveMinIntervalMs, + kBlockLruUpdateAdaptiveMaxBatch); } } // namespace @@ -430,20 +420,12 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us"); _lru_dump_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_lru_dump_latency_us"); - _recycle_keys_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_recycle_keys_length"); - _need_update_lru_blocks_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length"); - _lru_recorder_ttl_log_queue_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_lru_recorder_ttl_log_queue_length"); - _lru_recorder_index_log_queue_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_lru_recorder_index_log_queue_length"); - _lru_recorder_normal_log_queue_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_lru_recorder_normal_log_queue_length"); - _lru_recorder_disposable_log_queue_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_lru_recorder_disposable_log_queue_length"); - _lru_recorder_total_log_queue_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_length"); + _recycle_keys_length_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_recycle_keys_length", 0); + _need_update_lru_blocks_length_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length", 0); + _lru_recorder_log_queue_length_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_length", 0); _update_lru_blocks_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us"); _ttl_gc_latency_us = std::make_shared(_cache_base_path.c_str(), @@ -749,7 +731,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) { if (_need_update_lru_blocks.insert(std::move(block))) { - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); } } @@ -1554,7 +1536,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo // but it's ok, because the rowset is stale already bool ret = _recycle_keys.enqueue(key); if (ret) [[likely]] { - *_recycle_keys_length_recorder << _recycle_keys.size_approx(); + _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx()); } else { LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); int64_t duration_ns = 0; @@ -2148,7 +2130,7 @@ void BlockFileCache::run_background_gc() { } batch_count++; } - *_recycle_keys_length_recorder << _recycle_keys.size_approx(); + _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx()); batch_count = 0; } } @@ -2205,13 +2187,12 @@ void BlockFileCache::run_background_block_lru_update() { batch.reserve(plan.batch_limit); size_t drained = _need_update_lru_blocks.drain(plan.batch_limit, &batch); if (drained == 0) { - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); continue; } int64_t duration_ns = 0; - const size_t slice_batch = positive_size_or_default( - config::file_cache_block_lru_update_lock_slice_batch, drained); + const size_t slice_batch = std::min(kBlockLruUpdateLockSliceBatch, drained); for (size_t begin = 0; begin < batch.size(); begin += slice_batch) { const size_t end = std::min(begin + slice_batch, batch.size()); { @@ -2223,9 +2204,8 @@ void BlockFileCache::run_background_block_lru_update() { } } *_update_lru_blocks_latency_us << (duration_ns / 1000); - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); - if (backlog >= static_cast(std::max( - config::file_cache_block_lru_update_adaptive_high_watermark, 0))) { + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); + if (backlog >= kBlockLruUpdateAdaptiveHighWatermark) { LOG_EVERY_N(WARNING, 60) << "need_update_lru_blocks backlog is high, backlog=" << backlog << " drained=" << drained << " interval_ms=" << plan.interval_ms @@ -2307,7 +2287,7 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size, void BlockFileCache::clear_need_update_lru_blocks() { _need_update_lru_blocks.clear(); - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); } void BlockFileCache::pause_ttl_manager() { @@ -2443,8 +2423,7 @@ void BlockFileCache::run_background_lru_log_replay() { drained += _lru_recorder->replay_queue_event(FileCacheType::NORMAL, plan.batch_limit); drained += _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE, plan.batch_limit); record_lru_recorder_log_queue_length(); - if (backlog >= static_cast(std::max( - config::file_cache_lru_log_replay_adaptive_high_watermark, 0))) { + if (backlog >= kLruLogReplayAdaptiveHighWatermark) { LOG_EVERY_N(WARNING, 60) << "lru recorder backlog is high, backlog=" << backlog << " drained=" << drained << " interval_ms=" << plan.interval_ms << " batch_limit=" << plan.batch_limit; @@ -2461,15 +2440,8 @@ void BlockFileCache::run_background_lru_log_replay() { } void BlockFileCache::record_lru_recorder_log_queue_length() { - const auto ttl = _lru_recorder->get_lru_log_queue_size(FileCacheType::TTL); - const auto index = _lru_recorder->get_lru_log_queue_size(FileCacheType::INDEX); - const auto normal = _lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL); - const auto disposable = _lru_recorder->get_lru_log_queue_size(FileCacheType::DISPOSABLE); - *_lru_recorder_ttl_log_queue_length_recorder << ttl; - *_lru_recorder_index_log_queue_length_recorder << index; - *_lru_recorder_normal_log_queue_length_recorder << normal; - *_lru_recorder_disposable_log_queue_length_recorder << disposable; - *_lru_recorder_total_log_queue_length_recorder << (ttl + index + normal + disposable); + _lru_recorder_log_queue_length_metrics->set_value( + _lru_recorder->get_total_lru_log_queue_size()); } void BlockFileCache::dump_lru_queues(bool force) { diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 9d6f99809326cb..7b50ab88eef823 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -613,14 +613,10 @@ class BlockFileCache { std::shared_ptr _storage_retry_sync_remove_latency_us; std::shared_ptr _storage_async_remove_latency_us; std::shared_ptr _evict_in_advance_latency_us; - std::shared_ptr _recycle_keys_length_recorder; + std::shared_ptr> _recycle_keys_length_metrics; std::shared_ptr _update_lru_blocks_latency_us; - std::shared_ptr _need_update_lru_blocks_length_recorder; - std::shared_ptr _lru_recorder_ttl_log_queue_length_recorder; - std::shared_ptr _lru_recorder_index_log_queue_length_recorder; - std::shared_ptr _lru_recorder_normal_log_queue_length_recorder; - std::shared_ptr _lru_recorder_disposable_log_queue_length_recorder; - std::shared_ptr _lru_recorder_total_log_queue_length_recorder; + std::shared_ptr> _need_update_lru_blocks_length_metrics; + std::shared_ptr> _lru_recorder_log_queue_length_metrics; std::shared_ptr _ttl_gc_latency_us; std::shared_ptr _shadow_queue_levenshtein_distance; From 49f486626a8462bcea371b08f652910716e3aeed Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 2 Jun 2026 16:33:47 +0800 Subject: [PATCH 4/6] fix file cache lru dump pending replay --- be/src/io/cache/cache_lru_dumper.cpp | 65 ++++++++++++++----- be/src/io/cache/cache_lru_dumper.h | 9 ++- be/src/io/cache/lru_queue_recorder.cpp | 27 +++++++- be/src/io/cache/lru_queue_recorder.h | 8 +++ be/test/io/cache/cache_lru_dumper_test.cpp | 74 +++++++++++++++++++++- 5 files changed, 161 insertions(+), 22 deletions(-) diff --git a/be/src/io/cache/cache_lru_dumper.cpp b/be/src/io/cache/cache_lru_dumper.cpp index 43275f5069e614..ea3474388d753f 100644 --- a/be/src/io/cache/cache_lru_dumper.cpp +++ b/be/src/io/cache/cache_lru_dumper.cpp @@ -298,25 +298,53 @@ void CacheLRUDumper::dump_queue(const std::string& queue_name, bool force) { if (force || _recorder->get_lru_queue_update_cnt_from_last_dump(type) > config::file_cache_background_lru_dump_update_cnt_threshold) { LRUQueue& queue = _recorder->get_shadow_queue(type); - do_dump_queue(queue, queue_name); - _recorder->reset_lru_queue_update_cnt_from_last_dump(type); + std::vector elements; + size_t dump_update_cnt = 0; + { + std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); + // Drain logs counted for this dump before updating the counter, otherwise restore may + // see a stale LRU tail after a crash. + _recorder->replay_queue_event_locked(type, 0, lru_log_lock); + elements = collect_lru_queue_entries_locked(queue, lru_log_lock); + dump_update_cnt = + _recorder->get_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock); + } + Status st = do_dump_queue(elements, queue_name); + if (st.ok()) { + std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); + _recorder->subtract_lru_queue_update_cnt_from_last_dump_locked(type, dump_update_cnt, + lru_log_lock); + } else { + LOG(WARNING) << "failed to dump lru queue " << queue_name << ": " << st; + } } } -void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) { - Status st; - std::vector> elements; - elements.reserve(config::file_cache_background_lru_dump_tail_record_num); - +Status CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) { + std::vector elements; { std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); - size_t count = 0; - for (const auto& [hash, offset, size] : queue) { - if (count++ >= config::file_cache_background_lru_dump_tail_record_num) break; - elements.emplace_back(hash, offset, size); + elements = collect_lru_queue_entries_locked(queue, lru_log_lock); + } + return do_dump_queue(elements, queue_name); +}; + +std::vector CacheLRUDumper::collect_lru_queue_entries_locked( + LRUQueue& queue, std::lock_guard& /* lru_log_lock */) { + std::vector elements; + elements.reserve(config::file_cache_background_lru_dump_tail_record_num); + size_t count = 0; + for (const auto& [hash, offset, size] : queue) { + if (count++ >= config::file_cache_background_lru_dump_tail_record_num) { + break; } + elements.emplace_back(hash, offset, size); } + return elements; +} +Status CacheLRUDumper::do_dump_queue(const std::vector& elements, + const std::string& queue_name) { // Write to disk int64_t duration_ns = 0; std::uintmax_t file_size = 0; @@ -330,19 +358,22 @@ void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_nam if (out) { LOG(INFO) << "begin dump " << queue_name << " with " << elements.size() << " elements"; for (const auto& [hash, offset, size] : elements) { - RETURN_IF_STATUS_ERROR(st, - dump_one_lru_entry(out, tmp_filename, hash, offset, size)); + RETURN_IF_ERROR(dump_one_lru_entry(out, tmp_filename, hash, offset, size)); } - RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), tmp_filename, - final_filename, file_size)); + RETURN_IF_ERROR( + finalize_dump(out, elements.size(), tmp_filename, final_filename, file_size)); } else { - LOG(WARNING) << "open lru dump file failed, reason: " << tmp_filename - << " failed to create"; + std::string warn_msg = + fmt::format("open lru dump file failed, file={} failed to create", + tmp_filename); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); } } *(_mgr->_lru_dump_latency_us) << (duration_ns / 1000); LOG(INFO) << fmt::format("lru dump for {} size={} element={} time={}us", queue_name, file_size, elements.size(), duration_ns / 1000); + return Status::OK(); }; Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& filename, diff --git a/be/src/io/cache/cache_lru_dumper.h b/be/src/io/cache/cache_lru_dumper.h index 8074ee334a1934..02418fbda2d557 100644 --- a/be/src/io/cache/cache_lru_dumper.h +++ b/be/src/io/cache/cache_lru_dumper.h @@ -40,6 +40,8 @@ class LRUQueueRecorder; class CacheLRUDumper { public: + using LruDumpEntry = std::tuple; + CacheLRUDumper(BlockFileCache* mgr, LRUQueueRecorder* recorder) : _mgr(mgr), _recorder(recorder) { auto now = std::chrono::system_clock::now(); @@ -56,7 +58,10 @@ class CacheLRUDumper { void set_first_dump_done() { _is_first_dump = false; } private: - void do_dump_queue(LRUQueue& queue, const std::string& queue_name); + Status do_dump_queue(LRUQueue& queue, const std::string& queue_name); + Status do_dump_queue(const std::vector& elements, const std::string& queue_name); + std::vector collect_lru_queue_entries_locked( + LRUQueue& queue, std::lock_guard& lru_log_lock); Status check_ofstream_status(std::ofstream& out, std::string& filename); Status check_ifstream_status(std::ifstream& in, std::string& filename); Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash, @@ -94,4 +99,4 @@ class CacheLRUDumper { std::string _start_time; bool _is_first_dump = true; }; -} // namespace doris::io \ No newline at end of file +} // namespace doris::io diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 20a363f346b044..9c9a969a019d54 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -29,6 +29,7 @@ void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType lo if (_mgr->is_memory_storage() || config::file_cache_background_lru_dump_tail_record_num <= 0) { return; } + std::lock_guard lru_log_lock(_mutex_lru_log); CacheLRULogQueue& log_queue = get_lru_log_queue(type); log_queue.enqueue(std::make_unique(log_type, hash, offset, size)); ++(_lru_queue_update_cnt_from_last_dump[type]); @@ -36,10 +37,16 @@ void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType lo size_t LRUQueueRecorder::replay_queue_event(FileCacheType type, size_t max_events) { // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension + std::lock_guard lru_log_lock(_mutex_lru_log); + return replay_queue_event_locked(type, max_events, lru_log_lock); +} + +size_t LRUQueueRecorder::replay_queue_event_locked( + FileCacheType type, size_t max_events, std::lock_guard& lru_log_lock) { + // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension CacheLRULogQueue& log_queue = get_lru_log_queue(type); LRUQueue& shadow_queue = get_shadow_queue(type); - std::lock_guard lru_log_lock(_mutex_lru_log); std::unique_ptr log; size_t replayed = 0; while ((max_events == 0 || replayed < max_events) && log_queue.try_dequeue(log)) { @@ -137,13 +144,31 @@ CacheLRULogQueue& LRUQueueRecorder::get_lru_log_queue(FileCacheType type) { } size_t LRUQueueRecorder::get_lru_queue_update_cnt_from_last_dump(FileCacheType type) { + std::lock_guard lru_log_lock(_mutex_lru_log); + return get_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock); +} + +size_t LRUQueueRecorder::get_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& /* lru_log_lock */) { return _lru_queue_update_cnt_from_last_dump[type]; } void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType type) { + std::lock_guard lru_log_lock(_mutex_lru_log); + reset_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock); +} + +void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& /* lru_log_lock */) { _lru_queue_update_cnt_from_last_dump[type] = 0; } +void LRUQueueRecorder::subtract_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, size_t count, std::lock_guard& /* lru_log_lock */) { + auto& update_cnt = _lru_queue_update_cnt_from_last_dump[type]; + update_cnt = count >= update_cnt ? 0 : update_cnt - count; +} + size_t LRUQueueRecorder::get_lru_log_queue_size(FileCacheType type) { return get_lru_log_queue(type).size_approx(); } diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index 99fb40ea3d7523..d54d444e5b6a25 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -58,10 +58,18 @@ class LRUQueueRecorder { void record_queue_event(FileCacheType type, CacheLRULogType log_type, const UInt128Wrapper hash, const size_t offset, const size_t size); size_t replay_queue_event(FileCacheType type, size_t max_events = 0); + size_t replay_queue_event_locked(FileCacheType type, size_t max_events, + std::lock_guard& lru_log_lock); void evaluate_queue_diff(LRUQueue& base, std::string name, std::lock_guard& cache_lock); size_t get_lru_queue_update_cnt_from_last_dump(FileCacheType type); + size_t get_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& lru_log_lock); void reset_lru_queue_update_cnt_from_last_dump(FileCacheType type); + void reset_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& lru_log_lock); + void subtract_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, size_t count, std::lock_guard& lru_log_lock); size_t get_lru_log_queue_size(FileCacheType type); size_t get_total_lru_log_queue_size(); diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp b/be/test/io/cache/cache_lru_dumper_test.cpp index 76647ba544fa05..8ae0efcb299f7b 100644 --- a/be/test/io/cache/cache_lru_dumper_test.cpp +++ b/be/test/io/cache/cache_lru_dumper_test.cpp @@ -21,9 +21,11 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "common/config.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" +#include "util/defer_op.h" using ::testing::_; using ::testing::Return; @@ -140,7 +142,7 @@ TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) { src_queue.add(hash, offset, size, lock); // Test dump - dumper->do_dump_queue(src_queue, queue_name); + ASSERT_TRUE(dumper->do_dump_queue(src_queue, queue_name).ok()); // Test restore std::lock_guard cache_lock(mock_cache->mutex()); @@ -158,4 +160,72 @@ TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) { } } -} // namespace doris::io \ No newline at end of file +TEST_F(CacheLRUDumperTest, test_dump_queue_drains_pending_logs_before_reset_counter) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + auto origin_update_cnt_threshold = config::file_cache_background_lru_dump_update_cnt_threshold; + Defer restore_config {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + config::file_cache_background_lru_dump_update_cnt_threshold = origin_update_cnt_threshold; + }}; + config::file_cache_background_lru_dump_tail_record_num = 100; + config::file_cache_background_lru_dump_update_cnt_threshold = 0; + + UInt128Wrapper hash(987654321ULL); + recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 2048, 8192); + ASSERT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 1); + ASSERT_EQ(recorder->get_shadow_queue(FileCacheType::NORMAL).get_elements_num_unsafe(), 0); + ASSERT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 1); + + dumper->dump_queue("normal", false); + + EXPECT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); + EXPECT_EQ(recorder->get_shadow_queue(FileCacheType::NORMAL).get_elements_num_unsafe(), 1); + EXPECT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 0); + + std::lock_guard cache_lock(mock_cache->mutex()); + dumper->restore_queue(dst_queue, "normal", cache_lock); + ASSERT_EQ(dst_queue.get_elements_num_unsafe(), 1); + auto it = dst_queue.begin(); + EXPECT_EQ(it->hash, hash); + EXPECT_EQ(it->offset, 2048); + EXPECT_EQ(it->size, 8192); +} + +TEST_F(CacheLRUDumperTest, test_dump_counter_subtract_keeps_new_updates) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_config {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; + config::file_cache_background_lru_dump_tail_record_num = 100; + + UInt128Wrapper hash(123123123ULL); + recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 4096, 16384); + + size_t dump_update_cnt = 0; + std::vector elements; + { + std::lock_guard lru_log_lock(recorder->_mutex_lru_log); + recorder->replay_queue_event_locked(FileCacheType::NORMAL, 0, lru_log_lock); + elements = dumper->collect_lru_queue_entries_locked( + recorder->get_shadow_queue(FileCacheType::NORMAL), lru_log_lock); + dump_update_cnt = recorder->get_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType::NORMAL, lru_log_lock); + } + + recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::MOVETOBACK, hash, 4096, + 16384); + ASSERT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), + dump_update_cnt + 1); + + ASSERT_TRUE(dumper->do_dump_queue(elements, "normal").ok()); + { + std::lock_guard lru_log_lock(recorder->_mutex_lru_log); + recorder->subtract_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType::NORMAL, dump_update_cnt, lru_log_lock); + } + + EXPECT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 1); + EXPECT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 1); +} + +} // namespace doris::io From 12268c64e90d12389c7a69e3ca30dd2b2fcbcdcd Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 2 Jun 2026 16:39:06 +0800 Subject: [PATCH 5/6] fix format Signed-off-by: zhengyu --- be/src/io/cache/cache_lru_dumper.cpp | 5 ++--- be/src/io/cache/lru_queue_recorder.cpp | 4 ++-- be/test/io/cache/cache_lru_dumper_test.cpp | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/be/src/io/cache/cache_lru_dumper.cpp b/be/src/io/cache/cache_lru_dumper.cpp index ea3474388d753f..1b58efdcd2e793 100644 --- a/be/src/io/cache/cache_lru_dumper.cpp +++ b/be/src/io/cache/cache_lru_dumper.cpp @@ -363,9 +363,8 @@ Status CacheLRUDumper::do_dump_queue(const std::vector& elements, RETURN_IF_ERROR( finalize_dump(out, elements.size(), tmp_filename, final_filename, file_size)); } else { - std::string warn_msg = - fmt::format("open lru dump file failed, file={} failed to create", - tmp_filename); + std::string warn_msg = fmt::format( + "open lru dump file failed, file={} failed to create", tmp_filename); LOG(WARNING) << warn_msg; return Status::InternalError(warn_msg); } diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 9c9a969a019d54..d6398ba0a2de90 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -41,8 +41,8 @@ size_t LRUQueueRecorder::replay_queue_event(FileCacheType type, size_t max_event return replay_queue_event_locked(type, max_events, lru_log_lock); } -size_t LRUQueueRecorder::replay_queue_event_locked( - FileCacheType type, size_t max_events, std::lock_guard& lru_log_lock) { +size_t LRUQueueRecorder::replay_queue_event_locked(FileCacheType type, size_t max_events, + std::lock_guard& lru_log_lock) { // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension CacheLRULogQueue& log_queue = get_lru_log_queue(type); LRUQueue& shadow_queue = get_shadow_queue(type); diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp b/be/test/io/cache/cache_lru_dumper_test.cpp index 8ae0efcb299f7b..0853bac30490bc 100644 --- a/be/test/io/cache/cache_lru_dumper_test.cpp +++ b/be/test/io/cache/cache_lru_dumper_test.cpp @@ -19,9 +19,9 @@ #include +#include "common/config.h" #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "common/config.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" From 37bc18e46f1321ba8c261c7698fd238d0b1aa77c Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 4 Jun 2026 00:18:57 +0800 Subject: [PATCH 6/6] bound file cache update queues --- be/src/io/cache/block_file_cache.cpp | 62 ++++++++++++++++--- be/src/io/cache/block_file_cache.h | 19 +++++- be/src/io/cache/lru_queue_recorder.cpp | 41 ++++++++++-- be/src/io/cache/lru_queue_recorder.h | 17 ++++- be/test/io/cache/cache_lru_dumper_test.cpp | 28 +++++++++ .../io/cache/need_update_lru_blocks_test.cpp | 25 ++++++++ 6 files changed, 176 insertions(+), 16 deletions(-) diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 9ad0e1424933a1..7a2306500ad06a 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -61,25 +61,47 @@ namespace doris::io { // Insert a block pointer into one shard while swallowing allocation failures. bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) { + return insert_with_result(std::move(block)) == InsertResult::INSERTED; +} + +NeedUpdateLRUBlocks::InsertResult NeedUpdateLRUBlocks::insert_with_result(FileBlockSPtr block) { if (!block) { - return false; + return InsertResult::IGNORED; } + bool reserved = false; try { auto* raw_ptr = block.get(); auto idx = shard_index(raw_ptr); auto& shard = _shards[idx]; std::lock_guard lock(shard.mutex); + if (shard.entries.contains(raw_ptr)) { + return InsertResult::DUPLICATED; + } + if (!try_reserve_slot()) { + _dropped.fetch_add(1, std::memory_order_relaxed); + return InsertResult::DROPPED; + } + reserved = true; auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block)); - if (inserted) { - _size.fetch_add(1, std::memory_order_relaxed); + if (!inserted) { + _size.fetch_sub(1, std::memory_order_relaxed); + reserved = false; + return InsertResult::DUPLICATED; } - return inserted; + reserved = false; + return InsertResult::INSERTED; } catch (const std::exception& e) { + if (reserved) { + _size.fetch_sub(1, std::memory_order_relaxed); + } LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what(); } catch (...) { + if (reserved) { + _size.fetch_sub(1, std::memory_order_relaxed); + } LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error"; } - return false; + return InsertResult::IGNORED; } // Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures. @@ -138,6 +160,17 @@ size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const { return std::hash {}(ptr)&kShardMask; } +bool NeedUpdateLRUBlocks::try_reserve_slot() { + size_t cur_size = _size.load(std::memory_order_relaxed); + while (cur_size < _hard_cap) { + if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return true; + } + } + return false; +} + namespace { struct QueueConsumePlan { @@ -155,6 +188,8 @@ constexpr size_t kBlockLruUpdateAdaptiveHighWatermark = 50'000; constexpr int64_t kBlockLruUpdateAdaptiveMinIntervalMs = 500; constexpr size_t kBlockLruUpdateAdaptiveMaxBatch = 10'000; constexpr size_t kBlockLruUpdateLockSliceBatch = 500; +constexpr size_t kNeedUpdateLruBlocksHardCap = 100'000; +constexpr size_t kLruRecorderLogQueueHardCap = 500'000; int64_t positive_or_default(int64_t value, int64_t default_value) { return value > 0 ? value : default_value; @@ -218,7 +253,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings) : _cache_base_path(cache_base_path), _capacity(cache_settings.capacity), - _max_file_block_size(cache_settings.max_file_block_size) { + _max_file_block_size(cache_settings.max_file_block_size), + _need_update_lru_blocks(kNeedUpdateLruBlocksHardCap) { _cur_cache_size_metrics = std::make_shared>(_cache_base_path.c_str(), "file_cache_cache_size", 0); _cache_capacity_metrics = std::make_shared>( @@ -424,8 +460,12 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_recycle_keys_length", 0); _need_update_lru_blocks_length_metrics = std::make_shared>( _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length", 0); + _need_update_lru_blocks_dropped_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_dropped"); _lru_recorder_log_queue_length_metrics = std::make_shared>( _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_length", 0); + _lru_recorder_log_queue_dropped_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_dropped"); _update_lru_blocks_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us"); _ttl_gc_latency_us = std::make_shared(_cache_base_path.c_str(), @@ -442,7 +482,7 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits::max()); - _lru_recorder = std::make_unique(this); + _lru_recorder = std::make_unique(this, kLruRecorderLogQueueHardCap); _lru_dumper = std::make_unique(this, _lru_recorder.get()); if (cache_settings.storage == "memory") { _storage = std::make_unique(); @@ -730,8 +770,14 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte } void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) { - if (_need_update_lru_blocks.insert(std::move(block))) { + auto result = _need_update_lru_blocks.insert_with_result(std::move(block)); + if (result == NeedUpdateLRUBlocks::InsertResult::INSERTED) { _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); + } else if (result == NeedUpdateLRUBlocks::InsertResult::DROPPED) { + *(_need_update_lru_blocks_dropped_metrics) << 1; + LOG_EVERY_N(WARNING, 60) << "Drop block LRU update because hard cap is reached, hard_cap=" + << _need_update_lru_blocks.hard_cap() + << " queue_size=" << _need_update_lru_blocks.size(); } } diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 7b50ab88eef823..8c87bfae2e8ebe 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -87,11 +88,20 @@ class FSFileCacheStorage; // Note that Blocks are updated in batch, internal order is not important. class NeedUpdateLRUBlocks { public: - NeedUpdateLRUBlocks() = default; + enum class InsertResult { + INSERTED, + DUPLICATED, + DROPPED, + IGNORED, + }; + + explicit NeedUpdateLRUBlocks(size_t hard_cap = std::numeric_limits::max()) + : _hard_cap(hard_cap) {} // Insert a block into the pending set. Returns true only when the block // was not already queued. Null inputs are ignored. bool insert(FileBlockSPtr block); + InsertResult insert_with_result(FileBlockSPtr block); // Drain up to `limit` unique blocks into `output`. The method returns how // many blocks were actually drained and shrinks the internal size @@ -103,6 +113,8 @@ class NeedUpdateLRUBlocks { // Thread-safe approximate size of queued unique blocks. size_t size() const { return _size.load(std::memory_order_relaxed); } + size_t dropped() const { return _dropped.load(std::memory_order_relaxed); } + size_t hard_cap() const { return _hard_cap; } private: static constexpr size_t kShardCount = 64; @@ -114,9 +126,12 @@ class NeedUpdateLRUBlocks { }; size_t shard_index(FileBlock* ptr) const; + bool try_reserve_slot(); std::array _shards; std::atomic _size {0}; + std::atomic _dropped {0}; + size_t _hard_cap; }; // The BlockFileCache is responsible for the management of the blocks @@ -616,7 +631,9 @@ class BlockFileCache { std::shared_ptr> _recycle_keys_length_metrics; std::shared_ptr _update_lru_blocks_latency_us; std::shared_ptr> _need_update_lru_blocks_length_metrics; + std::shared_ptr> _need_update_lru_blocks_dropped_metrics; std::shared_ptr> _lru_recorder_log_queue_length_metrics; + std::shared_ptr> _lru_recorder_log_queue_dropped_metrics; std::shared_ptr _ttl_gc_latency_us; std::shared_ptr _shadow_queue_levenshtein_distance; diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index d6398ba0a2de90..8c74b1b66bafce 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -29,9 +29,37 @@ void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType lo if (_mgr->is_memory_storage() || config::file_cache_background_lru_dump_tail_record_num <= 0) { return; } + auto record_drop = [&]() { + _dropped_lru_log_count.fetch_add(1, std::memory_order_relaxed); + if (_mgr->_lru_recorder_log_queue_dropped_metrics) { + *(_mgr->_lru_recorder_log_queue_dropped_metrics) << 1; + } + }; std::lock_guard lru_log_lock(_mutex_lru_log); + if (_total_lru_log_queue_size >= _hard_cap) { + record_drop(); + LOG_EVERY_N(WARNING, 60) << "Drop lru recorder log because hard cap is reached, hard_cap=" + << _hard_cap << " total_queue_size=" << _total_lru_log_queue_size; + return; + } CacheLRULogQueue& log_queue = get_lru_log_queue(type); - log_queue.enqueue(std::make_unique(log_type, hash, offset, size)); + try { + if (!log_queue.enqueue(std::make_unique(log_type, hash, offset, size))) { + record_drop(); + LOG(WARNING) << "Failed to enqueue lru recorder log"; + return; + } + } catch (const std::exception& e) { + record_drop(); + LOG(WARNING) << "Failed to enqueue lru recorder log: " << e.what(); + return; + } catch (...) { + record_drop(); + LOG(WARNING) << "Failed to enqueue lru recorder log: unknown error"; + return; + } + ++_lru_log_queue_size_by_type[static_cast(type)]; + ++_total_lru_log_queue_size; ++(_lru_queue_update_cnt_from_last_dump[type]); } @@ -51,6 +79,8 @@ size_t LRUQueueRecorder::replay_queue_event_locked(FileCacheType type, size_t ma size_t replayed = 0; while ((max_events == 0 || replayed < max_events) && log_queue.try_dequeue(log)) { ++replayed; + --_lru_log_queue_size_by_type[static_cast(type)]; + --_total_lru_log_queue_size; try { switch (log->type) { case CacheLRULogType::ADD: { @@ -170,14 +200,13 @@ void LRUQueueRecorder::subtract_lru_queue_update_cnt_from_last_dump_locked( } size_t LRUQueueRecorder::get_lru_log_queue_size(FileCacheType type) { - return get_lru_log_queue(type).size_approx(); + std::lock_guard lru_log_lock(_mutex_lru_log); + return _lru_log_queue_size_by_type[static_cast(type)]; } size_t LRUQueueRecorder::get_total_lru_log_queue_size() { - return get_lru_log_queue_size(FileCacheType::TTL) + - get_lru_log_queue_size(FileCacheType::INDEX) + - get_lru_log_queue_size(FileCacheType::NORMAL) + - get_lru_log_queue_size(FileCacheType::DISPOSABLE); + std::lock_guard lru_log_lock(_mutex_lru_log); + return _total_lru_log_queue_size; } } // end of namespace doris::io diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index d54d444e5b6a25..17474fa6b0d7f5 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -19,7 +19,13 @@ #include +#include +#include #include +#include +#include +#include +#include #include "io/cache/file_cache_common.h" @@ -49,7 +55,8 @@ using CacheLRULogQueue = moodycamel::ConcurrentQueue::max()) + : _hard_cap(hard_cap), _mgr(mgr) { _lru_queue_update_cnt_from_last_dump[FileCacheType::DISPOSABLE] = 0; _lru_queue_update_cnt_from_last_dump[FileCacheType::NORMAL] = 0; _lru_queue_update_cnt_from_last_dump[FileCacheType::INDEX] = 0; @@ -72,6 +79,10 @@ class LRUQueueRecorder { FileCacheType type, size_t count, std::lock_guard& lru_log_lock); size_t get_lru_log_queue_size(FileCacheType type); size_t get_total_lru_log_queue_size(); + size_t get_dropped_lru_log_count() const { + return _dropped_lru_log_count.load(std::memory_order_relaxed); + } + size_t hard_cap() const { return _hard_cap; } CacheLRULogQueue& get_lru_log_queue(FileCacheType type); LRUQueue& get_shadow_queue(FileCacheType type); @@ -89,6 +100,10 @@ class LRUQueueRecorder { CacheLRULogQueue _normal_lru_log_queue; CacheLRULogQueue _disposable_lru_log_queue; + std::array _lru_log_queue_size_by_type {}; + std::atomic _dropped_lru_log_count {0}; + size_t _total_lru_log_queue_size = 0; + size_t _hard_cap; std::unordered_map _lru_queue_update_cnt_from_last_dump; BlockFileCache* _mgr; diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp b/be/test/io/cache/cache_lru_dumper_test.cpp index 0853bac30490bc..3825cfd4bb3d3e 100644 --- a/be/test/io/cache/cache_lru_dumper_test.cpp +++ b/be/test/io/cache/cache_lru_dumper_test.cpp @@ -228,4 +228,32 @@ TEST_F(CacheLRUDumperTest, test_dump_counter_subtract_keeps_new_updates) { EXPECT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 1); } +TEST_F(CacheLRUDumperTest, test_lru_recorder_drops_logs_after_hard_cap) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_config {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; + config::file_cache_background_lru_dump_tail_record_num = 100; + + LRUQueueRecorder capped_recorder(mock_cache.get(), 2); + UInt128Wrapper hash(456456456ULL); + + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 0, 100); + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 100, 100); + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 200, 100); + + EXPECT_EQ(capped_recorder.get_total_lru_log_queue_size(), 2); + EXPECT_EQ(capped_recorder.get_lru_log_queue_size(FileCacheType::NORMAL), 2); + EXPECT_EQ(capped_recorder.get_dropped_lru_log_count(), 1); + EXPECT_EQ(capped_recorder.get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 2); + + EXPECT_EQ(capped_recorder.replay_queue_event(FileCacheType::NORMAL, 1), 1); + EXPECT_EQ(capped_recorder.get_total_lru_log_queue_size(), 1); + + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 300, 100); + EXPECT_EQ(capped_recorder.get_total_lru_log_queue_size(), 2); + EXPECT_EQ(capped_recorder.get_dropped_lru_log_count(), 1); + EXPECT_EQ(capped_recorder.get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 3); +} + } // namespace doris::io diff --git a/be/test/io/cache/need_update_lru_blocks_test.cpp b/be/test/io/cache/need_update_lru_blocks_test.cpp index ca9419a3351e38..3cbdb9aedb8663 100644 --- a/be/test/io/cache/need_update_lru_blocks_test.cpp +++ b/be/test/io/cache/need_update_lru_blocks_test.cpp @@ -59,6 +59,31 @@ TEST(NeedUpdateLRUBlocksTest, InsertRejectsNullAndDeduplicates) { EXPECT_EQ(1, pending.size()); } +TEST(NeedUpdateLRUBlocksTest, InsertDropsNewBlocksAfterHardCap) { + NeedUpdateLRUBlocks pending(2); + auto block0 = create_block(0); + auto block1 = create_block(1); + auto block2 = create_block(2); + + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::INSERTED, pending.insert_with_result(block0)); + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::INSERTED, pending.insert_with_result(block1)); + EXPECT_EQ(2u, pending.size()); + + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::DUPLICATED, pending.insert_with_result(block0)); + EXPECT_EQ(2u, pending.size()); + EXPECT_EQ(0u, pending.dropped()); + + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::DROPPED, pending.insert_with_result(block2)); + EXPECT_EQ(2u, pending.size()); + EXPECT_EQ(1u, pending.dropped()); + + std::vector drained; + ASSERT_EQ(1u, pending.drain(1, &drained)); + EXPECT_EQ(1u, pending.size()); + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::INSERTED, pending.insert_with_result(block2)); + EXPECT_EQ(2u, pending.size()); +} + TEST(NeedUpdateLRUBlocksTest, DrainHandlesZeroLimitAndNullOutput) { NeedUpdateLRUBlocks pending; insert_blocks(&pending, 3);