Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.a2aproject.sdk.client.Client;
import org.a2aproject.sdk.client.TaskEvent;
import org.a2aproject.sdk.client.TaskUpdateEvent;
import org.a2aproject.sdk.client.config.ClientConfig;
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfigBuilder;
Expand Down Expand Up @@ -98,23 +101,33 @@ public void testDirectNotificationTrigger() {

@Test
public void testJpaDatabasePushNotificationConfigStoreIntegration() throws Exception {
final String taskId = "push-notify-test-" + System.currentTimeMillis();
final String contextId = "test-context";

// Step 1: Create the task
// Step 1: Create the task (no client-provided taskId — server generates it)
Message createMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(List.of(new TextPart("create"))) // Send the "create" command
.taskId(taskId)
.messageId("test-msg-1")
.contextId(contextId)
.build();

// Use a latch to wait for the first operation to complete
// Use a latch to wait for the first operation to complete and capture server-generated ids
CountDownLatch createLatch = new CountDownLatch(1);
client.sendMessage(createMessage, List.of((event, card) -> createLatch.countDown()), (e) -> createLatch.countDown());
AtomicReference<Task> createdTaskRef = new AtomicReference<>();
client.sendMessage(createMessage, List.of((event, card) -> {
if (event instanceof TaskEvent taskEvent) {
createdTaskRef.set(taskEvent.getTask());
createLatch.countDown();
return;
}
if (event instanceof TaskUpdateEvent taskUpdateEvent) {
createdTaskRef.set(taskUpdateEvent.getTask());
createLatch.countDown();
}
}), (e) -> createLatch.countDown());
assertTrue(createLatch.await(10, TimeUnit.SECONDS), "Timeout waiting for task creation");

assertNotNull(createdTaskRef.get(), "Task should have been created");
final String taskId = createdTaskRef.get().id();
final String contextId = createdTaskRef.get().contextId();

// Step 2: Set the push notification configuration
TaskPushNotificationConfig taskPushConfig = TaskPushNotificationConfig.builder()
.id("test-config-1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -19,6 +21,7 @@
import org.a2aproject.sdk.A2A;
import org.a2aproject.sdk.client.Client;
import org.a2aproject.sdk.client.ClientEvent;
import org.a2aproject.sdk.client.TaskEvent;
import org.a2aproject.sdk.client.config.ClientConfig;
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;
Expand Down Expand Up @@ -254,15 +257,13 @@ public void testInfrastructureStartup() {
*/
@Test
public void testMultiInstanceEventReplication() throws Exception {
final String taskId = "replication-test-task-" + System.currentTimeMillis();
final String contextId = "replication-test-context";

Throwable testFailure = null;
final String[] taskIdHolder = {null};
final String[] contextIdHolder = {null};

try {
// Step 1: Send initial message NON-streaming to create task
// Step 1: Send initial message NON-streaming to create task (no client-provided taskId)
Message initialMessage = Message.builder(A2A.toUserMessage("Initial test message"))
.taskId(taskId)
.contextId(contextId)
.build();

// Use NON-streaming client to create the task
Expand All @@ -272,13 +273,22 @@ public void testMultiInstanceEventReplication() throws Exception {
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
.build();

Task createdTask = null;
try {
nonStreamingClient.sendMessage(initialMessage, null);
CountDownLatch createLatch = new CountDownLatch(1);
AtomicReference<Task> taskRef = new AtomicReference<>();
nonStreamingClient.sendMessage(initialMessage, List.of((ClientEvent event, AgentCard card) -> {
if (event instanceof TaskEvent te) {
taskRef.set(te.getTask());
}
createLatch.countDown();
}), (Throwable err) -> createLatch.countDown());
assertTrue(createLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");

// Retrieve the task to verify it was created
createdTask = nonStreamingClient.getTask(new TaskQueryParams(taskId), null);
Task createdTask = taskRef.get();
assertNotNull(createdTask, "Task should be created");
taskIdHolder[0] = createdTask.id();
contextIdHolder[0] = createdTask.contextId();
assertNotNull(taskIdHolder[0], "Server-generated task ID should not be null");

// Task should be in a non-final state (SUBMITTED or WORKING are both valid)
TaskState state = createdTask.status().state();
Expand All @@ -297,6 +307,9 @@ public void testMultiInstanceEventReplication() throws Exception {
throw e;
}

final String taskId = taskIdHolder[0];
final String contextId = contextIdHolder[0];

// Step 2: Subscribe from both app1 and app2 with proper latches

// We need to wait for at least 3 new events after resubscription:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,14 @@ public void tearDown() throws Exception {

@Test
public void testA2AMessageReplicatedToKafka() throws Exception {
String taskId = "kafka-replication-test-" + System.currentTimeMillis();
String contextId = "test-context-" + System.currentTimeMillis();

// Clear any previous events
testConsumer.clear();

// Send A2A message that should trigger events and replication
// Send A2A message that should trigger events and replication (no client-provided taskId)
Message message = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(List.of(new TextPart("create")))
.taskId(taskId)
.messageId("test-msg-" + System.currentTimeMillis())
.contextId(contextId)
.build();

CountDownLatch a2aLatch = new CountDownLatch(1);
Expand All @@ -166,7 +161,9 @@ public void testA2AMessageReplicatedToKafka() throws Exception {

Task task = createdTask.get();
assertNotNull(task, "Task should be created");
assertEquals(taskId, task.id());
String taskId = task.id();
String contextId = task.contextId();
assertNotNull(taskId, "Server-generated task ID should not be null");
assertEquals(TaskState.TASK_STATE_SUBMITTED, task.status().state());

// Wait for the event to be replicated to Kafka
Expand All @@ -193,19 +190,14 @@ public void testA2AMessageReplicatedToKafka() throws Exception {

@Test
public void testKafkaEventReceivedByA2AServer() throws Exception {
String taskId = "kafka-to-a2a-test-" + System.currentTimeMillis();
String contextId = "test-context-" + System.currentTimeMillis();

// Clear any previous events
testConsumer.clear();

// First create a task in the A2A system using non-streaming client
// First create a task in the A2A system using non-streaming client (no client-provided taskId)
Message createMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(List.of(new TextPart("create")))
.taskId(taskId)
.messageId("create-msg-" + System.currentTimeMillis())
.contextId(contextId)
.build();

CountDownLatch createLatch = new CountDownLatch(1);
Expand All @@ -223,6 +215,9 @@ public void testKafkaEventReceivedByA2AServer() throws Exception {
assertTrue(createLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
Task initialTask = createdTask.get();
assertNotNull(initialTask, "Task should be created");
String taskId = initialTask.id();
String contextId = initialTask.contextId();
assertNotNull(taskId, "Server-generated task ID should not be null");
assertEquals(TaskState.TASK_STATE_SUBMITTED, initialTask.status().state(), "Initial task should be in SUBMITTED state");

// Add a small delay to ensure the task is fully processed before resubscription
Expand Down Expand Up @@ -312,20 +307,15 @@ public void testKafkaEventReceivedByA2AServer() throws Exception {

@Test
public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
String taskId = "queue-closed-test-" + System.currentTimeMillis();
String contextId = "test-context-" + System.currentTimeMillis();

// Clear any previous events
testConsumer.clear();

// Use polling (non-blocking) client with "working" command
// Use polling (non-blocking) client with "working" command (no client-provided taskId)
// This creates task in WORKING state (non-final) and keeps queue alive
Message workingMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(List.of(new TextPart("working")))
.taskId(taskId)
.messageId("working-msg-" + System.currentTimeMillis())
.contextId(contextId)
.build();

CountDownLatch workingLatch = new CountDownLatch(1);
Expand All @@ -348,7 +338,7 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
assertTrue(workingLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
String createdTaskId = taskIdRef.get();
assertNotNull(createdTaskId, "Task should be created");
assertEquals(taskId, createdTaskId);
String taskId = createdTaskId;

// Set up streaming resubscription to listen for the QueueClosedEvent
CountDownLatch streamCompletedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -408,19 +398,14 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {

@Test
public void testPoisonPillGenerationOnTaskFinalization() throws Exception {
String taskId = "poison-pill-gen-test-" + System.currentTimeMillis();
String contextId = "test-context-" + System.currentTimeMillis();

// Clear any previous events
testConsumer.clear();

// Create a task that will be completed (finalized)
// Create a task that will be completed (finalized) - no client-provided taskId
Message completeMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(List.of(new TextPart("complete")))
.taskId(taskId)
.messageId("complete-msg-" + System.currentTimeMillis())
.contextId(contextId)
.build();

CountDownLatch completeLatch = new CountDownLatch(1);
Expand All @@ -439,6 +424,8 @@ public void testPoisonPillGenerationOnTaskFinalization() throws Exception {
assertTrue(completeLatch.await(15, TimeUnit.SECONDS), "Task creation timed out");
Task createdTask = finalTask.get();
assertNotNull(createdTask, "Task should be created");
String taskId = createdTask.id();
assertNotNull(taskId, "Server-generated task ID should not be null");

// The task should complete very quickly since it's a simple operation
// Wait a moment to ensure all events have been enqueued
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Collections;
Expand Down Expand Up @@ -69,17 +68,11 @@ public void testIsJpaDatabaseTaskStore() {

@Test
public void testJpaDatabaseTaskStore() throws Exception {
final String taskId = "test-task-1";
final String contextId = "contextId";

// Send a message creating the Task
assertNull(taskStore.get(taskId));
// Send a message creating the Task (no client-provided taskId — server generates it)
Message userMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(Collections.singletonList(new TextPart("create")))
.taskId(taskId)
.messageId("test-msg-1")
.contextId(contextId)
.build();

CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -102,6 +95,9 @@ public void testJpaDatabaseTaskStore() throws Exception {
assertEquals(0, createdTask.artifacts().size());
assertEquals(TaskState.TASK_STATE_SUBMITTED, createdTask.status().state());

final String taskId = createdTask.id();
final String contextId = createdTask.contextId();

// Send a message updating the Task
userMessage = Message.builder()
.role(Message.Role.ROLE_USER)
Expand Down
Loading
Loading