feat: (Python) Add async context manager #487
Conversation
…, and scanners in Python bindings
Pull request overview
Adds Python async with support for Fluss client objects to improve lifecycle handling in async code, along with tests and example updates.
Changes:
- Implemented `__aenter__`/`__aexit__` for `FlussConnection`, `AppendWriter`, `UpsertWriter`, and `LogScanner` (Rust → PyO3 bindings).
- Added a new async context-manager-focused test suite and adjusted an existing offset test to reduce a timing race.
- Updated Python type hints and the Python example to demonstrate `async with` usage.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| bindings/python/src/connection.rs | Adds async context manager methods for FlussConnection. |
| bindings/python/src/table.rs | Adds async context manager methods for AppendWriter and LogScanner (plus a close() stub). |
| bindings/python/src/upsert.rs | Adds async context manager methods for UpsertWriter and a doc tweak. |
| bindings/python/fluss/`__init__.pyi` | Updates Python typing surface to include async context manager methods. |
| bindings/python/test/test_context_manager.py | New tests covering async with behavior for writers/scanners/connection. |
| bindings/python/test/test_log_table.py | Adds a short sleep to reduce a timestamp-offset ordering race. |
| bindings/python/example/example.py | Migrates the example to async with patterns (currently introduces correctness/syntax issues). |
Comments suppressed due to low confidence (3)
bindings/python/example/example.py:112
`append_writer` is created inside an `async with` block but then used after the block has already exited. This defeats the purpose of the async context manager (auto-flush-on-exit) and will break once the writer/connection gains a real `close()` implementation. Please move the write/append logic inside the `async with` scope (or don’t use `async with` here).
```python
# Create a writer for the table
async with table.new_append().create_writer() as append_writer:
    print(f"Created append writer: {append_writer}")
try:
    # Demo: Write PyArrow Table
    print("\n--- Testing PyArrow Table write ---")
```
bindings/python/example/example.py:273
`batch_scanner` is created inside `async with ... as batch_scanner:` but `subscribe_buckets()` and subsequent reads are performed after the context has exited. This makes the example misleading and will break if/when scanners implement real close semantics. Keep all scanner usage inside the `async with` block (or avoid the context manager here).
```python
try:
    # Use new_scan().create_record_batch_log_scanner() for batch-based operations
    async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner:
        print(f"Created batch scanner: {batch_scanner}")
    # Subscribe to buckets (required before to_arrow/to_pandas)
    # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET
    num_buckets = (await admin.get_table_info(table_path)).num_buckets
    batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
    print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET")
```
bindings/python/example/example.py:445
The `async with pk_table.new_upsert().create_writer() as upsert_writer:` block exits immediately after printing, but `upsert_writer` is then used for all subsequent upserts. This means the example does not actually run the writes inside the context manager (so no auto-flush-on-exit behavior is exercised) and it will break if the writer later implements real close semantics. Please move the upsert logic inside the `async with` block.
```python
try:
    async with pk_table.new_upsert().create_writer() as upsert_writer:
        print(f"Created upsert writer: {upsert_writer}")
    # Fire-and-forget: queue writes synchronously, flush at end.
    # Records are batched internally for efficiency.
    upsert_writer.upsert(
        {
```
```diff
-# Create connection using the static create method
-conn = await fluss.FlussConnection.create(config)
-
-# Define fields for PyArrow
+async with await fluss.FlussConnection.create(config) as conn:
+    # Define fields for PyArrow
 fields = [
     pa.field("id", pa.int32()),
```
The async with await FlussConnection.create(...) as conn: block has no executable statement in its body (only a comment), and fields = [...] is dedented. This will raise a SyntaxError (“expected an indented block”) and also ends the connection context immediately. Please indent the subsequent setup logic under the async with (or add a real statement inside the block) so the connection remains open for the rest of main().
+1 Ran into the following running the example
```
File "/Users/keithlee/IdeaProjects/f/fluss-rust/bindings/python/example/example.py", line 45
    fields = [
    ^^^^^^
IndentationError: expected an indented block after 'with' statement on line 43
```
```rust
// Exit the async runtime context (for 'async with' statement)
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
    &mut self,
    py: Python<'py>,
    _exc_type: Option<Bound<'py, PyAny>>,
    _exc_value: Option<Bound<'py, PyAny>>,
    _traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
    future_into_py(py, async move {
        // In the future, we could call an async close on the core connection here
        // e.g., client.close().await;
        Ok(false)
    })
}
```
__exit__ calls self.close()?, but the new async context manager __aexit__ does not. This means async with await FlussConnection.create(...) will not close the connection (even if close() is implemented later). Please mirror __exit__ by calling self.close()? in __aexit__ (it can be done before creating the future since close() is synchronous).
```rust
// Exit the async runtime context (for 'async with' statement)
/// On successful exit, the writer is automatically flushed.
/// If an exception occurs, the flush is skipped to allow immediate error
/// propagation, though pending records may still be sent in the background.
#[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
    &self,
    py: Python<'py>,
    exc_type: Option<Bound<'py, PyAny>>,
    _exc_value: Option<Bound<'py, PyAny>>,
    _traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
    let has_error = exc_type.is_some();
    let inner = self.inner.clone();
    future_into_py(py, async move {
        if !has_error {
            inner
                .flush()
                .await
                .map_err(|e| FlussError::from_core_error(&e))?;
        }
        Ok(false)
    })
}
```
AppendWriter.__aexit__ only flushes on the success path and never closes/invalidates the writer. After leaving async with, the writer object remains fully usable and any underlying resources are not deterministically released, which doesn’t match the linked issue’s “flush then close / close on exception” contract. Consider adding an explicit close() (even if initially a no-op) and calling it from __aexit__, or otherwise marking the writer as closed so further writes fail fast.
```rust
/// Close the scanner
pub fn close(&self) -> PyResult<()> {
    Ok(())
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
    let py_slf = slf.into_pyobject(py)?.unbind();
    future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
    &self,
    py: Python<'py>,
    _exc_type: Option<Bound<'py, PyAny>>,
    _exc_value: Option<Bound<'py, PyAny>>,
    _traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
    future_into_py(py, async move {
        // In the future, we can call an async close on the core scanner here
        Ok(false)
    })
```
LogScanner.close() is currently a no-op, and __aexit__ doesn’t call it. This makes async with ... as scanner: misleading because it doesn’t actually perform any cleanup. If the core scanner doesn’t need cleanup, consider either documenting close() as a no-op or removing it; otherwise, implement actual cleanup and invoke it from __aexit__ (and potentially from __del__ for safety).
```rust
///
/// Returns:
///     None on success
/// Flush any pending data
```
There’s an extra doc line /// Flush any pending data inserted after the Returns: section for flush(). It’s redundant with the existing docstring and makes the generated docs read oddly. Please remove this stray line or integrate it into the main flush doc comment above.
```diff
-/// Flush any pending data
```
```python
start_time = time.perf_counter()
try:
    async with table.new_append().create_writer() as writer:
        writer.append({"a": 100})
        raise TestException("abort")
except TestException:
    pass
duration = time.perf_counter() - start_time

# Verification:
# 1. The exception was propagated immediately.
# 2. The block exited nearly instantly because it bypassed the network flush.
assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush"
```
The test asserts the whole async with block completes in <0.1s. On slower/loaded CI machines this timing assertion can be flaky even when flush() is correctly skipped (context manager overhead + scheduling jitter). Consider using a more tolerant threshold, or assert behavior via mocking/observability (e.g., verifying flush() was not awaited / no records are guaranteed to be acknowledged) rather than wall-clock timing.
```python
class TestException(Exception): pass
start_time = time.perf_counter()
try:
    async with table.new_upsert().create_writer() as writer:
        writer.upsert({"id": 2, "v": "b"})
        raise TestException("abort")
except TestException:
    pass
duration = time.perf_counter() - start_time
assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush"
```
Same concern as the append-writer timing test: asserting duration < 0.1 for the exception path is prone to CI flakiness and may fail due to scheduler jitter unrelated to flushing. Prefer a less brittle threshold or a behavioral assertion that doesn’t depend on wall-clock timing.
```python
assert latest[0] == 0

before_append_ms = int(time.time() * 1000)
await asyncio.sleep(0.1)
```
Using a fixed await asyncio.sleep(0.1) to separate before_append_ms from the subsequent writes adds an arbitrary delay and can still be fragile under clock granularity/skew. A more deterministic approach is to wait until the millisecond clock advances (e.g., loop until int(time.time()*1000) > before_append_ms) before appending, which avoids hard-coding a 100ms sleep.
```diff
-await asyncio.sleep(0.1)
+while int(time.time() * 1000) <= before_append_ms:
+    await asyncio.sleep(0)
```
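The suggested busy-yield loop can be packaged as a small helper. This sketch is self-contained and independent of the Fluss test harness; `next_millisecond` is a hypothetical name:

```python
import asyncio
import time

async def next_millisecond() -> int:
    """Yield to the event loop until the millisecond clock advances,
    then return the new timestamp. A deterministic alternative to a
    fixed 100ms sleep."""
    before = int(time.time() * 1000)
    while int(time.time() * 1000) <= before:
        await asyncio.sleep(0)  # yield without a hard-coded delay
    return int(time.time() * 1000)

async def demo():
    before_append_ms = int(time.time() * 1000)
    now_ms = await next_millisecond()
    # Appends issued from here on carry strictly later timestamps.
    assert now_ms > before_append_ms
    return now_ms - before_append_ms

print(asyncio.run(demo()))
```

On most systems the loop exits within a millisecond, so the test pays only the minimum delay the clock granularity forces.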
```python
print("\n--- Flushing data ---")
await append_writer.flush()
print("Successfully flushed data")
# Note: flush() and close() are automatically called by the 'async with' block on successful exit.
```
This comment says flush() and close() are automatically called when leaving the async with block, but the current AppendWriter.__aexit__ implementation only flushes and there is no writer close() in the Python API. Please update the example text to match the actual behavior (or implement/introduce close() and call it from __aexit__).
```diff
-# Note: flush() and close() are automatically called by the 'async with' block on successful exit.
+# Note: flush() is automatically called by the 'async with' block on successful exit.
```
```python
# flush() and close() are automatically called by the 'async with' block on successful exit.
# Bypass manual flush:
```
This comment claims the async with block will automatically call both flush() and close(), but UpsertWriter.__aexit__ currently only flushes and does not close/invalidate the writer. Please update the example comment to reflect reality (or add close semantics and invoke them from __aexit__).
```diff
-# flush() and close() are automatically called by the 'async with' block on successful exit.
-# Bypass manual flush:
+# flush() is automatically called by the 'async with' block on successful exit.
+# No manual flush is needed here:
```
leekeiabstraction
left a comment
TY for the PR, left a comment
Hi @leekeiabstraction, thanks for reviewing, apologies about the …

@fresh-borzoni Any comments from your side on this PR?

I'll take a look today, I need to check a couple of things
@qzyu999 Thanks for the PR! The writer context managers look good 👍

Writers don't own resources - they queue records into a shared `RecordAccumulator`. The resource that actually benefits from a context manager is `FlussConnection`. Happy to help with that core change if needed.

Please remove the scanner context manager - both …
…ate Python bindings accordingly
…nc-context-manager
Hi @fresh-borzoni, thanks for the feedback. The latest a6a5bff contains most of the changes (df9e8c7 fixed a magic number and 713e5c2 resolved a merge conflict).
@qzyu999 Ty for the changes. Can you please remove the noise caused by the indentation change in example.py? We can demonstrate the CM not on the whole example but in a dedicated demo section like this:

```python
# --- New: async context manager demo ---
async with await fluss.FlussConnection.create(config) as demo_conn:
    table = await demo_conn.get_table(table_path)
    async with table.new_append().create_writer() as writer:
        writer.append({"id": 1, "name": "demo", "score": 1.0})
        # auto-flushes on exit
```

Otherwise it's a very confusing diff to review and I need to use https://github.com/apache/fluss-rust/pull/487/files?w=1 just to understand anything.
Hi @fresh-borzoni, the changes have been reverted to …
fresh-borzoni
left a comment
@qzyu999 Ty for the changes, left some comments, PTAL
```python
# Verification:
# 1. The exception was propagated immediately.
# 2. The block exited nearly instantly because it bypassed the network flush.
assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush"
```
Hi @fresh-borzoni, this has been addressed in 5e0adc8.
```python
    You must call subscribe(), subscribe_buckets(), or subscribe_partition() first.
    """
    ...
def close(self) -> None: ...
```
I think these guys were deleted
Hi @fresh-borzoni, this has been addressed in 8bec2d8.
```rust
    future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
```
I don't think we need this logic. The whole idea of context managers is guaranteed cleanup, and here we skip flush() just to return the error faster - that doesn't match.
Can we just always call flush() on exit?
Hi @fresh-borzoni, this has been addressed in 5e0adc8.
```rust
/// On successful exit, the writer is automatically flushed.
/// If an exception occurs, the flush is skipped to allow immediate error
/// propagation, though pending records may still be sent in the background.
#[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))]
```
Hi @fresh-borzoni, this has been addressed in 5e0adc8.
```rust
/// Args:
///     timeout_ms: The timeout in milliseconds to wait for the graceful drain.
///         If not provided, defaults to 30 seconds.
#[pyo3(signature = (timeout_ms=None))]
```
we can't pass params to async with, so let's leave just the default timeout, or we can define a config option and let users configure it statically if ever needed.
Also, Java uses Long.MAX_VALUE, so let's match with Duration::MAX
Hi @fresh-borzoni, this has been addressed in 8bec2d8.
I think it should now be async
Hi @fresh-borzoni, this has been addressed in 8bec2d8.
```python
admin = conn.get_admin()
nodes = await admin.get_server_nodes()
assert len(nodes) > 0
# conn should be closed (though currently close is a no-op in python side, but verifies syntax)
```
this comment is stale atm
Hi @fresh-borzoni, this has been addressed in 8bec2d8.
```diff
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
```
For the tests, I think three are enough: one for each async with-enabled type (AppendWriter, UpsertWriter, FlussConnection), each verifying the one behavior the CM adds - that pending writes get flushed/drained on exit.
A single strong test_connection_drain_on_close (write N records without flushing, rely on async with conn: to drain on exit, verify all N arrive) is the one that actually proves the PR's core value.
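The shape of that drain-on-exit test can be sketched with the Fluss connection replaced by a self-contained stub (all names here are hypothetical, not the real bindings):

```python
import asyncio

class _StubConnection:
    """Stub connection: queued records are only acknowledged after drain()."""
    def __init__(self):
        self.queued = []
        self.acknowledged = []

    def append(self, record):
        self.queued.append(record)  # no explicit flush by the caller

    async def drain(self):
        self.acknowledged.extend(self.queued)
        self.queued.clear()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.drain()  # the one behavior the context manager adds
        return False

async def test_connection_drain_on_close(n: int = 5) -> int:
    conn = _StubConnection()
    async with conn:
        for i in range(n):
            conn.append({"id": i})  # write N records without flushing
    return len(conn.acknowledged)  # all N must have arrived

print(asyncio.run(test_connection_drain_on_close()))  # prints 5
```

Against a real cluster the final assertion would read the records back via a scanner rather than inspect an in-memory list.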
Hi @fresh-borzoni, this has been addressed in 5e0adc8.
```python
assert latest[0] == 0

before_append_ms = int(time.time() * 1000)
await asyncio.sleep(0.1)
```
Hi @fresh-borzoni, this has been addressed in 8bec2d8.
Hi @fresh-borzoni, thanks for the feedback, all the comments have been responded to, PTAL.
fresh-borzoni
left a comment
@qzyu999 Ty for the changes, left some comments, PTAL
```python
# specific language governing permissions and limitations
# under the License.

import asyncio
```
do we still need this import?
```rust
inner
    .flush()
    .await
    .map_err(|e| FlussError::from_core_error(&e))?;
```
we need to do smth like this:

```python
def __exit__(self, exc_type, exc_val, exc_tb):
    try:
        self.cleanup()
    except Exception:
        if exc_type is None:
            raise  # no in-flight exception -> surface cleanup error
        # else: don't mask the user's exception
    return False
```

otherwise we'll swallow the exception if there is one in the context block and flush raises for whatever reason
```rust
writer
    .flush()
    .await
    .map_err(|e| FlussError::from_core_error(&e))?;
```
```python
    """
    ...

async def __aenter__(self) -> LogScanner: ...
```
why have we deleted close but left the rest?
Purpose
Linked issue: close #456
To implement asynchronous context managers (`async with`) for `AppendWriter`, `UpsertWriter`, and `LogScanner` in the Python bindings, ensuring proper resource lifecycle management and automated, non-blocking flushing of records.

Brief change log

- Writers (`AppendWriter`, `UpsertWriter`): Implemented `__aenter__` and `__aexit__` protocols.
  - Calls `flush()` on normal exit, guaranteeing data delivery before releasing the context.
  - On exception, skips `flush()` to instantly free the Python `asyncio` event loop (fail-fast semantics). Note: Because the `RecordAccumulator` is a shared resource on the connection, this relies on a "best-effort" non-blocking design. It avoids calling `close()` or `abort()` to prevent permanently bricking the shared `MemoryLimiter`, meaning records appended prior to the exception may still be transmitted by the background Tokio thread.
- Scanners (`LogScanner`, `RecordBatchLogScanner`): Implemented `__aenter__` and `__aexit__` to establish the API contract for asynchronous resource reclamation.
- Updated `__init__.pyi` docstrings to clearly document the best-effort transactional semantics so developers understand the limits of client-side atomicity.
- Stabilized `test_log_table.py::test_list_offsets` by adding an explicit `asyncio.sleep(0.1)` delay, ensuring strict chronological ordering when resolving timestamp-based offsets.

Tests
Added comprehensive coverage in a new `bindings/python/test/test_context_manager.py` suite:

- `test_append_writer_success_flush` and `test_upsert_writer_context_manager` verify that data is automatically flushed and available to scanners/lookupers without explicit `flush()` calls.
- `test_append_writer_exception_no_flush` uses `time.perf_counter()` to assert that exiting a failed context block takes < 0.1s, successfully proving that the speed-of-light network RTT wait is bypassed without destroying the connection for subsequent tests.
- Tests cover `create_log_scanner()` and `create_record_batch_log_scanner()` resource lifecycle bounds, including exception propagation.

API and Format

Adds `__aenter__` and `__aexit__` magic methods, enabling `async with` syntax.

Documentation

Yes. This introduces a new feature for the Python client. Python type hints (`__init__.pyi`) and docstrings have been updated to reflect the new syntax and explicitly document the behavior of the exception fault-path.