diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java index 705dea4ed..3fc330215 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java @@ -11,11 +11,14 @@ import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import jakarta.inject.Inject; import jakarta.transaction.Transactional; import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.client.TaskEvent; +import org.a2aproject.sdk.client.TaskUpdateEvent; import org.a2aproject.sdk.client.config.ClientConfig; import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport; import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfigBuilder; @@ -98,23 +101,33 @@ public void testDirectNotificationTrigger() { @Test public void testJpaDatabasePushNotificationConfigStoreIntegration() throws Exception { - final String taskId = "push-notify-test-" + System.currentTimeMillis(); - final String contextId = "test-context"; - - // Step 1: Create the task + // Step 1: Create the task (no client-provided taskId — server generates it) Message createMessage = Message.builder() .role(Message.Role.ROLE_USER) .parts(List.of(new TextPart("create"))) // Send the "create" command - .taskId(taskId) .messageId("test-msg-1") - .contextId(contextId) .build(); - // Use a latch to wait for the first operation to complete + // Use a latch to wait for the first operation to complete and capture server-generated ids CountDownLatch createLatch = new CountDownLatch(1); - client.sendMessage(createMessage, List.of((event, card) -> createLatch.countDown()), (e) -> createLatch.countDown()); + AtomicReference createdTaskRef = new AtomicReference<>(); + client.sendMessage(createMessage, List.of((event, card) -> { + if (event instanceof TaskEvent taskEvent) { + createdTaskRef.set(taskEvent.getTask()); + createLatch.countDown(); + return; + } + if (event instanceof TaskUpdateEvent taskUpdateEvent) { + createdTaskRef.set(taskUpdateEvent.getTask()); + createLatch.countDown(); + } + }), (e) -> createLatch.countDown()); assertTrue(createLatch.await(10, TimeUnit.SECONDS), "Timeout waiting for task creation"); + assertNotNull(createdTaskRef.get(), "Task should have been created"); + final String taskId = createdTaskRef.get().id(); + final String contextId = createdTaskRef.get().contextId(); + // Step 2: Set the push notification configuration TaskPushNotificationConfig taskPushConfig = TaskPushNotificationConfig.builder() .id("test-config-1") diff --git a/extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java b/extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java index 4a42d34a4..61676ff00 100644 --- a/extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java +++ b/extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java @@ -9,7 +9,9 @@ import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -19,6 +21,7 @@ import org.a2aproject.sdk.A2A; import org.a2aproject.sdk.client.Client; import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.client.TaskEvent; import org.a2aproject.sdk.client.config.ClientConfig; import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport; import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig; @@ -254,15 +257,13 @@ public void testInfrastructureStartup() { */ @Test public void testMultiInstanceEventReplication() throws Exception { - final String taskId = "replication-test-task-" + System.currentTimeMillis(); - final String contextId = "replication-test-context"; - Throwable testFailure = null; + final String[] taskIdHolder = {null}; + final String[] contextIdHolder = {null}; + try { - // Step 1: Send initial message NON-streaming to create task + // Step 1: Send initial message NON-streaming to create task (no client-provided taskId) Message initialMessage = Message.builder(A2A.toUserMessage("Initial test message")) - .taskId(taskId) - .contextId(contextId) .build(); // Use NON-streaming client to create the task @@ -272,13 +273,22 @@ public void testMultiInstanceEventReplication() throws Exception { .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) .build(); - Task createdTask = null; try { - nonStreamingClient.sendMessage(initialMessage, null); + CountDownLatch createLatch = new CountDownLatch(1); + AtomicReference taskRef = new AtomicReference<>(); + nonStreamingClient.sendMessage(initialMessage, List.of((ClientEvent event, AgentCard card) -> { + if (event instanceof TaskEvent te) { + taskRef.set(te.getTask()); + } + createLatch.countDown(); + }), (Throwable err) -> createLatch.countDown()); + assertTrue(createLatch.await(15, TimeUnit.SECONDS), "Task creation timed out"); - // Retrieve the task to verify it was created - createdTask = nonStreamingClient.getTask(new TaskQueryParams(taskId), null); + Task createdTask = taskRef.get(); assertNotNull(createdTask, "Task should be created"); + taskIdHolder[0] = createdTask.id(); + contextIdHolder[0] = createdTask.contextId(); + assertNotNull(taskIdHolder[0], "Server-generated task ID should not be null"); // Task should be in a non-final state (SUBMITTED or WORKING are both valid) TaskState state = createdTask.status().state(); @@ -297,6 +307,9 @@ public void testMultiInstanceEventReplication() throws Exception { throw e; } + final String taskId = taskIdHolder[0]; + final String contextId = contextIdHolder[0]; + // Step 2: Subscribe from both app1 and app2 with proper latches // We need to wait for at least 3 new events after resubscription: diff --git a/extras/queue-manager-replicated/tests-single-instance/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java b/extras/queue-manager-replicated/tests-single-instance/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java index 0b6e305ee..c8b7adb78 100644 --- a/extras/queue-manager-replicated/tests-single-instance/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java +++ b/extras/queue-manager-replicated/tests-single-instance/src/test/java/org/a2aproject/sdk/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java @@ -130,19 +130,14 @@ public void tearDown() throws Exception { @Test public void testA2AMessageReplicatedToKafka() throws Exception { - String taskId = "kafka-replication-test-" + System.currentTimeMillis(); - String contextId = "test-context-" + System.currentTimeMillis(); - // Clear any previous events testConsumer.clear(); - // Send A2A message that should trigger events and replication + // Send A2A message that should trigger events and replication (no client-provided taskId) Message message = Message.builder() .role(Message.Role.ROLE_USER) .parts(List.of(new TextPart("create"))) - .taskId(taskId) .messageId("test-msg-" + System.currentTimeMillis()) - .contextId(contextId) .build(); CountDownLatch a2aLatch = new CountDownLatch(1); @@ -166,7 +161,9 @@ public void testA2AMessageReplicatedToKafka() throws Exception { Task task = createdTask.get(); assertNotNull(task, "Task should be created"); - assertEquals(taskId, task.id()); + String taskId = task.id(); + String contextId = task.contextId(); + assertNotNull(taskId, "Server-generated task ID should not be null"); assertEquals(TaskState.TASK_STATE_SUBMITTED, task.status().state()); // Wait for the event to be replicated to Kafka @@ -193,19 +190,14 @@ public void testA2AMessageReplicatedToKafka() throws Exception { @Test public void testKafkaEventReceivedByA2AServer() throws Exception { - String taskId = "kafka-to-a2a-test-" + System.currentTimeMillis(); - String contextId = "test-context-" + System.currentTimeMillis(); - // Clear any previous events testConsumer.clear(); - // First create a task in the A2A system using non-streaming client + // First create a task in the A2A system using non-streaming client (no client-provided taskId) Message createMessage = Message.builder() .role(Message.Role.ROLE_USER) .parts(List.of(new TextPart("create"))) - .taskId(taskId) .messageId("create-msg-" + System.currentTimeMillis()) - .contextId(contextId) .build(); CountDownLatch createLatch = new CountDownLatch(1); @@ -223,6 +215,9 @@ public void testKafkaEventReceivedByA2AServer() throws Exception { assertTrue(createLatch.await(15, TimeUnit.SECONDS), "Task creation timed out"); Task initialTask = createdTask.get(); assertNotNull(initialTask, "Task should be created"); + String taskId = initialTask.id(); + String contextId = initialTask.contextId(); + assertNotNull(taskId, "Server-generated task ID should not be null"); assertEquals(TaskState.TASK_STATE_SUBMITTED, initialTask.status().state(), "Initial task should be in SUBMITTED state"); // Add a small delay to ensure the task is fully processed before resubscription @@ -312,20 +307,15 @@ public void testKafkaEventReceivedByA2AServer() throws Exception { @Test public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception { - String taskId = "queue-closed-test-" + System.currentTimeMillis(); - String contextId = "test-context-" + System.currentTimeMillis(); - // Clear any previous events testConsumer.clear(); - // Use polling (non-blocking) client with "working" command + // Use polling (non-blocking) client with "working" command (no client-provided taskId) // This creates task in WORKING state (non-final) and keeps queue alive Message workingMessage = Message.builder() .role(Message.Role.ROLE_USER) .parts(List.of(new TextPart("working"))) - .taskId(taskId) .messageId("working-msg-" + System.currentTimeMillis()) - .contextId(contextId) .build(); CountDownLatch workingLatch = new CountDownLatch(1); @@ -348,7 +338,7 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception { assertTrue(workingLatch.await(15, TimeUnit.SECONDS), "Task creation timed out"); String createdTaskId = taskIdRef.get(); assertNotNull(createdTaskId, "Task should be created"); - assertEquals(taskId, createdTaskId); + String taskId = createdTaskId; // Set up streaming resubscription to listen for the QueueClosedEvent CountDownLatch streamCompletedLatch = new CountDownLatch(1); @@ -408,19 +398,14 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception { @Test public void testPoisonPillGenerationOnTaskFinalization() throws Exception { - String taskId = "poison-pill-gen-test-" + System.currentTimeMillis(); - String contextId = "test-context-" + System.currentTimeMillis(); - // Clear any previous events testConsumer.clear(); - // Create a task that will be completed (finalized) + // Create a task that will be completed (finalized) - no client-provided taskId Message completeMessage = Message.builder() .role(Message.Role.ROLE_USER) .parts(List.of(new TextPart("complete"))) - .taskId(taskId) .messageId("complete-msg-" + System.currentTimeMillis()) - .contextId(contextId) .build(); CountDownLatch completeLatch = new CountDownLatch(1); @@ -439,6 +424,8 @@ public void testPoisonPillGenerationOnTaskFinalization() throws Exception { assertTrue(completeLatch.await(15, TimeUnit.SECONDS), "Task creation timed out"); Task createdTask = finalTask.get(); assertNotNull(createdTask, "Task should be created"); + String taskId = createdTask.id(); + assertNotNull(taskId, "Server-generated task ID should not be null"); // The task should complete very quickly since it's a simple operation // Wait a moment to ensure all events have been enqueued diff --git a/extras/task-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/taskstore/database/jpa/JpaDatabaseTaskStoreIntegrationTest.java b/extras/task-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/taskstore/database/jpa/JpaDatabaseTaskStoreIntegrationTest.java index 5f93de75e..7bc227553 100644 --- a/extras/task-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/taskstore/database/jpa/JpaDatabaseTaskStoreIntegrationTest.java +++ b/extras/task-store-database-jpa/src/test/java/org/a2aproject/sdk/extras/taskstore/database/jpa/JpaDatabaseTaskStoreIntegrationTest.java @@ -3,7 +3,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Collections; @@ -69,17 +68,11 @@ public void testIsJpaDatabaseTaskStore() { @Test public void testJpaDatabaseTaskStore() throws Exception { - final String taskId = "test-task-1"; - final String contextId = "contextId"; - - // Send a message creating the Task - assertNull(taskStore.get(taskId)); + // Send a message creating the Task (no client-provided taskId — server generates it) Message userMessage = Message.builder() .role(Message.Role.ROLE_USER) .parts(Collections.singletonList(new TextPart("create"))) - .taskId(taskId) .messageId("test-msg-1") - .contextId(contextId) .build(); CountDownLatch latch = new CountDownLatch(1); @@ -102,6 +95,9 @@ public void testJpaDatabaseTaskStore() throws Exception { assertEquals(0, createdTask.artifacts().size()); assertEquals(TaskState.TASK_STATE_SUBMITTED, createdTask.status().state()); + final String taskId = createdTask.id(); + final String contextId = createdTask.contextId(); + // Send a message updating the Task userMessage = Message.builder() .role(Message.Role.ROLE_USER) diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java index 8184d7baf..aea2cbc68 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java @@ -1021,56 +1021,26 @@ private CompletableFuture cleanupProducer(@Nullable CompletableFuture { + throw new AssertionError("AgentExecutor must NOT be invoked when taskId is unknown"); + }; + + Message message = Message.builder() + .messageId("msg-unknown-task") + .role(Message.Role.ROLE_USER) + .taskId("does-not-exist-99999") + .parts(new TextPart("hello")) + .build(); + + MessageSendParams params = MessageSendParams.builder() + .message(message) + .configuration(DEFAULT_CONFIG) + .build(); + + assertThrows(TaskNotFoundError.class, + () -> requestHandler.onMessageSend(params, NULL_CONTEXT), + "Expected TaskNotFoundError when SendMessage references a non-existent taskId"); + } + /** * Test: SendStreamingMessage to a task in a terminal state must also return UnsupportedOperationError * (CORE-SEND-002, streaming path). @@ -740,4 +770,139 @@ void testSendMessageStream_ToCompletedTask_ThrowsUnsupportedOperationError() thr () -> requestHandler.onMessageSendStream(followUpParams, NULL_CONTEXT), "Expected UnsupportedOperationError when streaming message to a completed task"); } + + /** + * CORE-MULTI-004 (streaming path): onMessageSendStream with a client-provided + * taskId that does not reference an existing task must also return + * TaskNotFoundError. + */ + @Test + void testSendMessageStream_WithNonExistentTaskId_ThrowsTaskNotFoundError() { + agentExecutorExecute = (context, emitter) -> { + throw new AssertionError("AgentExecutor must NOT be invoked when taskId is unknown"); + }; + + Message message = Message.builder() + .messageId("msg-stream-unknown-task") + .role(Message.Role.ROLE_USER) + .taskId("does-not-exist-stream-99999") + .parts(new TextPart("hello")) + .build(); + + MessageSendParams params = MessageSendParams.builder() + .message(message) + .configuration(DEFAULT_CONFIG) + .build(); + + assertThrows(TaskNotFoundError.class, + () -> requestHandler.onMessageSendStream(params, NULL_CONTEXT), + "Expected TaskNotFoundError when onMessageSendStream references a non-existent taskId"); + } + + /** + * Verification for Codex adversarial review finding: + * When a follow-up message includes taskId but omits contextId, + * the emitted TaskStatusUpdateEvent should use the task's original + * contextId, NOT a freshly generated UUID. + */ + @Test + void testSendMessage_FollowUpWithTaskIdOnly_PreservesOriginalContextId() throws Exception { + final String originalContextId = "original-ctx-for-verification"; + + // Arrange: create a task with a known contextId via the handler so the task is + // in a non-terminal (SUBMITTED) state and stored in taskStore. + CountDownLatch firstAgentStarted = new CountDownLatch(1); + CountDownLatch releaseFirstAgent = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + emitter.startWork(); + firstAgentStarted.countDown(); + try { + releaseFirstAgent.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + emitter.complete(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-ctx-verify") + .role(Message.Role.ROLE_USER) + .contextId(originalContextId) + .parts(new TextPart("initial message")) + .build(); + + MessageSendParams initialParams = MessageSendParams.builder() + .message(initialMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + EventKind initialResult = requestHandler.onMessageSend(initialParams, NULL_CONTEXT); + assertInstanceOf(Task.class, initialResult); + Task existingTask = (Task) initialResult; + + // Verify the task was stored with the expected contextId + assertEquals(originalContextId, existingTask.contextId(), + "Initial task must have the original contextId"); + + // Wait until the first agent is actively running (task is non-terminal/WORKING) + assertTrue(firstAgentStarted.await(5, TimeUnit.SECONDS), "First agent should start"); + + // Capture the contextId that the agent sees in its RequestContext on the follow-up call + AtomicReference observedContextId = new AtomicReference<>(); + CountDownLatch followUpAgentDone = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + observedContextId.set(context.getContextId()); + emitter.complete(); + followUpAgentDone.countDown(); + }; + + // Act: follow-up message with taskId only, NO contextId + Message followUp = Message.builder() + .messageId("follow-up-msg-ctx-verify") + .role(Message.Role.ROLE_USER) + .taskId(existingTask.id()) + // NOTE: intentionally NO .contextId(...) + .parts(new TextPart("follow up")) + .build(); + + MessageSendParams followUpParams = MessageSendParams.builder() + .message(followUp) + .configuration(DEFAULT_CONFIG) + .build(); + + // Release the first agent so the task reaches a non-terminal state that + // allows a follow-up (the test uses WORKING state, then we send a second message; + // but the spec only allows follow-up to non-terminal tasks so we send before + // completion by driving the task to SUBMITTED first via direct store manipulation). + // Instead: pre-store the task directly to control state precisely. + Task workingTask = new Task( + existingTask.id(), + originalContextId, + new TaskStatus(TaskState.TASK_STATE_WORKING), + null, + null, + null + ); + taskStore.save(workingTask, false); + + EventKind result = requestHandler.onMessageSend(followUpParams, NULL_CONTEXT); + + // Assert: the task returned must still have the ORIGINAL contextId + assertInstanceOf(Task.class, result); + Task returned = (Task) result; + assertEquals(originalContextId, returned.contextId(), + "Task's contextId must be preserved after follow-up without contextId"); + + // Wait for follow-up agent to run and capture its observed contextId + assertTrue(followUpAgentDone.await(5, TimeUnit.SECONDS), "Follow-up agent should complete"); + + // And the agent's view of the contextId must match the original + assertEquals(originalContextId, observedContextId.get(), + "Agent should see the original contextId, not a freshly generated one"); + + // Cleanup: release the first agent + releaseFirstAgent.countDown(); + } } diff --git a/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java index 8d75f783a..cc7ceb188 100644 --- a/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AbstractA2AServerTest.java @@ -468,10 +468,7 @@ public void testListTasksWithHistoryLimit() throws Exception { @Test public void testSendMessageNewMessageSuccess() throws Exception { - assertTrue(getTaskFromTaskStore(MINIMAL_TASK.id()) == null); Message message = Message.builder(MESSAGE) - .taskId(MINIMAL_TASK.id()) - .contextId(MINIMAL_TASK.contextId()) .build(); CountDownLatch latch = new CountDownLatch(1); @@ -598,20 +595,25 @@ public void testGetPushNotificationSuccess() throws Exception { } @Test - public void testError() throws A2AClientException { - Message message = Message.builder(MESSAGE) - .taskId(SEND_MESSAGE_NOT_SUPPORTED.id()) - .contextId(SEND_MESSAGE_NOT_SUPPORTED.contextId()) - .build(); - + public void testError() throws Exception { + saveTaskInTaskStore(SEND_MESSAGE_NOT_SUPPORTED); try { - getNonStreamingClient().sendMessage(message); + Message message = Message.builder(MESSAGE) + .taskId(SEND_MESSAGE_NOT_SUPPORTED.id()) + .contextId(SEND_MESSAGE_NOT_SUPPORTED.contextId()) + .build(); - // For non-streaming clients, the error should still be thrown as an exception - fail("Expected A2AClientException for unsupported send message operation"); - } catch (A2AClientException e) { - // Expected - the client should throw an exception for unsupported operations - assertInstanceOf(UnsupportedOperationError.class, e.getCause()); + try { + getNonStreamingClient().sendMessage(message); + + // For non-streaming clients, the error should still be thrown as an exception + fail("Expected A2AClientException for unsupported send message operation"); + } catch (A2AClientException e) { + // Expected - the client should throw an exception for unsupported operations + assertInstanceOf(UnsupportedOperationError.class, e.getCause()); + } + } finally { + deleteTaskInTaskStore(SEND_MESSAGE_NOT_SUPPORTED.id()); } } @@ -934,19 +936,12 @@ public void testSubscribeExistingTaskSuccessWithClientConsumers() throws Excepti @Test @Timeout(value = 3, unit = TimeUnit.MINUTES) public void testSubscribeToTaskWithInterruptedStateKeepsStreamOpen() throws Exception { - // Use a taskId with the pattern the test agent recognizes - // When we send a message with a taskId to a non-existent task, it creates - // a new task with that ID, and context.getTask() is still null on first invocation - String taskId = "input-required-test-" + UUID.randomUUID(); + AtomicReference taskIdRef = new AtomicReference<>(); try { - // Create initial message with the special taskId pattern - // Use non-streaming client so agent can emit INPUT_REQUIRED and return immediately - // This ensures context.getTask() == null on first agent invocation + // No taskId - server generates one; routing is by message content prefix "input-required:" Message message = Message.builder(MESSAGE) - .taskId(taskId) - .contextId("test-context") - .parts(new TextPart("Trigger INPUT_REQUIRED")) + .parts(new TextPart("input-required:Trigger INPUT_REQUIRED")) .build(); // Send message with non-streaming client - agent will emit INPUT_REQUIRED and complete @@ -956,10 +951,12 @@ public void testSubscribeToTaskWithInterruptedStateKeepsStreamOpen() throws Exce getNonStreamingClient().sendMessage(message, List.of((event, agentCard) -> { if (event instanceof TaskEvent te) { + taskIdRef.compareAndSet(null, te.getTask().id()); finalStateRef.set(te.getTask().status().state()); sendLatch.countDown(); } else if (event instanceof TaskUpdateEvent tue) { if (tue.getUpdateEvent() instanceof TaskStatusUpdateEvent statusUpdate) { + taskIdRef.compareAndSet(null, statusUpdate.taskId()); finalStateRef.set(statusUpdate.status().state()); } } @@ -977,6 +974,9 @@ public void testSubscribeToTaskWithInterruptedStateKeepsStreamOpen() throws Exce assertEquals(TaskState.TASK_STATE_INPUT_REQUIRED, finalState, "Task should be in INPUT_REQUIRED state after agent completes"); + String taskId = taskIdRef.get(); + assertNotNull(taskId, "Should have captured server-generated taskId"); + // CRITICAL: At this point the agent has completed with INPUT_REQUIRED state // The grace period logic should NOT close the queue because INPUT_REQUIRED // is an interrupted state, not a terminal state @@ -1052,7 +1052,7 @@ public void testSubscribeToTaskWithInterruptedStateKeepsStreamOpen() throws Exce Message followUpMessage = Message.builder() .messageId("input-response-" + UUID.randomUUID()) .role(Message.Role.ROLE_USER) - .parts(new TextPart("User input")) + .parts(new TextPart("input-required:User input")) .taskId(taskId) .build(); @@ -1075,7 +1075,10 @@ public void testSubscribeToTaskWithInterruptedStateKeepsStreamOpen() throws Exce assertNull(subscribeErrorRef.get(), "Should not have any errors"); } finally { - deleteTaskInTaskStore(taskId); + String taskId = taskIdRef.get(); + if (taskId != null) { + deleteTaskInTaskStore(taskId); + } } } @@ -1603,13 +1606,12 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) public void testNonBlockingWithMultipleMessages() throws Exception { - String multiEventTaskId = "multi-event-test-" + java.util.UUID.randomUUID(); + AtomicReference generatedTaskIdRef = new AtomicReference<>(); try { - // 1. Send first non-blocking message to create task in WORKING state + // 1. Send first non-blocking message without taskId - server generates one + // Routing is by message content prefix "multi-event:first" Message message1 = Message.builder(MESSAGE) - .taskId(multiEventTaskId) - .contextId("test-context") - .parts(new TextPart("First request")) + .parts(new TextPart("multi-event:first")) .build(); AtomicReference taskIdRef = new AtomicReference<>(); @@ -1632,7 +1634,7 @@ public void testNonBlockingWithMultipleMessages() throws Exception { assertTrue(firstTaskLatch.await(10, TimeUnit.SECONDS)); String taskId = taskIdRef.get(); assertNotNull(taskId); - assertEquals(multiEventTaskId, taskId); + generatedTaskIdRef.set(taskId); // 2. Subscribe to task (queue should still be open) CountDownLatch resubEventLatch = new CountDownLatch(2); // artifact-2 + completion @@ -1693,9 +1695,8 @@ public void testNonBlockingWithMultipleMessages() throws Exception { // 3. Send second streaming message to same taskId Message message2 = Message.builder(MESSAGE) - .taskId(multiEventTaskId) // Same taskId - .contextId("test-context") - .parts(new TextPart("Second request")) + .taskId(taskId) + .parts(new TextPart("multi-event:second")) .build(); CountDownLatch streamEventLatch = new CountDownLatch(2); // artifact-2 + completion @@ -1784,7 +1785,10 @@ public void testNonBlockingWithMultipleMessages() throws Exception { assertEquals("Second message artifact", ((TextPart) streamArtifact.artifact().parts().get(0)).text()); } finally { - deleteTaskInTaskStore(multiEventTaskId); + String taskId = generatedTaskIdRef.get(); + if (taskId != null) { + deleteTaskInTaskStore(taskId); + } } } @@ -1819,14 +1823,13 @@ private boolean awaitChildQueueCountStable(String taskId, int expectedCount, lon @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) public void testInputRequiredWorkflow() throws Exception { - String inputRequiredTaskId = "input-required-test-" + java.util.UUID.randomUUID(); - boolean taskCreated = false; + AtomicBoolean taskCreated = new AtomicBoolean(false); + AtomicReference inputRequiredTaskIdRef = new AtomicReference<>(); try { - // 1. Send initial message - AgentExecutor will transition task to INPUT_REQUIRED + // 1. Send initial message without taskId - server generates one + // Routing is by message content prefix "input-required:" Message initialMessage = Message.builder(MESSAGE) - .taskId(inputRequiredTaskId) - .contextId("test-context") - .parts(new TextPart("Initial request")) + .parts(new TextPart("input-required:Initial request")) .build(); CountDownLatch initialLatch = new CountDownLatch(1); @@ -1839,10 +1842,12 @@ public void testInputRequiredWorkflow() throws Exception { return; } if (event instanceof TaskEvent te) { + inputRequiredTaskIdRef.compareAndSet(null, te.getTask().id()); TaskState state = te.getTask().status().state(); initialState.set(state); // Only count down when we receive INPUT_REQUIRED, not intermediate states like WORKING if (state == TaskState.TASK_STATE_INPUT_REQUIRED) { + taskCreated.set(true); initialLatch.countDown(); } } else { @@ -1855,13 +1860,14 @@ public void testInputRequiredWorkflow() throws Exception { assertTrue(initialLatch.await(10, TimeUnit.SECONDS)); assertFalse(initialUnexpectedEvent.get()); assertEquals(TaskState.TASK_STATE_INPUT_REQUIRED, initialState.get()); - taskCreated = true; + + String inputRequiredTaskId = inputRequiredTaskIdRef.get(); + assertNotNull(inputRequiredTaskId, "Should have captured server-generated taskId"); // 2. Send input message - AgentExecutor will complete the task Message inputMessage = Message.builder(MESSAGE) .taskId(inputRequiredTaskId) - .contextId("test-context") - .parts(new TextPart("User input")) + .parts(new TextPart("input-required:User input")) .build(); CountDownLatch completionLatch = new CountDownLatch(1); @@ -1892,8 +1898,11 @@ public void testInputRequiredWorkflow() throws Exception { assertEquals(TaskState.TASK_STATE_COMPLETED, completedState.get()); } finally { - if (taskCreated) { - deleteTaskInTaskStore(inputRequiredTaskId); + if (taskCreated.get()) { + String taskId = inputRequiredTaskIdRef.get(); + if (taskId != null) { + deleteTaskInTaskStore(taskId); + } } } } @@ -1917,14 +1926,13 @@ public void testInputRequiredWorkflow() throws Exception { @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) public void testAuthRequiredWorkflow() throws Exception { - String authRequiredTaskId = "auth-required-test-" + java.util.UUID.randomUUID(); - boolean taskCreated = false; + AtomicBoolean taskCreated = new AtomicBoolean(false); + AtomicReference authRequiredTaskIdRef = new AtomicReference<>(); try { - // 1. Send initial message - AgentExecutor will transition task to AUTH_REQUIRED then continue in background + // 1. Send initial message without taskId - server generates one + // Routing is by message content prefix "auth-required:" Message initialMessage = Message.builder(MESSAGE) - .taskId(authRequiredTaskId) - .contextId("test-context") - .parts(new TextPart("Initial request requiring auth")) + .parts(new TextPart("auth-required:Initial request requiring auth")) .build(); CountDownLatch initialLatch = new CountDownLatch(1); @@ -1937,10 +1945,12 @@ public void testAuthRequiredWorkflow() throws Exception { return; } if (event instanceof TaskEvent te) { + authRequiredTaskIdRef.compareAndSet(null, te.getTask().id()); TaskState state = te.getTask().status().state(); initialState.set(state); // Only count down when we receive AUTH_REQUIRED, not intermediate states like WORKING if (state == TaskState.TASK_STATE_AUTH_REQUIRED) { + taskCreated.set(true); initialLatch.countDown(); } } else { @@ -1953,7 +1963,9 @@ public void testAuthRequiredWorkflow() throws Exception { assertTrue(initialLatch.await(10, TimeUnit.SECONDS), "Should receive AUTH_REQUIRED state"); assertFalse(initialUnexpectedEvent.get(), "Should only receive TaskEvent"); assertEquals(TaskState.TASK_STATE_AUTH_REQUIRED, initialState.get(), "Task should be in AUTH_REQUIRED state"); - taskCreated = true; + + String authRequiredTaskId = authRequiredTaskIdRef.get(); + assertNotNull(authRequiredTaskId, "Should have captured server-generated taskId"); // 2. Subscribe to task to catch background completion // Agent continues executing after returning AUTH_REQUIRED (simulating out-of-band auth flow) @@ -2010,20 +2022,23 @@ public void testAuthRequiredWorkflow() throws Exception { assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS), "Subscription should be established"); // Note: We don't use awaitChildQueueCountStable() here because the agent is already running - // in the background (sleeping for 3s). By the time we check, it might have already completed. + // in the background (sleeping for 2s). By the time we check, it might have already completed. // The subscriptionLatch already ensures the subscription is established, and completionLatch // below will catch the COMPLETED event from the background agent. // 3. Verify subscription receives COMPLETED state from background agent execution - // Agent should complete after simulating out-of-band auth delay (500ms) + // Agent should complete after simulating out-of-band auth delay (2000ms) assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "Should receive COMPLETED state from background agent"); assertFalse(completionUnexpectedEvent.get(), "Should only receive TaskEvent"); assertNull(errorRef.get(), "Should not receive errors"); assertEquals(TaskState.TASK_STATE_COMPLETED, completedState.get(), "Task should be COMPLETED after background auth"); } finally { - if (taskCreated) { - deleteTaskInTaskStore(authRequiredTaskId); + if (taskCreated.get()) { + String taskId = authRequiredTaskIdRef.get(); + if (taskId != null) { + deleteTaskInTaskStore(taskId); + } } } } @@ -2199,6 +2214,8 @@ public void testStreamingMethodWithoutAcceptHeader() throws Exception { } private void testSendStreamingMessageWithHttpClient(String mediaType) throws Exception { + saveTaskInTaskStore(MINIMAL_TASK); + try { Message message = Message.builder(MESSAGE) .taskId(MINIMAL_TASK.id()) .contextId(MINIMAL_TASK.contextId()) @@ -2245,6 +2262,9 @@ private void testSendStreamingMessageWithHttpClient(String mediaType) throws Exc Assertions.assertTrue(dataRead); Assertions.assertNull(errorRef.get()); + } finally { + deleteTaskInTaskStore(MINIMAL_TASK.id()); + } } public void testSendStreamingMessage(boolean createTask) throws Exception { @@ -2252,10 +2272,11 @@ public void testSendStreamingMessage(boolean createTask) throws Exception { saveTaskInTaskStore(MINIMAL_TASK); } try { - Message message = Message.builder(MESSAGE) - .taskId(MINIMAL_TASK.id()) - .contextId(MINIMAL_TASK.contextId()) - .build(); + Message.Builder messageBuilder = Message.builder(MESSAGE); + if (createTask) { + messageBuilder.taskId(MINIMAL_TASK.id()).contextId(MINIMAL_TASK.contextId()); + } + Message message = messageBuilder.build(); CountDownLatch latch = new CountDownLatch(1); AtomicReference receivedMessage = new AtomicReference<>(); @@ -2759,30 +2780,29 @@ public void testMainQueueStaysOpenForNonFinalTasks() throws Exception { @Test @Timeout(value = 2, unit = TimeUnit.MINUTES) public void testMainQueueClosesForFinalizedTasks() throws Exception { - String taskId = "completed-task-integration"; - String contextId = "completed-ctx"; - - // Send a message that will create and complete the task + // Send a message without taskId - server generates one Message message = Message.builder(MESSAGE) - .taskId(taskId) - .contextId(contextId) .parts(new TextPart("complete task")) .build(); CountDownLatch completionLatch = new CountDownLatch(1); AtomicReference errorRef = new AtomicReference<>(); + AtomicReference generatedTaskId = new AtomicReference<>(); BiConsumer consumer = (event, agentCard) -> { if (event instanceof TaskEvent te) { + generatedTaskId.compareAndSet(null, te.getTask().id()); // Might get Task with final state if (te.getTask().status().state().isFinal()) { completionLatch.countDown(); } } else if (event instanceof MessageEvent me) { - // Message is considered a final event + // Message is considered a final event - capture taskId from the message + generatedTaskId.compareAndSet(null, me.getMessage().taskId()); completionLatch.countDown(); } else if (event instanceof TaskUpdateEvent tue && tue.getUpdateEvent() instanceof TaskStatusUpdateEvent status) { + generatedTaskId.compareAndSet(null, status.taskId()); if (status.isFinal()) { completionLatch.countDown(); } @@ -2804,6 +2824,9 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception { "Should receive final event"); assertNull(errorRef.get(), "Should not have errors during message send"); + String taskId = generatedTaskId.get(); + assertNotNull(taskId, "Should have captured server-generated taskId"); + // Give cleanup time to run after final event Thread.sleep(2000); @@ -2857,13 +2880,16 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception { } finally { // Task might not exist in store if created via message send - try { - Task task = getTaskFromTaskStore(taskId); - if (task != null) { - deleteTaskInTaskStore(taskId); + String taskId = generatedTaskId.get(); + if (taskId != null) { + try { + Task task = getTaskFromTaskStore(taskId); + if (task != null) { + deleteTaskInTaskStore(taskId); + } + } catch (Exception e) { + // Ignore cleanup errors } - } catch (Exception e) { - // Ignore cleanup errors - task might not have been persisted } } } @@ -2887,11 +2913,8 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception { */ @Test public void testAgentToAgentDelegation() throws Exception { - String delegationTaskId = "agent-to-agent-test-" + UUID.randomUUID(); - + // No taskId - server generates one; routing is by message content prefix "delegate:" Message delegationMessage = Message.builder() - .taskId(delegationTaskId) - .contextId("agent-to-agent-context") .role(Message.Role.ROLE_USER) .parts(new TextPart("delegate:What is 2+2?")) .build(); @@ -2928,10 +2951,6 @@ public void testAgentToAgentDelegation() throws Exception { String delegatedText = extractTextFromTask(delegationResult); assertTrue(delegatedText.contains("Handled locally:"), "Delegated content should have been handled locally by target agent. Got: " + delegatedText); - - // Verify the task ID is the original one (not the delegated task's ID) - assertEquals(delegationTaskId, delegationResult.id(), - "Task ID should be the original task ID, not the delegated task's ID"); } /** @@ -2949,13 +2968,10 @@ public void testAgentToAgentDelegation() throws Exception { */ @Test public void testAgentToAgentLocalHandling() throws Exception { - String localTaskId = "agent-to-agent-test-" + UUID.randomUUID(); - + // No taskId - server generates one; routing is by message content prefix "a2a-local:" Message localMessage = Message.builder() - .taskId(localTaskId) - .contextId("agent-to-agent-context") .role(Message.Role.ROLE_USER) - .parts(new TextPart("Hello directly")) + .parts(new TextPart("a2a-local:Hello directly")) .build(); CountDownLatch localLatch = new CountDownLatch(1); diff --git a/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AgentExecutorProducer.java index f6f72be1d..ba9ce0676 100644 --- a/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/org/a2aproject/sdk/server/apps/common/AgentExecutorProducer.java @@ -48,92 +48,72 @@ public AgentExecutor agentExecutor() { @Override public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { String taskId = context.getTaskId(); + String input = context.getMessage() != null ? extractTextFromMessage(context.getMessage()) : ""; - // Agent-to-agent communication test - if (taskId != null && taskId.startsWith("agent-to-agent-test")) { + // Agent-to-agent communication test (routed by message content prefix) + if (input.startsWith("delegate:") || input.startsWith("a2a-local:")) { handleAgentToAgentTest(context, agentEmitter); return; } - // Special handling for multi-event test - if (taskId != null && taskId.startsWith("multi-event-test")) { - // First call: context.getTask() == null (new task) - if (context.getTask() == null) { - agentEmitter.startWork(); - // Return immediately - queue stays open because task is in WORKING state - return; - } else { - // Second call: context.getTask() != null (existing task) - agentEmitter.addArtifact( - List.of(new TextPart("Second message artifact")), - "artifact-2", "Second Artifact", null); - agentEmitter.complete(); - return; - } + // Special handling for multi-event test (routed by message content) + if (input.startsWith("multi-event:first")) { + agentEmitter.startWork(); + // Return immediately - queue stays open because task is in WORKING state + return; + } + if (input.startsWith("multi-event:second")) { + agentEmitter.addArtifact( + List.of(new TextPart("Second message artifact")), + "artifact-2", "Second Artifact", null); + agentEmitter.complete(); + return; } - // Special handling for input-required test - if (taskId != null && taskId.startsWith("input-required-test")) { - String input = extractTextFromMessage(context.getMessage()); + // Special handling for input-required test (routed by message content) + if (input.startsWith("input-required:")) { + String payload = input.substring("input-required:".length()); // Second call: user provided the required input - complete the task - if ("User input".equals(input)) { - // Go directly to COMPLETED without intermediate WORKING state - // This avoids race condition where blocking call interrupts on WORKING + if ("User input".equals(payload)) { agentEmitter.complete(); return; } - // First call: any other message - emit INPUT_REQUIRED - // Go directly to INPUT_REQUIRED without intermediate WORKING state - // This avoids race condition where blocking call interrupts on WORKING - // before INPUT_REQUIRED is persisted to TaskStore + // First call: emit INPUT_REQUIRED agentEmitter.requiresInput(agentEmitter.newAgentMessage( List.of(new TextPart("Please provide additional information")), context.getMessage().metadata())); - // Return immediately - queue stays open because task is in INPUT_REQUIRED state return; } - // Special handling for auth-required test - if (taskId != null && taskId.startsWith("auth-required-test")) { - // AUTH_REQUIRED workflow: agent emits AUTH_REQUIRED, simulates out-of-band auth delay, then completes - // Go directly to AUTH_REQUIRED without intermediate WORKING state - // This avoids race condition where blocking call interrupts on WORKING - // before AUTH_REQUIRED is persisted to TaskStore + // Special handling for auth-required test (routed by message content) + if (input.startsWith("auth-required:")) { agentEmitter.requiresAuth(agentEmitter.newAgentMessage( List.of(new TextPart("Please authenticate with OAuth provider")), context.getMessage().metadata())); try { - // Simulate out-of-band authentication delay (user authenticates externally) - // Sleep long enough for test to establish subscription and wait for completion Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new InternalError("Auth simulation interrupted: " + e.getMessage()); } - // Complete task (auth "received" out-of-band) - // Agent continues after AUTH_REQUIRED without new request agentEmitter.complete(); return; } - if (context.getTaskId().equals("task-not-supported-123")) { + if ("task-not-supported-123".equals(taskId)) { throw new UnsupportedOperationError(); } // Check for delegated agent-to-agent messages (marked with special prefix) - if (context.getMessage() != null) { - String userInput = extractTextFromMessage(context.getMessage()); - if (userInput.startsWith("#a2a-delegated#")) { - // This is a delegated message from agent-to-agent test - complete it - String actualContent = userInput.substring("#a2a-delegated#".length()); - agentEmitter.startWork(); - String response = "Handled locally: " + actualContent; - agentEmitter.addArtifact(List.of(new TextPart(response))); - agentEmitter.complete(); - return; - } + if (input.startsWith("#a2a-delegated#")) { + String actualContent = input.substring("#a2a-delegated#".length()); + agentEmitter.startWork(); + String response = "Handled locally: " + actualContent; + agentEmitter.addArtifact(List.of(new TextPart(response))); + agentEmitter.complete(); + return; } // Default handler: echo back message or task @@ -182,6 +162,8 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm // Check for delegation pattern if (userInput.startsWith("delegate:")) { handleDelegation(userInput, transportProtocol, agentEmitter); + } else if (userInput.startsWith("a2a-local:")) { + handleLocally(userInput.substring("a2a-local:".length()), agentEmitter); } else { handleLocally(userInput, agentEmitter); } diff --git a/transport/grpc/src/test/java/org/a2aproject/sdk/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/org/a2aproject/sdk/transport/grpc/handler/GrpcHandlerTest.java index 28b32cc4e..a3f281b3b 100644 --- a/transport/grpc/src/test/java/org/a2aproject/sdk/transport/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/org/a2aproject/sdk/transport/grpc/handler/GrpcHandlerTest.java @@ -180,6 +180,7 @@ public void testOnCancelTaskNotFound() throws Exception { @Test public void testOnMessageNewMessageSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.sendMessage(context.getMessage()); }; @@ -212,6 +213,7 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception { @Test public void testOnMessageError() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.fail(new UnsupportedOperationError()); }; @@ -297,8 +299,9 @@ public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { @Test public void testOnMessageStreamNewMessageSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { - agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentEmitter.sendMessage(context.getMessage()); }; StreamRecorder streamRecorder = sendStreamingMessageRequest(handler); @@ -774,6 +777,7 @@ public void testOnGetExtendedAgentCard() throws Exception { @Test public void testStreamingDoesNotBlockMainThread() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); // Track if the main thread gets blocked during streaming AtomicBoolean eventReceived = new AtomicBoolean(false); @@ -944,6 +948,7 @@ public ServerCallContext create(StreamObserver streamObserver) { }; } }; + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.sendMessage(context.getMessage()); @@ -1085,6 +1090,7 @@ public ServerCallContext create(StreamObserver streamObserver) { }; } }; + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.sendMessage(context.getMessage()); @@ -1136,6 +1142,7 @@ public ServerCallContext create(StreamObserver streamObserver) { }; } }; + taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.sendMessage(context.getMessage()); diff --git a/transport/jsonrpc/src/test/java/org/a2aproject/sdk/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/org/a2aproject/sdk/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 83751e919..f8d4cf74d 100644 --- a/transport/jsonrpc/src/test/java/org/a2aproject/sdk/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/org/a2aproject/sdk/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -174,6 +174,7 @@ public void testOnCancelTaskNotFound() { @Test public void testOnMessageNewMessageSuccess() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.sendMessage(context.getMessage()); }; @@ -209,6 +210,7 @@ public void testOnMessageError() { // See testMessageOnErrorMocks() for a test more similar to the Python implementation, using mocks for // EventConsumer.consumeAll() JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { agentEmitter.fail(new UnsupportedOperationError()); }; @@ -226,6 +228,7 @@ public void testOnMessageError() { @Test public void testOnMessageErrorMocks() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); Message message = Message.builder(MESSAGE) .taskId(MINIMAL_TASK.id()) .contextId(MINIMAL_TASK.contextId()) @@ -248,8 +251,9 @@ public void testOnMessageErrorMocks() { @Test public void testOnMessageStreamNewMessageSuccess() throws InterruptedException { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); agentExecutorExecute = (context, agentEmitter) -> { - agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) @@ -307,6 +311,7 @@ public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws Interrup // Note: Do NOT set callback - DefaultRequestHandler has a permanent callback // We'll verify persistence by checking TaskStore after streaming completes JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); // Create multiple events to be sent during streaming Task taskEvent = Task.builder(MINIMAL_TASK) @@ -426,6 +431,7 @@ public void onComplete() { @Test public void testOnMessageStreamNewMessageSuccessMocks() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); // This is used to send events from a mock List events = List.of( @@ -1728,6 +1734,7 @@ public void testRequiredExtensionProvidedSuccess() { .build(); JSONRPCHandler handler = new JSONRPCHandler(cardWithExtension, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); // Create context WITH the required extension Set requestedExtensions = new HashSet<>(); @@ -1887,6 +1894,7 @@ public void testCompatibleVersionSuccess() { .build(); JSONRPCHandler handler = new JSONRPCHandler(agentCard, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); // Create context with compatible version 1.1 ServerCallContext contextWithVersion = new ServerCallContext( @@ -1930,6 +1938,7 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() { .build(); JSONRPCHandler handler = new JSONRPCHandler(agentCard, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK, false); // Use default callContext (no version - should default to 1.0) agentExecutorExecute = (context, agentEmitter) -> {