fix(delta): drain all batches per scan file in DeltaInputSourceIterator #19592
Open
rinchinov wants to merge 4 commits into
Fixes apache#18606: only 1024 rows were ingested per Parquet file when using the Delta Lake input source.

Root cause: filteredBatchIterator was a local variable inside hasNext(). When the method returned true after the first non-empty batch of a file, the iterator went out of scope. The next hasNext() call advanced to the next file, skipping all remaining batches of the current file. With the Delta kernel's default batch size of 1024 rows, this caused exactly 1024 rows × N files to be ingested regardless of actual file size.

Fix: promote filteredBatchIterator to a field (currentFileIterator) so that it survives across hasNext() calls and all batches of a file are drained before advancing to the next file. Also fixed close() to properly close currentFileIterator and drain all remaining file iterators.
Adds a Delta table with 2 Parquet files × 2000 rows (4000 total), where each file exceeds the Delta kernel's default batch size of 1024 rows.

Without the fix: DeltaInputSourceIterator returns 1024 × 2 = 2048 rows.
With the fix: all 4000 rows are returned correctly.

Test: DeltaInputSourceBatchDrainTest.testAllRowsReturnedWhenFilesExceedOneBatch
…apacheGH-18606 Adds LargeRowGroupDeltaTable (2 files × 2000 rows = 4000 total) and a BatchDrainRegressionTests inner class inside DeltaInputSourceTest following the same pattern as existing test classes. The regression test fails with the bug (returns 1024 × 2 = 2048 rows) and passes with the fix (returns all 4000 rows).
FrankChen021 (Member) left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
Reviewed 9 of 9 changed files.
This is an automated review by Codex GPT-5.5
Fixes #18606.
Description
Bug
`DeltaInputSourceIterator.hasNext()` used a local variable for the per-file `CloseableIterator<FilteredColumnarBatch>`. When the method returned `true` after finding the first non-empty batch of a file, that iterator went out of scope. The next `hasNext()` call advanced to the next file, skipping all remaining batches of the current file.

With the Delta kernel's default batch size of 1024 rows, this produced exactly `1024 × numFiles` rows regardless of actual file size, matching the symptom reported in #18606.
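To make the failure mode concrete, here is a minimal sketch of the pattern described above. It is not the Druid code: `LocalIteratorBugSketch`, `BuggyRowIterator`, `fileBatches`, and `countRows` are hypothetical names, and plain `java.util` iterators over lists of integers stand in for the Delta kernel's batch iterators.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the buggy pattern; not the actual Druid implementation.
class LocalIteratorBugSketch {

    // Models one scan file as an iterator over batches of at most batchSize rows.
    static Iterator<List<Integer>> fileBatches(int rowsInFile, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        for (int start = 0; start < rowsInFile; start += batchSize) {
            List<Integer> batch = new ArrayList<>();
            for (int r = start; r < Math.min(start + batchSize, rowsInFile); r++) {
                batch.add(r);
            }
            batches.add(batch);
        }
        return batches.iterator();
    }

    static class BuggyRowIterator implements Iterator<Integer> {
        private final Iterator<Iterator<List<Integer>>> files;
        private Iterator<Integer> currentBatchRows = null;

        BuggyRowIterator(List<Iterator<List<Integer>>> files) {
            this.files = files.iterator();
        }

        @Override
        public boolean hasNext() {
            if (currentBatchRows != null && currentBatchRows.hasNext()) {
                return true;
            }
            while (files.hasNext()) {
                // BUG: the per-file batch iterator is a local variable, so it is
                // lost as soon as this method returns true for its first batch.
                Iterator<List<Integer>> batchIterator = files.next();
                while (batchIterator.hasNext()) {
                    currentBatchRows = batchIterator.next().iterator();
                    if (currentBatchRows.hasNext()) {
                        return true;
                    }
                }
            }
            return false;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return currentBatchRows.next();
        }
    }

    // Counts the rows the buggy iterator yields for numFiles identical files.
    static int countRows(int numFiles, int rowsPerFile, int batchSize) {
        List<Iterator<List<Integer>>> files = new ArrayList<>();
        for (int i = 0; i < numFiles; i++) {
            files.add(fileBatches(rowsPerFile, batchSize));
        }
        int count = 0;
        Iterator<Integer> rows = new BuggyRowIterator(files);
        while (rows.hasNext()) {
            rows.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // 2 files x 2000 rows with a batch size of 1024: prints 2048, not 4000.
        System.out.println(countRows(2, 2000, 1024));
    }
}
```

In this model, files that fit in a single batch are read completely, which is consistent with the bug only appearing once a file exceeds the batch size.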
Fix
Promoted `filteredBatchIterator` to a class field (`currentFileIterator`). `hasNext()` now drains all batches of the current file before advancing to the next one. Also fixed `close()` to close `currentFileIterator` and drain all remaining file iterators (the original only closed one).
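The shape of this fix can be sketched in isolation. The code below is an illustration under stated assumptions rather than the PR's diff: `FieldIteratorFixSketch`, `FileBatches`, `DrainingRowIterator`, and `countRows` are hypothetical names, lists of integers stand in for Delta kernel batches, and closing the previous file when advancing to the next one is a choice made for this sketch. Only the field name `currentFileIterator` is taken from the description.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch of the field-based approach; not the actual Druid implementation.
class FieldIteratorFixSketch {

    // Models one scan file: an iterator over batches that records whether it was closed.
    static class FileBatches implements Iterator<List<Integer>>, AutoCloseable {
        private final Iterator<List<Integer>> batches;
        boolean closed = false;

        FileBatches(int rowsInFile, int batchSize) {
            List<List<Integer>> all = new ArrayList<>();
            for (int start = 0; start < rowsInFile; start += batchSize) {
                List<Integer> batch = new ArrayList<>();
                for (int r = start; r < Math.min(start + batchSize, rowsInFile); r++) {
                    batch.add(r);
                }
                all.add(batch);
            }
            this.batches = all.iterator();
        }

        @Override public boolean hasNext() { return batches.hasNext(); }
        @Override public List<Integer> next() { return batches.next(); }
        @Override public void close() { closed = true; }
    }

    static class DrainingRowIterator implements Iterator<Integer>, AutoCloseable {
        private final Iterator<FileBatches> files;
        // Held as a field so it survives across hasNext() calls.
        private FileBatches currentFileIterator = null;
        private Iterator<Integer> currentBatchRows = null;

        DrainingRowIterator(List<FileBatches> files) {
            this.files = files.iterator();
        }

        @Override
        public boolean hasNext() {
            while (currentBatchRows == null || !currentBatchRows.hasNext()) {
                if (currentFileIterator != null && currentFileIterator.hasNext()) {
                    // Keep draining batches of the current file first.
                    currentBatchRows = currentFileIterator.next().iterator();
                } else if (files.hasNext()) {
                    // Current file is exhausted: close it, then advance.
                    if (currentFileIterator != null) {
                        currentFileIterator.close();
                    }
                    currentFileIterator = files.next();
                } else {
                    return false;
                }
            }
            return true;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return currentBatchRows.next();
        }

        @Override
        public void close() {
            // Close the file being read, then every file not yet opened for reading.
            if (currentFileIterator != null) {
                currentFileIterator.close();
                currentFileIterator = null;
            }
            while (files.hasNext()) {
                files.next().close();
            }
        }
    }

    // Counts the rows the draining iterator yields for numFiles identical files.
    static int countRows(int numFiles, int rowsPerFile, int batchSize) {
        List<FileBatches> files = new ArrayList<>();
        for (int i = 0; i < numFiles; i++) {
            files.add(new FileBatches(rowsPerFile, batchSize));
        }
        int count = 0;
        DrainingRowIterator rows = new DrainingRowIterator(files);
        while (rows.hasNext()) {
            rows.next();
            count++;
        }
        rows.close();
        return count;
    }

    public static void main(String[] args) {
        // 2 files x 2000 rows with a batch size of 1024: prints 4000.
        System.out.println(countRows(2, 2000, 1024));
    }
}
```

Calling `close()` after reading only part of the first file closes that file and the untouched ones as well, which mirrors the `close()` behaviour described above.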
Regression test
Added `LargeRowGroupDeltaTable` (2 Parquet files × 2000 rows = 4000 total) and `BatchDrainRegressionTests` inside `DeltaInputSourceTest`.

| Scenario | Result |
|---|---|
| Without the fix | 1024 × 2 = 2048 rows returned |
| With the fix | 4000 rows returned |

Release note
Fixed a bug in the Delta Lake input source where only 1024 rows per Parquet file
were ingested. Ingestion tasks now return all rows from each file.
Key changed/added classes in this PR
- `DeltaInputSourceReader` (fix)
- `LargeRowGroupDeltaTable` (new test descriptor)
- `DeltaInputSourceTest` (new `BatchDrainRegressionTests` inner class)
- `src/test/resources/large-row-group-table` (new test Delta table)

This PR has: