Skip to content

T4: Wire central forward sink to incremental readRowsSince watermark#156

Merged
philcunliffe merged 1 commit into
integration/incremental-sink-readsfrom
task/incremental-sink-reads/T4
Jun 26, 2026
Merged

T4: Wire central forward sink to incremental readRowsSince watermark#156
philcunliffe merged 1 commit into
integration/incremental-sink-readsfrom
task/incremental-sink-reads/T4

Conversation

@philcunliffe

Copy link
Copy Markdown
Contributor

Implements task T4 of incremental-sink-reads.

Wires the central forward sink (forwardPartition) from a full-partition readRows scan to the cursor-aware readRowsSince({ since }), driven by the per-(sink instance, partition) watermark store (T3) under the sink stateDir. The watermark advances per acked chunk to that chunk's last after (ship first, advance second); chunking, the Retry-After backpressure loop, and batchIdForChunk are unchanged; the server idempotency ledger now backstops only a bounded in-flight suffix. No-new-rows ticks send 0 chunks / 0 bytes; a missing/unreadable watermark falls back to a full scan (at-least-once + server dedup).

Gates: npm test (1423 pass), npm run typecheck, and npm run lint all green. The central_forward_outbox hermetic smoke is pre-existing red on the base integration branch (verified by stashing T4: identical bytesWritten=0 failure) due to a fixture discoverPartitions vs source=unknown mismatch unrelated to this change.

Task-Id: T4

Switch `forwardPartition` from a full-partition `storage.readRows(tablePath)`
scan to the cursor-aware `storage.readRowsSince(tablePath, { since })`, driven
by a per-(sink instance, partition) watermark loaded from the sink's stateDir
watermark store (T3). Each acked chunk advances the watermark to that chunk's
last `after` token (ship first, advance second), so a crash re-sends at most one
chunk and the server idempotency ledger now backstops only a bounded in-flight
suffix instead of the whole partition. A tick with no new rows yields zero rows,
sends zero chunks, and writes zero bytes.

Chunking (MAX_CHUNK_ROWS/MAX_CHUNK_BYTES), the Retry-After backpressure loop,
and `batchIdForChunk` derivation are unchanged. A missing/unreadable watermark
or underivable key falls back to a full scan (at-least-once + server dedup),
never a silent skip.

Implements task T4 of incremental-sink-reads (LLP 0040 §3-4, 0042).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task-Id: T4
@philcunliffe philcunliffe merged commit a3f0808 into integration/incremental-sink-reads Jun 26, 2026
6 checks passed
@philcunliffe philcunliffe deleted the task/incremental-sink-reads/T4 branch June 26, 2026 00:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant