diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 3ff1526c32fc5b..7a2306500ad06a 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -61,25 +61,47 @@ namespace doris::io { // Insert a block pointer into one shard while swallowing allocation failures. bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) { + return insert_with_result(std::move(block)) == InsertResult::INSERTED; +} + +NeedUpdateLRUBlocks::InsertResult NeedUpdateLRUBlocks::insert_with_result(FileBlockSPtr block) { if (!block) { - return false; + return InsertResult::IGNORED; } + bool reserved = false; try { auto* raw_ptr = block.get(); auto idx = shard_index(raw_ptr); auto& shard = _shards[idx]; std::lock_guard lock(shard.mutex); + if (shard.entries.contains(raw_ptr)) { + return InsertResult::DUPLICATED; + } + if (!try_reserve_slot()) { + _dropped.fetch_add(1, std::memory_order_relaxed); + return InsertResult::DROPPED; + } + reserved = true; auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block)); - if (inserted) { - _size.fetch_add(1, std::memory_order_relaxed); + if (!inserted) { + _size.fetch_sub(1, std::memory_order_relaxed); + reserved = false; + return InsertResult::DUPLICATED; } - return inserted; + reserved = false; + return InsertResult::INSERTED; } catch (const std::exception& e) { + if (reserved) { + _size.fetch_sub(1, std::memory_order_relaxed); + } LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what(); } catch (...) { + if (reserved) { + _size.fetch_sub(1, std::memory_order_relaxed); + } LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error"; } - return false; + return InsertResult::IGNORED; } // Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures. @@ -138,11 +160,101 @@ size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const { return std::hash {}(ptr)&kShardMask; } +bool NeedUpdateLRUBlocks::try_reserve_slot() { + size_t cur_size = _size.load(std::memory_order_relaxed); + while (cur_size < _hard_cap) { + if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return true; + } + } + return false; +} + +namespace { + +struct QueueConsumePlan { + int64_t interval_ms = 0; + size_t batch_limit = 0; +}; + +constexpr size_t kLruLogReplayAdaptiveLowWatermark = 50'000; +constexpr size_t kLruLogReplayAdaptiveHighWatermark = 300'000; +constexpr int64_t kLruLogReplayAdaptiveMinIntervalMs = 100; +constexpr size_t kLruLogReplayAdaptiveMaxBatchPerType = 25'000; + +constexpr size_t kBlockLruUpdateAdaptiveLowWatermark = 10'000; +constexpr size_t kBlockLruUpdateAdaptiveHighWatermark = 50'000; +constexpr int64_t kBlockLruUpdateAdaptiveMinIntervalMs = 500; +constexpr size_t kBlockLruUpdateAdaptiveMaxBatch = 10'000; +constexpr size_t kBlockLruUpdateLockSliceBatch = 500; +constexpr size_t kNeedUpdateLruBlocksHardCap = 100'000; +constexpr size_t kLruRecorderLogQueueHardCap = 500'000; + +int64_t positive_or_default(int64_t value, int64_t default_value) { + return value > 0 ? value : default_value; +} + +QueueConsumePlan build_queue_consume_plan(size_t backlog, int64_t base_interval_ms, + size_t base_batch, size_t low_watermark, + size_t high_watermark, int64_t min_interval_ms, + size_t max_batch) { + QueueConsumePlan plan; + plan.interval_ms = positive_or_default(base_interval_ms, 1); + plan.batch_limit = base_batch; + + if (backlog < low_watermark) { + return plan; + } + + const int64_t min_interval = positive_or_default(min_interval_ms, 1); + const size_t capped_max_batch = std::max(max_batch, 1); + if (backlog < high_watermark) { + plan.interval_ms = std::max(min_interval, plan.interval_ms / 2); + plan.batch_limit = std::min(capped_max_batch, std::max(base_batch * 2, 1)); + return plan; + } + + plan.interval_ms = min_interval; + plan.batch_limit = capped_max_batch; + return plan; +} + +QueueConsumePlan build_lru_log_replay_plan(size_t backlog) { + const auto base_interval = + positive_or_default(config::file_cache_background_lru_log_replay_interval_ms, 1); + if (backlog < kLruLogReplayAdaptiveLowWatermark) { + return {base_interval, 0}; + } + return build_queue_consume_plan( + backlog, base_interval, kLruLogReplayAdaptiveMaxBatchPerType / 4, + kLruLogReplayAdaptiveLowWatermark, kLruLogReplayAdaptiveHighWatermark, + kLruLogReplayAdaptiveMinIntervalMs, kLruLogReplayAdaptiveMaxBatchPerType); +} + +QueueConsumePlan build_block_lru_update_plan(size_t backlog) { + const auto base_interval = + positive_or_default(config::file_cache_background_block_lru_update_interval_ms, 1); + const size_t base_batch = + std::max(config::file_cache_background_block_lru_update_qps_limit, 0) * + static_cast(base_interval) / 1000; + if (base_batch == 0) { + return {base_interval, 0}; + } + return build_queue_consume_plan( + backlog, base_interval, base_batch, kBlockLruUpdateAdaptiveLowWatermark, + kBlockLruUpdateAdaptiveHighWatermark, kBlockLruUpdateAdaptiveMinIntervalMs, + kBlockLruUpdateAdaptiveMaxBatch); +} + +} // namespace + BlockFileCache::BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings) : _cache_base_path(cache_base_path), _capacity(cache_settings.capacity), - _max_file_block_size(cache_settings.max_file_block_size) { + _max_file_block_size(cache_settings.max_file_block_size), + _need_update_lru_blocks(kNeedUpdateLruBlocksHardCap) { _cur_cache_size_metrics = std::make_shared>(_cache_base_path.c_str(), "file_cache_cache_size", 0); _cache_capacity_metrics = std::make_shared>( @@ -344,10 +456,16 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us"); _lru_dump_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_lru_dump_latency_us"); - _recycle_keys_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_recycle_keys_length"); - _need_update_lru_blocks_length_recorder = std::make_shared( - _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length"); + _recycle_keys_length_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_recycle_keys_length", 0); + _need_update_lru_blocks_length_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length", 0); + _need_update_lru_blocks_dropped_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_dropped"); + _lru_recorder_log_queue_length_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_length", 0); + _lru_recorder_log_queue_dropped_metrics = std::make_shared>( + _cache_base_path.c_str(), "file_cache_lru_recorder_log_queue_dropped"); _update_lru_blocks_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us"); _ttl_gc_latency_us = std::make_shared(_cache_base_path.c_str(), @@ -364,7 +482,7 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits::max()); - _lru_recorder = std::make_unique(this); + _lru_recorder = std::make_unique(this, kLruRecorderLogQueueHardCap); _lru_dumper = std::make_unique(this, _lru_recorder.get()); if (cache_settings.storage == "memory") { _storage = std::make_unique(); @@ -382,6 +500,11 @@ UInt128Wrapper BlockFileCache::hash(const std::string& path) { return UInt128Wrapper(value); } +bool BlockFileCache::is_memory_storage() const { + DCHECK(_storage != nullptr); + return _storage->get_type() == FileCacheStorageType::MEMORY; +} + BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( const TUniqueId& query_id, int file_cache_query_limit_percent) { SCOPED_CACHE_LOCK(_mutex, this); @@ -647,8 +770,14 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte } void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) { - if (_need_update_lru_blocks.insert(std::move(block))) { - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + auto result = _need_update_lru_blocks.insert_with_result(std::move(block)); + if (result == NeedUpdateLRUBlocks::InsertResult::INSERTED) { + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); + } else if (result == NeedUpdateLRUBlocks::InsertResult::DROPPED) { + *(_need_update_lru_blocks_dropped_metrics) << 1; + LOG_EVERY_N(WARNING, 60) << "Drop block LRU update because hard cap is reached, hard_cap=" + << _need_update_lru_blocks.hard_cap() + << " queue_size=" << _need_update_lru_blocks.size(); } } @@ -1453,7 +1582,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo // but it's ok, because the rowset is stale already bool ret = _recycle_keys.enqueue(key); if (ret) [[likely]] { - *_recycle_keys_length_recorder << _recycle_keys.size_approx(); + _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx()); } else { LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); int64_t duration_ns = 0; @@ -2000,16 +2129,16 @@ void BlockFileCache::run_background_monitor() { (double)_num_read_blocks_1h->get_value()); } - if (_no_warmup_num_hit_blocks->get_value() > 0) { + if (_no_warmup_num_read_blocks->get_value() > 0) { _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() / (double)_no_warmup_num_read_blocks->get_value()); } - if (_no_warmup_num_hit_blocks_5m && _no_warmup_num_hit_blocks_5m->get_value() > 0) { + if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) { _no_warmup_hit_ratio_5m->set_value( (double)_no_warmup_num_hit_blocks_5m->get_value() / (double)_no_warmup_num_read_blocks_5m->get_value()); } - if (_no_warmup_num_hit_blocks_1h && _no_warmup_num_hit_blocks_1h->get_value() > 0) { + if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) { _no_warmup_hit_ratio_1h->set_value( (double)_no_warmup_num_hit_blocks_1h->get_value() / (double)_no_warmup_num_read_blocks_1h->get_value()); @@ -2047,7 +2176,7 @@ void BlockFileCache::run_background_gc() { } batch_count++; } - *_recycle_keys_length_recorder << _recycle_keys.size_approx(); + _recycle_keys_length_metrics->set_value(_recycle_keys.size_approx()); batch_count = 0; } } @@ -2090,35 +2219,44 @@ void BlockFileCache::run_background_block_lru_update() { Thread::set_self_name("run_background_block_lru_update"); std::vector batch; while (!_close) { - int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms; - size_t batch_limit = - config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000; + size_t backlog = _need_update_lru_blocks.size(); + QueueConsumePlan plan = build_block_lru_update_plan(backlog); { std::unique_lock close_lock(_close_mtx); - _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms)); if (_close) { break; } } batch.clear(); - batch.reserve(batch_limit); - size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch); + batch.reserve(plan.batch_limit); + size_t drained = _need_update_lru_blocks.drain(plan.batch_limit, &batch); if (drained == 0) { - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); continue; } int64_t duration_ns = 0; - { - SCOPED_CACHE_LOCK(_mutex, this); - SCOPED_RAW_TIMER(&duration_ns); - for (auto& block : batch) { - update_block_lru(block, cache_lock); + const size_t slice_batch = std::min(kBlockLruUpdateLockSliceBatch, drained); + for (size_t begin = 0; begin < batch.size(); begin += slice_batch) { + const size_t end = std::min(begin + slice_batch, batch.size()); + { + SCOPED_CACHE_LOCK(_mutex, this); + SCOPED_RAW_TIMER(&duration_ns); + for (size_t i = begin; i < end; ++i) { + update_block_lru(batch[i], cache_lock); + } } } *_update_lru_blocks_latency_us << (duration_ns / 1000); - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); + if (backlog >= kBlockLruUpdateAdaptiveHighWatermark) { + LOG_EVERY_N(WARNING, 60) + << "need_update_lru_blocks backlog is high, backlog=" << backlog + << " drained=" << drained << " interval_ms=" << plan.interval_ms + << " batch_limit=" << plan.batch_limit; + } } } @@ -2195,7 +2333,7 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size, void BlockFileCache::clear_need_update_lru_blocks() { _need_update_lru_blocks.clear(); - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + _need_update_lru_blocks_length_metrics->set_value(_need_update_lru_blocks.size()); } void BlockFileCache::pause_ttl_manager() { @@ -2314,19 +2452,28 @@ void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { void BlockFileCache::run_background_lru_log_replay() { Thread::set_self_name("run_background_lru_log_replay"); while (!_close) { - int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms; + size_t backlog = _lru_recorder->get_total_lru_log_queue_size(); + QueueConsumePlan plan = build_lru_log_replay_plan(backlog); { std::unique_lock close_lock(_close_mtx); - _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(plan.interval_ms)); if (_close) { break; } } - _lru_recorder->replay_queue_event(FileCacheType::TTL); - _lru_recorder->replay_queue_event(FileCacheType::INDEX); - _lru_recorder->replay_queue_event(FileCacheType::NORMAL); - _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE); + record_lru_recorder_log_queue_length(); + size_t drained = 0; + drained += _lru_recorder->replay_queue_event(FileCacheType::TTL, plan.batch_limit); + drained += _lru_recorder->replay_queue_event(FileCacheType::INDEX, plan.batch_limit); + drained += _lru_recorder->replay_queue_event(FileCacheType::NORMAL, plan.batch_limit); + drained += _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE, plan.batch_limit); + record_lru_recorder_log_queue_length(); + if (backlog >= kLruLogReplayAdaptiveHighWatermark) { + LOG_EVERY_N(WARNING, 60) + << "lru recorder backlog is high, backlog=" << backlog << " drained=" << drained + << " interval_ms=" << plan.interval_ms << " batch_limit=" << plan.batch_limit; + } if (config::enable_evaluate_shadow_queue_diff) { SCOPED_CACHE_LOCK(_mutex, this); @@ -2338,6 +2485,11 @@ void BlockFileCache::run_background_lru_log_replay() { } } +void BlockFileCache::record_lru_recorder_log_queue_length() { + _lru_recorder_log_queue_length_metrics->set_value( + _lru_recorder->get_total_lru_log_queue_size()); +} + void BlockFileCache::dump_lru_queues(bool force) { std::unique_lock dump_lock(_dump_lru_queues_mtx); if (config::file_cache_background_lru_dump_tail_record_num > 0 && diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index bc7ec7ce4c27a1..8c87bfae2e8ebe 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -87,11 +88,20 @@ class FSFileCacheStorage; // Note that Blocks are updated in batch, internal order is not important. class NeedUpdateLRUBlocks { public: - NeedUpdateLRUBlocks() = default; + enum class InsertResult { + INSERTED, + DUPLICATED, + DROPPED, + IGNORED, + }; + + explicit NeedUpdateLRUBlocks(size_t hard_cap = std::numeric_limits::max()) + : _hard_cap(hard_cap) {} // Insert a block into the pending set. Returns true only when the block // was not already queued. Null inputs are ignored. bool insert(FileBlockSPtr block); + InsertResult insert_with_result(FileBlockSPtr block); // Drain up to `limit` unique blocks into `output`. The method returns how // many blocks were actually drained and shrinks the internal size @@ -103,6 +113,8 @@ class NeedUpdateLRUBlocks { // Thread-safe approximate size of queued unique blocks. size_t size() const { return _size.load(std::memory_order_relaxed); } + size_t dropped() const { return _dropped.load(std::memory_order_relaxed); } + size_t hard_cap() const { return _hard_cap; } private: static constexpr size_t kShardCount = 64; @@ -114,9 +126,12 @@ class NeedUpdateLRUBlocks { }; size_t shard_index(FileBlock* ptr) const; + bool try_reserve_slot(); std::array _shards; std::atomic _size {0}; + std::atomic _dropped {0}; + size_t _hard_cap; }; // The BlockFileCache is responsible for the management of the blocks @@ -286,6 +301,8 @@ class BlockFileCache { [[nodiscard]] bool get_async_open_success() const { return _async_open_done; } + [[nodiscard]] bool is_memory_storage() const; + BlockFileCache& operator=(const BlockFileCache&) = delete; BlockFileCache(const BlockFileCache&) = delete; @@ -470,6 +487,7 @@ class BlockFileCache { void restore_lru_queues_from_disk(std::lock_guard& cache_lock); void run_background_evict_in_advance(); void run_background_block_lru_update(); + void record_lru_recorder_log_queue_length(); bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type, std::vector other_cache_types, @@ -610,9 +628,12 @@ class BlockFileCache { std::shared_ptr _storage_retry_sync_remove_latency_us; std::shared_ptr _storage_async_remove_latency_us; std::shared_ptr _evict_in_advance_latency_us; - std::shared_ptr _recycle_keys_length_recorder; + std::shared_ptr> _recycle_keys_length_metrics; std::shared_ptr _update_lru_blocks_latency_us; - std::shared_ptr _need_update_lru_blocks_length_recorder; + std::shared_ptr> _need_update_lru_blocks_length_metrics; + std::shared_ptr> _need_update_lru_blocks_dropped_metrics; + std::shared_ptr> _lru_recorder_log_queue_length_metrics; + std::shared_ptr> _lru_recorder_log_queue_dropped_metrics; std::shared_ptr _ttl_gc_latency_us; std::shared_ptr _shadow_queue_levenshtein_distance; diff --git a/be/src/io/cache/block_file_cache_profile.cpp b/be/src/io/cache/block_file_cache_profile.cpp index 692174dbbcb71a..8f9c167c9989e6 100644 --- a/be/src/io/cache/block_file_cache_profile.cpp +++ b/be/src/io/cache/block_file_cache_profile.cpp @@ -65,6 +65,43 @@ void FileCacheMetrics::update_metrics_callback() { stats->num_io_bytes_read_from_peer); } +FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& current, + const FileCacheStatistics& previous) { + FileCacheStatistics diff; +#define SUBTRACT_FIELD(field) diff.field = current.field - previous.field + SUBTRACT_FIELD(num_local_io_total); + SUBTRACT_FIELD(num_remote_io_total); + SUBTRACT_FIELD(num_peer_io_total); + SUBTRACT_FIELD(local_io_timer); + SUBTRACT_FIELD(bytes_read_from_local); + SUBTRACT_FIELD(bytes_read_from_remote); + SUBTRACT_FIELD(bytes_read_from_peer); + SUBTRACT_FIELD(remote_io_timer); + SUBTRACT_FIELD(peer_io_timer); + SUBTRACT_FIELD(remote_wait_timer); + SUBTRACT_FIELD(write_cache_io_timer); + SUBTRACT_FIELD(bytes_write_into_cache); + SUBTRACT_FIELD(num_skip_cache_io_total); + SUBTRACT_FIELD(read_cache_file_directly_timer); + SUBTRACT_FIELD(cache_get_or_set_timer); + SUBTRACT_FIELD(lock_wait_timer); + SUBTRACT_FIELD(get_timer); + SUBTRACT_FIELD(set_timer); + + SUBTRACT_FIELD(inverted_index_num_local_io_total); + SUBTRACT_FIELD(inverted_index_num_remote_io_total); + SUBTRACT_FIELD(inverted_index_num_peer_io_total); + SUBTRACT_FIELD(inverted_index_bytes_read_from_local); + SUBTRACT_FIELD(inverted_index_bytes_read_from_remote); + SUBTRACT_FIELD(inverted_index_bytes_read_from_peer); + SUBTRACT_FIELD(inverted_index_local_io_timer); + SUBTRACT_FIELD(inverted_index_remote_io_timer); + SUBTRACT_FIELD(inverted_index_peer_io_timer); + SUBTRACT_FIELD(inverted_index_io_timer); +#undef SUBTRACT_FIELD + return diff; +} + FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) { static const char* cache_profile = "FileCache"; ADD_TIMER_WITH_LEVEL(profile, cache_profile, 2); diff --git a/be/src/io/cache/block_file_cache_profile.h b/be/src/io/cache/block_file_cache_profile.h index 5fdb82fbd61a4d..6c95e49791c054 100644 --- a/be/src/io/cache/block_file_cache_profile.h +++ b/be/src/io/cache/block_file_cache_profile.h @@ -52,9 +52,9 @@ class FileCacheMetrics { } void update(FileCacheStatistics* stats); + std::shared_ptr report(); private: - std::shared_ptr report(); void register_entity(); void update_metrics_callback(); @@ -64,6 +64,9 @@ class FileCacheMetrics { std::shared_ptr _statistics; }; +FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& current, + const FileCacheStatistics& previous); + struct FileCacheProfileReporter { RuntimeProfile::Counter* num_local_io_total = nullptr; RuntimeProfile::Counter* num_remote_io_total = nullptr; @@ -100,4 +103,4 @@ struct FileCacheProfileReporter { }; } // namespace io -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/io/cache/cache_lru_dumper.cpp b/be/src/io/cache/cache_lru_dumper.cpp index 43275f5069e614..1b58efdcd2e793 100644 --- a/be/src/io/cache/cache_lru_dumper.cpp +++ b/be/src/io/cache/cache_lru_dumper.cpp @@ -298,25 +298,53 @@ void CacheLRUDumper::dump_queue(const std::string& queue_name, bool force) { if (force || _recorder->get_lru_queue_update_cnt_from_last_dump(type) > config::file_cache_background_lru_dump_update_cnt_threshold) { LRUQueue& queue = _recorder->get_shadow_queue(type); - do_dump_queue(queue, queue_name); - _recorder->reset_lru_queue_update_cnt_from_last_dump(type); + std::vector elements; + size_t dump_update_cnt = 0; + { + std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); + // Drain logs counted for this dump before updating the counter, otherwise restore may + // see a stale LRU tail after a crash. + _recorder->replay_queue_event_locked(type, 0, lru_log_lock); + elements = collect_lru_queue_entries_locked(queue, lru_log_lock); + dump_update_cnt = + _recorder->get_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock); + } + Status st = do_dump_queue(elements, queue_name); + if (st.ok()) { + std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); + _recorder->subtract_lru_queue_update_cnt_from_last_dump_locked(type, dump_update_cnt, + lru_log_lock); + } else { + LOG(WARNING) << "failed to dump lru queue " << queue_name << ": " << st; + } } } -void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) { - Status st; - std::vector> elements; - elements.reserve(config::file_cache_background_lru_dump_tail_record_num); - +Status CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) { + std::vector elements; { std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); - size_t count = 0; - for (const auto& [hash, offset, size] : queue) { - if (count++ >= config::file_cache_background_lru_dump_tail_record_num) break; - elements.emplace_back(hash, offset, size); + elements = collect_lru_queue_entries_locked(queue, lru_log_lock); + } + return do_dump_queue(elements, queue_name); +}; + +std::vector CacheLRUDumper::collect_lru_queue_entries_locked( + LRUQueue& queue, std::lock_guard& /* lru_log_lock */) { + std::vector elements; + elements.reserve(config::file_cache_background_lru_dump_tail_record_num); + size_t count = 0; + for (const auto& [hash, offset, size] : queue) { + if (count++ >= config::file_cache_background_lru_dump_tail_record_num) { + break; } + elements.emplace_back(hash, offset, size); } + return elements; +} +Status CacheLRUDumper::do_dump_queue(const std::vector& elements, + const std::string& queue_name) { // Write to disk int64_t duration_ns = 0; std::uintmax_t file_size = 0; @@ -330,19 +358,21 @@ void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_nam if (out) { LOG(INFO) << "begin dump " << queue_name << " with " << elements.size() << " elements"; for (const auto& [hash, offset, size] : elements) { - RETURN_IF_STATUS_ERROR(st, - dump_one_lru_entry(out, tmp_filename, hash, offset, size)); + RETURN_IF_ERROR(dump_one_lru_entry(out, tmp_filename, hash, offset, size)); } - RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), tmp_filename, - final_filename, file_size)); + RETURN_IF_ERROR( + finalize_dump(out, elements.size(), tmp_filename, final_filename, file_size)); } else { - LOG(WARNING) << "open lru dump file failed, reason: " << tmp_filename - << " failed to create"; + std::string warn_msg = fmt::format( + "open lru dump file failed, file={} failed to create", tmp_filename); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); } } *(_mgr->_lru_dump_latency_us) << (duration_ns / 1000); LOG(INFO) << fmt::format("lru dump for {} size={} element={} time={}us", queue_name, file_size, elements.size(), duration_ns / 1000); + return Status::OK(); }; Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& filename, diff --git a/be/src/io/cache/cache_lru_dumper.h b/be/src/io/cache/cache_lru_dumper.h index 8074ee334a1934..02418fbda2d557 100644 --- a/be/src/io/cache/cache_lru_dumper.h +++ b/be/src/io/cache/cache_lru_dumper.h @@ -40,6 +40,8 @@ class LRUQueueRecorder; class CacheLRUDumper { public: + using LruDumpEntry = std::tuple; + CacheLRUDumper(BlockFileCache* mgr, LRUQueueRecorder* recorder) : _mgr(mgr), _recorder(recorder) { auto now = std::chrono::system_clock::now(); @@ -56,7 +58,10 @@ class CacheLRUDumper { void set_first_dump_done() { _is_first_dump = false; } private: - void do_dump_queue(LRUQueue& queue, const std::string& queue_name); + Status do_dump_queue(LRUQueue& queue, const std::string& queue_name); + Status do_dump_queue(const std::vector& elements, const std::string& queue_name); + std::vector collect_lru_queue_entries_locked( + LRUQueue& queue, std::lock_guard& lru_log_lock); Status check_ofstream_status(std::ofstream& out, std::string& filename); Status check_ifstream_status(std::ifstream& in, std::string& filename); Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash, @@ -94,4 +99,4 @@ class CacheLRUDumper { std::string _start_time; bool _is_first_dump = true; }; -} // namespace doris::io \ No newline at end of file +} // namespace doris::io diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index 47b534d2dc9e42..7c297737d60a44 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -297,6 +297,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* ReadStatistics stats; stats.bytes_read += bytes_req; + bool read_success = false; MonotonicStopWatch read_at_sw; read_at_sw.start(); auto defer_func = [&](int*) { @@ -315,10 +316,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* if (is_dryrun) { return; } - // update stats increment in this reading procedure for file cache metrics - FileCacheStatistics fcache_stats_increment; - _update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index); - io::FileCacheMetrics::instance().update(&fcache_stats_increment); + if (!read_success) { + return; + } + if (!io_ctx->is_warmup) { + // update stats increment in this reading procedure for file cache metrics + FileCacheStatistics fcache_stats_increment; + _update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index); + io::FileCacheMetrics::instance().update(&fcache_stats_increment); + } if (io_ctx->file_cache_stats) { // update stats in io_ctx, for query profile _update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index); @@ -355,6 +361,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* .ok()) { //TODO: maybe read failed because block evict, should handle error break; } + stats.bytes_read_from_local += reserve_bytes; } _cache->add_need_update_lru_block(iter->second); need_read_size -= reserve_bytes; @@ -367,6 +374,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* stats.hit_cache = true; g_read_cache_direct_whole_num << 1; g_read_cache_direct_whole_bytes << bytes_req; + read_success = true; return Status::OK(); } else { g_read_cache_direct_partial_num << 1; @@ -442,6 +450,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* // Determine read type and execute remote read RETURN_IF_ERROR( _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx)); + bool empty_blocks_from_peer_cache = stats.from_peer_cache; { SCOPED_CONCURRENCY_COUNT( @@ -476,20 +485,25 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* size_t copy_size = copy_right_offset - copy_left_offset + 1; memcpy(dst, src, copy_size); indirect_read_bytes += copy_size; + if (empty_blocks_from_peer_cache) { + stats.bytes_read_from_peer += copy_size; + } else { + stats.bytes_read_from_remote += copy_size; + } } } - size_t current_offset = offset; + size_t current_offset = offset + already_read; size_t end_offset = offset + bytes_req - 1; bool need_self_heal = false; - *bytes_read = 0; + *bytes_read = already_read; for (auto& block : holder.file_blocks) { if (current_offset > end_offset) { break; } size_t left = block->range().left; size_t right = block->range().right; - if (right < offset) { + if (right < current_offset) { continue; } size_t read_size = @@ -536,7 +550,10 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* ConcurrencyStatsManager::instance().cached_remote_reader_local_read); st = block->read(Slice(result.data + (current_offset - offset), read_size), file_offset); - indirect_read_bytes += read_size; + if (st.ok()) { + indirect_read_bytes += read_size; + stats.bytes_read_from_local += read_size; + } } } if (!st || block_state != FileBlock::State::DOWNLOADED) { @@ -567,6 +584,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* &nest_bytes_read)); indirect_read_bytes += read_size; DCHECK(nest_bytes_read == read_size); + stats.bytes_read_from_remote += nest_bytes_read; } } } @@ -580,6 +598,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* g_read_cache_indirect_total_bytes << *bytes_read; DCHECK(*bytes_read == bytes_req); + if (!is_dryrun) { + DCHECK_EQ(stats.bytes_read_from_local + stats.bytes_read_from_remote + + stats.bytes_read_from_peer, + bytes_req); + } + read_success = true; return Status::OK(); } @@ -589,19 +613,19 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, if (statis == nullptr) { return; } - if (read_stats.hit_cache) { + if (read_stats.bytes_read_from_local > 0) { statis->num_local_io_total++; - statis->bytes_read_from_local += read_stats.bytes_read; - } else { - if (read_stats.from_peer_cache) { - statis->num_peer_io_total++; - statis->bytes_read_from_peer += read_stats.bytes_read; - statis->peer_io_timer += read_stats.peer_read_timer; - } else { - statis->num_remote_io_total++; - statis->bytes_read_from_remote += read_stats.bytes_read; - statis->remote_io_timer += read_stats.remote_read_timer; - } + statis->bytes_read_from_local += read_stats.bytes_read_from_local; + } + if (read_stats.bytes_read_from_remote > 0) { + statis->num_remote_io_total++; + statis->bytes_read_from_remote += read_stats.bytes_read_from_remote; + statis->remote_io_timer += read_stats.remote_read_timer; + } + if (read_stats.bytes_read_from_peer > 0) { + statis->num_peer_io_total++; + statis->bytes_read_from_peer += read_stats.bytes_read_from_peer; + statis->peer_io_timer += read_stats.peer_read_timer; } statis->remote_wait_timer += read_stats.remote_wait_timer; statis->local_io_timer += read_stats.local_read_timer; @@ -616,19 +640,19 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, statis->set_timer += read_stats.set_timer; if (is_inverted_index) { - if (read_stats.hit_cache) { + if (read_stats.bytes_read_from_local > 0) { statis->inverted_index_num_local_io_total++; - statis->inverted_index_bytes_read_from_local += read_stats.bytes_read; - } else { - if (read_stats.from_peer_cache) { - statis->inverted_index_num_peer_io_total++; - statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read; - statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; - } else { - statis->inverted_index_num_remote_io_total++; - statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; - statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; - } + statis->inverted_index_bytes_read_from_local += read_stats.bytes_read_from_local; + } + if (read_stats.bytes_read_from_remote > 0) { + statis->inverted_index_num_remote_io_total++; + statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read_from_remote; + statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; + } + if (read_stats.bytes_read_from_peer > 0) { + statis->inverted_index_num_peer_io_total++; + statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read_from_peer; + statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; } statis->inverted_index_local_io_timer += read_stats.local_read_timer; } diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 43510293af8cfd..8fde90abc0a7b2 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -71,6 +71,9 @@ struct ReadStatistics { bool from_peer_cache = false; bool skip_cache = false; int64_t bytes_read = 0; + int64_t bytes_read_from_local = 0; + int64_t bytes_read_from_remote = 0; + int64_t bytes_read_from_peer = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; int64_t peer_read_timer = 0; diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 9907e58cb2a607..8c74b1b66bafce 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -17,6 +17,7 @@ #include "io/cache/lru_queue_recorder.h" +#include "common/config.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_cache_common.h" @@ -25,19 +26,61 @@ namespace doris::io { void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType log_type, const UInt128Wrapper hash, const size_t offset, const size_t size) { + if (_mgr->is_memory_storage() || config::file_cache_background_lru_dump_tail_record_num <= 0) { + return; + } + auto record_drop = [&]() { + _dropped_lru_log_count.fetch_add(1, std::memory_order_relaxed); + if (_mgr->_lru_recorder_log_queue_dropped_metrics) { + *(_mgr->_lru_recorder_log_queue_dropped_metrics) << 1; + } + }; + std::lock_guard lru_log_lock(_mutex_lru_log); + if (_total_lru_log_queue_size >= _hard_cap) { + record_drop(); + LOG_EVERY_N(WARNING, 60) << "Drop lru recorder log because hard cap is reached, hard_cap=" + << _hard_cap << " total_queue_size=" << _total_lru_log_queue_size; + return; + } CacheLRULogQueue& log_queue = get_lru_log_queue(type); - log_queue.enqueue(std::make_unique(log_type, hash, offset, size)); + try { + if (!log_queue.enqueue(std::make_unique(log_type, hash, offset, size))) { + record_drop(); + LOG(WARNING) << "Failed to enqueue lru recorder log"; + return; + } + } catch (const std::exception& e) { + record_drop(); + LOG(WARNING) << "Failed to enqueue lru recorder log: " << e.what(); + return; + } catch (...) { + record_drop(); + LOG(WARNING) << "Failed to enqueue lru recorder log: unknown error"; + return; + } + ++_lru_log_queue_size_by_type[static_cast(type)]; + ++_total_lru_log_queue_size; ++(_lru_queue_update_cnt_from_last_dump[type]); } -void LRUQueueRecorder::replay_queue_event(FileCacheType type) { +size_t LRUQueueRecorder::replay_queue_event(FileCacheType type, size_t max_events) { + // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension + std::lock_guard lru_log_lock(_mutex_lru_log); + return replay_queue_event_locked(type, max_events, lru_log_lock); +} + +size_t LRUQueueRecorder::replay_queue_event_locked(FileCacheType type, size_t max_events, + std::lock_guard& lru_log_lock) { // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension CacheLRULogQueue& log_queue = get_lru_log_queue(type); LRUQueue& shadow_queue = get_shadow_queue(type); - std::lock_guard lru_log_lock(_mutex_lru_log); std::unique_ptr log; - while (log_queue.try_dequeue(log)) { + size_t replayed = 0; + while ((max_events == 0 || replayed < max_events) && log_queue.try_dequeue(log)) { + ++replayed; + --_lru_log_queue_size_by_type[static_cast(type)]; + --_total_lru_log_queue_size; try { switch (log->type) { case CacheLRULogType::ADD: { @@ -79,6 +122,7 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { LOG(WARNING) << "Failed to replay queue event: " << e.what(); } } + return replayed; } // we evaluate the diff between two queue by calculate how many operation is @@ -130,11 +174,39 @@ CacheLRULogQueue& LRUQueueRecorder::get_lru_log_queue(FileCacheType type) { } size_t LRUQueueRecorder::get_lru_queue_update_cnt_from_last_dump(FileCacheType type) { + std::lock_guard lru_log_lock(_mutex_lru_log); + return get_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock); +} + +size_t LRUQueueRecorder::get_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& /* lru_log_lock */) { return _lru_queue_update_cnt_from_last_dump[type]; } void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType type) { + std::lock_guard lru_log_lock(_mutex_lru_log); + reset_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock); +} + +void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& /* lru_log_lock */) { _lru_queue_update_cnt_from_last_dump[type] = 0; } +void LRUQueueRecorder::subtract_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, size_t count, std::lock_guard& /* lru_log_lock */) { + auto& update_cnt = _lru_queue_update_cnt_from_last_dump[type]; + update_cnt = count >= update_cnt ? 0 : update_cnt - count; +} + +size_t LRUQueueRecorder::get_lru_log_queue_size(FileCacheType type) { + std::lock_guard lru_log_lock(_mutex_lru_log); + return _lru_log_queue_size_by_type[static_cast(type)]; +} + +size_t LRUQueueRecorder::get_total_lru_log_queue_size() { + std::lock_guard lru_log_lock(_mutex_lru_log); + return _total_lru_log_queue_size; +} + } // end of namespace doris::io diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index 5bd68b70d555f9..17474fa6b0d7f5 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -19,7 +19,13 @@ #include +#include +#include #include +#include +#include +#include +#include #include "io/cache/file_cache_common.h" @@ -49,7 +55,8 @@ using CacheLRULogQueue = moodycamel::ConcurrentQueue::max()) + : _hard_cap(hard_cap), _mgr(mgr) { _lru_queue_update_cnt_from_last_dump[FileCacheType::DISPOSABLE] = 0; _lru_queue_update_cnt_from_last_dump[FileCacheType::NORMAL] = 0; _lru_queue_update_cnt_from_last_dump[FileCacheType::INDEX] = 0; @@ -57,16 +64,29 @@ class LRUQueueRecorder { } void record_queue_event(FileCacheType type, CacheLRULogType log_type, const UInt128Wrapper hash, const size_t offset, const size_t size); - void replay_queue_event(FileCacheType type); + size_t replay_queue_event(FileCacheType type, size_t max_events = 0); + size_t replay_queue_event_locked(FileCacheType type, size_t max_events, + std::lock_guard& lru_log_lock); void evaluate_queue_diff(LRUQueue& base, std::string name, std::lock_guard& cache_lock); size_t get_lru_queue_update_cnt_from_last_dump(FileCacheType type); + size_t get_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& lru_log_lock); void reset_lru_queue_update_cnt_from_last_dump(FileCacheType type); + void reset_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, std::lock_guard& lru_log_lock); + void subtract_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType type, size_t count, std::lock_guard& lru_log_lock); + size_t get_lru_log_queue_size(FileCacheType type); + size_t get_total_lru_log_queue_size(); + size_t get_dropped_lru_log_count() const { + return _dropped_lru_log_count.load(std::memory_order_relaxed); + } + size_t hard_cap() const { return _hard_cap; } CacheLRULogQueue& get_lru_log_queue(FileCacheType type); LRUQueue& get_shadow_queue(FileCacheType type); -public: std::mutex _mutex_lru_log; private: @@ -80,6 +100,10 @@ class LRUQueueRecorder { CacheLRULogQueue _normal_lru_log_queue; CacheLRULogQueue _disposable_lru_log_queue; + std::array _lru_log_queue_size_by_type {}; + std::atomic _dropped_lru_log_count {0}; + size_t _total_lru_log_queue_size = 0; + size_t _hard_cap; std::unordered_map _lru_queue_update_cnt_from_last_dump; BlockFileCache* _mgr; diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index b7b16a6ba1d3b9..e2b25bd7f31b2e 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -257,6 +257,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) } StorageEngine::~StorageEngine() { + DEREGISTER_HOOK_METRIC(unused_rowsets_count); stop(); } diff --git a/be/test/io/cache/block_file_cache_profile_reporter_test.cpp b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp new file mode 100644 index 00000000000000..e74ad758ac1db3 --- /dev/null +++ b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "io/cache/block_file_cache_profile.h" + +namespace doris { +namespace { + +io::FileCacheStatistics make_file_cache_stats(int64_t multiplier) { + io::FileCacheStatistics stats; + stats.num_local_io_total = multiplier; + stats.num_remote_io_total = multiplier * 2; + stats.num_peer_io_total = multiplier * 3; + stats.local_io_timer = multiplier * 4; + stats.bytes_read_from_local = multiplier * 5; + stats.bytes_read_from_remote = multiplier * 6; + stats.bytes_read_from_peer = multiplier * 7; + stats.remote_io_timer = multiplier * 8; + stats.peer_io_timer = multiplier * 9; + stats.remote_wait_timer = multiplier * 10; + stats.write_cache_io_timer = multiplier * 11; + stats.bytes_write_into_cache = multiplier * 12; + stats.num_skip_cache_io_total = multiplier * 13; + stats.read_cache_file_directly_timer = multiplier * 14; + stats.cache_get_or_set_timer = multiplier * 15; + stats.lock_wait_timer = multiplier * 16; + stats.get_timer = multiplier * 17; + stats.set_timer = multiplier * 18; + stats.inverted_index_num_local_io_total = multiplier * 19; + stats.inverted_index_num_remote_io_total = multiplier * 20; + stats.inverted_index_num_peer_io_total = multiplier * 21; + stats.inverted_index_bytes_read_from_local = multiplier * 22; + stats.inverted_index_bytes_read_from_remote = multiplier * 23; + stats.inverted_index_bytes_read_from_peer = multiplier * 24; + stats.inverted_index_local_io_timer = multiplier * 25; + stats.inverted_index_remote_io_timer = multiplier * 26; + stats.inverted_index_peer_io_timer = multiplier * 27; + stats.inverted_index_io_timer = multiplier * 28; + return stats; +} + +void expect_file_cache_stats_eq(const io::FileCacheStatistics& actual, + const io::FileCacheStatistics& expected) { + EXPECT_EQ(actual.num_local_io_total, expected.num_local_io_total); + EXPECT_EQ(actual.num_remote_io_total, expected.num_remote_io_total); + EXPECT_EQ(actual.num_peer_io_total, expected.num_peer_io_total); + EXPECT_EQ(actual.local_io_timer, expected.local_io_timer); + EXPECT_EQ(actual.bytes_read_from_local, expected.bytes_read_from_local); + EXPECT_EQ(actual.bytes_read_from_remote, expected.bytes_read_from_remote); + EXPECT_EQ(actual.bytes_read_from_peer, expected.bytes_read_from_peer); + EXPECT_EQ(actual.remote_io_timer, expected.remote_io_timer); + EXPECT_EQ(actual.peer_io_timer, expected.peer_io_timer); + EXPECT_EQ(actual.remote_wait_timer, expected.remote_wait_timer); + EXPECT_EQ(actual.write_cache_io_timer, expected.write_cache_io_timer); + EXPECT_EQ(actual.bytes_write_into_cache, expected.bytes_write_into_cache); + EXPECT_EQ(actual.num_skip_cache_io_total, expected.num_skip_cache_io_total); + EXPECT_EQ(actual.read_cache_file_directly_timer, expected.read_cache_file_directly_timer); + EXPECT_EQ(actual.cache_get_or_set_timer, expected.cache_get_or_set_timer); + EXPECT_EQ(actual.lock_wait_timer, expected.lock_wait_timer); + EXPECT_EQ(actual.get_timer, expected.get_timer); + EXPECT_EQ(actual.set_timer, expected.set_timer); + EXPECT_EQ(actual.inverted_index_num_local_io_total, expected.inverted_index_num_local_io_total); + EXPECT_EQ(actual.inverted_index_num_remote_io_total, + expected.inverted_index_num_remote_io_total); + EXPECT_EQ(actual.inverted_index_num_peer_io_total, expected.inverted_index_num_peer_io_total); + EXPECT_EQ(actual.inverted_index_bytes_read_from_local, + expected.inverted_index_bytes_read_from_local); + EXPECT_EQ(actual.inverted_index_bytes_read_from_remote, + expected.inverted_index_bytes_read_from_remote); + EXPECT_EQ(actual.inverted_index_bytes_read_from_peer, + expected.inverted_index_bytes_read_from_peer); + EXPECT_EQ(actual.inverted_index_local_io_timer, expected.inverted_index_local_io_timer); + EXPECT_EQ(actual.inverted_index_remote_io_timer, expected.inverted_index_remote_io_timer); + EXPECT_EQ(actual.inverted_index_peer_io_timer, expected.inverted_index_peer_io_timer); + EXPECT_EQ(actual.inverted_index_io_timer, expected.inverted_index_io_timer); +} + +} // namespace + +TEST(FileCacheProfileReporterTest, DiffReturnsFullStatsWhenPreviousIsZero) { + const auto current = make_file_cache_stats(3); + + expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, {}), current); +} + +TEST(FileCacheProfileReporterTest, DiffReturnsOnlyIncrementalDelta) { + expect_file_cache_stats_eq( + io::diff_file_cache_statistics(make_file_cache_stats(5), make_file_cache_stats(3)), + make_file_cache_stats(2)); +} + +TEST(FileCacheProfileReporterTest, DiffReturnsZeroWithoutNewData) { + const auto current = make_file_cache_stats(4); + + expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, current), + make_file_cache_stats(0)); +} + +TEST(FileCacheProfileReporterTest, ReporterAggregatesDeltaReportsToExactFinalTotals) { + auto profile = std::make_unique("test_profile"); + io::FileCacheProfileReporter reporter(profile.get()); + + const auto after_first_report = make_file_cache_stats(4); + const auto after_second_report = make_file_cache_stats(7); + const auto first_delta = io::diff_file_cache_statistics(after_first_report, {}); + reporter.update(&first_delta); + + const auto second_delta = + io::diff_file_cache_statistics(after_second_report, after_first_report); + reporter.update(&second_delta); + + EXPECT_EQ(profile->get_counter("BytesScannedFromCache")->value(), + after_second_report.bytes_read_from_local); + EXPECT_EQ(profile->get_counter("BytesScannedFromRemote")->value(), + after_second_report.bytes_read_from_remote); + EXPECT_EQ(profile->get_counter("BytesWriteIntoCache")->value(), + after_second_report.bytes_write_into_cache); + EXPECT_EQ(profile->get_counter("CacheGetOrSetTimer")->value(), + after_second_report.cache_get_or_set_timer); + EXPECT_EQ(profile->get_counter("LockWaitTimer")->value(), after_second_report.lock_wait_timer); +} + +} // namespace doris diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 9f5563efb3d1f5..111e5824c6f985 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -19,6 +19,7 @@ // and modified by Doris #include "io/cache/block_file_cache_test_common.h" +#include "io/fs/buffered_reader.h" #include "storage/olap_define.h" namespace doris::io { @@ -43,6 +44,114 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test"; std::string cache_base_path = caches_dir / "cache1" / ""; std::string tmp_file = caches_dir / "tmp_file"; +io::FileCacheSettings cached_remote_reader_cache_settings() { + io::FileCacheSettings settings; + settings.query_queue_size = 6291456; + settings.query_queue_elements = 6; + settings.index_queue_size = 1048576; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1048576; + settings.disposable_queue_elements = 1; + settings.capacity = 8388608; + settings.max_file_block_size = 1048576; + settings.max_query_cache_size = 0; + return settings; +} + +void reset_file_cache_factory_for_test() { + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + +Status create_cached_remote_reader_cache(const std::string& cache_path, BlockFileCache** cache) { + reset_file_cache_factory_for_test(); + if (fs::exists(cache_path)) { + fs::remove_all(cache_path); + } + fs::create_directories(cache_path); + + RETURN_IF_ERROR(FileCacheFactory::instance()->create_file_cache( + cache_path, cached_remote_reader_cache_settings())); + *cache = FileCacheFactory::instance()->_path_to_cache[cache_path]; + DORIS_CHECK(*cache != nullptr); + for (int i = 0; i < 100; i++) { + if ((*cache)->get_async_open_success()) { + return Status::OK(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return Status::TimedOut("file cache async open timeout for path {}", cache_path); +} + +void cleanup_cached_remote_reader_cache(const std::string& cache_path) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_path)) { + fs::remove_all(cache_path); + } + reset_file_cache_factory_for_test(); +} + +struct FileCacheMetricSnapshot { + int64_t total = 0; + int64_t local = 0; + int64_t remote = 0; + int64_t peer = 0; +}; + +FileCacheMetricSnapshot get_file_cache_metric_snapshot() { + FileCacheStatistics empty_stats; + FileCacheMetrics::instance().update(&empty_stats); + std::shared_ptr stats = FileCacheMetrics::instance().report(); + auto local = stats->num_io_bytes_read_from_cache.load(); + auto remote = stats->num_io_bytes_read_from_remote.load(); + auto peer = stats->num_io_bytes_read_from_peer.load(); + return {.total = local + remote + peer, .local = local, .remote = remote, .peer = peer}; +} + +void expect_file_cache_metric_delta(const FileCacheMetricSnapshot& before, + const FileCacheMetricSnapshot& after, int64_t local, + int64_t remote, int64_t peer) { + EXPECT_EQ(after.local - before.local, local); + EXPECT_EQ(after.remote - before.remote, remote); + EXPECT_EQ(after.peer - before.peer, peer); + EXPECT_EQ(after.total - before.total, local + remote + peer); +} + +class FailAfterOffsetFileReader : public FileReader { +public: + FailAfterOffsetFileReader(FileReaderSPtr reader, size_t fail_offset) + : _reader(std::move(reader)), _fail_offset(fail_offset) {} + ~FailAfterOffsetFileReader() override = default; + + void set_fail(bool fail) { _fail = fail; } + + Status close() override { return _reader->close(); } + + const Path& path() const override { return _reader->path(); } + + size_t size() const override { return _reader->size(); } + + bool closed() const override { return _reader->closed(); } + + int64_t mtime() const override { return _reader->mtime(); } + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) override { + if (_fail && offset >= _fail_offset) { + *bytes_read = 0; + return Status::IOError("inject remote read failure at offset {}", offset); + } + return _reader->read_at(offset, result, bytes_read, io_ctx); + } + +private: + FileReaderSPtr _reader; + size_t _fail_offset; + bool _fail = false; +}; + void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr file_block, const io::FileBlock::Range& expected_range, io::FileBlock::State expected_state) { auto range = file_block->range(); @@ -3625,6 +3734,136 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) { extern bvar::Adder g_read_cache_self_heal_on_not_found; +TEST_F(BlockFileCacheTest, cached_remote_file_reader_warmup_does_not_update_global_metrics) { + bool origin_enable_direct_read = config::enable_read_cache_file_directly; + config::enable_read_cache_file_directly = false; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; + + std::string cache_base_path = caches_dir / "cached_remote_reader_warmup_metrics" / ""; + BlockFileCache* cache = nullptr; + ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, &cache).ok()); + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + opts.tablet_id = 10086; + CachedRemoteFileReader reader(local_reader, opts); + + std::string buffer(64_kb, '\0'); + size_t bytes_read = 0; + + auto before_normal_read = get_file_cache_metric_snapshot(); + FileCacheStatistics normal_stats; + IOContext normal_ctx; + normal_ctx.file_cache_stats = &normal_stats; + ASSERT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &normal_ctx).ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(normal_stats.bytes_read_from_local, 0); + EXPECT_EQ(normal_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(normal_stats.bytes_read_from_peer, 0); + EXPECT_EQ(normal_stats.num_local_io_total, 0); + EXPECT_EQ(normal_stats.num_remote_io_total, 1); + auto after_normal_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_normal_read, after_normal_read, 0, 64_kb, 0); + + auto before_warmup_miss = get_file_cache_metric_snapshot(); + FileCacheStatistics warmup_miss_stats; + IOContext warmup_miss_ctx; + warmup_miss_ctx.file_cache_stats = &warmup_miss_stats; + warmup_miss_ctx.is_warmup = true; + buffer.assign(64_kb, '\0'); + ASSERT_TRUE( + reader.read_at(2_mb, Slice(buffer.data(), buffer.size()), &bytes_read, &warmup_miss_ctx) + .ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(std::string(64_kb, '2'), buffer); + EXPECT_EQ(warmup_miss_stats.bytes_read_from_local, 0); + EXPECT_EQ(warmup_miss_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(warmup_miss_stats.bytes_read_from_peer, 0); + EXPECT_EQ(warmup_miss_stats.num_remote_io_total, 1); + auto after_warmup_miss = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_warmup_miss, after_warmup_miss, 0, 0, 0); + + auto before_warmup_hit = get_file_cache_metric_snapshot(); + FileCacheStatistics warmup_hit_stats; + IOContext warmup_hit_ctx; + warmup_hit_ctx.file_cache_stats = &warmup_hit_stats; + warmup_hit_ctx.is_warmup = true; + buffer.assign(64_kb, '\0'); + ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &warmup_hit_ctx) + .ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(warmup_hit_stats.bytes_read_from_local, 64_kb); + EXPECT_EQ(warmup_hit_stats.bytes_read_from_remote, 0); + EXPECT_EQ(warmup_hit_stats.bytes_read_from_peer, 0); + EXPECT_EQ(warmup_hit_stats.num_local_io_total, 1); + auto after_warmup_hit = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_warmup_hit, after_warmup_hit, 0, 0, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); +} + +TEST_F(BlockFileCacheTest, cached_remote_file_reader_mixed_hit_miss_stats_are_split_by_bytes) { + bool origin_enable_direct_read = config::enable_read_cache_file_directly; + config::enable_read_cache_file_directly = false; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; + + std::string cache_base_path = caches_dir / "cached_remote_reader_mixed_hit_miss_stats" / ""; + BlockFileCache* cache = nullptr; + ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, &cache).ok()); + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + opts.tablet_id = 10086; + CachedRemoteFileReader reader(local_reader, opts); + + std::string buffer(64_kb, '\0'); + size_t bytes_read = 0; + FileCacheStatistics prime_stats; + IOContext prime_ctx; + prime_ctx.file_cache_stats = &prime_stats; + ASSERT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &prime_ctx).ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb); + + auto before_mixed_read = get_file_cache_metric_snapshot(); + FileCacheStatistics mixed_stats; + IOContext mixed_ctx; + mixed_ctx.file_cache_stats = &mixed_stats; + buffer.assign(64_kb, '\0'); + ASSERT_TRUE(reader.read_at(1_mb - 32_kb, Slice(buffer.data(), buffer.size()), &bytes_read, + &mixed_ctx) + .ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(buffer.substr(0, 32_kb), std::string(32_kb, '0')); + EXPECT_EQ(buffer.substr(32_kb), std::string(32_kb, '1')); + EXPECT_EQ(mixed_stats.bytes_read_from_local, 32_kb); + EXPECT_EQ(mixed_stats.bytes_read_from_remote, 32_kb); + EXPECT_EQ(mixed_stats.bytes_read_from_peer, 0); + EXPECT_EQ(mixed_stats.num_local_io_total, 1); + EXPECT_EQ(mixed_stats.num_remote_io_total, 1); + EXPECT_EQ(mixed_stats.num_peer_io_total, 0); + auto after_mixed_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_mixed_read, after_mixed_read, 32_kb, 32_kb, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); +} + TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not_found) { bool origin_enable_direct_read = config::enable_read_cache_file_directly; config::enable_read_cache_file_directly = false; @@ -3650,6 +3889,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not settings.max_query_cache_size = 0; ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path]; + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; for (int i = 0; i < 100; i++) { if (cache->get_async_open_success()) { break; @@ -3659,11 +3899,12 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not FileReaderSPtr local_reader; ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + auto memory_reader = std::make_shared(local_reader); io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; opts.tablet_id = 10086; - CachedRemoteFileReader reader(local_reader, opts); + CachedRemoteFileReader reader(memory_reader, opts); uint64_t before_self_heal = g_read_cache_self_heal_on_not_found.get_value(); @@ -3674,25 +3915,39 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not size_t bytes_read = 0; ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(stats.bytes_read_from_local, 0); + EXPECT_EQ(stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(stats.bytes_read_from_peer, 0); auto key = io::BlockFileCache::hash("tmp_file"); - { - io::CacheContext inspect_ctx; - ReadStatistics inspect_stats; - inspect_ctx.stats = &inspect_stats; - inspect_ctx.cache_type = io::FileCacheType::NORMAL; - auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx); - auto inspect_blocks = fromHolder(inspect_holder); - ASSERT_EQ(inspect_blocks.size(), 1); - ASSERT_EQ(inspect_blocks[0]->state(), io::FileBlock::State::DOWNLOADED); - std::string cache_file = inspect_blocks[0]->get_cache_file(); - ASSERT_TRUE(fs::exists(cache_file)); - ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok()); - ASSERT_FALSE(fs::exists(cache_file)); - } - ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + auto* sp = SyncPoint::get_instance(); + sp->enable_processing(); + Defer defer {[&] { + sp->clear_call_back("LocalFileReader::read_at_impl"); + sp->disable_processing(); + }}; + sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) { + std::pair* pair = try_any_cast*>(values.back()); + pair->first = Status::NotFound("inject cache file not found"); + pair->second = true; + }); + + auto before_fallback_read = get_file_cache_metric_snapshot(); + FileCacheStatistics fallback_stats; + IOContext fallback_ctx; + fallback_ctx.file_cache_stats = &fallback_stats; + ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &fallback_ctx) + .ok()); EXPECT_EQ(std::string(64_kb, '0'), buffer); + EXPECT_EQ(fallback_stats.bytes_read_from_local, 0); + EXPECT_EQ(fallback_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(fallback_stats.bytes_read_from_peer, 0); + EXPECT_EQ(fallback_stats.num_local_io_total, 0); + EXPECT_EQ(fallback_stats.num_remote_io_total, 1); + EXPECT_EQ(fallback_stats.num_peer_io_total, 0); + auto after_fallback_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_fallback_read, after_fallback_read, 0, 64_kb, 0); bool self_healed = false; for (int i = 0; i < 100; ++i) { @@ -3713,13 +3968,6 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not EXPECT_TRUE(reader.close().ok()); EXPECT_TRUE(reader.closed()); - std::this_thread::sleep_for(std::chrono::seconds(1)); - if (fs::exists(cache_base_path)) { - fs::remove_all(cache_base_path); - } - FileCacheFactory::instance()->_caches.clear(); - FileCacheFactory::instance()->_path_to_cache.clear(); - FileCacheFactory::instance()->_capacity = 0; } TEST_F(BlockFileCacheTest, cached_remote_file_reader_no_self_heal_on_non_not_found_error) { @@ -3810,6 +4058,60 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_no_self_heal_on_non_not_fou FileCacheFactory::instance()->_capacity = 0; } +TEST_F(BlockFileCacheTest, cached_remote_file_reader_failed_read_does_not_update_metrics) { + bool origin_enable_direct_read = config::enable_read_cache_file_directly; + config::enable_read_cache_file_directly = true; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; + + std::string cache_base_path = caches_dir / "cached_remote_reader_failed_read_metrics" / ""; + BlockFileCache* cache = nullptr; + ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, &cache).ok()); + Defer cleanup_cache {[&] { cleanup_cached_remote_reader_cache(cache_base_path); }}; + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + auto remote_reader = std::make_shared(local_reader); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + opts.tablet_id = 10086; + auto failing_remote_reader = + std::make_shared(remote_reader, static_cast(1_mb)); + CachedRemoteFileReader reader(failing_remote_reader, opts); + + std::string buffer(64_kb, '\0'); + size_t bytes_read = 0; + FileCacheStatistics prime_stats; + IOContext prime_ctx; + prime_ctx.file_cache_stats = &prime_stats; + ASSERT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &prime_ctx).ok()); + EXPECT_EQ(bytes_read, 64_kb); + EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb); + EXPECT_EQ(cache->_cur_cache_size, 1_mb); + + failing_remote_reader->set_fail(true); + + auto before_failed_read = get_file_cache_metric_snapshot(); + FileCacheStatistics failed_stats; + IOContext failed_ctx; + failed_ctx.file_cache_stats = &failed_stats; + buffer.assign(64_kb, '\0'); + auto st = reader.read_at(1_mb - 100, Slice(buffer.data(), buffer.size()), &bytes_read, + &failed_ctx); + ASSERT_FALSE(st.ok()); + auto after_failed_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_failed_read, after_failed_read, 0, 0, 0); + EXPECT_EQ(buffer.substr(0, 100), std::string(100, '0')); + EXPECT_EQ(failed_stats.bytes_read_from_local, 0); + EXPECT_EQ(failed_stats.bytes_read_from_remote, 0); + EXPECT_EQ(failed_stats.bytes_read_from_peer, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); +} + TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) { std::string cache_base_path = caches_dir / "cached_remote_file_reader_init" / ""; if (fs::exists(cache_base_path)) { @@ -7908,7 +8210,10 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) { config::enable_evict_file_cache_in_advance = false; config::file_cache_enter_disk_resource_limit_mode_percent = 99; + bool origin_enable_direct_read = config::enable_read_cache_file_directly; config::enable_read_cache_file_directly = true; + Defer reset_direct_read { + [&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }}; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -7975,10 +8280,22 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) { 64_kb); // try to read first two blocks + FileCacheStatistics partial_stats; + IOContext partial_io_ctx; + partial_io_ctx.file_cache_stats = &partial_stats; + auto before_partial_read = get_file_cache_metric_snapshot(); ASSERT_TRUE(reader->read_at(1048576 - 100, Slice(buffer.data(), buffer.size()), &bytes_read, - &io_ctx) + &partial_io_ctx) .ok()); std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_EQ(partial_stats.bytes_read_from_local, 100); + EXPECT_EQ(partial_stats.bytes_read_from_remote, 64_kb - 100); + EXPECT_EQ(partial_stats.bytes_read_from_peer, 0); + EXPECT_EQ(partial_stats.num_local_io_total, 1); + EXPECT_EQ(partial_stats.num_remote_io_total, 1); + EXPECT_EQ(partial_stats.num_peer_io_total, 0); + auto after_partial_read = get_file_cache_metric_snapshot(); + expect_file_cache_metric_delta(before_partial_read, after_partial_read, 100, 64_kb - 100, 0); EXPECT_EQ(cache->_cur_cache_size, 2097152); EXPECT_EQ(g_read_cache_direct_partial_num.get_value() - org_g_read_cache_direct_partial_num, 1); EXPECT_EQ(g_read_cache_direct_partial_bytes.get_value() - org_g_read_cache_direct_partial_bytes, diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp b/be/test/io/cache/block_file_cache_test_lru_dump.cpp index a5d06b5abbc3a2..99253f50d34ed5 100644 --- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp +++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp @@ -19,14 +19,20 @@ // and modified by Doris #include "io/cache/block_file_cache_test_common.h" +#include "util/defer_op.h" namespace doris::io { TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_tail_record_num {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; config::enable_evict_file_cache_in_advance = false; config::file_cache_enter_disk_resource_limit_mode_percent = 99; config::file_cache_background_lru_dump_interval_ms = 3000; config::file_cache_background_lru_dump_update_cnt_threshold = 0; + config::file_cache_background_lru_dump_tail_record_num = 5000000; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -156,10 +162,11 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000); // all queue are filled, let's check the lru log records - ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::TTL), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::INDEX), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::DISPOSABLE), 5); + ASSERT_EQ(cache._lru_recorder->get_total_lru_log_queue_size(), 20); // then check the log replay std::this_thread::sleep_for(std::chrono::milliseconds( @@ -175,10 +182,10 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { context2); // move index queue 3rd element to the end cache.remove_if_cached(key3); // remove all element from ttl queue } - ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); - ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 1); - ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0); - ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::TTL), 5); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::INDEX), 1); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::DISPOSABLE), 0); std::this_thread::sleep_for(std::chrono::milliseconds( 2 * config::file_cache_background_lru_log_replay_interval_ms)); @@ -401,11 +408,58 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { } } +TEST_F(BlockFileCacheTest, test_lru_log_replay_bound_and_disable_record) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_tail_record_num {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; + + io::FileCacheSettings settings; + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 5000000; + settings.query_queue_elements = 50000; + settings.index_queue_size = 5000000; + settings.index_queue_elements = 50000; + settings.disposable_queue_size = 5000000; + settings.disposable_queue_elements = 50000; + settings.capacity = 20000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + config::file_cache_background_lru_dump_tail_record_num = 100; + io::BlockFileCache cache(cache_base_path, settings); + auto hash = io::BlockFileCache::hash("bounded-replay-key"); + + for (size_t i = 0; i < 5; ++i) { + cache._lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, + i * 100, 100); + } + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 5); + ASSERT_EQ(cache._lru_recorder->replay_queue_event(FileCacheType::NORMAL, 2), 2); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 3); + ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 2); + + ASSERT_EQ(cache._lru_recorder->replay_queue_event(FileCacheType::NORMAL), 3); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); + ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 5); + + config::file_cache_background_lru_dump_tail_record_num = 0; + cache._lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 600, + 100); + ASSERT_EQ(cache._lru_recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); +} + TEST_F(BlockFileCacheTest, test_lru_duplicate_queue_entry_restore) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_tail_record_num {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; config::enable_evict_file_cache_in_advance = false; config::file_cache_enter_disk_resource_limit_mode_percent = 99; config::file_cache_background_lru_dump_interval_ms = 3000; config::file_cache_background_lru_dump_update_cnt_threshold = 0; + config::file_cache_background_lru_dump_tail_record_num = 5000000; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp b/be/test/io/cache/cache_lru_dumper_test.cpp index 76647ba544fa05..3825cfd4bb3d3e 100644 --- a/be/test/io/cache/cache_lru_dumper_test.cpp +++ b/be/test/io/cache/cache_lru_dumper_test.cpp @@ -19,11 +19,13 @@ #include +#include "common/config.h" #include "gmock/gmock.h" #include "gtest/gtest.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" +#include "util/defer_op.h" using ::testing::_; using ::testing::Return; @@ -140,7 +142,7 @@ TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) { src_queue.add(hash, offset, size, lock); // Test dump - dumper->do_dump_queue(src_queue, queue_name); + ASSERT_TRUE(dumper->do_dump_queue(src_queue, queue_name).ok()); // Test restore std::lock_guard cache_lock(mock_cache->mutex()); @@ -158,4 +160,100 @@ TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) { } } -} // namespace doris::io \ No newline at end of file +TEST_F(CacheLRUDumperTest, test_dump_queue_drains_pending_logs_before_reset_counter) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + auto origin_update_cnt_threshold = config::file_cache_background_lru_dump_update_cnt_threshold; + Defer restore_config {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + config::file_cache_background_lru_dump_update_cnt_threshold = origin_update_cnt_threshold; + }}; + config::file_cache_background_lru_dump_tail_record_num = 100; + config::file_cache_background_lru_dump_update_cnt_threshold = 0; + + UInt128Wrapper hash(987654321ULL); + recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 2048, 8192); + ASSERT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 1); + ASSERT_EQ(recorder->get_shadow_queue(FileCacheType::NORMAL).get_elements_num_unsafe(), 0); + ASSERT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 1); + + dumper->dump_queue("normal", false); + + EXPECT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 0); + EXPECT_EQ(recorder->get_shadow_queue(FileCacheType::NORMAL).get_elements_num_unsafe(), 1); + EXPECT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 0); + + std::lock_guard cache_lock(mock_cache->mutex()); + dumper->restore_queue(dst_queue, "normal", cache_lock); + ASSERT_EQ(dst_queue.get_elements_num_unsafe(), 1); + auto it = dst_queue.begin(); + EXPECT_EQ(it->hash, hash); + EXPECT_EQ(it->offset, 2048); + EXPECT_EQ(it->size, 8192); +} + +TEST_F(CacheLRUDumperTest, test_dump_counter_subtract_keeps_new_updates) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_config {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; + config::file_cache_background_lru_dump_tail_record_num = 100; + + UInt128Wrapper hash(123123123ULL); + recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 4096, 16384); + + size_t dump_update_cnt = 0; + std::vector elements; + { + std::lock_guard lru_log_lock(recorder->_mutex_lru_log); + recorder->replay_queue_event_locked(FileCacheType::NORMAL, 0, lru_log_lock); + elements = dumper->collect_lru_queue_entries_locked( + recorder->get_shadow_queue(FileCacheType::NORMAL), lru_log_lock); + dump_update_cnt = recorder->get_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType::NORMAL, lru_log_lock); + } + + recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::MOVETOBACK, hash, 4096, + 16384); + ASSERT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), + dump_update_cnt + 1); + + ASSERT_TRUE(dumper->do_dump_queue(elements, "normal").ok()); + { + std::lock_guard lru_log_lock(recorder->_mutex_lru_log); + recorder->subtract_lru_queue_update_cnt_from_last_dump_locked( + FileCacheType::NORMAL, dump_update_cnt, lru_log_lock); + } + + EXPECT_EQ(recorder->get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 1); + EXPECT_EQ(recorder->get_lru_log_queue_size(FileCacheType::NORMAL), 1); +} + +TEST_F(CacheLRUDumperTest, test_lru_recorder_drops_logs_after_hard_cap) { + auto origin_tail_record_num = config::file_cache_background_lru_dump_tail_record_num; + Defer restore_config {[&]() { + config::file_cache_background_lru_dump_tail_record_num = origin_tail_record_num; + }}; + config::file_cache_background_lru_dump_tail_record_num = 100; + + LRUQueueRecorder capped_recorder(mock_cache.get(), 2); + UInt128Wrapper hash(456456456ULL); + + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 0, 100); + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 100, 100); + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 200, 100); + + EXPECT_EQ(capped_recorder.get_total_lru_log_queue_size(), 2); + EXPECT_EQ(capped_recorder.get_lru_log_queue_size(FileCacheType::NORMAL), 2); + EXPECT_EQ(capped_recorder.get_dropped_lru_log_count(), 1); + EXPECT_EQ(capped_recorder.get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 2); + + EXPECT_EQ(capped_recorder.replay_queue_event(FileCacheType::NORMAL, 1), 1); + EXPECT_EQ(capped_recorder.get_total_lru_log_queue_size(), 1); + + capped_recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, hash, 300, 100); + EXPECT_EQ(capped_recorder.get_total_lru_log_queue_size(), 2); + EXPECT_EQ(capped_recorder.get_dropped_lru_log_count(), 1); + EXPECT_EQ(capped_recorder.get_lru_queue_update_cnt_from_last_dump(FileCacheType::NORMAL), 3); +} + +} // namespace doris::io diff --git a/be/test/io/cache/need_update_lru_blocks_test.cpp b/be/test/io/cache/need_update_lru_blocks_test.cpp index ca9419a3351e38..3cbdb9aedb8663 100644 --- a/be/test/io/cache/need_update_lru_blocks_test.cpp +++ b/be/test/io/cache/need_update_lru_blocks_test.cpp @@ -59,6 +59,31 @@ TEST(NeedUpdateLRUBlocksTest, InsertRejectsNullAndDeduplicates) { EXPECT_EQ(1, pending.size()); } +TEST(NeedUpdateLRUBlocksTest, InsertDropsNewBlocksAfterHardCap) { + NeedUpdateLRUBlocks pending(2); + auto block0 = create_block(0); + auto block1 = create_block(1); + auto block2 = create_block(2); + + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::INSERTED, pending.insert_with_result(block0)); + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::INSERTED, pending.insert_with_result(block1)); + EXPECT_EQ(2u, pending.size()); + + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::DUPLICATED, pending.insert_with_result(block0)); + EXPECT_EQ(2u, pending.size()); + EXPECT_EQ(0u, pending.dropped()); + + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::DROPPED, pending.insert_with_result(block2)); + EXPECT_EQ(2u, pending.size()); + EXPECT_EQ(1u, pending.dropped()); + + std::vector drained; + ASSERT_EQ(1u, pending.drain(1, &drained)); + EXPECT_EQ(1u, pending.size()); + EXPECT_EQ(NeedUpdateLRUBlocks::InsertResult::INSERTED, pending.insert_with_result(block2)); + EXPECT_EQ(2u, pending.size()); +} + TEST(NeedUpdateLRUBlocksTest, DrainHandlesZeroLimitAndNullOutput) { NeedUpdateLRUBlocks pending; insert_blocks(&pending, 3); diff --git a/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy new file mode 100644 index 00000000000000..e2fc6d6fe65205 --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy @@ -0,0 +1,457 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper +import java.net.ServerSocket + +suite('test_file_cache_warmup_read_metrics_docker', 'docker') { + if (!isCloudMode()) { + return + } + + def allocatePort = { + def socket = new ServerSocket(0) + def port = socket.localPort + socket.close() + return port + } + def minioPort = allocatePort() + def minioBucket = "test-bucket" + def minioContainer = "doris-file-cache-warmup-metrics-minio-${System.currentTimeMillis()}" + + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'fetch_cluster_cache_hotspot_interval_ms=1000', + 'heartbeat_interval_second=1', + 'cloud_warm_up_for_rebalance_type=async_warmup', + 'cloud_pre_heating_time_limit_sec=180', + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'file_cache_enter_disk_resource_limit_mode_percent=99', + 'enable_evict_file_cache_in_advance=false', + 'file_cache_background_monitor_interval_ms=1000', + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'disable_segment_cache=true', + 'disable_storage_page_cache=true', + 'enable_cache_read_from_peer=false', + ] + options.cloudStoreConfigs += [ + 'DORIS_CLOUD_USER=minio', + 'DORIS_CLOUD_AK=minioadmin', + 'DORIS_CLOUD_SK=minioadmin', + "DORIS_CLOUD_BUCKET=${minioBucket}", + "DORIS_CLOUD_ENDPOINT=host.docker.internal:${minioPort}", + "DORIS_CLOUD_EXTERNAL_ENDPOINT=host.docker.internal:${minioPort}", + 'DORIS_CLOUD_REGION=us-east-1', + 'DORIS_CLOUD_PROVIDER=S3', + ] + options.extraHosts += [ + 'host.docker.internal:host-gateway', + "${minioBucket}.host.docker.internal:host-gateway", + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + def computeCluster = "compute_cluster" + def sourceCluster = "metrics_warmup_source" + def targetCluster = "metrics_warmup_target" + def payload = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + + def waitForCondition = { condition, timeoutMs, message -> + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < timeoutMs) { + if (condition()) { + return + } + sleep(1000) + } + if (!condition()) { + throw new RuntimeException(message) + } + } + + def startMinio = { + cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true""" + cmd """ + docker run -d --name ${minioContainer} \ + -p ${minioPort}:9000 \ + -e MINIO_ROOT_USER=minioadmin \ + -e MINIO_ROOT_PASSWORD=minioadmin \ + -e MINIO_DOMAIN=host.docker.internal \ + minio/minio:RELEASE.2024-11-07T00-52-20Z \ + server /data --console-address ':9001' + """ + waitForCondition({ + try { + new URL("http://127.0.0.1:${minioPort}/minio/health/ready").text + return true + } catch (Throwable ignored) { + return false + } + }, 60000, "MinIO did not become ready") + cmd """ + docker exec ${minioContainer} sh -c \ + 'mc alias set local http://localhost:9000 minioadmin minioadmin && mc mb -p local/${minioBucket}' + """ + } + + def stopMinio = { + cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true""" + } + + def getClusterBackends = { clusterName -> + def backends = sql """SHOW BACKENDS""" + def clusterBackends = backends.findAll { + it[19].contains("""\"compute_group_name\" : \"${clusterName}\"""") + } + assertTrue(clusterBackends.size() > 0, "No backend found for cluster ${clusterName}") + return clusterBackends + } + + def getPromMetric = { ip, httpPort, name -> + def metrics = new URL("http://${ip}:${httpPort}/metrics").text + def matcher = metrics =~ ~"(?m)^${name}\\s+([0-9]+(?:\\.[0-9]+)?)" + if (matcher.find()) { + return new BigDecimal(matcher[0][1]).longValue() + } + logger.info("${name} not found in /metrics for ${ip}:${httpPort}, treat it as 0") + return 0L + } + + def getBrpcMetric = { ip, brpcPort, name -> + def metrics = new URL("http://${ip}:${brpcPort}/brpc_metrics").text + def matcher = metrics =~ ~"(?m)^.*${name}\\s+([0-9]+)" + if (matcher.find()) { + return matcher[0][1] as long + } + logger.info("${name} not found in /brpc_metrics for ${ip}:${brpcPort}, treat it as 0") + return 0L + } + + def getBackendReadMetrics = { be -> + def ip = be[1] + def httpPort = be[4] + return [ + total : getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_total"), + local : getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_from_cache"), + remote: getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_from_remote"), + peer : getPromMetric(ip, httpPort, "doris_be_num_io_bytes_read_from_peer"), + ] + } + + def getClusterReadMetrics = { clusterName -> + def result = [total: 0L, local: 0L, remote: 0L, peer: 0L] + getClusterBackends(clusterName).each { be -> + def metrics = getBackendReadMetrics(be) + result.total += metrics.total + result.local += metrics.local + result.remote += metrics.remote + result.peer += metrics.peer + } + return result + } + + def getMetricDelta = { before, after -> + return [ + total : after.total - before.total, + local : after.local - before.local, + remote: after.remote - before.remote, + peer : after.peer - before.peer, + ] + } + + def assertMetricInvariant = { metrics -> + assertEquals(metrics.total, metrics.local + metrics.remote + metrics.peer) + } + + def assertNoReadMetricDelta = { before, after, label -> + def delta = getMetricDelta(before, after) + logger.info("${label} read metric delta: ${delta}") + assertEquals(0L, delta.total) + assertEquals(0L, delta.local) + assertEquals(0L, delta.remote) + assertEquals(0L, delta.peer) + } + + def clearFileCache = { ip, httpPort -> + def response = new URL("http://${ip}:${httpPort}/api/file_cache?op=clear&sync=true").text + def json = new JsonSlurper().parseText(response) + if (json.status != "OK") { + throw new RuntimeException("Clear cache on ${ip}:${httpPort} failed: ${json.status}") + } + } + + def clearFileCacheOnCluster = { clusterName -> + getClusterBackends(clusterName).each { be -> + clearFileCache(be[1], be[4]) + } + sleep(5000) + } + + def getBackendCacheSize = { be -> + return getBrpcMetric(be[1], be[5], "cache_cache_size") + } + + def getClusterCacheSizeSum = { clusterName -> + long sum = 0 + getClusterBackends(clusterName).each { be -> + sum += getBackendCacheSize(be) + } + return sum + } + + def getClusterBrpcMetricSum = { clusterName, metricName -> + long sum = 0 + getClusterBackends(clusterName).each { be -> + sum += getBrpcMetric(be[1], be[5], metricName) + } + return sum + } + + def createTable = { table, buckets -> + sql """DROP TABLE IF EXISTS ${table}""" + sql """ + CREATE TABLE ${table} ( + k1 INT, + v1 VARCHAR(4096) + ) + DUPLICATE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS ${buckets} + PROPERTIES ( + "replication_num" = "1", + "file_cache_ttl_seconds" = "3600" + ) + """ + } + + def loadRows = { table, rowStart, rowCount -> + int batchSize = 100 + for (int begin = 0; begin < rowCount; begin += batchSize) { + int end = Math.min(rowCount, begin + batchSize) + def values = (begin.. + int key = rowStart + idx + return "(${key}, '${payload}_${key}_${payload}')" + }.join(",") + sql """INSERT INTO ${table} VALUES ${values}""" + } + sql """sync""" + } + + def queryTable = { table -> + def result = sql """SELECT SUM(LENGTH(v1)) FROM ${table} WHERE k1 >= 0""" + assertTrue((result[0][0] as String).toLong() > 0) + } + + def waitWarmupFinished = { jobId -> + waitForCondition({ + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + if (jobInfo.size() == 0) { + return false + } + def state = jobInfo[0][3].toString() + if (state == "CANCELLED") { + throw new RuntimeException("Warm up job ${jobId} was cancelled") + } + return state == "FINISHED" + }, 120000, "Warm up job ${jobId} did not finish") + } + + def cancelWarmupJob = { jobId -> + sql """CANCEL WARM UP JOB WHERE ID = ${jobId}""" + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" + assertEquals("CANCELLED", jobInfo[0][3]) + } + + try { + startMinio() + + docker(options) { + cluster.addBackend(1, sourceCluster) + cluster.addBackend(1, targetCluster) + + sql """SET enable_file_cache = true""" + sql """SET enable_sql_cache = false""" + sql """SET enable_query_cache = false""" + + sql """use @${computeCluster}""" + def manualTable = "test_file_cache_metrics_manual" + createTable(manualTable, 1) + loadRows(manualTable, 1, 300) + clearFileCacheOnCluster(computeCluster) + + def beforeQueryMiss = getClusterReadMetrics(computeCluster) + queryTable(manualTable) + waitForCondition({ + def delta = getMetricDelta(beforeQueryMiss, getClusterReadMetrics(computeCluster)) + return delta.total > 0 && delta.remote > 0 + }, 30000, "Normal query miss did not increase global remote read metrics") + def queryMissDelta = getMetricDelta(beforeQueryMiss, getClusterReadMetrics(computeCluster)) + logger.info("query miss read metric delta: ${queryMissDelta}") + assertMetricInvariant(queryMissDelta) + assertTrue(queryMissDelta.remote > 0) + + clearFileCacheOnCluster(computeCluster) + def beforeManualWarmup = getClusterReadMetrics(computeCluster) + def manualJob = sql """WARM UP CLUSTER ${computeCluster} WITH TABLE ${manualTable}""" + waitWarmupFinished(manualJob[0][0]) + waitForCondition({ + return getClusterCacheSizeSum(computeCluster) > 0 + }, 30000, "Manual table warm up did not populate file cache") + def afterManualWarmup = getClusterReadMetrics(computeCluster) + assertNoReadMetricDelta(beforeManualWarmup, afterManualWarmup, "manual table warm up") + + def beforeQueryHit = getClusterReadMetrics(computeCluster) + queryTable(manualTable) + waitForCondition({ + def delta = getMetricDelta(beforeQueryHit, getClusterReadMetrics(computeCluster)) + return delta.total > 0 && delta.local > 0 + }, 30000, "Query after warm up did not increase global local read metrics") + def queryHitDelta = getMetricDelta(beforeQueryHit, getClusterReadMetrics(computeCluster)) + logger.info("query hit read metric delta: ${queryHitDelta}") + assertMetricInvariant(queryHitDelta) + assertTrue(queryHitDelta.local > 0) + + sql """use @${sourceCluster}""" + sql """TRUNCATE TABLE __internal_schema.cloud_cache_hotspot""" + def periodicTable = "test_file_cache_metrics_periodic" + createTable(periodicTable, 1) + loadRows(periodicTable, 1000, 300) + clearFileCacheOnCluster(sourceCluster) + clearFileCacheOnCluster(targetCluster) + + def beforePeriodicWarmup = getClusterReadMetrics(targetCluster) + def periodicJob = sql """ + WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster} + PROPERTIES ( + "sync_mode" = "periodic", + "sync_interval_sec" = "1" + ) + """ + for (int i = 0; i < 120; i++) { + queryTable(periodicTable) + } + waitForCondition({ + return getClusterCacheSizeSum(sourceCluster) > 0 && + getClusterCacheSizeSum(targetCluster) > 0 + }, 90000, "Periodic warm up did not populate target file cache") + def afterPeriodicWarmup = getClusterReadMetrics(targetCluster) + assertNoReadMetricDelta(beforePeriodicWarmup, afterPeriodicWarmup, + "periodic cluster warm up") + cancelWarmupJob(periodicJob[0][0]) + + def eventTable = "test_file_cache_metrics_event" + createTable(eventTable, 1) + clearFileCacheOnCluster(sourceCluster) + clearFileCacheOnCluster(targetCluster) + def beforeEventWarmup = getClusterReadMetrics(targetCluster) + def eventJob = sql """ + WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster} + PROPERTIES ( + "sync_mode" = "event_driven", + "sync_event" = "load" + ) + """ + waitForCondition({ + def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${eventJob[0][0]}""" + return jobInfo.size() > 0 && jobInfo[0][3].toString() in ["RUNNING", "PENDING"] + }, 30000, "Event-driven warm up job did not enter running state") + sleep(15000) + def beforeEventSubmitted = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_submitted_segment_num") + def beforeEventFinished = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_finished_segment_num") + def beforeEventFailed = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_failed_segment_num") + loadRows(eventTable, 2000, 300) + waitForCondition({ + def submitted = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_submitted_segment_num") + def finished = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_finished_segment_num") + return submitted > beforeEventSubmitted && finished >= submitted + }, 90000, "Event-driven warm up did not finish target segment downloads") + def afterEventSubmitted = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_submitted_segment_num") + def afterEventFinished = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_finished_segment_num") + def afterEventFailed = getClusterBrpcMetricSum(targetCluster, + "file_cache_event_driven_warm_up_failed_segment_num") + logger.info("event-driven warm up segment metrics: submitted ${beforeEventSubmitted}" + + " -> ${afterEventSubmitted}, finished ${beforeEventFinished}" + + " -> ${afterEventFinished}, failed ${beforeEventFailed}" + + " -> ${afterEventFailed}") + assertEquals(beforeEventFailed, afterEventFailed) + def afterEventWarmup = getClusterReadMetrics(targetCluster) + assertNoReadMetricDelta(beforeEventWarmup, afterEventWarmup, + "event-driven cluster warm up") + cancelWarmupJob(eventJob[0][0]) + + sql """use @${computeCluster}""" + def rebalanceTable = "test_file_cache_metrics_rebalance" + createTable(rebalanceTable, 8) + loadRows(rebalanceTable, 3000, 500) + clearFileCacheOnCluster(computeCluster) + def beforeRebalanceSourceQuery = getClusterReadMetrics(computeCluster) + queryTable(rebalanceTable) + waitForCondition({ + def delta = getMetricDelta(beforeRebalanceSourceQuery, + getClusterReadMetrics(computeCluster)) + return delta.total > 0 && delta.remote > 0 + }, 30000, "Rebalance source query did not populate source file cache") + + def beforeBackendIds = getClusterBackends(computeCluster).collect { it[0].toString() } as Set + def beforeRebalanceWarmup = getClusterReadMetrics(computeCluster) + cluster.addBackend(1, computeCluster) + def newBackendHolder = [:] + waitForCondition({ + def added = getClusterBackends(computeCluster).find { + !beforeBackendIds.contains(it[0].toString()) + } + if (added != null) { + newBackendHolder.be = added + return true + } + return false + }, 30000, "New backend was not added to ${computeCluster}") + + waitForCondition({ + def distribution = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM ${rebalanceTable}""" + return distribution.any { + it.BackendId.toString() == newBackendHolder.be[0].toString() && + Integer.valueOf(it.ReplicaNum.toString()) > 0 + } + }, 180000, "Rebalance did not move any replica to the new backend") + + waitForCondition({ + return getBackendCacheSize(newBackendHolder.be) > 0 + }, 180000, "Rebalance warm up did not populate file cache on the new backend") + + def afterRebalanceWarmup = getClusterReadMetrics(computeCluster) + assertNoReadMetricDelta(beforeRebalanceWarmup, afterRebalanceWarmup, + "rebalance warm up") + } + } finally { + stopMinio() + } +}