Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,124 @@ export interface InsertSummary {
elapsed: number;
}

/**
* Raw insert formats (v1: text formats; the format name is whitelisted, never
* passed through). TSV/TSVWithNames are aliases of TabSeparated*.
*/
export type RawInsertFormat =
| 'JSONEachRow' | 'JSONCompactEachRow'
| 'CSV' | 'CSVWithNames'
| 'TSV' | 'TabSeparated' | 'TSVWithNames' | 'TabSeparatedWithNames';

/**
* Stream-insert formats: line-delimited only. These formats escape raw
* newlines inside values, so a raw '\n' is always a row boundary and every
* re-chunked INSERT payload stays independently valid. CSV (raw newlines legal
* inside quoted fields) and WithNames variants (header cannot repeat per
* chunk) are excluded — use a single-shot Buffer insert for those.
*/
export type StreamInsertFormat = 'JSONEachRow' | 'JSONCompactEachRow' | 'TSV' | 'TabSeparated';

/**
* Parameters for the raw passthrough insert: the payload stays bytes
* (off the V8 heap for Buffer/Uint8Array), is handed to the native side
* zero-copy, and the engine's multithreaded parser does all the parsing — JS
* never builds an object tree from the payload.
*
* Contract: do NOT mutate the Buffer until the returned promise settles (same
* contract as fs.write). A string payload is accepted as a small-payload
* convenience only — it is already a V8 string, so prefer Buffers end to end.
*/
export interface RawInsertParams {
/** Target table (optionally db-qualified). */
table: string;
/** Raw payload bytes in the declared format. */
values: Buffer | Uint8Array | string;
/** Payload format (required; whitelisted). */
format: RawInsertFormat;
/** Explicit column list: INSERT INTO t (a, b) FORMAT ... */
columns?: ReadonlyArray<string>;
/** Per-insert settings (e.g. input_format_skip_unknown_fields: 1). */
settings?: Record<string, string | number | boolean>;
/** Early-settle abort (the underlying write still completes; the payload stays pinned until it does). */
signal?: AbortSignal;
/** Early-settle timeout in ms (same honest single-shot semantics). */
timeout?: number;
}

/**
* Summary of a raw insert. Two ledgers by design:
* - rowsWritten/bytesWritten — engine-side write progress (includes cascaded
* materialized-view writes, same semantics as HTTP X-ClickHouse-Summary).
* - rowsSent/bytesSent — payload-side: non-empty payload lines (exact for
* line-delimited formats; undefined for the CSV family) and payload bytes.
*/
export interface RawInsertSummary {
rowsWritten: number;
bytesWritten: number;
rowsSent?: number;
bytesSent: number;
elapsed: number;
}

/** Progress snapshot for streaming inserts (payload-side ledger). */
export interface InsertProgress {
rowsSent: number;
bytesSent: number;
chunks: number;
}

/**
* Parameters for the backpressured streaming insert. The source is consumed
* pull-based: at most one bounded chunk is buffered and each chunk's INSERT is
* awaited before the next pull, so a fast producer is throttled to the chDB
* write rate (backpressure is flow-control, never an error). Failures surface
* as typed errors that always settle the promise: source-error / stall /
* backpressure-overflow / write-failure / row-too-large / abort — each
* carrying a progress snapshot. Semantics are at-least-once: already-flushed
* chunks are not rolled back.
*/
export interface StreamInsertParams {
table: string;
/** Byte stream: Node Readable or any AsyncIterable of Buffer/Uint8Array/string chunks. */
values: NodeJS.ReadableStream | AsyncIterable<Buffer | Uint8Array | string>;
format: StreamInsertFormat;
columns?: ReadonlyArray<string>;
settings?: Record<string, string | number | boolean>;
/** Target chunk size (default 8 MiB). */
maxChunkBytes?: number;
/** Single-row ceiling; exceeded without a row boundary => row-too-large (default 64 MiB). */
maxRowBytes?: number;
/** Bounded-buffer ceiling for un-pausable Readables => backpressure-overflow (default 64 MiB). */
maxBufferedBytes?: number;
/** Producer idle deadline => ChdbTimeoutError{reason:'stall'}. Off by default. */
stallTimeout?: number;
/** Called after each flushed chunk. */
onProgress?: (p: InsertProgress) => void;
signal?: AbortSignal;
/** Per-chunk timeout in ms. */
timeout?: number;
}

/** Summary of a streaming insert (both ledgers accumulated across chunks). */
export interface StreamInsertSummary {
rowsWritten: number;
bytesWritten: number;
rowsSent: number;
bytesSent: number;
chunks: number;
elapsed: number;
}

