From 54dafb3fcceceac084c5ee4e8fbb1d443b258a78 Mon Sep 17 00:00:00 2001
From: Mahitha Adapa
Date: Sat, 30 May 2026 02:37:43 +0000
Subject: [PATCH] Avoid dropping metrics when forceFlush races with periodic export

PeriodicMetricReader.forceFlush() delegated to Scheduled.doRun(), which
acquires the exportAvailable flag with compareAndSet(true, false). When a
force flush happened while the periodic export (or a previous force flush)
was still in progress, doRun() took the else branch, logged 'Exporter busy.
Dropping metrics.' and returned a failed result -- silently dropping the
metrics the caller asked to flush.

Make forceFlush() wait for the in-flight export to complete and then retry,
mirroring how shutdown() already uses flushInProgress to wait for an
in-flight export before its final collection.

doRun() is split into tryDoRun(), which returns null when an export is
already in progress, and doRun(), which preserves the previous
drop-and-fail behavior for the periodic schedule (it retries on the next
tick). forceFlush() uses tryDoRun() and, on contention, chains off
flushInProgress to retry.

Adds a regression test that fails without this change: a forceFlush()
racing an in-flight export now waits and exports the latest metrics instead
of dropping them. Also adds a test covering doRun()'s drop-and-fail branch,
verifying that periodic ticks colliding with an in-flight export are
dropped rather than queuing another export.

Fixes #8433 --- .../metrics/export/PeriodicMetricReader.java | 41 +++++- .../export/PeriodicMetricReaderTest.java | 122 ++++++++++++++++++ 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index 25e0bc0b2f1..53ea80a561c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -97,7 +97,26 @@ public MemoryMode getMemoryMode() { @Override public CompletableResultCode forceFlush() { CompletableResultCode result = new CompletableResultCode(); - CompletableResultCode doRunResult = scheduled.doRun(); + forceFlush(result); + return result; + } + + /** + * Performs a collect + export cycle on behalf of {@link #forceFlush()}, retrying if an export is + * already in progress. + * + *

