Skip to content

Commit 417fb0d

Browse files
authored
improve: reconciliation counts as retry attempt only if close to retry deadline (#3380)
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent f214108 commit 417fb0d

8 files changed

Lines changed: 313 additions & 9 deletions

File tree

docs/content/en/docs/documentation/error-handling-retries.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ these features:
135135

136136
2. In case an exception is thrown, a retry is initiated. However, if an event is received
137137
meanwhile, it will be reconciled instantly, and this execution won't count as a retry attempt.
138+
If that event-triggered reconciliation also fails inside the current retry window, the
139+
existing retry deadline is preserved rather than reset — the failure does not advance the
140+
retry counter unless the original deadline is imminent (5 seconds or less away).
138141
3. If the retry limit is reached (so no more automatic retry would happen), but a new event is
139142
received, the reconciliation will still happen, but won't reset the retry, and will still be
140143
marked as the last attempt in the retry info. Point (1) still holds — a successful reconciliation will reset the retry — but no retry will happen in case of an error.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
4949
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
5050
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;
5151

52+
/**
53+
* Threshold, in milliseconds of time remaining until the already scheduled retry, at or below which an event-driven failed reconciliation that lands inside the current retry
54+
* window is allowed to consume a retry attempt (i.e. advance the retry counter). Above this
55+
* threshold the existing retry deadline is preserved instead.
56+
*/
57+
private static final long RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS = 5_000;
58+
5259
private volatile boolean running;
5360
private final ControllerConfiguration<?> controllerConfiguration;
5461
private final ReconciliationDispatcher<P> reconciliationDispatcher;
@@ -369,6 +376,15 @@ private void handleRetryOnException(ExecutionScope<P> executionScope, Exception
369376
submitReconciliationExecution(state);
370377
return;
371378
}
379+
Optional<Duration> remaining = state.getRetry().remainingDurationUntilNextRetry();
380+
if (remaining.isPresent()
381+
&& remaining.get().toMillis() > RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS) {
382+
log.debug(
383+
"Preserving existing retry deadline; remaining: {} ms. Not consuming a retry attempt.",
384+
remaining.get().toMillis());
385+
retryEventSource().scheduleOnce(resourceID, remaining.get().toMillis());
386+
return;
387+
}
372388
Optional<Long> nextDelay = state.getRetry().nextDelay();
373389
nextDelay.ifPresentOrElse(
374390
delay -> {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.retry;
1717

18+
import java.time.Duration;
1819
import java.util.Optional;
1920

2021
public class GenericRetryExecution implements RetryExecution {
@@ -23,6 +24,7 @@ public class GenericRetryExecution implements RetryExecution {
2324

2425
private int lastAttemptIndex = 0;
2526
private long currentInterval;
27+
private Long lastNextDelayCallEpochMillis;
2628

2729
public GenericRetryExecution(GenericRetry genericRetry) {
2830
this.genericRetry = genericRetry;
@@ -40,6 +42,7 @@ public Optional<Long> nextDelay() {
4042
}
4143
}
4244
lastAttemptIndex++;
45+
lastNextDelayCallEpochMillis = System.currentTimeMillis();
4346
return Optional.of(currentInterval);
4447
}
4548

@@ -52,4 +55,16 @@ public boolean isLastAttempt() {
5255
public int getAttemptCount() {
5356
return lastAttemptIndex;
5457
}
58+
59+
@Override
60+
public Optional<Duration> remainingDurationUntilNextRetry() {
61+
if (lastNextDelayCallEpochMillis == null) {
62+
return Optional.empty();
63+
}
64+
long remaining = (lastNextDelayCallEpochMillis + currentInterval) - System.currentTimeMillis();
65+
if (remaining <= 0) {
66+
return Optional.empty();
67+
}
68+
return Optional.of(Duration.ofMillis(remaining));
69+
}
5570
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.retry;
1717

18+
import java.time.Duration;
1819
import java.util.Optional;
1920

2021
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -25,4 +26,15 @@ public interface RetryExecution extends RetryInfo {
2526
* @return the time to wait until the next execution in milliseconds
2627
*/
2728
Optional<Long> nextDelay();
29+
30+
/**
31+
* Remaining time of the currently scheduled retry interval, i.e. the time until the previously
32+
* computed retry delay would elapse. Returns an empty {@link Optional} if no retry has been
33+
* scheduled yet (i.e. {@link #nextDelay()} has never been called) or if the deadline has already
34+
* passed.
35+
*
36+
* <p>Used to decide whether an event-driven failed reconciliation that lands well inside the
37+
* retry window should consume a retry attempt or simply be re-scheduled on the original deadline.
38+
*/
39+
Optional<Duration> remainingDurationUntilNextRetry();
2840
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,98 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
465465
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
466466
}
467467

468+
@Test
469+
void preservesRetryDeadlineWhenRemainingDurationAboveThreshold() {
470+
RetryExecution mockRetryExecution = mock(RetryExecution.class);
471+
when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
472+
when(mockRetryExecution.remainingDurationUntilNextRetry())
473+
.thenReturn(Optional.of(Duration.ofMillis(50_000)));
474+
Retry retry = mock(Retry.class);
475+
when(retry.initExecution()).thenReturn(mockRetryExecution);
476+
eventProcessorWithRetry =
477+
spy(
478+
new EventProcessor(
479+
controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
480+
reconciliationDispatcherMock,
481+
eventSourceManagerMock,
482+
metricsMock));
483+
eventProcessorWithRetry.start();
484+
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
485+
486+
TestCustomResource customResource = testCustomResource();
487+
ExecutionScope executionScope =
488+
new ExecutionScope(null, null, false, false).setResource(customResource);
489+
PostExecutionControl postExecutionControl =
490+
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
491+
492+
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
493+
494+
verify(mockRetryExecution, never()).nextDelay();
495+
verify(retryTimerEventSourceMock, times(1))
496+
.scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(50_000L));
497+
}
498+
499+
@Test
500+
void consumesRetryAttemptWhenRemainingDurationAtOrBelowThreshold() {
501+
RetryExecution mockRetryExecution = mock(RetryExecution.class);
502+
when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
503+
when(mockRetryExecution.remainingDurationUntilNextRetry())
504+
.thenReturn(Optional.of(Duration.ofMillis(2_000)));
505+
Retry retry = mock(Retry.class);
506+
when(retry.initExecution()).thenReturn(mockRetryExecution);
507+
eventProcessorWithRetry =
508+
spy(
509+
new EventProcessor(
510+
controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
511+
reconciliationDispatcherMock,
512+
eventSourceManagerMock,
513+
metricsMock));
514+
eventProcessorWithRetry.start();
515+
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
516+
517+
TestCustomResource customResource = testCustomResource();
518+
ExecutionScope executionScope =
519+
new ExecutionScope(null, null, false, false).setResource(customResource);
520+
PostExecutionControl postExecutionControl =
521+
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
522+
523+
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
524+
525+
verify(mockRetryExecution, times(1)).nextDelay();
526+
verify(retryTimerEventSourceMock, times(1))
527+
.scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L));
528+
}
529+
530+
@Test
531+
void firstFailureSchedulesUsingNextDelayWhenNoRemainingDuration() {
532+
RetryExecution mockRetryExecution = mock(RetryExecution.class);
533+
when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
534+
when(mockRetryExecution.remainingDurationUntilNextRetry()).thenReturn(Optional.empty());
535+
Retry retry = mock(Retry.class);
536+
when(retry.initExecution()).thenReturn(mockRetryExecution);
537+
eventProcessorWithRetry =
538+
spy(
539+
new EventProcessor(
540+
controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
541+
reconciliationDispatcherMock,
542+
eventSourceManagerMock,
543+
metricsMock));
544+
eventProcessorWithRetry.start();
545+
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
546+
547+
TestCustomResource customResource = testCustomResource();
548+
ExecutionScope executionScope =
549+
new ExecutionScope(null, null, false, false).setResource(customResource);
550+
PostExecutionControl postExecutionControl =
551+
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
552+
553+
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
554+
555+
verify(mockRetryExecution, times(1)).nextDelay();
556+
verify(retryTimerEventSourceMock, times(1))
557+
.scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L));
558+
}
559+
468560
@Test
469561
void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException {
470562
when(reconciliationDispatcherMock.handleExecution(any()))

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
import static org.assertj.core.api.Assertions.assertThat;
2323

24-
public class GenericRetryExecutionTest {
24+
class GenericRetryExecutionTest {
2525

2626
@Test
27-
public void noNextDelayIfMaxAttemptLimitReached() {
27+
void noNextDelayIfMaxAttemptLimitReached() {
2828
RetryExecution retryExecution =
2929
GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution();
3030
Optional<Long> res = callNextDelayNTimes(retryExecution, 2);
@@ -35,7 +35,7 @@ public void noNextDelayIfMaxAttemptLimitReached() {
3535
}
3636

3737
@Test
38-
public void canLimitMaxIntervalLength() {
38+
void canLimitMaxIntervalLength() {
3939
RetryExecution retryExecution =
4040
GenericRetry.defaultLimitedExponentialRetry()
4141
.setInitialInterval(2000)
@@ -49,13 +49,13 @@ public void canLimitMaxIntervalLength() {
4949
}
5050

5151
@Test
52-
public void supportsNoRetry() {
52+
void supportsNoRetry() {
5353
RetryExecution retryExecution = GenericRetry.noRetry().initExecution();
5454
assertThat(retryExecution.nextDelay()).isEmpty();
5555
}
5656

5757
@Test
58-
public void supportsIsLastExecution() {
58+
void supportsIsLastExecution() {
5959
GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution();
6060
assertThat(execution.isLastAttempt()).isFalse();
6161

@@ -65,19 +65,67 @@ public void supportsIsLastExecution() {
6565
}
6666

6767
@Test
68-
public void returnAttemptIndex() {
68+
void returnAttemptIndex() {
6969
RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution();
7070

7171
assertThat(retryExecution.getAttemptCount()).isEqualTo(0);
7272
retryExecution.nextDelay();
7373
assertThat(retryExecution.getAttemptCount()).isEqualTo(1);
7474
}
7575

76-
private RetryExecution getDefaultRetryExecution() {
77-
return GenericRetry.defaultLimitedExponentialRetry().initExecution();
76+
@Test
77+
void remainingDurationEmptyBeforeFirstNextDelay() {
78+
RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution();
79+
80+
assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty();
81+
}
82+
83+
@Test
84+
void remainingDurationPresentAfterNextDelay() {
85+
long interval = 60_000L;
86+
RetryExecution retryExecution = new GenericRetry().setInitialInterval(interval).initExecution();
87+
88+
retryExecution.nextDelay();
89+
90+
Optional<java.time.Duration> remaining = retryExecution.remainingDurationUntilNextRetry();
91+
assertThat(remaining).isPresent();
92+
assertThat(remaining.get().toMillis()).isPositive().isLessThanOrEqualTo(interval);
93+
}
94+
95+
@Test
96+
void remainingDurationEmptyAfterIntervalElapsed() throws InterruptedException {
97+
RetryExecution retryExecution = new GenericRetry().setInitialInterval(20).initExecution();
98+
99+
retryExecution.nextDelay();
100+
Thread.sleep(60);
101+
102+
assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty();
103+
}
104+
105+
@Test
106+
void remainingDurationReflectsUpdatedIntervalAfterSubsequentNextDelay() {
107+
long initialInterval = 1000L;
108+
double multiplier = 2.0;
109+
RetryExecution retryExecution =
110+
new GenericRetry()
111+
.setInitialInterval(initialInterval)
112+
.setIntervalMultiplier(multiplier)
113+
.initExecution();
114+
115+
// first two calls keep the initial interval (multiplier only kicks in after attempt 1)
116+
retryExecution.nextDelay();
117+
retryExecution.nextDelay();
118+
// third call doubles the interval to 2000ms
119+
retryExecution.nextDelay();
120+
121+
Optional<java.time.Duration> remaining = retryExecution.remainingDurationUntilNextRetry();
122+
assertThat(remaining).isPresent();
123+
assertThat(remaining.get().toMillis())
124+
.isPositive()
125+
.isLessThanOrEqualTo((long) (initialInterval * multiplier));
78126
}
79127

80-
public Optional<Long> callNextDelayNTimes(RetryExecution retryExecution, int n) {
128+
Optional<Long> callNextDelayNTimes(RetryExecution retryExecution, int n) {
81129
for (int i = 0; i < n; i++) {
82130
retryExecution.nextDelay();
83131
}

0 commit comments

Comments
 (0)