Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,26 @@ public MemoryMode getMemoryMode() {
@Override
public CompletableResultCode forceFlush() {
CompletableResultCode result = new CompletableResultCode();
CompletableResultCode doRunResult = scheduled.doRun();
forceFlush(result);
return result;
}

/**
* Performs a collect + export cycle on behalf of {@link #forceFlush()}, retrying if an export is
* already in progress.
*
* <p>If a force flush races with the periodic export (or a previous force flush), the in-progress
* export holds {@code exportAvailable}, so a naive {@code doRun()} would log "Exporter busy" and
* drop the metrics. Instead, wait for the in-flight export to finish and retry, so the force
* flush reflects the latest metrics rather than silently failing.
*/
private void forceFlush(CompletableResultCode result) {
CompletableResultCode doRunResult = scheduled.tryDoRun();
if (doRunResult == null) {
// An export is already in progress. Wait for it to complete, then retry.
scheduled.flushInProgress.whenComplete(() -> forceFlush(result));
return;
}
doRunResult.whenComplete(
() -> {
CompletableResultCode flushResult = exporter.flush();
Expand All @@ -110,7 +129,6 @@ public CompletableResultCode forceFlush() {
}
});
});
return result;
}

@Override
Expand Down Expand Up @@ -248,8 +266,21 @@ public void run() {
doRun();
}

// Runs a collect + export cycle.
// Runs a collect + export cycle. If an export is already in progress, the metrics are dropped
// and a failed result is returned (used by the periodic schedule, which will retry next tick).
CompletableResultCode doRun() {
CompletableResultCode flushResult = tryDoRun();
if (flushResult != null) {
return flushResult;
}
logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
return CompletableResultCode.ofFailure();
}

// Like doRun(), but returns null instead of dropping when an export is already in progress, so
// callers (forceFlush) can wait for the in-flight export and retry rather than losing metrics.
@Nullable
CompletableResultCode tryDoRun() {
CompletableResultCode flushResult = new CompletableResultCode();
if (exportAvailable.compareAndSet(true, false)) {
flushInProgress = flushResult;
Expand Down Expand Up @@ -284,8 +315,8 @@ CompletableResultCode doRun() {
flushResult.fail();
}
} else {
logger.log(Level.FINE, "Exporter busy. Dropping metrics.");
flushResult.fail();
// An export is already in progress; signal the caller so it can wait and retry.
return null;
}
return flushResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,128 @@ public CompletableResultCode shutdown() {
assertThat(shutdownCalledWhileExportPending.get()).isFalse();
}

@Test
@Timeout(10)
void forceFlush_whileExportInFlight_waitsAndExportsLatest() throws Exception {
// Regression test for #8433: a forceFlush() that races with an in-progress export must wait for
// it and then export the latest metrics, instead of logging "Exporter busy. Dropping metrics."
// and silently failing.
CompletableResultCode inflightExportResult = new CompletableResultCode();
CountDownLatch firstExportStarted = new CountDownLatch(1);
AtomicInteger exportCount = new AtomicInteger();

MetricExporter blockingExporter =
new MetricExporter() {
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (exportCount.incrementAndGet() == 1) {
firstExportStarted.countDown();
return inflightExportResult;
}
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
};

PeriodicMetricReader reader =
PeriodicMetricReader.builder(blockingExporter)
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
reader.register(collectionRegistration);

try {
// First forceFlush triggers an export that blocks.
CompletableResultCode firstFlush = reader.forceFlush();
assertThat(firstExportStarted.await(5, TimeUnit.SECONDS)).isTrue();

// Second forceFlush collides with the in-flight export. It must NOT complete or fail yet -
// it should be waiting for the in-flight export to finish.
CompletableResultCode secondFlush = reader.forceFlush();
assertThat(secondFlush.isDone()).isFalse();

// Release the in-flight export. Both flushes should now succeed, and the second one should
// have performed its own export (latest metrics) rather than dropping them.
inflightExportResult.succeed();

assertThat(firstFlush.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(secondFlush.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
// 1 in-flight export + 1 retried export from the second forceFlush.
assertThat(exportCount.get()).isEqualTo(2);
} finally {
reader.shutdown();
}
}

@Test
@Timeout(10)
@SuppressLogger(PeriodicMetricReader.class)
void periodicExport_whileExportInFlight_dropsAndLogs() throws Exception {
// Covers doRun()'s drop-and-fail branch: when a periodic collection runs while an export is
// already in progress, it drops the metrics and logs, rather than queuing another export.
CompletableResultCode inflightExportResult = new CompletableResultCode();
CountDownLatch firstExportStarted = new CountDownLatch(1);
AtomicInteger exportCount = new AtomicInteger();

MetricExporter blockingExporter =
new MetricExporter() {
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (exportCount.incrementAndGet() == 1) {
firstExportStarted.countDown();
return inflightExportResult;
}
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
};

// Short interval so periodic ticks fire repeatedly while the first export is blocked.
PeriodicMetricReader reader =
PeriodicMetricReader.builder(blockingExporter).setInterval(Duration.ofMillis(10)).build();
reader.register(collectionRegistration);

try {
// Wait until the first (periodic) export has started and is blocking.
assertThat(firstExportStarted.await(5, TimeUnit.SECONDS)).isTrue();
// Let many more periodic ticks fire (interval is 10ms) while the first export is in flight.
// Each collides with the in-flight export and is dropped rather than queuing a new export.
Thread.sleep(200);
assertThat(exportCount.get()).isEqualTo(1);
} finally {
// Releasing the in-flight export frees the slot; a later tick may export again.
inflightExportResult.succeed();
reader.shutdown();
}
}

@Test
@SuppressWarnings("PreferJavaTimeOverload") // Testing the overload
void invalidConfig() {
Expand Down
Loading