Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@
* <li>Is every c.complete followed by a return, to end execution?</li>
* <li>Have all sync method calls been converted to async, where needed?</li>
* </ol>
*
* <p>This class is not part of the public API and may be removed or changed
* at any time
* <p>
* If, when writing a lambda expression, you need to have an effectively {@code final} variable
* whose value may be mutated, use {@link MutableValue}.
* <p>
* This class is not part of the public API and may be removed or changed at any time.
*/
@FunctionalInterface
public interface AsyncRunnable extends AsyncSupplier<Void>, AsyncConsumer<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ private static Throwable callOnAttemptFailureOperator(
* @param readOnlyRetryState Must not be mutated by this method.
* @param onlyRuntimeExceptions See {@link #doAdvanceOrThrow(Throwable, BinaryOperator, BiPredicate, boolean)}.
*/
private boolean shouldRetry(final RetryState readOnlyRetryState, final Throwable attemptException, final Throwable newlyChosenException,
private static boolean shouldRetry(final RetryState readOnlyRetryState, final Throwable attemptException,
final Throwable newlyChosenException,
final boolean onlyRuntimeExceptions, final BiPredicate<RetryState, Throwable> retryPredicate) {
try {
return retryPredicate.test(readOnlyRetryState, attemptException);
Expand Down Expand Up @@ -295,16 +296,6 @@ public boolean breakAndCompleteIfRetryAnd(final Supplier<Boolean> predicate, fin
}
}

/**
* This method is similar to
* {@link RetryState#breakAndThrowIfRetryAnd(Supplier)} / {@link RetryState#breakAndCompleteIfRetryAnd(Supplier, SingleResultCallback)}.
* The difference is that it allows the current attempt to continue, yet no more attempts will happen. Also, unlike the aforementioned
* methods, this method has effect even if called during the {@linkplain #isFirstAttempt() first attempt}.
*/
public void markAsLastAttempt() {
loopState.markAsLastIteration();
}

/**
* Returns {@code true} iff the current attempt is the first one, i.e., no retry attempts have been made.
*
Expand All @@ -318,7 +309,7 @@ public boolean isFirstAttempt() {
* Returns {@code true} iff the current attempt is known to be the last one, i.e., it is known that no more attempts will be made.
* An attempt is known to be the last one iff any of the following applies:
* <ul>
* <li>{@link #breakAndThrowIfRetryAnd(Supplier)} / {@link #breakAndCompleteIfRetryAnd(Supplier, SingleResultCallback)} / {@link #markAsLastAttempt()} was called.</li>
* <li>{@link #breakAndThrowIfRetryAnd(Supplier)} / {@link #breakAndCompleteIfRetryAnd(Supplier, SingleResultCallback)} was called.</li>
* <li>{@code attemptException} is a {@link MongoOperationTimeoutException}.</li>
* <li>The number of attempts is limited, and the current attempt is the last one.</li>
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ static <R> void withAsyncSourceAndConnection(
final boolean wrapConnectionSourceException,
final OperationContext operationContext,
final SingleResultCallback<R> callback,
final AsyncCallbackTriFunction<AsyncConnectionSource, AsyncConnection, OperationContext, R> asyncFunction)
throws OperationHelper.ResourceSupplierInternalException {
final AsyncCallbackTriFunction<AsyncConnectionSource, AsyncConnection, OperationContext, R> asyncFunction) {
SingleResultCallback<R> errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER);

OperationContext serverSelectionOperationContext =
Expand Down Expand Up @@ -140,8 +139,7 @@ static <R, T extends ReferenceCounted> void withAsyncSuppliedResource(final Asyn
final boolean wrapSourceConnectionException,
final OperationContext operationContext,
final SingleResultCallback<R> callback,
final AsyncCallbackFunction<T, R> function)
throws OperationHelper.ResourceSupplierInternalException {
final AsyncCallbackFunction<T, R> function) {
SingleResultCallback<R> errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER);
resourceSupplier.apply(operationContext, (resource, supplierException) -> {
if (supplierException != null) {
Expand Down Expand Up @@ -331,7 +329,7 @@ static <D, T> void createReadCommandAndExecuteAsync(

static <R> AsyncCallbackSupplier<R> decorateReadWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncReadFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext),
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext.getServerDeprioritization()),
CommandOperationHelper::loggingShouldAttemptToRetryRead, callback -> {
logRetryCommand(retryState, operationContext);
asyncReadFunction.get(callback);
Expand All @@ -340,7 +338,7 @@ static <R> AsyncCallbackSupplier<R> decorateReadWithRetriesAsync(final RetryStat

static <R> AsyncCallbackSupplier<R> decorateWriteWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncWriteFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext),
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext.getServerDeprioritization()),
CommandOperationHelper::loggingShouldAttemptToRetryWriteAndAddRetryableLabel, callback -> {
logRetryCommand(retryState, operationContext);
asyncWriteFunction.get(callback);
Expand Down
Loading