Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion collectivus-plugin-kernel-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,29 @@ export interface QueryScope {
limit: number
}

/**
* Opaque, versioned continuation token marking a sink's incremental-read
* watermark: the highest `_hyp_ingest_seq` a `(sink instance, partition)` has
* durably exported. `seq` is an int64 encoded as a decimal string to dodge
* bigint/JSON precision hazards. Opaque + versioned so the underlying watermark
* mechanism can change without invalidating persisted watermarks. See LLP 0040 §2.
*
* Sinks are expected to persist the token they were handed and pass it back
* unchanged, rather than assembling one by hand.
*/
export interface SinkContinuation {
/** Token format version. The literal `1` is the only value this type admits. */
v: 1
/**
* Watermark sequence number written as unsigned base-10 digits (for example
* `"42"`). Carried as a string, never as a number or bigint, so it survives
* JSON round-trips without precision loss.
*/
seq: string
}

/**
* Options for the back-compatible incremental extension to `readRows`.
*
* `readRows` strips internal fields (including `_hyp_ingest_seq`) from every
* row it yields, so a caller cannot compute its next watermark from the rows
* alone. A sink that needs to advance a watermark should use `readRowsSince`,
* which pairs each row with the continuation to persist.
*/
export interface ReadRowsOptions {
/**
* Yield only rows newer than this watermark (`_hyp_ingest_seq > since.seq`).
* Rows with a null `_hyp_ingest_seq` (pre-upgrade "legacy" rows) are always
* yielded — treated as new — so the one-time migration is at worst a full
* re-export, never silent data loss. Absent ⇒ full scan (today's behaviour).
*/
since?: SinkContinuation
}

/**
* Intrinsic storage service exposed by core to plugins that materialize
* rows into the local Iceberg-backed cache. Plugins do not configure
Expand All @@ -1132,7 +1155,20 @@ export interface QueryStorageService {
discoverCachePartitions(scope?: Partial<QueryScope>): Promise<CachePartitionMeta[]>
tableExists(tablePath: string): boolean
tableUrl(tablePath: string): string
readRows(tablePath: string, columns?: string[]): AsyncIterable<Record<string, unknown>>
readRows(tablePath: string, columns?: string[], opts?: ReadRowsOptions): AsyncIterable<Record<string, unknown>>
/**
* Cursor-aware sibling of `readRows` for sinks that must advance a
* per-(sink instance, partition) watermark. Pairs each internal-stripped row
* with the `after` continuation to persist ONCE that row is durably exported.
* The internal `_hyp_ingest_seq` never reaches the row payload — it is read to
* derive `after`, then stripped. `after` is a monotonic high-water mark, so a
* null-seq legacy row carries the prior watermark forward unchanged. See
* LLP 0040 §2.
*/
readRowsSince(
tablePath: string,
opts: { since?: SinkContinuation; columns?: string[] },
): AsyncIterable<{ row: Record<string, unknown>; after: SinkContinuation }>
}

export interface CachePartitionMeta {
Expand Down
55 changes: 52 additions & 3 deletions src/core/cache/iceberg/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
partitionSpecForDeclaration,
validatePartitionSpecStability,
} from '../../iceberg/partition-spec.js'
import { INGEST_SEQ_COLUMN } from '../streaming-reader.js'

