T5: wire core blob sink (local-fs + s3) to incremental readRowsSince #157
Merged
philcunliffe merged 1 commit into integration/incremental-sink-reads on Jun 26, 2026
Wire the local-fs and s3 blob destinations to the cursor-aware readRowsSince surface (T2) and the persisted per-(sink instance, partition) watermark store (T3), so each tick exports only rows added since the sink's last durable PUT.

- New shared helpers in src/core/sinks/incremental.js (exported via hypaware/core/sinks): openIncrementalRows (peek-to-decide-empty, self-tracking rowCount + high-water lastAfter, feeds the unchanged encoder.encodePartition contract), withSeqRangeFilename (embeds [sinceSeq,lastSeq] before the extension), watermarkKeyFor, and createInstanceWatermarkStore.
- Empty new-row set writes no blob (skip, 0 bytes).
- The output filename/object key embeds the [sinceSeq,lastSeq] range so a crash-retry re-PUTs the same key (idempotent overwrite). This is the blob sink's stand-in for the central server ledger.
- The watermark advances only after the durable write/PUT.
- PluginPaths.stateDir is per-plugin, not per sink instance, so the watermark store is scoped under the instance to honor the per-(sink instance, partition) contract.
- Tests: helper unit tests, a local-fs end-to-end incremental test (ranged filename, watermark advance, skip-empty, new range, cumulative count), and rewritten s3-export-batch tests (skip-empty, ranged key, watermark advance, idempotent re-PUT on lost watermark) preserving the prior retry-semantics coverage.
- Updated the two local-fs blob-sink smokes to match the ranged filename (note: both are pre-existing red on the integration branch for an unrelated reason: the driver hands the sink the drained spool path).

Task-Id: T5
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
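To make the ranged-filename idea concrete, here is a minimal sketch of embedding a `[sinceSeq, lastSeq]` range before the extension. It is not the PR's `withSeqRangeFilename`: the function name `sketchSeqRangeFilename` and the `.<sinceSeq>-<lastSeq>` separator format are assumptions, since the diff itself is not shown here.

```javascript
const path = require('node:path');

// Hypothetical stand-in for withSeqRangeFilename. The ".<since>-<last>"
// format is an assumption; the real helper may format the range differently.
function sketchSeqRangeFilename(filename, sinceSeq, lastSeq) {
  // extname returns ".parquet" for "part.parquet" and "" when there is no extension.
  const ext = path.posix.extname(filename);
  const stem = ext ? filename.slice(0, -ext.length) : filename;
  // The same (sinceSeq, lastSeq) pair always yields the same name, so a retry
  // after a crash targets the same file or object key and overwrites it.
  return `${stem}.${sinceSeq}-${lastSeq}${ext}`;
}

console.log(sketchSeqRangeFilename('dataset/part.parquet', 10, 42));
// prints "dataset/part.10-42.parquet"
```

Because the name is a pure function of the range, a retry that starts from the same watermark and sees the same rows produces the same key, which is what makes the re-PUT an overwrite rather than a duplicate.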
4e9f386 into integration/incremental-sink-reads
6 checks passed
Implements task T5 of the `incremental-sink-reads` change set (LLP 0042 plan; LLP 0040 design §4).

Wires the core blob sink (`local-fs` + `s3` destinations) to the cursor-aware `readRowsSince` read surface (T2) and the persisted per-`(sink instance, partition)` watermark store (T3):

- exports only rows with `_hyp_ingest_seq > watermark` each tick;
- writes no blob when there are no new rows;
- embeds the `[sinceSeq, lastSeq]` range in the output filename / object key so a crash-retry re-PUTs the same key (idempotent overwrite; the blob sink's stand-in for the server idempotency ledger);
- advances the watermark only after the durable write/PUT.

New shared helpers in `src/core/sinks/incremental.js` (exported via `hypaware/core/sinks`): `openIncrementalRows`, `withSeqRangeFilename`, `watermarkKeyFor`, `createInstanceWatermarkStore`. Because `PluginPaths.stateDir` is per-plugin (not per sink instance), the watermark store is scoped under the instance to honor the design's per-`(sink instance, partition)` contract.

Tests

- `test/core/sink-incremental.test.js`: helper unit tests (filename range, peek/empty, since filter, null-seq carry-forward, instance isolation).
- `test/plugins/local-fs-incremental-export.test.js`: end-to-end (ranged filename, watermark advance, skip-empty, new range, cumulative count).
- `test/plugins/s3-export-batch.test.js`: rewritten for the new behavior (skip-empty, ranged key, watermark advance, idempotent re-PUT on lost watermark) while preserving the prior retry-semantics coverage.

`npm test` (1433 pass / 1 pre-existing skip / 0 fail), `npm run typecheck`, and `npm run lint` all pass.

Note: the two local-fs blob-sink smokes (`blob_sink_parquet_local_fs`, `local_parquet_export`) are pre-existing red on the integration branch for an unrelated reason: the sink driver hands the blob sink the drained spool path (`dataset/all`) rather than the committed `source=<...>` partition, so 0 rows are read regardless. Verified by running each on the untouched base branch. Their filename assertions were updated to match the ranged filename so they will pass once that upstream driver/discovery issue is resolved.

Task-Id: T5
🤖 Generated with Claude Code
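The ordering described in this PR (read rows past the watermark, skip when empty, write under a ranged key, then advance the watermark) can be sketched with in-memory stand-ins. Everything below is hypothetical: `exportTick`, `readRowsSinceSketch`, the `instanceId/partition` watermark key shape, the starting watermark of 0, and the JSON blob naming are assumptions for illustration, not the APIs added in `src/core/sinks/incremental.js`.

```javascript
// Hypothetical stand-in for the cursor-aware read: rows strictly newer than the watermark.
function readRowsSinceSketch(rows, sinceSeq) {
  return rows.filter((row) => row._hyp_ingest_seq > sinceSeq);
}

// One export tick against in-memory Maps standing in for the watermark store and blob store.
function exportTick({ rows, watermarks, blobs, instanceId, partition }) {
  // Assumed key shape: one watermark per (sink instance, partition).
  const watermarkKey = `${instanceId}/${partition}`;
  const sinceSeq = watermarks.get(watermarkKey) ?? 0; // assumed initial watermark

  const fresh = readRowsSinceSketch(rows, sinceSeq);
  if (fresh.length === 0) {
    return { skipped: true, rowCount: 0 }; // no new rows: write nothing
  }

  // High-water mark of the rows actually exported.
  const lastSeq = fresh.reduce((max, row) => Math.max(max, row._hyp_ingest_seq), sinceSeq);
  const blobKey = `${partition}.${sinceSeq}-${lastSeq}.json`; // assumed naming

  // "Durable write": if this throws, the watermark below is never advanced,
  // so the next tick retries the same range under the same key.
  blobs.set(blobKey, JSON.stringify(fresh));

  // Advance only after the write has succeeded.
  watermarks.set(watermarkKey, lastSeq);
  return { skipped: false, rowCount: fresh.length, blobKey };
}
```

In this sketch, if the watermark is lost after a successful write, the next tick recomputes the same `[sinceSeq, lastSeq]` range and overwrites the same key instead of creating a second blob, which mirrors the "idempotent re-PUT on lost watermark" case the tests cover.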