From 8729961d0c431a6d222ef5d9ee8cdb255a5fb055 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Fri, 26 Jun 2026 18:16:06 +0800 Subject: [PATCH] [server] add kv pre-write buffer size metric --- .../org/apache/fluss/metrics/MetricNames.java | 1 + .../org/apache/fluss/server/kv/KvTablet.java | 16 ++++++- .../server/kv/prewrite/KvPreWriteBuffer.java | 48 ++++++++++++++++++- .../group/TabletServerMetricGroup.java | 7 +++ .../kv/prewrite/KvPreWriteBufferTest.java | 42 ++++++++++++++++ .../observability/monitor-metrics.md | 5 ++ 6 files changed, 116 insertions(+), 3 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index f56ca967c9..2e02eea92a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -151,6 +151,7 @@ public class MetricNames { // for kv tablet public static final String KV_FLUSH_RATE = "kvFlushPerSecond"; public static final String KV_FLUSH_LATENCY_MS = "kvFlushLatencyMs"; + public static final String KV_PRE_WRITE_BUFFER_SIZE_BYTES = "preWriteBufferSizeBytes"; public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE = "preWriteBufferTruncateAsDuplicatedPerSecond"; public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 0683f5e054..c703d9361a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -76,6 +76,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.BytesUtils; +import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.IOUtils; @@ -851,10 +852,23 @@ public void close() throws Exception { } // Note: RocksDB metrics lifecycle is managed by TableMetricGroup // No need to close it here + Exception exception = null; + try { + kvPreWriteBuffer.close(); + } catch (Exception e) { + exception = e; + } if (rocksDBKv != null) { - rocksDBKv.close(); + try { + rocksDBKv.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } } isClosed = true; + if (exception != null) { + throw exception; + } }); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java index e14d09eacf..fb332dbbdc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java @@ -95,9 +95,15 @@ public class KvPreWriteBuffer implements AutoCloseable { private final LinkedList allKvEntries = new LinkedList<>(); // metrics related. + private final Counter preWriteBufferSizeBytes; private final Counter truncateAsDuplicatedCount; private final Counter truncateAsErrorCount; + // The current payload bytes contributed by this buffer to the server-level aggregate gauge. + private long bufferedPayloadSizeBytes; + + private boolean closed; + // the max LSN in the buffer private long maxLogSequenceNumber = -1; @@ -105,6 +111,7 @@ public KvPreWriteBuffer( KvBatchWriter kvBatchWriter, TabletServerMetricGroup serverMetricGroup) { this.kvBatchWriter = kvBatchWriter; + preWriteBufferSizeBytes = serverMetricGroup.kvPreWriteBufferSizeBytes(); truncateAsDuplicatedCount = serverMetricGroup.kvTruncateAsDuplicatedCount(); truncateAsErrorCount = serverMetricGroup.kvTruncateAsErrorCount(); } @@ -156,6 +163,7 @@ private void doPut(ChangeType changeType, Key key, Value value, long lsn) { : KvEntry.of(changeType, key, value, lsn, v)); // append the entry to the tail of the list for all kv entries allKvEntries.addLast(kvEntry); + increasePreWriteBufferSize(kvEntry.sizeInBytes()); // update the max lsn maxLogSequenceNumber = lsn; } @@ -194,6 +202,7 @@ public void truncateTo(long targetLogSequenceNumber, TruncateReason truncateReas break; } descIter.remove(); + decreasePreWriteBufferSize(entry.sizeInBytes()); boolean removed = kvEntryMap.remove(entry.getKey(), entry); // if the latest entry is removed, we need to rollback the previous entry to the map if (removed && entry.previousEntry != null) { @@ -225,6 +234,7 @@ public int flush(long exclusiveUpToLogSequenceNumber) throws IOException { // first remove the entry from the list it.remove(); + decreasePreWriteBufferSize(entry.sizeInBytes()); // then write data using write batch writer Value value = entry.getValue(); @@ -273,8 +283,20 @@ public long getMaxLSN() { @Override public void close() throws Exception { - if (kvBatchWriter != null) { - kvBatchWriter.close(); + if (closed) { + return; + } + + try { + if (kvBatchWriter != null) { + kvBatchWriter.close(); + } + } finally { + decreasePreWriteBufferSize(bufferedPayloadSizeBytes); + allKvEntries.clear(); + kvEntryMap.clear(); + maxLogSequenceNumber = -1; + closed = true; } } @@ -286,6 +308,16 @@ public Counter getTruncateAsErrorCount() { return truncateAsErrorCount; } + private void increasePreWriteBufferSize(long sizeInBytes) { + preWriteBufferSizeBytes.inc(sizeInBytes); + bufferedPayloadSizeBytes += sizeInBytes; + } + + private void decreasePreWriteBufferSize(long sizeInBytes) { + preWriteBufferSizeBytes.dec(sizeInBytes); + bufferedPayloadSizeBytes -= sizeInBytes; + } + // ------------------------------------------------------------------------------------------- // Inner classes // ------------------------------------------------------------------------------------------- @@ -349,6 +381,10 @@ long getLogSequenceNumber() { return logSequenceNumber; } + long sizeInBytes() { + return key.sizeInBytes() + value.sizeInBytes(); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -410,6 +446,10 @@ public byte[] get() { return key; } + long sizeInBytes() { + return key.length; + } + @Override public int hashCode() { return hashCode; @@ -466,6 +506,10 @@ public byte[] get() { return value; } + long sizeInBytes() { + return value == null ? 0 : value.length; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index 22215bc6de..83acd6f127 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -67,6 +67,7 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { // aggregated kv metrics private final Counter kvFlushCount; private final Histogram kvFlushLatencyHistogram; + private final Counter kvPreWriteBufferSizeBytes; private final Counter kvTruncateAsDuplicatedCount; private final Counter kvTruncateAsErrorCount; @@ -117,6 +118,8 @@ public TabletServerMetricGroup( meter(MetricNames.KV_FLUSH_RATE, new MeterView(kvFlushCount)); kvFlushLatencyHistogram = new DescriptiveStatisticsHistogram(WINDOW_SIZE); histogram(MetricNames.KV_FLUSH_LATENCY_MS, kvFlushLatencyHistogram); + kvPreWriteBufferSizeBytes = new ThreadSafeSimpleCounter(); + gauge(MetricNames.KV_PRE_WRITE_BUFFER_SIZE_BYTES, kvPreWriteBufferSizeBytes::getCount); kvTruncateAsDuplicatedCount = new SimpleCounter(); meter( MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE, @@ -219,6 +222,10 @@ public Histogram kvFlushLatencyHistogram() { return kvFlushLatencyHistogram; } + public Counter kvPreWriteBufferSizeBytes() { + return kvPreWriteBufferSizeBytes; + } + public Counter kvTruncateAsDuplicatedCount() { return kvTruncateAsDuplicatedCount; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java index cb07c2d66b..74c9cd55c9 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java @@ -17,8 +17,10 @@ package org.apache.fluss.server.kv.prewrite; +import org.apache.fluss.metrics.registry.NOPMetricRegistry; import org.apache.fluss.server.kv.KvBatchWriter; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; +import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.junit.jupiter.api.Test; @@ -238,6 +240,42 @@ void testRowCount() throws IOException { assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(7); } + @Test + void testPreWriteBufferSizeBytesMetric() throws Exception { + TabletServerMetricGroup metricGroup = + new TabletServerMetricGroup(NOPMetricRegistry.INSTANCE, "fluss", "rack", "host", 0); + KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter(), metricGroup); + + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isZero(); + + bufferInsert(buffer, "key1", "value1", 0); + bufferInsert(buffer, "key2", "value2", 1); + bufferInsert(buffer, "key2", "value3", 2); + long key1Value1Size = entrySize("key1", "value1"); + long key2Value2Size = entrySize("key2", "value2"); + long key2Value3Size = entrySize("key2", "value3"); + long key3Value3Size = entrySize("key3", "value3"); + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()) + .isEqualTo(key1Value1Size + key2Value2Size + key2Value3Size); + + buffer.flush(1); + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()) + .isEqualTo(key2Value2Size + key2Value3Size); + + bufferInsert(buffer, "key3", "value3", 3); + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()) + .isEqualTo(key2Value2Size + key2Value3Size + key3Value3Size); + + buffer.truncateTo(2, TruncateReason.ERROR); + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isEqualTo(key2Value2Size); + + buffer.close(); + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isZero(); + + buffer.close(); + assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isZero(); + } + private static void bufferInsert( KvPreWriteBuffer kvPreWriteBuffer, String key, String value, int elementCount) { kvPreWriteBuffer.insert(toKey(key), value.getBytes(), elementCount); @@ -268,6 +306,10 @@ private static KvPreWriteBuffer.Key toKey(String str) { return KvPreWriteBuffer.Key.of(str.getBytes()); } + private static long entrySize(String key, String value) { + return key.getBytes().length + value.getBytes().length; + } + /** A {@link KvBatchWriter} for test purpose without doing anything. */ private static class NopKvBatchWriter implements KvBatchWriter { diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index 5383250b8d..f459d5f074 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -573,6 +573,11 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM kvFlushLatencyMs The kv pre-write buffer flush to underlying RocksDB latency in ms. Histogram + + + preWriteBufferSizeBytes + The total key-value payload bytes currently buffered in the kv pre-write buffer. + Gauge preWriteBufferTruncateAsDuplicatedPerSecond