diff --git a/KERNEL_REV b/KERNEL_REV new file mode 100644 index 00000000..1603f791 --- /dev/null +++ b/KERNEL_REV @@ -0,0 +1 @@ +8bedaabf69f5bce5a957a8775f29dbb8dbdd2e71 diff --git a/lib/DBSQLParameter.ts b/lib/DBSQLParameter.ts index 5e7f0abc..63c2465b 100644 --- a/lib/DBSQLParameter.ts +++ b/lib/DBSQLParameter.ts @@ -8,6 +8,15 @@ export enum DBSQLParameterType { STRING = 'STRING', DATE = 'DATE', TIMESTAMP = 'TIMESTAMP', + // `TIMESTAMP_NTZ` binds a timezone-free (wall-clock) timestamp. It is a real + // Spark type, bound natively on both the Thrift and kernel backends (requires + // a server that supports TIMESTAMP_NTZ; Spark 3.4+ / recent DBR). + TIMESTAMP_NTZ = 'TIMESTAMP_NTZ', + // `TIMESTAMP_LTZ` is an alias for `TIMESTAMP`: Spark has no distinct + // TIMESTAMP_LTZ type — `TIMESTAMP` already carries local/instant (LTZ) + // semantics. `toSparkParameter` therefore binds it as `TIMESTAMP` on the wire + // (valid on both backends); it exists only as a self-documenting alias. + TIMESTAMP_LTZ = 'TIMESTAMP_LTZ', FLOAT = 'FLOAT', DECIMAL = 'DECIMAL', DOUBLE = 'DOUBLE', @@ -50,10 +59,16 @@ export class DBSQLParameter { return new TSparkParameter({ name }); // for NULL neither `type` nor `value` should be set } + // Map timezone-explicit timestamp aliases to their Spark wire type. Spark + // has no distinct TIMESTAMP_LTZ type (TIMESTAMP carries LTZ semantics), so + // bind it as TIMESTAMP — valid on both the Thrift and kernel backends. + // TIMESTAMP_NTZ is a real Spark type and is bound natively. + const wireType = this.type === DBSQLParameterType.TIMESTAMP_LTZ ? DBSQLParameterType.TIMESTAMP : this.type; + if (typeof this.value === 'boolean') { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.BOOLEAN, + type: wireType ?? DBSQLParameterType.BOOLEAN, value: new TSparkParameterValue({ stringValue: this.value ? 'TRUE' : 'FALSE', }), @@ -63,7 +78,7 @@ export class DBSQLParameter { if (typeof this.value === 'number') { return new TSparkParameter({ name, - type: this.type ?? (Number.isInteger(this.value) ? DBSQLParameterType.INTEGER : DBSQLParameterType.DOUBLE), + type: wireType ?? (Number.isInteger(this.value) ? DBSQLParameterType.INTEGER : DBSQLParameterType.DOUBLE), value: new TSparkParameterValue({ stringValue: Number(this.value).toString(), }), @@ -73,7 +88,7 @@ export class DBSQLParameter { if (this.value instanceof Int64 || typeof this.value === 'bigint') { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.BIGINT, + type: wireType ?? DBSQLParameterType.BIGINT, value: new TSparkParameterValue({ stringValue: this.value.toString(), }), @@ -83,7 +98,7 @@ export class DBSQLParameter { if (this.value instanceof Date) { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.TIMESTAMP, + type: wireType ?? DBSQLParameterType.TIMESTAMP, value: new TSparkParameterValue({ stringValue: this.value.toISOString(), }), @@ -92,7 +107,7 @@ export class DBSQLParameter { return new TSparkParameter({ name, - type: this.type ?? DBSQLParameterType.STRING, + type: wireType ?? DBSQLParameterType.STRING, value: new TSparkParameterValue({ stringValue: this.value, }), diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 392f3108..d17cfa5a 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -12,7 +12,18 @@ export type ExecuteStatementOptions = { */ queryTimeout?: number | bigint | Int64; /** - * @deprecated This option is no longer supported and will be removed in future releases + * Selects the execution lifecycle. The only observable effect is WHEN + * `executeStatement` resolves; the result data, schema, and error classes are + * identical regardless. + * + * - **Thrift backend:** no-op. The Thrift path always submits asynchronously + * (`runAsync: true` on the wire) and polls during fetch; this option is not + * read. + * - **Kernel backend (`useSEA`):** selects the kernel execution path — + * `false`/unset (default) runs the blocking direct-results path (faster, + * cancellable mid-compute); `true` submits and polls (returns a pending + * handle before completion). Default is sync, matching the python + * connector's `cursor.execute()`. */ runAsync?: boolean; maxRows?: number | bigint | Int64 | null; @@ -27,6 +38,18 @@ export type ExecuteStatementOptions = { * These tags apply only to this statement and do not persist across queries. */ queryTags?: Record; + /** + * SEA-only: server-side row cap for this statement (kernel `row_limit`). The + * Thrift backend has no execute-time server cap, so this is a no-op there; + * use `maxRows` for the cross-backend client-side fetch limit. + */ + rowLimit?: number; + /** + * SEA-only: per-statement Spark conf overlay (kernel `statement_conf`). + * Merged with the serialized `queryTags` (which land under the reserved + * `query_tags` key). Ignored by the Thrift backend. + */ + statementConf?: Record; }; export type TypeInfoRequest = { diff --git a/lib/contracts/InternalConnectionOptions.ts b/lib/contracts/InternalConnectionOptions.ts index a115aa47..24575984 100644 --- a/lib/contracts/InternalConnectionOptions.ts +++ b/lib/contracts/InternalConnectionOptions.ts @@ -18,4 +18,27 @@ export interface InternalConnectionOptions { * @internal Not stable; M0 stub only. */ useSEA?: boolean; + + /** + * SEA-only: kernel connection-pool size (`ConnectionOptions.max_connections`). + * Validated as a positive integer within the napi `u32` range. + * @internal SEA path only. + */ + maxConnections?: number; + + /** + * SEA-only: verify the server's TLS certificate. Secure-by-default — omit + * to keep full chain + hostname verification; set `false` only to opt into + * the insecure accept-anything mode. + * @internal SEA path only. + */ + checkServerCertificate?: boolean; + + /** + * SEA-only: PEM-encoded CA certificate (string or `Buffer`) added to the + * trust store on top of the system roots — for TLS-inspecting proxies or + * on-prem internal CAs. Honoured regardless of `checkServerCertificate`. + * @internal SEA path only. + */ + customCaCert?: Buffer | string; } diff --git a/lib/sea/SeaAuth.ts b/lib/sea/SeaAuth.ts index bdfabf3d..a9d9d116 100644 --- a/lib/sea/SeaAuth.ts +++ b/lib/sea/SeaAuth.ts @@ -13,6 +13,7 @@ // limitations under the License. import { ConnectionOptions } from '../contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../contracts/InternalConnectionOptions'; import AuthenticationError from '../errors/AuthenticationError'; import HiveDriverError from '../errors/HiveDriverError'; @@ -66,9 +67,58 @@ export interface SeaSessionDefaults { catalog?: string; schema?: string; sessionConf?: Record; + /** + * Render `INTERVAL` / `DURATION` result columns as strings + * (kernel `ResultConfig.intervals_as_string`). The kernel default is + * native Arrow `month_interval` / `duration[us]`, but the NodeJS + * Thrift driver surfaces intervals as strings — so the SEA path sets + * this `true` so its result shape is a byte-compatible drop-in for the + * Thrift backend. Omitting it falls back to the kernel's native types. + */ + intervalsAsString?: boolean; + /** + * Render complex (`ARRAY` / `MAP` / `STRUCT` / `VARIANT`) result + * columns as JSON strings (kernel `ResultConfig.complex_types_as_json`). + * Left unset on the SEA path: native Arrow nested types already decode + * identically to the Thrift backend through the shared Arrow converter, + * so forcing JSON here would *introduce* a divergence rather than + * remove one. + */ + complexTypesAsJson?: boolean; + /** + * Per-session kernel connection-pool size + * (kernel `ConnectionOptions.max_connections`). Validated as a positive + * integer within the napi `u32` range by `buildSeaConnectionOptions`. + */ + maxConnections?: number; +} + +/** + * TLS options shared across all auth-mode variants. Mirror the napi + * binding's `ConnectionOptions.checkServerCertificate` / `.customCaCert` + * (kernel `Session::builder().tls(TlsConfig)`). + * + * The napi shape takes `customCaCert` as a `Buffer` only; the public + * `ConnectionOptions` additionally accepts a PEM string, which + * `buildSeaConnectionOptions` normalises to a `Buffer` before crossing + * the FFI boundary. + */ +export interface SeaTlsOptions { + /** + * Verify the server's TLS certificate. The SEA backend is + * **secure-by-default**: omitting this leaves the kernel default of + * `true` (full chain + hostname verification). Set `false` only to opt + * into the insecure, accept-anything mode (analogous to Thrift's + * `rejectUnauthorized: false`); prefer pairing strict checking with + * `customCaCert` over disabling verification entirely. + */ + checkServerCertificate?: boolean; + /** PEM-encoded CA bytes to add to the trust store. */ + customCaCert?: Buffer; } export type SeaNativeConnectionOptions = SeaSessionDefaults & + SeaTlsOptions & ( | { hostName: string; @@ -114,6 +164,63 @@ export function isBlankOrReserved(s: string): boolean { return normalized.length === 0 || normalized === 'undefined' || normalized === 'null'; } +/** napi-rs marshals `maxConnections` as a `u32`; reject values it can't hold. */ +const MAX_U32 = 0xffffffff; + +/** + * Normalise the public TLS options (`checkServerCertificate` / + * `customCaCert`) into the napi shape. + * + * - `checkServerCertificate` passes through verbatim (only when set; an + * absent value leaves the kernel default, which is secure — verify on). + * - `customCaCert` accepts a PEM string or `Buffer` on the public + * surface; we convert a string to a `Buffer` here and do a light PEM + * sanity check. The bytes are NOT parsed in JS — the kernel returns a + * meaningful error if the PEM is malformed. + * + * Throws `HiveDriverError` when `customCaCert` is supplied but empty or + * (for strings) lacks a PEM certificate header. + */ +export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions { + // Read the SEA-only fields through the purpose-built internal options type + // rather than an ad-hoc inline cast, so the shape can't silently drift from + // its declaration and a typo'd key fails to compile. + const { checkServerCertificate, customCaCert } = options as ConnectionOptions & InternalConnectionOptions; + + const tls: SeaTlsOptions = {}; + + if (checkServerCertificate !== undefined) { + tls.checkServerCertificate = checkServerCertificate; + } + + if (customCaCert !== undefined) { + if (typeof customCaCert === 'string') { + // Light PEM sanity check — require a well-ordered BEGIN…END block so a + // truncated/headerless cert (or a stray page that merely contains both + // literals out of order, e.g. a proxy-intercept page) is rejected here + // rather than surfacing as an opaque kernel TLS error. Ordered match, not + // two independent substring checks. Full parsing is deferred to the kernel. + if (!/-----BEGIN CERTIFICATE-----[\s\S]+?-----END CERTIFICATE-----/.test(customCaCert)) { + throw new HiveDriverError( + 'SEA backend: `customCaCert` string does not look like a PEM certificate ' + + "(expected a '-----BEGIN CERTIFICATE-----' … '-----END CERTIFICATE-----' block). " + + 'Pass PEM text or a Buffer of PEM bytes.', + ); + } + tls.customCaCert = Buffer.from(customCaCert, 'utf8'); + } else if (Buffer.isBuffer(customCaCert)) { + if (customCaCert.length === 0) { + throw new HiveDriverError('SEA backend: `customCaCert` Buffer is empty.'); + } + tls.customCaCert = customCaCert; + } else { + throw new HiveDriverError('SEA backend: `customCaCert` must be a PEM string or a Buffer.'); + } + } + + return tls; +} + /** * Validate the user-supplied `ConnectionOptions` and build the * napi-binding's connection-options shape. @@ -170,11 +277,43 @@ export function isBlankOrReserved(s: string): boolean { export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNativeConnectionOptions { const { authType } = options as { authType?: string }; - const base = { + const base: { + hostName: string; + httpPath: string; + intervalsAsString: boolean; + maxConnections?: number; + } & SeaTlsOptions = { hostName: options.host, httpPath: prependSlash(options.path), + // Match the NodeJS Thrift driver, which surfaces INTERVAL columns as + // strings. The kernel defaults to native Arrow interval/duration types; + // forcing the string rendering here keeps the SEA path a byte-compatible + // drop-in. Complex types are intentionally left at the kernel default + // (native Arrow) — they already decode identically to Thrift via the + // shared Arrow converter, so `complexTypesAsJson` is not forced on. + intervalsAsString: true, + // TLS knobs (server-cert verification toggle + custom CA). Validated and + // normalised (string PEM → Buffer) here so the napi shape only sees a Buffer. + ...buildSeaTlsOptions(options), }; + // SEA-only pool sizing; read via cast to match how this function reads the + // other SEA-specific options (TLS) — they live on the internal options + // surface, not the published public `ConnectionOptions` `.d.ts`. + const { maxConnections } = options as ConnectionOptions & InternalConnectionOptions; + if (maxConnections !== undefined) { + if (!Number.isInteger(maxConnections) || maxConnections < 1) { + throw new HiveDriverError(`SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`); + } + if (maxConnections > MAX_U32) { + throw new HiveDriverError( + `SEA backend: \`maxConnections\` exceeds the napi u32 limit (${MAX_U32}); got ${maxConnections}. ` + + 'Typical pool sizes are 10-500.', + ); + } + base.maxConnections = maxConnections; + } + const oauth = options as { oauthClientId?: string; oauthClientSecret?: string; diff --git a/lib/sea/SeaBackend.ts b/lib/sea/SeaBackend.ts index 1043da8d..d609cac9 100644 --- a/lib/sea/SeaBackend.ts +++ b/lib/sea/SeaBackend.ts @@ -16,6 +16,8 @@ import IBackend from '../contracts/IBackend'; import ISessionBackend from '../contracts/ISessionBackend'; import IClientContext from '../contracts/IClientContext'; import { ConnectionOptions, OpenSessionRequest } from '../contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../contracts/InternalConnectionOptions'; +import { LogLevel } from '../contracts/IDBSQLLogger'; import HiveDriverError from '../errors/HiveDriverError'; import { getSeaNative, SeaNativeBinding, SeaConnection } from './SeaNativeLoader'; import { decodeNapiKernelError } from './SeaErrorMapping'; @@ -78,6 +80,22 @@ export default class SeaBackend implements IBackend { // Any non-PAT mode (or a missing/empty token) throws here, before // we ever touch the native binding. this.nativeOptions = buildSeaConnectionOptions(options); + + // Warn on the insecure combo: a `customCaCert` paired with + // `checkServerCertificate: false` is almost always a mistake — verification + // is fully off, so the custom trust anchor is never used. The combo is + // still honoured (kernel contract), but a secure-looking `customCaCert` + // shouldn't silently mask disabled verification. + const tlsOpts = options as ConnectionOptions & InternalConnectionOptions; + if (tlsOpts.checkServerCertificate === false && tlsOpts.customCaCert !== undefined) { + this.context + .getLogger() + .log( + LogLevel.warn, + 'SEA: `customCaCert` is set but `checkServerCertificate: false` disables certificate ' + + 'verification entirely — the custom CA is not used. Set `checkServerCertificate: true` to use it.', + ); + } } public async openSession(request: OpenSessionRequest): Promise { diff --git a/lib/sea/SeaNativeLoader.ts b/lib/sea/SeaNativeLoader.ts index 8eb36f6a..80352be6 100644 --- a/lib/sea/SeaNativeLoader.ts +++ b/lib/sea/SeaNativeLoader.ts @@ -36,6 +36,9 @@ import type { ExecuteOptions as NativeExecuteOptions, TypedValueInput as NativeTypedValueInput, NamedTypedValueInput as NativeNamedTypedValueInput, + AsyncStatement as NativeAsyncStatement, + AsyncResultHandle as NativeAsyncResultHandle, + CancellableExecution as NativeCancellableExecution, } from '../../native/sea'; // SEA-prefixed re-exports. The kernel-generated `.d.ts` keeps the @@ -59,6 +62,22 @@ export type SeaNativeExecuteOptions = NativeExecuteOptions; export type SeaNativeTypedValueInput = NativeTypedValueInput; export type SeaNativeNamedTypedValueInput = NativeNamedTypedValueInput; +// Async-submit surface: `Connection.submitStatement` returns an +// `AsyncStatement` (status / awaitResult / cancel / close); `awaitResult` +// yields an `AsyncResultHandle` whose `fetchNextBatch` / `schema` match the +// blocking `Statement`'s fetch surface, so the results pipeline consumes +// either interchangeably. +export type SeaNativeAsyncStatement = NativeAsyncStatement; +export type SeaNativeAsyncResultHandle = NativeAsyncResultHandle; + +// Cancellable sync-execute surface: `Connection.executeStatementCancellable` +// returns a `CancellableExecution` that captures a detached StatementCanceller +// BEFORE dispatching the blocking `execute()`, so a concurrent `cancel()` +// interrupts a still-running query mid-compute. `result()` drives the blocking +// execute and resolves to the same terminal `Statement` `executeStatement` +// returns. +export type SeaNativeCancellableExecution = NativeCancellableExecution; + /** * The full native binding surface, derived from the generated module * so it can never drift from the `.d.ts` contract: when the kernel @@ -124,6 +143,13 @@ function assertBindingShape(binding: SeaNativeBinding): void { if (typeof binding.openSession !== 'function') missing.push('openSession'); if (typeof binding.Connection !== 'function') missing.push('Connection'); if (typeof binding.Statement !== 'function') missing.push('Statement'); + // Classes the async (submit/poll) and cancellable-sync execution paths depend + // on. Checking them here turns a stale/older cached `.node` into a clear + // load-time error instead of an `X is not a function` mid-query (e.g. + // `Connection.submitStatement` / `executeStatementCancellable`). + if (typeof binding.AsyncStatement !== 'function') missing.push('AsyncStatement'); + if (typeof binding.AsyncResultHandle !== 'function') missing.push('AsyncResultHandle'); + if (typeof binding.CancellableExecution !== 'function') missing.push('CancellableExecution'); if (missing.length > 0) { throw new Error( `SEA native binding loaded but is missing expected export(s): ${missing.join(', ')}. ` + diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index eb002870..2a4c8136 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -45,12 +45,18 @@ import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import Status from '../dto/Status'; import HiveDriverError from '../errors/HiveDriverError'; +import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; import ArrowResultConverter from '../result/ArrowResultConverter'; import ResultSlicer from '../result/ResultSlicer'; import SeaResultsProvider from './SeaResultsProvider'; import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './SeaArrowIpc'; import { decodeNapiKernelError } from './SeaErrorMapping'; -import { SeaStatement } from './SeaNativeLoader'; +import { + SeaStatement, + SeaNativeAsyncStatement, + SeaNativeAsyncResultHandle, + SeaNativeCancellableExecution, +} from './SeaNativeLoader'; import { SeaStatementHandle, SeaOperationLifecycleState, @@ -71,23 +77,99 @@ import { export type SeaOperationStatement = SeaStatementHandle & Partial; /** - * Constructor options for `SeaOperationBackend`. + * The fetch surface shared by the blocking metadata `Statement` and the async + * query path's `AsyncResultHandle` (from `awaitResult()`): both expose + * `fetchNextBatch()` + a synchronous `schema()`, so the results pipeline + * (`SeaResultsProvider` → `ArrowResultConverter` → `ResultSlicer`) consumes + * either interchangeably. + */ +type SeaFetchHandle = Pick; + +/** Poll cadence for the async `status()` loop — matches the Thrift backend's 100ms. */ +const STATUS_POLL_INTERVAL_MS = 100; + +function delay(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +/** + * Map a kernel `AsyncStatement.status()` string to the backend-neutral + * `OperationState`. The kernel variant names (`Pending` / `Running` / + * `Succeeded` / `Failed` / `Cancelled` / `Closed` / `Unknown`) line up 1:1 + * with the enum; `Canceled` (one-L spelling) is mapped defensively, and any + * unrecognised value collapses to `Unknown`. + */ +// Hoisted out of the (hot, 100ms) async poll loop — `Object.values` would +// otherwise allocate a fresh array on every status tick. +const OPERATION_STATE_VALUES = Object.values(OperationState) as string[]; + +function statusStringToOperationState(state: string): OperationState { + if (state === 'Canceled') { + return OperationState.Cancelled; + } + if (OPERATION_STATE_VALUES.includes(state)) { + return state as OperationState; + } + return OperationState.Unknown; +} + +/** + * Constructor options for `SeaOperationBackend`. Exactly one of + * `asyncStatement` (query path — `Connection.submitStatement`) or `statement` + * (metadata path — `Connection.list*` / `get*`, already terminal) must be set. */ export interface SeaOperationBackendOptions { - /** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */ - statement: SeaOperationStatement; + /** The pending napi `AsyncStatement` from `Connection.submitStatement(...)`. */ + asyncStatement?: SeaNativeAsyncStatement; + /** The terminal napi `Statement` from a metadata call. */ + statement?: SeaOperationStatement; + /** + * The pending napi `CancellableExecution` from + * `Connection.executeStatementCancellable(...)` — the sync (`runAsync: false`) + * query path. `result()` drives the blocking `execute()` to a terminal + * `Statement` (the fetch handle); `cancel()` fires a detached canceller that + * interrupts a still-running `result()` mid-COMPUTE. Exactly one of + * `asyncStatement`, `statement`, or `cancellableExecution` must be set. + */ + cancellableExecution?: SeaNativeCancellableExecution; context: IClientContext; /** - * Optional override for `id`. When not provided a fresh UUIDv4 is - * generated upstream (in `SeaSessionBackend.executeStatement`); the - * kernel does not yet surface its internal statement-id at the napi - * boundary. Once it does, the JS layer can thread it through here. + * Optional override for `id`. Defaults to the napi statement-id when the + * handle exposes one, else a fresh UUIDv4. */ id?: string; + /** + * Client-side query timeout in whole seconds (the public `queryTimeout`). + * The kernel ignores `queryTimeoutSecs` on the async submit path + * (`submitStatement` always sends `wait_timeout=0s`), so the JS poll loop + * enforces it as a deadline — on expiry it best-effort cancels the statement + * and throws `OperationStateError(Timeout)`, matching the Thrift path's + * server-side TIMEDOUT outcome. Omitted ⇒ no client-side deadline. + */ + queryTimeoutSecs?: number; } export default class SeaOperationBackend implements IOperationBackend { - private readonly statement: SeaOperationStatement; + // Async query path: pending async statement we poll to terminal. Undefined on + // the metadata / sync-execute paths. + private readonly asyncStatement?: SeaNativeAsyncStatement; + + // Sync query path (`runAsync: false`): pending cancellable execution whose + // `result()` drives the blocking `execute()` to a terminal `Statement`. + // Undefined on the async / metadata paths. + private readonly cancellableExecution?: SeaNativeCancellableExecution; + + // Metadata path: terminal statement. Also the resolved fetch handle on the + // sync-execute path once `cancellableExecution.result()` settles. + private blockingStatement?: SeaOperationStatement; + + // The cancel/close surface — whichever handle backs this operation. Both + // `AsyncStatement` and `Statement` expose `cancel()` / `close()`; the + // sync-execute path uses a composite that routes `cancel()` to the + // cancellable execution (mid-compute) and `close()` to the resolved statement. + private readonly lifecycleHandle: SeaStatementHandle; private readonly context: IClientContext; @@ -103,13 +185,65 @@ export default class SeaOperationBackend implements IOperationBackend { private metadataPromise?: Promise; - constructor({ statement, context, id }: SeaOperationBackendOptions) { - this.statement = statement; + // Memoised fetch handle: on the async path it is `awaitResult()`'s result + // (resolved once the statement is terminal); on the metadata path it is the + // already-terminal statement. Drives both fetch and result-metadata. + private fetchHandlePromise?: Promise; + + // Client-side query-timeout deadline in ms (the public `queryTimeout`), + // undefined when unset. Enforced in the async poll loop. + private readonly queryTimeoutMs?: number; + + constructor({ + asyncStatement, + statement, + cancellableExecution, + context, + id, + queryTimeoutSecs, + }: SeaOperationBackendOptions) { + // Exactly one of the three handle kinds must be supplied. + const providedCount = + (asyncStatement !== undefined ? 1 : 0) + + (statement !== undefined ? 1 : 0) + + (cancellableExecution !== undefined ? 1 : 0); + if (providedCount !== 1) { + throw new HiveDriverError( + 'SeaOperationBackend: exactly one of `asyncStatement`, `statement`, or `cancellableExecution` must be provided', + ); + } + this.asyncStatement = asyncStatement; + this.cancellableExecution = cancellableExecution; + this.blockingStatement = statement; + // Lifecycle surface. The async/metadata handles expose both cancel/close. + // The sync-execute path uses a composite: `cancel()` always routes to the + // cancellable execution (lock-free, interrupts a running `result()` + // mid-compute and is a no-op once terminal); `close()` closes the resolved + // terminal statement once `result()` produced it, OR — if `result()` is + // still in flight — proactively cancels the running execution so the server + // stops computing immediately rather than running on until the kernel's + // drop-guard fires whenever this handle is eventually GC'd. + this.lifecycleHandle = cancellableExecution + ? { + cancel: () => cancellableExecution.cancel(), + close: () => (this.blockingStatement ? this.blockingStatement.close() : cancellableExecution.cancel()), + } + : ((asyncStatement ?? statement) as SeaStatementHandle); this.context = context; - this._id = id ?? uuidv4(); + this._id = + id ?? asyncStatement?.statementId ?? statement?.statementId ?? cancellableExecution?.statementId ?? uuidv4(); + this.queryTimeoutMs = queryTimeoutSecs !== undefined && queryTimeoutSecs > 0 ? queryTimeoutSecs * 1000 : undefined; } public get id(): string { + // STABLE for the operation's lifetime. The facade keys telemetry start/ + // complete on this value (DBSQLOperation → MetricsAggregator), so it must + // NOT mutate — a sync op's server statement_id isn't known until `result()` + // resolves (mid-execute), and flipping `id` then would split the start/ + // complete records across two keys and silently drop the summary. The + // resolved server statement_id is instead surfaced via a debug log (see + // `getFetchHandle`) for server/kernel log correlation. On the async path + // `_id` already IS the server id (available at submit). return this._id; } @@ -162,7 +296,7 @@ export default class SeaOperationBackend implements IOperationBackend { // wedged, so nothing downstream forces another close). We still don't // mask the original fetch error, but log the close failure at warn so // the leak is diagnosable rather than completely invisible. - await seaClose(this.lifecycle, this.statement, this.context, this._id).catch((closeErr) => { + await seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id).catch((closeErr) => { const cause = closeErr instanceof Error ? closeErr.message : String(closeErr); this.context .getLogger() @@ -191,12 +325,16 @@ export default class SeaOperationBackend implements IOperationBackend { return this.metadataPromise; } this.metadataPromise = (async () => { - if (!this.statement.schema) { - throw new HiveDriverError('SeaOperationBackend: statement.schema() is not available on this handle'); + // The schema lives on the fetch handle: the metadata `Statement` + // directly, or the async path's `AsyncResultHandle` (materialised by + // `getFetchHandle()` once the statement is terminal). + const handle = await this.getFetchHandle(); + if (!handle.schema) { + throw new HiveDriverError('SeaOperationBackend: schema() is not available on this handle'); } // `schema()` is a synchronous napi getter (returns `ArrowSchema`, not a // Promise) — no `await` needed. - const arrowSchemaIpc = this.statement.schema(); + const arrowSchemaIpc = handle.schema(); const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes); // `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for // back-compat with the public `IOperation.getSchema()` surface. @@ -229,60 +367,288 @@ export default class SeaOperationBackend implements IOperationBackend { // --------------------------------------------------------------------------- public async status(_progress: boolean): Promise { - // Synthesised — the kernel resolves `Statement::execute().await` before - // it hands back a Statement handle, so by the time a SeaOperationBackend - // exists the statement is terminal. Note there is intentionally no - // `Failed` arm: a failed execution rejects inside `executeStatement` - // (the kernel surfaces the error at submit), so a `Failed` statement - // never becomes a SeaOperationBackend — `status()` only ever observes - // Succeeded, or Cancelled/Closed from a client-side lifecycle call. - // Report Cancelled/Closed if the lifecycle flag is set, else Succeeded. - // Returns the backend-neutral OperationStatus the IOperationBackend - // contract expects, so the DBSQLOperation facade switches on `state` - // identically across backends. + // A client-side cancel/close wins over any server state. if (this.lifecycle.isCancelled) { return { state: OperationState.Cancelled, hasResultSet: true }; } if (this.lifecycle.isClosed) { return { state: OperationState.Closed, hasResultSet: true }; } + if (this.asyncStatement) { + // Async query path: report the real kernel state (single + // GetStatementStatus RPC — no polling here; `waitUntilReady` owns the + // poll loop). + const state = statusStringToOperationState(await this.asyncStatement.status()); + return { state, hasResultSet: true }; + } + if (this.cancellableExecution) { + // Sync (`runAsync: false`) path: the kernel `execute()` blocks and polls + // server-side; there is no per-status RPC to query while it runs. Report + // Running until `result()` has materialised the terminal statement, then + // Succeeded — mirroring the kernel's blocking-then-terminal lifecycle. + const state = this.fetchHandlePromise ? OperationState.Succeeded : OperationState.Running; + return { state, hasResultSet: true }; + } + // Metadata path: the kernel statement is already terminal. return { state: OperationState.Succeeded, hasResultSet: true }; } public async waitUntilReady(options?: IOperationBackendWaitOptions): Promise { - // Kernel's `Statement::execute().await` has already resolved by the - // time we hold a Statement handle — there is no pending/running - // state to poll for M0. seaFinished fires the progress callback - // once with a synthesised FINISHED response so progress-UI callers - // see the same one-shot completion tick the Thrift path emits at - // the end of its polling loop. + if (this.asyncStatement) { + return this.waitUntilReadyAsync(options); + } + if (this.cancellableExecution) { + return this.waitUntilReadyCancellable(options); + } + // Metadata path: the kernel statement has already resolved, so there is + // nothing to poll. seaFinished fires the progress callback once with a + // synthesised completion tick, matching the Thrift path's final tick. return seaFinished(this.lifecycle, options); } public async cancel(): Promise { - return seaCancel(this.lifecycle, this.statement, this.context, this._id); + return seaCancel(this.lifecycle, this.lifecycleHandle, this.context, this._id); } public async close(): Promise { - return seaClose(this.lifecycle, this.statement, this.context, this._id); + return seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id); } // --------------------------------------------------------------------------- // Internals. // --------------------------------------------------------------------------- + /** + * Poll the kernel `AsyncStatement` to a terminal state on a fixed 100ms + * cadence, mirroring the Thrift backend's `waitUntilReady` loop. We poll + * `status()` (a cheap GetStatementStatus RPC) rather than awaiting + * `awaitResult()` directly so that `status()` reports the real + * Pending/Running/Succeeded state to a progress callback each tick, and so a + * JS-initiated `cancel()`/`close()` is observed between ticks via + * `failIfNotActive`. On success it materialises the result handle (so the + * first fetch is free); on a server-driven terminal state it throws the typed + * error the `IOperationBackend` contract requires. + * + * Terminal errors are thrown as `OperationStateError` (NOT plain + * `HiveDriverError`) for Cancelled/Closed/Unknown, because the DBSQLOperation + * facade only mirrors its `cancelled`/`closed` flags when + * `err instanceof OperationStateError` — exactly as the Thrift backend does. + * The Failed branch surfaces the kernel's typed SQL-error envelope via + * `awaitResult()`. + */ + private async waitUntilReadyAsync(options?: IOperationBackendWaitOptions): Promise { + // Already materialised → terminal-and-ready, nothing to wait for. + if (this.fetchHandlePromise) { + return; + } + // Client-side timeout deadline: the kernel ignores queryTimeoutSecs on the + // async submit path, so we enforce the public `queryTimeout` here. + const deadline = this.queryTimeoutMs !== undefined ? Date.now() + this.queryTimeoutMs : undefined; + for (;;) { + // A JS-initiated cancel/close short-circuits before the next poll. + failIfNotActive(this.lifecycle); + + // eslint-disable-next-line no-await-in-loop + const state = statusStringToOperationState(await this.asyncStatement!.status()); + + if (options?.callback) { + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(options.callback({ state, hasResultSet: true })); + } + + switch (state) { + case OperationState.Pending: + case OperationState.Running: + break; + case OperationState.Succeeded: + // Materialise the result stream now so the first fetch/metadata call + // doesn't pay an extra await_result round-trip. + // eslint-disable-next-line no-await-in-loop + await this.getFetchHandle(); + return; + case OperationState.Failed: + // `status()` collapses Failed to the variant name only; the real + // SQL-error envelope (sql_state / error_code / query_id) rides on + // `awaitResult()`'s rejection — drive it to surface the typed error, + // then best-effort close the leaked statement before it propagates. + try { + // eslint-disable-next-line no-await-in-loop + await this.throwAsyncError(); + } catch (failErr) { + // eslint-disable-next-line no-await-in-loop + await this.bestEffortClose(); + throw failErr; + } + break; + case OperationState.Cancelled: + // eslint-disable-next-line no-await-in-loop + await this.bestEffortClose(); + throw new OperationStateError(OperationStateErrorCode.Canceled); + case OperationState.Closed: + // eslint-disable-next-line no-await-in-loop + await this.bestEffortClose(); + throw new OperationStateError(OperationStateErrorCode.Closed); + default: + // eslint-disable-next-line no-await-in-loop + await this.bestEffortClose(); + throw new OperationStateError(OperationStateErrorCode.Unknown); + } + + // Still Pending/Running — enforce the client-side timeout before sleeping. + if (deadline !== undefined && Date.now() >= deadline) { + // Best-effort server-side cancel so the statement doesn't keep running + // after we stop waiting; never mask the timeout with a cancel failure, + // but warn-log a failed cancel so a still-running server statement is + // diagnosable (mirrors the fetch-error cleanup path). + // eslint-disable-next-line no-await-in-loop + await this.cancel().catch((cancelErr) => { + const cause = cancelErr instanceof Error ? cancelErr.message : String(cancelErr); + this.context + .getLogger() + .log( + LogLevel.warn, + `SEA query-timeout cleanup: cancel() failed for operation ${this._id}; the server-side ` + + `statement may keep running until the session is closed. Cause: ${cause}`, + ); + }); + // Release the statement handle too (cancel stops the work; close frees + // the handle) — best-effort, mirroring the other terminal branches. + // eslint-disable-next-line no-await-in-loop + await this.bestEffortClose(); + throw new OperationStateError(OperationStateErrorCode.Timeout); + } + + // eslint-disable-next-line no-await-in-loop + await delay(STATUS_POLL_INTERVAL_MS); + } + } + + /** + * Sync (`runAsync: false`) execute path. Drives the blocking + * `CancellableExecution.result()` to a terminal `Statement` (the kernel polls + * to completion server-side, honouring `queryTimeoutSecs` on this path). The + * await is interruptible: a JS-initiated `cancel()` fires the detached + * canceller, the server flips the statement terminal, and the parked + * `result()` rejects with `Cancelled` — which we map to the typed + * `OperationStateError(Canceled)`. + * + * Unlike the async path there is no status poll loop (the kernel owns + * polling), so the progress callback fires once on completion, matching the + * metadata path's single completion tick. + */ + private async waitUntilReadyCancellable(options?: IOperationBackendWaitOptions): Promise { + // Already materialised → terminal-and-ready, nothing to wait for. + if (this.fetchHandlePromise) { + return; + } + // A JS-initiated cancel/close before we start short-circuits to the typed + // state error rather than dispatching the blocking execute. + failIfNotActive(this.lifecycle); + // `getFetchHandle()` drives `result()` and memoises the resolved Statement + // (also stored on `blockingStatement` so `close()` can reach it). + await this.getFetchHandle(); + // Single completion tick, matching the metadata path. + if (options?.callback) { + await Promise.resolve(options.callback({ state: OperationState.Succeeded, hasResultSet: true })); + } + } + + /** + * Drive `awaitResult()` on a Failed statement to surface the kernel's typed + * SQL-error envelope. Falls back to a generic error if `awaitResult()` + * unexpectedly resolves instead of rejecting. + */ + private async throwAsyncError(): Promise { + try { + await this.asyncStatement!.awaitResult(); + } catch (err) { + throw decodeNapiKernelError(err); + } + throw new HiveDriverError(`SEA operation ${this._id} reported Failed but produced a result.`); + } + + /** + * Best-effort close of the kernel statement when the poll loop ends on a + * server-driven terminal error (Failed/Cancelled/Closed/Unknown/Timeout). + * Without it the kernel-side statement handle leaks until session close (the + * poll loop, unlike `fetchChunk`, otherwise just throws). Never masks the + * original error; warn-logs a close failure so the leak is diagnosable. + */ + private async bestEffortClose(): Promise { + await seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id).catch((closeErr) => { + const cause = closeErr instanceof Error ? closeErr.message : String(closeErr); + this.context + .getLogger() + .log( + LogLevel.warn, + `SEA poll-loop cleanup: close() failed for operation ${this._id}; the server-side ` + + `statement may leak until the session is closed. Cause: ${cause}`, + ); + }); + } + + /** + * Resolve (and memoise) the fetch handle: `awaitResult()`'s `AsyncResultHandle` + * on the query path, or the already-terminal `Statement` on the metadata path. + */ + private getFetchHandle(): Promise { + if (!this.fetchHandlePromise) { + if (this.asyncStatement) { + this.fetchHandlePromise = this.asyncStatement.awaitResult().catch((err) => { + throw decodeNapiKernelError(err); + }) as Promise; + } else if (this.cancellableExecution) { + // Sync (`runAsync: false`) path: drive the blocking `result()` to the + // terminal `Statement`. Store it on `blockingStatement` so `close()` can + // reach it post-execute, and so a subsequent fetch uses it directly. + this.fetchHandlePromise = this.cancellableExecution + .result() + .then((stmt) => { + this.blockingStatement = stmt as unknown as SeaOperationStatement; + // Log the now-known server statement id (NOT surfaced via `id`, + // which must stay stable for telemetry correlation) so a sync op is + // correlatable to server/kernel logs by its client operation id. + const serverId = this.blockingStatement.statementId; + if (serverId && serverId !== this._id) { + this.context + .getLogger() + .log(LogLevel.debug, `SEA operation ${this._id} resolved to server statement_id ${serverId}`); + } + return stmt as unknown as SeaFetchHandle; + }) + .catch((err) => { + const mapped = decodeNapiKernelError(err); + // A cancel-induced rejection surfaces as the kernel's Cancelled + // error; map it to the typed `OperationStateError(Canceled)` so the + // `DBSQLOperation` facade mirrors its cancelled flag (it only does so + // for `OperationStateError`), matching the Thrift path. If the + // operation was cancelled client-side, prefer the typed code + // regardless of the kernel error text. + if (this.lifecycle.isCancelled) { + throw new OperationStateError(OperationStateErrorCode.Canceled); + } + throw mapped; + }); + } else { + const stmt = this.blockingStatement!; + if (!stmt.fetchNextBatch) { + throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); + } + this.fetchHandlePromise = Promise.resolve(stmt as unknown as SeaFetchHandle); + } + } + return this.fetchHandlePromise; + } + private async getResultSlicer(): Promise> { if (this.resultSlicer) { return this.resultSlicer; } - if (!this.statement.fetchNextBatch) { - throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); - } const metadata = await this.getResultMetadata(); - // The lifecycle subset has cancel/close only; fetch methods exist on - // the full napi Statement. Cast is safe here because we've just - // verified `fetchNextBatch` is callable. - this.resultsProvider = new SeaResultsProvider(this.statement as SeaStatement); + const handle = await this.getFetchHandle(); + // SeaResultsProvider consumes only `fetchNextBatch`; both the async result + // handle and the blocking statement satisfy that surface. + this.resultsProvider = new SeaResultsProvider(handle as unknown as SeaStatement); const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata); this.resultSlicer = new ResultSlicer(this.context, converter); return this.resultSlicer; diff --git a/lib/sea/SeaOperationLifecycle.ts b/lib/sea/SeaOperationLifecycle.ts index e640b2ab..d8b6b2c9 100644 --- a/lib/sea/SeaOperationLifecycle.ts +++ b/lib/sea/SeaOperationLifecycle.ts @@ -131,12 +131,17 @@ export async function seaCancel( context.getLogger().log(LogLevel.debug, `Cancelling SEA operation with id: ${operationId}`); + // Set the intent BEFORE the RPC and keep it set even if the cancel RPC + // fails: the caller asked to cancel, so the operation must stay cancelled + // (subsequent fetches fail fast, and any poll-loop observer that already saw + // the flag stays consistent). The RPC failure is surfaced via the rethrow — + // we do NOT roll the flag back, which would silently resurrect a cancelled + // operation while the kernel-side statement may still be running. state.isCancelled = true; try { await statement.cancel(); } catch (err) { - state.isCancelled = false; rethrowKernelError(err); } diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index e12e1d60..d593f87e 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -35,9 +35,11 @@ import ParameterError from '../errors/ParameterError'; import { LogLevel } from '../contracts/IDBSQLLogger'; import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader'; import { decodeNapiKernelError } from './SeaErrorMapping'; +import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend'; import SeaOperationBackend from './SeaOperationBackend'; import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams'; import { seaServerInfoValue } from './SeaServerInfo'; +import { serializeQueryTags } from '../utils'; export interface SeaSessionBackendOptions { /** The opaque napi `Connection` handle returned by `openSession`. */ @@ -50,12 +52,12 @@ export interface SeaSessionBackendOptions { /** * SEA-backed implementation of `ISessionBackend`. * - * **M0 scope:** `executeStatement` + `close`. Metadata methods - * (`getCatalogs`, `getSchemas`, etc.) defer to M1 — they throw a clear - * `HiveDriverError` so consumers using SEA against metadata APIs get an - * actionable message instead of silently falling back. The Thrift - * backend continues to handle the metadata path by default (callers - * opt into SEA via `ConnectionOptions.useSEA`). + * **Scope:** `executeStatement` (sync + async), `close`, `getInfo`, and the + * full metadata surface (`getCatalogs`, `getSchemas`, `getTables`, + * `getColumns`, `getFunctions`, `getTableTypes`, `getTypeInfo`, + * `getPrimaryKeys`, `getCrossReference`) — each forwards to the kernel's napi + * metadata calls (see `runMetadata`). The Thrift backend remains the default; + * callers opt into the kernel path via `ConnectionOptions.useSEA`. * * **Session config flow:** catalog / schema / sessionConf are applied * once at session creation (kernel `Session::builder().defaults()` + @@ -116,51 +118,32 @@ export default class SeaSessionBackend implements ISessionBackend { /** * Execute a SQL statement through the napi binding. * - * Catalog / schema / sessionConf were applied at session open, so - * there are no per-statement options to thread through. + * Catalog / schema / sessionConf are session-level (applied at open). + * Per-statement options forwarded to the kernel `ExecuteOptions`: + * - `ordinalParameters` / `namedParameters` → bound params (mutually + * exclusive — the kernel binds one placeholder style per statement); + * - `queryTimeout` → enforced client-side by the operation backend's poll + * deadline (the kernel ignores `queryTimeoutSecs` on the async submit + * path), NOT forwarded to the napi options; + * - `rowLimit` → `rowLimit` (SEA-only server-side row cap); + * - `queryTags` → serialised into the conf overlay's reserved + * `query_tags` key (the same wire shape Thrift's `serializeQueryTags` + * produces), merged with any explicit `statementConf`. * - * M0 intentionally rejects `queryTimeout`, `namedParameters`, and - * `ordinalParameters` with explicit deferred-to-M1 errors. `useCloudFetch` - * is a no-op on the SEA path — the kernel hardcodes the SEA - * `disposition` to `INLINE_OR_EXTERNAL_LINKS`, and per-statement - * conf overrides have no reader on the kernel; cloud-fetch behaviour - * is governed entirely by the kernel's `ResultConfig` (M1 binding - * surface). - * - * The Thrift backend remains the path for consumers that need any - * of those today. + * Still rejected (genuinely unsupported on SEA, rather than silently + * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a + * per-statement knob), `useLZ4Compression` (kernel owns result compression), + * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by + * the facade at fetch time, so it is intentionally not handled here. */ public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { this.failIfClosed(); - // Positional (`?`) and named (`:name`) parameters are mutually exclusive — - // the kernel param codec binds exactly one placeholder style per statement. - // Use the SAME error type and message as the Thrift backend - // (`ThriftSessionBackend.getQueryParameters`) so a caller catching - // `ParameterError` for this case behaves identically across backends. - const positionalParams = buildSeaPositionalParams(options.ordinalParameters); - const namedParams = buildSeaNamedParams(options.namedParameters); - if (positionalParams !== undefined && namedParams !== undefined) { - throw new ParameterError('Driver does not support both ordinal and named parameters.'); - } - - if (options.queryTimeout !== undefined) { - throw new HiveDriverError('SEA executeStatement: queryTimeout is not supported in M0 (deferred to M1)'); - } if (options.useCloudFetch !== undefined) { throw new HiveDriverError( 'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA', ); } - // Reject — rather than silently ignore — the remaining Thrift-path - // options the SEA M0 backend does not honor. Silently dropping them - // is the worst failure mode for an agent/caller: passing e.g. - // `queryTags` or `useLZ4Compression` would no-op with zero signal. - // (`maxRows` is intentionally NOT here — the facade applies it at - // fetch time.) - if (options.queryTags !== undefined) { - throw new HiveDriverError('SEA executeStatement: queryTags is not supported in M0 (deferred to M1)'); - } if (options.useLZ4Compression !== undefined) { throw new HiveDriverError( 'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)', @@ -168,30 +151,143 @@ export default class SeaSessionBackend implements ISessionBackend { } if (options.stagingAllowedLocalPath !== undefined) { throw new HiveDriverError( - 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported in M0 (deferred to M1)', + 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA', ); } - // Only build the napi options object when there is something to send — - // the no-params path keeps the minimal call shape (`executeStatement(sql)`). - let execOptions: SeaNativeExecuteOptions | undefined; - if (positionalParams !== undefined || namedParams !== undefined) { - execOptions = { positionalParams, namedParams }; + // `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel- + // specific use of the option — the Thrift backend hardcodes `runAsync: true` + // on the wire and never reads `options.runAsync`, so the field is a no-op + // there. The only observable difference between the two SEA paths is WHEN + // `executeStatement` resolves; the public API, result shape, schema, and + // error classes are identical on both (and to Thrift). See the option's + // JSDoc in `IDBSQLSession` for the cross-backend contract. + // + // - DEFAULT (`runAsync` false/undefined) — SYNC. Route through + // `executeStatementCancellable`: the kernel blocks on `execute()` + // (server-side direct-results / poll-to-terminal), which is faster and, + // with the napi sync canceller, fully cancellable mid-COMPUTE. The + // blocking drive runs in the operation backend's `result()` (inside + // `waitUntilReady`, which the facade invokes lazily at first fetch). + // `queryTimeoutSecs` IS honoured on this path (forwarded to the napi + // options below) since the kernel `execute()` consults it. + // + // - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server + // returns a pending `AsyncStatement` immediately while the query runs; + // the backend polls `status()` to terminal in `waitUntilReady()` and + // materialises results via `awaitResult()`. `queryTimeoutSecs` is + // ignored by the kernel on submit, so it is enforced client-side by the + // operation backend's poll-loop deadline instead. + const runAsync = options.runAsync ?? false; + const queryTimeoutSecs = + options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined; + + if (!runAsync) { + // Sync path: forward `queryTimeoutSecs` to the napi options — the kernel + // `execute()` honours it (server statement timeout). + const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs); + let cancellableExecution; + try { + cancellableExecution = + execOptions === undefined + ? await this.connection.executeStatementCancellable(statement) + : await this.connection.executeStatementCancellable(statement, execOptions); + } catch (err) { + throw this.logAndMapError('executeStatement', err); + } + return new SeaOperationBackend({ + cancellableExecution: cancellableExecution!, + context: this.context, + // The kernel honours `queryTimeoutSecs` on the sync `execute` path, so + // it is forwarded via the napi options (see `buildExecuteOptions`); the + // backend also keeps it as a deadline guard for parity with async. + queryTimeoutSecs, + }); } - let nativeStatement: SeaStatement; + // Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on + // submit — `wait_timeout=0s`); it is enforced client-side by the poll loop. + const execOptions = this.buildExecuteOptions(options); + let asyncStatement; try { - nativeStatement = + asyncStatement = execOptions === undefined - ? await this.connection.executeStatement(statement) - : await this.connection.executeStatement(statement, execOptions); + ? await this.connection.submitStatement(statement) + : await this.connection.submitStatement(statement, execOptions); } catch (err) { throw this.logAndMapError('executeStatement', err); } - return this.wrapStatement(nativeStatement); + // `queryTimeout` is enforced client-side by the operation backend's poll + // loop: the kernel ignores `queryTimeoutSecs` on the async submit path + // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward + // it to the napi options — passing it there would be a silent no-op. + return new SeaOperationBackend({ + asyncStatement: asyncStatement!, + context: this.context, + // `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()` + // coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf). + queryTimeoutSecs, + }); + } + + /** + * Translate the public `ExecuteStatementOptions` into the kernel napi + * `ExecuteOptions`, returning `undefined` when nothing is set so the + * no-options call shape (`executeStatement(sql)`) is preserved. + */ + private buildExecuteOptions( + options: ExecuteStatementOptions, + queryTimeoutSecs?: number, + ): SeaNativeExecuteOptions | undefined { + // Positional (`?`) and named (`:name`) parameters are mutually exclusive — + // the kernel binds one placeholder style per statement. Use the SAME error + // type and message as the Thrift backend (`ThriftSessionBackend`) so a + // caller catching `ParameterError` behaves identically across backends. + const positionalParams = buildSeaPositionalParams(options.ordinalParameters); + const namedParams = buildSeaNamedParams(options.namedParameters); + if (positionalParams !== undefined && namedParams !== undefined) { + throw new ParameterError('Driver does not support both ordinal and named parameters.'); + } + + const execOptions: SeaNativeExecuteOptions = {}; + if (positionalParams !== undefined) { + execOptions.positionalParams = positionalParams; + } + if (namedParams !== undefined) { + execOptions.namedParams = namedParams; + } + // `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes + // it in): the kernel `execute()` consults it as the server statement + // timeout. On the async submit path the caller omits it (the kernel ignores + // it under `wait_timeout=0s`), so it is enforced client-side by the + // operation backend's poll-loop deadline instead (see executeStatement). + if (queryTimeoutSecs !== undefined) { + execOptions.queryTimeoutSecs = queryTimeoutSecs; + } + if (options.rowLimit !== undefined) { + execOptions.rowLimit = Number(options.rowLimit); + } + // Per-statement conf overlay plus query tags. Tags are serialised JS-side + // into the reserved `query_tags` key (the same wire shape the Thrift + // backend produces via `serializeQueryTags` → `confOverlay`), rather than + // via the napi `queryTags` field: napi's `HashMap` can't + // represent a null-valued tag, and the kernel rejects setting both the + // `queryTags` field and a `query_tags` conf key. + const serializedQueryTags = serializeQueryTags(options.queryTags); + if (options.statementConf !== undefined || serializedQueryTags !== undefined) { + const statementConf: Record = { ...(options.statementConf ?? {}) }; + if (serializedQueryTags !== undefined) { + statementConf.query_tags = serializedQueryTags; + } + if (Object.keys(statementConf).length > 0) { + execOptions.statementConf = statementConf; + } + } + + return Object.keys(execOptions).length > 0 ? execOptions : undefined; } - /** Wrap a napi `Statement` (from execute or a metadata call) as an operation backend. */ + /** Wrap a napi metadata `Statement` (already terminal) as an operation backend. */ private wrapStatement(nativeStatement: SeaStatement): IOperationBackend { return new SeaOperationBackend({ statement: nativeStatement, diff --git a/lib/thrift-backend/ThriftSessionBackend.ts b/lib/thrift-backend/ThriftSessionBackend.ts index c103ab4f..d8db377e 100644 --- a/lib/thrift-backend/ThriftSessionBackend.ts +++ b/lib/thrift-backend/ThriftSessionBackend.ts @@ -170,6 +170,20 @@ export default class ThriftSessionBackend implements ISessionBackend { const driver = await this.context.getDriver(); const clientConfig = this.context.getConfig(); + // `rowLimit` / `statementConf` are kernel-backend (SEA) options with no + // Thrift wire equivalent. Surface a debug breadcrumb rather than dropping + // them silently, so a caller that set them on the Thrift path has signal. + if (options.rowLimit !== undefined || options.statementConf !== undefined) { + this.context + .getLogger() + .log( + LogLevel.warn, + 'ThriftSessionBackend.executeStatement: rowLimit / statementConf are kernel-backend (useSEA) ' + + 'options with no Thrift wire equivalent — they are IGNORED on the Thrift path (e.g. rowLimit ' + + 'will not cap the result set). Use the kernel backend (useSEA) to honour them.', + ); + } + const request = new TExecuteStatementReq({ sessionHandle: this.sessionHandle, statement, diff --git a/native/sea/index.d.ts b/native/sea/index.d.ts index 59283a88..4ecd1ad6 100644 --- a/native/sea/index.d.ts +++ b/native/sea/index.d.ts @@ -51,11 +51,15 @@ export interface ExecuteOptions { * raises `InvalidArgument` — the caller's intent is ambiguous * so we refuse to silently pick one over the other. * + * A **`null`** value emits a **bare key** (no colon) — e.g. + * `{ production: null }` → `"production"` — matching the + * connectors' `key`-only tag form. + * * See the struct-level "Tag-order caveat" for the * HashMap-iteration-order vs `Object.keys`-iteration-order * divergence and the byte-identical-Thrift-parity workaround. */ - queryTags?: Record + queryTags?: Record /** * Server-side cap on the number of rows this statement returns * (SEA `row_limit`), independent of any SQL `LIMIT`. Maps to @@ -481,6 +485,66 @@ export declare class AsyncResultHandle { */ schema(): ArrowSchema } +/** + * Handle returned by `Connection.executeStatementCancellable`. Owns the + * built-but-not-yet-executed kernel `Statement` plus a detached + * [`StatementCanceller`] captured before dispatch, so JS can fire a + * server-side cancel while the blocking `result()` is in flight. + * + * `pending` is `Arc>>` so `result()` can + * `.take()` the statement (the kernel `execute()` borrows it `&mut`, + * then it moves into the produced `Statement` wrapper to keep its + * `ValidityFlag` set — see `statement.rs`). A second `result()` call + * after the first resolved surfaces `InvalidStatementHandle`. + */ +export declare class CancellableExecution { + /** + * The server-issued statement id this execution targets, if the + * server has issued one yet (`null` before the initial submit + * round-trip publishes it mid-`result()`). Useful for log + * correlation while the blocking drive is in flight. + */ + get statementId(): string | null + /** + * Drive the blocking `execute()` and resolve to a `Statement` + * (identical to what `executeStatement` returns) once the kernel + * reaches a terminal state and the result stream is ready. + * + * Consumes the pending statement: a second `result()` call returns + * `KernelError(InvalidStatementHandle)`. The future is + * drop-cancel-safe — the kernel's per-execute `MidExecuteCancelState` + * guard fires a fire-and-forget `cancel_statement` if this future is + * dropped mid-flight (`Promise.race` / timeout loser), independently + * of an explicit `cancel()`. + * + * On a server-side cancel the kernel's blocking `execute()` currently + * surfaces `InvalidArgument` (a known kernel quirk — the async path + * returns `Cancelled`). When this handle's `cancel()` actually dispatched a + * server-side cancel, we normalise that into `Cancelled` here so JS callers + * can rely on a single cancelled-status code regardless of execution path. + * + * Three outcomes can race the blocking drive: (1) a natural terminal state + * → `Ok` or the genuine error; (2) an explicit `cancel()` that dispatched a + * server cancel → this `result()` rejects with a `Cancelled`-coded error + * (the normalisation above); (3) the future being **dropped** mid-flight + * (`Promise.race`/timeout loser) → the kernel's `MidExecuteCancelState` + * drop-guard fires a fire-and-forget `cancel_statement`, but there is no + * `result()` left to observe a code. Only (2) yields a `Cancelled` error. + */ + result(): Promise + /** + * Server-side cancel of the in-flight statement. + * + * Lock-free: fires the detached `StatementCanceller` captured at + * construction rather than taking the mutex `result()` holds, so it + * interrupts a still-running blocking `result()` instead of queueing + * behind it. No-op (returns `Ok`) if `result()` already finished + * successfully, or if no statement id has been observed yet (query still + * in its initial submit round-trip), and idempotent against a server + * already in a terminal state. + */ + cancel(): Promise +} /** * Opaque connection handle wrapping a kernel `Session`. * @@ -526,6 +590,29 @@ export declare class Connection { * omission to keep the wire shape stable for the common case. */ executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise + /** + * Execute a SQL statement on the blocking (sync) path, but return a + * `CancellableExecution` handle so a concurrent JS task can cancel + * the query *while it is still running server-side*. + * + * `executeStatement` builds the kernel `Statement`, awaits the + * blocking `execute()`, and only then hands JS a `Statement` — so a + * query that runs for several seconds is uncancellable from JS on + * that path (there is no handle until the blocking call resolves). + * This method instead builds the statement, captures a detached + * `StatementCanceller` **before** dispatching `execute()`, and hands + * JS a `CancellableExecution` immediately. The caller drives the + * blocking execution via `result()` (resolves to the same + * `Statement` `executeStatement` returns) and can fire `cancel()` + * concurrently to interrupt a still-running query mid-COMPUTE. + * + * Option semantics are identical to `executeStatement` — including + * `queryTimeoutSecs`, which IS honoured here (this is the sync + * `execute` path; only the async `submitStatement` ignores it). + * Mirrors the pyo3 `Statement.canceller()` / `Statement.execute()` + * split (PR #121): obtain the canceller before the blocking drive. + */ + executeStatementCancellable(sql: string, options?: ExecuteOptions | undefined | null): Promise /** * Submit a SQL statement and return immediately with an * `AsyncStatement` handle, without blocking until the query diff --git a/native/sea/index.js b/native/sea/index.js index 44fa227a..d800a6cd 100644 --- a/native/sea/index.js +++ b/native/sea/index.js @@ -310,10 +310,11 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { AsyncStatement, AsyncResultHandle, Connection, AuthMode, openSession, Statement, version } = nativeBinding +const { AsyncStatement, AsyncResultHandle, CancellableExecution, Connection, AuthMode, openSession, Statement, version } = nativeBinding module.exports.AsyncStatement = AsyncStatement module.exports.AsyncResultHandle = AsyncResultHandle +module.exports.CancellableExecution = CancellableExecution module.exports.Connection = Connection module.exports.AuthMode = AuthMode module.exports.openSession = openSession diff --git a/tests/unit/DBSQLParameter.test.ts b/tests/unit/DBSQLParameter.test.ts index a3f7659e..deefb13e 100644 --- a/tests/unit/DBSQLParameter.test.ts +++ b/tests/unit/DBSQLParameter.test.ts @@ -101,4 +101,27 @@ describe('DBSQLParameter', () => { expect(dbsqlParam.toSparkParameter()).to.deep.equal(expectedParam); } }); + + it('maps timezone-explicit timestamp types to valid Spark wire types', () => { + // TIMESTAMP_NTZ is a real Spark type → bound verbatim. + expect( + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_NTZ, value: '2024-01-15 10:30:00' }).toSparkParameter(), + ).to.deep.equal( + new TSparkParameter({ + type: DBSQLParameterType.TIMESTAMP_NTZ, + value: new TSparkParameterValue({ stringValue: '2024-01-15 10:30:00' }), + }), + ); + // TIMESTAMP_LTZ has no distinct Spark type → bound as TIMESTAMP (valid on + // both Thrift and kernel; the old verbatim 'TIMESTAMP_LTZ' was rejected by + // the Thrift server). + expect( + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_LTZ, value: '2024-01-15 10:30:00' }).toSparkParameter(), + ).to.deep.equal( + new TSparkParameter({ + type: DBSQLParameterType.TIMESTAMP, + value: new TSparkParameterValue({ stringValue: '2024-01-15 10:30:00' }), + }), + ); + }); }); diff --git a/tests/unit/sea/auth-m2m.test.ts b/tests/unit/sea/auth-m2m.test.ts index a4f90ed5..159afe1d 100644 --- a/tests/unit/sea/auth-m2m.test.ts +++ b/tests/unit/sea/auth-m2m.test.ts @@ -35,6 +35,7 @@ describe('SeaAuth + SeaBackend — OAuth M2M auth flow', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthM2m', oauthClientId: 'client-uuid', oauthClientSecret: 'dose-fake-secret', @@ -165,6 +166,7 @@ describe('SeaAuth + SeaBackend — OAuth M2M auth flow', () => { expect(calls[0].args[0]).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthM2m', oauthClientId: 'client-uuid', oauthClientSecret: 'dose-fake-secret', diff --git a/tests/unit/sea/auth-pat.test.ts b/tests/unit/sea/auth-pat.test.ts index f59b445c..bd82eb87 100644 --- a/tests/unit/sea/auth-pat.test.ts +++ b/tests/unit/sea/auth-pat.test.ts @@ -31,6 +31,7 @@ describe('SeaAuth — PAT auth options builder', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'Pat', token: 'dapi-fake-pat', }); diff --git a/tests/unit/sea/auth-u2m.test.ts b/tests/unit/sea/auth-u2m.test.ts index c8f63fef..828ca961 100644 --- a/tests/unit/sea/auth-u2m.test.ts +++ b/tests/unit/sea/auth-u2m.test.ts @@ -33,6 +33,7 @@ describe('SeaAuth + SeaBackend — OAuth U2M auth flow', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthU2m', oauthRedirectPort: 8030, }); @@ -132,6 +133,7 @@ describe('SeaAuth + SeaBackend — OAuth U2M auth flow', () => { expect(calls[0].args[0]).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthU2m', oauthRedirectPort: 8030, }); diff --git a/tests/unit/sea/connectionOptions.test.ts b/tests/unit/sea/connectionOptions.test.ts new file mode 100644 index 00000000..4869bd16 --- /dev/null +++ b/tests/unit/sea/connectionOptions.test.ts @@ -0,0 +1,121 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { expect } from 'chai'; +import { buildSeaConnectionOptions, buildSeaTlsOptions } from '../../../lib/sea/SeaAuth'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; + +const PAT = { host: 'h.databricks.com', path: '/sql/1.0/warehouses/abc', token: 'dapi-x' }; + +// Cast helper: the SEA connection-tuning/TLS options live on the internal +// surface, so tests build untyped option literals. +const opts = (extra: Record) => ({ ...PAT, ...extra } as unknown as ConnectionOptions); + +describe('SeaAuth connection options — intervalsAsString default', () => { + it('always sets intervalsAsString:true (thrift-compatible interval rendering)', () => { + const native = buildSeaConnectionOptions(opts({})) as { intervalsAsString?: boolean }; + expect(native.intervalsAsString).to.equal(true); + }); + + it('does NOT force complexTypesAsJson (native Arrow nested types match Thrift)', () => { + const native = buildSeaConnectionOptions(opts({})) as { complexTypesAsJson?: boolean }; + expect(native.complexTypesAsJson).to.equal(undefined); + }); +}); + +describe('SeaAuth connection options — maxConnections', () => { + it('forwards a valid positive integer', () => { + const native = buildSeaConnectionOptions(opts({ maxConnections: 10 })) as { maxConnections?: number }; + expect(native.maxConnections).to.equal(10); + }); + + it('omits maxConnections when unset', () => { + const native = buildSeaConnectionOptions(opts({})) as { maxConnections?: number }; + expect(native.maxConnections).to.equal(undefined); + }); + + for (const bad of [0, -1, 1.5]) { + it(`rejects non-positive-integer maxConnections (${bad})`, () => { + expect(() => buildSeaConnectionOptions(opts({ maxConnections: bad }))).to.throw( + HiveDriverError, + /positive integer/, + ); + }); + } + + it('rejects maxConnections beyond the u32 limit', () => { + expect(() => buildSeaConnectionOptions(opts({ maxConnections: 0x1_0000_0000 }))).to.throw( + HiveDriverError, + /u32 limit/, + ); + }); +}); + +describe('SeaAuth TLS options (buildSeaTlsOptions)', () => { + it('is empty by default (secure-by-default — kernel default verify-on)', () => { + expect(buildSeaTlsOptions(opts({}))).to.deep.equal({}); + }); + + it('passes checkServerCertificate through verbatim (including false)', () => { + expect(buildSeaTlsOptions(opts({ checkServerCertificate: false }))).to.deep.equal({ + checkServerCertificate: false, + }); + expect(buildSeaTlsOptions(opts({ checkServerCertificate: true }))).to.deep.equal({ + checkServerCertificate: true, + }); + }); + + it('normalises a PEM string to a Buffer', () => { + const pem = '-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n'; + const tls = buildSeaTlsOptions(opts({ customCaCert: pem })); + expect(Buffer.isBuffer(tls.customCaCert)).to.equal(true); + expect(tls.customCaCert?.toString('utf8')).to.equal(pem); + }); + + it('passes a Buffer customCaCert through unchanged', () => { + const buf = Buffer.from('-----BEGIN CERTIFICATE-----\nx\n-----END CERTIFICATE-----'); + expect(buildSeaTlsOptions(opts({ customCaCert: buf })).customCaCert).to.equal(buf); + }); + + it('rejects a non-PEM string', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: 'not-a-pem' }))).to.throw(HiveDriverError, /PEM certificate/); + }); + + it('rejects out-of-order / partial PEM markers (ordered match, not two substrings)', () => { + // END-before-BEGIN, BEGIN-only, and END-only must all fail — a blob that + // merely *contains* both literals (e.g. a proxy-intercept page) is not a cert. + const reversed = '-----END CERTIFICATE-----\nMIIB...\n-----BEGIN CERTIFICATE-----'; + const beginOnly = '-----BEGIN CERTIFICATE-----\nMIIB...\n'; + const endOnly = 'MIIB...\n-----END CERTIFICATE-----'; + for (const bad of [reversed, beginOnly, endOnly]) { + expect(() => buildSeaTlsOptions(opts({ customCaCert: bad })), bad).to.throw(HiveDriverError, /PEM certificate/); + } + }); + + it('rejects an empty Buffer', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: Buffer.alloc(0) }))).to.throw(HiveDriverError, /empty/); + }); + + it('rejects a non-string, non-Buffer customCaCert', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: 123 }))).to.throw(HiveDriverError, /PEM string or a Buffer/); + }); + + it('folds TLS options into the full connection options', () => { + const native = buildSeaConnectionOptions(opts({ checkServerCertificate: false })) as { + checkServerCertificate?: boolean; + }; + expect(native.checkServerCertificate).to.equal(false); + }); +}); diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index fefc79f7..81cdfadd 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -14,6 +14,7 @@ import { expect } from 'chai'; import sinon from 'sinon'; +import Int64 from 'node-int64'; import SeaBackend from '../../../lib/sea/SeaBackend'; import SeaSessionBackend from '../../../lib/sea/SeaSessionBackend'; import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; @@ -22,7 +23,9 @@ import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientCont import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; import HiveDriverError from '../../../lib/errors/HiveDriverError'; import ParameterError from '../../../lib/errors/ParameterError'; +import OperationStateError, { OperationStateErrorCode } from '../../../lib/errors/OperationStateError'; import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import { OperationState } from '../../../lib/contracts/OperationStatus'; // ----------------------------------------------------------------------------- // Fakes — minimal stand-ins for the napi-rs generated surface and the @@ -73,6 +76,110 @@ class FakeNativeStatement implements SeaStatement { } } +/** + * Fake `AsyncStatement` (the `submitStatement` return). `status()` reports a + * configurable state (default Succeeded); `awaitResult()` yields a fetch handle + * (reuses `FakeNativeStatement`'s fetchNextBatch/schema surface). + */ +class FakeAsyncStatement { + public cancelled = false; + + public closed = false; + + public statusCalls = 0; + + public awaitResultError: Error | null = null; + + // Successive status() returns drain this queue; the last value sticks. + private readonly states: string[]; + + public readonly statementId = '01ef-fake-async-id'; + + constructor( + statusValue: string | string[] = 'Succeeded', + public readonly resultHandle: FakeNativeStatement = new FakeNativeStatement(), + ) { + this.states = Array.isArray(statusValue) ? [...statusValue] : [statusValue]; + } + + public async status(): Promise { + this.statusCalls += 1; + return this.states.length > 1 ? (this.states.shift() as string) : this.states[0]; + } + + public async awaitResult(): Promise { + if (this.awaitResultError) { + throw this.awaitResultError; + } + return this.resultHandle; + } + + public async cancel(): Promise { + this.cancelled = true; + } + + public async close(): Promise { + this.closed = true; + } +} + +/** + * Fake `CancellableExecution` (the `executeStatementCancellable` return — the + * sync `runAsync: false` query path). `result()` drives the (already-terminal, + * in the fake) execution and yields the terminal statement fetch handle; + * `cancel()` flips a flag and, if armed, makes a pending `result()` reject with + * a Cancelled-shaped kernel error to model mid-compute interruption. + */ +class FakeCancellableExecution { + public cancelled = false; + + public resultCalls = 0; + + public resultError: Error | null = null; + + // Mirrors the real `CancellableExecution.statementId`: `null` until the + // initial execute round-trip publishes the server id mid-`result()`. The + // resolved `Statement` (resultHandle) carries the id (`FakeNativeStatement`). + public readonly statementId: string | null = null; + + // When set, the result() promise stays pending until cancel() rejects it, + // modelling a still-running blocking execute that a concurrent cancel aborts. + private pendingResolve?: (stmt: FakeNativeStatement) => void; + + private pendingReject?: (err: Error) => void; + + constructor(public readonly resultHandle: FakeNativeStatement = new FakeNativeStatement()) {} + + // When true, result() does not resolve until cancel()/an error fires. + public block = false; + + public async result(): Promise { + this.resultCalls += 1; + if (this.resultError) { + throw this.resultError; + } + if (this.block) { + return new Promise((resolve, reject) => { + this.pendingResolve = resolve; + this.pendingReject = reject; + }); + } + return this.resultHandle; + } + + public async cancel(): Promise { + this.cancelled = true; + // Model the server flipping the statement terminal: a parked result() + // rejects with the kernel's Cancelled error envelope. + if (this.pendingReject) { + const err = new Error('statement cancelled'); + this.pendingReject(err); + this.pendingReject = undefined; + this.pendingResolve = undefined; + } + } +} + class FakeNativeConnection implements SeaConnection { public closed = false; @@ -93,8 +200,19 @@ class FakeNativeConnection implements SeaConnection { // Mirrors the kernel `Connection.sessionId` getter. public readonly sessionId = '01ef-fake-session-id'; - // The merged kernel binding takes an optional per-statement `ExecuteOptions` - // (positional/named params, statementConf, …). Record it for assertions. + // Last AsyncStatement handed out by submitStatement (the async query path). + public lastAsyncStatement?: FakeAsyncStatement; + + // The async submit state(s) the next FakeAsyncStatement should report. + public submitStatusValue: string | string[] = 'Succeeded'; + + // Last CancellableExecution handed out by executeStatementCancellable (the + // sync `runAsync: false` query path — the DEFAULT). + public lastCancellableExecution?: FakeCancellableExecution; + + // The bare blocking executeStatement path: the SEA backend's sync default + // routes through executeStatementCancellable (below), but the binding still + // exposes this for completeness. public async executeStatement(sql: string, options?: unknown): Promise { if (this.throwOnExecute) { throw this.throwOnExecute; @@ -104,10 +222,28 @@ class FakeNativeConnection implements SeaConnection { return this.statementToReturn; } - // Async-submit path (PR 2 territory); present only so the fake satisfies - // the full `Connection` surface. Not exercised by these tests. - public submitStatement(): Promise { - throw new Error('submitStatement not used in this test'); + // Sync (`runAsync: false`, the DEFAULT) query path: records sql + options and + // returns a pending CancellableExecution whose result() drives the execute. + public async executeStatementCancellable(sql: string, options?: unknown): Promise { + if (this.throwOnExecute) { + throw this.throwOnExecute; + } + this.lastSql = sql; + this.lastOptions = options; + this.lastCancellableExecution = new FakeCancellableExecution(); + return this.lastCancellableExecution; + } + + // Async-submit path: records sql + per-statement options (for forwarding + // assertions) and returns a pending AsyncStatement. + public async submitStatement(sql: string, options?: unknown): Promise { + if (this.throwOnExecute) { + throw this.throwOnExecute; + } + this.lastSql = sql; + this.lastOptions = options; + this.lastAsyncStatement = new FakeAsyncStatement(this.submitStatusValue); + return this.lastAsyncStatement; } private recordMetadata(method: string, args: unknown[]): Promise { @@ -190,8 +326,8 @@ function makeBinding(connection: SeaConnection): SeaNativeBinding & { return Object.assign(binding, { openSessionStub }); } -function makeContext(): IClientContext { - const logger: IDBSQLLogger = { +function makeContext(logger?: IDBSQLLogger): IClientContext { + const log: IDBSQLLogger = logger ?? { log(_level: LogLevel, _message: string): void { // no-op }, @@ -199,7 +335,7 @@ function makeContext(): IClientContext { const config = {} as ClientConfig; return { getConfig: () => config, - getLogger: () => logger, + getLogger: () => log, getConnectionProvider: async () => { throw new Error('not used by SEA backend'); }, @@ -312,11 +448,14 @@ describe('SeaBackend', () => { const args = binding.openSessionStub.firstCall.args[0]; // sea-auth-u2m introduced the discriminated SeaNativeConnectionOptions // shape with a leading `authMode` tag — `'Pat'` for the PAT branch. + // `intervalsAsString: true` is always set so the SEA result shape is a + // byte-compatible drop-in for the Thrift backend (interval-as-string). expect(args).to.deep.equal({ hostName: 'workspace.example', httpPath: '/sql/1.0/warehouses/xyz', authMode: 'Pat', token: 'dapi-token', + intervalsAsString: true, }); }); @@ -452,24 +591,110 @@ describe('SeaSessionBackend', () => { expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.'); }); - it('executeStatement rejects queryTimeout (M1)', async () => { + it('executeStatement (sync default) DOES forward queryTimeout to the napi options', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { queryTimeout: 30 }); + // Sync path: the kernel `execute()` honours queryTimeoutSecs (server + // statement timeout), so the backend forwards it onto the napi options. + expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(30); + }); + + it('executeStatement (runAsync: true) does NOT forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); + await session.executeStatement('SELECT 1', { queryTimeout: 30, runAsync: true }); + // Async submit path: the kernel ignores queryTimeoutSecs under + // `wait_timeout=0s`, so it's enforced client-side by the poll deadline + // instead — never forwarded to the napi options. + expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); + }); + + it('coerces an Int64 queryTimeout into the client-side deadline on the async path (not NaN)', async function int64Timeout() { + // Regression: `Number(new Int64(...))` yields NaN (node-int64 has no valueOf), + // which would silently disable the deadline. The backend must coerce via + // numberToInt64(...).toNumber() so an Int64 queryTimeout still bounds the poll. + // Exercised on the async path, where the client-side poll deadline applies. + // eslint-disable-next-line no-invalid-this + this.timeout(5000); + const connection = new FakeNativeConnection(); + connection.submitStatusValue = 'Running'; // never reaches a terminal state + const session = makeSession(connection); + const op = await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1), runAsync: true }); let thrown: unknown; try { - await session.executeStatement('SELECT 1', { queryTimeout: 30 }); + await op.waitUntilReady(); } catch (err) { thrown = err; } - expect(thrown).to.be.instanceOf(HiveDriverError); - expect((thrown as Error).message).to.match(/queryTimeout/); + expect(thrown, 'Int64(1) timeout must fire — NaN would poll forever').to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + }); + + it('executeStatement forwards rowLimit', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { rowLimit: 100 }); + expect((connection.lastOptions as { rowLimit?: number }).rowLimit).to.equal(100); + }); + + it('executeStatement serialises queryTags into statementConf.query_tags', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { queryTags: { team: 'x', env: 'prod' } }); + const conf = (connection.lastOptions as { statementConf?: Record }).statementConf; + expect(conf).to.have.property('query_tags'); + expect(conf?.query_tags).to.contain('team:x').and.to.contain('env:prod'); }); - // These Thrift-path options are not honored on SEA M0. Rejecting them - // (rather than silently ignoring) is the contract a caller/agent needs: - // a silent no-op gives zero signal to debug. + it('executeStatement merges explicit statementConf with serialised queryTags', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { + statementConf: { 'spark.sql.ansi.enabled': 'true' }, + queryTags: { team: 'x' }, + }); + const conf = (connection.lastOptions as { statementConf?: Record }).statementConf; + expect(conf?.['spark.sql.ansi.enabled']).to.equal('true'); + expect(conf?.query_tags).to.contain('team:x'); + }); + + it('queryTags wins over a query_tags key in statementConf (precedence on collision)', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { + statementConf: { query_tags: 'manual-raw-value' }, + queryTags: { team: 'x' }, + }); + const conf = (connection.lastOptions as { statementConf?: Record }).statementConf; + // The structured `queryTags` option overwrites a raw `query_tags` conf key — + // a single, predictable wire value rather than two competing ones. + expect(conf?.query_tags).to.contain('team:x').and.to.not.equal('manual-raw-value'); + }); + + it('maps a submit-time kernel error via logAndMapError on both paths', async () => { + const envelope = `__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'SUBMIT_BOOM' })}`; + for (const opts of [{}, { runAsync: true }]) { + const connection = new FakeNativeConnection(); + connection.throwOnExecute = new Error(envelope); // fails executeStatementCancellable / submitStatement + const session = makeSession(connection); + let thrown: unknown; + try { + // eslint-disable-next-line no-await-in-loop + await session.executeStatement('SELECT 1', opts); + } catch (err) { + thrown = err; + } + expect(thrown, `path ${JSON.stringify(opts)}`).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/SUBMIT_BOOM/); + } + }); + + // Genuinely unsupported on SEA — rejected (rather than silently ignored) so + // a caller/agent gets signal instead of a no-op. queryTags / queryTimeout / + // rowLimit are NOT here — they are forwarded (asserted above). for (const { name, options, re } of [ - { name: 'queryTags', options: { queryTags: { team: 'x' } }, re: /queryTags/ }, + { name: 'useCloudFetch', options: { useCloudFetch: true }, re: /useCloudFetch/ }, { name: 'useLZ4Compression', options: { useLZ4Compression: true }, re: /useLZ4Compression/ }, { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' }, re: /stagingAllowedLocalPath/ }, ] as const) { @@ -647,3 +872,279 @@ describe('SeaOperationBackend', () => { // tests/unit/sea/SeaOperationBackend.test.ts and the parity-gate e2e // at tests/e2e/sea/results-e2e.test.ts. }); + +describe('SeaOperationBackend — async (submitStatement) path', () => { + const makeAsyncOp = (asyncStatement: FakeAsyncStatement, queryTimeoutSecs?: number) => + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext(), queryTimeoutSecs }); + + it('rejects when neither asyncStatement nor statement is provided', () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect(() => new SeaOperationBackend({ context: makeContext() } as any)).to.throw(HiveDriverError, /exactly one/); + }); + + it('rejects when BOTH asyncStatement and statement are provided', () => { + expect( + () => + new SeaOperationBackend({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + asyncStatement: new FakeAsyncStatement() as any, + statement: new FakeNativeStatement(), + context: makeContext(), + }), + ).to.throw(HiveDriverError, /exactly one/); + }); + + it('id defaults to the async statement id', () => { + const op = makeAsyncOp(new FakeAsyncStatement()); + expect(op.id).to.equal('01ef-fake-async-id'); + }); + + it('status() reports the real kernel state', async () => { + const running = makeAsyncOp(new FakeAsyncStatement('Running')); + expect((await running.status(false)).state).to.equal(OperationState.Running); + const ok = makeAsyncOp(new FakeAsyncStatement('Succeeded')); + expect((await ok.status(false)).state).to.equal(OperationState.Succeeded); + }); + + it('waitUntilReady() polls status() until terminal, firing the progress callback each tick', async () => { + const stmt = new FakeAsyncStatement(['Pending', 'Running', 'Succeeded']); + const op = makeAsyncOp(stmt); + const states: OperationState[] = []; + await op.waitUntilReady({ callback: (s) => states.push(s.state) }); + expect(stmt.statusCalls).to.equal(3); + expect(states).to.deep.equal([OperationState.Pending, OperationState.Running, OperationState.Succeeded]); + }); + + it('waitUntilReady() surfaces the kernel error envelope on a Failed statement', async () => { + const stmt = new FakeAsyncStatement('Failed'); + // The kernel rejects awaitResult() with a sentinel-framed structured error; + // decodeNapiKernelError turns it into a typed HiveDriverError. + stmt.awaitResultError = new Error( + `__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'TABLE_OR_VIEW_NOT_FOUND' })}`, + ); + const op = makeAsyncOp(stmt); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/); + }); + + // A server-driven terminal state MUST throw OperationStateError (not a plain + // HiveDriverError) so the DBSQLOperation facade — which only mirrors its + // cancelled/closed flags when `err instanceof OperationStateError` — stays in + // sync. Asserting the subclass + errorCode is what catches a regression to + // the bare HiveDriverError (which would pass an `instanceOf HiveDriverError` + // check since OperationStateError extends it). + it('waitUntilReady() throws OperationStateError(Canceled) on a server-side Cancelled statement', async () => { + const op = makeAsyncOp(new FakeAsyncStatement('Cancelled')); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + }); + + it('best-effort close()s the kernel statement on a server-driven terminal error (no leak)', async () => { + // P1.5: the poll loop must release the statement handle on terminal errors, + // not just throw (otherwise the kernel-side statement leaks until session close). + for (const state of ['Cancelled', 'Closed', 'Unknown']) { + const stmt = new FakeAsyncStatement(state); + const op = makeAsyncOp(stmt); + // eslint-disable-next-line no-await-in-loop + await op.waitUntilReady().catch(() => undefined); + expect(stmt.closed, `closed after ${state}`).to.equal(true); + } + }); + + it('waitUntilReady() throws OperationStateError(Closed) on a server-side Closed statement', async () => { + const op = makeAsyncOp(new FakeAsyncStatement('Closed')); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Closed); + }); + + it('waitUntilReady() enforces queryTimeout client-side: throws Timeout and cancels a stuck Running statement', async function timeoutTest() { + // eslint-disable-next-line no-invalid-this + this.timeout(5000); + const stmt = new FakeAsyncStatement('Running'); // never reaches a terminal state + const op = makeAsyncOp(stmt, 0.05); // 50ms client-side deadline + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + // Best-effort server-side cancel fired so the statement doesn't keep running. + expect(stmt.cancelled).to.equal(true); + }); + + it('cancel() forwards to the async statement and short-circuits a subsequent poll', async () => { + const stmt = new FakeAsyncStatement(['Running', 'Running', 'Succeeded']); + const op = makeAsyncOp(stmt); + await op.cancel(); + expect(stmt.cancelled).to.equal(true); + // A JS-side cancel makes waitUntilReady fail fast without further polling. + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.an('error'); + }); + + it('close() forwards to the async statement', async () => { + const stmt = new FakeAsyncStatement(); + const op = makeAsyncOp(stmt); + await op.close(); + expect(stmt.closed).to.equal(true); + }); +}); + +describe('SeaOperationBackend — sync (executeStatementCancellable) path', () => { + const makeSyncOp = (cancellableExecution: FakeCancellableExecution, queryTimeoutSecs?: number) => + new SeaOperationBackend({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + cancellableExecution: cancellableExecution as any, + context: makeContext(), + queryTimeoutSecs, + }); + + it('rejects when more than one handle kind is provided', () => { + expect( + () => + new SeaOperationBackend({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + cancellableExecution: new FakeCancellableExecution() as any, + statement: new FakeNativeStatement(), + context: makeContext(), + }), + ).to.throw(HiveDriverError, /exactly one/); + }); + + it('keeps op.id stable across the sync execute and logs the resolved server statement id', async () => { + // op.id MUST stay stable (the facade keys telemetry start/complete on it — + // a mid-flight flip to the server id would split the records and drop the + // summary). The server statement_id is surfaced via a debug log instead. + const logs: Array<{ level: LogLevel; message: string }> = []; + const logger: IDBSQLLogger = { log: (level, message) => logs.push({ level, message }) }; + const op = new SeaOperationBackend({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + cancellableExecution: new FakeCancellableExecution() as any, + context: makeContext(logger), + }); + const idBefore = op.id; + expect(idBefore).to.be.a('string').and.have.length.greaterThan(0); + await op.waitUntilReady(); + // Stable: driving result() to terminal does NOT mutate the id. + expect(op.id).to.equal(idBefore); + // But the now-known server statement_id is logged for correlation. + expect(logs.some((l) => l.message.includes('01ef-fake-statement-id'))).to.equal(true); + }); + + it('surfaces the kernel SQL-error envelope when a sync result() rejects (Failed)', async () => { + const exec = new FakeCancellableExecution(); + // The kernel rejects result() with a sentinel-framed structured error; + // decodeNapiKernelError turns it into a typed HiveDriverError (sync path). + exec.resultError = new Error( + `__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'TABLE_OR_VIEW_NOT_FOUND' })}`, + ); + const op = makeSyncOp(exec); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/); + }); + + it('status() reports Running before result() and Succeeded after', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + // Before waitUntilReady drives result(), the blocking execute is still in + // flight from the JS side's perspective. + expect((await op.status(false)).state).to.equal(OperationState.Running); + await op.waitUntilReady(); + expect((await op.status(false)).state).to.equal(OperationState.Succeeded); + }); + + it('waitUntilReady() drives result() to the terminal statement and fires the callback once', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + const states: OperationState[] = []; + await op.waitUntilReady({ callback: (s) => states.push(s.state) }); + expect(exec.resultCalls).to.equal(1); + expect(states).to.deep.equal([OperationState.Succeeded]); + }); + + it('cancel() forwards to the cancellable execution (mid-compute)', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + await op.cancel(); + expect(exec.cancelled).to.equal(true); + }); + + it('close() on a still-running sync op cancels the server execution (no compute leak)', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + // close() before result() resolved: with no terminal statement to close, + // it must proactively cancel the running execution rather than no-op + // (otherwise server compute runs on until the kernel drop-guard fires at GC). + await op.close(); + expect(exec.cancelled).to.equal(true); + }); + + it('cancel() interrupts an in-flight result(), surfacing OperationStateError(Canceled)', async () => { + const exec = new FakeCancellableExecution(); + exec.block = true; // result() stays pending until cancel() rejects it + const op = makeSyncOp(exec); + // Start the wait (drives result(), which parks), then cancel mid-compute. + const waitPromise = op.waitUntilReady(); + // Let the microtask queue run so result() is dispatched and parked. + await Promise.resolve(); + await op.cancel(); + let thrown: unknown; + try { + await waitPromise; + } catch (err) { + thrown = err; + } + expect(exec.cancelled).to.equal(true); + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + }); + + it('close() routes to the resolved statement once result() has produced it', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + await op.waitUntilReady(); // resolves the terminal statement + await op.close(); + expect(exec.resultHandle.closed).to.equal(true); + }); + + it('close() before result() resolves is a no-op (nothing server-side to close yet)', async () => { + const exec = new FakeCancellableExecution(); + const op = makeSyncOp(exec); + // Should not throw even though result() never ran. + const status = await op.close(); + expect(status.isSuccess).to.equal(true); + expect(exec.resultHandle.closed).to.equal(false); + }); +}); diff --git a/tests/unit/sea/loader.test.ts b/tests/unit/sea/loader.test.ts index 13d88632..3eb642ac 100644 --- a/tests/unit/sea/loader.test.ts +++ b/tests/unit/sea/loader.test.ts @@ -31,6 +31,9 @@ function stubBinding(overrides: Partial> openSession: async () => ({}), Connection: function Connection() {}, Statement: function Statement() {}, + AsyncStatement: function AsyncStatement() {}, + AsyncResultHandle: function AsyncResultHandle() {}, + CancellableExecution: function CancellableExecution() {}, ...overrides, } as unknown as SeaNativeBinding; } @@ -121,6 +124,17 @@ describe('SeaNativeLoader', () => { expect(msg).to.match(/missing expected export/); expect(msg).to.match(/openSession/); }); + + it('rejects a stale binding missing the async / cancellable execution classes', () => { + // An older cached .node would load but crash mid-query at e.g. + // `submitStatement` / `executeStatementCancellable`; fail fast at load. + for (const cls of ['AsyncStatement', 'AsyncResultHandle', 'CancellableExecution'] as const) { + const loader = new SeaNativeLoader(() => stubBinding({ [cls]: undefined }), SUPPORTED_NODE_MAJOR); + const msg = thrownMessage(() => loader.get()); + expect(msg, cls).to.match(/missing expected export/); + expect(msg, cls).to.contain(cls); + } + }); }); describe('Node-version gate', () => { diff --git a/tests/unit/sea/operation-lifecycle.test.ts b/tests/unit/sea/operation-lifecycle.test.ts index 06260542..1148cd47 100644 --- a/tests/unit/sea/operation-lifecycle.test.ts +++ b/tests/unit/sea/operation-lifecycle.test.ts @@ -166,6 +166,10 @@ describe('SeaOperationLifecycle (helpers)', () => { } expect(thrown).to.be.instanceOf(HiveDriverError); expect((thrown as Error).message).to.contain('statement already closed'); + // The caller asked to cancel: a failed cancel RPC must NOT roll the intent + // back (doing so would silently resurrect a cancelled op while the + // server-side statement may still be running). + expect(state.isCancelled).to.equal(true); }); it('logs a debug message tagged with the operation id', async () => { diff --git a/tests/unit/sea/positionalParams.test.ts b/tests/unit/sea/positionalParams.test.ts index ab0f065c..d9902303 100644 --- a/tests/unit/sea/positionalParams.test.ts +++ b/tests/unit/sea/positionalParams.test.ts @@ -90,6 +90,18 @@ describe('SeaPositionalParams.buildSeaPositionalParams', () => { { sqlType: 'TIMESTAMP', value: '2024-01-15 10:30:00' }, ]); }); + + it('binds TIMESTAMP_NTZ natively and TIMESTAMP_LTZ as TIMESTAMP (Spark has no distinct LTZ type)', () => { + expect( + buildSeaPositionalParams([ + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_NTZ, value: '2024-01-15 10:30:00' }), + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_LTZ, value: '2024-01-15 10:30:00' }), + ]), + ).to.deep.equal([ + { sqlType: 'TIMESTAMP_NTZ', value: '2024-01-15 10:30:00' }, + { sqlType: 'TIMESTAMP', value: '2024-01-15 10:30:00' }, + ]); + }); }); describe('SeaPositionalParams.buildSeaNamedParams', () => {