Skip to content

on_message_send blocks HTTP response even with blocking=False #951

@ruimgf

Description

@ruimgf

What happened?

When blocking=False is set in MessageSendConfiguration, the DefaultRequestHandler.on_message_send still waits for the first event from the executor's EventQueue before returning the HTTP response. This causes timeouts in production workflows where the agent should immediately acknowledge task receipt and process asynchronously via push notifications.

Root cause

In on_message_send, after _setup_message_execution creates the task and starts the executor as an asyncio.create_task, the handler calls consume_and_break_on_interrupt which polls the EventQueue with a 0.5s timeout loop. Even with blocking=False, it waits for the first event before breaking:

# Non-blocking call: interrupt as soon as a task is available.
elif not blocking:
    should_interrupt = True

The executor is started via asyncio.create_task, but it may not be scheduled by the event loop before the consumer starts polling. The consumer's asyncio.wait_for(dequeue_event(), timeout=0.5) can timeout multiple times (0.5s × N) before the executor task runs and emits an event. In our production environment with 4 uvicorn workers, we observed consistent 5-7s delays even for simple task acknowledgments.

Expected behavior

When blocking=False and push_notification_config is provided, on_message_send should return the Task object immediately (in submitted state) without waiting for any executor events. The executor should run entirely in the background, delivering results via push notifications.

Workaround

We created a custom request handler that overrides on_message_send:

class AsyncRequestHandler(DefaultRequestHandler):
    async def on_message_send(self, params, context=None):
        if not params.configuration or params.configuration.blocking is not False:
            return await super().on_message_send(params, context)

        # Set up execution (creates task, starts executor in background)
        (task_manager, task_id, queue, result_aggregator, producer_task
        ) = await self._setup_message_execution(params, context)

        # Get task and return immediately
        task = await task_manager.get_task()

        # Consume events and send push notifications in background
        consumer = EventConsumer(queue)
        producer_task.add_done_callback(consumer.agent_task_callback)

        async def _background_consume():
            try:
                async for event in result_aggregator.consume_and_emit(consumer):
                    if self._push_sender and task_id:
                        latest = await result_aggregator.current_result
                        if isinstance(latest, Task):
                            await self._push_sender.send_notification(latest)
            finally:
                await self._cleanup_producer(producer_task, task_id)

        bg_task = asyncio.create_task(_background_consume())
        self._track_background_task(bg_task)

        return task

This reduced response times from 5-7s to 3ms.

Environment

  • a2a-python SDK (latest)
  • Python 3.12
  • uvicorn with 4 multiprocess workers
  • Push notifications enabled, blocking=False

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions