
feat(2152): support pa.RecordBatchReader in Table.append/overwrite#3335

Open
paultmathew wants to merge 2 commits into apache:main from paultmathew:feat/2152-record-batch-reader-append

Conversation


paultmathew commented on May 7, 2026

Rationale for this change

Closes #2152, addresses the long-standing memory problem reported in #1004 and re-discovered by dlt-hub#3753.

Table.append(df) and Table.overwrite(df) currently require a fully materialised pa.Table. For large or unbounded inputs this means loading the entire dataset into memory before writing — fatal at any non-trivial scale and a recurring complaint going back to #1004 (Aug 2024). The reference Java implementation has streaming append; iceberg-go shipped it in iceberg-go#369 (Apr 2025). Python is the last major SDK without it.

This PR adds pa.RecordBatchReader as a valid input to Table.append/overwrite (and Transaction.append/overwrite). The reader is consumed lazily, microbatched into Parquet files via the new bin_pack_record_batches helper, and committed in a single snapshot via the existing fast_append pipeline.

reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
tbl.append(reader)        # ← streams, doesn't materialise
tbl.overwrite(reader)     # ← also supported

Scope (unpartitioned only)

Streaming into a partitioned table raises NotImplementedError pointing back to #2152. Partitioned support is genuinely the harder case — it needs design discussion around partition cardinality bounds, per-partition rolling writers, and idempotency on retry — so I'm proposing to land in three reviewable PRs:

  1. This PR — API + unpartitioned + buffered byte-budget bin-packing.
  2. PR2 (next) — switch internals to a rolling pq.ParquetWriter + OutputStream.tell() for constant-memory streaming. No API change. Detailed plan below.
  3. PR3 (later) — partitioned streaming, after design discussion on Support writing Arrow RecordBatchReader or Scanner to Iceberg tables #2152.

This mirrors iceberg-go#369's staging: ship the unpartitioned API first, iterate from there.
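
Concretely, the guard in this PR behaves roughly as follows (partitioned_tbl stands for any table with a partition spec; names are illustrative and the exact error message may differ):

reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
try:
    partitioned_tbl.append(reader)
except NotImplementedError:
    # streaming into a partitioned table is deferred to PR3 / issue #2152
    ...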

Implementation

The streaming path reuses the existing WriteTask → write_file → fast_append pipeline. The only new primitive is bin_pack_record_batches (sibling of the existing bin_pack_arrow_table); a sketch of its buffering logic follows the list below:

  • Accumulates incoming RecordBatches into an in-memory buffer.
  • Flushes when sum(batch.nbytes) >= write.target-file-size-bytes.
  • Each flushed buffer becomes one parquet file via the existing write_parquet task.
  • Schema check (_check_pyarrow_schema_compatible) runs against reader.schema before the snapshot producer opens — schema mismatches fail before any data file is written, so no orphans.
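
For reference, a minimal sketch of the buffering logic described in the bullets above (not the exact code in pyiceberg.io.pyarrow; the function shape and names are simplified for illustration):

from typing import Iterator, List

import pyarrow as pa

def bin_pack_record_batches(
    reader: pa.RecordBatchReader, target_file_size: int
) -> Iterator[List[pa.RecordBatch]]:
    # Accumulate batches until their summed in-memory size reaches the target,
    # then yield the buffer; each yielded list becomes one Parquet file.
    buffer: List[pa.RecordBatch] = []
    buffered_bytes = 0
    for batch in reader:
        buffer.append(batch)
        buffered_bytes += batch.nbytes  # uncompressed Arrow bytes (see trade-offs below)
        if buffered_bytes >= target_file_size:
            yield buffer
            buffer, buffered_bytes = [], 0
    if buffer:
        yield buffer  # trailing partial bin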

Acknowledged trade-offs

Memory: peak memory is bounded by N_workers × write.target-file-size-bytes (default 8 × 512 MiB ≈ 4 GiB), not constant. This is materially better than today's "materialise everything" but isn't yet "constant memory streaming". PR2 fixes this.

Byte semantics: write.target-file-size-bytes is currently interpreted as uncompressed in-memory Arrow bytes (RecordBatch.nbytes — the bin-packing weight), not compressed on-disk Parquet bytes. The resulting files are typically 3-10× smaller than the property suggests after zstd / dictionary / RLE encoding. This matches the existing pa.Table write path (bin_pack_arrow_table uses the same accounting) — this PR doesn't change pyiceberg's existing semantics, it only documents them in the docstrings of both helpers and the Transaction.append/overwrite Note: blocks. PR2 fixes this too.
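
A toy illustration of the gap between the two accountings (numbers are data- and codec-dependent, not taken from this PR):

import io as pyio

import pyarrow as pa
import pyarrow.parquet as pq

tbl = pa.table({"id": pa.array(range(1_000_000), type=pa.int64())})
sink = pyio.BytesIO()
pq.write_table(tbl, sink, compression="zstd")
print(tbl.nbytes)               # in-memory Arrow bytes (the bin-packing weight)
print(sink.getbuffer().nbytes)  # compressed on-disk Parquet bytes (much smaller)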

Retry: pa.RecordBatchReader is single-pass, so a failed catalog commit leaves the reader drained and a naive retry writes zero rows. Documented in the Note: block — callers needing at-least-once semantics should reconstruct the reader on each attempt via a factory callable, or use the two-stage add_files pattern (whose input is a replayable list of paths).
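
A hedged sketch of the factory-callable pattern mentioned above (generate_batches and the retry count are illustrative; CommitFailedException comes from pyiceberg.exceptions):

from pyiceberg.exceptions import CommitFailedException

def make_reader() -> pa.RecordBatchReader:
    # rebuild a fresh, un-drained reader on every attempt
    return pa.RecordBatchReader.from_batches(schema, generate_batches())

for attempt in range(3):
    try:
        tbl.append(make_reader())
        break
    except CommitFailedException:
        if attempt == 2:
            raise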

PR2 — proposed scope (FYI, not in this PR)

Drop the buffer-and-flush approach and use a rolling pq.ParquetWriter driven by OutputStream.tell() (added in #2998 specifically for this kind of use case):

# sketch: finalize_data_file and next_path are placeholders, not real pyiceberg APIs
fos = io.new_output(next_path).create(overwrite=True)
writer = pq.ParquetWriter(fos, schema, **kwargs)
for batch in reader:
    writer.write_batch(batch)
    if fos.tell() >= target_file_size:   # compressed on-disk bytes
        writer.close()
        finalize_data_file(...)          # register the finished Parquet file
        # roll over: open the next output stream and writer
        fos = io.new_output(next_path).create(overwrite=True)
        writer = pq.ParquetWriter(fos, schema, **kwargs)
writer.close()
finalize_data_file(...)                  # finalize the trailing partial file

What this delivers:

  • Constant memory: O(1 batch) per worker (~10s of MB) regardless of target_file_size. The 4 GiB peak in this PR drops to ~50-100 MB.
  • Spec-correct byte semantics: write.target-file-size-bytes becomes actual on-disk compressed bytes, matching the Java/Spark/Flink writers and the spec.
  • No public API change: same tx.append(reader) / tx.overwrite(reader) — internals only.

Open design questions for PR2 (will surface on the issue thread before coding):

  • Parallelism: a single rolling writer is serial. Either accept that for streaming (memory-vs-throughput trade), or add a hybrid (N rolling writers fed via a queue) and pick a default that matches today's executor.map(write_parquet, tasks) parallelism.
  • Backwards compat: switching bin_pack_arrow_table to the same rolling-writer mechanism would also tighten the pa.Table path's byte semantics. That changes file-size characteristics for every existing pyiceberg writer. Probably worth a separate change with a deprecation note, or a feature flag.
  • add_files interaction: rolling writes produce data files we know about directly; we shouldn't go through the parquet-footer round-trip in _dataframe_to_data_files. Means a small refactor in the streaming-only path.

Are these changes tested?

Yes, comprehensively at four layers.

1. Unit tests (tests/io/test_pyarrow.py) — 4 new tests for bin_pack_record_batches covering single-bin, microbatched, empty input, and lazy generator consumption.

2. End-to-end behaviour tests (tests/catalog/test_catalog_behaviors.py) — 8 new tests parametrised across all three in-process catalog backends (memory, sql, sql_without_rowcount) → 24 test runs covering append, overwrite, microbatch verification (multiple files in one snapshot), empty reader, partitioned-table-raises, invalid-input-rejected, reader-consumed-exactly-once, and schema-mismatch-writes-no-files.

3. Integration tests (tests/integration/test_writes/test_writes.py) — 6 new Spark-readback tests for v1 + v2 format versions covering append, overwrite, and multi-file microbatch. Proves Spark can read tables written via the streaming path against the docker-compose stack.

4. Smoke test on a real production stack — verified end-to-end against AWS Glue + S3 in our staging account: 100 k-row streaming append in 17 s, 20-file microbatched commit, Athena read-back (COUNT(*) and MAX(id) matched the input exactly), schema-mismatch rejection leaving no orphan files.

Full unit suite: 3 647 passed. Full integration suite: 122 passed, 1 skipped.

Are there any user-facing changes?

Yes, intentionally:

  • Transaction.append(df), Transaction.overwrite(df), Table.append(df), Table.overwrite(df) accept pa.Table | pa.RecordBatchReader.
  • The ValueError raised on bad input changes from "Expected PyArrow table, got: ..." to "Expected pa.Table or pa.RecordBatchReader, got: ...". Updated test_invalid_arguments accordingly.
  • New module-level helper bin_pack_record_batches in pyiceberg.io.pyarrow (sibling of bin_pack_arrow_table).
  • bin_pack_arrow_table gained its first docstring, documenting the existing uncompressed-Arrow-bytes accounting.
  • Docs: new "Streaming writes from a RecordBatchReader" subsection in mkdocs/docs/api.md.
  • Docstrings on Transaction.append/overwrite document retry semantics and the byte-semantics caveat.

Related

Currently `Table.append(df)` and `Table.overwrite(df)` only accept a
materialised `pa.Table`, which forces callers to load the entire dataset into
memory before writing. This makes pyiceberg unusable for large or unbounded
inputs and has been a recurring complaint (apache#1004, apache#2152, dlt-hub#3753).

Allow `pa.RecordBatchReader` as an alternative input. When a reader is
provided, batches are streamed and microbatched into target-sized Parquet
files via the new `bin_pack_record_batches` helper, then committed in a
single snapshot via the existing fast_append path. Memory is bounded by
`write.target-file-size-bytes` (default 512 MiB) per worker rather than the
full input size.

Scope of this PR — unpartitioned tables only. Streaming into partitioned
tables raises NotImplementedError pointing back to apache#2152; partitioned support
needs additional design (high-cardinality partition handling, per-partition
rolling writers) and is tracked as a follow-up. Mirrors iceberg-go#369's
staging — that project shipped unpartitioned streaming first.

Internal note: the implementation buffers up to `target_file_size` of in-
memory RecordBatches before flushing to a Parquet file. A more memory-
efficient rolling-ParquetWriter approach is a planned follow-up that will
benefit from the `OutputStream.tell()` API added in apache#2998.
paultmathew force-pushed the feat/2152-record-batch-reader-append branch from 4ecaf15 to 67c64ec on May 7, 2026 03:18
paultmathew marked this pull request as ready for review on May 7, 2026 03:22