diff --git a/collectivus-plugin-kernel-types.d.ts b/collectivus-plugin-kernel-types.d.ts index 2fac874..d31235b 100644 --- a/collectivus-plugin-kernel-types.d.ts +++ b/collectivus-plugin-kernel-types.d.ts @@ -1109,6 +1109,29 @@ export interface QueryScope { limit: number } +/** + * Opaque, versioned continuation token marking a sink's incremental-read + * watermark: the highest `_hyp_ingest_seq` a `(sink instance, partition)` has + * durably exported. `seq` is an int64 encoded as a decimal string to dodge + * bigint/JSON precision hazards. Opaque + versioned so the underlying watermark + * mechanism can change without invalidating persisted watermarks. See LLP 0040 §2. + */ +export interface SinkContinuation { + v: 1 + seq: string +} + +/** Options for the back-compatible incremental extension to `readRows`. */ +export interface ReadRowsOptions { + /** + * Yield only rows newer than this watermark (`_hyp_ingest_seq > since.seq`). + * Rows with a null `_hyp_ingest_seq` (pre-upgrade "legacy" rows) are always + * yielded — treated as new — so the one-time migration is at worst a full + * re-export, never silent data loss. Absent ⇒ full scan (today's behaviour). + */ + since?: SinkContinuation +} + /** * Intrinsic storage service exposed by core to plugins that materialize * rows into the local Iceberg-backed cache. Plugins do not configure @@ -1132,7 +1155,20 @@ export interface QueryStorageService { discoverCachePartitions(scope?: Partial): Promise tableExists(tablePath: string): boolean tableUrl(tablePath: string): string - readRows(tablePath: string, columns?: string[]): AsyncIterable> + readRows(tablePath: string, columns?: string[], opts?: ReadRowsOptions): AsyncIterable> + /** + * Cursor-aware sibling of `readRows` for sinks that must advance a + * per-(sink instance, partition) watermark. Pairs each internal-stripped row + * with the `after` continuation to persist ONCE that row is durably exported. + * The internal `_hyp_ingest_seq` never reaches the row payload — it is read to + * derive `after`, then stripped. `after` is a monotonic high-water mark, so a + * null-seq legacy row carries the prior watermark forward unchanged. See + * LLP 0040 §2. + */ + readRowsSince( + tablePath: string, + opts: { since?: SinkContinuation; columns?: string[] }, + ): AsyncIterable<{ row: Record; after: SinkContinuation }> } export interface CachePartitionMeta { diff --git a/src/core/cache/iceberg/store.js b/src/core/cache/iceberg/store.js index cab011b..01dde23 100644 --- a/src/core/cache/iceberg/store.js +++ b/src/core/cache/iceberg/store.js @@ -30,6 +30,7 @@ import { partitionSpecForDeclaration, validatePartitionSpecStability, } from '../../iceberg/partition-spec.js' +import { INGEST_SEQ_COLUMN } from '../streaming-reader.js' /** * @import { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts' @@ -283,24 +284,72 @@ export async function readRowsFromTable(tablePath) { * time so callers (in particular `QueryStorageService.readRows`) never * materialize the full table in memory. * + * When `opts.since` is set (a bigint `_hyp_ingest_seq` watermark) only rows + * NEWER than the watermark are yielded: a row is kept iff its `_hyp_ingest_seq` + * is `null`/absent — a pre-column "legacy" row, treated as NEW so the one-time + * upgrade never silently skips it (LLP 0040 risk #1, the data-loss hazard) — OR + * its seq is strictly `> since`. The seq column is force-projected so the + * predicate can be evaluated even when the caller asked for a narrower set; + * `QueryStorageService` strips it from the row afterwards. + * + * The predicate is applied as a yielded-row filter rather than pushed into + * icebird's `scan({ where })`. icebird couples file/row-group pruning with a + * per-row match that DROPS nulls (`null > since` is false in both hyparquet's + * matcher and JS), which would skip exactly the legacy null-seq rows the + * migration must preserve. The design (LLP 0040 §2) names this yielded-row + * filter as the fallback; a future null-aware icebird filter can layer the + * file-skip optimization on top without changing this contract. + * + * @ref LLP 0040#storage-api-extension [implements] — since-filtered incremental scan; null-seq = new * @param {string} tablePath * @param {string[]} [columns] + * @param {{ since?: bigint }} [opts] * @returns {AsyncGenerator>} */ -export async function* scanRowsFromTable(tablePath, columns) { +export async function* scanRowsFromTable(tablePath, columns, opts) { if (!tableExists(tablePath)) return + const since = opts?.since const { resolver, lister } = await getLocalIO() const url = tableUrlForDir(tablePath) const { metadata } = await loadLatestFileCatalogMetadata({ tableUrl: url, resolver, lister }) if (metadata['current-snapshot-id'] === undefined || !metadata.snapshots?.length) return const source = await icebergDataSource({ tableUrl: url, metadata, resolver, lister }) - const projected = columns && columns.length > 0 ? columns : source.columns + // A table that has never been flushed under the seq-column schema carries no + // seq field: every row is implicitly null-seq (new), so there is nothing to + // project or filter — yield the whole table. + const hasSeq = since !== undefined && source.columns.includes(INGEST_SEQ_COLUMN.name) + let projected = columns && columns.length > 0 ? columns : source.columns + if (hasSeq && !projected.includes(INGEST_SEQ_COLUMN.name)) { + projected = [...projected, INGEST_SEQ_COLUMN.name] + } const scan = source.scan({ columns: projected }) for await (const row of scan.rows()) { - yield await resolveAsyncRow(row, projected) + const resolved = await resolveAsyncRow(row, projected) + if (hasSeq) { + const seq = seqValue(resolved[INGEST_SEQ_COLUMN.name]) + if (seq !== null && seq <= /** @type {bigint} */ (since)) continue + } + yield resolved } } +/** + * Decode a raw `_hyp_ingest_seq` cell to a bigint, or `null` when the row has + * no usable seq — a pre-column legacy row (null/absent), or an unparseable + * value. Returning `null` for an unparseable value is the safe direction: the + * caller treats `null` as a NEW row and never skips it (LLP 0040 risk #1). + * + * @param {unknown} raw + * @returns {bigint | null} + */ +export function seqValue(raw) { + if (raw === null || raw === undefined) return null + if (typeof raw === 'bigint') return raw + if (typeof raw === 'number' && Number.isInteger(raw)) return BigInt(raw) + if (typeof raw === 'string' && /^-?\d+$/.test(raw)) return BigInt(raw) + return null +} + /** * Build a squirreling-compatible `AsyncDataSource` over the latest * snapshot of the table. Returns `null` if the table does not exist diff --git a/src/core/cache/storage.js b/src/core/cache/storage.js index bda5806..1a3b028 100644 --- a/src/core/cache/storage.js +++ b/src/core/cache/storage.js @@ -4,6 +4,7 @@ import { Attr, getLogger, getMeter, withSpan } from '../observability/index.js' import { dataSourceForTable, scanRowsFromTable, + seqValue, tableExists as icebergTableExists, tableUrl as icebergTableUrl, } from './iceberg/store.js' @@ -19,16 +20,34 @@ import { } from './partition.js' import { cacheTablePath, datasetForTablePath } from './paths.js' import { createCacheSpool, discoverSpoolTables, DEFAULT_SPOOL_BYTES_THRESHOLD } from './spool.js' -import { INTERNAL_FIELDS } from './streaming-reader.js' +import { INGEST_SEQ_COLUMN, INTERNAL_FIELDS } from './streaming-reader.js' import path from 'node:path' /** - * @import { ColumnSpec, QueryScope, QueryStorageService } from '../../../collectivus-plugin-kernel-types.d.ts' + * @import { ColumnSpec, QueryScope, QueryStorageService, SinkContinuation } from '../../../collectivus-plugin-kernel-types.d.ts' * @import { CachePartitioningDeclaration, ExtendedQueryStorageService } from './types.d.ts' * @import { AsyncCells } from 'squirreling' */ +/** + * Decode a persisted `SinkContinuation` into its int64 `_hyp_ingest_seq` + * watermark. Absent ⇒ `0n` ("exported nothing"): the allocator starts seqs at + * 1, so `0` is strictly below every real row and a fresh sink reads the whole + * table. The token is opaque + versioned so the watermark mechanism can change + * later without invalidating persisted watermarks (LLP 0040 §2). + * + * @param {SinkContinuation | undefined} since + * @returns {bigint} + */ +function continuationToSeq(since) { + if (since === undefined || since === null) return 0n + if (since.v !== 1 || typeof since.seq !== 'string' || !/^\d+$/.test(since.seq)) { + throw new Error(`readRows: invalid SinkContinuation ${JSON.stringify(since)}`) + } + return BigInt(since.seq) +} + /** * Resolve a tablePath to the Iceberg table directory. * @@ -179,14 +198,43 @@ export function createQueryStorageService({ cacheRoot, getDeclaration, getSettle return icebergTableUrl(resolveIcebergDir(tablePath)) }, - async *readRows(tablePath, columns) { + // @ref LLP 0040#storage-api-extension [implements] — back-compatible + // `opts.since`: absent ⇒ byte-for-byte the pre-existing full scan, so every + // current caller is untouched until it opts in. When set, the scan yields + // only rows newer than the watermark (null-seq legacy rows always yielded). + async *readRows(tablePath, columns, opts) { + const since = opts?.since !== undefined ? continuationToSeq(opts.since) : undefined const projected = columns?.filter((c) => !INTERNAL_FIELDS.includes(c)) - for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected)) { + const scanOpts = since !== undefined ? { since } : undefined + for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected, scanOpts)) { for (const f of INTERNAL_FIELDS) delete row[f] yield row } }, + // @ref LLP 0040#storage-api-extension [implements] — cursor-aware sibling + // for sinks that advance a per-(sink, partition) watermark. `_hyp_ingest_seq` + // is an INTERNAL_FIELD stripped from the row, so a sink reading `readRows` + // can't learn the high-water seq; `readRowsSince` reads it to derive the + // `after` token, then strips it so the seq never reaches the wire payload. + async *readRowsSince(tablePath, opts = {}) { + const since = continuationToSeq(opts.since) + const projected = opts.columns?.filter((c) => !INTERNAL_FIELDS.includes(c)) + // Running high-water of REAL (non-null) seqs seen so far, seeded with the + // incoming watermark. `after` is this monotonic max, so a null-seq legacy + // row never advances the watermark and progress never regresses even when + // the scan visits seqs out of order (interleaved sources; LLP 0040 risk #3). + let high = since + for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected, { since })) { + const seq = seqValue(row[INGEST_SEQ_COLUMN.name]) + if (seq !== null && seq > high) high = seq + for (const f of INTERNAL_FIELDS) delete row[f] + /** @type {SinkContinuation} */ + const after = { v: 1, seq: high.toString() } + yield { row, after } + } + }, + async dataSourceForTable(tablePath) { const source = await dataSourceForTable(resolveIcebergDir(tablePath)) if (!source) return null diff --git a/test/core/sink-reads-since.test.js b/test/core/sink-reads-since.test.js new file mode 100644 index 0000000..0f714d6 --- /dev/null +++ b/test/core/sink-reads-since.test.js @@ -0,0 +1,210 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' +import fs from 'node:fs/promises' +import path from 'node:path' +import os from 'node:os' + +import { createQueryStorageService } from '../../src/core/cache/storage.js' +import { appendRowsToTable, scanRowsFromTable } from '../../src/core/cache/iceberg/store.js' +import { INGEST_SEQ_COLUMN } from '../../src/core/cache/streaming-reader.js' + +/** + * @import { ColumnSpec } from '../../collectivus-plugin-kernel-types.d.ts' + */ + +/** @returns {Promise} */ +async function makeTmpDir() { + return fs.mkdtemp(path.join(os.tmpdir(), 'hyp-since-')) +} + +/** @type {ColumnSpec[]} */ +const COLS = [ + { name: 'id', type: 'INT64', nullable: false }, + { name: 'msg', type: 'STRING', nullable: false }, +] + +test('readRows back-compat: no opts is unchanged, internal fields never leak', async () => { + const cacheRoot = await makeTmpDir() + const svc = createQueryStorageService({ cacheRoot }) + const spoolPath = svc.cacheTablePath('demo', ['all']) + await svc.appendRows(spoolPath, COLS, [ + { id: 1, msg: 'a' }, + { id: 2, msg: 'b' }, + { id: 3, msg: 'c' }, + ]) + await svc.flushTable(spoolPath, { reason: 'manual' }) + + // The spool re-groups rows into a committed `source=` partition; a + // sink reads from the discovered partition path, not the spool path. + const parts = await svc.discoverCachePartitions() + assert.equal(parts.length, 1) + const tablePath = parts[0].path + + /** @type {Record[]} */ + const all = [] + for await (const row of svc.readRows(tablePath)) all.push(row) + assert.equal(all.length, 3) + for (const row of all) { + assert.ok(!('_hyp_ingest_seq' in row)) + assert.ok(!('_hyp_cache_row_id' in row)) + assert.ok(!('_hyp_cache_batch_id' in row)) + } + + // Column projection is still honoured and still strips internals. + /** @type {Record[]} */ + const idOnly = [] + for await (const row of svc.readRows(tablePath, ['id'])) idOnly.push(row) + assert.equal(idOnly.length, 3) + for (const row of idOnly) assert.deepEqual(Object.keys(row), ['id']) + + await fs.rm(cacheRoot, { recursive: true, force: true }) +}) + +test('readRowsSince pairs each row with a monotonic after token and strips the seq', async () => { + const cacheRoot = await makeTmpDir() + const svc = createQueryStorageService({ cacheRoot }) + const spoolPath = svc.cacheTablePath('demo', ['all']) + await svc.appendRows(spoolPath, COLS, [ + { id: 1, msg: 'a' }, + { id: 2, msg: 'b' }, + { id: 3, msg: 'c' }, + ]) + await svc.flushTable(spoolPath, { reason: 'manual' }) + + const parts = await svc.discoverCachePartitions() + assert.equal(parts.length, 1) + const tablePath = parts[0].path + + /** @type {{ row: Record, after: { v: 1, seq: string } }[]} */ + const seen = [] + for await (const pair of svc.readRowsSince(tablePath, {})) seen.push(pair) + assert.equal(seen.length, 3) + + let prev = -1n + for (const { row, after } of seen) { + assert.ok(!('_hyp_ingest_seq' in row), 'seq never reaches the row payload') + assert.equal(after.v, 1) + assert.match(after.seq, /^\d+$/) + const cur = BigInt(after.seq) + assert.ok(cur >= prev, 'after token never regresses across the scan') + prev = cur + } + const watermark = seen[seen.length - 1].after + + // A second read from the watermark with no new rows yields nothing (≈0 bytes), + // via both the cursor-aware surface and the plain `readRows` `since`. + /** @type {unknown[]} */ + const none = [] + for await (const pair of svc.readRowsSince(tablePath, { since: watermark })) none.push(pair) + assert.equal(none.length, 0) + /** @type {unknown[]} */ + const noneFlat = [] + for await (const row of svc.readRows(tablePath, undefined, { since: watermark })) noneFlat.push(row) + assert.equal(noneFlat.length, 0) + + // After N new rows, only the N new ones are read, independent of the rest. + await svc.appendRows(spoolPath, COLS, [ + { id: 4, msg: 'd' }, + { id: 5, msg: 'e' }, + ]) + await svc.flushTable(spoolPath, { reason: 'manual' }) + + /** @type {Record[]} */ + const fresh = [] + for await (const { row, after } of svc.readRowsSince(tablePath, { since: watermark })) { + fresh.push(row) + assert.ok(BigInt(after.seq) > BigInt(watermark.seq)) + } + assert.equal(fresh.length, 2) + assert.deepEqual(fresh.map((r) => Number(r.id)).sort((a, b) => a - b), [4, 5]) + + await fs.rm(cacheRoot, { recursive: true, force: true }) +}) + +test('null-seq (legacy) rows are always treated as new and never skipped', async () => { + const root = await makeTmpDir() + const dir = path.join(root, 'legacy-table') + /** @type {ColumnSpec[]} */ + const cols = [ + { name: 'id', type: 'INT64', nullable: false }, + INGEST_SEQ_COLUMN, + ] + // A migration-era table: some rows pre-date the seq column (null), some carry + // real seqs. Built directly so the seq values are controlled exactly. + await appendRowsToTable(dir, cols, [ + { id: 1, [INGEST_SEQ_COLUMN.name]: null }, + { id: 2, [INGEST_SEQ_COLUMN.name]: 5n }, + { id: 3, [INGEST_SEQ_COLUMN.name]: 10n }, + { id: 4, [INGEST_SEQ_COLUMN.name]: null }, + ]) + + // since = 5: keep null(1), skip seq 5(2), keep seq 10(3), keep null(4). + /** @type {number[]} */ + const kept = [] + for await (const row of scanRowsFromTable(dir, undefined, { since: 5n })) kept.push(Number(row.id)) + assert.deepEqual(kept, [1, 3, 4]) + + // since = 0: every row is new. + /** @type {number[]} */ + const allIds = [] + for await (const row of scanRowsFromTable(dir, undefined, { since: 0n })) allIds.push(Number(row.id)) + assert.deepEqual(allIds, [1, 2, 3, 4]) + + // Through the cursor-aware surface: a null-seq row carries the prior watermark + // forward unchanged (it does not advance the high-water seq). + const svc = createQueryStorageService({ cacheRoot: root }) + /** @type {{ id: number, after: string }[]} */ + const pairs = [] + for await (const { row, after } of svc.readRowsSince(dir, { since: { v: 1, seq: '5' } })) { + assert.ok(!(INGEST_SEQ_COLUMN.name in row)) + pairs.push({ id: Number(row.id), after: after.seq }) + } + assert.deepEqual(pairs, [ + { id: 1, after: '5' }, + { id: 3, after: '10' }, + { id: 4, after: '10' }, + ]) + + await fs.rm(root, { recursive: true, force: true }) +}) + +test('a table with no seq column at all yields everything (pure legacy)', async () => { + const root = await makeTmpDir() + const dir = path.join(root, 'no-seq-col') + await appendRowsToTable(dir, COLS, [ + { id: 1, msg: 'a' }, + { id: 2, msg: 'b' }, + ]) + + // Even with a high watermark, a table that never carried the seq column has + // only implicit null-seq rows, so all are new. + const svc = createQueryStorageService({ cacheRoot: root }) + /** @type {{ id: number, after: string }[]} */ + const pairs = [] + for await (const { row, after } of svc.readRowsSince(dir, { since: { v: 1, seq: '999' } })) { + pairs.push({ id: Number(row.id), after: after.seq }) + } + assert.deepEqual(pairs, [ + { id: 1, after: '999' }, + { id: 2, after: '999' }, + ]) + + await fs.rm(root, { recursive: true, force: true }) +}) + +test('an invalid continuation token is rejected', async () => { + const cacheRoot = await makeTmpDir() + const svc = createQueryStorageService({ cacheRoot }) + const tablePath = svc.cacheTablePath('demo', ['all']) + await svc.appendRows(tablePath, COLS, [{ id: 1, msg: 'a' }]) + await svc.flushTable(tablePath, { reason: 'manual' }) + + await assert.rejects(async () => { + // @ts-expect-error — deliberately malformed token + for await (const _ of svc.readRowsSince(tablePath, { since: { v: 2, seq: '1' } })) { /* drain */ } + }, /invalid SinkContinuation/) + + await fs.rm(cacheRoot, { recursive: true, force: true }) +})