Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public class MetricNames {
// for kv tablet
public static final String KV_FLUSH_RATE = "kvFlushPerSecond";
public static final String KV_FLUSH_LATENCY_MS = "kvFlushLatencyMs";
public static final String KV_PRE_WRITE_BUFFER_SIZE_BYTES = "preWriteBufferSizeBytes";
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE =
"preWriteBufferTruncateAsDuplicatedPerSecond";
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.BytesUtils;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.IOUtils;

Expand Down Expand Up @@ -851,10 +852,23 @@ public void close() throws Exception {
}
// Note: RocksDB metrics lifecycle is managed by TableMetricGroup
// No need to close it here
Exception exception = null;
try {
kvPreWriteBuffer.close();
} catch (Exception e) {
exception = e;
}
if (rocksDBKv != null) {
rocksDBKv.close();
try {
rocksDBKv.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
isClosed = true;
if (exception != null) {
throw exception;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,23 @@ public class KvPreWriteBuffer implements AutoCloseable {
private final LinkedList<KvEntry> allKvEntries = new LinkedList<>();

// metrics related.
private final Counter preWriteBufferSizeBytes;
private final Counter truncateAsDuplicatedCount;
private final Counter truncateAsErrorCount;

// The current payload bytes contributed by this buffer to the server-level aggregate gauge.
private long bufferedPayloadSizeBytes;

private boolean closed;

// the max LSN in the buffer
private long maxLogSequenceNumber = -1;

public KvPreWriteBuffer(
KvBatchWriter kvBatchWriter, TabletServerMetricGroup serverMetricGroup) {
this.kvBatchWriter = kvBatchWriter;

preWriteBufferSizeBytes = serverMetricGroup.kvPreWriteBufferSizeBytes();
truncateAsDuplicatedCount = serverMetricGroup.kvTruncateAsDuplicatedCount();
truncateAsErrorCount = serverMetricGroup.kvTruncateAsErrorCount();
}
Expand Down Expand Up @@ -156,6 +163,7 @@ private void doPut(ChangeType changeType, Key key, Value value, long lsn) {
: KvEntry.of(changeType, key, value, lsn, v));
// append the entry to the tail of the list for all kv entries
allKvEntries.addLast(kvEntry);
increasePreWriteBufferSize(kvEntry.sizeInBytes());
// update the max lsn
maxLogSequenceNumber = lsn;
}
Expand Down Expand Up @@ -194,6 +202,7 @@ public void truncateTo(long targetLogSequenceNumber, TruncateReason truncateReas
break;
}
descIter.remove();
decreasePreWriteBufferSize(entry.sizeInBytes());
boolean removed = kvEntryMap.remove(entry.getKey(), entry);
// if the latest entry is removed, we need to rollback the previous entry to the map
if (removed && entry.previousEntry != null) {
Expand Down Expand Up @@ -225,6 +234,7 @@ public int flush(long exclusiveUpToLogSequenceNumber) throws IOException {

// first remove the entry from the list
it.remove();
decreasePreWriteBufferSize(entry.sizeInBytes());

// then write data using write batch writer
Value value = entry.getValue();
Expand Down Expand Up @@ -273,8 +283,20 @@ public long getMaxLSN() {

@Override
public void close() throws Exception {
if (kvBatchWriter != null) {
kvBatchWriter.close();
if (closed) {
return;
}

try {
if (kvBatchWriter != null) {
kvBatchWriter.close();
}
} finally {
decreasePreWriteBufferSize(bufferedPayloadSizeBytes);
allKvEntries.clear();
kvEntryMap.clear();
maxLogSequenceNumber = -1;
closed = true;
}
}

Expand All @@ -286,6 +308,16 @@ public Counter getTruncateAsErrorCount() {
return truncateAsErrorCount;
}

private void increasePreWriteBufferSize(long sizeInBytes) {
preWriteBufferSizeBytes.inc(sizeInBytes);
bufferedPayloadSizeBytes += sizeInBytes;
}

private void decreasePreWriteBufferSize(long sizeInBytes) {
preWriteBufferSizeBytes.dec(sizeInBytes);
bufferedPayloadSizeBytes -= sizeInBytes;
}

// -------------------------------------------------------------------------------------------
// Inner classes
// -------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -349,6 +381,10 @@ long getLogSequenceNumber() {
return logSequenceNumber;
}

long sizeInBytes() {
return key.sizeInBytes() + value.sizeInBytes();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -410,6 +446,10 @@ public byte[] get() {
return key;
}

long sizeInBytes() {
return key.length;
}

@Override
public int hashCode() {
return hashCode;
Expand Down Expand Up @@ -466,6 +506,10 @@ public byte[] get() {
return value;
}

long sizeInBytes() {
return value == null ? 0 : value.length;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class TabletServerMetricGroup extends AbstractMetricGroup {
// aggregated kv metrics
private final Counter kvFlushCount;
private final Histogram kvFlushLatencyHistogram;
private final Counter kvPreWriteBufferSizeBytes;
private final Counter kvTruncateAsDuplicatedCount;
private final Counter kvTruncateAsErrorCount;

Expand Down Expand Up @@ -117,6 +118,8 @@ public TabletServerMetricGroup(
meter(MetricNames.KV_FLUSH_RATE, new MeterView(kvFlushCount));
kvFlushLatencyHistogram = new DescriptiveStatisticsHistogram(WINDOW_SIZE);
histogram(MetricNames.KV_FLUSH_LATENCY_MS, kvFlushLatencyHistogram);
kvPreWriteBufferSizeBytes = new ThreadSafeSimpleCounter();
gauge(MetricNames.KV_PRE_WRITE_BUFFER_SIZE_BYTES, kvPreWriteBufferSizeBytes::getCount);
kvTruncateAsDuplicatedCount = new SimpleCounter();
meter(
MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
Expand Down Expand Up @@ -219,6 +222,10 @@ public Histogram kvFlushLatencyHistogram() {
return kvFlushLatencyHistogram;
}

public Counter kvPreWriteBufferSizeBytes() {
return kvPreWriteBufferSizeBytes;
}

public Counter kvTruncateAsDuplicatedCount() {
return kvTruncateAsDuplicatedCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.fluss.server.kv.prewrite;

import org.apache.fluss.metrics.registry.NOPMetricRegistry;
import org.apache.fluss.server.kv.KvBatchWriter;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.metrics.group.TestingMetricGroups;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -238,6 +240,42 @@ void testRowCount() throws IOException {
assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(7);
}

@Test
void testPreWriteBufferSizeBytesMetric() throws Exception {
TabletServerMetricGroup metricGroup =
new TabletServerMetricGroup(NOPMetricRegistry.INSTANCE, "fluss", "rack", "host", 0);
KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter(), metricGroup);

assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isZero();

bufferInsert(buffer, "key1", "value1", 0);
bufferInsert(buffer, "key2", "value2", 1);
bufferInsert(buffer, "key2", "value3", 2);
long key1Value1Size = entrySize("key1", "value1");
long key2Value2Size = entrySize("key2", "value2");
long key2Value3Size = entrySize("key2", "value3");
long key3Value3Size = entrySize("key3", "value3");
assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount())
.isEqualTo(key1Value1Size + key2Value2Size + key2Value3Size);

buffer.flush(1);
assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount())
.isEqualTo(key2Value2Size + key2Value3Size);

bufferInsert(buffer, "key3", "value3", 3);
assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount())
.isEqualTo(key2Value2Size + key2Value3Size + key3Value3Size);

buffer.truncateTo(2, TruncateReason.ERROR);
assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isEqualTo(key2Value2Size);

buffer.close();
assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isZero();

buffer.close();
assertThat(metricGroup.kvPreWriteBufferSizeBytes().getCount()).isZero();
}

private static void bufferInsert(
KvPreWriteBuffer kvPreWriteBuffer, String key, String value, int elementCount) {
kvPreWriteBuffer.insert(toKey(key), value.getBytes(), elementCount);
Expand Down Expand Up @@ -268,6 +306,10 @@ private static KvPreWriteBuffer.Key toKey(String str) {
return KvPreWriteBuffer.Key.of(str.getBytes());
}

private static long entrySize(String key, String value) {
return key.getBytes().length + value.getBytes().length;
}

/** A {@link KvBatchWriter} for test purpose without doing anything. */
private static class NopKvBatchWriter implements KvBatchWriter {

Expand Down
5 changes: 5 additions & 0 deletions website/docs/maintenance/observability/monitor-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,11 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
<td>kvFlushLatencyMs</td>
<td>The kv pre-write buffer flush to underlying RocksDB latency in ms.</td>
<td>Histogram</td>
</tr>
<tr>
<td>preWriteBufferSizeBytes</td>
<td>The total key-value payload bytes currently buffered in the kv pre-write buffer.</td>
<td>Gauge</td>
</tr>
<tr>
<td>preWriteBufferTruncateAsDuplicatedPerSecond</td>
Expand Down
Loading