Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 190 additions & 38 deletions be/src/io/cache/block_file_cache.cpp

Large diffs are not rendered by default.

27 changes: 24 additions & 3 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -87,11 +88,20 @@ class FSFileCacheStorage;
// Note that Blocks are updated in batch, internal order is not important.
class NeedUpdateLRUBlocks {
public:
NeedUpdateLRUBlocks() = default;
// Result code returned by insert_with_result().
// NOTE(review): the per-value notes below are inferred from the enumerator names and
// the adjacent insert() comment, not from the implementation - confirm before relying
// on them.
enum class InsertResult {
// presumably: the block was not queued before and has now been added
INSERTED,
// presumably: the block was already queued, nothing changed
DUPLICATED,
// presumably: the block was rejected because the hard cap was reached - TODO confirm
DROPPED,
// presumably: the input was null and was skipped - TODO confirm
IGNORED,
};

explicit NeedUpdateLRUBlocks(size_t hard_cap = std::numeric_limits<size_t>::max())
: _hard_cap(hard_cap) {}

// Insert a block into the pending set. Returns true only when the block
// was not already queued. Null inputs are ignored.
bool insert(FileBlockSPtr block);
InsertResult insert_with_result(FileBlockSPtr block);

// Drain up to `limit` unique blocks into `output`. The method returns how
// many blocks were actually drained and shrinks the internal size
Expand All @@ -103,6 +113,8 @@ class NeedUpdateLRUBlocks {

// Thread-safe approximate size of queued unique blocks.
size_t size() const { return _size.load(std::memory_order_relaxed); }
size_t dropped() const { return _dropped.load(std::memory_order_relaxed); }
size_t hard_cap() const { return _hard_cap; }

private:
static constexpr size_t kShardCount = 64;
Expand All @@ -114,9 +126,12 @@ class NeedUpdateLRUBlocks {
};

size_t shard_index(FileBlock* ptr) const;
bool try_reserve_slot();

std::array<Shard, kShardCount> _shards;
std::atomic<size_t> _size {0};
std::atomic<size_t> _dropped {0};
size_t _hard_cap;
};

// The BlockFileCache is responsible for the management of the blocks
Expand Down Expand Up @@ -286,6 +301,8 @@ class BlockFileCache {

[[nodiscard]] bool get_async_open_success() const { return _async_open_done; }

[[nodiscard]] bool is_memory_storage() const;

BlockFileCache& operator=(const BlockFileCache&) = delete;
BlockFileCache(const BlockFileCache&) = delete;

Expand Down Expand Up @@ -470,6 +487,7 @@ class BlockFileCache {
void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
void run_background_evict_in_advance();
void run_background_block_lru_update();
void record_lru_recorder_log_queue_length();

bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
std::vector<FileCacheType> other_cache_types,
Expand Down Expand Up @@ -610,9 +628,12 @@ class BlockFileCache {
std::shared_ptr<bvar::LatencyRecorder> _storage_retry_sync_remove_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
std::shared_ptr<bvar::Status<size_t>> _recycle_keys_length_metrics;
std::shared_ptr<bvar::LatencyRecorder> _update_lru_blocks_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _need_update_lru_blocks_length_recorder;
std::shared_ptr<bvar::Status<size_t>> _need_update_lru_blocks_length_metrics;
std::shared_ptr<bvar::Adder<size_t>> _need_update_lru_blocks_dropped_metrics;
std::shared_ptr<bvar::Status<size_t>> _lru_recorder_log_queue_length_metrics;
std::shared_ptr<bvar::Adder<size_t>> _lru_recorder_log_queue_dropped_metrics;
std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;

std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
Expand Down
37 changes: 37 additions & 0 deletions be/src/io/cache/block_file_cache_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,43 @@ void FileCacheMetrics::update_metrics_callback() {
stats->num_io_bytes_read_from_peer);
}

// Builds a FileCacheStatistics whose listed members hold `current - previous`,
// computed member by member. Members that are not listed below are left at the
// value a default-constructed FileCacheStatistics gives them.
FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& current,
                                               const FileCacheStatistics& previous) {
    FileCacheStatistics delta;
// Function-local helper: delta.<member> = current.<member> - previous.<member>.
// It is undefined again before the function returns.
#define FILE_CACHE_STAT_DELTA(member) delta.member = current.member - previous.member
    // General file cache counters and timers.
    FILE_CACHE_STAT_DELTA(num_local_io_total);
    FILE_CACHE_STAT_DELTA(num_remote_io_total);
    FILE_CACHE_STAT_DELTA(num_peer_io_total);
    FILE_CACHE_STAT_DELTA(local_io_timer);
    FILE_CACHE_STAT_DELTA(bytes_read_from_local);
    FILE_CACHE_STAT_DELTA(bytes_read_from_remote);
    FILE_CACHE_STAT_DELTA(bytes_read_from_peer);
    FILE_CACHE_STAT_DELTA(remote_io_timer);
    FILE_CACHE_STAT_DELTA(peer_io_timer);
    FILE_CACHE_STAT_DELTA(remote_wait_timer);
    FILE_CACHE_STAT_DELTA(write_cache_io_timer);
    FILE_CACHE_STAT_DELTA(bytes_write_into_cache);
    FILE_CACHE_STAT_DELTA(num_skip_cache_io_total);
    FILE_CACHE_STAT_DELTA(read_cache_file_directly_timer);
    FILE_CACHE_STAT_DELTA(cache_get_or_set_timer);
    FILE_CACHE_STAT_DELTA(lock_wait_timer);
    FILE_CACHE_STAT_DELTA(get_timer);
    FILE_CACHE_STAT_DELTA(set_timer);

    // Inverted-index specific counters and timers.
    FILE_CACHE_STAT_DELTA(inverted_index_num_local_io_total);
    FILE_CACHE_STAT_DELTA(inverted_index_num_remote_io_total);
    FILE_CACHE_STAT_DELTA(inverted_index_num_peer_io_total);
    FILE_CACHE_STAT_DELTA(inverted_index_bytes_read_from_local);
    FILE_CACHE_STAT_DELTA(inverted_index_bytes_read_from_remote);
    FILE_CACHE_STAT_DELTA(inverted_index_bytes_read_from_peer);
    FILE_CACHE_STAT_DELTA(inverted_index_local_io_timer);
    FILE_CACHE_STAT_DELTA(inverted_index_remote_io_timer);
    FILE_CACHE_STAT_DELTA(inverted_index_peer_io_timer);
    FILE_CACHE_STAT_DELTA(inverted_index_io_timer);
#undef FILE_CACHE_STAT_DELTA
    return delta;
}

FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
static const char* cache_profile = "FileCache";
ADD_TIMER_WITH_LEVEL(profile, cache_profile, 2);
Expand Down
7 changes: 5 additions & 2 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class FileCacheMetrics {
}

void update(FileCacheStatistics* stats);
std::shared_ptr<AtomicStatistics> report();

private:
std::shared_ptr<AtomicStatistics> report();
void register_entity();
void update_metrics_callback();

Expand All @@ -64,6 +64,9 @@ class FileCacheMetrics {
std::shared_ptr<AtomicStatistics> _statistics;
};

FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& current,
const FileCacheStatistics& previous);

struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
Expand Down Expand Up @@ -100,4 +103,4 @@ struct FileCacheProfileReporter {
};

} // namespace io
} // namespace doris
} // namespace doris
64 changes: 47 additions & 17 deletions be/src/io/cache/cache_lru_dumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,25 +298,53 @@ void CacheLRUDumper::dump_queue(const std::string& queue_name, bool force) {
if (force || _recorder->get_lru_queue_update_cnt_from_last_dump(type) >
config::file_cache_background_lru_dump_update_cnt_threshold) {
LRUQueue& queue = _recorder->get_shadow_queue(type);
do_dump_queue(queue, queue_name);
_recorder->reset_lru_queue_update_cnt_from_last_dump(type);
std::vector<LruDumpEntry> elements;
size_t dump_update_cnt = 0;
{
std::lock_guard<std::mutex> lru_log_lock(_recorder->_mutex_lru_log);
// Drain logs counted for this dump before updating the counter, otherwise restore may
// see a stale LRU tail after a crash.
_recorder->replay_queue_event_locked(type, 0, lru_log_lock);
elements = collect_lru_queue_entries_locked(queue, lru_log_lock);
dump_update_cnt =
_recorder->get_lru_queue_update_cnt_from_last_dump_locked(type, lru_log_lock);
}
Status st = do_dump_queue(elements, queue_name);
if (st.ok()) {
std::lock_guard<std::mutex> lru_log_lock(_recorder->_mutex_lru_log);
_recorder->subtract_lru_queue_update_cnt_from_last_dump_locked(type, dump_update_cnt,
lru_log_lock);
} else {
LOG(WARNING) << "failed to dump lru queue " << queue_name << ": " << st;
}
}
}