If a force flush races with the periodic export (or a previous force flush), the in-progress + * export holds {@code exportAvailable}, so a naive {@code doRun()} would log "Exporter busy" and + * drop the metrics. Instead, wait for the in-flight export to finish and retry, so the force + * flush reflects the latest metrics rather than silently failing. + */ + private void forceFlush(CompletableResultCode result) { + CompletableResultCode doRunResult = scheduled.tryDoRun(); + if (doRunResult == null) { + // An export is already in progress. Wait for it to complete, then retry. + scheduled.flushInProgress.whenComplete(() -> forceFlush(result)); + return; + } doRunResult.whenComplete( () -> { CompletableResultCode flushResult = exporter.flush(); @@ -110,7 +129,6 @@ public CompletableResultCode forceFlush() { } }); }); - return result; } @Override @@ -248,8 +266,21 @@ public void run() { doRun(); } - // Runs a collect + export cycle. + // Runs a collect + export cycle. If an export is already in progress, the metrics are dropped + // and a failed result is returned (used by the periodic schedule, which will retry next tick). CompletableResultCode doRun() { + CompletableResultCode flushResult = tryDoRun(); + if (flushResult != null) { + return flushResult; + } + logger.log(Level.FINE, "Exporter busy. Dropping metrics."); + return CompletableResultCode.ofFailure(); + } + + // Like doRun(), but returns null instead of dropping when an export is already in progress, so + // callers (forceFlush) can wait for the in-flight export and retry rather than losing metrics. + @Nullable + CompletableResultCode tryDoRun() { CompletableResultCode flushResult = new CompletableResultCode(); if (exportAvailable.compareAndSet(true, false)) { flushInProgress = flushResult; @@ -284,8 +315,8 @@ CompletableResultCode doRun() { flushResult.fail(); } } else { - logger.log(Level.FINE, "Exporter busy. 
Dropping metrics."); - flushResult.fail(); + // An export is already in progress; signal the caller so it can wait and retry. + return null; } return flushResult; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index d097a4bb4a4..0bc980d79eb 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -440,6 +440,128 @@ public CompletableResultCode shutdown() { assertThat(shutdownCalledWhileExportPending.get()).isFalse(); } + @Test + @Timeout(10) + void forceFlush_whileExportInFlight_waitsAndExportsLatest() throws Exception { + // Regression test for #8433: a forceFlush() that races with an in-progress export must wait for + // it and then export the latest metrics, instead of logging "Exporter busy. Dropping metrics." + // and silently failing. 
+ CompletableResultCode inflightExportResult = new CompletableResultCode(); + CountDownLatch firstExportStarted = new CountDownLatch(1); + AtomicInteger exportCount = new AtomicInteger(); + + MetricExporter blockingExporter = + new MetricExporter() { + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return AggregationTemporality.CUMULATIVE; + } + + @Override + public CompletableResultCode export(Collection metrics) { + if (exportCount.incrementAndGet() == 1) { + firstExportStarted.countDown(); + return inflightExportResult; + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + }; + + PeriodicMetricReader reader = + PeriodicMetricReader.builder(blockingExporter) + .setInterval(Duration.ofSeconds(Integer.MAX_VALUE)) + .build(); + reader.register(collectionRegistration); + + try { + // First forceFlush triggers an export that blocks. + CompletableResultCode firstFlush = reader.forceFlush(); + assertThat(firstExportStarted.await(5, TimeUnit.SECONDS)).isTrue(); + + // Second forceFlush collides with the in-flight export. It must NOT complete or fail yet - + // it should be waiting for the in-flight export to finish. + CompletableResultCode secondFlush = reader.forceFlush(); + assertThat(secondFlush.isDone()).isFalse(); + + // Release the in-flight export. Both flushes should now succeed, and the second one should + // have performed its own export (latest metrics) rather than dropping them. + inflightExportResult.succeed(); + + assertThat(firstFlush.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); + assertThat(secondFlush.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); + // 1 in-flight export + 1 retried export from the second forceFlush. 
+ assertThat(exportCount.get()).isEqualTo(2); + } finally { + reader.shutdown(); + } + } + + @Test + @Timeout(10) + @SuppressLogger(PeriodicMetricReader.class) + void periodicExport_whileExportInFlight_dropsAndLogs() throws Exception { + // Covers doRun()'s drop-and-fail branch: when a periodic collection runs while an export is + // already in progress, it drops the metrics and logs, rather than queuing another export. + CompletableResultCode inflightExportResult = new CompletableResultCode(); + CountDownLatch firstExportStarted = new CountDownLatch(1); + AtomicInteger exportCount = new AtomicInteger(); + + MetricExporter blockingExporter = + new MetricExporter() { + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return AggregationTemporality.CUMULATIVE; + } + + @Override + public CompletableResultCode export(Collection metrics) { + if (exportCount.incrementAndGet() == 1) { + firstExportStarted.countDown(); + return inflightExportResult; + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + }; + + // Short interval so periodic ticks fire repeatedly while the first export is blocked. + PeriodicMetricReader reader = + PeriodicMetricReader.builder(blockingExporter).setInterval(Duration.ofMillis(10)).build(); + reader.register(collectionRegistration); + + try { + // Wait until the first (periodic) export has started and is blocking. + assertThat(firstExportStarted.await(5, TimeUnit.SECONDS)).isTrue(); + // Let many more periodic ticks fire (interval is 10ms) while the first export is in flight. + // Each collides with the in-flight export and is dropped rather than queuing a new export. 
+ Thread.sleep(200); + assertThat(exportCount.get()).isEqualTo(1); + } finally { + // Releasing the in-flight export frees the slot; a later tick may export again. + inflightExportResult.succeed(); + reader.shutdown(); + } + } + @Test @SuppressWarnings("PreferJavaTimeOverload") // Testing the overload void invalidConfig() {