From 6ad3edefc706633dbe2b119195218d0193bcc3ca Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Fri, 29 May 2026 16:35:11 +0200 Subject: [PATCH 1/2] fix: Propagate security identity and CDI context to AgentExecutor thread For streaming (SSE) requests, VertxSecurityHelper.runInRequestContext() terminates the CDI request context before the agent executor thread runs. This caused @RequestScoped beans (including SecurityIdentity and OIDC token credentials) to be unavailable on the agent thread, breaking patterns like @AccessToken REST client calls from AgentExecutor.execute(). Replace AsyncManagedExecutorProducer (which delegated to ManagedExecutor) with CdiPropagatingExecutorProducer that: - Captures the SecurityIdentity object on the submitting thread - Creates a fresh Vert.x duplicated context on the agent thread - Activates a fresh CDI request context within it - Restores the captured SecurityIdentity Using a Vert.x duplicated context is required because Quarkus ArC (via VertxCurrentContextFactory) stores CDI state in Vert.x context local data, and downstream components like the REST client create their own Vert.x contexts that must find the CDI state. Also adds a streaming variant of the CDI propagation test. This fixes #893 Co-Authored-By: Claude Opus 4.6 --- .../quarkus/AsyncManagedExecutorProducer.java | 55 ------ .../CdiPropagatingExecutorProducer.java | 187 ++++++++++++++++++ .../AsyncManagedExecutorProducerTest.java | 184 ----------------- .../CdiPropagatingExecutorProducerTest.java | 88 +++++++++ .../apps/common/AbstractA2AServerTest.java | 18 +- 5 files changed, 291 insertions(+), 241 deletions(-) delete mode 100644 reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducer.java create mode 100644 reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java delete mode 100644 reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducerTest.java create mode 100644 reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducerTest.java diff --git a/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducer.java b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducer.java deleted file mode 100644 index e1f9ad291..000000000 --- a/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducer.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.a2aproject.sdk.server.common.quarkus; - -import java.util.concurrent.Executor; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.Priority; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Alternative; -import jakarta.enterprise.inject.Produces; -import jakarta.inject.Inject; - -import org.a2aproject.sdk.server.util.async.Internal; -import org.eclipse.microprofile.context.ManagedExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Alternative executor producer that provides a ManagedExecutor with CDI context propagation. - *

- * This producer replaces the default {@code AsyncExecutorProducer} so that CDI request context - * (and any other registered thread context types) are propagated to the agent executor thread. - * This allows {@code @RequestScoped} beans to be injected and used inside - * {@link org.a2aproject.sdk.server.agentexecution.AgentExecutor#execute}. - *

- * Priority 20 ensures this alternative takes precedence over the default producer (priority 10). - * - * @see org.eclipse.microprofile.context.ManagedExecutor - */ -@ApplicationScoped -@Alternative -@Priority(20) -public class AsyncManagedExecutorProducer { - private static final Logger LOGGER = LoggerFactory.getLogger(AsyncManagedExecutorProducer.class); - - @Inject - ManagedExecutor managedExecutor; - - @PostConstruct - public void init() { - LOGGER.info("Initializing ManagedExecutor for async operations with CDI context propagation"); - if (managedExecutor == null) { - LOGGER.warn("ManagedExecutor not available - context propagation may not work correctly"); - } - } - - @Produces - @Internal - public Executor produce() { - LOGGER.debug("Using ManagedExecutor for async operations with CDI context propagation"); - if (managedExecutor == null) { - throw new IllegalStateException("ManagedExecutor not injected - ensure MicroProfile Context Propagation is available"); - } - return managedExecutor; - } -} diff --git a/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java new file mode 100644 index 000000000..344c7e248 --- /dev/null +++ b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java @@ -0,0 +1,187 @@ +package org.a2aproject.sdk.server.common.quarkus; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Alternative; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.security.identity.SecurityIdentity; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; +import org.a2aproject.sdk.server.config.A2AConfigProvider; +import org.a2aproject.sdk.server.util.async.Internal; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Alternative executor producer that provides CDI request context propagation to the agent executor thread. + *

+ * This producer replaces the default {@code AsyncExecutorProducer} so that {@code @RequestScoped} beans + * — including security identity, OIDC token credentials, and other request-scoped state — are + * available inside {@link org.a2aproject.sdk.server.agentexecution.AgentExecutor#execute}. + *

+ * The security identity is captured from the submitting thread and set up in a fresh CDI request context + * on the agent thread within a proper Vert.x duplicated context. This is necessary because: + *

+ *

+ * Priority 20 ensures this alternative takes precedence over the default producer (priority 10). + */ +@ApplicationScoped +@Alternative +@Priority(20) +public class CdiPropagatingExecutorProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(CdiPropagatingExecutorProducer.class); + + @Inject + A2AConfigProvider configProvider; + + @Inject + Vertx vertx; + + @Inject + Instance currentIdentityAssociation; + + private @Nullable ExecutorService executor; + + @PostConstruct + public void init() { + int corePoolSize = Integer.parseInt(configProvider.getValue("a2a.executor.core-pool-size")); + int maxPoolSize = Integer.parseInt(configProvider.getValue("a2a.executor.max-pool-size")); + long keepAliveSeconds = Long.parseLong(configProvider.getValue("a2a.executor.keep-alive-seconds")); + int queueCapacity = Integer.parseInt(configProvider.getValue("a2a.executor.queue-capacity")); + + LOGGER.info("Initializing CDI-propagating executor: corePoolSize={}, maxPoolSize={}, keepAliveSeconds={}, queueCapacity={}", + corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity); + + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(queueCapacity), + new A2AThreadFactory()); + tpe.allowCoreThreadTimeOut(true); + executor = tpe; + } + + @PreDestroy + public void close() { + if (executor == null) { + return; + } + LOGGER.info("Shutting down CDI-propagating executor"); + executor.shutdown(); + try { + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @Produces + @Internal + public Executor produce() { + if (executor == null) { + throw new IllegalStateException("Executor not initialized"); + } + return new CdiContextPropagatingExecutor(executor, vertx, currentIdentityAssociation); + } + + /** + * Executor wrapper that captures the authenticated security identity from the submitting thread + * and sets it up in a fresh CDI request context on the agent thread. + *

+ * The agent thread runs within a Vert.x duplicated context so that ArC's + * {@code VertxCurrentContextFactory} stores the CDI state in Vert.x context local data. + * This ensures that downstream components (like the Quarkus REST client with {@code @AccessToken}) + * can find the CDI state even if they create their own Vert.x contexts. + */ + private static class CdiContextPropagatingExecutor implements Executor { + + private final ExecutorService delegate; + private final Vertx vertx; + private final Instance identityAssociationInstance; + + CdiContextPropagatingExecutor(ExecutorService delegate, Vertx vertx, + Instance identityAssociationInstance) { + this.delegate = delegate; + this.vertx = vertx; + this.identityAssociationInstance = identityAssociationInstance; + } + + @Override + public void execute(Runnable command) { + SecurityIdentity capturedIdentity = captureSecurityIdentity(); + + delegate.execute(() -> { + ContextInternal dupCtx = ((ContextInternal) vertx.getOrCreateContext()).duplicate(); + io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe(dupCtx, true); + ContextInternal previous = dupCtx.beginDispatch(); + try { + ManagedContext rc = Arc.container().requestContext(); + rc.activate(); + try { + restoreSecurityIdentity(capturedIdentity); + command.run(); + } finally { + rc.terminate(); + } + } finally { + dupCtx.endDispatch(previous); + } + }); + } + + private @Nullable SecurityIdentity captureSecurityIdentity() { + if (identityAssociationInstance.isResolvable()) { + try { + return identityAssociationInstance.get().getIdentity(); + } catch (Exception e) { + LOGGER.debug("Could not capture security identity", e); + } + } + return null; + } + + private void restoreSecurityIdentity(@Nullable SecurityIdentity identity) { + if (identity != null && identityAssociationInstance.isResolvable()) { + try { + identityAssociationInstance.get().setIdentity(identity); + } catch (Exception e) { + LOGGER.debug("Could not restore security identity on agent thread", e); + } + } + } + } + + private static class A2AThreadFactory implements ThreadFactory { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "a2a-agent-executor-" + threadNumber.getAndIncrement()); + t.setDaemon(false); + return t; + } + } +} diff --git a/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducerTest.java b/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducerTest.java deleted file mode 100644 index 056047e5f..000000000 --- a/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/AsyncManagedExecutorProducerTest.java +++ /dev/null @@ -1,184 +0,0 @@ -package org.a2aproject.sdk.server.common.quarkus; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -import java.util.concurrent.Executor; - -import org.eclipse.microprofile.context.ManagedExecutor; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class AsyncManagedExecutorProducerTest { - - @Mock - private ManagedExecutor managedExecutor; - - private AsyncManagedExecutorProducer producer; - - @BeforeEach - void setUp() { - producer = new AsyncManagedExecutorProducer(); - } - - @Nested - class InitializationTests { - @Test - void init_withValidManagedExecutor_logsSuccessfully() { - producer.managedExecutor = managedExecutor; - - assertDoesNotThrow(() -> producer.init()); - assertNotNull(producer.managedExecutor); - } - - @Test - void init_withNullManagedExecutor_logsWarning() { - producer.managedExecutor = null; - - assertDoesNotThrow(() -> producer.init()); - assertNull(producer.managedExecutor); - } - } - - @Nested - class ProduceTests { - @Test - void produce_withValidManagedExecutor_returnsExecutor() { - producer.managedExecutor = managedExecutor; - - Executor result = producer.produce(); - - assertNotNull(result); - assertSame(managedExecutor, result); - } - - @Test - void produce_withNullManagedExecutor_throwsIllegalStateException() { - producer.managedExecutor = null; - - IllegalStateException exception = assertThrows( - IllegalStateException.class, - () -> producer.produce() - ); - - assertEquals( - "ManagedExecutor not injected - ensure MicroProfile Context Propagation is available", - exception.getMessage() - ); - } - - @Test - void produce_returnsSameInstanceOnMultipleCalls() { - producer.managedExecutor = managedExecutor; - - Executor result1 = producer.produce(); - Executor result2 = producer.produce(); - - assertSame(result1, result2); - assertSame(managedExecutor, result1); - } - } - - @Nested - class CDIIntegrationTests { - @Test - void producer_hasCorrectAnnotations() { - assertTrue( - AsyncManagedExecutorProducer.class.isAnnotationPresent( - jakarta.enterprise.context.ApplicationScoped.class - ) - ); - - assertTrue( - AsyncManagedExecutorProducer.class.isAnnotationPresent( - jakarta.enterprise.inject.Alternative.class - ) - ); - - assertTrue( - AsyncManagedExecutorProducer.class.isAnnotationPresent( - jakarta.annotation.Priority.class - ) - ); - assertEquals( - 20, - AsyncManagedExecutorProducer.class.getAnnotation( - jakarta.annotation.Priority.class - ).value() - ); - } - - @Test - void produceMethod_hasCorrectAnnotations() throws NoSuchMethodException { - var method = AsyncManagedExecutorProducer.class.getMethod("produce"); - - assertTrue( - method.isAnnotationPresent(jakarta.enterprise.inject.Produces.class) - ); - - assertTrue( - method.isAnnotationPresent(org.a2aproject.sdk.server.util.async.Internal.class) - ); - } - - @Test - void initMethod_hasPostConstructAnnotation() throws NoSuchMethodException { - var method = AsyncManagedExecutorProducer.class.getMethod("init"); - - assertTrue( - method.isAnnotationPresent(jakarta.annotation.PostConstruct.class) - ); - } - - @Test - void managedExecutorField_hasInjectAnnotation() throws NoSuchFieldException { - var field = AsyncManagedExecutorProducer.class.getDeclaredField("managedExecutor"); - - assertTrue( - field.isAnnotationPresent(jakarta.inject.Inject.class) - ); - } - } - - @Nested - class ExecutorBehaviorTests { - @Test - void producedExecutor_canExecuteRunnables() { - producer.managedExecutor = managedExecutor; - Runnable task = mock(Runnable.class); - - Executor executor = producer.produce(); - executor.execute(task); - - verify(managedExecutor).execute(task); - } - - @Test - void producedExecutor_delegatesToManagedExecutor() { - producer.managedExecutor = managedExecutor; - Runnable task1 = mock(Runnable.class); - Runnable task2 = mock(Runnable.class); - - Executor executor = producer.produce(); - executor.execute(task1); - executor.execute(task2); - - verify(managedExecutor).execute(task1); - verify(managedExecutor).execute(task2); - verifyNoMoreInteractions(managedExecutor); - } - } -} diff --git a/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducerTest.java b/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducerTest.java new file mode 100644 index 000000000..a13b2e4da --- /dev/null +++ b/reference/common/src/test/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducerTest.java @@ -0,0 +1,88 @@ +package org.a2aproject.sdk.server.common.quarkus; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class CdiPropagatingExecutorProducerTest { + + @Nested + class CDIAnnotationTests { + @Test + void producer_hasCorrectAnnotations() { + assertTrue( + CdiPropagatingExecutorProducer.class.isAnnotationPresent( + jakarta.enterprise.context.ApplicationScoped.class + ) + ); + + assertTrue( + CdiPropagatingExecutorProducer.class.isAnnotationPresent( + jakarta.enterprise.inject.Alternative.class + ) + ); + + assertTrue( + CdiPropagatingExecutorProducer.class.isAnnotationPresent( + jakarta.annotation.Priority.class + ) + ); + assertEquals( + 20, + CdiPropagatingExecutorProducer.class.getAnnotation( + jakarta.annotation.Priority.class + ).value() + ); + } + + @Test + void produceMethod_hasCorrectAnnotations() throws NoSuchMethodException { + var method = CdiPropagatingExecutorProducer.class.getMethod("produce"); + + assertTrue( + method.isAnnotationPresent(jakarta.enterprise.inject.Produces.class) + ); + + assertTrue( + method.isAnnotationPresent(org.a2aproject.sdk.server.util.async.Internal.class) + ); + } + + @Test + void initMethod_hasPostConstructAnnotation() throws NoSuchMethodException { + var method = CdiPropagatingExecutorProducer.class.getMethod("init"); + + assertTrue( + method.isAnnotationPresent(jakarta.annotation.PostConstruct.class) + ); + } + + @Test + void closeMethod_hasPreDestroyAnnotation() throws NoSuchMethodException { + var method = CdiPropagatingExecutorProducer.class.getMethod("close"); + + assertTrue( + method.isAnnotationPresent(jakarta.annotation.PreDestroy.class) + ); + } + } + + @Nested + class ProduceTests { + @Test + void produce_withNullExecutor_throwsIllegalStateException() { + CdiPropagatingExecutorProducer producer = new CdiPropagatingExecutorProducer(); + + IllegalStateException exception = assertThrows( + IllegalStateException.class, + producer::produce + ); + + assertNotNull(exception.getMessage()); + } + } +} diff --git a/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java index 110708d96..2f7fd1018 100644 --- a/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java @@ -506,6 +506,15 @@ public void testSendMessageNewMessageSuccess() throws Exception { @Test public void testRequestScopedBeanAvailableOnAgentExecutorThread() throws Exception { + doTestRequestScopedBeanAvailableOnAgentExecutorThread(getNonStreamingClient()); + } + + @Test + public void testRequestScopedBeanAvailableOnAgentExecutorThreadStreaming() throws Exception { + doTestRequestScopedBeanAvailableOnAgentExecutorThread(getClient()); + } + + private void doTestRequestScopedBeanAvailableOnAgentExecutorThread(Client client) throws Exception { Message message = Message.builder() .messageId("request-scoped-test") .role(Message.Role.ROLE_USER) @@ -515,21 +524,26 @@ public void testRequestScopedBeanAvailableOnAgentExecutorThread() throws Excepti CountDownLatch latch = new CountDownLatch(1); AtomicReference receivedTask = new AtomicReference<>(); AtomicReference errorRef = new AtomicReference<>(); + AtomicBoolean completed = new AtomicBoolean(false); - getNonStreamingClient().sendMessage(message, List.of((event, agentCard) -> { + client.sendMessage(message, List.of((event, agentCard) -> { if (event instanceof TaskEvent te) { receivedTask.set(te.getTask()); if (te.getTask().status().state() == TaskState.TASK_STATE_COMPLETED) { + completed.set(true); latch.countDown(); } } else if (event instanceof TaskUpdateEvent tue) { receivedTask.set(tue.getTask()); if (tue.getTask().status().state() == TaskState.TASK_STATE_COMPLETED) { + completed.set(true); latch.countDown(); } } }), error -> { - errorRef.set(error); + if (!completed.get()) { + errorRef.set(error); + } latch.countDown(); }); From cb300151580195d88bd1e8915862c23c68625829 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Fri, 29 May 2026 16:57:05 +0200 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../common/quarkus/CdiPropagatingExecutorProducer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java index 344c7e248..5402bf103 100644 --- a/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java +++ b/reference/common/src/main/java/org/a2aproject/sdk/server/common/quarkus/CdiPropagatingExecutorProducer.java @@ -124,13 +124,14 @@ private static class CdiContextPropagatingExecutor implements Executor { CdiContextPropagatingExecutor(ExecutorService delegate, Vertx vertx, Instance identityAssociationInstance) { - this.delegate = delegate; - this.vertx = vertx; - this.identityAssociationInstance = identityAssociationInstance; + this.delegate = java.util.Objects.requireNonNull(delegate, "delegate must not be null"); + this.vertx = java.util.Objects.requireNonNull(vertx, "vertx must not be null"); + this.identityAssociationInstance = java.util.Objects.requireNonNull(identityAssociationInstance, "identityAssociationInstance must not be null"); } @Override public void execute(Runnable command) { + java.util.Objects.requireNonNull(command, "command must not be null"); SecurityIdentity capturedIdentity = captureSecurityIdentity(); delegate.execute(() -> {