Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ protected CompletionStage<Response> doInvoke(Request request, CompletionStage<Se
protected ConsumerInvokerProxy<T> getInvoker(ServiceInstance instance) {
String key = toUniqKey(instance);
ConsumerInvokerProxy<T> result = invokerCache.get(key);
return Optional.ofNullable(result).orElseGet(() -> createInvoker(instance));
if (result != null && result.isAvailable()) {
return result;
}
return createInvoker(instance);
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
import com.tencent.trpc.core.worker.handler.TrpcThreadExceptionHandler;
import com.tencent.trpc.core.worker.spi.WorkerPool;
import com.tencent.trpc.core.worker.support.thread.ThreadWorkerPool;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -248,4 +250,55 @@ public void testDoInvokeWithCompletedFuture() {
}
}

@Test
@SuppressWarnings("unchecked")
public void testGetInvokerSkipsUnavailableCachedInvoker() throws Exception {
Assert.assertFalse(consumerInvokerProxy.isAvailable());

// Put the unavailable invoker into invokerCache via reflection
Field cacheField = DefClusterInvoker.class.getDeclaredField("invokerCache");
cacheField.setAccessible(true);
ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>> cache =
(ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>>) cacheField.get(defClusterInvoker);

ServiceInstance instance = new ServiceInstance("127.0.0.1", 12345);
String key = "127.0.0.1:12345:null";
cache.put(key, consumerInvokerProxy);

// Spy on defClusterInvoker and mock createInvoker to return a new available proxy
DefClusterInvoker<GenericClient> spy = PowerMockito.spy(defClusterInvoker);
ConsumerInvokerProxy<GenericClient> newProxy = PowerMockito.mock(ConsumerInvokerProxy.class);
PowerMockito.when(newProxy.isAvailable()).thenReturn(true);
PowerMockito.doReturn(newProxy).when(spy).createInvoker(instance);

ConsumerInvokerProxy<GenericClient> result = spy.getInvoker(instance);

// Should not return the unavailable cached invoker
Assert.assertNotSame(consumerInvokerProxy, result);
// Should return the new available one from createInvoker
Assert.assertSame(newProxy, result);
}

@Test
@SuppressWarnings("unchecked")
public void testGetInvokerReturnsAvailableCachedInvoker() throws Exception {
// Put an available invoker into invokerCache via reflection
Field cacheField = DefClusterInvoker.class.getDeclaredField("invokerCache");
cacheField.setAccessible(true);
ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>> cache =
(ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>>) cacheField.get(defClusterInvoker);

ServiceInstance instance = new ServiceInstance("127.0.0.1", 12345);
String key = "127.0.0.1:12345:null";

// Create an available proxy (client.isAvailable() = true)
ConsumerInvokerProxy<GenericClient> availableProxy = PowerMockito.mock(ConsumerInvokerProxy.class);
PowerMockito.when(availableProxy.isAvailable()).thenReturn(true);
cache.put(key, availableProxy);

// getInvoker should return the cached available proxy directly
ConsumerInvokerProxy<GenericClient> result = defClusterInvoker.getInvoker(instance);
Assert.assertSame(availableProxy, result);
}

}
Loading