feat(2152): rolling ParquetWriter for streaming writes (constant memory + spec-correct file sizes)#3336

Draft
paultmathew wants to merge 3 commits into apache:main from paultmathew:feat/2152-rolling-writer

Conversation


paultmathew commented May 7, 2026

Rationale for this change

Builds on #3335. Please land that one first.

PR #3335 added pa.RecordBatchReader as a valid input to Table.append/Table.overwrite using a buffered bin-pack approach. That implementation has two acknowledged caveats called out in its docstrings:

  1. Memory bound: peak memory is N_workers × write.target-file-size-bytes (~4 GiB at defaults) — better than "materialise everything" but not constant.
  2. Byte semantics: write.target-file-size-bytes is interpreted as uncompressed in-memory Arrow bytes via the bin_pack_record_batches helper, not on-disk compressed Parquet bytes. Resulting files are typically 3-10× smaller than the property suggests.

This PR replaces the buffered approach with a rolling pq.ParquetWriter driven by OutputStream.tell() (added in #2998 specifically for this purpose). Both caveats go away:

# pyiceberg/io/pyarrow.py::_record_batches_to_data_files (excerpt)
# `transformed_first` is the first input batch, already cast to the file schema;
# `batches` yields the remaining input and `_transform` applies the same cast.
with output_file.create(overwrite=True) as fos:
    with pq.ParquetWriter(fos, schema=transformed_first.schema, ...) as writer:
        # The first batch is written unconditionally, so every file contains
        # at least one row group even if a single batch exceeds the target.
        writer.write_batch(transformed_first, row_group_size=row_group_size)
        while fos.tell() < target_file_size:        # ← compressed on-disk bytes
            try:
                batch = next(batches)
            except StopIteration:
                break
            writer.write_batch(_transform(batch), row_group_size=row_group_size)
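
The mechanism is easy to verify in isolation (a standalone sketch against plain pyarrow, independent of pyiceberg internals): the sink's tell() advances only as compressed row groups are actually flushed to it.

import pyarrow as pa
import pyarrow.parquet as pq

sink = pa.BufferOutputStream()
batch = pa.record_batch([pa.array(["x" * 100] * 10_000)], names=["payload"])
with pq.ParquetWriter(sink, batch.schema, compression="zstd") as writer:
    writer.write_batch(batch)   # flushes one complete row group to the sink
    print(sink.tell())          # compressed bytes emitted so far (footer not yet written)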

What this delivers:

  • Spec-correct file sizes: tell() reports compressed on-disk bytes pyarrow has emitted to the stream, so write.target-file-size-bytes finally means what the spec says it means — matching the Java/Spark/Flink writers.
  • Bounded memory independent of target_file_size: peak RSS is bounded by one input batch + the Parquet page buffer (~1 MiB × columns) + the S3 multipart upload pool (~5 MiB × ~8 in-flight parts). On a real S3 stack that's tens to a few hundred MiB, regardless of file size, dataset size, or number of files produced. See benchmark below.
  • Each input RecordBatch becomes one Parquet row group, with write.parquet.row-group-limit enforced as a per-row-group cap — identical treatment to the materialised pa.Table write path.
  • No public API change. Same tx.append(reader) / tx.overwrite(reader). Internals only (usage sketch below).
  • bin_pack_record_batches is removed (no longer needed). Its 4 unit tests are removed; the streaming behaviour is covered end-to-end by tests below.
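
For reference, the unchanged call pattern. A minimal sketch — the catalog name, table identifier, schema, and batch generator are all illustrative:

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")       # assumes a configured catalog
tbl = catalog.load_table("db.events")   # hypothetical unpartitioned table

schema = pa.schema([("id", pa.int64()), ("payload", pa.string())])

def batches():
    # Illustrative streaming source: only one batch is resident at a time.
    for i in range(1_000):
        yield pa.record_batch(
            [pa.array(range(i * 5_000, (i + 1) * 5_000)), pa.array(["x" * 100] * 5_000)],
            schema=schema,
        )

reader = pa.RecordBatchReader.from_batches(schema, batches())
with tbl.transaction() as tx:
    tx.append(reader)   # files roll at write.target-file-size-bytes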

Memory profile

Streamed 1,000 batches × 5,000 rows × 108 bytes per row (≈ 515 MiB uncompressed; 390 MiB on disk after zstd compression of the random alphanumeric payload; 24 files written at write.target-file-size-bytes = 16 MiB) against AWS Glue + S3 (Aircall staging). Process RSS was sampled at 19 Hz from a background thread along the lines of the sketch below; detailed analysis follows.
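
A minimal sketch of that sampler (assuming psutil; the actual script may differ):

import threading
import time

import psutil

def sample_rss(samples, stop, hz=19.0):
    # Appends (elapsed seconds, RSS bytes) tuples until `stop` is set.
    proc = psutil.Process()
    start = time.monotonic()
    while not stop.wait(1.0 / hz):
        samples.append((time.monotonic() - start, proc.memory_info().rss))

samples = []
stop = threading.Event()
threading.Thread(target=sample_rss, args=(samples, stop), daemon=True).start()
# ... run the streaming append under test, then:
stop.set()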

| Metric | Value |
|---|---|
| Workload (uncompressed → on-disk) | 515 MiB → 390 MiB (1.3× zstd) |
| Files written / row groups | 24 / 5,024 |
| Wall time | 301 s (rate-limited by Python random-payload generation, not the writer) |
| Baseline RSS | 178 MiB |
| Peak RSS | 236 MiB (delta +58 MiB; reached at t ≈ 15 s during the first file) |
| Steady-state mean RSS | 167 MiB (≈ 10 MiB below baseline once Python GC reclaims import overhead) |
| Steady-state p95 RSS | 229 MiB |
| Steady-state σ | 18 MiB |

The key observation: after the initial ramp during the first file, RSS oscillates within a ~30 MiB band across all 24 file rollovers and shows no growth from start to finish. Memory is bounded by the in-flight RecordBatch + Parquet page buffer + multipart upload pool — independent of target_file_size, dataset size, or number of files produced. The buffered approach from #3335 held up to target_file_size × N_workers of uncompressed Arrow buffers (≈ 4 GiB at defaults) — roughly 70× higher peak memory at default property settings.

(Figure: smoke_memory_PR2 — RSS over the 301 s run; initial ramp during the first file, then a flat steady state across all 24 rollovers.)

Throughput / parallelism

Streaming writes are sequential — one rolling file at a time. Single-stream throughput is bounded by the underlying multipart upload pool (~8 concurrent S3 PUTs in pyarrow.fs.S3FileSystem), which saturates typical network links and is rarely the bottleneck for streaming pipelines, where the upstream source (DB cursor, API, queue) is usually the limit. Callers who want maximum write throughput (backfills, dataset migrations) can materialise the input as a pa.Table and call tx.append(table), which keeps the materialised path's existing executor.map-based file-level parallelism — completely unchanged by this PR. The trade-off is sketched below.
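
An illustrative sketch of the choice, reusing `tbl`, `schema`, and `batches()` from the earlier usage sketch:

import pyarrow as pa

# Streaming path (this PR): sequential rolling writer, constant memory.
tbl.append(pa.RecordBatchReader.from_batches(schema, batches()))

# Materialised path (unchanged): parallel file-level writes, full dataset in RAM.
tbl.append(pa.Table.from_batches(batches(), schema=schema))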

A hybrid worker pool for the streaming path (N concurrent rolling writers fed by a queue) is a possible follow-up if real workloads show streaming write throughput to be a bottleneck. This mirrors iceberg-go's staging: that project has shipped single-writer-only streaming since April 2025 (iceberg-go#369) with no follow-up demand so far.

Properties honored

The streaming path honors the same parquet writer properties as the materialised pa.Table path:

| Property | In Iceberg spec? | Honored by this PR? |
|---|---|---|
| write.target-file-size-bytes | ✅ | ✅ via OutputStream.tell(), on-disk compressed bytes |
| write.parquet.compression-codec / compression-level | ✅ | ✅ |
| write.parquet.page-size-bytes | ✅ | ✅ |
| write.parquet.page-row-limit | ✅ | ✅ |
| write.parquet.dict-size-bytes | ✅ | ✅ |
| write.parquet.row-group-limit | ❌ (pyiceberg-internal, not in spec) | ✅ as a per-row-group cap, identical treatment to materialised path |
| write.parquet.row-group-size-bytes | ✅ | ❌ pre-existing pyiceberg-wide gap, warned by _get_parquet_writer_kwargs for both paths; out of scope here |
| write.parquet.bloom-filter-* | ✅ | ❌ pre-existing pyiceberg-wide gap |

When someone fixes write.parquet.row-group-size-bytes for pyiceberg, both write paths will benefit. This PR deliberately doesn't touch it, since the gap predates the PR series.
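
For context, these properties are set on the table as usual; a minimal sketch using pyiceberg's standard property API:

with tbl.transaction() as tx:
    tx.set_properties({
        "write.target-file-size-bytes": str(128 * 1024 * 1024),  # now: on-disk bytes
        "write.parquet.compression-codec": "zstd",
    })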

Code duplication note

_record_batches_to_data_files shares some boilerplate with the materialised write_file's nested write_parquet closure: parquet-writer-kwargs / row-group-size / location-provider extraction, file_schema selection, and DataFile construction from Parquet metadata. The shared module-level helpers (_get_parquet_writer_kwargs, _to_requested_schema, data_file_statistics_from_parquet_metadata, etc.) are reused, but the "compose these helpers in the standard pattern" wrapper lives independently in each path.

Extraction is mechanical (~100 lines of pure refactor) but I'd prefer to land it as a standalone follow-up PR — it touches the existing write_file closure which I'd rather not modify in the same PR as the new streaming implementation.

Are these changes tested?

Yes, at four layers.

1. End-to-end behaviour tests (no Docker)

tests/catalog/test_catalog_behaviors.py — 10 tests parametrised across all three in-process catalog backends (memory, sql, sql_without_rowcount) → 30 test runs:

  • test_append_record_batch_reader — basic append round-trip.
  • test_append_record_batch_reader_microbatched — multi-file rollover via target-file-size-bytes=1.
  • test_append_record_batch_reader_row_group_limit_is_cap — feeds a single 1,000-row batch, sets row-group-limit=250, asserts the resulting Parquet has exactly 4 row groups of 250 rows each (verified by reading the Parquet footer with pq.read_metadata; a condensed sketch follows this list).
  • test_append_record_batch_reader_target_file_size_is_on_disk_bytes — sets target-file-size-bytes=32 KiB, streams ~12 MiB, asserts each rolled file is between 0.5× and 5× the target. Catches regression to the old uncompressed-Arrow-bytes behaviour (which would produce files ~3-10× smaller than target).
  • test_append_record_batch_reader_empty — empty reader produces zero data files.
  • test_overwrite_record_batch_reader — overwrite via reader replaces existing rows.
  • test_append_record_batch_reader_to_partitioned_table_raises — partitioned-table input raises NotImplementedError.
  • test_append_invalid_input_type_raises — non-Arrow input rejected.
  • test_record_batch_reader_consumed_exactly_once — reader generator drained once; no double-pass regression.
  • test_record_batch_reader_schema_mismatch_writes_no_files — schema mismatch fails before any data files are written (no orphan files).
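
The row-group-cap assertion boils down to something like this (a condensed sketch, not the verbatim test; names are illustrative):

import pyarrow as pa
import pyarrow.parquet as pq

def assert_row_group_cap(tbl, data_file_path):
    # Stream a single 1,000-row batch with write.parquet.row-group-limit=250 set.
    schema = pa.schema([("id", pa.int64())])
    reader = pa.RecordBatchReader.from_batches(
        schema, iter([pa.record_batch([pa.array(range(1_000))], schema=schema)])
    )
    tbl.append(reader)

    # The writer must have split the batch into 4 row groups of 250 rows each.
    md = pq.read_metadata(data_file_path)  # path of the single data file written
    assert md.num_row_groups == 4
    assert all(md.row_group(i).num_rows == 250 for i in range(4))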

2. Spark integration tests

tests/integration/test_writes/test_writes.py — 6 tests (× v1, v2 format versions) proving Spark can read tables written via the streaming path against the docker-compose stack:

  • test_append_record_batch_reader[1, 2]
  • test_overwrite_record_batch_reader[1, 2]
  • test_append_record_batch_reader_multifile[1, 2]

3. Local CI sweep

  • make test (full unit suite): 3,650 passed, 0 failed, lint + mypy + pydocstyle clean.
  • make test-integration (full Spark integration suite on fresh docker-compose): 396 passed, 1 skipped, 0 failed in 3:47.

4. Real-stack smoke test on AWS

Verified end-to-end against AWS Glue + S3 in our staging account:

  • 5M-row × 108 byte streaming append (515 MiB uncompressed → 390 MiB on disk in 24 files), 19 Hz RSS sampling — peak 236 MiB, mean 167 MiB, no growth from start to finish (chart above).
  • Athena cross-engine readback (smoke_test_athena_readback.py): streamed 50,000 rows via tx.append(reader), then queried via Athena → COUNT(*) = 50000 ✓, MAX(id) = 49999 ✓. Confirms the Glue catalog metadata, Iceberg manifest, and Parquet footer produced by the rolling writer are valid from an external query engine's perspective.
  • The same scripts that backed #3335 pass unchanged on this branch — no behavioural divergence at the storage layer.

Are there any user-facing changes?

Effectively none beyond what #3335 already introduced — this PR changes internals only:

  • write.target-file-size-bytes semantics tighten for streaming inputs. A user who set this property on a streaming-write workflow under #3335 was getting files 3-10× smaller than configured (uncompressed Arrow bytes proxy). With this PR the property now reflects actual on-disk compressed bytes — files become correspondingly larger. This is a net win and matches the spec, but worth noting for anyone who calibrated batch sizes around the old behaviour.
  • bin_pack_record_batches helper removed from pyiceberg.io.pyarrow. It was added in #3335 (so it has never been in a release) and its only consumer was _dataframe_to_data_files's streaming branch, which is now restructured.

The public Table.append(reader) / Table.overwrite(reader) API and its docstring guarantees are unchanged.

Paul Mathew added 3 commits May 6, 2026 23:18
Currently `Table.append(df)` and `Table.overwrite(df)` only accept a
materialised `pa.Table`, which forces callers to load the entire dataset into
memory before writing. This makes pyiceberg unusable for large or unbounded
inputs and has been a recurring complaint (apache#1004, apache#2152, dlt-hub#3753).

Allow `pa.RecordBatchReader` as an alternative input. When a reader is
provided, batches are streamed and microbatched into target-sized Parquet
files via the new `bin_pack_record_batches` helper, then committed in a
single snapshot via the existing fast_append path. Memory is bounded by
`write.target-file-size-bytes` (default 512 MiB) per worker rather than the
full input size.

Scope of this PR — unpartitioned tables only. Streaming into partitioned
tables raises NotImplementedError pointing back to apache#2152; partitioned support
needs additional design (high-cardinality partition handling, per-partition
rolling writers) and is tracked as a follow-up. Mirrors iceberg-go#369's
staging — that project shipped unpartitioned streaming first.

Internal note: the implementation buffers up to `target_file_size` of in-
memory RecordBatches before flushing to a Parquet file. A more memory-
efficient rolling-ParquetWriter approach is a planned follow-up that will
benefit from the `OutputStream.tell()` API added in apache#2998.
paultmathew changed the title from "Feat/2152 rolling writer" to "feat(2152): rolling ParquetWriter for streaming writes (constant memory + spec-correct file sizes)" on May 7, 2026
