From 481bdbcf397d5e723e07d843a253ed9067cf913c Mon Sep 17 00:00:00 2001 From: Rui Figueiredo Date: Wed, 8 Apr 2026 22:37:33 +0100 Subject: [PATCH 1/4] fix: return task immediately when blocking=False in on_message_send When `blocking=False` is set in MessageSendConfiguration, the handler now returns the Task object immediately without waiting for executor events. Event consumption and push notifications are processed entirely in the background. Previously, even with `blocking=False`, the handler waited for the first event from the EventConsumer polling loop (0.5s timeout per iteration). When the event loop was busy with background work, this caused 5-7s delays before the HTTP response was sent, leading to client timeouts. The new non-blocking fast path: 1. Creates the task and starts the executor (via _setup_message_execution) 2. Returns the task in 'submitted' state immediately 3. Consumes events and sends push notifications in a background task The blocking path remains unchanged for backward compatibility. Fixes #951 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../default_request_handler.py | 50 ++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 3bd6a0dc2..a0c0271bf 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -41,6 +41,7 @@ TaskPushNotificationConfig, TaskQueryParams, TaskState, + TaskStatus, UnsupportedOperationError, ) from a2a.utils.errors import ServerError @@ -295,6 +296,13 @@ async def on_message_send( Starts the agent execution for the message and waits for the final result (Task or Message). + + When ``blocking`` is ``False``, the handler returns the task + immediately without waiting for executor events and processes + everything in the background. Results are delivered via push + notifications. This avoids the latency introduced by the + ``EventConsumer`` polling loop which can add seconds of delay + when the event loop is busy with other work. """ ( _task_manager, @@ -311,6 +319,46 @@ async def on_message_send( if params.configuration and params.configuration.blocking is False: blocking = False + # Non-blocking fast path: return the task immediately and process + # events entirely in the background via push notifications. + if not blocking: + task = await _task_manager.get_task() + if not task: + task = Task( + id=task_id, + context_id=params.message.context_id, + status=TaskStatus(state=TaskState.submitted), + history=[params.message], + ) + + async def _background_consume() -> None: + try: + async for _event in result_aggregator.consume_and_emit( + consumer + ): + await self._send_push_notification_if_needed( + task_id, result_aggregator + ) + except Exception: + logger.exception( + 'Background event consumption failed for task %s', + task_id, + ) + finally: + await self._cleanup_producer(producer_task, task_id) + + bg_task = asyncio.create_task(_background_consume()) + bg_task.set_name(f'non_blocking_consume:{task_id}') + self._track_background_task(bg_task) + + if params.configuration: + task = apply_history_length( + task, params.configuration.history_length + ) + + return task + + # Blocking path: wait for completion or interruption. interrupted_or_non_blocking = False try: # Create async callback for push notifications @@ -325,7 +373,7 @@ async def push_notification_callback() -> None: bg_consume_task, ) = await result_aggregator.consume_and_break_on_interrupt( consumer, - blocking=blocking, + blocking=True, event_callback=push_notification_callback, ) From c74ec079dfcce95d69357f99e42cf216ab2bfaa4 Mon Sep 17 00:00:00 2001 From: Rui Figueiredo Date: Wed, 8 Apr 2026 22:49:33 +0100 Subject: [PATCH 2/4] fix: extract non-blocking path to method, update tests - Extract non-blocking logic to _on_message_send_non_blocking to fix PLR0915 (too many statements) lint error - Update test_on_message_send_with_push_notification_in_non_blocking_request to test the new immediate-return behavior instead of mocking consume_and_break_on_interrupt - Update test_on_message_send_non_blocking history assertion to account for the immediate return having only the initial user message Co-Authored-By: Claude Opus 4.6 (1M context) --- .../default_request_handler.py | 104 +++++++----- .../test_default_request_handler.py | 149 ++++-------------- 2 files changed, 99 insertions(+), 154 deletions(-) diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index a0c0271bf..9d30b65a1 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -287,6 +287,60 @@ async def _send_push_notification_if_needed( if isinstance(latest_task, Task): await self._push_sender.send_notification(latest_task) + async def _on_message_send_non_blocking( + self, + params: MessageSendParams, + task_manager: TaskManager, + task_id: str, + queue: EventQueue, + result_aggregator: ResultAggregator, + producer_task: asyncio.Task, + consumer: EventConsumer, + ) -> Task: + """Non-blocking fast path for ``on_message_send``. + + Returns the task immediately without waiting for executor events. + Event consumption and push notifications are handled in a background + task. This avoids the latency introduced by the ``EventConsumer`` + polling loop which can add seconds of delay when the event loop is + busy with other work. + """ + task = await task_manager.get_task() + if not task: + task = Task( + id=task_id, + context_id=params.message.context_id, + status=TaskStatus(state=TaskState.submitted), + history=[params.message], + ) + + async def _background_consume() -> None: + try: + async for _event in result_aggregator.consume_and_emit( + consumer + ): + await self._send_push_notification_if_needed( + task_id, result_aggregator + ) + except Exception: + logger.exception( + 'Background event consumption failed for task %s', + task_id, + ) + finally: + await self._cleanup_producer(producer_task, task_id) + + bg_task = asyncio.create_task(_background_consume()) + bg_task.set_name(f'non_blocking_consume:{task_id}') + self._track_background_task(bg_task) + + if params.configuration: + task = apply_history_length( + task, params.configuration.history_length + ) + + return task + async def on_message_send( self, params: MessageSendParams, @@ -300,9 +354,7 @@ async def on_message_send( When ``blocking`` is ``False``, the handler returns the task immediately without waiting for executor events and processes everything in the background. Results are delivered via push - notifications. This avoids the latency introduced by the - ``EventConsumer`` polling loop which can add seconds of delay - when the event loop is busy with other work. + notifications. """ ( _task_manager, @@ -319,44 +371,16 @@ async def on_message_send( if params.configuration and params.configuration.blocking is False: blocking = False - # Non-blocking fast path: return the task immediately and process - # events entirely in the background via push notifications. if not blocking: - task = await _task_manager.get_task() - if not task: - task = Task( - id=task_id, - context_id=params.message.context_id, - status=TaskStatus(state=TaskState.submitted), - history=[params.message], - ) - - async def _background_consume() -> None: - try: - async for _event in result_aggregator.consume_and_emit( - consumer - ): - await self._send_push_notification_if_needed( - task_id, result_aggregator - ) - except Exception: - logger.exception( - 'Background event consumption failed for task %s', - task_id, - ) - finally: - await self._cleanup_producer(producer_task, task_id) - - bg_task = asyncio.create_task(_background_consume()) - bg_task.set_name(f'non_blocking_consume:{task_id}') - self._track_background_task(bg_task) - - if params.configuration: - task = apply_history_length( - task, params.configuration.history_length - ) - - return task + return await self._on_message_send_non_blocking( + params, + _task_manager, + task_id, + queue, + result_aggregator, + producer_task, + consumer, + ) # Blocking path: wait for completion or interruption. interrupted_or_non_blocking = False diff --git a/tests/server/request_handlers/test_default_request_handler.py b/tests/server/request_handlers/test_default_request_handler.py index ec2956fa2..536b7ebbf 100644 --- a/tests/server/request_handlers/test_default_request_handler.py +++ b/tests/server/request_handlers/test_default_request_handler.py @@ -460,135 +460,56 @@ async def get_current_result(): @pytest.mark.asyncio async def test_on_message_send_with_push_notification_in_non_blocking_request(): - """Test that push notification callback is called during background event processing for non-blocking requests.""" - mock_task_store = AsyncMock(spec=TaskStore) - mock_push_notification_store = AsyncMock(spec=PushNotificationConfigStore) - mock_agent_executor = AsyncMock(spec=AgentExecutor) - mock_request_context_builder = AsyncMock(spec=RequestContextBuilder) + """Test that non-blocking requests return immediately and process push notifications in background.""" + task_store = InMemoryTaskStore() + push_store = InMemoryPushNotificationConfigStore() mock_push_sender = AsyncMock() - task_id = 'non_blocking_task_1' - context_id = 'non_blocking_ctx_1' - - # Create a task that will be returned after the first event - initial_task = create_sample_task( - task_id=task_id, context_id=context_id, status_state=TaskState.working - ) - - # Create a final task that will be available during background processing - final_task = create_sample_task( - task_id=task_id, context_id=context_id, status_state=TaskState.completed - ) - - mock_task_store.get.return_value = None - - # Mock request context - mock_request_context = MagicMock(spec=RequestContext) - mock_request_context.task_id = task_id - mock_request_context.context_id = context_id - mock_request_context_builder.build.return_value = mock_request_context - request_handler = DefaultRequestHandler( - agent_executor=mock_agent_executor, - task_store=mock_task_store, - push_config_store=mock_push_notification_store, - request_context_builder=mock_request_context_builder, + agent_executor=HelloAgentExecutor(), + task_store=task_store, + push_config_store=push_store, push_sender=mock_push_sender, ) - # Configure push notification push_config = PushNotificationConfig(url='http://callback.com/push') - message_config = MessageSendConfiguration( - push_notification_config=push_config, - accepted_output_modes=['text/plain'], - blocking=False, # Non-blocking request - ) params = MessageSendParams( message=Message( role=Role.user, message_id='msg_non_blocking', - parts=[], - task_id=task_id, - context_id=context_id, + parts=[Part(root=TextPart(text='Hi'))], + ), + configuration=MessageSendConfiguration( + push_notification_config=push_config, + accepted_output_modes=['text/plain'], + blocking=False, ), - configuration=message_config, - ) - - # Mock ResultAggregator with custom behavior - mock_result_aggregator_instance = AsyncMock(spec=ResultAggregator) - - # First call returns the initial task and indicates interruption (non-blocking) - mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = ( - initial_task, - True, # interrupted = True for non-blocking - MagicMock(spec=asyncio.Task), # background task - ) - - # Mock the current_result property to return the final task - async def get_current_result(): - return final_task - - type(mock_result_aggregator_instance).current_result = PropertyMock( - return_value=get_current_result() ) - # Track if the event_callback was passed to consume_and_break_on_interrupt - event_callback_passed = False - event_callback_received = None - - async def mock_consume_and_break_on_interrupt( - consumer, blocking=True, event_callback=None - ): - nonlocal event_callback_passed, event_callback_received - event_callback_passed = event_callback is not None - event_callback_received = event_callback - return ( - initial_task, - True, - MagicMock(spec=asyncio.Task), - ) # interrupted = True for non-blocking - - mock_result_aggregator_instance.consume_and_break_on_interrupt = ( - mock_consume_and_break_on_interrupt + result = await request_handler.on_message_send( + params, create_server_call_context() ) - with ( - patch( - 'a2a.server.request_handlers.default_request_handler.ResultAggregator', - return_value=mock_result_aggregator_instance, - ), - patch( - 'a2a.server.request_handlers.default_request_handler.TaskManager.get_task', - return_value=initial_task, - ), - patch( - 'a2a.server.request_handlers.default_request_handler.TaskManager.update_with_message', - return_value=initial_task, - ), - ): - # Execute the non-blocking request - result = await request_handler.on_message_send( - params, create_server_call_context() - ) + # Non-blocking: should return immediately with submitted state + assert result is not None + assert isinstance(result, Task) + assert result.status.state == TaskState.submitted - # Verify the result is the initial task (non-blocking behavior) - assert result == initial_task + # Wait for background processing to complete + for _ in range(10): + await asyncio.sleep(0.1) + task = await task_store.get(result.id) + if task and task.status.state == TaskState.completed: + break - # Verify that the event_callback was passed to consume_and_break_on_interrupt - assert event_callback_passed, ( - 'event_callback should have been passed to consume_and_break_on_interrupt' - ) - assert event_callback_received is not None, ( - 'event_callback should not be None' - ) + assert task is not None + assert task.status.state == TaskState.completed - # Verify that the push notification was sent with the final task - mock_push_sender.send_notification.assert_called_with(final_task) + # Verify push notification config was stored + push_store.set_info.assert_awaited_once_with(result.id, push_config) - # Verify that the push notification config was stored - mock_push_notification_store.set_info.assert_awaited_once_with( - task_id, push_config - ) + # Verify push notifications were sent during background processing + assert mock_push_sender.send_notification.call_count >= 1 @pytest.mark.asyncio @@ -843,11 +764,11 @@ async def test_on_message_send_non_blocking(): assert task is not None assert task.status.state == TaskState.completed - assert ( - result.history - and task.history - and len(result.history) == len(task.history) - ) + # The immediately returned result has the initial history (user message), + # while the completed task may have additional history entries from the + # executor. The initial result should have at least the user message. + assert result.history and len(result.history) >= 1 + assert task.history and len(task.history) >= 1 @pytest.mark.asyncio From 96a4183b933544cb85e47d841c5f71d45b194cde Mon Sep 17 00:00:00 2001 From: Rui Figueiredo Date: Wed, 8 Apr 2026 22:53:18 +0100 Subject: [PATCH 3/4] fix: resolve lint PLR0913 and test assertion errors - Move background consume logic to _consume_and_notify_in_background method to reduce argument count and keep non-blocking path inline in on_message_send - Fix test assertion: use push_store.get_info() instead of assert_awaited_once_with since InMemoryPushNotificationConfigStore is not a mock Co-Authored-By: Claude Opus 4.6 (1M context) --- .../default_request_handler.py | 93 ++++++++----------- .../test_default_request_handler.py | 5 +- 2 files changed, 41 insertions(+), 57 deletions(-) diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 9d30b65a1..625503f4f 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -287,59 +287,26 @@ async def _send_push_notification_if_needed( if isinstance(latest_task, Task): await self._push_sender.send_notification(latest_task) - async def _on_message_send_non_blocking( + async def _consume_and_notify_in_background( # noqa: PLR0913 self, - params: MessageSendParams, - task_manager: TaskManager, task_id: str, - queue: EventQueue, result_aggregator: ResultAggregator, - producer_task: asyncio.Task, consumer: EventConsumer, - ) -> Task: - """Non-blocking fast path for ``on_message_send``. - - Returns the task immediately without waiting for executor events. - Event consumption and push notifications are handled in a background - task. This avoids the latency introduced by the ``EventConsumer`` - polling loop which can add seconds of delay when the event loop is - busy with other work. - """ - task = await task_manager.get_task() - if not task: - task = Task( - id=task_id, - context_id=params.message.context_id, - status=TaskStatus(state=TaskState.submitted), - history=[params.message], - ) - - async def _background_consume() -> None: - try: - async for _event in result_aggregator.consume_and_emit( - consumer - ): - await self._send_push_notification_if_needed( - task_id, result_aggregator - ) - except Exception: - logger.exception( - 'Background event consumption failed for task %s', - task_id, + producer_task: asyncio.Task, + ) -> None: + """Consume executor events and send push notifications in background.""" + try: + async for _event in result_aggregator.consume_and_emit(consumer): + await self._send_push_notification_if_needed( + task_id, result_aggregator ) - finally: - await self._cleanup_producer(producer_task, task_id) - - bg_task = asyncio.create_task(_background_consume()) - bg_task.set_name(f'non_blocking_consume:{task_id}') - self._track_background_task(bg_task) - - if params.configuration: - task = apply_history_length( - task, params.configuration.history_length + except Exception: + logger.exception( + 'Background event consumption failed for task %s', + task_id, ) - - return task + finally: + await self._cleanup_producer(producer_task, task_id) async def on_message_send( self, @@ -371,16 +338,32 @@ async def on_message_send( if params.configuration and params.configuration.blocking is False: blocking = False + # Non-blocking fast path: return the task immediately and process + # events entirely in the background via push notifications. if not blocking: - return await self._on_message_send_non_blocking( - params, - _task_manager, - task_id, - queue, - result_aggregator, - producer_task, - consumer, + task = await _task_manager.get_task() + if not task: + task = Task( + id=task_id, + context_id=params.message.context_id, + status=TaskStatus(state=TaskState.submitted), + history=[params.message], + ) + + bg_task = asyncio.create_task( + self._consume_and_notify_in_background( + task_id, result_aggregator, consumer, producer_task + ) ) + bg_task.set_name(f'non_blocking_consume:{task_id}') + self._track_background_task(bg_task) + + if params.configuration: + task = apply_history_length( + task, params.configuration.history_length + ) + + return task # Blocking path: wait for completion or interruption. interrupted_or_non_blocking = False diff --git a/tests/server/request_handlers/test_default_request_handler.py b/tests/server/request_handlers/test_default_request_handler.py index 536b7ebbf..6a33ddfe5 100644 --- a/tests/server/request_handlers/test_default_request_handler.py +++ b/tests/server/request_handlers/test_default_request_handler.py @@ -505,8 +505,9 @@ async def test_on_message_send_with_push_notification_in_non_blocking_request(): assert task is not None assert task.status.state == TaskState.completed - # Verify push notification config was stored - push_store.set_info.assert_awaited_once_with(result.id, push_config) + # Verify push notification config was stored by checking the store directly + stored_configs = await push_store.get_info(result.id) + assert stored_configs and len(stored_configs) >= 1 # Verify push notifications were sent during background processing assert mock_push_sender.send_notification.call_count >= 1 From 18ddf4ea93e28fc44f2d249b36698d5b565740d4 Mon Sep 17 00:00:00 2001 From: Rui Figueiredo Date: Wed, 8 Apr 2026 22:55:42 +0100 Subject: [PATCH 4/4] fix: remove unused noqa directive --- src/a2a/server/request_handlers/default_request_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 625503f4f..52e7c01bf 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -287,7 +287,7 @@ async def _send_push_notification_if_needed( if isinstance(latest_task, Task): await self._push_sender.send_notification(latest_task) - async def _consume_and_notify_in_background( # noqa: PLR0913 + async def _consume_and_notify_in_background( self, task_id: str, result_aggregator: ResultAggregator,