/**
* Inserts rows via an inline multi-row INSERT (default connection). Async; never
* reads stdin.
*/
export function insert(params: InsertParams): Promise<InsertSummary>;
/** raw passthrough insert (default connection). */
export function insert(params: RawInsertParams): Promise<RawInsertSummary>;
/** Backpressured streaming insert (default connection). */
export function insert(params: StreamInsertParams): Promise<StreamInsertSummary>;

/**
* Options for {@link Session.queryStream}.
Expand Down Expand Up @@ -205,6 +318,10 @@ export class Session {
* Inserts rows via an inline multi-row INSERT. Async; never reads stdin.
*/
insert(params: InsertParams): Promise<InsertSummary>;
/** raw passthrough insert on this session's connection. */
insert(params: RawInsertParams): Promise<RawInsertSummary>;
/** Backpressured streaming insert on this session's connection. */
insert(params: StreamInsertParams): Promise<StreamInsertSummary>;

/**
* Streams a query result chunk-by-chunk (only one active stream per session).
Expand Down
199 changes: 187 additions & 12 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@ function prepArrow(query, opts, defaultFormat) {
const { formatParamValue } = require('./dist/serialize.js');
const { ChdbResult } = require('./dist/result.js');
const { buildInsertSQL } = require('./dist/insert.js');
const { buildRawInsertPrefix, isRawValues, isStreamValues } = require('./dist/rawInsert.js');
const { streamInsert } = require('./dist/streamInsert.js');

// Map a native/query error into a typed ChdbInsertError (preserving message,
// clickhouseCode and cause).
// clickhouseCode and cause). The engine's "(at row N)" marker is parsed into
// failedAtRow (1-based; chunk-local for streamed chunks — streamInsert
// rebases it to an absolute row number).
function asInsertError(e) {
if (e instanceof ChdbInsertError) return e;
const q = asQueryError(e);
return new ChdbInsertError(q.message, { cause: q, clickhouseCode: q.clickhouseCode });
const m = /\(at row (\d+)\)/.exec(q.message);
return new ChdbInsertError(q.message, {
cause: q,
clickhouseCode: q.clickhouseCode,
failedAtRow: m ? Number(m[1]) : undefined,
});
}

// Shared insert impl: build the inline INSERT...VALUES and run it through the
Expand All @@ -62,6 +71,121 @@ function runInsert(nativeAsyncCall, params) {
);
}

// Normalize a raw payload to a Buffer. Uint8Array views are wrapped zero-copy;
// strings are encoded once on the main thread (documented as a small-payload
// convenience — large payloads should be born as Buffers).
function toPayloadBuffer(v) {
if (typeof v === 'string') return Buffer.from(v, 'utf8');
if (Buffer.isBuffer(v)) return v;
return Buffer.from(v.buffer, v.byteOffset, v.byteLength);
}

// Abort/timeout wrapper for raw inserts. Same honest single-shot semantics as
// withAbortTimeout: JS settles early, the native write runs to completion and
// the payload Buffer stays pinned until it does.
function wrapRawNative(nativePromise, opts, mapOk) {
const signal = opts && opts.signal;
const timeout = opts && opts.timeout;
const base = nativePromise.then(mapOk, (e) => { throw asInsertError(e); });
if (!signal && !timeout) return base;
trackNative(nativePromise); // abort/timeout settles JS early; native runs on after
return new Promise((resolve, reject) => {
let settled = false;
let timer = null;
const cleanup = () => {
if (timer) clearTimeout(timer);
if (signal) signal.removeEventListener('abort', onAbort);
};
const finish = (fn, v) => { if (!settled) { settled = true; cleanup(); fn(v); } };
function onAbort() {
finish(reject, new ChdbAbortError(
'Insert aborted (the underlying write may still complete in the background)'));
}
if (signal) {
if (signal.aborted) { onAbort(); return; }
signal.addEventListener('abort', onAbort, { once: true });
}
if (timeout) {
timer = setTimeout(() => finish(reject, new ChdbTimeoutError(
`Insert timed out after ${timeout}ms (the underlying write may still complete in the background)`)), timeout);
}
base.then((v) => finish(resolve, v), (e) => finish(reject, e));
});
}

// Raw passthrough: the payload Buffer is handed to the native side
// zero-copy; prefix assembly and execution happen on the libuv thread.
function runRawInsert(nativeRawCall, params) {
let built;
try { built = buildRawInsertPrefix(params); } catch (e) { return Promise.reject(asInsertError(e)); }
const buf = toPayloadBuffer(params.values);
if (buf.length === 0) {
return Promise.resolve({
rowsWritten: 0, bytesWritten: 0,
rowsSent: built.fmt.lineDelimited ? 0 : undefined,
bytesSent: 0, elapsed: 0,
});
}
return wrapRawNative(nativeRawCall(built.prefix, buf, built.fmt.lineDelimited), params, (raw) => ({
// Engine-side ledger (chdb-io/chdb-core#88): includes MV-cascade writes.
rowsWritten: raw.rowsWritten,
bytesWritten: raw.bytesWritten,
// Payload-side ledger: non-empty lines minus a WithNames header line.
rowsSent: raw.rowsSent === undefined ? undefined : Math.max(0, raw.rowsSent - built.fmt.headerLines),
bytesSent: buf.length,
elapsed: raw.elapsed,
}));
}

// Streaming input over the raw entry (backpressure contract; see src/streamInsert.ts).
function runStreamInsert(nativeRawCall, params) {
let built;
try { built = buildRawInsertPrefix(params); } catch (e) { return Promise.reject(asInsertError(e)); }
if (!built.fmt.lineDelimited || built.fmt.headerLines !== 0) {
return Promise.reject(new ChdbInsertError(
`Format '${params.format}' cannot be safely re-chunked for stream input ` +
'(CSV may hold raw newlines inside quoted fields; WithNames headers cannot repeat per chunk). ' +
'Use JSONEachRow/JSONCompactEachRow/TabSeparated for streams, or a single-shot Buffer insert.'));
}
// NOTE: objectMode streams are NOT pre-rejected — Readable.from() is
// objectMode even when it yields Buffers/strings (a perfectly good byte
// source). Object rows are rejected at the first non-byte chunk instead
// (streamInsert's toChunkBuffer, with the NDJSON-mapping recipe).
const insertChunk = (data) =>
wrapRawNative(nativeRawCall(built.prefix, data, true),
{ signal: params.signal, timeout: params.timeout }, (raw) => raw);
return streamInsert(insertChunk, params);
}

// insert() dispatch (never guesses on a conflicting signature; every rejected
// branch's message carries its workaround).
function dispatchInsert(nativeSqlCall, nativeRawCall, params) {
const v = params.values;
if (isRawValues(v)) {
if (params.format === undefined) {
return Promise.reject(new ChdbInsertError(
"Raw insert requires an explicit 'format': insert({ table, values: buffer, format: 'JSONEachRow' })"));
}
return runRawInsert(nativeRawCall, params);
}
if (Array.isArray(v)) {
if (params.format !== undefined) {
return Promise.reject(new ChdbInsertError(
"'format' is not supported with row arrays (reserved for chunked object inserts); " +
"drop 'format' for the VALUES path, or pre-serialize: values = Buffer.from(rows.map(r => JSON.stringify(r)).join('\\n') + '\\n')"));
}
return runInsert(nativeSqlCall, params);
}
if (isStreamValues(v)) {
if (params.format === undefined) {
return Promise.reject(new ChdbInsertError(
"Stream insert requires an explicit 'format' (e.g. 'JSONEachRow')"));
}
return runStreamInsert(nativeRawCall, params);
}
return runInsert(nativeSqlCall, params);
}

function emptyResult() {
return new ChdbResult({ bytes: new Uint8Array(0), elapsed: 0, rowsRead: 0, bytesRead: 0 });
}
Expand All @@ -76,6 +200,7 @@ function withAbortTimeout(nativePromise, opts) {
if (!signal && !timeout) {
return nativePromise.then((raw) => new ChdbResult(raw), (e) => { throw asQueryError(e); });
}
trackNative(nativePromise); // abort/timeout settles JS early; native runs on after
return new Promise((resolve, reject) => {
let settled = false;
let timer = null;
Expand Down Expand Up @@ -262,16 +387,47 @@ function queryBindAsync(query, params = {}, opts = {}) {
return runExclusiveParam(defaultParamChain, () => chdbNode.QueryAsync(sql, format, bound), opts);
}

// v3 insert (default connection). opts: { table, values, columns? }
// v3 insert (default connection). Dispatches on the shape of `values`:
// row arrays -> inline VALUES; Buffer/Uint8Array/string + format -> raw
// passthrough; Readable/AsyncIterable + format -> backpressured stream insert.
function insert(opts) {
return runInsert((sql) => chdbNode.QueryAsync(sql, "CSV"), opts || {});
return dispatchInsert(
(sql) => chdbNode.QueryAsync(sql, "CSV"),
(prefix, buf, countLines) => chdbNode.InsertRawAsync(prefix, buf, countLines),
opts || {});
}

// Track open sessions so a normal process exit can release native connections
// and remove temp dirs even when the user forgot to close. This
// complements the native env cleanup hook + std::atexit backstop.
const openSessions = new Set();

// Track in-flight native operations (async query/insert) that were settled
// EARLY by abort/timeout. There is no interrupt in the C ABI, so the native
// computation keeps running on the libuv thread after the JS promise has
// already rejected. libchdb permits only ONE active operation per process, so
// an orphaned background op from one test races the next test's first query and
// crashes the in-process engine ("server is shutting down due to a fatal
// error", ABORTED), cascading into every later test (this is why x64 CI runners
// went all-red while the timing on arm runners usually won the race). Each
// tracked promise settles on NATIVE completion and never rejects — the caller
// already received the mapped early-settle error; here we only need the timing.
const pendingNativeOps = new Set();
function trackNative(nativePromise) {
const done = nativePromise.then(() => {}, () => {});
pendingNativeOps.add(done);
done.then(() => pendingNativeOps.delete(done));
return nativePromise;
}

// Wait for every native op started before now to fully settle. Internal helper
// for test teardown: drained in the global afterEach before sessions are closed
// so an early-settled (aborted/timed-out) op stays local to the test that
// started it instead of poisoning the shared single-connection engine.
function _drainPendingOps() {
return Promise.allSettled([...pendingNativeOps]);
}

// Force-close every session still open in this process. Internal helper for
// test teardown: libchdb allows one active data directory per process, so a
// single test that creates a Session and never closes it (e.g. it threw before
Expand Down Expand Up @@ -385,11 +541,15 @@ class Session {
return runExclusiveParam(this.#paramChain, () => chdbNode.QueryAsyncConnection(this.connection, sql, format, bound), opts);
}

// v3 insert. opts: { table, values, columns? }. Inline INSERT ... VALUES,
// executed async; never reads stdin (closes #26).
// v3 insert. Same dispatch as the standalone insert(): row arrays -> inline
// VALUES; raw bytes + format -> passthrough; stream + format -> backpressured
// stream insert. Executed async; never reads stdin (closes #26).
insert(opts) {
if (!this.connection) return Promise.reject(new ChdbClosedError("No active connection available"));
return runInsert((sql) => chdbNode.QueryAsyncConnection(this.connection, sql, "CSV"), opts || {});
return dispatchInsert(
(sql) => chdbNode.QueryAsyncConnection(this.connection, sql, "CSV"),
(prefix, buf, countLines) => chdbNode.InsertRawAsyncConnection(this.connection, prefix, buf, countLines),
opts || {});
}

// v3 streaming. opts: { format?='JSONEachRow', signal? }. Returns a
Expand Down Expand Up @@ -426,11 +586,26 @@ class Session {
for (const sig of SESSION_SIGNALS) process.removeListener(sig, this.#signalHandler);
this.#signalHandler = null;
}
if (this.connection) {
try { chdbNode.CloseConnection(this.connection); } catch (_) { /* best effort */ }
this.connection = null;
const conn = this.connection;
this.connection = null;
const teardown = () => {
if (conn) { try { chdbNode.CloseConnection(conn); } catch (_) { /* best effort */ } }
if (this.isTemp) { try { this.#removeTempDir(); } catch (_) { /* best effort */ } }
};
// Destroying the native connection while an op is still running on it aborts
// the engine for the rest of the process. abort/timeout settle the JS promise
// early but leave the native computation running (no interrupt in the C ABI),
// so close() may land mid-flight. When ops are still pending, defer teardown
// until they drain rather than racing them. _drainPendingOps() awaits these
// deferred closes too, so a new session is never created before the prior
// connection is fully released.
if (conn && pendingNativeOps.size > 0) {
const deferred = Promise.allSettled([...pendingNativeOps]).then(teardown);
pendingNativeOps.add(deferred);
deferred.finally(() => pendingNativeOps.delete(deferred));
} else {
teardown();
}
if (this.isTemp) this.#removeTempDir();
}

// v2 alias for close().
Expand Down Expand Up @@ -499,4 +674,4 @@ function version() {
};
}

module.exports = { query, queryBind, queryAsync, queryBindAsync, insert, Session, version, _closeAllSessions };
module.exports = { query, queryBind, queryAsync, queryBindAsync, insert, Session, version, _closeAllSessions, _drainPendingOps };
Loading
Loading