Incremental sink reads: per-row ingest-seq watermark for forward + blob sinks (LLP 0039/0040)#159
Incremental sink reads: per-row ingest-seq watermark for forward + blob sinks (LLP 0039/0040)#159philcunliffe wants to merge 16 commits into
Conversation
Cover LLP 0039 with a neutral-minted design for a per-(sink, partition) watermark so the central forward sink and the core blob sink read and ship only rows added since their last successful export. Recommends a monotonic per-row _hyp_ingest_seq column over snapshot ancestry (does not survive a compaction generation swap) and a content-addressed seen-set (cannot meet the bounded-read goal). Specifies the readRows since/ continuation extension, the persisted watermark contract keyed by the generation-stable logical partition path, application to both sinks, and the exactly-once argument across retention prunes and compaction swaps. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Refine LLP 0040 into six small, independently-mergeable tasks along the producer -> read-API -> persistence -> consumer seam: T1 stamp _hyp_ingest_seq at the decorateRow chokepoint (deps: []) T2 readRows since/continuation + readRowsSince (deps: T1) T3 per-(sink,partition) watermark store keyed by logical path (deps: T2) T4 wire the central forward sink (deps: T2,T3) T5 wire the core blob sink (deps: T2,T3) T6 exactly-once tests across retention prune + compaction swap (deps: T4,T5) Verified with `neutral ready incremental-sink-reads --json`. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… chokepoint Adds the row-resident, append-monotonic int64 watermark column that the incremental-sink-reads design (LLP 0040, Candidate B) is built on. This is the producer half of the seam: nothing reads the column yet, and it is stripped by INTERNAL_FIELDS from every existing readRows consumer, so it merges with zero behavioural change. - New `createIngestSeqAllocator` (src/core/cache/ingest-seq.js): a crash-safe, never-regressing monotonic int64 allocator. Reserve-before-stamp — a block of seqs is durably persisted (nextSeq advanced via atomic write-rename) before any seq in it is handed to a row, so a resumed flush never re-issues a seq <= one already stamped/exported. Gaps are tolerated; regressions are not. The counter is cache-global (<cacheRoot>/_hyp_ingest_seq.json), not a per-partition cursor.json, because decorateRow runs before rows are grouped into source= partitions and two spool paths (live + backfill) can feed one partition — only a cache-wide counter keeps every partition's seq subsequence strictly increasing. (LLP 0040 §7 records this refinement of risk #2.) - streaming-reader.js: decorateRow stamps `_hyp_ingest_seq` (the cache_row_id hash is still computed over the original row, so seq does not perturb dedup); the chunk's columns gain the additive nullable INT64 column so it lands in the Iceberg schema and rides a compaction generation swap verbatim; the field joins INTERNAL_FIELDS. - spool.js wires one cache-global allocator into the flush loop. - Tests: allocator monotonicity / never-regress-across-restart / reserve- before-stamp / concurrency; streamFlushFile stamping; and a storage round-trip proving the seq persists in Iceberg, increases per row, and is stripped from readRows. Verified separately that the column survives a real compaction swap. Task-Id: T1 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…into integration/incremental-sink-reads
Add a back-compatible `opts.since` to `readRows` and a cursor-aware
`readRowsSince` sibling that pairs each internal-stripped row with its
`after` continuation token, so the forward and blob sinks can read only
rows added since their last durable export.
- `scanRowsFromTable` gains an `opts.since` (bigint `_hyp_ingest_seq`
watermark) and applies a `seq > since` predicate as a yielded-row
filter. It is NOT pushed into icebird's `scan({ where })`: icebird
couples file/row-group pruning with a per-row match that drops nulls
(`null > since` is false), which would silently skip the legacy
null-seq rows the migration must preserve (LLP 0040 risk #1). The
design names this yielded-row filter as the fallback; a future
null-aware icebird filter can add the file-skip optimization on top.
- null-seq = new: a row whose `_hyp_ingest_seq` is null/absent
(pre-upgrade) is always yielded, so the one-time migration is at worst
a full re-export, never silent data loss. A table that never carried
the seq column yields everything.
- `after` is a monotonic high-water of real seqs, so a null-seq row
carries the prior watermark forward and progress never regresses even
when the scan visits seqs out of order (interleaved sources).
- `opts` absent ⇒ byte-for-byte the pre-existing full scan, so every
current caller is untouched until it opts in.
- Update the kernel-types decl: `SinkContinuation`, `ReadRowsOptions`,
the extended `readRows`, and the new `readRowsSince`.
Tests cover back-compat, after-token monotonicity, no-new-rows ≈0,
incremental new rows, the null-seq migration contract, the pure-legacy
(no seq column) table, and invalid-token rejection.
Task-Id: T2
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…into integration/incremental-sink-reads
Add `src/core/sinks/watermarks.js`, the shared incremental-read watermark
store for the forward and blob sinks (LLP 0042 task T3). Files live at
`<stateDir>/watermarks/<dataset>/<partition-key>.json` and carry the
versioned `{ continuation, exportedRowCount, updatedAt }` record.
The key is derived from the partition's stable LOGICAL path
(`datasets/<dataset>/<partition...>` relative to cacheRoot), never the
physical `tableDir` inside it — the hinge of design constraint (B): the
watermark reads straight through a compaction generation swap and a
retention front-prune. `write` is atomic write-rename (the
`writeCursor`/`writeProgress`/`ingest-seq.js` idiom); a corrupt or absent
record reads back as null so a sink re-exports from the start
(at-least-once + downstream dedup) rather than silently skipping rows.
Adds `SinkWatermarkKey`/`SinkWatermarkRecord`/`SinkWatermarkStore` types,
anchors LLP 0040 §3 (`#watermark-contract`), and a unit suite covering key
derivation (logical-not-tableDir, nesting, sanitize, sentinel, escape
guard), round-trip, in-place advance, atomic-no-temp, malformed-token
rejection, and corrupt-file null.
@ref LLP 0040#watermark-contract
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task-Id: T3
…into integration/incremental-sink-reads
Switch `forwardPartition` from a full-partition `storage.readRows(tablePath)`
scan to the cursor-aware `storage.readRowsSince(tablePath, { since })`, driven
by a per-(sink instance, partition) watermark loaded from the sink's stateDir
watermark store (T3). Each acked chunk advances the watermark to that chunk's
last `after` token (ship first, advance second), so a crash re-sends at most one
chunk and the server idempotency ledger now backstops only a bounded in-flight
suffix instead of the whole partition. A tick with no new rows yields zero rows,
sends zero chunks, and writes zero bytes.
Chunking (MAX_CHUNK_ROWS/MAX_CHUNK_BYTES), the Retry-After backpressure loop,
and `batchIdForChunk` derivation are unchanged. A missing/unreadable watermark
or underivable key falls back to a full scan (at-least-once + server dedup),
never a silent skip.
Implements task T4 of incremental-sink-reads (LLP 0040 §3-4, 0042).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task-Id: T4
Wire the local-fs and s3 blob destinations to the cursor-aware readRowsSince surface (T2) and the persisted per-(sink instance, partition) watermark store (T3), so each tick exports only rows added since the sink's last durable PUT. - New shared helpers in src/core/sinks/incremental.js (exported via hypaware/core/sinks): openIncrementalRows (peek-to-decide-empty, self-tracking rowCount + high-water lastAfter, feeds the unchanged encoder.encodePartition contract), withSeqRangeFilename (embeds [sinceSeq,lastSeq] before the extension), watermarkKeyFor, and createInstanceWatermarkStore. - Empty new-row set writes no blob (skip, 0 bytes). - The output filename/object key embeds the [sinceSeq,lastSeq] range so a crash-retry re-PUTs the same key (idempotent overwrite) — the blob sink's stand-in for the central server ledger. - The watermark advances only after the durable write/PUT. - PluginPaths.stateDir is per-plugin, not per sink instance, so the watermark store is scoped under the instance to honor the per-(sink instance, partition) contract. - Tests: helper unit tests, a local-fs end-to-end incremental test (ranged filename, watermark advance, skip-empty, new range, cumulative count), and rewritten s3-export-batch tests (skip-empty, ranged key, watermark advance, idempotent re-PUT on lost watermark) preserving the prior retry-semantics coverage. - Updated the two local-fs blob-sink smokes to match the ranged filename (note: both are pre-existing red on the integration branch for an unrelated reason — the driver hands the sink the drained spool path). Task-Id: T5 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…into integration/incremental-sink-reads
…into integration/incremental-sink-reads
Add the LLP 0040 exactly-once proof the T4/T5 unit suites can't give (they stub storage): a deterministic acceptance test driving the REAL kernel cache, retention enforcer, maintainCache compaction, BOTH sinks (central forward + core local-fs blob), and the driver outbox respool. Covers: ~0 bytes on a no-new-rows tick, ~N on an N-new tick, exactly-once across a retention front-prune and a compaction generation swap for both sinks, and watermark vs. driver-outbox respool composition (suffix-only replay + idempotent batch-id / re-PUT). Adds a hermetic smoke proving the blob sink reads straight through a compaction generation swap via the real driver. Anchors LLP 0040 section 5 so the @ref resolves. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Task-Id: T6
…into integration/incremental-sink-reads
…ance The forward sink derived its X-Hyp-Batch-Id from the per-tick chunk ordinal (chunkIndex). After an earlier chunk was acked and the (sink, partition) watermark advanced, a respool re-read only the un-acked suffix and re-numbered it from 0 — minting a NEW batch-id for a chunk that may already have committed on the server (ambiguous ack / commit-then-5xx). The server idempotency ledger could not dedupe the redelivery, double-storing rows and breaking the spec's 'ledger covers mid-batch retries' guarantee. Key the batch-id on the chunk's start seq (the watermark it resumes from) instead. A respooled suffix reproduces the same [startSeq, body] and thus the same id, so the ledger dedupes it; distinct chunks still differ because each row's _hyp_ingest_seq is unique. Adds a cross-tick regression test. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
🤖 neutral dual-review — verdict: block Blast radius / risk: HIGH — kernel storage read API ( Both reviewers ran. Both verdicts: block. The exactly-once skeleton is sound where it's been proven: the T6 acceptance suite is non-vacuous — it drives the real cache, real retention enforcer, real Codex key pointsCodex (gpt-5.5, xhigh) returned 3 blockers + 3 majors + 1 nit. Reconciled:
My key points (independent Claude pass)
Fix pushed (round 2 will re-review)Batch-id stability ( The two remaining blockers (null-seq migration; cross-process seq safety) are design-level — LLP 0040 §6 risk #1 itself flags the null-seq default as "must be nailed down (the wrong default … silently skips null-seq rows (data loss))". Resolving them is an author decision (stamp legacy rows during compaction / a one-time backfill migration, or track a Findings
|
🤖 neutral: held for a design decision (review verdict = block)The dual-review found real exactly-once defects. One is fixed; three are design-level and need a human decision — neutral will not guess at choices that risk silent data loss, so this PR is Fixed in this PR (
Needs a design decision (escalated, not fixed):
What holds: the core watermark design is sound — exactly-once across a retention front-prune and a compaction generation swap was verified, and the T6 acceptance suite is non-vacuous (drives real cache/retention/compaction/both sinks). Recommended path: revise the design (LLP 0040) to nail down (a) the null-seq backfill/stamping strategy for legacy + |
Dual-agent review —
|
… sinks (#159 review) Dual-review found a BLOCKER plus majors against the exactly-once claim. Fixes, each with a regression test that fails before and passes after: [BLOCKER] Forward sink advanced the watermark per acked chunk to the scan's running-max `after`. Because the cache scan is NOT seq-ordered (interleaved live+backfill spool; post-compaction sortOrder re-sort), an early acked chunk could checkpoint past lower-seq rows still un-acked in a later chunk; a between-chunk failure then dropped them forever (`seq <= since`). Fix: advance the forward watermark ONCE at end-of-partition (as the blob sink already does), so a partial partition never checkpoints — a failure re-reads the whole partition and the server ledger dedupes the acked prefix (stable chunkStartSeq batch ids). test: central-forward-chunking "an unordered scan never skips a lower-seq row when a later chunk fails". [MAJOR] Legacy null-seq rows re-exported every tick (never reached steady state). Add `includeLegacy` to the storage read API (default true). Both sinks pass `includeLegacy = (no durable watermark)`: a fresh sink exports the pre-upgrade backlog once, then excludes null-seq rows. Safe because no new null-seq row can appear post-upgrade (decorateRow stamps every flushed row). tests: sink-incremental-acceptance pure-legacy + mixed, forward + blob (~0 bytes on 2nd tick, no row in two artifacts). [MAJOR] Central forward sink scoped watermarks per-PLUGIN, not per-instance — two @hypaware/central instances clobbered one watermark file. Switch to createInstanceWatermarkStore({ paths, instanceName }) (matching local-fs/s3); correct the @ref. test: sink-incremental-acceptance "two instances keep independent watermarks (no cross-instance skip)". [MAJOR, doc] Bounded-reads constraint (C): the read is a full scan over the surviving partition with a yielded-row filter, not the file/row-group skip the design implied. Correct LLP 0040 §1(C)/§5 to state reads are bounded by surviving-partition size, with O(N_new) reads pending null-aware icebird pushdown. [MINOR/NIT, doc+style] Correct LLP 0040 §4 (batchIdForChunk keys by chunkStartSeq, not chunkIndex). Hoist inline import('...') types to @import in sink-watermarks/acceptance tests and the compaction smoke. [MAJOR, ESCALATED — left unfixed] Mid-retry duplication when a unit commits, its watermark write is lost, AND new rows append before the retry: the resumed in-flight unit grows past what committed and the dedup net no longer recognizes it. The common in-flight retry (no new arrivals) IS covered. A correct fix needs a pre-commit intent (read `until` upper bound + persisted intent record) across both sinks — a design-level change to the watermark contract, deferred rather than half-patched under the exactly-once claim. Documented as LLP 0040 §6 risk #7 / §5 known gap. LLP 0040/0042 updated in this commit to match the new behavior. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Implements the incremental sink reads change set — sinks read only rows added since their last successful export instead of re-reading the whole partition every tick.
_hyp_ingest_seqat the decorateRow write chokepoint (crash-safe allocator)storage.readRowswith cursor-awaresince/continuation + null-seq migration(sink instance, partition)watermark keyed by logical partition pathEach task landed as its own verified
--no-ffmerge with green CI. Server idempotency ledger retained as the in-flight retry net.Change-Set: incremental-sink-reads