void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) {
Status st;
std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
elements.reserve(config::file_cache_background_lru_dump_tail_record_num);

Status CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) {
std::vector<LruDumpEntry> elements;
{
std::lock_guard<std::mutex> lru_log_lock(_recorder->_mutex_lru_log);
size_t count = 0;
for (const auto& [hash, offset, size] : queue) {
if (count++ >= config::file_cache_background_lru_dump_tail_record_num) break;
elements.emplace_back(hash, offset, size);
elements = collect_lru_queue_entries_locked(queue, lru_log_lock);
}
return do_dump_queue(elements, queue_name);
};

// Copies entries from `queue`, in the queue's iteration order, into a vector of
// (hash, offset, size) tuples. Copying stops once
// config::file_cache_background_lru_dump_tail_record_num entries have been taken.
// The lock_guard parameter is not used in the body; callers in this file pass a
// guard on the recorder's LRU log mutex.
std::vector<CacheLRUDumper::LruDumpEntry> CacheLRUDumper::collect_lru_queue_entries_locked(
        LRUQueue& queue, std::lock_guard<std::mutex>& /* lru_log_lock */) {
    std::vector<LruDumpEntry> snapshot;
    snapshot.reserve(config::file_cache_background_lru_dump_tail_record_num);
    for (const auto& [hash, offset, size] : queue) {
        // One entry is appended per iteration, so the vector size doubles as the
        // count of entries visited so far.
        if (snapshot.size() >= config::file_cache_background_lru_dump_tail_record_num) {
            break;
        }
        snapshot.emplace_back(hash, offset, size);
    }
    return snapshot;
}

Status CacheLRUDumper::do_dump_queue(const std::vector<LruDumpEntry>& elements,
const std::string& queue_name) {
// Write to disk
int64_t duration_ns = 0;
std::uintmax_t file_size = 0;
Expand All @@ -330,19 +358,21 @@ void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_nam
if (out) {
LOG(INFO) << "begin dump " << queue_name << " with " << elements.size() << " elements";
for (const auto& [hash, offset, size] : elements) {
RETURN_IF_STATUS_ERROR(st,
dump_one_lru_entry(out, tmp_filename, hash, offset, size));
RETURN_IF_ERROR(dump_one_lru_entry(out, tmp_filename, hash, offset, size));
}
RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), tmp_filename,
final_filename, file_size));
RETURN_IF_ERROR(
finalize_dump(out, elements.size(), tmp_filename, final_filename, file_size));
} else {
LOG(WARNING) << "open lru dump file failed, reason: " << tmp_filename
<< " failed to create";
std::string warn_msg = fmt::format(
"open lru dump file failed, file={} failed to create", tmp_filename);
LOG(WARNING) << warn_msg;
return Status::InternalError<false>(warn_msg);
}
}
*(_mgr->_lru_dump_latency_us) << (duration_ns / 1000);
LOG(INFO) << fmt::format("lru dump for {} size={} element={} time={}us", queue_name, file_size,
elements.size(), duration_ns / 1000);
return Status::OK();
};

Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& filename,
Expand Down
9 changes: 7 additions & 2 deletions be/src/io/cache/cache_lru_dumper.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class LRUQueueRecorder;

class CacheLRUDumper {
public:
using LruDumpEntry = std::tuple<UInt128Wrapper, size_t, size_t>;

CacheLRUDumper(BlockFileCache* mgr, LRUQueueRecorder* recorder)
: _mgr(mgr), _recorder(recorder) {
auto now = std::chrono::system_clock::now();
Expand All @@ -56,7 +58,10 @@ class CacheLRUDumper {
void set_first_dump_done() { _is_first_dump = false; }

private:
void do_dump_queue(LRUQueue& queue, const std::string& queue_name);
Status do_dump_queue(LRUQueue& queue, const std::string& queue_name);
Status do_dump_queue(const std::vector<LruDumpEntry>& elements, const std::string& queue_name);
std::vector<LruDumpEntry> collect_lru_queue_entries_locked(
LRUQueue& queue, std::lock_guard<std::mutex>& lru_log_lock);
Status check_ofstream_status(std::ofstream& out, std::string& filename);
Status check_ifstream_status(std::ifstream& in, std::string& filename);
Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash,
Expand Down Expand Up @@ -94,4 +99,4 @@ class CacheLRUDumper {
std::string _start_time;
bool _is_first_dump = true;
};
} // namespace doris::io
} // namespace doris::io
Loading
Loading