Skip to content

feat: add ArtifactStreamer for streaming artifact updates#942

Open
mtsmyassin wants to merge 1 commit intoa2aproject:mainfrom
mtsmyassin:feat/artifact-streamer
Open

feat: add ArtifactStreamer for streaming artifact updates#942
mtsmyassin wants to merge 1 commit intoa2aproject:mainfrom
mtsmyassin:feat/artifact-streamer

Conversation

@mtsmyassin
Copy link
Copy Markdown

Summary

Adds ArtifactStreamer to a2a.utils — a stateful helper that maintains a
stable artifact_id across streaming chunks, enabling correct append=True
semantics for TaskArtifactUpdateEvent.

Problem

new_text_artifact generates a fresh UUID on every call. When used in a
streaming loop, each chunk gets a different artifact_id, making append=True
unusable. Clients see N separate artifacts instead of one progressively
streamed response. (See the travel_planner_agent sample for this exact issue.)

Solution

from a2a.utils import ArtifactStreamer

streamer = ArtifactStreamer(context_id, task_id, name="response")

async for chunk in llm.stream(prompt):
    await event_queue.enqueue_event(streamer.append(chunk))

await event_queue.enqueue_event(streamer.finalize())

- append(text) → returns TaskArtifactUpdateEvent with append=True, last_chunk=False
- finalize(text="") → returns TaskArtifactUpdateEvent with append=True,
last_chunk=True
- Raises RuntimeError if you try to append after finalizing

Changes

- src/a2a/utils/artifact.pyAdded ArtifactStreamer class
- src/a2a/utils/__init__.pyExported ArtifactStreamer
- tests/utils/test_artifact.py14 tests covering stable IDs, append/finalize
semantics, error states

Closes #833

Adds a stateful streaming helper to a2a.utils that maintains a stable
artifact_id across chunks, enabling correct append=True semantics for
TaskArtifactUpdateEvent.

Closes a2aproject#833
@mtsmyassin mtsmyassin requested a review from a team as a code owner April 8, 2026 01:01
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the ArtifactStreamer utility class to facilitate streaming artifact updates with a stable ID, ensuring consistency when appending data. The changes include the new class implementation, its export in the utility module, and a suite of unit tests. Feedback was provided to refactor the event creation logic into a private helper method to eliminate code duplication between the append and finalize methods.

Comment on lines +138 to +195
def append(self, text: str) -> TaskArtifactUpdateEvent:
"""Create an append event for the next chunk of text.

Args:
text: The text content to append.

Returns:
A ``TaskArtifactUpdateEvent`` with ``append=True`` and
``last_chunk=False``.

Raises:
RuntimeError: If ``finalize()`` has already been called.
"""
if self._finalized:
raise RuntimeError(
'Cannot append after finalize() has been called.'
)
return TaskArtifactUpdateEvent(
context_id=self._context_id,
task_id=self._task_id,
append=True,
last_chunk=False,
artifact=Artifact(
artifact_id=self._artifact_id,
name=self._name,
description=self._description,
parts=[Part(root=TextPart(text=text))],
),
)

def finalize(self, text: str = '') -> TaskArtifactUpdateEvent:
"""Create the final chunk event, closing the stream.

Args:
text: Optional final text content. Defaults to empty string.

Returns:
A ``TaskArtifactUpdateEvent`` with ``append=True`` and
``last_chunk=True``.

Raises:
RuntimeError: If ``finalize()`` has already been called.
"""
if self._finalized:
raise RuntimeError('finalize() has already been called.')
self._finalized = True
return TaskArtifactUpdateEvent(
context_id=self._context_id,
task_id=self._task_id,
append=True,
last_chunk=True,
artifact=Artifact(
artifact_id=self._artifact_id,
name=self._name,
description=self._description,
parts=[Part(root=TextPart(text=text))],
),
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The logic for creating the TaskArtifactUpdateEvent and its nested Artifact object is duplicated between the append and finalize methods. Refactoring this into a private helper method would improve maintainability and reduce the risk of inconsistencies if the event structure changes in the future.

    def _create_event(self, text: str, last_chunk: bool) -> TaskArtifactUpdateEvent:
        return TaskArtifactUpdateEvent(
            context_id=self._context_id,
            task_id=self._task_id,
            append=True,
            last_chunk=last_chunk,
            artifact=Artifact(
                artifact_id=self._artifact_id,
                name=self._name,
                description=self._description,
                parts=[Part(root=TextPart(text=text))],
            ),
        )

    def append(self, text: str) -> TaskArtifactUpdateEvent:
        """Create an append event for the next chunk of text.

        Args:
            text: The text content to append.

        Returns:
            A TaskArtifactUpdateEvent with append=True and
            last_chunk=False.

        Raises:
            RuntimeError: If finalize() has already been called.
        """
        if self._finalized:
            raise RuntimeError(
                'Cannot append after finalize() has been called.'
            )
        return self._create_event(text, last_chunk=False)

    def finalize(self, text: str = '') -> TaskArtifactUpdateEvent:
        """Create the final chunk event, closing the stream.

        Args:
            text: Optional final text content. Defaults to empty string.

        Returns:
            A TaskArtifactUpdateEvent with append=True and
            last_chunk=True.

        Raises:
            RuntimeError: If finalize() has already been called.
        """
        if self._finalized:
            raise RuntimeError('finalize() has already been called.')
        self._finalized = True
        return self._create_event(text, last_chunk=True)

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 8, 2026

🧪 Code Coverage (vs main)

⬇️ Download Full Report

No coverage changes.

Generated by coverage-comment.yml

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant