diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index 25e0bc0b2f1..53ea80a561c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -97,7 +97,26 @@ public MemoryMode getMemoryMode() { @Override public CompletableResultCode forceFlush() { CompletableResultCode result = new CompletableResultCode(); - CompletableResultCode doRunResult = scheduled.doRun(); + forceFlush(result); + return result; + } + + /** + * Performs a collect + export cycle on behalf of {@link #forceFlush()}, retrying if an export is + * already in progress. + *
+ * <p>If a force flush races with the periodic export (or a previous force flush), the in-progress
+ * export holds {@code exportAvailable}, so a naive {@code doRun()} would log "Exporter busy" and
+ * drop the metrics. Instead, wait for the in-flight export to finish and retry, so the force
+ * flush reflects the latest metrics rather than silently failing.
+ */
+ private void forceFlush(CompletableResultCode result) {
+ CompletableResultCode doRunResult = scheduled.tryDoRun();
+ if (doRunResult == null) {
+ // tryDoRun() returned null: a cycle is in flight. Re-run this method once it completes.
+ scheduled.flushInProgress.whenComplete(() -> forceFlush(result));
+ return; // NOTE(review): confirm exportAvailable resets before flushInProgress completes, else this re-enters.
+ }
doRunResult.whenComplete(
() -> {
CompletableResultCode flushResult = exporter.flush();
@@ -110,7 +129,6 @@ public CompletableResultCode forceFlush() {
}
});
});
- return result;
}
@Override
@@ -248,8 +266,21 @@ public void run() {
doRun();
}
- // Runs a collect + export cycle.
+ // Periodic-schedule entry point for one collect + export cycle. When tryDoRun() reports that a
+ // cycle is already running, this logs at FINE and returns a failed result instead of waiting.
CompletableResultCode doRun() {
+ CompletableResultCode attempt = tryDoRun();
+ if (attempt == null) {
+ logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
+ return CompletableResultCode.ofFailure();
+ }
+ return attempt;
+ }
+
+ // Attempts a collect + export cycle. Returns null, rather than a failed result, when the
+ // exportAvailable flag is already claimed, so forceFlush can wait on flushInProgress and retry.
+ @Nullable
+ CompletableResultCode tryDoRun() {
CompletableResultCode flushResult = new CompletableResultCode();
if (exportAvailable.compareAndSet(true, false)) {
flushInProgress = flushResult;
@@ -284,8 +315,8 @@ CompletableResultCode doRun() {
flushResult.fail();
}
} else {
- logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
- flushResult.fail();
+ // exportAvailable was already claimed by another cycle; null tells the caller to wait and retry.
+ return null;
}
return flushResult;
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
index d097a4bb4a4..0bc980d79eb 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java
@@ -440,6 +440,128 @@ public CompletableResultCode shutdown() {
assertThat(shutdownCalledWhileExportPending.get()).isFalse();
}
+ @Test
+ @Timeout(10)
+ void forceFlush_whileExportInFlight_waitsAndExportsLatest() throws Exception {
+ // Regression test for #8433: a forceFlush() that races with an in-progress export must wait for
+ // it and then export the latest metrics, instead of logging "Exporter busy. Dropping metrics."
+ // and silently failing.
+ CompletableResultCode inflightExportResult = new CompletableResultCode();
+ CountDownLatch firstExportStarted = new CountDownLatch(1);
+ AtomicInteger exportCount = new AtomicInteger();
+
+ MetricExporter blockingExporter =
+ new MetricExporter() {
+ @Override
+ public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
+ return AggregationTemporality.CUMULATIVE;
+ }
+
+ @Override
+ public CompletableResultCode export(Collection