AsyncPipeline.stream POC#11258
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. 1 Skipped Deployment
|
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||||||||||||||
|
@mpangrazzi @sjrl I'd appreciate your thoughts |
|
@anakin87 its looking good! I took a look at your demo repo and looks reasonable to me. I think @mpangrazzi can speak better on the implications of integrating this with Hayhooks. Perhaps requiring using of an AsyncIterable messes with easy integration into FastAPI? (I'm not really sure). The major question I had is more on what this would look for the sync pipeline? You mention that it could be problematic to set up and I was wondering if you could explain more in detail what you mean by that. |
| Iterate the handle to consume chunks; after iteration ends, `handle.result` holds the final pipeline output dict | ||
| (same as `run_async`). | ||
|
|
||
| For every async-capable component whose `run_async` accepts `streaming_callback`, a forwarder is injected |
There was a problem hiding this comment.
Let's be a little more explicit here to say that we only check if the input param is called streaming_callback in run_async. I don't think we do any type checking past that right?
| if is_callable_async_compatible(user_callback): | ||
| await cast(AsyncStreamingCallbackT, user_callback)(chunk) | ||
| else: | ||
| cast(SyncStreamingCallbackT, user_callback)(chunk) |
There was a problem hiding this comment.
Oh interesting so we want to support both async and sync streaming callbacks?
There was a problem hiding this comment.
Maybe in the future, but here it was wrong. I am now switching to the same select_streaming_callback utility we use everywhere.
| if not isinstance(comp_inputs, dict): | ||
| continue |
There was a problem hiding this comment.
Could you explain what this is catching? Do we allow non-dict entries in our data?
There was a problem hiding this comment.
Relic of the initial vibe-coded solution. Removing it now.
Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>
| if not isinstance(comp_inputs, dict): | ||
| continue | ||
| runtime_callback: StreamingCallbackT | None = comp_inputs.get("streaming_callback") | ||
| init_callback: StreamingCallbackT | None = getattr(instance, "streaming_callback", None) |
There was a problem hiding this comment.
This feels a bit fragile. E.g. it could be stored under a different attribute name. At the very least could we update the docstring The forwarder composes with any user-supplied streaming_callback. to mention how users can or should supply a streaming callback?
There was a problem hiding this comment.
Also should we allow users to directly provide a streaming callback in the stream method?
There was a problem hiding this comment.
- I now tried to explain better in the docstring (b72c878). LMK if still unclear/fragile
- Yes, can be passed via
data(now explained)
I think that this new feature provides value to async users in APIs (#8742, #9347). For sync users, the Why? Stream does: a Pipeline produces chunks in the background; consumer pulls them on demand. In Hayhooks, motivated by the API context and by the fact that most old pipelines are sync, we bridge sync pipelines into the async streaming endpoint, but, talking with Michele, this creates similar problems. |
Related Issues
Proposed Changes:
AsyncPipeline.streammethod that returns aPipelineStreamHandleasync for chunk in handleresultfield to get the final resultChoices and limitations:
PipelineStreamHandle). This currently makes the integration with Hayhooks not exactly ergonomic, but we can work to improve itDoes not contain breaking changes but it's thought for Haystack 3.
How did you test it?
Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:and added!in case the PR includes breaking changes.