diff --git a/index.d.ts b/index.d.ts index a9b838b..4a8d793 100644 --- a/index.d.ts +++ b/index.d.ts @@ -83,11 +83,124 @@ export interface InsertSummary { elapsed: number; } +/** + * Raw insert formats (v1: text formats; the format name is whitelisted, never + * passed through). TSV/TSVWithNames are aliases of TabSeparated*. + */ +export type RawInsertFormat = + | 'JSONEachRow' | 'JSONCompactEachRow' + | 'CSV' | 'CSVWithNames' + | 'TSV' | 'TabSeparated' | 'TSVWithNames' | 'TabSeparatedWithNames'; + +/** + * Stream-insert formats: line-delimited only. These formats escape raw + * newlines inside values, so a raw '\n' is always a row boundary and every + * re-chunked INSERT payload stays independently valid. CSV (raw newlines legal + * inside quoted fields) and WithNames variants (header cannot repeat per + * chunk) are excluded — use a single-shot Buffer insert for those. + */ +export type StreamInsertFormat = 'JSONEachRow' | 'JSONCompactEachRow' | 'TSV' | 'TabSeparated'; + +/** + * Parameters for the raw passthrough insert: the payload stays bytes + * (off the V8 heap for Buffer/Uint8Array), is handed to the native side + * zero-copy, and the engine's multithreaded parser does all the parsing — JS + * never builds an object tree from the payload. + * + * Contract: do NOT mutate the Buffer until the returned promise settles (same + * contract as fs.write). A string payload is accepted as a small-payload + * convenience only — it is already a V8 string, so prefer Buffers end to end. + */ +export interface RawInsertParams { + /** Target table (optionally db-qualified). */ + table: string; + /** Raw payload bytes in the declared format. */ + values: Buffer | Uint8Array | string; + /** Payload format (required; whitelisted). */ + format: RawInsertFormat; + /** Explicit column list: INSERT INTO t (a, b) FORMAT ... */ + columns?: ReadonlyArray; + /** Per-insert settings (e.g. input_format_skip_unknown_fields: 1). */ + settings?: Record; + /** Early-settle abort (the underlying write still completes; the payload stays pinned until it does). */ + signal?: AbortSignal; + /** Early-settle timeout in ms (same honest single-shot semantics). */ + timeout?: number; +} + +/** + * Summary of a raw insert. Two ledgers by design: + * - rowsWritten/bytesWritten — engine-side write progress (includes cascaded + * materialized-view writes, same semantics as HTTP X-ClickHouse-Summary). + * - rowsSent/bytesSent — payload-side: non-empty payload lines (exact for + * line-delimited formats; undefined for the CSV family) and payload bytes. + */ +export interface RawInsertSummary { + rowsWritten: number; + bytesWritten: number; + rowsSent?: number; + bytesSent: number; + elapsed: number; +} + +/** Progress snapshot for streaming inserts (payload-side ledger). */ +export interface InsertProgress { + rowsSent: number; + bytesSent: number; + chunks: number; +} + +/** + * Parameters for the backpressured streaming insert. The source is consumed + * pull-based: at most one bounded chunk is buffered and each chunk's INSERT is + * awaited before the next pull, so a fast producer is throttled to the chDB + * write rate (backpressure is flow-control, never an error). Failures surface + * as typed errors that always settle the promise: source-error / stall / + * backpressure-overflow / write-failure / row-too-large / abort — each + * carrying a progress snapshot. Semantics are at-least-once: already-flushed + * chunks are not rolled back. + */ +export interface StreamInsertParams { + table: string; + /** Byte stream: Node Readable or any AsyncIterable of Buffer/Uint8Array/string chunks. */ + values: NodeJS.ReadableStream | AsyncIterable; + format: StreamInsertFormat; + columns?: ReadonlyArray; + settings?: Record; + /** Target chunk size (default 8 MiB). */ + maxChunkBytes?: number; + /** Single-row ceiling; exceeded without a row boundary => row-too-large (default 64 MiB). */ + maxRowBytes?: number; + /** Bounded-buffer ceiling for un-pausable Readables => backpressure-overflow (default 64 MiB). */ + maxBufferedBytes?: number; + /** Producer idle deadline => ChdbTimeoutError{reason:'stall'}. Off by default. */ + stallTimeout?: number; + /** Called after each flushed chunk. */ + onProgress?: (p: InsertProgress) => void; + signal?: AbortSignal; + /** Per-chunk timeout in ms. */ + timeout?: number; +} + +/** Summary of a streaming insert (both ledgers accumulated across chunks). */ +export interface StreamInsertSummary { + rowsWritten: number; + bytesWritten: number; + rowsSent: number; + bytesSent: number; + chunks: number; + elapsed: number; +} + /** * Inserts rows via an inline multi-row INSERT (default connection). Async; never * reads stdin. */ export function insert(params: InsertParams): Promise; +/** raw passthrough insert (default connection). */ +export function insert(params: RawInsertParams): Promise; +/** Backpressured streaming insert (default connection). */ +export function insert(params: StreamInsertParams): Promise; /** * Options for {@link Session.queryStream}. @@ -205,6 +318,10 @@ export class Session { * Inserts rows via an inline multi-row INSERT. Async; never reads stdin. */ insert(params: InsertParams): Promise; + /** raw passthrough insert on this session's connection. */ + insert(params: RawInsertParams): Promise; + /** Backpressured streaming insert on this session's connection. */ + insert(params: StreamInsertParams): Promise; /** * Streams a query result chunk-by-chunk (only one active stream per session). diff --git a/index.js b/index.js index 9130609..ee7c998 100644 --- a/index.js +++ b/index.js @@ -35,13 +35,22 @@ function prepArrow(query, opts, defaultFormat) { const { formatParamValue } = require('./dist/serialize.js'); const { ChdbResult } = require('./dist/result.js'); const { buildInsertSQL } = require('./dist/insert.js'); +const { buildRawInsertPrefix, isRawValues, isStreamValues } = require('./dist/rawInsert.js'); +const { streamInsert } = require('./dist/streamInsert.js'); // Map a native/query error into a typed ChdbInsertError (preserving message, -// clickhouseCode and cause). +// clickhouseCode and cause). The engine's "(at row N)" marker is parsed into +// failedAtRow (1-based; chunk-local for streamed chunks — streamInsert +// rebases it to an absolute row number). function asInsertError(e) { if (e instanceof ChdbInsertError) return e; const q = asQueryError(e); - return new ChdbInsertError(q.message, { cause: q, clickhouseCode: q.clickhouseCode }); + const m = /\(at row (\d+)\)/.exec(q.message); + return new ChdbInsertError(q.message, { + cause: q, + clickhouseCode: q.clickhouseCode, + failedAtRow: m ? Number(m[1]) : undefined, + }); } // Shared insert impl: build the inline INSERT...VALUES and run it through the @@ -62,6 +71,121 @@ function runInsert(nativeAsyncCall, params) { ); } +// Normalize a raw payload to a Buffer. Uint8Array views are wrapped zero-copy; +// strings are encoded once on the main thread (documented as a small-payload +// convenience — large payloads should be born as Buffers). +function toPayloadBuffer(v) { + if (typeof v === 'string') return Buffer.from(v, 'utf8'); + if (Buffer.isBuffer(v)) return v; + return Buffer.from(v.buffer, v.byteOffset, v.byteLength); +} + +// Abort/timeout wrapper for raw inserts. Same honest single-shot semantics as +// withAbortTimeout: JS settles early, the native write runs to completion and +// the payload Buffer stays pinned until it does. +function wrapRawNative(nativePromise, opts, mapOk) { + const signal = opts && opts.signal; + const timeout = opts && opts.timeout; + const base = nativePromise.then(mapOk, (e) => { throw asInsertError(e); }); + if (!signal && !timeout) return base; + trackNative(nativePromise); // abort/timeout settles JS early; native runs on after + return new Promise((resolve, reject) => { + let settled = false; + let timer = null; + const cleanup = () => { + if (timer) clearTimeout(timer); + if (signal) signal.removeEventListener('abort', onAbort); + }; + const finish = (fn, v) => { if (!settled) { settled = true; cleanup(); fn(v); } }; + function onAbort() { + finish(reject, new ChdbAbortError( + 'Insert aborted (the underlying write may still complete in the background)')); + } + if (signal) { + if (signal.aborted) { onAbort(); return; } + signal.addEventListener('abort', onAbort, { once: true }); + } + if (timeout) { + timer = setTimeout(() => finish(reject, new ChdbTimeoutError( + `Insert timed out after ${timeout}ms (the underlying write may still complete in the background)`)), timeout); + } + base.then((v) => finish(resolve, v), (e) => finish(reject, e)); + }); +} + +// Raw passthrough: the payload Buffer is handed to the native side +// zero-copy; prefix assembly and execution happen on the libuv thread. +function runRawInsert(nativeRawCall, params) { + let built; + try { built = buildRawInsertPrefix(params); } catch (e) { return Promise.reject(asInsertError(e)); } + const buf = toPayloadBuffer(params.values); + if (buf.length === 0) { + return Promise.resolve({ + rowsWritten: 0, bytesWritten: 0, + rowsSent: built.fmt.lineDelimited ? 0 : undefined, + bytesSent: 0, elapsed: 0, + }); + } + return wrapRawNative(nativeRawCall(built.prefix, buf, built.fmt.lineDelimited), params, (raw) => ({ + // Engine-side ledger (chdb-io/chdb-core#88): includes MV-cascade writes. + rowsWritten: raw.rowsWritten, + bytesWritten: raw.bytesWritten, + // Payload-side ledger: non-empty lines minus a WithNames header line. + rowsSent: raw.rowsSent === undefined ? undefined : Math.max(0, raw.rowsSent - built.fmt.headerLines), + bytesSent: buf.length, + elapsed: raw.elapsed, + })); +} + +// Streaming input over the raw entry (backpressure contract; see src/streamInsert.ts). +function runStreamInsert(nativeRawCall, params) { + let built; + try { built = buildRawInsertPrefix(params); } catch (e) { return Promise.reject(asInsertError(e)); } + if (!built.fmt.lineDelimited || built.fmt.headerLines !== 0) { + return Promise.reject(new ChdbInsertError( + `Format '${params.format}' cannot be safely re-chunked for stream input ` + + '(CSV may hold raw newlines inside quoted fields; WithNames headers cannot repeat per chunk). ' + + 'Use JSONEachRow/JSONCompactEachRow/TabSeparated for streams, or a single-shot Buffer insert.')); + } + // NOTE: objectMode streams are NOT pre-rejected — Readable.from() is + // objectMode even when it yields Buffers/strings (a perfectly good byte + // source). Object rows are rejected at the first non-byte chunk instead + // (streamInsert's toChunkBuffer, with the NDJSON-mapping recipe). + const insertChunk = (data) => + wrapRawNative(nativeRawCall(built.prefix, data, true), + { signal: params.signal, timeout: params.timeout }, (raw) => raw); + return streamInsert(insertChunk, params); +} + +// insert() dispatch (never guesses on a conflicting signature; every rejected +// branch's message carries its workaround). +function dispatchInsert(nativeSqlCall, nativeRawCall, params) { + const v = params.values; + if (isRawValues(v)) { + if (params.format === undefined) { + return Promise.reject(new ChdbInsertError( + "Raw insert requires an explicit 'format': insert({ table, values: buffer, format: 'JSONEachRow' })")); + } + return runRawInsert(nativeRawCall, params); + } + if (Array.isArray(v)) { + if (params.format !== undefined) { + return Promise.reject(new ChdbInsertError( + "'format' is not supported with row arrays (reserved for chunked object inserts); " + + "drop 'format' for the VALUES path, or pre-serialize: values = Buffer.from(rows.map(r => JSON.stringify(r)).join('\\n') + '\\n')")); + } + return runInsert(nativeSqlCall, params); + } + if (isStreamValues(v)) { + if (params.format === undefined) { + return Promise.reject(new ChdbInsertError( + "Stream insert requires an explicit 'format' (e.g. 'JSONEachRow')")); + } + return runStreamInsert(nativeRawCall, params); + } + return runInsert(nativeSqlCall, params); +} + function emptyResult() { return new ChdbResult({ bytes: new Uint8Array(0), elapsed: 0, rowsRead: 0, bytesRead: 0 }); } @@ -76,6 +200,7 @@ function withAbortTimeout(nativePromise, opts) { if (!signal && !timeout) { return nativePromise.then((raw) => new ChdbResult(raw), (e) => { throw asQueryError(e); }); } + trackNative(nativePromise); // abort/timeout settles JS early; native runs on after return new Promise((resolve, reject) => { let settled = false; let timer = null; @@ -262,9 +387,14 @@ function queryBindAsync(query, params = {}, opts = {}) { return runExclusiveParam(defaultParamChain, () => chdbNode.QueryAsync(sql, format, bound), opts); } -// v3 insert (default connection). opts: { table, values, columns? } +// v3 insert (default connection). Dispatches on the shape of `values`: +// row arrays -> inline VALUES; Buffer/Uint8Array/string + format -> raw +// passthrough; Readable/AsyncIterable + format -> backpressured stream insert. function insert(opts) { - return runInsert((sql) => chdbNode.QueryAsync(sql, "CSV"), opts || {}); + return dispatchInsert( + (sql) => chdbNode.QueryAsync(sql, "CSV"), + (prefix, buf, countLines) => chdbNode.InsertRawAsync(prefix, buf, countLines), + opts || {}); } // Track open sessions so a normal process exit can release native connections @@ -272,6 +402,32 @@ function insert(opts) { // complements the native env cleanup hook + std::atexit backstop. const openSessions = new Set(); +// Track in-flight native operations (async query/insert) that were settled +// EARLY by abort/timeout. There is no interrupt in the C ABI, so the native +// computation keeps running on the libuv thread after the JS promise has +// already rejected. libchdb permits only ONE active operation per process, so +// an orphaned background op from one test races the next test's first query and +// crashes the in-process engine ("server is shutting down due to a fatal +// error", ABORTED), cascading into every later test (this is why x64 CI runners +// went all-red while the timing on arm runners usually won the race). Each +// tracked promise settles on NATIVE completion and never rejects — the caller +// already received the mapped early-settle error; here we only need the timing. +const pendingNativeOps = new Set(); +function trackNative(nativePromise) { + const done = nativePromise.then(() => {}, () => {}); + pendingNativeOps.add(done); + done.then(() => pendingNativeOps.delete(done)); + return nativePromise; +} + +// Wait for every native op started before now to fully settle. Internal helper +// for test teardown: drained in the global afterEach before sessions are closed +// so an early-settled (aborted/timed-out) op stays local to the test that +// started it instead of poisoning the shared single-connection engine. +function _drainPendingOps() { + return Promise.allSettled([...pendingNativeOps]); +} + // Force-close every session still open in this process. Internal helper for // test teardown: libchdb allows one active data directory per process, so a // single test that creates a Session and never closes it (e.g. it threw before @@ -385,11 +541,15 @@ class Session { return runExclusiveParam(this.#paramChain, () => chdbNode.QueryAsyncConnection(this.connection, sql, format, bound), opts); } - // v3 insert. opts: { table, values, columns? }. Inline INSERT ... VALUES, - // executed async; never reads stdin (closes #26). + // v3 insert. Same dispatch as the standalone insert(): row arrays -> inline + // VALUES; raw bytes + format -> passthrough; stream + format -> backpressured + // stream insert. Executed async; never reads stdin (closes #26). insert(opts) { if (!this.connection) return Promise.reject(new ChdbClosedError("No active connection available")); - return runInsert((sql) => chdbNode.QueryAsyncConnection(this.connection, sql, "CSV"), opts || {}); + return dispatchInsert( + (sql) => chdbNode.QueryAsyncConnection(this.connection, sql, "CSV"), + (prefix, buf, countLines) => chdbNode.InsertRawAsyncConnection(this.connection, prefix, buf, countLines), + opts || {}); } // v3 streaming. opts: { format?='JSONEachRow', signal? }. Returns a @@ -426,11 +586,26 @@ class Session { for (const sig of SESSION_SIGNALS) process.removeListener(sig, this.#signalHandler); this.#signalHandler = null; } - if (this.connection) { - try { chdbNode.CloseConnection(this.connection); } catch (_) { /* best effort */ } - this.connection = null; + const conn = this.connection; + this.connection = null; + const teardown = () => { + if (conn) { try { chdbNode.CloseConnection(conn); } catch (_) { /* best effort */ } } + if (this.isTemp) { try { this.#removeTempDir(); } catch (_) { /* best effort */ } } + }; + // Destroying the native connection while an op is still running on it aborts + // the engine for the rest of the process. abort/timeout settle the JS promise + // early but leave the native computation running (no interrupt in the C ABI), + // so close() may land mid-flight. When ops are still pending, defer teardown + // until they drain rather than racing them. _drainPendingOps() awaits these + // deferred closes too, so a new session is never created before the prior + // connection is fully released. + if (conn && pendingNativeOps.size > 0) { + const deferred = Promise.allSettled([...pendingNativeOps]).then(teardown); + pendingNativeOps.add(deferred); + deferred.finally(() => pendingNativeOps.delete(deferred)); + } else { + teardown(); } - if (this.isTemp) this.#removeTempDir(); } // v2 alias for close(). @@ -499,4 +674,4 @@ function version() { }; } -module.exports = { query, queryBind, queryAsync, queryBindAsync, insert, Session, version, _closeAllSessions }; +module.exports = { query, queryBind, queryAsync, queryBindAsync, insert, Session, version, _closeAllSessions, _drainPendingOps }; diff --git a/lib/chdb_node.cpp b/lib/chdb_node.cpp index b844e3e..159dd0f 100644 --- a/lib/chdb_node.cpp +++ b/lib/chdb_node.cpp @@ -501,6 +501,145 @@ Napi::Value QueryAsyncConnectionWrapper(const Napi::CallbackInfo &info) { return worker->GetPromise(); } +//===--------------------------------------------------------------------===// +// Raw-format passthrough insert. The payload stays a JS Buffer (V8 +// off-heap); this worker pins it with a Persistent reference, assembles +// "INSERT INTO ... FORMAT \n" on the libuv thread (the main thread +// never copies or scans the payload), and executes via the length-aware +// chdb_query_n (binary-safe: embedded NUL bytes survive). +// +// Ownership ledger: +// - payload Buffer: pinned by bufRef_ (constructed on the main thread); +// released when the worker is destroyed on the main thread after +// OnOK/OnError. The caller must not mutate the Buffer until the returned +// promise settles (same contract as fs.write). +// - assembled sql std::string: Execute()-local, freed on return. Peak memory +// during the call is ~2x payload until an upstream two-buffer entry lands. +// - chdb_result: destroyed inside Execute(). +// +// rowsSent (payload-side ledger): for line-delimited formats the worker counts +// non-empty lines off-thread — exact, because those formats escape raw '\n' +// inside values. rowsWritten/bytesWritten (engine-side ledger) come from +// chdb_result_rows_written/bytes_written (chdb-io/chdb-core#88) and include cascaded +// materialized-view writes. +//===--------------------------------------------------------------------===// +class InsertRawWorker : public Napi::AsyncWorker { +public: + InsertRawWorker(Napi::Env env, chdb_connection conn, std::string prefix, + Napi::Buffer data, bool countLines) + : Napi::AsyncWorker(env), deferred_(Napi::Promise::Deferred::New(env)), + conn_(conn), prefix_(std::move(prefix)), + dataPtr_(data.Data()), dataLen_(data.Length()), countLines_(countLines) { + bufRef_ = Napi::Persistent(data.As()); + } + + Napi::Promise GetPromise() { return deferred_.Promise(); } + + void Execute() override { + std::string sql; + sql.reserve(prefix_.size() + 1 + dataLen_); + sql.append(prefix_); + sql.push_back('\n'); + sql.append(dataPtr_, dataLen_); + + if (countLines_) { + // Count non-empty lines (a line holding only whitespace is skipped by + // the engine's row parsers, so it is not a row). + bool content = false; + for (size_t i = 0; i < dataLen_; i++) { + char c = dataPtr_[i]; + if (c == '\n') { + if (content) linesSent_++; + content = false; + } else if (c != '\r' && c != ' ' && c != '\t') { + content = true; + } + } + if (content) linesSent_++; // final line without a trailing newline + } + + chdb_result *result = chdb_query_n(conn_, sql.data(), sql.size(), "CSV", 3); + if (!result) { SetError("chdb query returned a null result"); return; } + const char *error = chdb_result_error(result); + if (error) { + std::string msg = error; + chdb_destroy_query_result(result); + SetError(msg); + return; + } + elapsed_ = chdb_result_elapsed(result); + rowsWritten_ = chdb_result_rows_written(result); + bytesWritten_ = chdb_result_bytes_written(result); + chdb_destroy_query_result(result); + } + + void OnOK() override { + Napi::Env env = Env(); + Napi::HandleScope scope(env); + Napi::Object res = Napi::Object::New(env); + res.Set("elapsed", Napi::Number::New(env, elapsed_)); + res.Set("rowsWritten", Napi::Number::New(env, static_cast(rowsWritten_))); + res.Set("bytesWritten", Napi::Number::New(env, static_cast(bytesWritten_))); + if (countLines_) + res.Set("rowsSent", Napi::Number::New(env, static_cast(linesSent_))); + deferred_.Resolve(res); + } + + void OnError(const Napi::Error &e) override { deferred_.Reject(e.Value()); } + +private: + Napi::Promise::Deferred deferred_; + chdb_connection conn_; + std::string prefix_; + const char *dataPtr_; + size_t dataLen_; + bool countLines_; + Napi::ObjectReference bufRef_; // released on the main thread in the worker dtor + double elapsed_ = 0.0; + uint64_t rowsWritten_ = 0, bytesWritten_ = 0, linesSent_ = 0; +}; + +// Standalone raw insert (default connection). Args: (prefix, dataBuffer, countLines) +Napi::Value InsertRawAsyncWrapper(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + if (info.Length() < 3 || !info[0].IsString() || !info[1].IsBuffer() || !info[2].IsBoolean()) { + Napi::TypeError::New(env, "Usage: prefix, dataBuffer, countLines").ThrowAsJavaScriptException(); + return env.Undefined(); + } + std::string prefix = info[0].As(); + + char *err = nullptr; + chdb_connection *conn_ptr = get_default_conn(&err); // registry: main thread only + if (!conn_ptr) { + Napi::Value p = rejectedPromise(env, err ? err : "Failed to acquire default connection"); + if (err) free(err); + return p; + } + auto *worker = new InsertRawWorker(env, *conn_ptr, std::move(prefix), + info[1].As>(), + info[2].As().Value()); + worker->Queue(); + return worker->GetPromise(); +} + +// Session raw insert. Args: (connection, prefix, dataBuffer, countLines) +Napi::Value InsertRawAsyncConnectionWrapper(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + if (info.Length() < 4 || !info[0].IsExternal() || !info[1].IsString() || !info[2].IsBuffer() + || !info[3].IsBoolean()) { + Napi::TypeError::New(env, "Usage: connection, prefix, dataBuffer, countLines").ThrowAsJavaScriptException(); + return env.Undefined(); + } + chdb_connection *inner = static_cast(info[0].As>().Data()); + if (!inner) return rejectedPromise(env, "No active connection available"); + std::string prefix = info[1].As(); + auto *worker = new InsertRawWorker(env, *inner, std::move(prefix), + info[2].As>(), + info[3].As().Value()); + worker->Queue(); + return worker->GetPromise(); +} + //===--------------------------------------------------------------------===// // Streaming query (chdb_stream_query / _fetch_result / _cancel_query). Each // fetch runs on the libuv thread pool so it does not block the event loop. It @@ -707,6 +846,10 @@ Napi::Object Init(Napi::Env env, Napi::Object exports) { exports.Set("QueryAsync", Napi::Function::New(env, QueryAsyncWrapper)); exports.Set("QueryAsyncConnection", Napi::Function::New(env, QueryAsyncConnectionWrapper)); + // Raw-format passthrough insert + exports.Set("InsertRawAsync", Napi::Function::New(env, InsertRawAsyncWrapper)); + exports.Set("InsertRawAsyncConnection", Napi::Function::New(env, InsertRawAsyncConnectionWrapper)); + // Streaming query exports.Set("StreamQuery", Napi::Function::New(env, StreamQueryWrapper)); exports.Set("StreamFetch", Napi::Function::New(env, StreamFetchWrapper)); diff --git a/package.json b/package.json index d8847e4..3053c31 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "chdb", - "version": "3.0.0", + "version": "3.1.0-rc.1", "description": "chDB bindings for nodejs", "main": "index.js", "types": "index.d.ts", @@ -58,10 +58,10 @@ "node-gyp-build": "^4.6.0" }, "optionalDependencies": { - "@chdb/lib-linux-x64-gnu": "26.5.0", - "@chdb/lib-linux-arm64-gnu": "26.5.0", - "@chdb/lib-darwin-x64": "26.5.0", - "@chdb/lib-darwin-arm64": "26.5.0" + "@chdb/lib-linux-x64-gnu": "26.5.1-rc.1", + "@chdb/lib-linux-arm64-gnu": "26.5.1-rc.1", + "@chdb/lib-darwin-x64": "26.5.1-rc.1", + "@chdb/lib-darwin-arm64": "26.5.1-rc.1" }, "peerDependencies": { "apache-arrow": ">=14" diff --git a/src/errors.ts b/src/errors.ts index 17d8964..ee8e7e4 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -77,23 +77,81 @@ export class ChdbBindError extends ChdbError { readonly code = 'CHDB_BIND' } +/** Progress snapshot carried by streaming-insert callbacks and errors (payload-side ledger). */ +export interface InsertProgress { + /** Rows flushed to the engine so far (non-empty payload lines; exact for line-delimited formats). */ + rowsSent: number + /** Payload bytes flushed so far. */ + bytesSent: number + /** Chunks flushed so far. */ + chunks: number +} + +/** + * Streaming-insert failure discriminator (streaming-insert backpressure model): + * - 'source-error' the source stream errored or closed prematurely + * - 'backpressure-overflow' an un-pausable source overran the bounded buffer + * - 'write-failure' a chunk's INSERT failed in the engine + * - 'row-too-large' a single row exceeded maxRowBytes (no row boundary found) + * (A stalled producer surfaces as ChdbTimeoutError with reason 'stall'.) + */ +export type InsertFailureReason = + | 'source-error' + | 'backpressure-overflow' + | 'write-failure' + | 'row-too-large' + +export interface ChdbInsertErrorOptions extends ChdbErrorOptions { + reason?: InsertFailureReason + /** 1-based row number the engine failed at (absolute across a streamed insert). */ + failedAtRow?: number + progress?: InsertProgress +} + /** Insert serialization / execution / timeout. Subdivision of query error. */ export class ChdbInsertError extends ChdbQueryError { override readonly code = 'CHDB_INSERT' + readonly reason?: InsertFailureReason + readonly failedAtRow?: number + readonly progress?: InsertProgress + + constructor(message: string, options?: ChdbInsertErrorOptions) { + super(message, options) + if (options?.reason !== undefined) this.reason = options.reason + if (options?.failedAtRow !== undefined) this.failedAtRow = options.failedAtRow + if (options?.progress !== undefined) this.progress = options.progress + } } /** AbortSignal fired. `.name` is `'AbortError'` to match the web platform. */ export class ChdbAbortError extends ChdbError { readonly code = 'CHDB_ABORT' - constructor(message = 'The operation was aborted', options?: ChdbErrorOptions) { + readonly progress?: InsertProgress + constructor( + message = 'The operation was aborted', + options?: ChdbErrorOptions & { progress?: InsertProgress }, + ) { super(message, options) this.name = 'AbortError' + if (options?.progress !== undefined) this.progress = options.progress } } -/** Query exceeded its deadline (watchdog). */ +/** Query exceeded its deadline (watchdog), or a streamed-insert source stalled. */ export class ChdbTimeoutError extends ChdbError { readonly code = 'CHDB_TIMEOUT' + /** 'stall' when a streaming-insert producer went idle past stallTimeout. */ + readonly reason?: 'stall' + readonly progress?: InsertProgress + + constructor( + message: string, + options?: ChdbErrorOptions & { reason?: 'stall'; progress?: InsertProgress }, + ) { + super(message, options) + if (options?.reason !== undefined) this.reason = options.reason + if (options?.progress !== undefined) this.progress = options.progress + } } /** Loader could not find a native subpackage for this platform/arch/libc. */ diff --git a/src/rawInsert.ts b/src/rawInsert.ts new file mode 100644 index 0000000..10c30e9 --- /dev/null +++ b/src/rawInsert.ts @@ -0,0 +1,135 @@ +/** + * Raw-format passthrough insert. The payload arrives as bytes + * (Buffer / Uint8Array / string), is handed to the native side zero-copy, and + * the engine's multithreaded C++ parser does all the parsing — JS never builds + * an object tree from the payload. This module owns the SQL-prefix + * construction and the dispatch predicates; the native entry is + * InsertRawAsync(prefix, dataBuffer, countLines). + * + * Two row ledgers, by design: + * - rowsSent (payload view): non-empty payload lines, counted off-thread by + * the native worker. Exact for line-delimited formats (they escape raw + * '\n' inside values); undefined for the CSV family (RFC 4180 allows raw + * newlines inside quoted fields — we refuse to fake a count). + * - rowsWritten (engine view): chdb_result_rows_written (chdb-io/chdb-core#88). + * Includes cascaded materialized-view writes — same semantics as the HTTP + * interface's X-ClickHouse-Summary. + */ + +import { serializeValue, validateIdentifier } from './serialize' +import { ChdbInsertError } from './errors' + +export interface RawFormatInfo { + /** Canonical ClickHouse format name interpolated into the SQL prefix. */ + canonical: string + /** Line-delimited formats escape raw '\n' in values => newline === row boundary. */ + lineDelimited: boolean + /** Header lines included in the payload (WithNames variants). */ + headerLines: 0 | 1 +} + +/** + * v1 whitelist: text formats only (the format name is interpolated into SQL, + * so it must never be passed through unvalidated). Binary formats wait on the + * upstream inline-data feasibility check. + */ +const RAW_FORMATS: Record = { + JSONEachRow: { canonical: 'JSONEachRow', lineDelimited: true, headerLines: 0 }, + JSONCompactEachRow: { canonical: 'JSONCompactEachRow', lineDelimited: true, headerLines: 0 }, + TSV: { canonical: 'TabSeparated', lineDelimited: true, headerLines: 0 }, + TabSeparated: { canonical: 'TabSeparated', lineDelimited: true, headerLines: 0 }, + TSVWithNames: { canonical: 'TabSeparatedWithNames', lineDelimited: true, headerLines: 1 }, + TabSeparatedWithNames: { canonical: 'TabSeparatedWithNames', lineDelimited: true, headerLines: 1 }, + CSV: { canonical: 'CSV', lineDelimited: false, headerLines: 0 }, + CSVWithNames: { canonical: 'CSVWithNames', lineDelimited: false, headerLines: 1 }, +} + +export function rawFormatInfo(format: unknown): RawFormatInfo { + if (typeof format !== 'string' || !(format in RAW_FORMATS)) { + throw new ChdbInsertError( + `Unsupported raw insert format ${JSON.stringify(format)}; ` + + `supported: ${Object.keys(RAW_FORMATS).join(', ')}`, + ) + } + return RAW_FORMATS[format] as RawFormatInfo +} + +/** Raw payload predicate: bytes (or a string for small-payload convenience). */ +export function isRawValues(v: unknown): v is Buffer | Uint8Array | string { + return typeof v === 'string' || Buffer.isBuffer(v) || v instanceof Uint8Array +} + +/** Streaming payload predicate: any async-iterable that is not an array/raw value. */ +export function isStreamValues(v: unknown): v is AsyncIterable { + return ( + v != null && + typeof v === 'object' && + !Array.isArray(v) && + !isRawValues(v) && + typeof (v as AsyncIterable)[Symbol.asyncIterator] === 'function' + ) +} + +export interface RawInsertPrefixParams { + table: string + columns?: ReadonlyArray + settings?: Record + format: string +} + +/** + * Build the SQL prefix `INSERT INTO [ (c1, c2)][ SETTINGS k=v] FORMAT `. + * Injection safety: table/columns/setting keys go through the identifier + * whitelist; setting values through serializeValue (quoted/escaped literals); + * the format through the RAW_FORMATS whitelist. The payload itself is never + * inspected here. + * + * @throws ChdbInsertError on any invalid identifier, setting, or format. + */ +export function buildRawInsertPrefix(params: RawInsertPrefixParams): { + prefix: string + fmt: RawFormatInfo +} { + let table: string + try { + table = validateIdentifier(params.table) + } catch (e) { + throw new ChdbInsertError(`Invalid insert target table: ${(e as Error).message}`, { cause: e }) + } + + let colsClause = '' + if (params.columns !== undefined) { + if (!Array.isArray(params.columns) || params.columns.length === 0) { + throw new ChdbInsertError('columns must be a non-empty array of column names') + } + try { + colsClause = ` (${params.columns.map(validateIdentifier).join(', ')})` + } catch (e) { + throw new ChdbInsertError(`Invalid insert column: ${(e as Error).message}`, { cause: e }) + } + } + + let settingsClause = '' + if (params.settings !== undefined) { + const entries = Object.entries(params.settings) + if (entries.length > 0) { + const parts = entries.map(([k, v]) => { + let key: string + try { + key = validateIdentifier(k) + } catch (e) { + throw new ChdbInsertError(`Invalid setting name: ${(e as Error).message}`, { cause: e }) + } + const t = typeof v + if (t !== 'string' && t !== 'number' && t !== 'boolean') { + throw new ChdbInsertError(`Setting '${k}' must be a string, number or boolean (got ${t})`) + } + return `${key}=${serializeValue(v)}` + }) + settingsClause = ` SETTINGS ${parts.join(', ')}` + } + } + + const fmt = rawFormatInfo(params.format) + return { prefix: `INSERT INTO ${table}${colsClause}${settingsClause} FORMAT ${fmt.canonical}`, fmt } +} diff --git a/src/streamInsert.ts b/src/streamInsert.ts new file mode 100644 index 0000000..9d5fc7f --- /dev/null +++ b/src/streamInsert.ts @@ -0,0 +1,244 @@ +/** + * Streaming insert over the raw passthrough entry. + * Consumes the source pull-based: at most one bounded chunk is + * buffered, and each chunk's INSERT is awaited before the next pull — so a + * fast producer is throttled to the chDB write rate (for an HTTP request + * stream the pause propagates all the way to the TCP window). + * + * Backpressure is flow-control, never an error. What IS an error is every + * backpressure-adjacent failure, surfaced as a typed error that always + * settles the returned promise: + * source-error | stall | backpressure-overflow | write-failure | + * row-too-large | abort + * Every error carries a progress snapshot. Semantics are at-least-once: + * already-flushed chunks are not rolled back; `failedAtRow` / `rowsSent` are + * observability fields, NOT a resume protocol. + * + * Chunking invariant: cuts happen only at raw '\n'. The v1 stream formats are + * line-delimited (they escape newlines inside values), so a raw '\n' is + * always a row boundary, every chunk is an independently valid INSERT + * payload, and multi-byte UTF-8 can never be split (0x0A never occurs inside + * a multi-byte sequence). Each ~8 MiB chunk is far below the engine's block + * thresholds, so a failed chunk lands zero rows (clean retry unit). + */ + +import { ChdbAbortError, ChdbInsertError, ChdbTimeoutError } from './errors' +import type { InsertProgress } from './errors' + +/** Per-chunk result from the prefix-bound native inserter. */ +export interface ChunkInsertResult { + rowsWritten?: number + bytesWritten?: number + rowsSent?: number + elapsed: number +} + +export type ChunkInserter = (data: Buffer) => Promise + +export interface StreamInsertOptions { + values: AsyncIterable + /** Target chunk size (default 8 MiB). */ + maxChunkBytes?: number + /** Single-row ceiling: accumulating past this without a row boundary fails (default 64 MiB). */ + maxRowBytes?: number + /** Bounded-buffer ceiling for un-pausable Readable sources (default 64 MiB). */ + maxBufferedBytes?: number + /** Idle deadline for the producer; off by default (quiet periods are legal for long-lived ingestion). */ + stallTimeout?: number + onProgress?: (p: InsertProgress) => void + signal?: AbortSignal +} + +export interface StreamInsertSummary { + /** Engine-side ledger (chdb-io/chdb-core#88), accumulated across chunks; includes MV-cascade writes. */ + rowsWritten: number + bytesWritten: number + /** Payload-side ledger: non-empty lines / bytes flushed. */ + rowsSent: number + bytesSent: number + chunks: number + elapsed: number +} + +const DEFAULT_CHUNK = 8 * 1024 * 1024 +const DEFAULT_MAX_ROW = 64 * 1024 * 1024 +const DEFAULT_MAX_BUFFERED = 64 * 1024 * 1024 + +const STALL = Symbol('chdb-stall') + +/** Race an iterator pull against the (re-armed per pull) stall timer. */ +function raceStall(p: Promise, ms: number): Promise { + return new Promise((resolve, reject) => { + const t = setTimeout(() => reject(STALL), ms) + p.then( + (v) => { clearTimeout(t); resolve(v) }, + (e) => { clearTimeout(t); reject(e) }, + ) + }) +} + +function toChunkBuffer(v: unknown): Buffer { + if (typeof v === 'string') return Buffer.from(v, 'utf8') + if (Buffer.isBuffer(v)) return v + if (v instanceof Uint8Array) return Buffer.from(v.buffer, v.byteOffset, v.byteLength) + throw new ChdbInsertError( + `Stream insert sources must yield bytes (Buffer/Uint8Array/string), got ${typeof v}. ` + + 'For object streams map rows to NDJSON first: ' + + `(async function* () { for await (const r of src) yield JSON.stringify(r) + '\\n' })()`, + ) +} + +export async function streamInsert( + insertChunk: ChunkInserter, + opts: StreamInsertOptions, +): Promise { + const maxChunkBytes = opts.maxChunkBytes ?? DEFAULT_CHUNK + const maxRowBytes = opts.maxRowBytes ?? DEFAULT_MAX_ROW + const maxBufferedBytes = opts.maxBufferedBytes ?? DEFAULT_MAX_BUFFERED + const { stallTimeout, onProgress, signal } = opts + const source = opts.values + + const progress: InsertProgress = { rowsSent: 0, bytesSent: 0, chunks: 0 } + const summary: StreamInsertSummary = { + rowsWritten: 0, bytesWritten: 0, rowsSent: 0, bytesSent: 0, chunks: 0, elapsed: 0, + } + const snap = (): InsertProgress => ({ ...progress }) + + const abortError = () => + new ChdbAbortError( + 'Insert aborted mid-stream (already-flushed chunks are not rolled back)', + { progress: snap() }, + ) + + if (signal?.aborted) throw abortError() // pre-aborted: reject before pulling anything + + // Accumulator: list of pending buffers plus the global offset of the last + // raw '\n' seen — tracked incrementally so no byte is scanned or copied + // more than once (a no-newline source cannot trigger O(n²) re-concats). + let acc: Buffer[] = [] + let accBytes = 0 + let lastNl = -1 + + const flush = async (data: Buffer): Promise => { + let res: ChunkInsertResult + try { + res = await insertChunk(data) + } catch (e) { + if (e instanceof ChdbAbortError) { + throw new ChdbAbortError(e.message, { cause: e.cause ?? e, progress: snap() }) + } + if (e instanceof ChdbTimeoutError) { + throw new ChdbTimeoutError(e.message, { cause: e.cause ?? e, progress: snap() }) + } + const ie = e as ChdbInsertError + throw new ChdbInsertError(ie.message ?? String(e), { + cause: e, + clickhouseCode: ie.clickhouseCode, + reason: 'write-failure', + // Absolute row number: rows already flushed + the engine's chunk-local "(at row N)". + failedAtRow: + typeof ie.failedAtRow === 'number' ? progress.rowsSent + ie.failedAtRow : undefined, + progress: snap(), + }) + } + progress.chunks += 1 + progress.bytesSent += data.length + if (typeof res.rowsSent === 'number') progress.rowsSent += res.rowsSent + summary.rowsWritten += res.rowsWritten ?? 0 + summary.bytesWritten += res.bytesWritten ?? 0 + summary.elapsed += res.elapsed + if (onProgress) onProgress(snap()) + } + + // Cut everything up to (and including) the last seen newline into one chunk. + const cutChunk = (): Buffer => { + const whole = acc.length === 1 ? (acc[0] as Buffer) : Buffer.concat(acc, accBytes) + const head = whole.subarray(0, lastNl + 1) + const tail = whole.subarray(lastNl + 1) + acc = tail.length ? [tail] : [] + accBytes = tail.length + lastNl = -1 // the tail is past the last newline by construction + return head + } + + const it = source[Symbol.asyncIterator]() + let finishedCleanly = false + try { + while (true) { + if (signal?.aborted) throw abortError() + + // Bounded-buffer gate for push-style Readables that ignore backpressure. + const buffered = (source as { readableLength?: unknown }).readableLength + if (typeof buffered === 'number' && buffered > maxBufferedBytes) { + throw new ChdbInsertError( + `Source outran the bounded buffer (${buffered} > maxBufferedBytes=${maxBufferedBytes}); ` + + 'refusing to buffer unboundedly. Use a pull-based source or raise maxBufferedBytes.', + { reason: 'backpressure-overflow', progress: snap() }, + ) + } + + let step: IteratorResult + try { + step = stallTimeout ? await raceStall(it.next(), stallTimeout) : await it.next() + } catch (e) { + if (e === STALL) { + throw new ChdbTimeoutError( + `Insert source stalled: no data and no end for ${stallTimeout}ms`, + { reason: 'stall', progress: snap() }, + ) + } + throw new ChdbInsertError(`Insert source stream failed: ${(e as Error)?.message ?? e}`, { + reason: 'source-error', cause: e, progress: snap(), + }) + } + if (step.done) break + + const chunk = toChunkBuffer(step.value) + if (chunk.length === 0) continue + const idx = chunk.lastIndexOf(0x0a) + if (idx >= 0) lastNl = accBytes + idx + acc.push(chunk) + accBytes += chunk.length + + while (accBytes >= maxChunkBytes) { + if (lastNl < 0) { + if (accBytes > maxRowBytes) { + throw new ChdbInsertError( + `A single row exceeds maxRowBytes=${maxRowBytes} (no row boundary in ${accBytes} buffered bytes); ` + + 'raise maxRowBytes or split the row upstream.', + { reason: 'row-too-large', progress: snap() }, + ) + } + break // keep accumulating until a row boundary arrives + } + await flush(cutChunk()) + if (signal?.aborted) throw abortError() + } + } + + // Final flush: whatever remains (the last line may lack a trailing newline). + if (accBytes > 0) { + const whole = acc.length === 1 ? (acc[0] as Buffer) : Buffer.concat(acc, accBytes) + acc = [] + accBytes = 0 + await flush(whole) + } + + finishedCleanly = true + summary.rowsSent = progress.rowsSent + summary.bytesSent = progress.bytesSent + summary.chunks = progress.chunks + return summary + } finally { + if (!finishedCleanly) { + // Stop the producer: destroy a Readable, or close a generator. + const src = source as { destroy?: (e?: Error) => void; destroyed?: boolean } + try { + if (typeof src.destroy === 'function' && !src.destroyed) src.destroy() + else if (typeof it.return === 'function') void it.return(undefined as never) + } catch { + /* best effort */ + } + } + } +} diff --git a/test/v3/insert-heavy.test.ts b/test/v3/insert-heavy.test.ts new file mode 100644 index 0000000..02c0169 --- /dev/null +++ b/test/v3/insert-heavy.test.ts @@ -0,0 +1,111 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { Readable } from 'node:stream' +import { Session } from '../../index.js' + +// Heavy acceptance gates (CHDB_TEST_HUGE=1; run locally / nightly, not on the +// default CI lane): +// A1 256MB single-shot insert keeps the event loop responsive (p99 < 10ms) +// A3 a 1GB Buffer inserts successfully (past the V8 string ceiling) +// B5 a multi-GB stream completes with bounded RSS +const HUGE = !!process.env.CHDB_TEST_HUGE + +function ndjsonBuffer(totalBytes: number): Buffer { + // Pre-build with Buffer.concat so the test itself never creates a giant V8 string. + const row = Buffer.from(JSON.stringify({ id: 1, msg: 'm'.repeat(990) }) + '\n') // ~1KB + const reps = Math.ceil(totalBytes / row.length) + return Buffer.concat(Array.from({ length: reps }, () => row)) +} + +describe.skipIf(!HUGE)('insert: heavy acceptance gates (CHDB_TEST_HUGE)', () => { + let session: Session + beforeEach(() => { + session = new Session() + session.query('CREATE TABLE h (id UInt32, msg String) ENGINE = MergeTree ORDER BY id') + }) + afterEach(() => session?.close()) + + it('A1: a 256MB insert keeps event-loop jitter p99 under 10ms', async () => { + const payload = ndjsonBuffer(256 * 1024 * 1024) + const jitters: number[] = [] + let last = process.hrtime.bigint() + const timer = setInterval(() => { + const now = process.hrtime.bigint() + jitters.push(Number(now - last) / 1e6 - 5) // delay beyond the 5ms interval + last = now + }, 5) + try { + const sum = await session.insert({ table: 'h', values: payload, format: 'JSONEachRow' }) + expect(sum.bytesSent).toBe(payload.length) + } finally { + clearInterval(timer) + } + const sorted = jitters.slice(2).sort((a, b) => a - b) // drop warmup ticks + const p99 = sorted[Math.floor(sorted.length * 0.99)] ?? 0 + expect(p99).toBeLessThan(10) + }, 300_000) + + it('A3: a 1GB Buffer inserts (payload past the V8 string ceiling)', async () => { + const payload = ndjsonBuffer(1024 * 1024 * 1024) + expect(payload.length).toBeGreaterThan(2 ** 29) // > the ~512MB V8 string limit + const sum = await session.insert({ table: 'h', values: payload, format: 'JSONEachRow' }) + expect(sum.rowsWritten).toBeGreaterThan(1_000_000) + }, 600_000) + + it('B5: a 4GB stream completes with bounded memory (O(chunk), not O(total))', async () => { + const row = Buffer.from(JSON.stringify({ id: 7, msg: 'm'.repeat(990) }) + '\n') + const total = 4 * 1024 * 1024 * 1024 + async function* synth() { + for (let sent = 0; sent < total; sent += row.length) yield row + } + const rssBefore = process.memoryUsage().rss + let rssPeak = rssBefore + const sum = await session.insert({ + table: 'h', values: synth(), format: 'JSONEachRow', + maxChunkBytes: 8 * 1024 * 1024, + onProgress: () => { rssPeak = Math.max(rssPeak, process.memoryUsage().rss) }, + }) + expect(sum.bytesSent).toBeGreaterThanOrEqual(total) + // JS-side overhead stays O(chunk); the generous bound absorbs engine-side buffers. + expect(rssPeak - rssBefore).toBeLessThan(1.5 * 1024 * 1024 * 1024) + }, 1_800_000) + + it('leak gate: 50 x 32MB inserts hold a flat RSS in the second half', async () => { + const payload = ndjsonBuffer(32 * 1024 * 1024) + const samples: number[] = [] + for (let i = 0; i < 50; i++) { + await session.insert({ table: 'h', values: payload, format: 'JSONEachRow' }) + if (i >= 25) samples.push(process.memoryUsage().rss) + } + const first = samples.slice(0, 5).reduce((a, b) => a + b, 0) / 5 + const last = samples.slice(-5).reduce((a, b) => a + b, 0) / 5 + expect(last - first).toBeLessThan(256 * 1024 * 1024) // flat within noise + }, 1_800_000) +}) + +// Always-on lightweight stand-in for the jitter gate: proves the dispatch path +// itself never serializes on the main thread (a 16MB payload through the raw +// path must not produce a single >250ms stall, which the old VALUES path +// reliably would at this size). +describe('insert: event-loop responsiveness (light, always on)', () => { + it('16MB raw insert produces no quarter-second main-thread stall', async () => { + const session = new Session() + try { + session.query('CREATE TABLE l (id UInt32, msg String) ENGINE = Memory') + const payload = ndjsonBuffer(16 * 1024 * 1024) + let maxGap = 0 + let last = Date.now() + const timer = setInterval(() => { + maxGap = Math.max(maxGap, Date.now() - last - 5) + last = Date.now() + }, 5) + try { + await session.insert({ table: 'l', values: payload, format: 'JSONEachRow' }) + } finally { + clearInterval(timer) + } + expect(maxGap).toBeLessThan(250) + } finally { + session.close() + } + }, 60_000) +}) diff --git a/test/v3/insert-raw.test.ts b/test/v3/insert-raw.test.ts new file mode 100644 index 0000000..153ca4b --- /dev/null +++ b/test/v3/insert-raw.test.ts @@ -0,0 +1,271 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { Session } from '../../index.js' + +// Raw-format passthrough insert. The payload is handed to the native +// side as bytes; the engine parses (JS never builds an object tree). Every +// failure must be a typed error — never a hang (test timeout would catch it). +// +// Two row ledgers, asserted throughout: +// - rowsSent payload view: non-empty payload lines (line formats only) +// - rowsWritten engine view (chdb-io/chdb-core#88): includes MV-cascade writes + +const NDJSON = '{"id":1,"msg":"a"}\n{"id":2,"msg":"b c"}\n{"id":3,"msg":"🚀🙂"}\n' + +describe('insert: raw passthrough (Buffer/string + format)', () => { + let session: Session + beforeEach(() => { + session = new Session() + session.query('CREATE TABLE t (id UInt64, msg String) ENGINE = MergeTree ORDER BY id') + }) + afterEach(() => session?.close()) + + it('inserts a Buffer payload and reports both ledgers', async () => { + const buf = Buffer.from(NDJSON) + const sum = await session.insert({ table: 't', values: buf, format: 'JSONEachRow' }) + expect(sum.rowsWritten).toBe(3) // engine-reported write progress + expect(sum.rowsSent).toBe(3) // payload ledger (native line scan) + expect(sum.bytesSent).toBe(buf.length) + expect(sum.bytesWritten).toBeGreaterThan(0) + expect(session.query('SELECT count(), sum(id) FROM t', 'CSV').trim()).toBe('3,6') + }) + + it('round-trips UTF-8/emoji and uint64 ns timestamps exactly', async () => { + session.query('CREATE TABLE ns (id UInt64, ts UInt64, msg String) ENGINE = Memory') + const line = '{"id":1,"ts":"1780000000000000001","msg":"émoji 🚀"}\n' + await session.insert({ table: 'ns', values: Buffer.from(line), format: 'JSONEachRow' }) + expect(session.query('SELECT ts, msg FROM ns', 'TSVRaw').trim()).toBe('1780000000000000001\témoji 🚀') + }) + + it('accepts string and Uint8Array payloads (documented conveniences)', async () => { + const s1 = await session.insert({ table: 't', values: NDJSON, format: 'JSONEachRow' }) + expect(s1.rowsWritten).toBe(3) + const u8 = new Uint8Array(Buffer.from('{"id":9,"msg":"u8"}\n')) + const s2 = await session.insert({ table: 't', values: u8, format: 'JSONEachRow' }) + expect(s2.rowsWritten).toBe(1) + expect(session.query('SELECT count() FROM t', 'CSV').trim()).toBe('4') + }) + + it('NUL bytes survive the length-aware path (no gate, no truncation)', async () => { + const nul = String.fromCharCode(0) + await session.insert({ + table: 't', + values: Buffer.from(`{"id":7,"msg":"a${nul}b"}\n`), + format: 'JSONEachRow', + }) + expect(session.query('SELECT length(msg) FROM t WHERE id=7', 'CSV').trim()).toBe('3') + }) + + it('handles a missing trailing newline (last line still counted and inserted)', async () => { + const sum = await session.insert({ + table: 't', + values: Buffer.from('{"id":1,"msg":"x"}\n{"id":2,"msg":"y"}'), + format: 'JSONEachRow', + }) + expect(sum.rowsSent).toBe(2) + expect(sum.rowsWritten).toBe(2) + }) + + it('empty payloads short-circuit to a zero summary without touching the engine', async () => { + for (const empty of [Buffer.alloc(0), '', new Uint8Array(0)]) { + const sum = await session.insert({ table: 't', values: empty as never, format: 'JSONEachRow' }) + expect(sum.rowsWritten).toBe(0) + expect(sum.bytesSent).toBe(0) + } + }) +}) + +describe('insert: raw formats matrix', () => { + let session: Session + beforeEach(() => { + session = new Session() + session.query('CREATE TABLE m (a UInt32, b String) ENGINE = Memory') + }) + afterEach(() => session?.close()) + + it('CSV inserts; rowsSent is undefined (quoted newlines make line counts unreliable)', async () => { + const sum = await session.insert({ table: 'm', values: '1,"x"\n2,"y"\n', format: 'CSV' }) + expect(sum.rowsWritten).toBe(2) // engine ledger still exact for CSV + expect(sum.rowsSent).toBeUndefined() + }) + + it('CSVWithNames consumes the header; engine ledger counts data rows only', async () => { + const sum = await session.insert({ table: 'm', values: 'a,b\n1,"x"\n2,"y"\n', format: 'CSVWithNames' }) + expect(sum.rowsWritten).toBe(2) + expect(session.query('SELECT count() FROM m', 'CSV').trim()).toBe('2') + }) + + it('TSV / TSVWithNames: rowsSent excludes the header line', async () => { + const s1 = await session.insert({ table: 'm', values: '1\tx\n2\ty\n', format: 'TSV' }) + expect(s1.rowsSent).toBe(2) + const s2 = await session.insert({ table: 'm', values: 'a\tb\n3\tz\n', format: 'TSVWithNames' }) + expect(s2.rowsSent).toBe(1) + expect(s2.rowsWritten).toBe(1) + }) + + it('type parity: JSONEachRow passthrough matches the VALUES object path column-for-column', async () => { + const ddl = `(i Int64, u UInt64, f Float64, d DateTime64(9), arr Array(Int32), + n Nullable(String), s String) ENGINE = Memory` + session.query(`CREATE TABLE via_values ${ddl}`) + session.query(`CREATE TABLE via_raw ${ddl}`) + const logical = { + i: -(2n ** 62n), u: 1780000000000000001n, f: 1.5, + d: new Date('2026-06-12T10:20:30.000Z'), arr: [1, -2, 3], n: null, s: "quote ' back \\ tab\t🚀", + } + await session.insert({ table: 'via_values', values: [logical] }) + const rawLine = JSON.stringify({ + i: String(logical.i), u: String(logical.u), f: logical.f, + d: '2026-06-12 10:20:30.000000000', arr: logical.arr, n: null, s: logical.s, + }) + await session.insert({ table: 'via_raw', values: rawLine + '\n', format: 'JSONEachRow' }) + const a = session.query('SELECT * FROM via_values', 'TSVRaw') + const b = session.query('SELECT * FROM via_raw', 'TSVRaw') + expect(b).toBe(a) + }) + + it('rowsWritten includes materialized-view cascade writes (engine write-progress semantics)', async () => { + session.query('CREATE TABLE src (k UInt64) ENGINE = Null') + session.query('CREATE TABLE dst (k UInt64) ENGINE = MergeTree ORDER BY k') + session.query('CREATE MATERIALIZED VIEW mv TO dst AS SELECT k FROM src') + const sum = await session.insert({ table: 'src', values: '{"k":1}\n{"k":2}\n', format: 'JSONEachRow' }) + expect(sum.rowsSent).toBe(2) // payload ledger: what was sent + expect(sum.rowsWritten).toBe(4) // engine ledger: Null sink (2) + MV target (2) + expect(session.query('SELECT count() FROM dst', 'CSV').trim()).toBe('2') + }) + + it('settings channel: per-insert SETTINGS reach the engine', async () => { + const line = '{"a":5,"b":"ok","extra":42}\n' + // skip_unknown_fields defaults ON in current engines — force it OFF to + // prove the per-insert settings clause actually takes effect… + await expect( + session.insert({ + table: 'm', values: line, format: 'JSONEachRow', + settings: { input_format_skip_unknown_fields: 0 }, + }), + ).rejects.toMatchObject({ code: 'CHDB_INSERT' }) + // …and back ON explicitly: accepted. + const sum = await session.insert({ + table: 'm', values: line, format: 'JSONEachRow', + settings: { input_format_skip_unknown_fields: 1 }, + }) + expect(sum.rowsWritten).toBe(1) + }) + + it('columns option targets a subset (others take defaults)', async () => { + const sum = await session.insert({ table: 'm', values: '7\n', format: 'CSV', columns: ['a'] }) + expect(sum.rowsWritten).toBe(1) + expect(session.query("SELECT a, b FROM m WHERE a=7", 'CSV').trim()).toBe('7,""') + }) +}) + +describe('insert: raw failure paths (typed, never a hang)', () => { + let session: Session + beforeEach(() => { + session = new Session() + session.query('CREATE TABLE f (n UInt32, s String) ENGINE = Memory') + }) + afterEach(() => session?.close()) + + const codeOf = async (p: Promise) => { + try { + await p + return '' + } catch (e: unknown) { + return (e as { code?: string })?.code ?? '' + } + } + + it('a bad JSON line is a typed error with failedAtRow, and the batch lands zero rows', async () => { + let err: { code?: string; failedAtRow?: number } | undefined + try { + await session.insert({ + table: 'f', + values: '{"n":1,"s":"ok"}\n{broken json\n{"n":3,"s":"ok"}\n', + format: 'JSONEachRow', + }) + } catch (e: unknown) { + err = e as never + } + expect(err?.code).toBe('CHDB_INSERT') + expect(err?.failedAtRow).toBe(2) // parsed from the engine's "(at row 2)" + expect(session.query('SELECT count() FROM f', 'CSV').trim()).toBe('0') // block-atomic + }) + + it('dispatch matrix: every rejected branch is typed and carries its workaround', async () => { + // raw bytes without format + expect(await codeOf(session.insert({ table: 'f', values: Buffer.from('x') } as never))).toBe('CHDB_INSERT') + // row array WITH format (reserved for chunked object inserts) + expect(await codeOf(session.insert({ table: 'f', values: [{ n: 1, s: 'x' }], format: 'JSONEachRow' } as never))).toBe('CHDB_INSERT') + // non-whitelisted format + expect(await codeOf(session.insert({ table: 'f', values: 'x', format: 'Parquet' } as never))).toBe('CHDB_INSERT') + // hostile table / setting names never reach SQL + expect(await codeOf(session.insert({ table: 'f; DROP TABLE f', values: 'x\n', format: 'CSV' } as never))).toBe('CHDB_INSERT') + expect(await codeOf(session.insert({ table: 'f', values: '1,"x"\n', format: 'CSV', settings: { 'a=1, b': 1 } } as never))).toBe('CHDB_INSERT') + }) + + it('format/data mismatch surfaces the engine error (re-wrapped, typed)', async () => { + const code = await codeOf(session.insert({ table: 'f', values: '{"n":1,"s":"x"}\n', format: 'CSV' })) + expect(code).toMatch(/^CHDB_/) + }) + + it('#26 regression: complex types via passthrough neither hang nor corrupt', async () => { + session.query('CREATE TABLE cx (a Array(String), m Map(String, UInt32)) ENGINE = Memory') + const sum = await session.insert({ + table: 'cx', + values: '{"a":["x","y"],"m":{"k1":1,"k2":2}}\n', + format: 'JSONEachRow', + }) + expect(sum.rowsWritten).toBe(1) + expect(session.query('SELECT a[2], m[\'k2\'] FROM cx', 'CSV').trim()).toBe('"y",2') + }, 10_000) + + it('a pre-aborted signal rejects immediately with AbortError', async () => { + const ctl = new AbortController() + ctl.abort() + await expect( + session.insert({ table: 'f', values: '1,"x"\n', format: 'CSV', signal: ctl.signal }), + ).rejects.toMatchObject({ code: 'CHDB_ABORT' }) + }) + + it('timeout settles early with ChdbTimeoutError (the write may still complete)', async () => { + // A payload big enough that the engine cannot finish within 1ms. + const big = Buffer.from( + Array.from({ length: 200_000 }, (_, i) => `{"n":${i},"s":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}`).join('\n') + '\n', + ) + await expect( + session.insert({ table: 'f', values: big, format: 'JSONEachRow', timeout: 1 }), + ).rejects.toMatchObject({ code: 'CHDB_TIMEOUT' }) + }) +}) + +describe('insert: worker_threads recipe (serialize off-thread, transfer, passthrough)', () => { + it('worker stringifies rows, transfers the ArrayBuffer, main thread inserts raw', async () => { + const session = new Session() + try { + session.query('CREATE TABLE w (id UInt32, msg String) ENGINE = Memory') + const { Worker } = await import('node:worker_threads') + const worker = new Worker( + `const { parentPort } = require('node:worker_threads') + parentPort.on('message', (rows) => { + const buf = Buffer.from(rows.map((r) => JSON.stringify(r)).join('\\n') + '\\n') + const ab = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength) + parentPort.postMessage(ab, [ab]) + })`, + { eval: true }, + ) + try { + const rows = Array.from({ length: 100 }, (_, i) => ({ id: i, msg: `row-${i}` })) + const ab: ArrayBuffer = await new Promise((resolve) => { + worker.once('message', resolve) + worker.postMessage(rows) + }) + const sum = await session.insert({ table: 'w', values: Buffer.from(ab), format: 'JSONEachRow' }) + expect(sum.rowsWritten).toBe(100) + expect(session.query('SELECT count() FROM w', 'CSV').trim()).toBe('100') + } finally { + await worker.terminate() + } + } finally { + session.close() + } + }) +}) diff --git a/test/v3/insert-stream.test.ts b/test/v3/insert-stream.test.ts new file mode 100644 index 0000000..d9447b8 --- /dev/null +++ b/test/v3/insert-stream.test.ts @@ -0,0 +1,274 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { Readable } from 'node:stream' +import { Session } from '../../index.js' + +// Streaming insert: the backpressure contract, end to end. +// Backpressure is flow-control, not an error; every backpressure-adjacent +// failure is a typed error that settles the promise — never a silent hang +// (the vitest timeout is the hang detector). Every failure carries a +// progress snapshot. Already-flushed chunks are not rolled back +// (at-least-once; failedAtRow/rowsSent are observability, NOT resume). + +const line = (i: number, pad = 24) => JSON.stringify({ id: i, msg: 'x'.repeat(pad) }) + +function* ndjsonChunks(n: number, chunkBytes: number, pad = 24): Generator { + let buf = '' + for (let i = 0; i < n; i++) { + buf += line(i, pad) + '\n' + while (buf.length >= chunkBytes) { + yield Buffer.from(buf.slice(0, chunkBytes)) + buf = buf.slice(chunkBytes) + } + } + if (buf.length) yield Buffer.from(buf) +} + +describe('insert: streaming (Readable / AsyncIterable + format)', () => { + let session: Session + beforeEach(() => { + session = new Session() + session.query('CREATE TABLE s (id UInt32, msg String) ENGINE = MergeTree ORDER BY id') + }) + afterEach(() => session?.close()) + + it('multi-chunk stream: both ledgers exact, rows conserved, progress monotonic', async () => { + const N = 2000 + const seen: number[] = [] + const sum = await session.insert({ + table: 's', + values: Readable.from(ndjsonChunks(N, 1024)), + format: 'JSONEachRow', + maxChunkBytes: 16 * 1024, + onProgress: (p) => seen.push(p.rowsSent), + }) + expect(sum.rowsSent).toBe(N) + expect(sum.rowsWritten).toBe(N) + expect(sum.chunks).toBeGreaterThan(1) + expect(seen.length).toBe(sum.chunks) + expect([...seen]).toEqual([...seen].sort((a, b) => a - b)) // monotonic + expect(session.query('SELECT count(), min(id), max(id) FROM s', 'CSV').trim()).toBe(`${N},0,${N - 1}`) + }) + + it('cuts only at row boundaries: straddling lines, blank lines, missing trailing newline', async () => { + async function* src() { + yield Buffer.from('{"id":1,"msg":"a"}\n\n{"id":2,"ms') // row 2 straddles chunks; blank line in between + yield Buffer.from('g":"b"}\n \n') + yield Buffer.from('{"id":3,"msg":"c"}') // no trailing newline + } + const sum = await session.insert({ + table: 's', values: src(), format: 'JSONEachRow', maxChunkBytes: 24, + }) + expect(sum.rowsSent).toBe(3) // blank/whitespace-only lines are not rows + expect(sum.rowsWritten).toBe(3) + expect(session.query('SELECT count() FROM s', 'CSV').trim()).toBe('3') + }) + + it('multi-byte UTF-8 across the chunk cut round-trips byte-exact', async () => { + const msg = '🚀🙂émoji🎯'.repeat(40) + const payload = Buffer.from(JSON.stringify({ id: 1, msg }) + '\n' + JSON.stringify({ id: 2, msg }) + '\n') + async function* drip() { + for (let o = 0; o < payload.length; o += 7) yield payload.subarray(o, o + 7) + } + await session.insert({ table: 's', values: drip(), format: 'JSONEachRow', maxChunkBytes: 64 }) + expect(session.query('SELECT countIf(msg = ' + `'${msg}'` + ') FROM s', 'CSV').trim()).toBe('2') + }) + + it('accepts mixed Buffer / string / Uint8Array chunks', async () => { + async function* mixed() { + yield '{"id":1,"msg":"str"}\n' + yield Buffer.from('{"id":2,"msg":"buf"}\n') + yield new Uint8Array(Buffer.from('{"id":3,"msg":"u8"}\n')) + } + const sum = await session.insert({ table: 's', values: mixed(), format: 'JSONEachRow' }) + expect(sum.rowsSent).toBe(3) + }) + + it('backpressure is pull-based: the producer never runs ahead by more than the bounded buffer', async () => { + const maxChunkBytes = 8 * 1024 + let yielded = 0 + let maxLead = 0 + let flushedBytes = 0 + async function* paced() { + for (const chunk of ndjsonChunks(3000, 512)) { + yielded += chunk.length + maxLead = Math.max(maxLead, yielded - flushedBytes) + yield chunk + } + } + await session.insert({ + table: 's', values: paced(), format: 'JSONEachRow', maxChunkBytes, + onProgress: (p) => { flushedBytes = p.bytesSent }, + }) + // At most one in-flight accumulation (≤ maxChunkBytes) plus one source chunk. + expect(maxLead).toBeLessThanOrEqual(maxChunkBytes + 1024) + }) +}) + +describe('insert: streaming failure taxonomy (six reasons, all typed, all settle)', () => { + let session: Session + beforeEach(() => { + session = new Session() + session.query('CREATE TABLE q (n UInt32) ENGINE = Memory') + }) + afterEach(() => session?.close()) + + it("source 'error' → reason source-error, cause preserved, progress attached", async () => { + const src = new Readable({ read() {} }) + src.push('{"n":1}\n') + setTimeout(() => src.destroy(new Error('upstream exploded')), 20) + let err: { code?: string; reason?: string; cause?: unknown; progress?: object } | undefined + try { + await session.insert({ table: 'q', values: src, format: 'JSONEachRow' }) + } catch (e: unknown) { + err = e as never + } + expect(err?.code).toBe('CHDB_INSERT') + expect(err?.reason).toBe('source-error') + expect(String((err?.cause as Error)?.message ?? err?.cause)).toContain('upstream exploded') + expect(err?.progress).toBeDefined() + }) + + it('premature close (destroy without error) → source-error too', async () => { + const src = new Readable({ read() {} }) + src.push('{"n":1}\n') + setTimeout(() => src.destroy(), 20) + await expect( + session.insert({ table: 'q', values: src, format: 'JSONEachRow' }), + ).rejects.toMatchObject({ code: 'CHDB_INSERT', reason: 'source-error' }) + }) + + it("stalled producer + stallTimeout → ChdbTimeoutError{reason:'stall'}", async () => { + async function* stalls() { + yield Buffer.from('{"n":1}\n') + await new Promise(() => {}) // never yields again, never ends + } + await expect( + session.insert({ table: 'q', values: stalls(), format: 'JSONEachRow', stallTimeout: 200 }), + ).rejects.toMatchObject({ code: 'CHDB_TIMEOUT', reason: 'stall' }) + }) + + it('no stallTimeout → long quiet periods are legal (negative case)', async () => { + async function* quiet() { + yield Buffer.from('{"n":1}\n') + await new Promise((r) => setTimeout(r, 300)) + yield Buffer.from('{"n":2}\n') + } + const sum = await session.insert({ table: 'q', values: quiet(), format: 'JSONEachRow' }) + expect(sum.rowsSent).toBe(2) + }) + + it('un-pausable source past the bounded buffer → backpressure-overflow, not OOM', async () => { + const src = new Readable({ read() {} }) + for (let i = 0; i < 64; i++) src.push(Buffer.alloc(4096, 0x61)) // stuff 256KB before any pull + await expect( + session.insert({ table: 'q', values: src, format: 'JSONEachRow', maxBufferedBytes: 64 * 1024 }), + ).rejects.toMatchObject({ code: 'CHDB_INSERT', reason: 'backpressure-overflow' }) + src.destroy() + }) + + it('chunk write failure → write-failure with ABSOLUTE failedAtRow; remaining source not pulled', async () => { + // 6 good rows (flushed in earlier chunks), one bad row, then a long good + // tail. Chunking has a one-row lookahead (rows pack into the failing + // chunk until the size threshold), so "pull stopped" is asserted on the + // tail well past the failing chunk, not on the bad row's neighbour. + const rows = [ + '{"n":1}', '{"n":2}', '{"n":3}', '{"n":4}', '{"n":5}', '{"n":6}', + '{boom', + ...Array.from({ length: 12 }, (_, i) => `{"n":${8 + i}}`), + ] + let pulledPastFailure = false + async function* src() { + for (const [i, r] of rows.entries()) { + if (i >= 11) pulledPastFailure = true // only reachable if iteration survives the failing chunk + yield Buffer.from(r + '\n') + } + } + let err: { reason?: string; failedAtRow?: number; progress?: { rowsSent: number } } | undefined + try { + await session.insert({ table: 'q', values: src(), format: 'JSONEachRow', maxChunkBytes: 16 }) + } catch (e: unknown) { + err = e as never + } + expect(err?.reason).toBe('write-failure') + expect(err?.failedAtRow).toBe(7) // absolute: rows flushed in earlier chunks + engine's "(at row N)" + expect(err?.progress?.rowsSent).toBe(6) // the six good rows before the bad chunk were flushed + expect(pulledPastFailure).toBe(false) // pull stopped at the failure + expect(session.query('SELECT count() FROM q', 'CSV').trim()).toBe('6') // flushed chunks stay (at-least-once) + }) + + it('abort mid-stream → AbortError; flushed chunks remain; source torn down', async () => { + const ctl = new AbortController() + const src = Readable.from(ndjsonChunks(5000, 512)) + let err: { code?: string; progress?: { chunks: number } } | undefined + try { + await session.insert({ + table: 'q', + values: src, + format: 'JSONEachRow', + maxChunkBytes: 4 * 1024, + columns: ['n'], + settings: { input_format_skip_unknown_fields: 1 }, + onProgress: (p) => { if (p.chunks === 2) ctl.abort() }, + signal: ctl.signal, + }) + } catch (e: unknown) { + err = e as never + } + expect(err?.code).toBe('CHDB_ABORT') + expect(err?.progress?.chunks).toBeGreaterThanOrEqual(2) + expect(src.destroyed).toBe(true) + expect(Number(session.query('SELECT count() FROM q', 'CSV').trim())).toBeGreaterThan(0) + }) + + it('pre-aborted signal rejects before pulling anything', async () => { + const ctl = new AbortController() + ctl.abort() + let pulled = false + async function* src() { + pulled = true + yield Buffer.from('{"n":1}\n') + } + await expect( + session.insert({ table: 'q', values: src(), format: 'JSONEachRow', signal: ctl.signal }), + ).rejects.toMatchObject({ code: 'CHDB_ABORT' }) + expect(pulled).toBe(false) + }) + + it('a single row past maxRowBytes (no boundary) → row-too-large, refusing unbounded buffering', async () => { + async function* endless() { + while (true) yield Buffer.alloc(64 * 1024, 0x61) // 'a' forever, never a newline + } + await expect( + session.insert({ + table: 'q', values: endless(), format: 'JSONEachRow', + maxChunkBytes: 64 * 1024, maxRowBytes: 256 * 1024, + }), + ).rejects.toMatchObject({ code: 'CHDB_INSERT', reason: 'row-too-large' }) + }) + + it('object rows are rejected at the first chunk with the NDJSON-mapping recipe', async () => { + async function* objs() { + yield { n: 1 } as never + } + let msg = '' + try { + await session.insert({ table: 'q', values: objs(), format: 'JSONEachRow' }) + } catch (e: unknown) { + msg = (e as Error).message + } + expect(msg).toContain('JSON.stringify') + }) + + it('format gate: CSV and WithNames variants are rejected for streams with the workaround', async () => { + for (const format of ['CSV', 'CSVWithNames', 'TSVWithNames'] as const) { + let err: { code?: string; message?: string } | undefined + try { + await session.insert({ table: 'q', values: Readable.from([Buffer.from('1\n')]), format: format as never }) + } catch (e: unknown) { + err = e as never + } + expect(err?.code).toBe('CHDB_INSERT') + expect(err?.message).toMatch(/single-shot/) + } + }) +}) diff --git a/test/v3/setup.ts b/test/v3/setup.ts index 3277d39..c6f16f9 100644 --- a/test/v3/setup.ts +++ b/test/v3/setup.ts @@ -1,6 +1,6 @@ import { afterEach } from 'vitest' -// @ts-expect-error — internal test helper, not in the type surface -import { _closeAllSessions } from '../../index.js' +// @ts-expect-error — internal test helpers, not in the type surface +import { _closeAllSessions, _drainPendingOps } from '../../index.js' // Global safety net for the single-connection-per-process constraint. // @@ -13,6 +13,14 @@ import { _closeAllSessions } from '../../index.js' // unrelated files (it surfaced as an intermittent stream.test.ts failure on the // slowest runner). Force-closing any lingering session after every test makes a // leak local to the test that caused it instead of poisoning the rest. -afterEach(() => { +afterEach(async () => { + // Wait for any native op a test settled early (abort/timeout) to actually + // finish before tearing down. The C ABI has no interrupt, so the native write + // keeps running on the libuv thread after the JS promise rejected; closing the + // session or starting the next test while it is still in flight means two + // operations hit libchdb's single in-process engine at once, which aborts the + // engine for the rest of the process. Draining first keeps an early-settled op + // local to the test that started it. + await _drainPendingOps() _closeAllSessions() }) diff --git a/update_libchdb.sh b/update_libchdb.sh index c6736be..849cfb0 100755 --- a/update_libchdb.sh +++ b/update_libchdb.sh @@ -11,7 +11,9 @@ cd "$(dirname "$0")" # Fail fast so a bad download never silently leaves a stale/partial libchdb. set -e -LATEST_RELEASE=v26.5.0 +# Pre-release engine for the 3.1.0-rc.1 test line (carries the written-rows +# accessors the raw/streaming insert needs; absent in the v26.5.0 stable line). +LATEST_RELEASE=v26.5.1-rc.1 # Download the correct version based on the platform case "$(uname -s)" in