diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metrics/ScannerMetricGroup.java b/fluss-client/src/main/java/org/apache/fluss/client/metrics/ScannerMetricGroup.java index bae44d00ed..19c1c5be14 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metrics/ScannerMetricGroup.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metrics/ScannerMetricGroup.java @@ -32,6 +32,7 @@ import org.apache.fluss.rpc.metrics.ClientMetricGroup; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; @@ -41,8 +42,10 @@ public class ScannerMetricGroup extends AbstractMetricGroup { private static final String NAME = "scanner"; private static final int WINDOW_SIZE = 1024; + private static final AtomicLong NEXT_SCANNER_ID = new AtomicLong(); private final TablePath tablePath; + private final String scannerId; private final Counter fetchRequestCount; private final Histogram bytesPerRequest; @@ -61,6 +64,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup { public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) { super(parent.getMetricRegistry(), makeScope(parent, NAME), parent); this.tablePath = tablePath; + this.scannerId = Long.toString(NEXT_SCANNER_ID.getAndIncrement()); fetchRequestCount = new ThreadSafeSimpleCounter(); meter(MetricNames.SCANNER_FETCH_RATE, new MeterView(fetchRequestCount)); @@ -131,5 +135,6 @@ protected String getGroupName(CharacterFilter filter) { protected final void putVariables(Map variables) { variables.put("database", tablePath.getDatabaseName()); variables.put("table", tablePath.getTableName()); + variables.put("scanner_id", scannerId); } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metrics/TestingScannerMetricGroup.java b/fluss-client/src/test/java/org/apache/fluss/client/metrics/TestingScannerMetricGroup.java index 11bcf3d4fa..88a8c43e8b 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metrics/TestingScannerMetricGroup.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metrics/TestingScannerMetricGroup.java @@ -18,8 +18,15 @@ package org.apache.fluss.client.metrics; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.metrics.ClientMetricGroup; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + /** The testing metric group for scanner. */ public class TestingScannerMetricGroup extends ScannerMetricGroup { @@ -32,4 +39,24 @@ public TestingScannerMetricGroup() { public static TestingScannerMetricGroup newInstance() { return new TestingScannerMetricGroup(); } + + @Test + void testScannerMetricGroupContainsScannerIdDimension() { + ClientMetricGroup clientMetricGroup = TestingClientMetricGroup.newInstance(); + ScannerMetricGroup scannerMetricGroup1 = + new ScannerMetricGroup(clientMetricGroup, TABLE_PATH); + ScannerMetricGroup scannerMetricGroup2 = + new ScannerMetricGroup(clientMetricGroup, TABLE_PATH); + Map variables1 = scannerMetricGroup1.getAllVariables(); + Map variables2 = scannerMetricGroup2.getAllVariables(); + assertThat(variables1) + .containsEntry("database", "db") + .containsEntry("table", "table") + .containsKey("scanner_id"); + assertThat(variables2) + .containsEntry("database", "db") + .containsEntry("table", "table") + .containsKey("scanner_id"); + assertThat(variables1.get("scanner_id")).isNotEqualTo("scanner_id"); + } }