/**
* @import { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts'
Expand Down Expand Up @@ -283,24 +284,72 @@ export async function readRowsFromTable(tablePath) {
* time so callers (in particular `QueryStorageService.readRows`) never
* materialize the full table in memory.
*
* When `opts.since` is set (a bigint `_hyp_ingest_seq` watermark) only rows
* NEWER than the watermark are yielded: a row is kept iff its `_hyp_ingest_seq`
* is `null`/absent — a pre-column "legacy" row, treated as NEW so the one-time
* upgrade never silently skips it (LLP 0040 risk #1, the data-loss hazard) — OR
* its seq is strictly `> since`. The seq column is force-projected so the
* predicate can be evaluated even when the caller asked for a narrower set;
* `QueryStorageService` strips it from the row afterwards.
*
* The predicate is applied as a yielded-row filter rather than pushed into
* icebird's `scan({ where })`. icebird couples file/row-group pruning with a
* per-row match that DROPS nulls (`null > since` is false in both hyparquet's
* matcher and JS), which would skip exactly the legacy null-seq rows the
* migration must preserve. The design (LLP 0040 §2) names this yielded-row
* filter as the fallback; a future null-aware icebird filter can layer the
* file-skip optimization on top without changing this contract.
*
* @ref LLP 0040#storage-api-extension [implements] — since-filtered incremental scan; null-seq = new
* @param {string} tablePath
* @param {string[]} [columns]
* @param {{ since?: bigint }} [opts]
* @returns {AsyncGenerator<Record<string, unknown>>}
*/
export async function* scanRowsFromTable(tablePath, columns) {
export async function* scanRowsFromTable(tablePath, columns, opts) {
if (!tableExists(tablePath)) return
const since = opts?.since
const { resolver, lister } = await getLocalIO()
const url = tableUrlForDir(tablePath)
const { metadata } = await loadLatestFileCatalogMetadata({ tableUrl: url, resolver, lister })
if (metadata['current-snapshot-id'] === undefined || !metadata.snapshots?.length) return
const source = await icebergDataSource({ tableUrl: url, metadata, resolver, lister })
const projected = columns && columns.length > 0 ? columns : source.columns
// A table that has never been flushed under the seq-column schema carries no
// seq field: every row is implicitly null-seq (new), so there is nothing to
// project or filter — yield the whole table.
const hasSeq = since !== undefined && source.columns.includes(INGEST_SEQ_COLUMN.name)
let projected = columns && columns.length > 0 ? columns : source.columns
if (hasSeq && !projected.includes(INGEST_SEQ_COLUMN.name)) {
projected = [...projected, INGEST_SEQ_COLUMN.name]
}
const scan = source.scan({ columns: projected })
for await (const row of scan.rows()) {
yield await resolveAsyncRow(row, projected)
const resolved = await resolveAsyncRow(row, projected)
if (hasSeq) {
const seq = seqValue(resolved[INGEST_SEQ_COLUMN.name])
if (seq !== null && seq <= /** @type {bigint} */ (since)) continue
}
yield resolved
}
}

/**
 * Decode a raw `_hyp_ingest_seq` cell to a bigint, or `null` when the row has
 * no usable seq — a pre-column legacy row (null/absent), or a value that cannot
 * be decoded exactly. Returning `null` is the safe direction: the caller treats
 * `null` as a NEW row and never skips it (LLP 0040 risk #1).
 *
 * Accepted encodings:
 * - `bigint` — returned as-is.
 * - `number` — only when it is a safe integer. An integer-valued double beyond
 *   ±(2^53 − 1) has already been rounded, so comparing it against the
 *   watermark could wrongly skip a row; such values decode to `null`.
 * - `string` — an optionally negative run of decimal digits.
 *
 * @param {unknown} raw
 * @returns {bigint | null}
 */
export function seqValue(raw) {
  if (raw === null || raw === undefined) return null
  switch (typeof raw) {
    case 'bigint':
      return raw
    case 'number':
      // isSafeInteger (not isInteger): reject doubles whose exact value is unknowable.
      return Number.isSafeInteger(raw) ? BigInt(raw) : null
    case 'string':
      return /^-?\d+$/.test(raw) ? BigInt(raw) : null
    default:
      return null
  }
}

/**
* Build a squirreling-compatible `AsyncDataSource` over the latest
* snapshot of the table. Returns `null` if the table does not exist
Expand Down
56 changes: 52 additions & 4 deletions src/core/cache/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Attr, getLogger, getMeter, withSpan } from '../observability/index.js'
import {
dataSourceForTable,
scanRowsFromTable,
seqValue,
tableExists as icebergTableExists,
tableUrl as icebergTableUrl,
} from './iceberg/store.js'
Expand All @@ -19,16 +20,34 @@ import {
} from './partition.js'
import { cacheTablePath, datasetForTablePath } from './paths.js'
import { createCacheSpool, discoverSpoolTables, DEFAULT_SPOOL_BYTES_THRESHOLD } from './spool.js'
import { INTERNAL_FIELDS } from './streaming-reader.js'
import { INGEST_SEQ_COLUMN, INTERNAL_FIELDS } from './streaming-reader.js'

import path from 'node:path'

/**
* @import { ColumnSpec, QueryScope, QueryStorageService } from '../../../collectivus-plugin-kernel-types.d.ts'
* @import { ColumnSpec, QueryScope, QueryStorageService, SinkContinuation } from '../../../collectivus-plugin-kernel-types.d.ts'
* @import { CachePartitioningDeclaration, ExtendedQueryStorageService } from './types.d.ts'
* @import { AsyncCells } from 'squirreling'
*/

/**
* Decode a persisted `SinkContinuation` into its int64 `_hyp_ingest_seq`
* watermark. Absent ⇒ `0n` ("exported nothing"): the allocator starts seqs at
* 1, so `0` is strictly below every real row and a fresh sink reads the whole
* table. The token is opaque + versioned so the watermark mechanism can change
* later without invalidating persisted watermarks (LLP 0040 §2).
*
* @param {SinkContinuation | undefined} since
* @returns {bigint}
*/
function continuationToSeq(since) {
if (since === undefined || since === null) return 0n
if (since.v !== 1 || typeof since.seq !== 'string' || !/^\d+$/.test(since.seq)) {
throw new Error(`readRows: invalid SinkContinuation ${JSON.stringify(since)}`)
}
return BigInt(since.seq)
}

/**
* Resolve a tablePath to the Iceberg table directory.
*
Expand Down Expand Up @@ -179,14 +198,43 @@ export function createQueryStorageService({ cacheRoot, getDeclaration, getSettle
return icebergTableUrl(resolveIcebergDir(tablePath))
},

async *readRows(tablePath, columns) {
// @ref LLP 0040#storage-api-extension [implements] — back-compatible
// `opts.since`: absent ⇒ byte-for-byte the pre-existing full scan, so every
// current caller is untouched until it opts in. When set, the scan yields
// only rows newer than the watermark (null-seq legacy rows always yielded).
async *readRows(tablePath, columns, opts) {
const since = opts?.since !== undefined ? continuationToSeq(opts.since) : undefined
const projected = columns?.filter((c) => !INTERNAL_FIELDS.includes(c))
for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected)) {
const scanOpts = since !== undefined ? { since } : undefined
for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected, scanOpts)) {
for (const f of INTERNAL_FIELDS) delete row[f]
yield row
}
},

// @ref LLP 0040#storage-api-extension [implements] — cursor-aware sibling
// for sinks that advance a per-(sink, partition) watermark. `_hyp_ingest_seq`
// is an INTERNAL_FIELD stripped from the row, so a sink reading `readRows`
// can't learn the high-water seq; `readRowsSince` reads it to derive the
// `after` token, then strips it so the seq never reaches the wire payload.
async *readRowsSince(tablePath, opts = {}) {
const since = continuationToSeq(opts.since)
const projected = opts.columns?.filter((c) => !INTERNAL_FIELDS.includes(c))
// Running high-water of REAL (non-null) seqs seen so far, seeded with the
// incoming watermark. `after` is this monotonic max, so a null-seq legacy
// row never advances the watermark and progress never regresses even when
// the scan visits seqs out of order (interleaved sources; LLP 0040 risk #3).
let high = since
for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected, { since })) {
const seq = seqValue(row[INGEST_SEQ_COLUMN.name])
if (seq !== null && seq > high) high = seq
for (const f of INTERNAL_FIELDS) delete row[f]
/** @type {SinkContinuation} */
const after = { v: 1, seq: high.toString() }
yield { row, after }
}
},

async dataSourceForTable(tablePath) {
const source = await dataSourceForTable(resolveIcebergDir(tablePath))
if (!source) return null
Expand Down
Loading