From 55ea1b27224db0f04c58a1a90b914a74e4058fa7 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 17 Apr 2026 05:31:39 +0100 Subject: [PATCH] feat: make LogScanner poll methods async to prevent event loop blocking Convert poll(), poll_record_batch(), poll_arrow(), to_arrow(), to_pandas() from sync (py.detach + block_on) to async (future_into_py). The sync methods blocked the asyncio event loop thread, preventing concurrent future_into_py tasks from delivering results. This caused deadlocks when users ran multiple async operations simultaneously. Breaking change: these methods now return awaitables instead of direct values. --- bindings/python/example/example.py | 26 +- bindings/python/fluss/__init__.pyi | 10 +- bindings/python/pyproject.toml | 2 +- bindings/python/src/table.rs | 547 +++++++----------- bindings/python/test/conftest.py | 16 +- bindings/python/test/test_log_table.py | 56 +- .../docs/user-guide/python/api-reference.md | 12 +- website/docs/user-guide/python/data-types.md | 2 +- .../docs/user-guide/python/example/index.md | 2 +- .../user-guide/python/example/log-tables.md | 8 +- .../python/example/partitioned-tables.md | 2 +- 11 files changed, 286 insertions(+), 397 deletions(-) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 52cefe1e..15ac9d78 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -278,7 +278,7 @@ async def main(): # Try to get as PyArrow Table try: - pa_table_result = batch_scanner.to_arrow() + pa_table_result = await batch_scanner.to_arrow() print(f"\nAs PyArrow Table: {pa_table_result}") except Exception as e: print(f"Could not convert to PyArrow: {e}") @@ -289,7 +289,7 @@ async def main(): # Try to get as Pandas DataFrame try: - df_result = batch_scanner2.to_pandas() + df_result = await batch_scanner2.to_pandas() print(f"\nAs Pandas DataFrame:\n{df_result}") except Exception as e: print(f"Could not convert to Pandas: {e}") @@ -308,7 +308,7 @@ async def main(): # Poll with a timeout of 5000ms (5 seconds) # Note: poll_arrow() returns an empty table (not an error) on timeout try: - poll_result = batch_scanner3.poll_arrow(5000) + poll_result = await batch_scanner3.poll_arrow(5000) print(f"Number of rows: {poll_result.num_rows}") if poll_result.num_rows > 0: @@ -328,7 +328,7 @@ async def main(): batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) try: - batches = batch_scanner4.poll_record_batch(5000) + batches = await batch_scanner4.poll_record_batch(5000) print(f"Number of batches: {len(batches)}") for i, batch in enumerate(batches): @@ -354,7 +354,7 @@ async def main(): # Poll returns ScanRecords — records grouped by bucket print("\n--- Testing poll() method (record-by-record) ---") try: - scan_records = record_scanner.poll(5000) + scan_records = await record_scanner.poll(5000) print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}") # Flat iteration over all records (regardless of bucket) @@ -387,7 +387,7 @@ async def main(): # Unsubscribe from bucket 0 — future polls will skip this bucket unsub_scanner.unsubscribe(bucket_id=0) print("Unsubscribed from bucket 0") - remaining = unsub_scanner.poll_arrow(5000) + remaining = await unsub_scanner.poll_arrow(5000) print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)") except Exception as e: print(f"Error during unsubscribe test: {e}") @@ -640,7 +640,7 @@ async def main(): print("\n1. Projection by index [0, 1] (id, name):") scanner_index = await table.new_scan().project([0, 1]).create_record_batch_log_scanner() scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - df_projected = scanner_index.to_pandas() + df_projected = await scanner_index.to_pandas() print(df_projected.head()) print( f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}" @@ -652,7 +652,7 @@ async def main(): .project_by_name(["name", "score"]) \ .create_record_batch_log_scanner() scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - df_named = scanner_names.to_pandas() + df_named = await scanner_names.to_pandas() print(df_named.head()) print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") @@ -661,7 +661,7 @@ async def main(): scanner_proj = await table.new_scan().project([0, 2]).create_record_batch_log_scanner() scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) # Quick poll that may return empty - result = scanner_proj.poll_arrow(100) + result = await scanner_proj.poll_arrow(100) print(f" Schema columns: {result.schema.names}") except Exception as e: @@ -781,7 +781,7 @@ async def main(): print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})") # Use to_arrow() - now works for partitioned tables! - partitioned_arrow = partitioned_scanner.to_arrow() + partitioned_arrow = await partitioned_scanner.to_arrow() print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:") print(partitioned_arrow.to_pandas()) @@ -793,7 +793,7 @@ async def main(): } partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets) print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations") - partitioned_batch_arrow = partitioned_scanner_batch.to_arrow() + partitioned_batch_arrow = await partitioned_scanner_batch.to_arrow() print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:") print(partitioned_batch_arrow.to_pandas()) @@ -806,7 +806,7 @@ async def main(): first_partition = partition_infos[0] partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0) print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})") - remaining_arrow = partitioned_scanner3.to_arrow() + remaining_arrow = await partitioned_scanner3.to_arrow() print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):") print(remaining_arrow.to_pandas()) @@ -815,7 +815,7 @@ async def main(): partitioned_scanner2 = await partitioned_table.new_scan().create_record_batch_log_scanner() for p in partition_infos: partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) - partitioned_df = partitioned_scanner2.to_pandas() + partitioned_df = await partitioned_scanner2.to_pandas() print(f"to_pandas() returned {len(partitioned_df)} records:") print(partitioned_df) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 02edcdb3..8268eae3 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -741,7 +741,7 @@ class LogScanner: bucket_id: The bucket ID within the partition """ ... - def poll(self, timeout_ms: int) -> ScanRecords: + async def poll(self, timeout_ms: int) -> ScanRecords: """Poll for individual records with metadata. Requires a record-based scanner (created with new_scan().create_log_scanner()). @@ -758,7 +758,7 @@ class LogScanner: Returns an empty ScanRecords if no records are available or timeout expires. """ ... - def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]: + async def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]: """Poll for batches with metadata. Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()). @@ -774,7 +774,7 @@ class LogScanner: Returns an empty list if no batches are available or timeout expires. """ ... - def poll_arrow(self, timeout_ms: int) -> pa.Table: + async def poll_arrow(self, timeout_ms: int) -> pa.Table: """Poll for records as an Arrow Table. Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()). @@ -790,7 +790,7 @@ class LogScanner: or timeout expires. """ ... - def to_pandas(self) -> pd.DataFrame: + async def to_pandas(self) -> pd.DataFrame: """Convert all data to Pandas DataFrame. Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()). @@ -799,7 +799,7 @@ class LogScanner: You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. """ ... - def to_arrow(self) -> pa.Table: + async def to_arrow(self) -> pa.Table: """Convert all data to Arrow Table. Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()). diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index 22e64188..56a059c9 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -95,7 +95,7 @@ known-first-party = ["fluss"] [tool.pytest.ini_options] asyncio_mode = "auto" -asyncio_default_fixture_loop_scope = "function" +asyncio_default_fixture_loop_scope = "session" timeout = 120 [tool.mypy] diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index c1b46734..f349c6ea 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -535,7 +535,7 @@ impl TableScan { admin, table_info, projected_schema, - projected_row_type, + Arc::new(projected_row_type), ); Python::attach(|py| Py::new(py, py_scanner)) @@ -1984,9 +1984,9 @@ pub struct LogScanner { /// The projected Arrow schema to use for empty table creation projected_schema: SchemaRef, /// The projected row type to use for record-based scanning - projected_row_type: fcore::metadata::RowType, + projected_row_type: Arc, /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls) - partition_name_cache: std::sync::RwLock>>, + partition_name_cache: Arc>>>, } #[pymethods] @@ -2103,9 +2103,7 @@ impl LogScanner { /// - Requires a record-based scanner (created with new_scan().create_log_scanner()) /// - Returns an empty ScanRecords if no records are available /// - When timeout expires, returns an empty ScanRecords (NOT an error) - fn poll(&self, py: Python, timeout_ms: i64) -> PyResult { - let scanner = self.kind.as_record()?; - + fn poll<'py>(&self, py: Python<'py>, timeout_ms: i64) -> PyResult> { if timeout_ms < 0 { return Err(FlussError::new_err(format!( "timeout_ms must be non-negative, got: {timeout_ms}" @@ -2113,29 +2111,36 @@ impl LogScanner { } let timeout = Duration::from_millis(timeout_ms as u64); - let scan_records = py - .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) - .map_err(|e| FlussError::from_core_error(&e))?; + let scanner = Arc::clone(&self.kind); + let projected_row_type = self.projected_row_type.clone(); - // Convert core ScanRecords to Python ScanRecords grouped by bucket - let row_type = &self.projected_row_type; - let mut records_by_bucket = IndexMap::new(); - let mut total_count = 0usize; - - for (bucket, records) in scan_records.into_records_by_buckets() { - let py_bucket = TableBucket::from_core(bucket); - let mut py_records = Vec::with_capacity(records.len()); - for record in &records { - let scan_record = ScanRecord::from_core(py, record, row_type)?; - py_records.push(Py::new(py, scan_record)?); - total_count += 1; - } - records_by_bucket.insert(py_bucket, py_records); - } + future_into_py(py, async move { + let scan_records = scanner + .as_record()? + .poll(timeout) + .await + .map_err(|e| FlussError::from_core_error(&e))?; - Ok(ScanRecords { - records_by_bucket, - total_count, + Python::attach(|py| { + let mut records_by_bucket = IndexMap::new(); + let mut total_count = 0usize; + + for (bucket, records) in scan_records.into_records_by_buckets() { + let py_bucket = TableBucket::from_core(bucket); + let mut py_records = Vec::with_capacity(records.len()); + for record in &records { + let scan_record = ScanRecord::from_core(py, record, &projected_row_type)?; + py_records.push(Py::new(py, scan_record)?); + total_count += 1; + } + records_by_bucket.insert(py_bucket, py_records); + } + + Ok(ScanRecords { + records_by_bucket, + total_count, + }) + }) }) } @@ -2152,9 +2157,11 @@ impl LogScanner { /// - Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()) /// - Returns an empty list if no batches are available /// - When timeout expires, returns an empty list (NOT an error) - fn poll_record_batch(&self, py: Python, timeout_ms: i64) -> PyResult> { - let scanner = self.kind.as_batch()?; - + fn poll_record_batch<'py>( + &self, + py: Python<'py>, + timeout_ms: i64, + ) -> PyResult> { if timeout_ms < 0 { return Err(FlussError::new_err(format!( "timeout_ms must be non-negative, got: {timeout_ms}" @@ -2162,17 +2169,22 @@ impl LogScanner { } let timeout = Duration::from_millis(timeout_ms as u64); - let scan_batches = py - .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) - .map_err(|e| FlussError::from_core_error(&e))?; + let scanner = Arc::clone(&self.kind); - // Convert ScanBatch to RecordBatch with metadata - let result = scan_batches - .into_iter() - .map(RecordBatch::from_scan_batch) - .collect(); + future_into_py(py, async move { + let scan_batches = scanner + .as_batch()? + .poll(timeout) + .await + .map_err(|e| FlussError::from_core_error(&e))?; - Ok(result) + Python::attach(|py| { + scan_batches + .into_iter() + .map(|sb| Py::new(py, RecordBatch::from_scan_batch(sb))) + .collect::>>() + }) + }) } /// Poll for new records as an Arrow Table. @@ -2187,9 +2199,7 @@ impl LogScanner { /// - Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()) /// - Returns an empty table (with correct schema) if no records are available /// - When timeout expires, returns an empty table (NOT an error) - fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult> { - let scanner = self.kind.as_batch()?; - + fn poll_arrow<'py>(&self, py: Python<'py>, timeout_ms: i64) -> PyResult> { if timeout_ms < 0 { return Err(FlussError::new_err(format!( "timeout_ms must be non-negative, got: {timeout_ms}" @@ -2197,38 +2207,23 @@ impl LogScanner { } let timeout = Duration::from_millis(timeout_ms as u64); - let scan_batches = py - .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) - .map_err(|e| FlussError::from_core_error(&e))?; - - // Convert ScanBatch to Arrow batches - if scan_batches.is_empty() { - return self.create_empty_table(py); - } - - let arrow_batches: Vec<_> = scan_batches - .into_iter() - .map(|scan_batch| Arc::new(scan_batch.into_batch())) - .collect(); - - Utils::combine_batches_to_table(py, arrow_batches) - } + let scanner = Arc::clone(&self.kind); + let projected_schema = self.projected_schema.clone(); - /// Create an empty PyArrow table with the correct (projected) schema - fn create_empty_table(&self, py: Python) -> PyResult> { - // Use the projected schema stored in the scanner - let py_schema = self - .projected_schema - .as_ref() - .to_pyarrow(py) - .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; + future_into_py(py, async move { + let scan_batches = scanner + .as_batch()? + .poll(timeout) + .await + .map_err(|e| FlussError::from_core_error(&e))?; - let pyarrow = py.import("pyarrow")?; - let empty_table = pyarrow - .getattr("Table")? - .call_method1("from_batches", (vec![] as Vec>, py_schema))?; + let arrow_batches = scan_batches + .into_iter() + .map(|sb| Arc::new(sb.into_batch())) + .collect(); - Ok(empty_table.into()) + Python::attach(|py| Self::batches_to_arrow_table(py, arrow_batches, &projected_schema)) + }) } /// Convert all data to Arrow Table. @@ -2240,21 +2235,33 @@ impl LogScanner { /// /// Returns: /// PyArrow Table containing all data from subscribed buckets - fn to_arrow(&self, py: Python) -> PyResult> { - let scanner = self.kind.as_batch()?; - let subscribed = scanner.get_subscribed_buckets(); + fn to_arrow<'py>(&self, py: Python<'py>) -> PyResult> { + let kind = Arc::clone(&self.kind); + let admin = Arc::clone(&self.admin); + let table_info = self.table_info.clone(); + let projected_schema = self.projected_schema.clone(); + let partition_name_cache = Arc::clone(&self.partition_name_cache); - if subscribed.is_empty() { - return Err(FlussError::new_err( - "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.", - )); - } + future_into_py(py, async move { + let scanner = kind.as_batch()?; + let subscribed = scanner.get_subscribed_buckets(); + if subscribed.is_empty() { + return Err(FlussError::new_err( + "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.", + )); + } - // 2. Query latest offsets for all subscribed buckets - let stopping_offsets = self.query_latest_offsets(py, &subscribed)?; + let all_batches = Self::collect_all_batches( + scanner, + &admin, + &table_info, + &subscribed, + &partition_name_cache, + ) + .await?; - // 3. Poll until all buckets reach their stopping offsets - self.poll_until_offsets(py, stopping_offsets) + Python::attach(|py| Self::batches_to_arrow_table(py, all_batches, &projected_schema)) + }) } /// Convert all data to Pandas DataFrame. @@ -2266,12 +2273,36 @@ impl LogScanner { /// /// Returns: /// Pandas DataFrame containing all data from subscribed buckets - fn to_pandas(&self, py: Python) -> PyResult> { - let arrow_table = self.to_arrow(py)?; + fn to_pandas<'py>(&self, py: Python<'py>) -> PyResult> { + let kind = Arc::clone(&self.kind); + let admin = Arc::clone(&self.admin); + let table_info = self.table_info.clone(); + let projected_schema = self.projected_schema.clone(); + let partition_name_cache = Arc::clone(&self.partition_name_cache); + + future_into_py(py, async move { + let scanner = kind.as_batch()?; + let subscribed = scanner.get_subscribed_buckets(); + if subscribed.is_empty() { + return Err(FlussError::new_err( + "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.", + )); + } + + let all_batches = Self::collect_all_batches( + scanner, + &admin, + &table_info, + &subscribed, + &partition_name_cache, + ) + .await?; - // Convert Arrow Table to Pandas DataFrame using pyarrow - let df = arrow_table.call_method0(py, "to_pandas")?; - Ok(df) + Python::attach(|py| { + let arrow_table = Self::batches_to_arrow_table(py, all_batches, &projected_schema)?; + arrow_table.call_method0(py, "to_pandas") + }) + }) } fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult> { @@ -2283,14 +2314,11 @@ impl LogScanner { let gen_fn = ASYNC_GEN_FN.get_or_init(py, || { let code = pyo3::ffi::c_str!( r#" -async def _async_scan_generic(scanner, method_name): - # Dynamically resolve the polling method (e.g., _async_poll or _async_poll_batches) +async def _async_scan_generic(scanner, method_name, timeout_ms): poll_method = getattr(scanner, method_name) while True: - items = await poll_method() - if items: - for item in items: - yield item + for item in await poll_method(timeout_ms): + yield item "# ); let globals = pyo3::types::PyDict::new(py); @@ -2302,106 +2330,16 @@ async def _async_scan_generic(scanner, method_name): .unbind() }); - // Determine which internal method to call based on the scanner kind let method_name = match slf.kind.as_ref() { - ScannerKind::Record(_) => "_async_poll", - ScannerKind::Batch(_) => "_async_poll_batches", + ScannerKind::Record(_) => "poll", + ScannerKind::Batch(_) => "poll_record_batch", }; - // Instantiate the generator with the scanner instance and the target method name - gen_fn - .bind(py) - .call1((slf.into_bound_py_any(py)?, method_name)) - } - - /// Perform a single bounded poll and return a list of ScanRecord objects. - /// - /// This is the async building block used by `__aiter__` (record mode) to - /// implement `async for`. Each call does exactly one network poll (bounded - /// by `DEFAULT_POLL_INTERVAL_MS`), converts any results to Python ScanRecord objects, - /// and returns them as a list. An empty list signals a timeout (no data yet), not - /// end-of-stream. - /// - /// Returns: - /// Awaitable that resolves to a list of ScanRecord objects - fn _async_poll<'py>(&self, py: Python<'py>) -> PyResult> { - let timeout = Duration::from_millis(DEFAULT_POLL_INTERVAL_MS as u64); - - let scanner = Arc::clone(&self.kind); - let projected_row_type = self.projected_row_type.clone(); - - future_into_py(py, async move { - let core_scanner = match scanner.as_ref() { - ScannerKind::Record(s) => s, - ScannerKind::Batch(_) => { - return Err(PyTypeError::new_err( - "This internal method only supports record-based scanners. \ - For batch-based scanners, use 'async for' or 'poll_record_batch' instead.", - )); - } - }; - - let scan_records = core_scanner - .poll(timeout) - .await - .map_err(|e| FlussError::from_core_error(&e))?; - - // Convert to Python list - Python::attach(|py| { - let mut result: Vec> = Vec::new(); - for (_, records) in scan_records.into_records_by_buckets() { - for core_record in records { - let scan_record = - ScanRecord::from_core(py, &core_record, &projected_row_type)?; - result.push(Py::new(py, scan_record)?); - } - } - Ok(result) - }) - }) - } - - /// Perform a single bounded poll and return a list of RecordBatch objects. - /// - /// This is the async building block used by `__aiter__` (batch mode) to - /// implement `async for`. Each call does exactly one network poll (bounded - /// by `DEFAULT_POLL_INTERVAL_MS`), converts any results to Python RecordBatch objects, - /// and returns them as a list. An empty list signals a timeout (no data - /// yet), not end-of-stream. - /// - /// Returns: - /// Awaitable that resolves to a list of RecordBatch objects - fn _async_poll_batches<'py>(&self, py: Python<'py>) -> PyResult> { - let timeout = Duration::from_millis(DEFAULT_POLL_INTERVAL_MS as u64); - - let scanner = Arc::clone(&self.kind); - - future_into_py(py, async move { - let core_scanner = match scanner.as_ref() { - ScannerKind::Batch(s) => s, - ScannerKind::Record(_) => { - return Err(PyTypeError::new_err( - "This internal method only supports batch-based scanners. \ - For record-based scanners, use 'async for' or 'poll' instead.", - )); - } - }; - - let scan_batches = core_scanner - .poll(timeout) - .await - .map_err(|e| FlussError::from_core_error(&e))?; - - // Convert to Python list of RecordBatch objects - Python::attach(|py| { - let mut result: Vec> = Vec::new(); - for scan_batch in scan_batches { - let rb = RecordBatch::from_scan_batch(scan_batch); - result.push(Py::new(py, rb)?); - } - Ok(result) - }) - }) + gen_fn.bind(py).call1(( + slf.into_bound_py_any(py)?, + method_name, + DEFAULT_POLL_INTERVAL_MS, + )) } fn __repr__(&self) -> String { @@ -2415,7 +2353,7 @@ impl LogScanner { admin: Arc, table_info: fcore::metadata::TableInfo, projected_schema: SchemaRef, - projected_row_type: fcore::metadata::RowType, + projected_row_type: Arc, ) -> Self { Self { kind: Arc::new(scanner), @@ -2423,73 +2361,52 @@ impl LogScanner { table_info, projected_schema, projected_row_type, - partition_name_cache: std::sync::RwLock::new(None), + partition_name_cache: Arc::new(std::sync::RwLock::new(None)), } } - /// Get partition_id -> partition_name mapping, using cache if available - fn get_partition_name_map( - &self, - py: Python, - table_path: &fcore::metadata::TablePath, - ) -> PyResult> { - // Check cache first (read lock) - { - let cache = self.partition_name_cache.read().unwrap(); - if let Some(map) = cache.as_ref() { - return Ok(map.clone()); - } - } - - // Fetch partition infos (releases GIL during async call) - let partition_infos: Vec = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - // Build and cache the mapping - let map: HashMap = partition_infos - .into_iter() - .map(|info| (info.get_partition_id(), info.get_partition_name())) - .collect(); - - // Store in cache (write lock) - { - let mut cache = self.partition_name_cache.write().unwrap(); - *cache = Some(map.clone()); + /// Convert Arrow record batches to a PyArrow Table (or empty table if no batches). + fn batches_to_arrow_table( + py: Python<'_>, + batches: Vec>, + projected_schema: &SchemaRef, + ) -> PyResult> { + if batches.is_empty() { + let py_schema = projected_schema + .as_ref() + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; + let pyarrow = py.import("pyarrow")?; + let empty_table = pyarrow + .getattr("Table")? + .call_method1("from_batches", (vec![] as Vec>, py_schema))?; + Ok(empty_table.into()) + } else { + Utils::combine_batches_to_table(py, batches) } - - Ok(map) } - /// Query latest offsets for subscribed buckets (handles both partitioned and non-partitioned) - fn query_latest_offsets( - &self, - py: Python, + /// Query stopping offsets and poll until all subscribed buckets are fully read. + /// Returns collected Arrow record batches. + async fn collect_all_batches( + scanner: &fcore::client::RecordBatchLogScanner, + admin: &fcore::client::FlussAdmin, + table_info: &fcore::metadata::TableInfo, subscribed: &[(fcore::metadata::TableBucket, i64)], - ) -> PyResult> { - let scanner = self.kind.as_batch()?; + partition_name_cache: &std::sync::RwLock>>, + ) -> PyResult>> { let is_partitioned = scanner.is_partitioned(); - let table_path = &self.table_info.table_path; + let table_path = &table_info.table_path; + let table_id = table_info.table_id; - if !is_partitioned { - // Non-partitioned: simple case - just query all bucket IDs + // 1. Query latest offsets + let mut stopping_offsets: HashMap = if !is_partitioned { let bucket_ids: Vec = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); - - let offsets: HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) - .await - }) - }) + let offsets = admin + .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) + .await .map_err(|e| FlussError::from_core_error(&e))?; - - // Convert to TableBucket-keyed map - let table_id = self.table_info.table_id; - Ok(offsets + offsets .into_iter() .filter(|(_, offset)| *offset > 0) .map(|(bucket_id, offset)| { @@ -2498,88 +2415,69 @@ impl LogScanner { offset, ) }) - .collect()) + .collect() } else { - // Partitioned: need to query per partition - self.query_partitioned_offsets(py, subscribed) - } - } + let cached = partition_name_cache.read().unwrap().clone(); + let partition_id_to_name = match cached { + Some(map) => map, + None => { + let infos = admin + .list_partition_infos(table_path) + .await + .map_err(|e| FlussError::from_core_error(&e))?; + let map: HashMap = infos + .into_iter() + .map(|info| (info.get_partition_id(), info.get_partition_name())) + .collect(); + *partition_name_cache.write().unwrap() = Some(map.clone()); + map + } + }; - /// Query offsets for partitioned table subscriptions - fn query_partitioned_offsets( - &self, - py: Python, - subscribed: &[(fcore::metadata::TableBucket, i64)], - ) -> PyResult> { - let table_path = &self.table_info.table_path; - - // Get partition_id -> partition_name mapping (cached) - let partition_id_to_name = self.get_partition_name_map(py, table_path)?; - - // Group subscribed buckets by partition_id - let mut by_partition: HashMap> = HashMap::new(); - for (tb, _) in subscribed { - if let Some(partition_id) = tb.partition_id() { - by_partition - .entry(partition_id) - .or_default() - .push(tb.bucket_id()); + let mut by_partition: HashMap> = HashMap::new(); + for (tb, _) in subscribed { + if let Some(partition_id) = tb.partition_id() { + by_partition + .entry(partition_id) + .or_default() + .push(tb.bucket_id()); + } } - } - // Query offsets for each partition - let mut result: HashMap = HashMap::new(); - let table_id = self.table_info.table_id; - - for (partition_id, bucket_ids) in by_partition { - let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| { - FlussError::new_err(format!("Unknown partition_id: {partition_id}")) - })?; - - let offsets: HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_partition_offsets( - table_path, - partition_name, - &bucket_ids, - OffsetSpec::Latest, - ) - .await - }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - for (bucket_id, offset) in offsets { - if offset > 0 { - let tb = fcore::metadata::TableBucket::new_with_partition( - table_id, - Some(partition_id), - bucket_id, - ); - result.insert(tb, offset); + let mut result = HashMap::new(); + for (partition_id, bucket_ids) in by_partition { + let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| { + FlussError::new_err(format!("Unknown partition_id: {partition_id}")) + })?; + let offsets = admin + .list_partition_offsets( + table_path, + partition_name, + &bucket_ids, + OffsetSpec::Latest, + ) + .await + .map_err(|e| FlussError::from_core_error(&e))?; + for (bucket_id, offset) in offsets { + if offset > 0 { + let tb = fcore::metadata::TableBucket::new_with_partition( + table_id, + Some(partition_id), + bucket_id, + ); + result.insert(tb, offset); + } } } - } - - Ok(result) - } + result + }; - /// Poll until all buckets reach their stopping offsets - fn poll_until_offsets( - &self, - py: Python, - mut stopping_offsets: HashMap, - ) -> PyResult> { - let scanner = self.kind.as_batch()?; + // 2. Poll until all buckets reach their stopping offsets let mut all_batches = Vec::new(); - while !stopping_offsets.is_empty() { - let scan_batches = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await }) - }) + let scan_batches = scanner + .poll(Duration::from_millis(500)) + .await .map_err(|e| FlussError::from_core_error(&e))?; if scan_batches.is_empty() { @@ -2588,8 +2486,6 @@ impl LogScanner { for scan_batch in scan_batches { let table_bucket = scan_batch.bucket().clone(); - - // Check if this bucket is still being tracked let Some(&stop_at) = stopping_offsets.get(&table_bucket) else { continue; }; @@ -2597,14 +2493,12 @@ impl LogScanner { let base_offset = scan_batch.base_offset(); let last_offset = scan_batch.last_offset(); - // If the batch starts at or after the stop_at offset, the bucket is exhausted if base_offset >= stop_at { stopping_offsets.remove(&table_bucket); continue; } let batch = if last_offset >= stop_at { - // Slice batch to keep only records where offset < stop_at let num_to_keep = (stop_at - base_offset) as usize; let b = scan_batch.into_batch(); let limit = num_to_keep.min(b.num_rows()); @@ -2615,14 +2509,13 @@ impl LogScanner { all_batches.push(Arc::new(batch)); - // Check if we're done with this bucket if last_offset >= stop_at - 1 { stopping_offsets.remove(&table_bucket); } } } - Utils::combine_batches_to_table(py, all_batches) + Ok(all_batches) } } diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py index 47c92807..7da0f3d9 100644 --- a/bindings/python/test/conftest.py +++ b/bindings/python/test/conftest.py @@ -124,16 +124,12 @@ def fluss_cluster(): yield (plaintext_addr, sasl_addr or plaintext_addr) -_cached_connection = None - - -@pytest_asyncio.fixture +@pytest_asyncio.fixture(scope="session") async def connection(fluss_cluster): - global _cached_connection - if _cached_connection is None: - plaintext_addr, _sasl_addr = fluss_cluster - _cached_connection = await _connect(plaintext_addr) - yield _cached_connection + plaintext_addr, _sasl_addr = fluss_cluster + conn = await _connect(plaintext_addr) + yield conn + conn.close() @pytest.fixture(scope="session") @@ -148,6 +144,6 @@ def plaintext_bootstrap_servers(fluss_cluster): return plaintext_addr -@pytest_asyncio.fixture +@pytest_asyncio.fixture(scope="session") async def admin(connection): return connection.get_admin() diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py index 5708a93c..07bbc39c 100644 --- a/bindings/python/test/test_log_table.py +++ b/bindings/python/test/test_log_table.py @@ -64,7 +64,7 @@ async def test_append_and_scan(connection, admin): num_buckets = (await admin.get_table_info(table_path)).num_buckets scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - records = _poll_records(scanner, expected_count=6) + records = await _poll_records(scanner, expected_count=6) assert len(records) == 6, f"Expected 6 records, got {len(records)}" @@ -107,7 +107,7 @@ async def test_append_dict_rows(connection, admin): num_buckets = (await admin.get_table_info(table_path)).num_buckets scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - records = _poll_records(scanner, expected_count=3) + records = await _poll_records(scanner, expected_count=3) assert len(records) == 3 rows = sorted([r.row for r in records], key=lambda r: r["id"]) @@ -238,7 +238,7 @@ async def test_project(connection, admin): scanner = await scan.create_log_scanner() scanner.subscribe_buckets({0: 0}) - records = _poll_records(scanner, expected_count=3) + records = await _poll_records(scanner, expected_count=3) assert len(records) == 3 records.sort(key=lambda r: r.row["col_c"]) @@ -254,7 +254,7 @@ async def test_project(connection, admin): scanner2 = await table.new_scan().project([1, 0]).create_log_scanner() scanner2.subscribe_buckets({0: 0}) - records2 = _poll_records(scanner2, expected_count=3) + records2 = await _poll_records(scanner2, expected_count=3) assert len(records2) == 3 records2.sort(key=lambda r: r.row["col_a"]) @@ -284,7 +284,7 @@ async def test_poll_batches(connection, admin): scanner.subscribe(bucket_id=0, start_offset=0) # Empty table should return empty result - result = scanner.poll_arrow(500) + result = await scanner.poll_arrow(500) assert result.num_rows == 0 writer = table.new_append().create_writer() @@ -310,7 +310,7 @@ async def test_poll_batches(connection, admin): await writer.flush() # Poll until we get all 6 records - all_ids = _poll_arrow_ids(scanner, expected_count=6) + all_ids = await _poll_arrow_ids(scanner, expected_count=6) assert all_ids == [1, 2, 3, 4, 5, 6] # Append more and verify offset continuation (no duplicates) @@ -322,14 +322,14 @@ async def test_poll_batches(connection, admin): ) await writer.flush() - new_ids = _poll_arrow_ids(scanner, expected_count=2) + new_ids = await _poll_arrow_ids(scanner, expected_count=2) assert new_ids == [7, 8] # Subscribe from mid-offset should truncate (skip earlier records) trunc_scanner = await table.new_scan().create_record_batch_log_scanner() trunc_scanner.subscribe(bucket_id=0, start_offset=3) - trunc_ids = _poll_arrow_ids(trunc_scanner, expected_count=5) + trunc_ids = await _poll_arrow_ids(trunc_scanner, expected_count=5) assert trunc_ids == [4, 5, 6, 7, 8] # Projection with batch scanner @@ -339,7 +339,7 @@ async def test_poll_batches(connection, admin): .create_record_batch_log_scanner() ) proj_scanner.subscribe(bucket_id=0, start_offset=0) - batches = proj_scanner.poll_record_batch(10000) + batches = await proj_scanner.poll_record_batch(10000) assert len(batches) > 0 assert batches[0].batch.num_columns == 1 @@ -374,14 +374,14 @@ async def test_to_arrow_and_to_pandas(connection, admin): # to_arrow() scanner = await table.new_scan().create_record_batch_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - arrow_table = scanner.to_arrow() + arrow_table = await scanner.to_arrow() assert arrow_table.num_rows == 3 assert arrow_table.schema.names == ["id", "name"] # to_pandas() scanner2 = await table.new_scan().create_record_batch_log_scanner() scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - df = scanner2.to_pandas() + df = await scanner2.to_pandas() assert len(df) == 3 assert list(df.columns) == ["id", "name"] @@ -499,7 +499,7 @@ async def test_partitioned_table_append_scan(connection, admin): all_records = [] deadline = time.monotonic() + 10 while len(all_records) < 8 and time.monotonic() < deadline: - scan_records = scanner.poll(5000) + scan_records = await scanner.poll(5000) for bucket, bucket_records in scan_records.items(): assert bucket.partition_id is not None, "Partitioned table should have partition_id" # All records in a bucket should belong to the same partition @@ -524,7 +524,7 @@ async def test_partitioned_table_append_scan(connection, admin): unsub_scanner.subscribe_partition(p.partition_id, 0, 0) unsub_scanner.unsubscribe_partition(eu_partition_id, 0) - remaining = _poll_records(unsub_scanner, expected_count=4, timeout_s=5) + remaining = await _poll_records(unsub_scanner, expected_count=4, timeout_s=5) assert len(remaining) == 4 assert all(r.row["region"] == "US" for r in remaining) @@ -535,7 +535,7 @@ async def test_partitioned_table_append_scan(connection, admin): } batch_scanner.subscribe_partition_buckets(partition_bucket_offsets) - batch_records = _poll_records(batch_scanner, expected_count=8) + batch_records = await _poll_records(batch_scanner, expected_count=8) assert len(batch_records) == 8 batch_collected = sorted( [(r.row["id"], r.row["region"], r.row["value"]) for r in batch_records], @@ -575,7 +575,7 @@ async def test_write_arrow(connection, admin): scanner = await table.new_scan().create_record_batch_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - result = scanner.to_arrow() + result = await scanner.to_arrow() assert result.num_rows == 5 ids = sorted(result.column("id").to_pylist()) @@ -615,7 +615,7 @@ async def test_write_pandas(connection, admin): scanner = await table.new_scan().create_record_batch_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - result = scanner.to_pandas() + result = await scanner.to_pandas() assert len(result) == 3 result_sorted = result.sort_values("id").reset_index(drop=True) @@ -660,7 +660,7 @@ async def test_partitioned_table_to_arrow(connection, admin): for p in partition_infos: scanner.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) - arrow_table = scanner.to_arrow() + arrow_table = await scanner.to_arrow() assert arrow_table.num_rows == 2 await admin.drop_table(table_path, ignore_if_not_exists=False) @@ -695,7 +695,7 @@ async def test_scan_records_indexing_and_slicing(connection, admin): sr = None deadline = time.monotonic() + 10 while time.monotonic() < deadline: - sr = scanner.poll(5000) + sr = await scanner.poll(5000) if len(sr) >= 2: break assert sr is not None and len(sr) >= 2, "Expected at least 2 records" @@ -834,7 +834,7 @@ async def consume_and_break(): # records in one batch. After break, the un-yielded records from that # batch are lost. So sync poll may return 0 records — the key assertion # is that poll() completes without deadlock (returns within timeout). - remaining = scanner.poll(2000) + remaining = await scanner.poll(2000) assert remaining is not None, "poll() should return (not deadlock)" # If we got records, verify no duplicates @@ -1040,7 +1040,7 @@ async def consume_and_break(): assert first_batch.batch.num_rows > 0 # Phase 2: sync poll_record_batch() must still work — proves no leak - remaining = batch_scanner.poll_record_batch(2000) + remaining = await batch_scanner.poll_record_batch(2000) assert remaining is not None, "poll_record_batch() should return (not deadlock)" await admin.drop_table(table_path, ignore_if_not_exists=False) @@ -1110,22 +1110,22 @@ async def consume_all(): # --------------------------------------------------------------------------- -def _poll_records(scanner, expected_count, timeout_s=10): +async def _poll_records(scanner, expected_count, timeout_s=10): """Poll a record-based scanner until expected_count records are collected.""" collected = [] deadline = time.monotonic() + timeout_s while len(collected) < expected_count and time.monotonic() < deadline: - records = scanner.poll(5000) + records = await scanner.poll(5000) collected.extend(records) return collected -def _poll_arrow_ids(scanner, expected_count, timeout_s=10): +async def _poll_arrow_ids(scanner, expected_count, timeout_s=10): """Poll a batch scanner and extract 'id' column values.""" all_ids = [] deadline = time.monotonic() + timeout_s while len(all_ids) < expected_count and time.monotonic() < deadline: - arrow_table = scanner.poll_arrow(5000) + arrow_table = await scanner.poll_arrow(5000) if arrow_table.num_rows > 0: all_ids.extend(arrow_table.column("id").to_pylist()) return all_ids @@ -1176,7 +1176,7 @@ async def test_append_and_scan_with_array(connection, admin): # Verify via LogScanner (record-by-record) scanner = await table.new_scan().create_log_scanner() scanner.subscribe_buckets({0: fluss.EARLIEST_OFFSET}) - records = _poll_records(scanner, expected_count=6) + records = await _poll_records(scanner, expected_count=6) assert len(records) == 6 records.sort(key=lambda r: r.row["id"]) @@ -1200,7 +1200,7 @@ async def test_append_and_scan_with_array(connection, admin): # Verify via to_arrow (batch-based) scanner2 = await table.new_scan().create_record_batch_log_scanner() scanner2.subscribe_buckets({0: fluss.EARLIEST_OFFSET}) - result_table = scanner2.to_arrow() + result_table = await scanner2.to_arrow() assert result_table.num_rows == 6 assert result_table.column("tags").to_pylist() == [ @@ -1254,7 +1254,7 @@ async def test_append_rows_with_array(connection, admin): num_buckets = (await admin.get_table_info(table_path)).num_buckets scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - records = _poll_records(scanner, expected_count=3) + records = await _poll_records(scanner, expected_count=3) assert len(records) == 3 rows = sorted([r.row for r in records], key=lambda r: r["id"]) @@ -1296,7 +1296,7 @@ async def test_append_rows_with_nested_array(connection, admin): num_buckets = (await admin.get_table_info(table_path)).num_buckets scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - records = _poll_records(scanner, expected_count=5) + records = await _poll_records(scanner, expected_count=5) assert len(records) == 5 rows = sorted([r.row for r in records], key=lambda r: r["id"]) diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 1268d37f..51f7eb0f 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -161,11 +161,11 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`. | `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) | | `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) | | `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket | -| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) | -| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | -| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | -| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | -| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | +| `await .poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) | +| `await .poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | +| `await .poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `await .to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | +| `await .to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | ## `ScanRecords` @@ -174,7 +174,7 @@ Returned by `LogScanner.poll()`. Records are grouped by bucket. > **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters. ```python -scan_records = scanner.poll(timeout_ms=5000) +scan_records = await scanner.poll(timeout_ms=5000) # Sequence access scan_records[0] # first record diff --git a/website/docs/user-guide/python/data-types.md b/website/docs/user-guide/python/data-types.md index c0acb4c7..df8165f0 100644 --- a/website/docs/user-guide/python/data-types.md +++ b/website/docs/user-guide/python/data-types.md @@ -55,7 +55,7 @@ handle = writer.append(row) ## Reading Data ```python -records = scanner.poll(timeout_ms=1000) +records = await scanner.poll(timeout_ms=1000) for record in records: row = record.row # dict[str, Any] print(row["user_id"]) # int diff --git a/website/docs/user-guide/python/example/index.md b/website/docs/user-guide/python/example/index.md index ec9fa78f..446ca410 100644 --- a/website/docs/user-guide/python/example/index.md +++ b/website/docs/user-guide/python/example/index.md @@ -36,7 +36,7 @@ async def main(): num_buckets = (await admin.get_table_info(table_path)).num_buckets scanner = await table.new_scan().create_record_batch_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - print(scanner.to_pandas()) + print(await scanner.to_pandas()) # Cleanup await admin.drop_table(table_path, ignore_if_not_exists=True) diff --git a/website/docs/user-guide/python/example/log-tables.md b/website/docs/user-guide/python/example/log-tables.md index c320bf48..4dbe2567 100644 --- a/website/docs/user-guide/python/example/log-tables.md +++ b/website/docs/user-guide/python/example/log-tables.md @@ -65,8 +65,8 @@ scanner = await table.new_scan().create_record_batch_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) # Reads everything up to current latest offset, then returns -arrow_table = scanner.to_arrow() -df = scanner.to_pandas() +arrow_table = await scanner.to_arrow() +df = await scanner.to_pandas() ``` ### Continuous Polling @@ -79,7 +79,7 @@ scanner = await table.new_scan().create_record_batch_log_scanner() scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) while True: - result = scanner.poll_arrow(timeout_ms=5000) + result = await scanner.poll_arrow(timeout_ms=5000) if result.num_rows > 0: print(result.to_pandas()) @@ -88,7 +88,7 @@ scanner = await table.new_scan().create_log_scanner() scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) while True: - scan_records = scanner.poll(timeout_ms=5000) + scan_records = await scanner.poll(timeout_ms=5000) for record in scan_records: print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}") diff --git a/website/docs/user-guide/python/example/partitioned-tables.md b/website/docs/user-guide/python/example/partitioned-tables.md index f8280920..894bb519 100644 --- a/website/docs/user-guide/python/example/partitioned-tables.md +++ b/website/docs/user-guide/python/example/partitioned-tables.md @@ -59,7 +59,7 @@ scanner.subscribe_partition_buckets({ (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos }) -print(scanner.to_pandas()) +print(await scanner.to_pandas()) ``` ### Unsubscribing