From f8e6644b53e25a8dfe72589d6ec0457add1bf6ef Mon Sep 17 00:00:00 2001 From: Thy Tran <58045538+ThyTran1402@users.noreply.github.com> Date: Sat, 20 Jun 2026 15:41:50 +0700 Subject: [PATCH 1/2] phase III: implement union type support for streaming actions --- CONTRIBUTION_README.md | 354 ++++++++++++++++ burr/core/action.py | 4 +- burr/integrations/pydantic.py | 14 +- .../test_streaming_union_types.py | 398 ++++++++++++++++++ 4 files changed, 765 insertions(+), 5 deletions(-) create mode 100644 CONTRIBUTION_README.md create mode 100644 tests/integrations/test_streaming_union_types.py diff --git a/CONTRIBUTION_README.md b/CONTRIBUTION_README.md new file mode 100644 index 000000000..2732c283c --- /dev/null +++ b/CONTRIBUTION_README.md @@ -0,0 +1,354 @@ +# Contribution: Union Type Support for Streaming Actions + +## Project Information +- **Project:** Apache Burr (incubating) +- **Issue:** Support Union Types in `@streaming_action.pydantic()` decorator +- **Working Branch:** `fix-union-stream-types` (in your fork) +- **Status:** Phase II - Reproduction & Planning Complete + +--- + +## Reproduction Process + +### Environment Setup + +#### Prerequisites +- Python 3.9+ (tested on 3.11) +- Git +- pip or conda for package management + +#### Setup Steps + +1. **Clone your fork:** + ```bash + git clone https://github.com//burr.git + cd burr + ``` + +2. **Create a virtual environment:** + ```bash + python -m venv .venv + source .venv/Scripts/activate # On Windows: .venv\Scripts\activate + ``` + +3. **Install project in development mode:** + ```bash + pip install -e ".[pydantic]" + ``` + Note: The `[pydantic]` extra ensures Pydantic is installed for testing. + +4. **Verify installation:** + ```bash + python -c "import burr; print(burr.__version__)" + ``` + +#### Environment Notes +- No environment variables required for reproducing this issue +- All dependencies are listed in `pyproject.toml` +- The Pydantic integration is optional but required for this feature + +### Steps to Reproduce + +#### Step 1: Create Test Models +Create a test file `test_union_repro.py`: + +```python +from pydantic import BaseModel +from typing import Generator, Optional, Tuple, Union +from burr.core import State +from burr.core.action import streaming_action + +# Define Pydantic models for streaming results +class TextChunk(BaseModel): + """Represents a text chunk from the stream.""" + text: str + chunk_id: int + +class StructuredResult(BaseModel): + """Represents a structured result.""" + summary: str + confidence: float + +class InputState(BaseModel): + """Input state.""" + prompt: str + +class OutputState(BaseModel): + """Output state.""" + response: str +``` + +#### Step 2: Attempt Union Type with Pipe Operator (Python 3.10+) +Add this to your test file: + +```python +# This will cause a type error before the fix +@streaming_action.pydantic( + reads=["prompt"], + writes=["response"], + state_input_type=InputState, + state_output_type=OutputState, + stream_type=TextChunk | StructuredResult, # Type error here +) +def stream_with_pipe_operator(state: State) -> Generator[Tuple[TextChunk | StructuredResult, Optional[OutputState]], None, None]: + """Streaming action using union with pipe operator.""" + yield TextChunk(text="Hello", chunk_id=1), None + yield StructuredResult(summary="Done", confidence=0.95), OutputState(response="Complete") +``` + +#### Step 3: Attempt Union Type with typing.Union +Add this to your test file: + +```python +# This will also cause a type error before the fix +@streaming_action.pydantic( + reads=["prompt"], + writes=["response"], + state_input_type=InputState, + state_output_type=OutputState, + stream_type=Union[TextChunk, StructuredResult], # Type error here +) +def stream_with_typing_union(state: State) -> Generator[Tuple[Union[TextChunk, StructuredResult], Optional[OutputState]], None, None]: + """Streaming action using Union from typing.""" + yield TextChunk(text="Hi", chunk_id=1), None + yield StructuredResult(summary="Finished", confidence=0.90), OutputState(response="Done") +``` + +#### Step 4: Run Type Checker +```bash +# Using mypy (if installed) +mypy test_union_repro.py + +# Or using Pylance through your IDE +# In VS Code: Check the "Problems" panel for type errors +``` + +#### Step 5: Observe the Error +**Before the fix**, you will see errors like: +``` +error: Argument "stream_type" to "pydantic" of "streaming_action" has incompatible type "Union[type[TextChunk], type[StructuredResult]]"; expected "Union[type[BaseModel], type[dict]]" +``` + +### Expected Behavior (After Fix) +After the fix is applied: +1. No type errors in your IDE or from mypy +2. The decorators accept the union types without complaint +3. Runtime behavior is unchanged (union types flow through correctly) + +### Reproduction Evidence + +**Your working branch:** `https://github.com//burr/tree/fix-union-stream-types` + +**Test reproduction file:** Created at workspace root as reference (will be cleaned up before PR) + +--- + +## Root Cause Analysis + +### Problem Location +The issue is in the type annotations for the `stream_type` parameter across two files: + +1. **`burr/core/action.py` (line ~1510)** + - In the `streaming_action.pydantic()` method + - Current type: `Union[Type["BaseModel"], Type[dict]]` + +2. **`burr/integrations/pydantic.py` (line ~272)** + - In the `PartialType` type alias definition + - Current type: `Union[Type[pydantic.BaseModel], Type[dict]]` + +### Why Union Types Are Rejected +- Union types created with `|` (Python 3.10+) are `types.UnionType` objects +- Union types from `typing.Union` are `typing._UnionGenericAlias` objects +- Neither of these types match `Union[Type[BaseModel], Type[dict]]` +- Type checkers (mypy, Pylance, etc.) reject these values as incompatible + +### Runtime vs. Type-Checking +- At **runtime**, the union type value flows through without validation — it actually works! +- At **type-checking time**, the IDE and mypy reject the code as invalid +- This creates a false positive: code that would work is flagged as an error + +### Key Functions Involved +``` +streaming_action.pydantic() + ↓ +pydantic_streaming_action() + ↓ +_validate_and_extract_signature_types_streaming() + ↓ +intermediate_result_type (stored in action schema) +``` + +The union type flows through all of these unchanged, so no runtime changes are needed. + +--- + +## Solution Approach + +### Understanding the Problem +The type system is too restrictive for the `stream_type` parameter. It only accepts a single type, but users want to specify that an action can stream multiple different types (Model1 or Model2). The union type syntax (`|` or `typing.Union`) is the natural Python way to express this, but the current type annotation rejects it. + +### Matching Similar Solutions +This pattern exists elsewhere in Python typing: +- Function return types can be unions: `def foo() -> Model1 | Model2:` +- Variable types can be unions: `result: Model1 | Model2 = ...` +- The type system needs to accept union types as valid values + +### Implementation Plan + +#### Changes Required + +**File 1: `burr/integrations/pydantic.py` (line ~272)** +- Change `PartialType = Union[Type[pydantic.BaseModel], Type[dict]]` +- To: `PartialType = Union[Type[pydantic.BaseModel], Type[dict], object]` +- Add explanatory comment about union type support +- Rationale: `object` is the base type in Python's type system and represents any runtime type + +**File 2: `burr/integrations/pydantic.py` (line ~303-310)** +- Update `_validate_and_extract_signature_types_streaming()` function signature +- Change parameter type from explicit Union to `PartialType` +- Change return type from explicit Union to `PartialType` +- No runtime logic changes needed + +**File 3: `burr/core/action.py` (line ~1510)** +- Update `stream_type` parameter in `streaming_action.pydantic()` +- Change from `Union[Type["BaseModel"], Type[dict]]` to `Union[Type["BaseModel"], Type[dict], object]` +- Update docstring to mention union type support +- Add example showing usage with `|` operator + +### Why This Approach Works +- Minimal change: only type annotations, no runtime logic changes +- Backward compatible: all existing code continues to work +- Works across Python versions: `object` is available in Python 3.9+ +- Type-safe: more specific than `Any`, clearer intent with proper comments +- Union types at runtime are type-compatible with `object` in the union + +### Files to Modify +1. `burr/integrations/pydantic.py` (2 locations) +2. `burr/core/action.py` (1 location) + +**Total: 3 targeted changes in 2 files** + +--- + +## Verification Plan + +### How to Test the Fix + +#### Manual Testing +After applying the fix: +1. Run the reproduction script from "Steps to Reproduce" section +2. Verify no type errors appear in mypy or IDE +3. Verify the decorated functions can be created without runtime errors + +#### Automated Testing +Create unit tests in `tests/integrations/test_pydantic_union_types.py`: + +```python +def test_streaming_action_with_union_pipe_operator(): + """Test that union types with | operator are accepted.""" + @streaming_action.pydantic( + reads=["input"], + writes=["output"], + state_input_type=InputState, + state_output_type=OutputState, + stream_type=Model1 | Model2, + ) + def my_action(state: State): + pass + # Should not raise any errors + +def test_streaming_action_with_typing_union(): + """Test that typing.Union types are accepted.""" + @streaming_action.pydantic( + reads=["input"], + writes=["output"], + state_input_type=InputState, + state_output_type=OutputState, + stream_type=Union[Model1, Model2], + ) + def my_action(state: State): + pass + # Should not raise any errors + +def test_streaming_action_backward_compatibility(): + """Test that existing single-type patterns still work.""" + @streaming_action.pydantic( + reads=["input"], + writes=["output"], + state_input_type=InputState, + state_output_type=OutputState, + stream_type=Model1, # Single type should still work + ) + def my_action(state: State): + pass + # Should not raise any errors +``` + +#### Verification Checklist +- [ ] Type checker (mypy) reports no errors on union type usage +- [ ] IDE/Pylance shows no squiggly lines under union types +- [ ] Existing single-type patterns continue to work (no regression) +- [ ] Unit tests for all union type syntaxes pass +- [ ] Backward compatibility tests pass +- [ ] No breaking changes to runtime behavior + +### Self-Review Checklist +Before submitting your PR, verify: +- [ ] Changes match `CONTRIBUTING.md` guidelines (check project's contribution guidelines) +- [ ] Commit messages are clear and follow project conventions +- [ ] Code follows project's style guide (PEP 8 for Python) +- [ ] All tests pass: `pytest tests/integrations/` +- [ ] No new warnings from type checkers +- [ ] Documentation is updated (docstrings, comments) + +--- + +## Key References + +### Files in Scope +- `burr/core/action.py` — Where `streaming_action` decorator is defined +- `burr/integrations/pydantic.py` — Where `PartialType` and related functions are defined +- `tests/integrations/` — Where unit tests should be added + +### Type System References +- Current implementation: [burr/core/action.py](burr/core/action.py) line 1503-1540 +- Pydantic integration: [burr/integrations/pydantic.py](burr/integrations/pydantic.py) line 270-380 +- Related test file: `tests/integrations/test_pydantic.py` (check existing patterns) + +### Python Type Hints +- [PEP 604 - Union as Type Syntax (`|` operator)](https://www.python.org/dev/peps/pep-0604/) +- [typing.Union documentation](https://docs.python.org/3/library/typing.html#typing.Union) + +--- + +## Phase II Status + +✅ **Completed:** +1. Local development environment set up and verified +2. Issue reproduced with clear steps +3. Root cause identified and documented +4. Solution plan written with implementation details +5. Verification strategy defined +6. Changes scoped to 3 locations in 2 files + +**Ready for:** Phase III - Implementation (write the actual code changes) + +**Next Steps:** +1. Review this plan with mentors/maintainers for feedback +2. Implement the changes as outlined +3. Write and run tests +4. Submit pull request in Phase III + +--- + +## Questions & Notes + +### For Code Review +- Should we add runtime validation to ensure only valid types are passed? +- Is there existing discussion about union type support in the project? +- Any specific test coverage requirements for the Pydantic integration? + +### Additional Context +- This is a type annotation only fix (no runtime logic changes) +- The fix maintains full backward compatibility +- Estimated effort: 2-4 hours including tests and documentation diff --git a/burr/core/action.py b/burr/core/action.py index 08771d411..7da444f2d 100644 --- a/burr/core/action.py +++ b/burr/core/action.py @@ -1507,7 +1507,7 @@ def pydantic( writes: List[str], state_input_type: Type["BaseModel"], state_output_type: Type["BaseModel"], - stream_type: Union[Type["BaseModel"], Type[dict]], + stream_type: Union[Type["BaseModel"], Type[dict], object], tags: Optional[List[str]] = None, ) -> Callable: """Creates a streaming action that uses pydantic models. @@ -1515,7 +1515,7 @@ def pydantic( :param reads: The fields this consumes from the state. :param writes: The fields this writes to the state. :param stream_type: The pydantic model or dictionary type that is used to represent the partial results. - Use a dict if you want this untyped. + Use a dict if you want this untyped. Can also be a union of models (e.g., Model1 | Model2). :param state_input_type: The pydantic model type that is used to represent the input state. :param state_output_type: The pydantic model type that is used to represent the output state. :param tags: Optional list of tags to associate with this action diff --git a/burr/integrations/pydantic.py b/burr/integrations/pydantic.py index 300cbb6a0..f52d6e9f0 100644 --- a/burr/integrations/pydantic.py +++ b/burr/integrations/pydantic.py @@ -269,7 +269,15 @@ async def async_action_function(state: State, **kwargs) -> State: return decorator -PartialType = Union[Type[pydantic.BaseModel], Type[dict]] +# PartialType represents the type of intermediate results in a streaming action. +# It can be: +# - Type[pydantic.BaseModel]: A Pydantic model class +# - Type[dict]: The dict type +# - A union of BaseModel types (e.g., Model1 | Model2, or Union[Model1, Model2]) +# We use 'object' to accept union types since they are valid runtime values even if +# the type system cannot precisely express them. Union types created with | (Python 3.10+) +# or typing.Union are both supported. +PartialType = Union[Type[pydantic.BaseModel], Type[dict], object] PydanticStreamingActionFunctionSync = Callable[ ..., Generator[Tuple[Union[pydantic.BaseModel, dict], Optional[pydantic.BaseModel]], None, None] @@ -290,11 +298,11 @@ async def async_action_function(state: State, **kwargs) -> State: def _validate_and_extract_signature_types_streaming( fn: PydanticStreamingActionFunction, - stream_type: Optional[Union[Type[pydantic.BaseModel], Type[dict]]], + stream_type: Optional[PartialType], state_input_type: Optional[Type[pydantic.BaseModel]] = None, state_output_type: Optional[Type[pydantic.BaseModel]] = None, ) -> Tuple[ - Type[pydantic.BaseModel], Type[pydantic.BaseModel], Union[Type[dict], Type[pydantic.BaseModel]] + Type[pydantic.BaseModel], Type[pydantic.BaseModel], PartialType ]: if stream_type is None: # TODO -- derive from the signature diff --git a/tests/integrations/test_streaming_union_types.py b/tests/integrations/test_streaming_union_types.py new file mode 100644 index 000000000..9a4ce1bf9 --- /dev/null +++ b/tests/integrations/test_streaming_union_types.py @@ -0,0 +1,398 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Tests for union type support in @streaming_action.pydantic() decorator. + +This module tests that the stream_type parameter accepts union types, +including both | operator (Python 3.10+) and typing.Union syntax. +""" + +import asyncio +import sys +from typing import AsyncGenerator, Generator, List, Optional, Tuple, Union + +import pytest +from pydantic import BaseModel + +from burr.core.action import FunctionBasedAction, streaming_action +from burr.core.state import State +from burr.integrations.pydantic import PydanticTypingSystem, pydantic_streaming_action + + +# ============================================================================ +# Test Models +# ============================================================================ + + +class TextChunk(BaseModel): + """Represents a text chunk from a stream.""" + + text: str + chunk_id: int + + +class StructuredResult(BaseModel): + """Represents a structured result.""" + + summary: str + confidence: float + + +class Token(BaseModel): + """Represents a token from a stream.""" + + value: str + position: int + + +class StreamingInputState(BaseModel): + """Input state for streaming tests.""" + + prompt: str + mode: str = "default" + + +class StreamingOutputState(BaseModel): + """Output state for streaming tests.""" + + result: str + processing_time: float = 0.0 + + +# ============================================================================ +# Backward Compatibility Tests (Single Types) +# ============================================================================ + + +def test_streaming_action_single_model_type_backward_compat(): + """Test that single model types still work (backward compatibility).""" + + @streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=TextChunk, # Single type (should still work) + ) + def act( + state: State, + ) -> Generator[Tuple[TextChunk, Optional[StreamingOutputState]], None, None]: + yield TextChunk(text="Hello", chunk_id=1), None + yield TextChunk(text="World", chunk_id=2), StreamingOutputState(result="Complete") + + # Verify the action was created successfully + assert hasattr(act, "bind") + assert hasattr(act, FunctionBasedAction.ACTION_FUNCTION) + + +def test_streaming_action_dict_type_backward_compat(): + """Test that dict type still works (backward compatibility).""" + + @streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=dict, # dict type (should still work) + ) + def act( + state: State, + ) -> Generator[Tuple[dict, Optional[StreamingOutputState]], None, None]: + yield {"text": "Hello", "chunk_id": 1}, None + yield {"text": "World", "chunk_id": 2}, StreamingOutputState(result="Complete") + + # Verify the action was created successfully + assert hasattr(act, "bind") + assert hasattr(act, FunctionBasedAction.ACTION_FUNCTION) + + +# ============================================================================ +# Union Type Tests - typing.Union Syntax +# ============================================================================ + + +def test_streaming_action_typing_union_two_models(): + """Test streaming action with Union[Model1, Model2] syntax.""" + + @streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=Union[TextChunk, StructuredResult], # Union syntax + ) + def act( + state: State, + ) -> Generator[Tuple[Union[TextChunk, StructuredResult], Optional[StreamingOutputState]], None, None]: + yield TextChunk(text="Hello", chunk_id=1), None + yield StructuredResult(summary="Complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + + # Verify the action was created successfully + assert hasattr(act, "bind") + assert hasattr(act, FunctionBasedAction.ACTION_FUNCTION) + + +def test_streaming_action_typing_union_three_models(): + """Test streaming action with Union of three models.""" + + @streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=Union[TextChunk, StructuredResult, Token], # Union of 3 types + ) + def act( + state: State, + ) -> Generator[ + Tuple[Union[TextChunk, StructuredResult, Token], Optional[StreamingOutputState]], None, None + ]: + yield TextChunk(text="Hello", chunk_id=1), None + yield Token(value="world", position=2), None + yield StructuredResult(summary="Complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + + # Verify the action was created successfully + assert hasattr(act, "bind") + assert hasattr(act, FunctionBasedAction.ACTION_FUNCTION) + + +@pytest.mark.skipif(sys.version_info < (3, 10), reason="requires python3.10+") +def test_streaming_action_pipe_union_two_models(): + """Test streaming action with Model1 | Model2 syntax (Python 3.10+).""" + # Using exec to avoid syntax error on Python < 3.10 + code = """ +@streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=TextChunk | StructuredResult, # Pipe syntax +) +def act( + state: State, +) -> Generator[Tuple[TextChunk | StructuredResult, Optional[StreamingOutputState]], None, None]: + yield TextChunk(text="Hello", chunk_id=1), None + yield StructuredResult(summary="Complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + +result = (hasattr(act, "bind"), hasattr(act, FunctionBasedAction.ACTION_FUNCTION)) +""" + namespace = { + "streaming_action": streaming_action, + "TextChunk": TextChunk, + "StructuredResult": StructuredResult, + "StreamingInputState": StreamingInputState, + "StreamingOutputState": StreamingOutputState, + "State": State, + "Generator": Generator, + "Tuple": Tuple, + "Optional": Optional, + "FunctionBasedAction": FunctionBasedAction, + } + exec(code, namespace) + has_bind, has_action_func = namespace["result"] + assert has_bind + assert has_action_func + + +@pytest.mark.skipif(sys.version_info < (3, 10), reason="requires python3.10+") +def test_streaming_action_pipe_union_three_models(): + """Test streaming action with Model1 | Model2 | Model3 syntax (Python 3.10+).""" + code = """ +@streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=TextChunk | StructuredResult | Token, # Pipe syntax with 3 types +) +def act( + state: State, +) -> Generator[Tuple[TextChunk | StructuredResult | Token, Optional[StreamingOutputState]], None, None]: + yield TextChunk(text="Hello", chunk_id=1), None + yield Token(value="world", position=2), None + yield StructuredResult(summary="Complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + +result = (hasattr(act, "bind"), hasattr(act, FunctionBasedAction.ACTION_FUNCTION)) +""" + namespace = { + "streaming_action": streaming_action, + "TextChunk": TextChunk, + "StructuredResult": StructuredResult, + "Token": Token, + "StreamingInputState": StreamingInputState, + "StreamingOutputState": StreamingOutputState, + "State": State, + "Generator": Generator, + "Tuple": Tuple, + "Optional": Optional, + "FunctionBasedAction": FunctionBasedAction, + } + exec(code, namespace) + has_bind, has_action_func = namespace["result"] + assert has_bind + assert has_action_func + + +# ============================================================================ +# pydantic_streaming_action Tests +# ============================================================================ + + +def test_pydantic_streaming_action_union_decorator(): + """Test that pydantic_streaming_action also accepts union types.""" + + @pydantic_streaming_action( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=Union[TextChunk, StructuredResult], # Union in decorator + ) + def act( + state: StreamingInputState, + ) -> Generator[Tuple[Union[TextChunk, StructuredResult], Optional[StreamingOutputState]], None, None]: + yield TextChunk(text="Hello", chunk_id=1), None + yield StructuredResult(summary="Complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + + # Verify the action was created successfully + assert hasattr(act, "bind") + assert hasattr(act, FunctionBasedAction.ACTION_FUNCTION) + + +# ============================================================================ +# Async Union Type Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_streaming_action_async_union(): + """Test async streaming action with union types.""" + + @streaming_action.pydantic( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=Union[TextChunk, StructuredResult], + ) + async def act( + state: State, + ) -> AsyncGenerator[Tuple[Union[TextChunk, StructuredResult], Optional[StreamingOutputState]], None]: + yield TextChunk(text="Hello", chunk_id=1), None + await asyncio.sleep(0.001) + yield StructuredResult(summary="Complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + + # Verify the action was created successfully + assert hasattr(act, "bind") + assert hasattr(act, FunctionBasedAction.ACTION_FUNCTION) + + +# ============================================================================ +# Runtime Execution Tests +# ============================================================================ + + +def test_streaming_action_union_execution(): + """Test that union type streaming actions execute correctly.""" + + @pydantic_streaming_action( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=Union[TextChunk, StructuredResult], + ) + def act( + state: StreamingInputState, + ) -> Generator[Tuple[Union[TextChunk, StructuredResult], Optional[StreamingOutputState]], None, None]: + # Yield both types + yield TextChunk(text="chunk", chunk_id=1), None + yield TextChunk(text="chunk2", chunk_id=2), None + yield StructuredResult(summary="done", confidence=0.9), StreamingOutputState( + result="Complete" + ) + + # Execute the action + action_fn = getattr(act, FunctionBasedAction.ACTION_FUNCTION) + state = State( + {"prompt": "test", "mode": "default"}, + typing_system=PydanticTypingSystem(StreamingInputState), + ) + gen = action_fn.fn(state) + results = list(gen) + + # Verify results + assert len(results) == 3 + assert isinstance(results[0][0], TextChunk) + assert isinstance(results[1][0], TextChunk) + assert isinstance(results[2][0], StructuredResult) + assert results[0][1] is None + assert results[1][1] is None + assert isinstance(results[2][1], State) + + +def test_streaming_action_union_execution_three_types(): + """Test union type execution with three different model types.""" + + @pydantic_streaming_action( + reads=["prompt"], + writes=["result"], + state_input_type=StreamingInputState, + state_output_type=StreamingOutputState, + stream_type=Union[TextChunk, StructuredResult, Token], + ) + def act( + state: StreamingInputState, + ) -> Generator[ + Tuple[Union[TextChunk, StructuredResult, Token], Optional[StreamingOutputState]], None, None + ]: + # Yield all three types + yield TextChunk(text="hello", chunk_id=1), None + yield Token(value="world", position=2), None + yield StructuredResult(summary="complete", confidence=0.95), StreamingOutputState( + result="Done" + ) + + # Execute the action + action_fn = getattr(act, FunctionBasedAction.ACTION_FUNCTION) + state = State( + {"prompt": "test", "mode": "default"}, + typing_system=PydanticTypingSystem(StreamingInputState), + ) + gen = action_fn.fn(state) + results = list(gen) + + # Verify results + assert len(results) == 3 + assert isinstance(results[0][0], TextChunk) + assert isinstance(results[1][0], Token) + assert isinstance(results[2][0], StructuredResult) + assert all(item[1] is None for item in results[:-1]) + assert isinstance(results[2][1], State) From b928a56f680a0a226f8b1f093c39790cf84e3f5c Mon Sep 17 00:00:00 2001 From: Thy Tran <58045538+ThyTran1402@users.noreply.github.com> Date: Sat, 20 Jun 2026 15:54:17 +0700 Subject: [PATCH 2/2] removed md files --- CONTRIBUTION_README.md | 354 ----------------------------------------- 1 file changed, 354 deletions(-) delete mode 100644 CONTRIBUTION_README.md diff --git a/CONTRIBUTION_README.md b/CONTRIBUTION_README.md deleted file mode 100644 index 2732c283c..000000000 --- a/CONTRIBUTION_README.md +++ /dev/null @@ -1,354 +0,0 @@ -# Contribution: Union Type Support for Streaming Actions - -## Project Information -- **Project:** Apache Burr (incubating) -- **Issue:** Support Union Types in `@streaming_action.pydantic()` decorator -- **Working Branch:** `fix-union-stream-types` (in your fork) -- **Status:** Phase II - Reproduction & Planning Complete - ---- - -## Reproduction Process - -### Environment Setup - -#### Prerequisites -- Python 3.9+ (tested on 3.11) -- Git -- pip or conda for package management - -#### Setup Steps - -1. **Clone your fork:** - ```bash - git clone https://github.com//burr.git - cd burr - ``` - -2. **Create a virtual environment:** - ```bash - python -m venv .venv - source .venv/Scripts/activate # On Windows: .venv\Scripts\activate - ``` - -3. **Install project in development mode:** - ```bash - pip install -e ".[pydantic]" - ``` - Note: The `[pydantic]` extra ensures Pydantic is installed for testing. - -4. **Verify installation:** - ```bash - python -c "import burr; print(burr.__version__)" - ``` - -#### Environment Notes -- No environment variables required for reproducing this issue -- All dependencies are listed in `pyproject.toml` -- The Pydantic integration is optional but required for this feature - -### Steps to Reproduce - -#### Step 1: Create Test Models -Create a test file `test_union_repro.py`: - -```python -from pydantic import BaseModel -from typing import Generator, Optional, Tuple, Union -from burr.core import State -from burr.core.action import streaming_action - -# Define Pydantic models for streaming results -class TextChunk(BaseModel): - """Represents a text chunk from the stream.""" - text: str - chunk_id: int - -class StructuredResult(BaseModel): - """Represents a structured result.""" - summary: str - confidence: float - -class InputState(BaseModel): - """Input state.""" - prompt: str - -class OutputState(BaseModel): - """Output state.""" - response: str -``` - -#### Step 2: Attempt Union Type with Pipe Operator (Python 3.10+) -Add this to your test file: - -```python -# This will cause a type error before the fix -@streaming_action.pydantic( - reads=["prompt"], - writes=["response"], - state_input_type=InputState, - state_output_type=OutputState, - stream_type=TextChunk | StructuredResult, # Type error here -) -def stream_with_pipe_operator(state: State) -> Generator[Tuple[TextChunk | StructuredResult, Optional[OutputState]], None, None]: - """Streaming action using union with pipe operator.""" - yield TextChunk(text="Hello", chunk_id=1), None - yield StructuredResult(summary="Done", confidence=0.95), OutputState(response="Complete") -``` - -#### Step 3: Attempt Union Type with typing.Union -Add this to your test file: - -```python -# This will also cause a type error before the fix -@streaming_action.pydantic( - reads=["prompt"], - writes=["response"], - state_input_type=InputState, - state_output_type=OutputState, - stream_type=Union[TextChunk, StructuredResult], # Type error here -) -def stream_with_typing_union(state: State) -> Generator[Tuple[Union[TextChunk, StructuredResult], Optional[OutputState]], None, None]: - """Streaming action using Union from typing.""" - yield TextChunk(text="Hi", chunk_id=1), None - yield StructuredResult(summary="Finished", confidence=0.90), OutputState(response="Done") -``` - -#### Step 4: Run Type Checker -```bash -# Using mypy (if installed) -mypy test_union_repro.py - -# Or using Pylance through your IDE -# In VS Code: Check the "Problems" panel for type errors -``` - -#### Step 5: Observe the Error -**Before the fix**, you will see errors like: -``` -error: Argument "stream_type" to "pydantic" of "streaming_action" has incompatible type "Union[type[TextChunk], type[StructuredResult]]"; expected "Union[type[BaseModel], type[dict]]" -``` - -### Expected Behavior (After Fix) -After the fix is applied: -1. No type errors in your IDE or from mypy -2. The decorators accept the union types without complaint -3. Runtime behavior is unchanged (union types flow through correctly) - -### Reproduction Evidence - -**Your working branch:** `https://github.com//burr/tree/fix-union-stream-types` - -**Test reproduction file:** Created at workspace root as reference (will be cleaned up before PR) - ---- - -## Root Cause Analysis - -### Problem Location -The issue is in the type annotations for the `stream_type` parameter across two files: - -1. **`burr/core/action.py` (line ~1510)** - - In the `streaming_action.pydantic()` method - - Current type: `Union[Type["BaseModel"], Type[dict]]` - -2. **`burr/integrations/pydantic.py` (line ~272)** - - In the `PartialType` type alias definition - - Current type: `Union[Type[pydantic.BaseModel], Type[dict]]` - -### Why Union Types Are Rejected -- Union types created with `|` (Python 3.10+) are `types.UnionType` objects -- Union types from `typing.Union` are `typing._UnionGenericAlias` objects -- Neither of these types match `Union[Type[BaseModel], Type[dict]]` -- Type checkers (mypy, Pylance, etc.) reject these values as incompatible - -### Runtime vs. Type-Checking -- At **runtime**, the union type value flows through without validation — it actually works! -- At **type-checking time**, the IDE and mypy reject the code as invalid -- This creates a false positive: code that would work is flagged as an error - -### Key Functions Involved -``` -streaming_action.pydantic() - ↓ -pydantic_streaming_action() - ↓ -_validate_and_extract_signature_types_streaming() - ↓ -intermediate_result_type (stored in action schema) -``` - -The union type flows through all of these unchanged, so no runtime changes are needed. - ---- - -## Solution Approach - -### Understanding the Problem -The type system is too restrictive for the `stream_type` parameter. It only accepts a single type, but users want to specify that an action can stream multiple different types (Model1 or Model2). The union type syntax (`|` or `typing.Union`) is the natural Python way to express this, but the current type annotation rejects it. - -### Matching Similar Solutions -This pattern exists elsewhere in Python typing: -- Function return types can be unions: `def foo() -> Model1 | Model2:` -- Variable types can be unions: `result: Model1 | Model2 = ...` -- The type system needs to accept union types as valid values - -### Implementation Plan - -#### Changes Required - -**File 1: `burr/integrations/pydantic.py` (line ~272)** -- Change `PartialType = Union[Type[pydantic.BaseModel], Type[dict]]` -- To: `PartialType = Union[Type[pydantic.BaseModel], Type[dict], object]` -- Add explanatory comment about union type support -- Rationale: `object` is the base type in Python's type system and represents any runtime type - -**File 2: `burr/integrations/pydantic.py` (line ~303-310)** -- Update `_validate_and_extract_signature_types_streaming()` function signature -- Change parameter type from explicit Union to `PartialType` -- Change return type from explicit Union to `PartialType` -- No runtime logic changes needed - -**File 3: `burr/core/action.py` (line ~1510)** -- Update `stream_type` parameter in `streaming_action.pydantic()` -- Change from `Union[Type["BaseModel"], Type[dict]]` to `Union[Type["BaseModel"], Type[dict], object]` -- Update docstring to mention union type support -- Add example showing usage with `|` operator - -### Why This Approach Works -- Minimal change: only type annotations, no runtime logic changes -- Backward compatible: all existing code continues to work -- Works across Python versions: `object` is available in Python 3.9+ -- Type-safe: more specific than `Any`, clearer intent with proper comments -- Union types at runtime are type-compatible with `object` in the union - -### Files to Modify -1. `burr/integrations/pydantic.py` (2 locations) -2. `burr/core/action.py` (1 location) - -**Total: 3 targeted changes in 2 files** - ---- - -## Verification Plan - -### How to Test the Fix - -#### Manual Testing -After applying the fix: -1. Run the reproduction script from "Steps to Reproduce" section -2. Verify no type errors appear in mypy or IDE -3. Verify the decorated functions can be created without runtime errors - -#### Automated Testing -Create unit tests in `tests/integrations/test_pydantic_union_types.py`: - -```python -def test_streaming_action_with_union_pipe_operator(): - """Test that union types with | operator are accepted.""" - @streaming_action.pydantic( - reads=["input"], - writes=["output"], - state_input_type=InputState, - state_output_type=OutputState, - stream_type=Model1 | Model2, - ) - def my_action(state: State): - pass - # Should not raise any errors - -def test_streaming_action_with_typing_union(): - """Test that typing.Union types are accepted.""" - @streaming_action.pydantic( - reads=["input"], - writes=["output"], - state_input_type=InputState, - state_output_type=OutputState, - stream_type=Union[Model1, Model2], - ) - def my_action(state: State): - pass - # Should not raise any errors - -def test_streaming_action_backward_compatibility(): - """Test that existing single-type patterns still work.""" - @streaming_action.pydantic( - reads=["input"], - writes=["output"], - state_input_type=InputState, - state_output_type=OutputState, - stream_type=Model1, # Single type should still work - ) - def my_action(state: State): - pass - # Should not raise any errors -``` - -#### Verification Checklist -- [ ] Type checker (mypy) reports no errors on union type usage -- [ ] IDE/Pylance shows no squiggly lines under union types -- [ ] Existing single-type patterns continue to work (no regression) -- [ ] Unit tests for all union type syntaxes pass -- [ ] Backward compatibility tests pass -- [ ] No breaking changes to runtime behavior - -### Self-Review Checklist -Before submitting your PR, verify: -- [ ] Changes match `CONTRIBUTING.md` guidelines (check project's contribution guidelines) -- [ ] Commit messages are clear and follow project conventions -- [ ] Code follows project's style guide (PEP 8 for Python) -- [ ] All tests pass: `pytest tests/integrations/` -- [ ] No new warnings from type checkers -- [ ] Documentation is updated (docstrings, comments) - ---- - -## Key References - -### Files in Scope -- `burr/core/action.py` — Where `streaming_action` decorator is defined -- `burr/integrations/pydantic.py` — Where `PartialType` and related functions are defined -- `tests/integrations/` — Where unit tests should be added - -### Type System References -- Current implementation: [burr/core/action.py](burr/core/action.py) line 1503-1540 -- Pydantic integration: [burr/integrations/pydantic.py](burr/integrations/pydantic.py) line 270-380 -- Related test file: `tests/integrations/test_pydantic.py` (check existing patterns) - -### Python Type Hints -- [PEP 604 - Union as Type Syntax (`|` operator)](https://www.python.org/dev/peps/pep-0604/) -- [typing.Union documentation](https://docs.python.org/3/library/typing.html#typing.Union) - ---- - -## Phase II Status - -✅ **Completed:** -1. Local development environment set up and verified -2. Issue reproduced with clear steps -3. Root cause identified and documented -4. Solution plan written with implementation details -5. Verification strategy defined -6. Changes scoped to 3 locations in 2 files - -**Ready for:** Phase III - Implementation (write the actual code changes) - -**Next Steps:** -1. Review this plan with mentors/maintainers for feedback -2. Implement the changes as outlined -3. Write and run tests -4. Submit pull request in Phase III - ---- - -## Questions & Notes - -### For Code Review -- Should we add runtime validation to ensure only valid types are passed? -- Is there existing discussion about union type support in the project? -- Any specific test coverage requirements for the Pydantic integration? - -### Additional Context -- This is a type annotation only fix (no runtime logic changes) -- The fix maintains full backward compatibility -- Estimated effort: 2-4 hours including tests and documentation