Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,15 @@ async def main():
print(f"Error with partitioned KV table: {e}")
traceback.print_exc()


print("\n--- New: async context manager demo ---")
async with await fluss.FlussConnection.create(config) as demo_conn:
demo_table = await demo_conn.get_table(table_path)
async with demo_table.new_append().create_writer() as writer:
writer.append({"id": 1, "name": "demo", "score": 1.0})
# auto-flushes on exit
# Close connection
conn.close()
await conn.close()
print("\nConnection closed")


Expand Down
60 changes: 59 additions & 1 deletion bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,21 @@ class FlussConnection:
async def create(config: Config) -> FlussConnection: ...
def get_admin(self) -> FlussAdmin: ...
async def get_table(self, table_path: TablePath) -> FlussTable: ...
def close(self) -> None: ...
async def close(self) -> None: ...
def __enter__(self) -> FlussConnection: ...
def __exit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool: ...
async def __aenter__(self) -> FlussConnection: ...
async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool: ...
def __repr__(self) -> str: ...

class ServerNode:
Expand Down Expand Up @@ -611,6 +618,27 @@ class AppendWriter:
def write_arrow_batch(self, batch: pa.RecordBatch) -> WriteResultHandle: ...
def write_pandas(self, df: pd.DataFrame) -> None: ...
async def flush(self) -> None: ...
async def __aenter__(self) -> AppendWriter:
"""
Enter the async context manager.

Returns:
The AppendWriter instance.
"""
...
async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool:
"""
Exit the async context manager.

On exit, the writer is automatically flushed to ensure
all pending records are sent and acknowledged.
"""
...
def __repr__(self) -> str: ...

class UpsertWriter:
Expand Down Expand Up @@ -644,6 +672,27 @@ class UpsertWriter:
async def flush(self) -> None:
"""Flush all pending upsert/delete operations to the server."""
...
async def __aenter__(self) -> UpsertWriter:
"""
Enter the async context manager.

Returns:
The UpsertWriter instance.
"""
...
async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool:
"""
Exit the async context manager.

On exit, the writer is automatically flushed to ensure
all pending records are sent and acknowledged.
"""
...
def __repr__(self) -> str: ...


Expand Down Expand Up @@ -807,6 +856,15 @@ class LogScanner:

You must call subscribe(), subscribe_buckets(), or subscribe_partition() first.
"""
...

async def __aenter__(self) -> LogScanner: ...
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have we deleted close but left the rest?

async def __aexit__(
self,
exc_type: Optional[type],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool: ...
def __repr__(self) -> str: ...
def __aiter__(self) -> AsyncIterator[Union[ScanRecord, RecordBatch]]: ...

Expand Down
45 changes: 41 additions & 4 deletions bindings/python/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::*;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
use std::time::Duration;

/// Connection to a Fluss cluster
#[pyclass]
Expand Down Expand Up @@ -82,9 +83,19 @@ impl FlussConnection {
})
}

// Close the connection
fn close(&mut self) -> PyResult<()> {
Ok(())
/// Close the connection (async).
///
/// Gracefully shuts down the connection by draining any pending write batches.
/// This method is awaitable.
fn close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();

future_into_py(py, async move {
inner
.close(Duration::MAX)
.await
.map_err(|e| FlussError::from_core_error(&e))
})
}

// Enter the runtime context (for 'with' statement)
Expand All @@ -100,10 +111,36 @@ impl FlussConnection {
_exc_value: Option<Bound<'_, PyAny>>,
_traceback: Option<Bound<'_, PyAny>>,
) -> PyResult<bool> {
self.close()?;
// Sync exit cannot await the graceful drain, so it's a no-op here.
// Users should use 'async with' for graceful shutdown.
Ok(false)
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let py_slf = slf.into_pyobject(py)?.unbind();
future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
&self,
py: Python<'py>,
_exc_type: Option<Bound<'py, PyAny>>,
_exc_value: Option<Bound<'py, PyAny>>,
_traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
future_into_py(py, async move {
inner
.close(Duration::MAX)
.await
.map_err(|e| FlussError::from_core_error(&e))?;
Ok(false)
})
}
Comment on lines +125 to +142
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__exit__ calls self.close()?, but the new async context manager __aexit__ does not. This means async with await FlussConnection.create(...) will not close the connection (even if close() is implemented later). Please mirror __exit__ by calling self.close()? in __aexit__ (it can be done before creating the future since close() is synchronous).

Copilot uses AI. Check for mistakes.

fn __repr__(&self) -> String {
"FlussConnection()".to_string()
}
Expand Down
26 changes: 26 additions & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,32 @@ impl AppendWriter {
})
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let py_slf = slf.into_pyobject(py)?.unbind();
future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this logic. The whole idea of context managers is guaranteed cleanup, and here we skip flush() just to return the error faster - that doesn't match.

Can we just always call flush() on exit

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fresh-borzoni, this has been addressed in 5e0adc8.

/// On exit, the writer is automatically flushed.
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
&self,
py: Python<'py>,
_exc_type: Option<Bound<'py, PyAny>>,
_exc_value: Option<Bound<'py, PyAny>>,
_traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
future_into_py(py, async move {
inner
.flush()
.await
.map_err(|e| FlussError::from_core_error(&e))?;
Copy link
Copy Markdown
Contributor

@fresh-borzoni fresh-borzoni Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to do smth like this:

  def __exit__(self, exc_type, exc_val, exc_tb):
      try:
          self.cleanup()
      except Exception:
          if exc_type is None:
              raise        # no in-flight exception -> surface cleanup error
          # else: don't mask the user's exception
      return False

otherwise we'll swallow exception if there is some in context block and flush raises for whatever reason

Ok(false)
})
}
Comment on lines +998 to +1016
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppendWriter.__aexit__ only flushes on the success path and never closes/invalidates the writer. After leaving async with, the writer object remains fully usable and any underlying resources are not deterministically released, which doesn’t match the linked issue’s “flush then close / close on exception” contract. Consider adding an explicit close() (even if initially a no-op) and calling it from __aexit__, or otherwise marking the writer as closed so further writes fail fast.

Copilot uses AI. Check for mistakes.

fn __repr__(&self) -> String {
"AppendWriter()".to_string()
}
Expand Down
26 changes: 26 additions & 0 deletions bindings/python/src/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,32 @@ impl UpsertWriter {
})
}

// Enter the async runtime context (for 'async with' statement)
fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let py_slf = slf.into_pyobject(py)?.unbind();
future_into_py(py, async move { Ok(py_slf) })
}

// Exit the async runtime context (for 'async with' statement)
/// On exit, the writer is automatically flushed.
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __aexit__<'py>(
&self,
py: Python<'py>,
_exc_type: Option<Bound<'py, PyAny>>,
_exc_value: Option<Bound<'py, PyAny>>,
_traceback: Option<Bound<'py, PyAny>>,
) -> PyResult<Bound<'py, PyAny>> {
let writer = self.writer.clone();
future_into_py(py, async move {
writer
.flush()
.await
.map_err(|e| FlussError::from_core_error(&e))?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Ok(false)
})
Comment on lines +117 to +134
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpsertWriter.__aexit__ only flushes on the success path and does not close/invalidate the writer in either path. This means leaving an async with block does not actually end the writer’s lifecycle, which diverges from the linked issue’s expected semantics. Consider adding/using an explicit close() and invoking it from __aexit__ (flush+close on success; close-only on exception).

Copilot uses AI. Check for mistakes.
}

fn __repr__(&self) -> String {
"UpsertWriter()".to_string()
}
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async def _connect(bootstrap_servers):
nodes = await admin.get_server_nodes()
if any(n.server_type == "TabletServer" for n in nodes):
return conn
conn.close()
await conn.close()
last_err = RuntimeError("No TabletServer available yet")
except Exception as e:
last_err = e
Expand Down
117 changes: 117 additions & 0 deletions bindings/python/test/test_context_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the tests, I think three are enough: one for each async with-enabled type (AppendWriter, UpsertWriter, FlussConnection), each verifying the one behavior the CM adds - that pending writes get flushed/drained on exit.
A single strong test_connection_drain_on_close (write N records without flushing, rely on async with conn: to drain on exit, verify all N arrive) is the one that actually proves the PR's core value.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fresh-borzoni, this has been addressed in 5e0adc8.

# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this import?

import pytest
import pyarrow as pa
import time
import fluss

def _poll_records(scanner, expected_count, timeout_s=10):
"""Poll a record-based scanner until expected_count records are collected."""
collected = []
deadline = time.monotonic() + timeout_s
while len(collected) < expected_count and time.monotonic() < deadline:
records = scanner.poll(5000)
collected.extend(records)
return collected

@pytest.mark.asyncio
async def test_connection_context_manager(plaintext_bootstrap_servers):
config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
async with await fluss.FlussConnection.create(config) as conn:
admin = conn.get_admin()
nodes = await admin.get_server_nodes()
assert len(nodes) > 0


@pytest.mark.asyncio
async def test_append_writer_success_flush(connection, admin):
table_path = fluss.TablePath("fluss", "test_append_ctx_success")
await admin.drop_table(table_path, ignore_if_not_exists=True)

schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
await admin.create_table(table_path, fluss.TableDescriptor(schema))

table = await connection.get_table(table_path)

async with table.new_append().create_writer() as writer:
writer.append({"a": 1})
writer.append({"a": 2})
# No explicit flush here

# After context exit, data should be flushed
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe(0, fluss.EARLIEST_OFFSET)
records = _poll_records(scanner, expected_count=2)
assert len(records) == 2
assert sorted([r.row["a"] for r in records]) == [1, 2]

@pytest.mark.asyncio
async def test_connection_drain_on_close(plaintext_bootstrap_servers, admin):
table_path = fluss.TablePath("fluss", "test_conn_drain")
await admin.drop_table(table_path, ignore_if_not_exists=True)
schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())]))
await admin.create_table(table_path, fluss.TableDescriptor(schema))

config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
async with await fluss.FlussConnection.create(config) as conn:
table = await conn.get_table(table_path)
writer = table.new_append().create_writer()
writer.append({"a": 123})
# No explicit flush, no writer context exit.
# Rely on connection.__aexit__ -> close() to drain.

# Re-connect with a new connection to verify data arrived
async with await fluss.FlussConnection.create(config) as conn2:
table2 = await conn2.get_table(table_path)
scanner = await table2.new_scan().create_log_scanner()
scanner.subscribe(0, fluss.EARLIEST_OFFSET)
records = _poll_records(scanner, expected_count=1)
assert len(records) == 1
assert records[0].row["a"] == 123

@pytest.mark.asyncio
async def test_upsert_writer_context_manager(connection, admin):
table_path = fluss.TablePath("fluss", "test_upsert_ctx")
await admin.drop_table(table_path, ignore_if_not_exists=True)

schema = fluss.Schema(pa.schema([pa.field("id", pa.int32()), pa.field("v", pa.string())]), primary_keys=["id"])
await admin.create_table(table_path, fluss.TableDescriptor(schema))

table = await connection.get_table(table_path)

# Success path: verify it flushes
async with table.new_upsert().create_writer() as writer:
writer.upsert({"id": 1, "v": "a"})

lookuper = table.new_lookup().create_lookuper()
res = await lookuper.lookup({"id": 1})
assert res is not None
assert res["v"] == "a"

@pytest.mark.asyncio
async def test_connection_context_manager_exception(plaintext_bootstrap_servers):
config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers})
class TestException(Exception): pass

try:
async with await fluss.FlussConnection.create(config) as conn:
raise TestException("connection error")
except TestException:
pass
# If we reach here without hanging, the connection __aexit__ gracefully handled the error
4 changes: 2 additions & 2 deletions bindings/python/test/test_sasl_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def test_sasl_connect_with_valid_credentials(sasl_bootstrap_servers):

# Cleanup
await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
conn.close()
await conn.close()


async def test_sasl_connect_with_second_user(sasl_bootstrap_servers):
Expand All @@ -62,7 +62,7 @@ async def test_sasl_connect_with_second_user(sasl_bootstrap_servers):

# Basic operation to confirm functional connection
assert not await admin.database_exists("some_nonexistent_db_alice")
conn.close()
await conn.close()


async def test_sasl_connect_with_wrong_password(sasl_bootstrap_servers):
Expand Down
Loading