[Enhancement] Add async iterator interface for streaming transcription results #727

@deepgram-robot

Summary

Add an async iterator interface for streaming transcription, enabling developers to consume real-time transcript events using Python's native async for syntax instead of callback-based event handlers.

Problem it solves

The current streaming API requires registering callback functions for each event type:

connection = client.listen.asyncwebsocket.v("1")
connection.on(LiveTranscriptionEvents.Transcript, on_transcript)
connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
await connection.start(options)
# ... manage lifecycle manually

This callback pattern requires developers to manage state across multiple handler functions, making it harder to build linear processing pipelines. An async iterator interface would enable:

async with client.listen.stream(options) as session:
    async for event in session:
        if event.type == "transcript":
            print(event.channel.alternatives[0].transcript)
        elif event.type == "utterance_end":
            process_utterance(session.accumulated_transcript)

This is more Pythonic, integrates naturally with asyncio, and makes it straightforward to compose streaming transcription with other async operations (database writes, LLM calls, WebSocket forwarding).
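To illustrate the composition point, here is a minimal sketch of a linear pipeline built on async iteration. It does not use the Deepgram SDK: `fake_events` is a hypothetical stand-in for the proposed session, events are plain dicts rather than typed objects, and `asyncio.sleep(0)` stands in for a database write or LLM call.

```python
import asyncio
from typing import AsyncIterator


async def fake_events() -> AsyncIterator[dict]:
    """Hypothetical event source standing in for the proposed session."""
    for event in [
        {"type": "transcript", "transcript": "hello", "is_final": True},
        {"type": "transcript", "transcript": "wor", "is_final": False},
        {"type": "transcript", "transcript": "world", "is_final": True},
        {"type": "utterance_end"},
        {"type": "transcript", "transcript": "goodbye", "is_final": True},
        {"type": "utterance_end"},
    ]:
        yield event


async def utterances(events: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Collect final transcripts and yield one string per utterance."""
    parts: list[str] = []
    async for event in events:
        if event["type"] == "transcript" and event.get("is_final"):
            parts.append(event["transcript"])
        elif event["type"] == "utterance_end" and parts:
            yield " ".join(parts)
            parts = []


async def main() -> list[str]:
    stored = []
    async for utterance in utterances(fake_events()):
        await asyncio.sleep(0)  # placeholder for a DB write or LLM call
        stored.append(utterance)
    return stored
```

The accumulation state lives in one local variable (`parts`) inside a single generator, rather than being shared across several callback handlers.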

Proposed API

from deepgram import DeepgramClient, StreamingOptions, StreamEvent

# async with / async for must run inside a coroutine;
# api_key and audio_bytes are supplied by the caller
async def main(api_key, audio_bytes):
    client = DeepgramClient(api_key)

    # Async context manager handles connection lifecycle
    async with client.listen.stream(
        StreamingOptions(model="nova-3", smart_format=True, diarize=True)
    ) as session:
        # Send audio from any async source
        # (a long-running sender would run in its own task so it
        # does not block the event loop below)
        await session.send(audio_bytes)

        # Consume events as an async iterator
        async for event in session:
            match event:
                case StreamEvent.Transcript(transcript=t, is_final=True):
                    print(f"Final: {t}")
                case StreamEvent.SpeechStarted():
                    print("Speech detected")
                case StreamEvent.UtteranceEnd():
                    print("Utterance complete")

    # Context manager sends finalize and closes cleanly

Key design points:

  • async with manages WebSocket connection lifecycle (connect on enter, finalize + close on exit)
  • async for yields typed StreamEvent objects for all event types
  • Automatic keepalive in background while iterating
  • Backpressure: if the consumer is slow, events are buffered up to a configurable limit
  • Existing callback API remains unchanged — this is additive
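The design points above could be sketched roughly as follows. This is not the SDK's implementation: `StreamSession`, `FakeTransport`, `feed`, `end`, `max_buffered`, and `keepalive_interval` are hypothetical names, the control messages are placeholder strings, and the bounded `asyncio.Queue` is one possible way to get the buffering behavior (here the producer waits when the buffer is full).

```python
import asyncio
import contextlib
from dataclasses import dataclass


@dataclass
class StreamEvent:
    """Hypothetical event; the proposal uses one typed class per event kind."""
    type: str
    transcript: str = ""


_CLOSED = object()  # sentinel that ends iteration


class FakeTransport:
    """Stand-in for the WebSocket; records every message the session sends."""
    def __init__(self):
        self.sent = []

    async def send(self, message):
        self.sent.append(message)


class StreamSession:
    def __init__(self, transport, max_buffered=100, keepalive_interval=5.0):
        self._transport = transport
        self._events = asyncio.Queue(maxsize=max_buffered)  # bounded buffer
        self._keepalive_interval = keepalive_interval
        self._keepalive_task = None

    async def __aenter__(self):
        # Keepalive runs in the background for the lifetime of the session.
        self._keepalive_task = asyncio.create_task(self._keepalive())
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._keepalive_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._keepalive_task
        await self._transport.send("Finalize")      # placeholder message
        await self._transport.send("CloseStream")   # placeholder message

    async def _keepalive(self):
        while True:
            await asyncio.sleep(self._keepalive_interval)
            await self._transport.send("KeepAlive")  # placeholder message

    async def feed(self, event):
        """Called by a receive loop; waits while the buffer is full."""
        await self._events.put(event)

    async def end(self):
        await self._events.put(_CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        event = await self._events.get()
        if event is _CLOSED:
            raise StopAsyncIteration
        return event


async def demo():
    transport = FakeTransport()
    received = []
    async with StreamSession(transport, max_buffered=2,
                             keepalive_interval=0.01) as session:
        async def producer():
            for text in ("hello", "world", "again"):
                await session.feed(StreamEvent("transcript", text))
            await session.end()

        task = asyncio.create_task(producer())
        async for event in session:
            received.append(event.transcript)
        await task
    return received, transport.sent
```

Running `asyncio.run(demo())` drives three events through a buffer of size two, so the producer has to wait for the consumer at least once. Whether a full buffer should block, drop events, or raise is a design decision this sketch does not settle.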

Acceptance criteria

  • client.listen.stream() returns an async context manager
  • Context manager sends finalize and closes WebSocket on exit
  • Yields typed StreamEvent objects via __aiter__ / __anext__
  • Automatic keepalive runs in background during iteration
  • Existing callback-based API is not affected
  • Documented with usage example
  • Compatible with existing API

Raised by the DX intelligence system.
