diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index deeef6239c..9468e787dc 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -111,9 +111,11 @@ *
  • Is every c.complete followed by a return, to end execution?
  • *
  • Have all sync method calls been converted to async, where needed?
  • * - * - *

    This class is not part of the public API and may be removed or changed - * at any time + *

    + * If, when writing a lambda expression, you need to have an effectively {@code final} variable + * whose value may be mutated, use {@link MutableValue}. + *

    + * This class is not part of the public API and may be removed or changed at any time. */ @FunctionalInterface public interface AsyncRunnable extends AsyncSupplier, AsyncConsumer { diff --git a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java index b209035d7a..9742116f23 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java @@ -208,7 +208,8 @@ private static Throwable callOnAttemptFailureOperator( * @param readOnlyRetryState Must not be mutated by this method. * @param onlyRuntimeExceptions See {@link #doAdvanceOrThrow(Throwable, BinaryOperator, BiPredicate, boolean)}. */ - private boolean shouldRetry(final RetryState readOnlyRetryState, final Throwable attemptException, final Throwable newlyChosenException, + private static boolean shouldRetry(final RetryState readOnlyRetryState, final Throwable attemptException, + final Throwable newlyChosenException, final boolean onlyRuntimeExceptions, final BiPredicate retryPredicate) { try { return retryPredicate.test(readOnlyRetryState, attemptException); @@ -295,16 +296,6 @@ public boolean breakAndCompleteIfRetryAnd(final Supplier predicate, fin } } - /** - * This method is similar to - * {@link RetryState#breakAndThrowIfRetryAnd(Supplier)} / {@link RetryState#breakAndCompleteIfRetryAnd(Supplier, SingleResultCallback)}. - * The difference is that it allows the current attempt to continue, yet no more attempts will happen. Also, unlike the aforementioned - * methods, this method has effect even if called during the {@linkplain #isFirstAttempt() first attempt}. - */ - public void markAsLastAttempt() { - loopState.markAsLastIteration(); - } - /** * Returns {@code true} iff the current attempt is the first one, i.e., no retry attempts have been made. * @@ -318,7 +309,7 @@ public boolean isFirstAttempt() { * Returns {@code true} iff the current attempt is known to be the last one, i.e., it is known that no more attempts will be made. * An attempt is known to be the last one iff any of the following applies: *

    diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java index 91e52ca1ba..90c013dd29 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java @@ -110,8 +110,7 @@ static void withAsyncSourceAndConnection( final boolean wrapConnectionSourceException, final OperationContext operationContext, final SingleResultCallback callback, - final AsyncCallbackTriFunction asyncFunction) - throws OperationHelper.ResourceSupplierInternalException { + final AsyncCallbackTriFunction asyncFunction) { SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER); OperationContext serverSelectionOperationContext = @@ -140,8 +139,7 @@ static void withAsyncSuppliedResource(final Asyn final boolean wrapSourceConnectionException, final OperationContext operationContext, final SingleResultCallback callback, - final AsyncCallbackFunction function) - throws OperationHelper.ResourceSupplierInternalException { + final AsyncCallbackFunction function) { SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER); resourceSupplier.apply(operationContext, (resource, supplierException) -> { if (supplierException != null) { @@ -331,7 +329,7 @@ static void createReadCommandAndExecuteAsync( static AsyncCallbackSupplier decorateReadWithRetriesAsync(final RetryState retryState, final OperationContext operationContext, final AsyncCallbackSupplier asyncReadFunction) { - return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext), + return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext.getServerDeprioritization()), CommandOperationHelper::loggingShouldAttemptToRetryRead, callback -> { logRetryCommand(retryState, operationContext); asyncReadFunction.get(callback); @@ -340,7 +338,7 @@ static AsyncCallbackSupplier decorateReadWithRetriesAsync(final RetryStat static AsyncCallbackSupplier decorateWriteWithRetriesAsync(final RetryState retryState, final OperationContext operationContext, final AsyncCallbackSupplier asyncWriteFunction) { - return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext), + return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext.getServerDeprioritization()), CommandOperationHelper::loggingShouldAttemptToRetryWriteAndAddRetryableLabel, callback -> { logRetryCommand(retryState, operationContext); asyncWriteFunction.get(callback); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java index 23c24bc52b..ff078b5b09 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java @@ -89,7 +89,6 @@ import com.mongodb.internal.validator.ReplacingDocumentFieldNameValidator; import com.mongodb.internal.validator.UpdateFieldNameValidator; import com.mongodb.lang.Nullable; -import org.bson.BsonArray; import org.bson.BsonBinaryWriter; import org.bson.BsonBoolean; import org.bson.BsonDocument; @@ -130,7 +129,7 @@ import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.BulkWriteBatch.logWriteModelDoesNotSupportRetries; import static com.mongodb.internal.operation.CommandOperationHelper.commandWriteConcern; -import static com.mongodb.internal.operation.CommandOperationHelper.getWriteAttemptFailureNotToBeRetriedOrAddRetryableLabel; +import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried; import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException; import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite; @@ -180,12 +179,11 @@ public ClientBulkWriteOperation( @Override public String getCommandName() { - return "bulkWrite"; + return BULK_WRITE_COMMAND_NAME; } @Override public MongoNamespace getNamespace() { - // The bulkWrite command is executed on the "admin" database. return ADMIN_DB_COMMAND_NAMESPACE; } @@ -209,17 +207,19 @@ public void executeAsync( final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback callback) { - WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(operationContext.getSessionContext()); - ResultAccumulator resultAccumulator = new ResultAccumulator(); - MutableValue transformedTopLevelError = new MutableValue<>(); - - beginAsync().thenSupply(c -> { - executeAllBatchesAsync(effectiveWriteConcern, binding, operationContext, resultAccumulator, c); - }).onErrorIf(topLevelError -> topLevelError instanceof MongoException, (topLevelError, c) -> { - transformedTopLevelError.set(transformWriteException((MongoException) topLevelError)); - c.complete(c); - }).thenApply((ignored, c) -> { - c.complete(resultAccumulator.build(transformedTopLevelError.getNullable(), effectiveWriteConcern)); + beginAsync().thenSupply(c -> { + WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(operationContext.getSessionContext()); + ResultAccumulator resultAccumulator = new ResultAccumulator(); + MutableValue transformedTopLevelError = new MutableValue<>(); + + beginAsync().thenRun(executeAllBatchesCallback -> { + executeAllBatchesAsync(effectiveWriteConcern, binding, operationContext, resultAccumulator, executeAllBatchesCallback); + }).onErrorIf(topLevelError -> topLevelError instanceof MongoException, (topLevelError, onErrorCallback) -> { + transformedTopLevelError.set(transformWriteException((MongoException) topLevelError)); + onErrorCallback.complete(onErrorCallback); + }).thenApply((ignored, buildResultCallback) -> { + buildResultCallback.complete(resultAccumulator.build(transformedTopLevelError.getNullable(), effectiveWriteConcern)); + }).finish(c); }).finish(callback); } @@ -253,16 +253,18 @@ private void executeAllBatchesAsync( final OperationContext operationContext, final ResultAccumulator resultAccumulator, final SingleResultCallback callback) { - MutableValue nextBatchStartModelIndex = new MutableValue<>(INITIAL_BATCH_MODEL_START_INDEX); - - beginAsync().thenRunDoWhileLoop(iterationCallback -> { - beginAsync().thenSupply(c -> { - executeBatchAsync(nextBatchStartModelIndex.get(), effectiveWriteConcern, binding, operationContext, resultAccumulator, c); - }).thenApply((nextBatchStartModelIdx, c) -> { - nextBatchStartModelIndex.set(nextBatchStartModelIdx); - c.complete(c); - }).finish(iterationCallback); - }, () -> nextBatchStartModelIndex.getNullable() != null).finish(callback); + beginAsync().thenRun(c -> { + MutableValue nextBatchStartModelIndex = new MutableValue<>(INITIAL_BATCH_MODEL_START_INDEX); + + beginAsync().thenRunDoWhileLoop(iterationCallback -> { + beginAsync().thenSupply(executeBatchCallback -> { + executeBatchAsync(nextBatchStartModelIndex.get(), effectiveWriteConcern, binding, operationContext, resultAccumulator, executeBatchCallback); + }).thenConsume((nextBatchStartModelIdx, setNextBatchStartModelIndexCallback) -> { + nextBatchStartModelIndex.set(nextBatchStartModelIdx); + setNextBatchStartModelIndexCallback.complete(setNextBatchStartModelIndexCallback); + }).finish(iterationCallback); + }, () -> nextBatchStartModelIndex.getNullable() != null).finish(c); + }).finish(callback); } /** @@ -290,7 +292,7 @@ private Integer executeBatch( // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502. // If connection pinning is required, `binding` handles that, // and `ClientSession`, `TransactionContext` are aware of that. - () -> withSourceAndConnection(binding::getWriteConnectionSource, true, + () -> withSourceAndConnection(binding::getWriteConnectionSource, true, operationContext, (connectionSource, connection, operationContextWithMinRtt) -> { ConnectionDescription connectionDescription = connection.getDescription(); boolean effectiveRetryWrites = isRetryableWrite( @@ -304,7 +306,7 @@ private Integer executeBatch( () -> retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), true, true)); return executeBulkWriteCommandAndExhaustOkResponse( retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContextWithMinRtt); - }, operationContext) + }) ); try { @@ -318,11 +320,13 @@ private Integer executeBatch( resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException); throw bulkWriteCommandException; } catch (MongoException mongoException) { - // The server does not have a chance to add "RetryableWriteError" label to `e`, - // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance - // to add the label. So we do that explicitly. - getWriteAttemptFailureNotToBeRetriedOrAddRetryableLabel(retryState, mongoException); resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException); + if (retryWritesSetting) { + // Adding the `RetryableWriteError` label here is unnecessary at this point: + // applications cannot use it for implementing retries, and it is not even part of the public driver API. + // Unfortunately, certain unified tests incorrectly rely on this label to verify retries, resulting in this redundant code. + addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried(retryState, mongoException); + } throw mongoException; } } @@ -337,61 +341,67 @@ private void executeBatchAsync( final OperationContext operationContext, final ResultAccumulator resultAccumulator, final SingleResultCallback callback) { - List unexecutedModels = models.subList(batchStartModelIndex, models.size()); - assertFalse(unexecutedModels.isEmpty()); - SessionContext sessionContext = operationContext.getSessionContext(); - TimeoutContext timeoutContext = operationContext.getTimeoutContext(); - RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext); - BatchEncoder batchEncoder = new BatchEncoder(); - - AsyncCallbackSupplier retryingBatchExecutor = decorateWriteWithRetriesAsync( - retryState, operationContext, - // Each batch re-selects a server and re-checks out a connection because this is simpler, - // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502. - // If connection pinning is required, `binding` handles that, - // and `ClientSession`, `TransactionContext` are aware of that. - funcCallback -> withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, operationContext, funcCallback, - (connectionSource, connection, operationContextWithMinRtt, resultCallback) -> { - ConnectionDescription connectionDescription = connection.getDescription(); - boolean effectiveRetryWrites = isRetryableWrite( - retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext); - retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites); - resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress()); - retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true) - .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false); - ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand( - retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder, - () -> retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), true, true)); - executeBulkWriteCommandAndExhaustOkResponseAsync( - retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContextWithMinRtt, resultCallback); - }) - ); - - beginAsync().thenSupply(c -> { - retryingBatchExecutor.get(c); - }).thenApply((response, c) -> { - c.complete(resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse( - batchStartModelIndex, response, batchEncoder.intoEncodedBatchInfo())); - }).onErrorIf(throwable -> true, (t, c) -> { - if (t instanceof MongoWriteConcernWithResponseException) { - MongoWriteConcernWithResponseException mongoWriteConcernWithOkResponseException = (MongoWriteConcernWithResponseException) t; - c.complete(resultAccumulator.onBulkWriteCommandOkResponseWithWriteConcernError( - batchStartModelIndex, mongoWriteConcernWithOkResponseException, batchEncoder.intoEncodedBatchInfo())); - } else if (t instanceof MongoCommandException) { - MongoCommandException bulkWriteCommandException = (MongoCommandException) t; - resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException); - c.completeExceptionally(t); - } else if (t instanceof MongoException) { - MongoException mongoException = (MongoException) t; - // The server does not have a chance to add "RetryableWriteError" label to `e`, - // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance - // to add the label. So we do that explicitly. - getWriteAttemptFailureNotToBeRetriedOrAddRetryableLabel(retryState, mongoException); - resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException); - c.completeExceptionally(mongoException); - } else { - c.completeExceptionally(t); - } + beginAsync().thenSupply(c -> { + List unexecutedModels = models.subList(batchStartModelIndex, models.size()); + assertFalse(unexecutedModels.isEmpty()); + SessionContext sessionContext = operationContext.getSessionContext(); + TimeoutContext timeoutContext = operationContext.getTimeoutContext(); + RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext); + BatchEncoder batchEncoder = new BatchEncoder(); + + AsyncCallbackSupplier retryingBatchExecutor = decorateWriteWithRetriesAsync( + retryState, operationContext, + // Each batch re-selects a server and re-checks out a connection because this is simpler, + // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502. + // If connection pinning is required, `binding` handles that, + // and `ClientSession`, `TransactionContext` are aware of that. + supplierCallback -> withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, operationContext, supplierCallback, + (connectionSource, connection, operationContextWithMinRtt, functionCallback) -> { + beginAsync().thenSupply(executeAndExhaustCallback -> { + ConnectionDescription connectionDescription = connection.getDescription(); + boolean effectiveRetryWrites = isRetryableWrite( + retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext); + retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites); + resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress()); + retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true) + .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false); + ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand( + retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder, + () -> retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), true, true)); + executeBulkWriteCommandAndExhaustOkResponseAsync( + retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContextWithMinRtt, executeAndExhaustCallback); + }).finish(functionCallback); + }) + ); + + beginAsync().thenSupply(executorCallback -> { + retryingBatchExecutor.get(executorCallback); + }).thenApply((response, transformResponseCallback) -> { + transformResponseCallback.complete(resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse( + batchStartModelIndex, response, batchEncoder.intoEncodedBatchInfo())); + }).onErrorIf(throwable -> true, (t, onErrorCallback) -> { + if (t instanceof MongoWriteConcernWithResponseException) { + MongoWriteConcernWithResponseException mongoWriteConcernWithOkResponseException = (MongoWriteConcernWithResponseException) t; + onErrorCallback.complete(resultAccumulator.onBulkWriteCommandOkResponseWithWriteConcernError( + batchStartModelIndex, mongoWriteConcernWithOkResponseException, batchEncoder.intoEncodedBatchInfo())); + } else if (t instanceof MongoCommandException) { + MongoCommandException bulkWriteCommandException = (MongoCommandException) t; + resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException); + throw bulkWriteCommandException; + } else if (t instanceof MongoException) { + MongoException mongoException = (MongoException) t; + resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException); + if (retryWritesSetting) { + // Adding the `RetryableWriteError` label here is unnecessary at this point: + // applications cannot use it for implementing retries, and it is not even part of the public driver API. + // Unfortunately, certain unified tests incorrectly rely on this label to verify retries, resulting in this redundant code. + addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried(retryState, mongoException); + } + throw mongoException; + } else { + onErrorCallback.completeExceptionally(t); + } + }).finish(c); }).finish(callback); } @@ -462,8 +472,8 @@ private void executeBulkWriteCommandAndExhaustOkResponseAsync( doWithRetriesDisabledAsync(retryState, (actionCallback) -> { exhaustBulkWriteCommandOkResponseCursorAsync(connectionSource, connection, response, operationContext, actionCallback); }, exhaustCallback); - }).thenApply((cursorExhaustBatches, exhaustCallback) -> { - exhaustCallback.complete(createExhaustiveClientBulkWriteCommandOkResponse( + }).thenApply((cursorExhaustBatches, transformExhaustionResultCallback) -> { + transformExhaustionResultCallback.complete(createExhaustiveClientBulkWriteCommandOkResponse( response, cursorExhaustBatches, connection.getDescription())); @@ -471,14 +481,18 @@ private void executeBulkWriteCommandAndExhaustOkResponseAsync( }).finish(callback); } + /** + * @see #executeBulkWriteCommandAndExhaustOkResponse(RetryState, ConnectionSource, Connection, ClientBulkWriteCommand, WriteConcern, OperationContext) + */ private static ExhaustiveClientBulkWriteCommandOkResponse createExhaustiveClientBulkWriteCommandOkResponse( final ClientBulkWriteCommandOkResponse response, final List> cursorExhaustBatches, - final ConnectionDescription connectionDescription) { + final ConnectionDescription connectionDescription) throws MongoWriteConcernWithResponseException { ExhaustiveClientBulkWriteCommandOkResponse exhaustiveResponse = new ExhaustiveClientBulkWriteCommandOkResponse( response, cursorExhaustBatches); - // `Connection.command` does not throw `MongoWriteConcernException`, so we have to construct it ourselves + // Given that the response is OK, `Connection.command` does not throw an exception when the write concern is violated, + // so we have to construct such an exception ourselves. MongoWriteConcernException writeConcernException = Exceptions.createWriteConcernException( response, connectionDescription.getServerAddress()); if (writeConcernException != null) { @@ -513,15 +527,17 @@ private void doWithRetriesDisabledAsync( final RetryState retryState, final AsyncSupplier action, final SingleResultCallback callback) { - // TODO-JAVA-5956 The current implementation incorrectly uses `retryableWriteCommandFlag` to achieve the behavior needed. - Optional originalRetryableWriteCommandFlag = retryState.attachment(AttachmentKeys.retryableWriteCommandFlag()); - beginAsync().thenSupply(c -> { - retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), false, true); - action.finish(c); - }).thenAlwaysRunAndFinish(() -> { - originalRetryableWriteCommandFlag.ifPresent(value -> retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), value, true)); - }, callback); + // TODO-JAVA-5956 The current implementation incorrectly uses `retryableWriteCommandFlag` to achieve the behavior needed. + Optional originalRetryableWriteCommandFlag = retryState.attachment(AttachmentKeys.retryableWriteCommandFlag()); + + beginAsync().thenSupply(actionCallback -> { + retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), false, true); + action.finish(actionCallback); + }).thenAlwaysRunAndFinish(() -> { + originalRetryableWriteCommandFlag.ifPresent(value -> retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), value, true)); + }, c); + }).finish(callback); } private List> exhaustBulkWriteCommandOkResponseCursor( @@ -549,21 +565,23 @@ private void exhaustBulkWriteCommandOkResponseCursorAsync( final ClientBulkWriteCommandOkResponse response, final OperationContext operationContext, final SingleResultCallback>> callback) { - AsyncBatchCursor cursor = cursorDocumentToAsyncBatchCursor( - TimeoutMode.CURSOR_LIFETIME, - response.getDocument(), - SERVER_DEFAULT_CURSOR_BATCH_SIZE, - codecRegistry.get(BsonDocument.class), - options.getComment().orElse(null), - connectionSource, - connection, - operationContext); - beginAsync().>>thenSupply(c -> { - cursor.exhaust(c); - }).thenAlwaysRunAndFinish(() -> { - cursor.close(); - }, callback); + AsyncBatchCursor cursor = cursorDocumentToAsyncBatchCursor( + TimeoutMode.CURSOR_LIFETIME, + response.getDocument(), + SERVER_DEFAULT_CURSOR_BATCH_SIZE, + codecRegistry.get(BsonDocument.class), + options.getComment().orElse(null), + connectionSource, + connection, + operationContext); + + beginAsync().>>thenSupply(exhaustCallback -> { + cursor.exhaust(exhaustCallback); + }).thenAlwaysRunAndFinish(() -> { + cursor.close(); + }, c); + }).finish(callback); } private ClientBulkWriteCommand createBulkWriteCommand( @@ -647,12 +665,9 @@ private static MongoWriteConcernException createWriteConcernException( if (!responseDocument.containsKey(writeConcernErrorFieldName)) { return null; } - BsonDocument writeConcernErrorDocument = responseDocument.getDocument(writeConcernErrorFieldName); - WriteConcernError writeConcernError = WriteConcernHelper.createWriteConcernError(writeConcernErrorDocument); - Set errorLabels = responseDocument.getArray("errorLabels", new BsonArray()).stream() - .map(i -> i.asString().getValue()) - .collect(toSet()); - return new MongoWriteConcernException(writeConcernError, null, serverAddress, errorLabels); + // We do not use `MongoWriteConcernException.getWriteResult`, + // so we do not care what result `WriteConcernHelper.createWriteConcernException` puts there. + return WriteConcernHelper.createWriteConcernException(responseDocument, serverAddress); } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java index da17f0bede..a0acea8348 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java @@ -32,6 +32,7 @@ import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.function.RetryState; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.connection.OperationContext.ServerDeprioritization; import com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException; import com.mongodb.internal.operation.retry.AttachmentKeys; import com.mongodb.internal.session.SessionContext; @@ -77,9 +78,9 @@ BsonDocument create( ConnectionDescription connectionDescription); } - static BinaryOperator onRetryableReadAttemptFailure(final OperationContext operationContext) { + static BinaryOperator onRetryableReadAttemptFailure(final ServerDeprioritization serverDeprioritization) { return (@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) -> { - operationContext.getServerDeprioritization().onAttemptFailure(mostRecentAttemptException); + serverDeprioritization.onAttemptFailure(mostRecentAttemptException); return chooseRetryableReadException(previouslyChosenException, mostRecentAttemptException); }; } @@ -96,9 +97,9 @@ private static Throwable chooseRetryableReadException( } } - static BinaryOperator onRetryableWriteAttemptFailure(final OperationContext operationContext) { + static BinaryOperator onRetryableWriteAttemptFailure(final ServerDeprioritization serverDeprioritization) { return (@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) -> { - operationContext.getServerDeprioritization().onAttemptFailure(mostRecentAttemptException); + serverDeprioritization.onAttemptFailure(mostRecentAttemptException); return chooseRetryableWriteException(previouslyChosenException, mostRecentAttemptException); }; } @@ -176,7 +177,7 @@ static boolean loggingShouldAttemptToRetryRead(final RetryState retryState, fina } static boolean loggingShouldAttemptToRetryWriteAndAddRetryableLabel(final RetryState retryState, final Throwable attemptFailure) { - Throwable attemptFailureNotToBeRetried = getWriteAttemptFailureNotToBeRetriedOrAddRetryableLabel(retryState, attemptFailure); + Throwable attemptFailureNotToBeRetried = addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried(retryState, attemptFailure); boolean decision = attemptFailureNotToBeRetried == null; if (!decision && retryState.attachment(AttachmentKeys.retryableWriteCommandFlag()).orElse(false)) { logUnableToRetryCommand(retryState, assertNotNull(attemptFailureNotToBeRetried)); @@ -185,10 +186,12 @@ static boolean loggingShouldAttemptToRetryWriteAndAddRetryableLabel(final RetryS } /** - * @return {@code null} if the decision is {@code true}. Otherwise, returns the {@link Throwable} that must not be retried. + * Returns {@code null} if the failed attempt should be retried; + * in this case, also adds the {@value #RETRYABLE_WRITE_ERROR_LABEL} label if needed. + * Otherwise, returns a {@link Throwable} that must not be retried. */ @Nullable - static Throwable getWriteAttemptFailureNotToBeRetriedOrAddRetryableLabel(final RetryState retryState, final Throwable attemptFailure) { + static Throwable addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried(final RetryState retryState, final Throwable attemptFailure) { Throwable failure = attemptFailure instanceof ResourceSupplierInternalException ? attemptFailure.getCause() : attemptFailure; boolean decision = false; MongoException exceptionRetryableRegardlessOfCommand = null; diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java index de02983f01..958a0dc5c6 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java @@ -301,7 +301,7 @@ public BatchCursor execute(final ReadBinding binding, final OperationContext OperationContext findOperationContext = getFindOperationContext(operationContext); RetryState retryState = initialRetryState(retryReads, findOperationContext.getTimeoutContext()); Supplier> read = decorateReadWithRetries(retryState, findOperationContext, () -> - withSourceAndConnection(binding::getReadConnectionSource, false, + withSourceAndConnection(binding::getReadConnectionSource, false, findOperationContext, (source, connection, commandOperationContext) -> { retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(commandOperationContext)); try { @@ -311,7 +311,7 @@ public BatchCursor execute(final ReadBinding binding, final OperationContext } catch (MongoCommandException e) { throw new MongoQueryException(e.getResponse(), e.getServerAddress()); } - }, findOperationContext) + }) ); return read.get(); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java index 2340d3880e..fe4929a27b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java @@ -177,7 +177,7 @@ public BatchCursor execute(final ReadBinding binding, final OperationContext RetryState retryState = initialRetryState(retryReads, listCollectionsOperationContext.getTimeoutContext()); Supplier> read = decorateReadWithRetries(retryState, listCollectionsOperationContext, () -> - withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection, operationContextWithMinRTT) -> { + withSourceAndConnection(binding::getReadConnectionSource, false, listCollectionsOperationContext, (source, connection, operationContextWithMinRTT) -> { retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(operationContextWithMinRTT)); try { return createReadCommandAndExecute(retryState, operationContextWithMinRTT, source, databaseName, @@ -186,7 +186,7 @@ public BatchCursor execute(final ReadBinding binding, final OperationContext return rethrowIfNotNamespaceError(e, createEmptySingleBatchCursor(source.getServerDescription().getAddress(), batchSize)); } - }, listCollectionsOperationContext) + }) ); return read.get(); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java index ddb6d8e4c0..75e29b946b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java @@ -134,7 +134,7 @@ public BatchCursor execute(final ReadBinding binding, final OperationContext RetryState retryState = initialRetryState(retryReads, listIndexesOperationContext.getTimeoutContext()); Supplier> read = decorateReadWithRetries(retryState, listIndexesOperationContext, () -> - withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection, operationContextWithMinRTT) -> { + withSourceAndConnection(binding::getReadConnectionSource, false, listIndexesOperationContext, (source, connection, operationContextWithMinRTT) -> { retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(operationContextWithMinRTT)); try { return createReadCommandAndExecute(retryState, operationContextWithMinRTT, source, namespace.getDatabaseName(), @@ -143,7 +143,7 @@ public BatchCursor execute(final ReadBinding binding, final OperationContext return rethrowIfNotNamespaceError(e, createEmptySingleBatchCursor(source.getServerDescription().getAddress(), batchSize)); } - }, listIndexesOperationContext) + }) ); return read.get(); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java index 53b0f23208..e567b2cdf0 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java @@ -19,19 +19,18 @@ import com.mongodb.MongoException; import com.mongodb.MongoNamespace; import com.mongodb.WriteConcern; -import com.mongodb.assertions.Assertions; import com.mongodb.bulk.BulkWriteResult; import com.mongodb.connection.ConnectionDescription; -import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.async.MutableValue; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.async.function.AsyncCallbackLoop; -import com.mongodb.internal.async.function.AsyncCallbackRunnable; +import com.mongodb.internal.async.function.AsyncCallbackFunction; import com.mongodb.internal.async.function.AsyncCallbackSupplier; -import com.mongodb.internal.async.function.LoopState; +import com.mongodb.internal.async.function.AsyncCallbackTriFunction; import com.mongodb.internal.async.function.RetryState; -import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier; -import com.mongodb.internal.async.function.RetryingSyncSupplier; +import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.ConnectionSource; +import com.mongodb.internal.binding.ReferenceCounted; import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.connection.AsyncConnection; @@ -50,27 +49,25 @@ import java.util.List; import java.util.Locale; -import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; -import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.AsyncOperationHelper.exceptionTransformingCallback; +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.decorateWriteWithRetriesAsync; import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableWriteErrorLabel; -import static com.mongodb.internal.operation.CommandOperationHelper.logRetryCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.loggingShouldAttemptToRetryWriteAndAddRetryableLabel; -import static com.mongodb.internal.operation.CommandOperationHelper.onRetryableWriteAttemptFailure; +import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried; +import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException; import static com.mongodb.internal.operation.CommandOperationHelper.validateAndGetEffectiveWriteConcern; -import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite; import static com.mongodb.internal.operation.OperationHelper.validateWriteRequests; -import static com.mongodb.internal.operation.OperationHelper.validateWriteRequestsAndCompleteIfInvalid; +import static com.mongodb.internal.operation.SyncOperationHelper.decorateWriteWithRetries; import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection; /** @@ -145,41 +142,6 @@ public Boolean getRetryWrites() { return retryWrites; } - private Supplier decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext, - final Supplier writeFunction) { - return new RetryingSyncSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext), - this::shouldAttemptToRetryWrite, () -> { - logRetryCommand(retryState, operationContext); - return writeFunction.get(); - }); - } - - private AsyncCallbackSupplier decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext, - final AsyncCallbackSupplier writeFunction) { - return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext), - this::shouldAttemptToRetryWrite, callback -> { - logRetryCommand(retryState, operationContext); - writeFunction.get(callback); - }); - } - - private boolean shouldAttemptToRetryWrite(final RetryState retryState, final Throwable attemptFailure) { - BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail); - /* A retry predicate is called only if there is at least one more attempt left. Here we maintain attempt counters manually - * and emulate the above contract by returning `false` at the very beginning of the retry predicate. */ - if (bulkWriteTracker.lastAttempt()) { - return false; - } - boolean decision = loggingShouldAttemptToRetryWriteAndAddRetryableLabel(retryState, attemptFailure); - if (decision) { - /* The attempt counter maintained by `RetryState` is updated after (in the happens-before order) testing a retry predicate, - * and only if the predicate completes normally. Here we maintain attempt counters manually, and we emulate the - * "after completion" part by updating the counter at the very end of the retry predicate. */ - bulkWriteTracker.advance(); - } - return decision; - } - @Override public String getCommandName() { return commandName; @@ -187,329 +149,506 @@ public String getCommandName() { @Override public BulkWriteResult execute(final WriteBinding binding, final OperationContext operationContext) { - /* We cannot use the tracking of attempts built in the `RetryState` class because conceptually we have to maintain multiple attempt - * counters while executing a single bulk write operation: - * - a counter that limits attempts to select server and checkout a connection before we created a batch; - * - a counter per each batch that limits attempts to execute the specific batch. - * Fortunately, these counters do not exist concurrently with each other. While maintaining the counters manually, - * we must adhere to the contract of `RetryingSyncSupplier`. When the retry timeout is implemented, there will be no counters, - * and the code related to the attempt tracking in `BulkWriteTracker` will be removed. */ - RetryState retryState = new RetryState(); - BulkWriteTracker.attachNew(retryState, retryWrites, operationContext.getTimeoutContext()); - Supplier retryingBulkWrite = decorateWriteWithRetries(retryState, operationContext, () -> - withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection, operationContextWithMinRTT) -> { - TimeoutContext timeoutContextWithMinRtt = operationContextWithMinRTT.getTimeoutContext(); - ConnectionDescription connectionDescription = connection.getDescription(); - // attach `maxWireVersion` ASAP because it is used to check whether we can retry - retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); - SessionContext sessionContext = operationContext.getSessionContext(); - WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext); - if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) { - handleMongoWriteConcernWithResponseException(retryState, true, timeoutContextWithMinRtt); - } - validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern); - if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) { - BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace, - connectionDescription, ordered, writeConcern, - bypassDocumentValidation, retryWrites, writeRequests, operationContextWithMinRTT, comment, variables), - timeoutContextWithMinRtt); - } - return executeBulkWriteBatch(retryState, writeConcern, operationContextWithMinRTT, connection); - }, operationContext) - ); + WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(writeConcern, operationContext.getSessionContext()); try { - return retryingBulkWrite.get(); + return executeAllBatches(effectiveWriteConcern, binding, operationContext); } catch (MongoException e) { throw transformWriteException(e); } } - public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback callback) { - // see the comment in `execute(WriteBinding)` explaining the manual tracking of attempts - RetryState retryState = new RetryState(); - BulkWriteTracker.attachNew(retryState, retryWrites, operationContext.getTimeoutContext()); - binding.retain(); - AsyncCallbackSupplier retryingBulkWrite = this.decorateWriteWithRetries(retryState, - operationContext, - funcCallback -> - withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, operationContext, funcCallback, - (source, connection, operationContextWithMinRtt, releasingCallback) -> { - TimeoutContext timeoutContextWithMinRtt = operationContextWithMinRtt.getTimeoutContext(); - ConnectionDescription connectionDescription = connection.getDescription(); - - // attach `maxWireVersion` ASAP because it is used to check whether we can retry - retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); - SessionContext sessionContext = operationContextWithMinRtt.getSessionContext(); - WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext); - if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext) - && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback, timeoutContextWithMinRtt)) { - return; - } - if (validateWriteRequestsAndCompleteIfInvalid(connectionDescription, bypassDocumentValidation, writeRequests, - writeConcern, releasingCallback)) { - return; - } - try { - if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) { - BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace, - connectionDescription, ordered, writeConcern, - bypassDocumentValidation, retryWrites, writeRequests, operationContextWithMinRtt, comment, variables), timeoutContextWithMinRtt); - } - } catch (Throwable t) { - releasingCallback.onResult(null, t); - return; - } - executeBulkWriteBatchAsync(retryState, writeConcern, operationContextWithMinRtt, connection, releasingCallback); - }) - ).whenComplete(binding::release); - retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER))); + @Override + public void executeAsync( + final AsyncWriteBinding binding, + final OperationContext operationContext, + final SingleResultCallback callback) { + beginAsync().thenSupply(c -> { + WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(writeConcern, operationContext.getSessionContext()); + beginAsync().thenSupply(executeAllBatchesCallback -> { + executeAllBatchesAsync(effectiveWriteConcern, binding, operationContext, executeAllBatchesCallback); + }).onErrorIf(e -> e instanceof MongoException, (e, onErrorCallback) -> { + throw transformWriteException((MongoException) e); + }).finish(c); + }).finish(callback); } - private BulkWriteResult executeBulkWriteBatch( - final RetryState retryState, + private BulkWriteResult executeAllBatches( final WriteConcern effectiveWriteConcern, - final OperationContext operationContext, - final Connection connection) { - BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) - .orElseThrow(Assertions::fail); - BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); - int maxWireVersion = connection.getDescription().getMaxWireVersion(); - TimeoutContext timeoutContext = operationContext.getTimeoutContext(); - - while (currentBatch.shouldProcessBatch()) { - try { - BsonDocument result = executeCommand(effectiveWriteConcern, operationContext, connection, currentBatch); - if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) { - MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result, - connection.getDescription().getServerAddress(), "errMsg", timeoutContext); - if (writeConcernBasedError != null) { - if (currentBulkWriteTracker.lastAttempt()) { - addRetryableWriteErrorLabel(writeConcernBasedError, maxWireVersion); - addErrorLabelsToWriteConcern(result.getDocument("writeConcernError"), writeConcernBasedError.getErrorLabels()); - } else if (loggingShouldAttemptToRetryWriteAndAddRetryableLabel(retryState, writeConcernBasedError)) { - throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result); - } - } - } - currentBatch.addResult(result); - currentBulkWriteTracker = BulkWriteTracker.attachNext(retryState, currentBatch, timeoutContext); - currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); - } catch (MongoException exception) { - if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) { - addRetryableWriteErrorLabel(exception, maxWireVersion); + final WriteBinding binding, + final OperationContext operationContext) { + SourceAndConnection sourceAndConnection = null; + try { + BulkWriteBatch nextBatch = null; + do { + BatchWithSourceAndConnection nextBatchWithSourceAndConnection = executeBatchReusingConnection( + nextBatch, sourceAndConnection, effectiveWriteConcern, binding, operationContext); + try (BatchWithSourceAndConnection ignoredAndAutoClosed = nextBatchWithSourceAndConnection) { + nextBatch = nextBatchWithSourceAndConnection.getBatch(); + sourceAndConnection = nextBatchWithSourceAndConnection.intoSourceAndConnection(); } - handleMongoWriteConcernWithResponseException(retryState, false, timeoutContext); - throw exception; + } while (nextBatch.shouldProcessBatch()); + return nextBatch.getResult(); + } finally { + if (sourceAndConnection != null) { + sourceAndConnection.close(); } } - try { - return currentBatch.getResult(); - } catch (MongoException e) { - /* if we get here, some of the batches failed on the server side, - * so we need to mark the last attempt to avoid retrying. */ - retryState.markAsLastAttempt(); - throw e; - } } - private void executeBulkWriteBatchAsync( - final RetryState retryState, + private void executeAllBatchesAsync( final WriteConcern effectiveWriteConcern, + final AsyncWriteBinding binding, final OperationContext operationContext, - final AsyncConnection connection, final SingleResultCallback callback) { - LoopState loopState = new LoopState(); - AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> { - BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) - .orElseThrow(Assertions::fail); - loopState.attach(AttachmentKeys.bulkWriteTracker(), currentBulkWriteTracker, true); - BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); - int maxWireVersion = connection.getDescription().getMaxWireVersion(); - if (loopState.breakAndCompleteIf(() -> !currentBatch.shouldProcessBatch(), iterationCallback)) { - return; - } - - TimeoutContext timeoutContext = operationContext.getTimeoutContext(); - executeCommandAsync(effectiveWriteConcern, operationContext, connection, currentBatch, (result, t) -> { - if (t == null) { - if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) { - MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result, - connection.getDescription().getServerAddress(), "errMsg", operationContext.getTimeoutContext()); - if (writeConcernBasedError != null) { - if (currentBulkWriteTracker.lastAttempt()) { - addRetryableWriteErrorLabel(writeConcernBasedError, maxWireVersion); - addErrorLabelsToWriteConcern(result.getDocument("writeConcernError"), - writeConcernBasedError.getErrorLabels()); - } else if (loggingShouldAttemptToRetryWriteAndAddRetryableLabel(retryState, writeConcernBasedError)) { - iterationCallback.onResult(null, - new MongoWriteConcernWithResponseException(writeConcernBasedError, result)); - return; - } + beginAsync().thenSupply(c -> { + MutableValue sourceAndConnection = new MutableValue<>(); + beginAsync().thenSupply(loopCallback -> { + MutableValue nextBatch = new MutableValue<>(); + beginAsync().thenRunDoWhileLoop(iterationCallback -> { + beginAsync().>thenSupply(executeBatchCallback -> { + executeBatchReusingConnectionAsync( + nextBatch.getNullable(), sourceAndConnection.getNullable(), effectiveWriteConcern, binding, operationContext, executeBatchCallback); + }).thenConsume((nextBatchWithSourceAndConnection, consumeExecutionResultCallback) -> { + try (BatchWithSourceAndConnection ignoredAndAutoClosed = nextBatchWithSourceAndConnection) { + nextBatch.set(nextBatchWithSourceAndConnection.getBatch()); + sourceAndConnection.set(nextBatchWithSourceAndConnection.intoSourceAndConnection()); } - } - currentBatch.addResult(result); - BulkWriteTracker.attachNext(retryState, currentBatch, timeoutContext); - iterationCallback.onResult(null, null); - } else { - if (t instanceof MongoException) { - MongoException exception = (MongoException) t; - if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) { - addRetryableWriteErrorLabel(exception, maxWireVersion); - } - if (handleMongoWriteConcernWithResponseExceptionAsync(retryState, null, timeoutContext)) { - return; - } - } - iterationCallback.onResult(null, t); + consumeExecutionResultCallback.complete(consumeExecutionResultCallback); + }).finish(iterationCallback); + }, () -> nextBatch.get().shouldProcessBatch()) + .thenSupply(resultCallback -> { + resultCallback.complete(nextBatch.get().getResult()); + }).finish(loopCallback); + }).thenAlwaysRunAndFinish(() -> { + if (sourceAndConnection.getNullable() != null) { + sourceAndConnection.get().close(); } - }); - }); - loop.run((voidResult, t) -> { - if (t != null) { - callback.onResult(null, t); - } else { - BulkWriteResult result; - try { - result = loopState.attachment(AttachmentKeys.bulkWriteTracker()) - .flatMap(BulkWriteTracker::batch).orElseThrow(Assertions::fail).getResult(); - } catch (Throwable loopResultT) { - if (loopResultT instanceof MongoException) { - /* if we get here, some of the batches failed on the server side, - * so we need to mark the last attempt to avoid retrying. */ - retryState.markAsLastAttempt(); + }, c); + }).finish(callback); + } + + /** + * @param maybeSourceAndConnection Is guaranteed to be {@linkplain SourceAndConnection#close() closed} + * if this method creates a new {@link SourceAndConnection}. + */ + private BatchWithSourceAndConnection executeBatchReusingConnection( + @Nullable final BulkWriteBatch maybeBatch, + @Nullable final SourceAndConnection maybeSourceAndConnection, + final WriteConcern effectiveWriteConcern, + final WriteBinding binding, + final OperationContext operationContextForSelectServerAndCheckoutConnection) { + MutableValue batch = new MutableValue<>(maybeBatch); + MutableValue sourceAndConnection = new MutableValue<>(maybeSourceAndConnection); + RetryState retryState = initialRetryState( + retryWrites, operationContextForSelectServerAndCheckoutConnection.getTimeoutContext()); + Supplier> retryingBatchExecutor = decorateWriteWithRetries( + retryState, + operationContextForSelectServerAndCheckoutConnection, + () -> { + SourceAndConnection reusedOrNewSourceAndConnection = reuseOrSelectServerAndCheckoutConnectionIfClosed( + sourceAndConnection.getNullable(), effectiveWriteConcern, binding, + operationContextForSelectServerAndCheckoutConnection, retryState); + try { + sourceAndConnection.set(reusedOrNewSourceAndConnection); + ConnectionDescription connectionDescription = reusedOrNewSourceAndConnection.getConnection().getDescription(); + retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); + batch.set(batch.getNullable() != null + ? batch.get() + : createFirstBatch(connectionDescription, reusedOrNewSourceAndConnection.getOperationContext(), effectiveWriteConcern)); + onBatch(batch.get(), retryState); + executeBatch(batch.get(), reusedOrNewSourceAndConnection, effectiveWriteConcern); + return new BatchWithSourceAndConnection<>(batch.get().getNextBatch(), reusedOrNewSourceAndConnection); + } catch (Throwable e) { + reusedOrNewSourceAndConnection.close(); + throw e; } - callback.onResult(null, loopResultT); - return; } - callback.onResult(result, null); + ); + try { + return retryingBatchExecutor.get(); + } catch (Throwable e) { + if (sourceAndConnection.getNullable() != null) { + sourceAndConnection.get().close(); + } + if (e instanceof MongoWriteConcernWithResponseException) { + batch.get().addResult((BsonDocument) ((MongoWriteConcernWithResponseException) e).getResponse()); + return new BatchWithSourceAndConnection<>(batch.get().getNextBatch(), null); } - }); - } - - private void handleMongoWriteConcernWithResponseException(final RetryState retryState, - final boolean breakAndThrowIfDifferent, - final TimeoutContext timeoutContext) { - if (!retryState.isFirstAttempt()) { - RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null); - boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException; - retryState.breakAndThrowIfRetryAnd(() -> breakAndThrowIfDifferent && !prospectiveResultIsWriteConcernException); - if (prospectiveResultIsWriteConcernException) { - retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail) - .batch().ifPresent(bulkWriteBatch -> { - bulkWriteBatch.addResult( - (BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse()); - BulkWriteTracker.attachNext(retryState, bulkWriteBatch, timeoutContext); - }); + if (retryWrites && e instanceof MongoException) { + // Adding the `RetryableWriteError` label here is unnecessary at this point: + // applications cannot use it for implementing retries, and it is not even part of the public driver API. + // Unfortunately, certain unified tests incorrectly rely on this label to verify retries, resulting in this redundant code. + addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried(retryState, e); } + throw e; } } - private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetryState retryState, - @Nullable final SingleResultCallback callback, - final TimeoutContext timeoutContext) { - if (!retryState.isFirstAttempt()) { - RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null); - boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException; - if (callback != null && retryState.breakAndCompleteIfRetryAnd(() -> !prospectiveResultIsWriteConcernException, callback)) { - return true; - } - if (prospectiveResultIsWriteConcernException) { - retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail) - .batch().ifPresent(bulkWriteBatch -> { - bulkWriteBatch.addResult( - (BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse()); - BulkWriteTracker.attachNext(retryState, bulkWriteBatch, timeoutContext); - }); + /** + * @param maybeSourceAndConnection Is guaranteed to be {@linkplain AsyncSourceAndConnection#close() closed} + * if this method creates a new {@link AsyncSourceAndConnection}. + */ + private void executeBatchReusingConnectionAsync( + @Nullable final BulkWriteBatch maybeBatch, + @Nullable final AsyncSourceAndConnection maybeSourceAndConnection, + final WriteConcern effectiveWriteConcern, + final AsyncWriteBinding binding, + final OperationContext operationContextForSelectServerAndCheckoutConnection, + final SingleResultCallback> callback) { + beginAsync().>thenSupply(c -> { + MutableValue batch = new MutableValue<>(maybeBatch); + MutableValue sourceAndConnection = new MutableValue<>(maybeSourceAndConnection); + RetryState retryState = initialRetryState( + retryWrites, operationContextForSelectServerAndCheckoutConnection.getTimeoutContext()); + AsyncCallbackSupplier> retryingBatchExecutor = decorateWriteWithRetriesAsync( + retryState, + operationContextForSelectServerAndCheckoutConnection, + supplierCallback -> { + beginAsync().thenSupply(reuseOrSelectServerAndCheckoutConnectionCallback -> { + reuseOrSelectServerAndCheckoutConnectionIfClosedAsync( + sourceAndConnection.getNullable(), effectiveWriteConcern, binding, + operationContextForSelectServerAndCheckoutConnection, retryState, reuseOrSelectServerAndCheckoutConnectionCallback); + }).>thenApply((reusedOrNewSourceAndConnection, setSourceAndConnectionCallback) -> { + beginAsync().thenRun(executeBatchCallback -> { + sourceAndConnection.set(reusedOrNewSourceAndConnection); + ConnectionDescription connectionDescription = reusedOrNewSourceAndConnection.getConnection().getDescription(); + retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); + batch.set(batch.getNullable() != null + ? batch.get() + : createFirstBatch(connectionDescription, reusedOrNewSourceAndConnection.getOperationContext(), effectiveWriteConcern)); + onBatch(batch.get(), retryState); + executeBatchAsync(batch.get(), reusedOrNewSourceAndConnection, effectiveWriteConcern, executeBatchCallback); + }).>thenSupply(createNextBatchCallback -> { + createNextBatchCallback.complete(new BatchWithSourceAndConnection<>(batch.get().getNextBatch(), reusedOrNewSourceAndConnection)); + }).onErrorIf(e -> true, (e, onErrorCallback) -> { + reusedOrNewSourceAndConnection.close(); + onErrorCallback.completeExceptionally(e); + }).finish(setSourceAndConnectionCallback); + }).finish(supplierCallback); + } + ); + beginAsync().>thenSupply(executorCallback -> { + retryingBatchExecutor.get(executorCallback); + }).onErrorIf(e -> true, (e, onErrorCallback) -> { + if (sourceAndConnection.getNullable() != null) { + sourceAndConnection.get().close(); + } + if (e instanceof MongoWriteConcernWithResponseException) { + batch.get().addResult((BsonDocument) ((MongoWriteConcernWithResponseException) e).getResponse()); + onErrorCallback.complete(new BatchWithSourceAndConnection<>(batch.get().getNextBatch(), null)); + return; + } + if (retryWrites && e instanceof MongoException) { + // Adding the `RetryableWriteError` label here is unnecessary at this point: + // applications cannot use it for implementing retries, and it is not even part of the public driver API. + // Unfortunately, certain unified tests incorrectly rely on this label to verify retries, resulting in this redundant code. + addRetryableLabelOrGetWriteAttemptFailureNotToBeRetried(retryState, e); + } + onErrorCallback.completeExceptionally(e); + }).finish(c); + }).finish(callback); + } + + private SourceAndConnection reuseOrSelectServerAndCheckoutConnectionIfClosed( + @Nullable final SourceAndConnection sourceAndConnection, + final WriteConcern effectiveWriteConcern, + final WriteBinding binding, + final OperationContext operationContext, + final RetryState retryState) { + if (sourceAndConnection == null || sourceAndConnection.isClosed()) { + SourceAndConnection newSourceAndConnection = selectServerAndCheckoutConnection(binding, operationContext); + try { + onNewConnection( + newSourceAndConnection.getConnection().getDescription(), + newSourceAndConnection.getOperationContext().getSessionContext(), + effectiveWriteConcern, retryState); + } catch (Throwable e) { + newSourceAndConnection.close(); + throw e; } + return newSourceAndConnection; + } else { + return sourceAndConnection; } - return false; } - @Nullable - private BsonDocument executeCommand( + private void reuseOrSelectServerAndCheckoutConnectionIfClosedAsync( + @Nullable final AsyncSourceAndConnection sourceAndConnection, final WriteConcern effectiveWriteConcern, + final AsyncWriteBinding binding, final OperationContext operationContext, - final Connection connection, - final BulkWriteBatch batch) { - commandName = batch.getCommand().getFirstKey(); - return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext.withOperationName(commandName), shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); + final RetryState retryState, + final SingleResultCallback callback) { + beginAsync().thenSupply(c -> { + if (sourceAndConnection == null || sourceAndConnection.isClosed()) { + beginAsync().thenSupply(selectServerAndCheckoutConnectionCallback -> { + selectServerAndCheckoutConnectionAsync(binding, operationContext, selectServerAndCheckoutConnectionCallback); + }).thenApply((newSourceAndConnection, onNewConnectionCallback) -> { + try { + onNewConnection( + newSourceAndConnection.getConnection().getDescription(), + newSourceAndConnection.getOperationContext().getSessionContext(), + effectiveWriteConcern, retryState); + } catch (Throwable e) { + newSourceAndConnection.close(); + throw e; + } + onNewConnectionCallback.complete(newSourceAndConnection); + }).finish(c); + } else { + c.complete(sourceAndConnection); + } + }).finish(callback); + } + + private static SourceAndConnection selectServerAndCheckoutConnection( + final WriteBinding binding, + final OperationContext operationContext) { + return withSourceAndConnection( + binding::getWriteConnectionSource, + true, + operationContext, + (source, connection, operationContextWithMinRTT) -> + new SourceAndConnection(source, connection, operationContextWithMinRTT)); } - private void executeCommandAsync( + private static void selectServerAndCheckoutConnectionAsync( + final AsyncWriteBinding binding, + final OperationContext operationContext, + final SingleResultCallback callback) { + beginAsync().thenSupply(c -> { + withAsyncSourceAndConnection( + binding::getWriteConnectionSource, + true, + operationContext, + c, + (source, connection, operationContextWithMinRTT, functionCallback) -> + functionCallback.complete(new AsyncSourceAndConnection(source, connection, operationContextWithMinRTT))); + }).finish(callback); + } + + private void onNewConnection( + final ConnectionDescription connectionDescription, + final SessionContext sessionContext, final WriteConcern effectiveWriteConcern, + final RetryState retryState) { + boolean effectiveRetryWrites = isRetryableWrite( + retryWrites, effectiveWriteConcern, connectionDescription, sessionContext); + retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites); + validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, effectiveWriteConcern); + } + + private BulkWriteBatch createFirstBatch( + final ConnectionDescription connectionDescription, final OperationContext operationContext, - final AsyncConnection connection, - final BulkWriteBatch batch, - final SingleResultCallback callback) { + final WriteConcern effectiveWriteConcern) { + return BulkWriteBatch.createBulkWriteBatch( + // TODO-JAVA-6223 this same `connectionDescription` is used by all batches, which is incorrect + namespace, connectionDescription, ordered, effectiveWriteConcern, bypassDocumentValidation, retryWrites, + writeRequests, operationContext, comment, variables); + } + + private void onBatch(final BulkWriteBatch batch, final RetryState retryState) { commandName = batch.getCommand().getFirstKey(); - connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext.withOperationName(commandName), shouldExpectResponse(batch, effectiveWriteConcern), - batch.getPayload(), callback); + String commandDescriptionToCapture = commandName; + retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), batch.getRetryWrites(), false) + .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescriptionToCapture, false); + } + + private void executeBatch( + final BulkWriteBatch batch, + final SourceAndConnection sourceAndConnection, + final WriteConcern effectiveWriteConcern) { + Connection connection = sourceAndConnection.getConnection(); + OperationContext operationContext = sourceAndConnection.getOperationContext(); + BsonDocument result = connection.command( + namespace.getDatabaseName(), batch.getCommand(), + NoOpFieldNameValidator.INSTANCE, null, + batch.getDecoder(), operationContext.withOperationName(commandName), + shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); + if (batch.getRetryWrites()) { + ConnectionDescription connectionDescription = connection.getDescription(); + MongoException writeConcernBasedError = ProtocolHelper.createSpecialException( + result, connectionDescription.getServerAddress(), "errMsg", operationContext.getTimeoutContext()); + if (writeConcernBasedError != null) { + addRetryableWriteErrorLabel(writeConcernBasedError, connectionDescription.getMaxWireVersion()); + addErrorLabelsToWriteConcern(result.getDocument("writeConcernError"), writeConcernBasedError.getErrorLabels()); + throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result); + } + } + batch.addResult(result); + } + + private void executeBatchAsync( + final BulkWriteBatch batch, + final AsyncSourceAndConnection sourceAndConnection, + final WriteConcern effectiveWriteConcern, + final SingleResultCallback callback) { + beginAsync().thenRun(c -> { + AsyncConnection connection = sourceAndConnection.getConnection(); + OperationContext operationContext = sourceAndConnection.getOperationContext(); + beginAsync().thenSupply(commandCallback -> { + connection.commandAsync( + namespace.getDatabaseName(), batch.getCommand(), + NoOpFieldNameValidator.INSTANCE, null, + batch.getDecoder(), operationContext.withOperationName(commandName), + shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), commandCallback); + }).thenConsume((result, consumeCommandResultCallback) -> { + if (batch.getRetryWrites()) { + ConnectionDescription connectionDescription = connection.getDescription(); + MongoException writeConcernBasedError = ProtocolHelper.createSpecialException( + result, connectionDescription.getServerAddress(), "errMsg", operationContext.getTimeoutContext()); + if (writeConcernBasedError != null) { + addRetryableWriteErrorLabel(writeConcernBasedError, connectionDescription.getMaxWireVersion()); + addErrorLabelsToWriteConcern(result.getDocument("writeConcernError"), writeConcernBasedError.getErrorLabels()); + throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result); + } + } + batch.addResult(result); + consumeCommandResultCallback.complete(consumeCommandResultCallback); + }).finish(c); + }).finish(callback); } private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) { return effectiveWriteConcern.isAcknowledged() || (ordered && batch.hasAnotherBatch()); } - private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set errorLabels) { + private static void addErrorLabelsToWriteConcern(final BsonDocument result, final Set errorLabels) { if (!result.containsKey("errorLabels")) { result.put("errorLabels", new BsonArray(errorLabels.stream().map(BsonString::new).collect(Collectors.toList()))); } } - public static final class BulkWriteTracker { - private int attempt; - private final int attempts; - private final boolean retryUntilTimeoutThrowsException; - @Nullable + private static final class BatchWithSourceAndConnection> implements AutoCloseable { private final BulkWriteBatch batch; + @Nullable + private final SC sourceAndConnection; + private boolean consumed; - static void attachNew(final RetryState retryState, final boolean retry, final TimeoutContext timeoutContext) { - retryState.attach(AttachmentKeys.bulkWriteTracker(), new BulkWriteTracker(retry, null, timeoutContext), false); + BatchWithSourceAndConnection(final BulkWriteBatch batch, @Nullable final SC sourceAndConnection) { + this.batch = batch; + this.sourceAndConnection = sourceAndConnection; + consumed = false; } - static void attachNew(final RetryState retryState, final BulkWriteBatch batch, final TimeoutContext timeoutContext) { - attach(retryState, new BulkWriteTracker(batch.getRetryWrites(), batch, timeoutContext)); + /** + * Must not be invoked if {@link #intoSourceAndConnection()} or {@link #close()} was invoked. + */ + BulkWriteBatch getBatch() { + assertFalse(consumed); + return batch; } - static BulkWriteTracker attachNext(final RetryState retryState, final BulkWriteBatch batch, final TimeoutContext timeoutContext) { - BulkWriteBatch nextBatch = batch.getNextBatch(); - BulkWriteTracker nextTracker = new BulkWriteTracker(nextBatch.getRetryWrites(), nextBatch, timeoutContext); - attach(retryState, nextTracker); - return nextTracker; + /** + * May be invoked at most once. + * Must not be invoked if {@link #close()} was invoked. + */ + @Nullable + SC intoSourceAndConnection() { + assertFalse(consumed); + SC sourceAndConnection = this.sourceAndConnection; + consumed = true; + return sourceAndConnection; } - private static void attach(final RetryState retryState, final BulkWriteTracker tracker) { - retryState.attach(AttachmentKeys.bulkWriteTracker(), tracker, false); - BulkWriteBatch batch = tracker.batch; - if (batch != null) { - retryState.attach(AttachmentKeys.retryableWriteCommandFlag(), batch.getRetryWrites(), false) - .attach(AttachmentKeys.commandDescriptionSupplier(), () -> batch.getPayload().getPayloadType().toString(), false); + /** + * {@linkplain AbstractSourceAndConnection#close() Closes} {@link AbstractSourceAndConnection} + * unless {@link #intoSourceAndConnection()} was invoked. + *

    + * Idempotent. + */ + @Override + public void close() { + if (!consumed) { + consumed = true; + if (sourceAndConnection != null) { + sourceAndConnection.close(); + } } } + } - private BulkWriteTracker(final boolean retry, @Nullable final BulkWriteBatch batch, final TimeoutContext timeoutContext) { - attempt = 0; - attempts = retry ? RetryState.MAX_RETRIES + 1 : 1; - this.batch = batch; - this.retryUntilTimeoutThrowsException = timeoutContext.hasTimeoutMS(); + /** + * An {@link AutoCloseable} container for the arguments of + * {@link SyncOperationHelper.ExecutionFunction#apply(ConnectionSource, Connection, OperationContext)} + * provided by + * {@link SyncOperationHelper#withSourceAndConnection(Function, boolean, OperationContext, SyncOperationHelper.ExecutionFunction)}. + * This type allows those resources to be returned from + * {@link SyncOperationHelper#withSourceAndConnection(Function, boolean, OperationContext, SyncOperationHelper.ExecutionFunction)} + * and used after method completion. + */ + private static final class SourceAndConnection extends AbstractSourceAndConnection { + SourceAndConnection( + final ConnectionSource connectionSource, + final Connection connection, + final OperationContext operationContext) { + super(connectionSource, connection, operationContext); } + } - boolean lastAttempt() { - if (retryUntilTimeoutThrowsException){ - return false; - } - return attempt == attempts - 1; + /** + * An {@link AutoCloseable} container for the arguments of {@link AsyncCallbackTriFunction} + * provided by + * {@link AsyncOperationHelper#withAsyncSourceAndConnection(AsyncCallbackFunction, boolean, OperationContext, SingleResultCallback, AsyncCallbackTriFunction)}. + * This type allows those resources to be produced by + * {@link AsyncOperationHelper#withAsyncSourceAndConnection(AsyncCallbackFunction, boolean, OperationContext, SingleResultCallback, AsyncCallbackTriFunction)} + * and passed to the callback of the method. + */ + private static final class AsyncSourceAndConnection extends AbstractSourceAndConnection { + AsyncSourceAndConnection( + final AsyncConnectionSource connectionSource, + final AsyncConnection connection, + final OperationContext operationContext) { + super(connectionSource, connection, operationContext); } + } - void advance() { - assertTrue(!lastAttempt()); - attempt++; + private abstract static class AbstractSourceAndConnection implements AutoCloseable { + private final S connectionSource; + private final C connection; + private final OperationContext operationContext; + private boolean closed; + + AbstractSourceAndConnection( + final S connectionSource, + final C connection, + final OperationContext operationContext) { + connectionSource.retain(); + this.connectionSource = connectionSource; + connection.retain(); + this.connection = connection; + this.operationContext = operationContext; + closed = false; } - Optional batch() { - return Optional.ofNullable(batch); + C getConnection() { + assertFalse(closed); + return connection; + } + + OperationContext getOperationContext() { + assertFalse(closed); + return operationContext; + } + + boolean isClosed() { + return closed; + } + + /** + * Idempotent. + */ + @Override + public void close() { + if (!closed) { + closed = true; + try { + connection.release(); + } finally { + connectionSource.release(); + } + } } } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java index a16768c37e..edcd266ff8 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java @@ -117,23 +117,22 @@ static T withConnection(final WriteBinding binding, return withSourceAndConnection( binding::getWriteConnectionSource, false, + operationContext, (source, connection, operationContextWithMinRtt) -> - callable.call(connection, operationContextWithMinRtt), - operationContext); + callable.call(connection, operationContextWithMinRtt)); } /** * Gets a {@link ConnectionSource} and a {@link Connection} from the {@code sourceSupplier} and executes the {@code function} with them. * Guarantees to {@linkplain ReferenceCounted#release() release} the source and the connection after completion of the {@code function}. - * - * */ - static R withSourceAndConnection(final Function sourceFunction, - final boolean wrapConnectionSourceException, - final ExecutionFunction function, - final OperationContext originalOperationContext) throws ResourceSupplierInternalException { + static R withSourceAndConnection( + final Function sourceFunction, + final boolean wrapConnectionSourceException, + final OperationContext operationContext, + final ExecutionFunction function) throws ResourceSupplierInternalException { OperationContext serverSelectionOperationContext = - originalOperationContext.withOverride(TimeoutContext::withComputedServerSelectionTimeout); + operationContext.withOverride(TimeoutContext::withComputedServerSelectionTimeout); return withSuppliedResource( sourceFunction, @@ -146,7 +145,7 @@ static R withSourceAndConnection(final Function function.apply( source, connection, - originalOperationContext.withMinRoundTripTime(source.getServerDescription()))) + operationContext.withMinRoundTripTime(source.getServerDescription()))) ); } @@ -205,11 +204,11 @@ static T executeRetryableRead( RetryState retryState = CommandOperationHelper.initialRetryState(retryReads, operationContext.getTimeoutContext()); Supplier read = decorateReadWithRetries(retryState, operationContext, () -> - withSourceAndConnection(readConnectionSourceSupplier, false, (source, connection, operationContextWithMinRtt) -> { + withSourceAndConnection(readConnectionSourceSupplier, false, operationContext, (source, connection, operationContextWithMinRtt) -> { retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(operationContextWithMinRtt)); return createReadCommandAndExecute(retryState, operationContextWithMinRtt, source, database, commandCreator, decoder, transformer, connection); - }, operationContext) + }) ); return read.get(); } @@ -218,25 +217,25 @@ static T executeRetryableRead( static T executeCommand(final WriteBinding binding, final OperationContext operationContext, final String database, final CommandCreator commandCreator, final CommandWriteTransformer transformer) { - return withSourceAndConnection(binding::getWriteConnectionSource, false, (source, connection, operationContextWithMinRtt) -> + return withSourceAndConnection(binding::getWriteConnectionSource, false, operationContext, (source, connection, operationContextWithMinRtt) -> transformer.apply(assertNotNull( connection.command(database, commandCreator.create(operationContextWithMinRtt, source.getServerDescription(), connection.getDescription()), NoOpFieldNameValidator.INSTANCE, primary(), BSON_DOCUMENT_CODEC, operationContextWithMinRtt)), - connection), operationContext); + connection)); } @VisibleForTesting(otherwise = PRIVATE) static T executeCommand(final WriteBinding binding, final OperationContext operationContext, final String database, final BsonDocument command, final Decoder decoder, final CommandWriteTransformer transformer) { - return withSourceAndConnection(binding::getWriteConnectionSource, false, (source, connection, operationContextWithMinRtt) -> + return withSourceAndConnection(binding::getWriteConnectionSource, false, operationContext, (source, connection, operationContextWithMinRtt) -> transformer.apply(assertNotNull( connection.command(database, command, NoOpFieldNameValidator.INSTANCE, primary(), decoder, - operationContextWithMinRtt)), connection), - operationContext); + operationContextWithMinRtt)), connection) + ); } @Nullable @@ -267,7 +266,7 @@ static R executeRetryableWrite( if (!firstAttempt && sessionContext.hasActiveTransaction()) { sessionContext.clearTransactionContext(); } - return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection, operationContextWithMinRtt) -> { + return withSourceAndConnection(binding::getWriteConnectionSource, true, operationContext, (source, connection, operationContextWithMinRtt) -> { int maxWireVersion = connection.getDescription().getMaxWireVersion(); try { retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(connection.getDescription())); @@ -291,7 +290,7 @@ static R executeRetryableWrite( } throw e; } - }, operationContext); + }); }); try { return retryingWrite.get(); @@ -323,7 +322,7 @@ static T createReadCommandAndExecute( static Supplier decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext, final Supplier writeFunction) { - return new RetryingSyncSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext), + return new RetryingSyncSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext.getServerDeprioritization()), CommandOperationHelper::loggingShouldAttemptToRetryWriteAndAddRetryableLabel, () -> { logRetryCommand(retryState, operationContext); return writeFunction.get(); @@ -332,7 +331,7 @@ static Supplier decorateWriteWithRetries(final RetryState retryState, static Supplier decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext, final Supplier readFunction) { - return new RetryingSyncSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext), + return new RetryingSyncSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext.getServerDeprioritization()), CommandOperationHelper::loggingShouldAttemptToRetryRead, () -> { logRetryCommand(retryState, operationContext); return readFunction.get(); diff --git a/driver-core/src/main/com/mongodb/internal/operation/retry/AttachmentKeys.java b/driver-core/src/main/com/mongodb/internal/operation/retry/AttachmentKeys.java index ac0f3ec6b3..eb453dd455 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/retry/AttachmentKeys.java +++ b/driver-core/src/main/com/mongodb/internal/operation/retry/AttachmentKeys.java @@ -18,7 +18,6 @@ import com.mongodb.MongoConnectionPoolClearedException; import com.mongodb.annotations.Immutable; import com.mongodb.internal.async.function.LoopState.AttachmentKey; -import com.mongodb.internal.operation.MixedBulkWriteOperation.BulkWriteTracker; import org.bson.BsonDocument; import java.util.HashSet; @@ -40,7 +39,6 @@ public final class AttachmentKeys { private static final AttachmentKey COMMAND = DefaultAttachmentKey.of("command"); private static final AttachmentKey RETRYABLE_WRITE_COMMAND_FLAG = DefaultAttachmentKey.of("retryableWriteCommandFlag"); private static final AttachmentKey> COMMAND_DESCRIPTION_SUPPLIER = DefaultAttachmentKey.of("commandDescriptionSupplier"); - private static final AttachmentKey BULK_WRITE_TRACKER = DefaultAttachmentKey.of("bulkWriteTracker"); public static AttachmentKey maxWireVersion() { return MAX_WIRE_VERSION; @@ -63,10 +61,6 @@ public static AttachmentKey> commandDescriptionSupplier() { return COMMAND_DESCRIPTION_SUPPLIER; } - public static AttachmentKey bulkWriteTracker() { - return BULK_WRITE_TRACKER; - } - private AttachmentKeys() { fail(); } diff --git a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy index a0dd685d4b..906f2e3ff6 100644 --- a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy @@ -234,24 +234,26 @@ class OperationFunctionalSpecification extends Specification { void testOperation(operation, List serverVersion, BsonDocument expectedCommand, boolean async, result = null, boolean checkCommand = true, boolean checkSecondaryOk = false, ReadPreference readPreference = ReadPreference.primary(), boolean retryable = false, - ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false) { + ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false, + int expectedConnectionReleaseCountPerAttempt = 1) { testOperation(operation, serverVersion, ReadConcern.DEFAULT, expectedCommand, async, result, checkCommand, checkSecondaryOk, - readPreference, retryable, serverType, activeTransaction) + readPreference, retryable, serverType, activeTransaction, expectedConnectionReleaseCountPerAttempt) } void testOperation(operation, List serverVersion, ReadConcern readConcern, BsonDocument expectedCommand, boolean async, result = null, boolean checkCommand = true, boolean checkSecondaryOk = false, ReadPreference readPreference = ReadPreference.primary(), boolean retryable = false, - ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false) { + ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false, + int expectedConnectionReleaseCountPerAttempt = 1) { def test = async ? this.&testAsyncOperation : this.&testSyncOperation test(operation, serverVersion, readConcern, result, checkCommand, expectedCommand, checkSecondaryOk, readPreference, retryable, - serverType, activeTransaction) + serverType, activeTransaction, expectedConnectionReleaseCountPerAttempt) } void testOperationRetries(operation, List serverVersion, BsonDocument expectedCommand, boolean async, result = null, - Boolean activeTransaction = false) { + Boolean activeTransaction = false, int expectedConnectionReleaseCountPerAttempt = 1) { testOperation(operation, serverVersion, expectedCommand, async, result, true, false, ReadPreference.primary(), true, - ServerType.REPLICA_SET_PRIMARY, activeTransaction) + ServerType.REPLICA_SET_PRIMARY, activeTransaction, expectedConnectionReleaseCountPerAttempt) } void testRetryableOperationThrowsOriginalError(operation, List> serverVersions, List serverTypes, @@ -278,7 +280,8 @@ class OperationFunctionalSpecification extends Specification { def testSyncOperation(operation, List serverVersion, ReadConcern readConcern, result, Boolean checkCommand=true, BsonDocument expectedCommand=null, Boolean checkSecondaryOk=false, ReadPreference readPreference=ReadPreference.primary(), Boolean retryable = false, - ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false) { + ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false, + int expectedConnectionReleaseCountPerAttempt = 1) { def operationContext = createOperationContext() .withSessionContext(Stub(SessionContext) { hasActiveTransaction() >> activeTransaction @@ -338,9 +341,9 @@ class OperationFunctionalSpecification extends Specification { } if (retryable) { - 2 * connection.release() + (2 * expectedConnectionReleaseCountPerAttempt) * connection.release() } else { - 1 * connection.release() + expectedConnectionReleaseCountPerAttempt * connection.release() } if (operation instanceof ReadOperation) { operation.execute(readBinding, operationContext) @@ -352,7 +355,8 @@ class OperationFunctionalSpecification extends Specification { def testAsyncOperation(operation = operation, List serverVersion = serverVersion, ReadConcern readConcern, result = null, Boolean checkCommand = true, BsonDocument expectedCommand = null, Boolean checkSecondaryOk = false, ReadPreference readPreference = ReadPreference.primary(), Boolean retryable = false, - ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false) { + ServerType serverType = ServerType.STANDALONE, Boolean activeTransaction = false, + int expectedConnectionReleaseCountPerAttempt = 1) { def operationContext = createOperationContext() .withSessionContext(Stub(SessionContext) { hasActiveTransaction() >> activeTransaction @@ -417,9 +421,9 @@ class OperationFunctionalSpecification extends Specification { } if (retryable) { - 2 * connection.release() + (2 * expectedConnectionReleaseCountPerAttempt) * connection.release() } else { - 1 * connection.release() + expectedConnectionReleaseCountPerAttempt * connection.release() } if (operation instanceof ReadOperation) { diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy index 619eb6747f..13fc8ea33c 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy @@ -1104,7 +1104,7 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat .append('txnNumber', new BsonInt64(0)) then: - testOperationRetries(operation, [3, 6, 0], expectedCommand, async, cannedResult) + testOperationRetries(operation, [3, 6, 0], expectedCommand, async, cannedResult, false, 2) where: async << [true, false] @@ -1117,16 +1117,16 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat def originalException = new MongoSocketException('Some failure', new ServerAddress()) when: - testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0], [3, 6, 0]], - [REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY, STANDALONE], originalException, async) + testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0]], + [REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY, STANDALONE], originalException, async, 4) then: Exception commandException = thrown() commandException == originalException when: - testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0]], - [REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY], originalException, async, 1) + testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0]], + [REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY], originalException, async, 2) then: commandException = thrown() diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java index 9d6fc2f586..d428318fa2 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java @@ -71,12 +71,6 @@ void unlimitedAttemptsAndAdvance() { () -> assertFalse(retryState.isFirstAttempt()), () -> assertEquals(1, retryState.attempt()) ); - retryState.markAsLastAttempt(); - assertAll( - () -> assertFalse(retryState.isFirstAttempt()), - () -> assertEquals(1, retryState.attempt()), - () -> assertAdvanceOrThrowThrows(attemptException, retryState, attemptException) - ); } @Test @@ -93,22 +87,6 @@ void limitedAttemptsAndAdvance() { ); } - @ParameterizedTest - @MethodSource({"atMostTwoRetriesAndUnlimitedRetries"}) - void markAsLastAttemptAdvanceWithRuntimeException(final RetryState retryState) { - retryState.markAsLastAttempt(); - RuntimeException attemptException = new RuntimeException(); - assertAdvanceOrThrowThrows(attemptException, retryState, attemptException, (rs, e) -> fail()); - } - - @ParameterizedTest(name = "should advance with non-retryable error when marked as last attempt and : ''{0}''") - @MethodSource({"noRetries", "atMostTwoRetriesAndUnlimitedRetries"}) - void markAsLastAttemptAdvanceWithError(final RetryState retryState) { - retryState.markAsLastAttempt(); - Error attemptException = new Error(); - assertAdvanceOrThrowThrows(attemptException, retryState, attemptException, (rs, e) -> fail()); - } - @ParameterizedTest @MethodSource({"atMostTwoRetriesAndUnlimitedRetries"}) void breakAndThrowIfRetryAndFirstAttempt(final RetryState retryState) { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoWriteConcernWithResponseExceptionTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoWriteConcernWithResponseExceptionTest.java index 09aafc02d4..847a823311 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoWriteConcernWithResponseExceptionTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MongoWriteConcernWithResponseExceptionTest.java @@ -16,15 +16,13 @@ package com.mongodb.reactivestreams.client; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; -import org.junit.Test; -/** - * See {@link com.mongodb.client.MongoWriteConcernWithResponseExceptionTest}. - */ -public class MongoWriteConcernWithResponseExceptionTest { - @Test - public void doesNotLeak() throws InterruptedException { - com.mongodb.client.MongoWriteConcernWithResponseExceptionTest.doesNotLeak(SyncMongoClient::new); +final class MongoWriteConcernWithResponseExceptionTest extends com.mongodb.client.MongoWriteConcernWithResponseExceptionTest { + @Override + protected MongoClient createClient(final MongoClientSettings clientSettings) { + return new SyncMongoClient(clientSettings); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java b/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java index 736ad8976d..6e98f69bce 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java +++ b/driver-sync/src/test/functional/com/mongodb/client/FailPoint.java @@ -81,8 +81,7 @@ private static final class Guard implements AutoCloseable { /** * May be invoked at most once. - * - * @see #close() + * Must not be invoked if {@link #close()} was invoked. */ FailPoint intoFailPoint() { assertFalse(consumed); @@ -93,10 +92,13 @@ FailPoint intoFailPoint() { /** * Invokes {@link #disableAndClose(BsonDocument, MongoClient)} unless {@link #intoFailPoint()} was invoked. + *

    + * Idempotent. */ @Override public void close() { if (!consumed) { + consumed = true; disableAndClose(failPointDocument, client); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java b/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java index eccc892ce7..0998bea9e8 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java @@ -16,26 +16,24 @@ package com.mongodb.client; -import com.mongodb.Function; import com.mongodb.MongoClientSettings; import com.mongodb.MongoWriteConcernException; import com.mongodb.ServerAddress; -import com.mongodb.assertions.Assertions; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; -import com.mongodb.event.CommandListener; import com.mongodb.event.CommandSucceededEvent; import com.mongodb.internal.connection.MongoWriteConcernWithResponseException; +import com.mongodb.internal.event.ConfigureFailPointCommandListener; import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonString; import org.bson.Document; -import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,24 +44,25 @@ import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL; import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertThrows; -import static org.junit.Assume.assumeTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Tests in this class check that the internal {@link MongoWriteConcernWithResponseException} does not leak from our API. */ -public final class MongoWriteConcernWithResponseExceptionTest { +public class MongoWriteConcernWithResponseExceptionTest { + protected MongoClient createClient(final MongoClientSettings clientSettings) { + return MongoClients.create(clientSettings); + } + /** * This test is similar to {@link RetryableWritesProseTest#originalErrorMustBePropagatedIfNoWritesPerformed()}. * The difference is in the assertions, it also verifies situations when `writeConcernError` happens on the first attempt * and on the last attempt. */ - @Test - public void doesNotLeak() throws InterruptedException { - doesNotLeak(MongoClients::create); - } - - public static void doesNotLeak(final Function clientCreator) throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + protected void doesNotLeak(final boolean writeConcernErrorOnFirstAttempt) throws Exception { BsonDocument writeConcernErrorFpDoc = new BsonDocument() .append("configureFailPoint", new BsonString("failCommand")) .append("mode", new BsonDocument() @@ -85,47 +84,35 @@ public static void doesNotLeak(final Function .append("errorCode", new BsonInt32(10107)) .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL, NO_WRITES_PERFORMED_ERROR_LABEL) .map(BsonString::new).collect(Collectors.toList())))); - doesNotLeak(clientCreator, writeConcernErrorFpDoc, true, noWritesPerformedFpDoc); - doesNotLeak(clientCreator, noWritesPerformedFpDoc, false, writeConcernErrorFpDoc); + if (writeConcernErrorOnFirstAttempt) { + doesNotLeak(writeConcernErrorFpDoc, true, noWritesPerformedFpDoc); + } else { + doesNotLeak(noWritesPerformedFpDoc, false, writeConcernErrorFpDoc); + } } @SuppressWarnings("try") - private static void doesNotLeak( - final Function clientCreator, + private void doesNotLeak( final BsonDocument firstAttemptFpDoc, - final boolean firstAttemptCommandSucceededEvent, - final BsonDocument lastAttemptFpDoc) throws InterruptedException { + final boolean firstAttemptSucceeds, + final BsonDocument lastAttemptFpDoc) throws Exception { assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet()); ServerAddress primaryServerAddress = Fixture.getPrimary(); - CompletableFuture futureFailPointFromListener = new CompletableFuture<>(); - CommandListener commandListener = new CommandListener() { - private final AtomicBoolean configureFailPoint = new AtomicBoolean(true); - - @Override - public void commandSucceeded(final CommandSucceededEvent event) { - if (firstAttemptCommandSucceededEvent) { - enableLastAttemptFp(event); - } - } - - @Override - public void commandFailed(final CommandFailedEvent event) { - if (!firstAttemptCommandSucceededEvent) { - enableLastAttemptFp(event); - } - } - - private void enableLastAttemptFp(final CommandEvent event) { - if (event.getCommandName().equals("insert") && configureFailPoint.compareAndSet(true, false)) { - Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable(lastAttemptFpDoc, primaryServerAddress))); - } + Predicate configureFailPointEventMatcher = event -> { + if (event.getCommandName().equals("insert")) { + return firstAttemptSucceeds + ? event instanceof CommandSucceededEvent + : event instanceof CommandFailedEvent; } + return false; }; - try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() - .retryWrites(true) - .addCommandListener(commandListener) - .applyToServerSettings(builder -> builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) - .build()); + try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener( + lastAttemptFpDoc, primaryServerAddress, configureFailPointEventMatcher); + MongoClient client = createClient(getMongoClientSettingsBuilder() + .retryWrites(true) + .addCommandListener(commandListener) + .applyToServerSettings(builder -> builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) + .build()); FailPoint ignored = FailPoint.enable(firstAttemptFpDoc, primaryServerAddress)) { MongoCollection collection = client.getDatabase(getDefaultDatabaseName()) .getCollection("originalErrorMustBePropagatedIfNoWritesPerformed"); @@ -139,8 +126,6 @@ private void enableLastAttemptFp(final CommandEvent event) { throw new AssertionError("The internal exception leaked.", e); } }); - } finally { - futureFailPointFromListener.thenAccept(FailPoint::close); } } }