Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
e6083b4
Add support for server selection's deprioritized servers to all topol…
vbabanin Jan 12, 2026
de2856b
Remove global OperationContext in tests as it has mutable state.
vbabanin Jan 15, 2026
689d6cb
Add more test-cases.
vbabanin Jan 17, 2026
a8ddab2
Fix static checks.
vbabanin Jan 17, 2026
1eddabf
Allow invoking connect.
vbabanin Jan 18, 2026
d62a576
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
vbabanin Mar 4, 2026
777ced9
Update driver-core/src/test/unit/com/mongodb/connection/ServerSelecti…
vbabanin Mar 5, 2026
85cc1aa
Update driver-core/src/test/unit/com/mongodb/connection/ServerSelecti…
vbabanin Mar 5, 2026
893d419
Use "create" prefix consistently.
vbabanin Mar 3, 2026
19ab190
Remove craeteNewOperationContext.
vbabanin Mar 4, 2026
05beaf2
Change visibility modifier.
vbabanin Mar 4, 2026
0bf110b
Rename applyDeprioritization method to apply.
vbabanin Mar 4, 2026
46b9b8d
Add TODO comments for performance optimization.
vbabanin Mar 4, 2026
cd5dcf1
Add empty line.
vbabanin Mar 4, 2026
dfae55b
Return ALL_SERVERS from selector.
vbabanin Mar 4, 2026
bb54d82
Make heartbeatFrequencyMS a local variable.
vbabanin Mar 4, 2026
3bd9710
Revert to assumeFalse.
vbabanin Mar 4, 2026
1f38bc7
Use Junit4.
vbabanin Mar 5, 2026
70e1eeb
Convert comment to a class documentation comment.
vbabanin Mar 5, 2026
59b1aa1
Remove deprioritizedServerAddresses field.
vbabanin Mar 5, 2026
41849e7
Move inLatencyWindowServers to pre-try.
vbabanin Mar 5, 2026
cb3e7d6
Remove unnecessary symbol.
vbabanin Mar 5, 2026
793507a
Add assertion context.
vbabanin Mar 5, 2026
f4ab841
Remove redundant comment.
vbabanin Mar 5, 2026
11cf14a
Use immediate timeout.
vbabanin Mar 5, 2026
9017de8
Use MongoException instead of MongoConfigurationException.
vbabanin Mar 5, 2026
bc0f8da
Remove createTestCluster method.
vbabanin Mar 5, 2026
23332b2
Use com.mongodb.assertions.Assertions.fail for a Fake cluster.
vbabanin Mar 5, 2026
b4fe6f3
Change message.
vbabanin Mar 5, 2026
7a4dcfd
Update driver-core/src/test/unit/com/mongodb/internal/connection/Serv…
vbabanin Mar 5, 2026
2e38a65
Use MongoMockito.
vbabanin Mar 5, 2026
d1868a3
Remove TODO.
vbabanin Mar 5, 2026
021e7f5
Use withNewServerDeprioritization.
vbabanin Mar 5, 2026
44a66c1
Make ServerDeprioritization private.
vbabanin Mar 5, 2026
9f047b7
Remove unused methods.
vbabanin Mar 5, 2026
a1c052a
Reuse OperationContext.
vbabanin Mar 5, 2026
704d829
Use a documentation comment.
vbabanin Mar 5, 2026
3736c17
Use VisibleForTesting(otherwise = PROTECTED).
vbabanin Mar 5, 2026
817a5c5
Reuse OperatioContext.
vbabanin Mar 5, 2026
3373c14
Change comments.
vbabanin Mar 5, 2026
0cb9bf2
Change comments.
vbabanin Mar 5, 2026
ae8d201
Use createOperationContext.
vbabanin Mar 5, 2026
446f8ff
Address review feedback
vbabanin Apr 16, 2026
2bf6aaa
Limit server selection deprioritization to system overloaded errors o…
vbabanin Apr 16, 2026
1f4c0c9
Update driver-core/src/test/unit/com/mongodb/connection/ServerSelecti…
vbabanin Apr 16, 2026
29199e5
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
vbabanin Apr 16, 2026
4d03cd2
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
vbabanin Apr 16, 2026
4e69816
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
vbabanin Apr 16, 2026
40aecc7
Update driver-core/src/test/unit/com/mongodb/internal/connection/Defa…
vbabanin Apr 16, 2026
a911780
Address PR review feedback
vbabanin Apr 16, 2026
8f1fbbb
Update visibility.
vbabanin Apr 16, 2026
0b8a594
Remove unused imports.
vbabanin Apr 16, 2026
6814a58
Change to find.
vbabanin Apr 17, 2026
08c1164
Fix config.
vbabanin Apr 17, 2026
0110bb0
Fix flaky prose tests by waiting for primary discovery
vbabanin Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ abstract class BaseCluster implements Cluster {
private volatile ClusterDescription description;

BaseCluster(final ClusterId clusterId,
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
this.clusterId = notNull("clusterId", clusterId);
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
Expand Down Expand Up @@ -159,7 +159,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
if (serverTuple != null) {
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
logServerSelectionSucceeded(operationContext, clusterId, serverAddress, serverSelector, currentDescription);
serverDeprioritization.updateCandidate(serverAddress);
serverDeprioritization.updateCandidate(serverAddress, currentDescription.getType());
return serverTuple;
}
computedServerSelectionTimeout.onExpired(() ->
Expand Down Expand Up @@ -302,7 +302,7 @@ private boolean handleServerSelectionRequest(
if (serverTuple != null) {
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
logServerSelectionSucceeded(operationContext, clusterId, serverAddress, request.originalSelector, description);
serverDeprioritization.updateCandidate(serverAddress);
serverDeprioritization.updateCandidate(serverAddress, description.getType());
request.onResult(serverTuple, null);
return true;
}
Expand Down Expand Up @@ -361,8 +361,7 @@ private static ServerSelector getCompleteServerSelector(
final ClusterSettings settings) {
List<ServerSelector> selectors = Stream.of(
getRaceConditionPreFilteringSelector(serversSnapshot),
serverSelector,
serverDeprioritization.getServerSelector(),
serverDeprioritization.apply(serverSelector),
settings.getServerSelector(), // may be null
new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS),
AtMostTwoRandomServerSelector.instance(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.mongodb.Function;
import com.mongodb.MongoConnectionPoolClearedException;
import com.mongodb.MongoException;
import com.mongodb.ReadConcern;
import com.mongodb.RequestContext;
import com.mongodb.ServerAddress;
Expand All @@ -27,7 +28,6 @@
import com.mongodb.internal.IgnorableRequestContext;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.observability.micrometer.Span;
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.session.SessionContext;
Expand All @@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL;
import static java.util.stream.Collectors.toList;

/**
Expand Down Expand Up @@ -76,7 +77,7 @@ public OperationContext(final RequestContext requestContext, final SessionContex
null);
}

public static OperationContext simpleOperationContext(
static OperationContext simpleOperationContext(
final TimeoutSettings timeoutSettings, @Nullable final ServerApi serverApi) {
return new OperationContext(
IgnorableRequestContext.INSTANCE,
Expand Down Expand Up @@ -113,6 +114,15 @@ public OperationContext withOperationName(final String operationName) {
operationName, tracingSpan);
}

/**
* TODO-JAVA-6058: This method enables overriding the ServerDeprioritization state.
* It is a temporary solution to handle cases where deprioritization state persists across operations.
*/
public OperationContext withNewServerDeprioritization() {
return new OperationContext(id, requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), tracingManager, serverApi,
operationName, tracingSpan);
}

public long getId() {
return id;
}
Expand Down Expand Up @@ -152,8 +162,7 @@ public void setTracingSpan(final Span tracingSpan) {
this.tracingSpan = tracingSpan;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
private OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
Expand All @@ -174,26 +183,6 @@ public OperationContext(final long id,
this.tracingSpan = tracingSpan;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
final TracingManager tracingManager,
@Nullable final ServerApi serverApi,
@Nullable final String operationName) {
this.id = id;
this.serverDeprioritization = new ServerDeprioritization();
this.requestContext = requestContext;
this.sessionContext = sessionContext;
this.timeoutContext = timeoutContext;
this.tracingManager = tracingManager;
this.serverApi = serverApi;
this.operationName = operationName;
this.tracingSpan = null;
}


/**
* @return The same {@link ServerDeprioritization} if called on the same {@link OperationContext}.
*/
Expand Down Expand Up @@ -227,34 +216,45 @@ public OperationContext withOverride(final TimeoutContextOverride timeoutContext
public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
@Nullable
private ClusterType clusterType;
Comment thread
vbabanin marked this conversation as resolved.
private final Set<ServerAddress> deprioritized;
private final DeprioritizingSelector selector;

private ServerDeprioritization() {
candidate = null;
deprioritized = new HashSet<>();
Comment thread
vbabanin marked this conversation as resolved.
selector = new DeprioritizingSelector();
clusterType = null;
}

/**
* The returned {@link ServerSelector} tries to {@linkplain ServerSelector#select(ClusterDescription) select}
* only the {@link ServerDescription}s that do not have deprioritized {@link ServerAddress}es.
* If no such {@link ServerDescription} can be selected, then it selects {@link ClusterDescription#getServerDescriptions()}.
* The returned {@link ServerSelector} wraps the provided selector and attempts
* {@linkplain ServerSelector#select(ClusterDescription) server selection} in two passes:
* <ol>
* <li>First pass: selects using the wrapped selector with only non-deprioritized {@link ServerDescription}s.</li>
* <li>Second pass: if the first pass selects no {@link ServerDescription}s,
* selects using the wrapped selector again with all {@link ServerDescription}s, including deprioritized ones.</li>
* </ol>
*/
ServerSelector getServerSelector() {
return selector;
ServerSelector apply(final ServerSelector wrappedSelector) {
return new DeprioritizingSelector(wrappedSelector);
}

void updateCandidate(final ServerAddress serverAddress) {
candidate = serverAddress;
void updateCandidate(final ServerAddress serverAddress, final ClusterType clusterType) {
this.candidate = serverAddress;
this.clusterType = clusterType;
}

public void onAttemptFailure(final Throwable failure) {
if (candidate == null || failure instanceof MongoConnectionPoolClearedException) {
candidate = null;
return;
}
deprioritized.add(candidate);

boolean isSystemOverloadedError = failure instanceof MongoException
&& ((MongoException) failure).hasErrorLabel(SYSTEM_OVERLOADED_ERROR_LABEL);
if (clusterType == ClusterType.SHARDED || isSystemOverloadedError) {
deprioritized.add(candidate);
}
}

/**
Expand All @@ -263,24 +263,41 @@ public void onAttemptFailure(final Throwable failure) {
* which indeed may be used concurrently. {@link DeprioritizingSelector} does not need to be thread-safe.
*/
private final class DeprioritizingSelector implements ServerSelector {
private DeprioritizingSelector() {
private final ServerSelector wrappedSelector;

private DeprioritizingSelector(final ServerSelector wrappedSelector) {
this.wrappedSelector = wrappedSelector;
}

@Override
public List<ServerDescription> select(final ClusterDescription clusterDescription) {
List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions();
if (!isEnabled(clusterDescription.getType())) {
return serverDescriptions;

// TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection.
if (serverDescriptions.size() == 1 || deprioritized.isEmpty()) {
return wrappedSelector.select(clusterDescription);
}

// TODO-JAVA-5908: Evaluate whether using a loop instead of Stream has a meaningful performance impact on server selection.
List<ServerDescription> nonDeprioritizedServerDescriptions = serverDescriptions
.stream()
.filter(serverDescription -> !deprioritized.contains(serverDescription.getAddress()))
.collect(toList());
Comment thread
rozza marked this conversation as resolved.
return nonDeprioritizedServerDescriptions.isEmpty() ? serverDescriptions : nonDeprioritizedServerDescriptions;
}

private boolean isEnabled(final ClusterType clusterType) {
return clusterType == ClusterType.SHARDED;
// TODO-JAVA-5908: Evaluate whether using the early-return optimization has a meaningful performance impact on server selection.
if (nonDeprioritizedServerDescriptions.isEmpty()) {
return wrappedSelector.select(clusterDescription);
}

List<ServerDescription> selected = wrappedSelector.select(
new ClusterDescription(
clusterDescription.getConnectionMode(),
clusterDescription.getType(),
clusterDescription.getSrvResolutionException(),
nonDeprioritizedServerDescriptions,
clusterDescription.getClusterSettings(),
clusterDescription.getServerSettings()));
Comment thread
rozza marked this conversation as resolved.
Comment thread
rozza marked this conversation as resolved.
return selected.isEmpty() ? wrappedSelector.select(clusterDescription) : selected;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
this.binding = binding;
binding.retain();
this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
this.initialOperationContext = operationContext
.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride)
.withNewServerDeprioritization();
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
isClosed = new AtomicBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T
final int maxWireVersion) {
this.changeStreamOperation = changeStreamOperation;
this.binding = binding.retain();
this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
this.initialOperationContext = operationContext
.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride)
.withNewServerDeprioritization();
this.wrapped = wrapped;
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
Expand Down
Loading