Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async def main():

# Try to get as PyArrow Table
try:
pa_table_result = batch_scanner.to_arrow()
pa_table_result = await batch_scanner.to_arrow()
print(f"\nAs PyArrow Table: {pa_table_result}")
except Exception as e:
print(f"Could not convert to PyArrow: {e}")
Expand All @@ -289,7 +289,7 @@ async def main():

# Try to get as Pandas DataFrame
try:
df_result = batch_scanner2.to_pandas()
df_result = await batch_scanner2.to_pandas()
print(f"\nAs Pandas DataFrame:\n{df_result}")
except Exception as e:
print(f"Could not convert to Pandas: {e}")
Expand All @@ -308,7 +308,7 @@ async def main():
# Poll with a timeout of 5000ms (5 seconds)
# Note: poll_arrow() returns an empty table (not an error) on timeout
try:
poll_result = batch_scanner3.poll_arrow(5000)
poll_result = await batch_scanner3.poll_arrow(5000)
print(f"Number of rows: {poll_result.num_rows}")

if poll_result.num_rows > 0:
Expand All @@ -328,7 +328,7 @@ async def main():
batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})

try:
batches = batch_scanner4.poll_record_batch(5000)
batches = await batch_scanner4.poll_record_batch(5000)
print(f"Number of batches: {len(batches)}")

for i, batch in enumerate(batches):
Expand All @@ -354,7 +354,7 @@ async def main():
# Poll returns ScanRecords — records grouped by bucket
print("\n--- Testing poll() method (record-by-record) ---")
try:
scan_records = record_scanner.poll(5000)
scan_records = await record_scanner.poll(5000)
print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}")

# Flat iteration over all records (regardless of bucket)
Expand Down Expand Up @@ -387,7 +387,7 @@ async def main():
# Unsubscribe from bucket 0 — future polls will skip this bucket
unsub_scanner.unsubscribe(bucket_id=0)
print("Unsubscribed from bucket 0")
remaining = unsub_scanner.poll_arrow(5000)
remaining = await unsub_scanner.poll_arrow(5000)
print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)")
except Exception as e:
print(f"Error during unsubscribe test: {e}")
Expand Down Expand Up @@ -640,7 +640,7 @@ async def main():
print("\n1. Projection by index [0, 1] (id, name):")
scanner_index = await table.new_scan().project([0, 1]).create_record_batch_log_scanner()
scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
df_projected = scanner_index.to_pandas()
df_projected = await scanner_index.to_pandas()
print(df_projected.head())
print(
f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}"
Expand All @@ -652,7 +652,7 @@ async def main():
.project_by_name(["name", "score"]) \
.create_record_batch_log_scanner()
scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
df_named = scanner_names.to_pandas()
df_named = await scanner_names.to_pandas()
print(df_named.head())
print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}")

Expand All @@ -661,7 +661,7 @@ async def main():
scanner_proj = await table.new_scan().project([0, 2]).create_record_batch_log_scanner()
scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
# Quick poll that may return empty
result = scanner_proj.poll_arrow(100)
result = await scanner_proj.poll_arrow(100)
print(f" Schema columns: {result.schema.names}")

except Exception as e:
Expand Down Expand Up @@ -781,7 +781,7 @@ async def main():
print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})")

# Use to_arrow() - now works for partitioned tables!
partitioned_arrow = partitioned_scanner.to_arrow()
partitioned_arrow = await partitioned_scanner.to_arrow()
print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
print(partitioned_arrow.to_pandas())

Expand All @@ -793,7 +793,7 @@ async def main():
}
partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets)
print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations")
partitioned_batch_arrow = partitioned_scanner_batch.to_arrow()
partitioned_batch_arrow = await partitioned_scanner_batch.to_arrow()
print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:")
print(partitioned_batch_arrow.to_pandas())

Expand All @@ -806,7 +806,7 @@ async def main():
first_partition = partition_infos[0]
partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0)
print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})")
remaining_arrow = partitioned_scanner3.to_arrow()
remaining_arrow = await partitioned_scanner3.to_arrow()
print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):")
print(remaining_arrow.to_pandas())

Expand All @@ -815,7 +815,7 @@ async def main():
partitioned_scanner2 = await partitioned_table.new_scan().create_record_batch_log_scanner()
for p in partition_infos:
partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
partitioned_df = partitioned_scanner2.to_pandas()
partitioned_df = await partitioned_scanner2.to_pandas()
print(f"to_pandas() returned {len(partitioned_df)} records:")
print(partitioned_df)

Expand Down
10 changes: 5 additions & 5 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ class LogScanner:
bucket_id: The bucket ID within the partition
"""
...
def poll(self, timeout_ms: int) -> ScanRecords:
async def poll(self, timeout_ms: int) -> ScanRecords:
"""Poll for individual records with metadata.

Requires a record-based scanner (created with new_scan().create_log_scanner()).
Expand All @@ -758,7 +758,7 @@ class LogScanner:
Returns an empty ScanRecords if no records are available or timeout expires.
"""
...
def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]:
async def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]:
"""Poll for batches with metadata.

Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()).
Expand All @@ -774,7 +774,7 @@ class LogScanner:
Returns an empty list if no batches are available or timeout expires.
"""
...
def poll_arrow(self, timeout_ms: int) -> pa.Table:
async def poll_arrow(self, timeout_ms: int) -> pa.Table:
"""Poll for records as an Arrow Table.

Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()).
Expand All @@ -790,7 +790,7 @@ class LogScanner:
or timeout expires.
"""
...
def to_pandas(self) -> pd.DataFrame:
async def to_pandas(self) -> pd.DataFrame:
"""Convert all data to Pandas DataFrame.

Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()).
Expand All @@ -799,7 +799,7 @@ class LogScanner:
You must call subscribe(), subscribe_buckets(), or subscribe_partition() first.
"""
...
def to_arrow(self) -> pa.Table:
async def to_arrow(self) -> pa.Table:
"""Convert all data to Arrow Table.

Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()).
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ known-first-party = ["fluss"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
asyncio_default_fixture_loop_scope = "session"
timeout = 120

[tool.mypy]
Expand Down
Loading
Loading