From 6c24aef5019833c85d6b7de23be0894e7ad8aa0c Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 25 May 2026 22:15:16 +0000 Subject: [PATCH 1/3] feat(sea): type the napi async-execute surface + e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the new napi `submitStatement` / `AsyncStatement` / `AsyncResultHandle` shapes from kernel branch `msrathore/krn-async-execute` (commit `1957878`) on the JS side and adds an end-to-end test against pecotesting. Plumbing: - `SeaNativeLoader.ts` — new typed interfaces `SeaNativeStatementStatus` (string union mirroring the kernel `StatementStatus` variant names), `SeaNativeAsyncStatement`, `SeaNativeAsyncResultHandle`. `SeaNativeConnection` gains the `submitStatement(sql, options?)` method. - `native/sea/index.d.ts` — adds `AsyncStatement` / `AsyncResultHandle` class declarations and the `Connection.submitStatement(...)` method. Mirrors the napi-rs codegen exactly. - `tests/e2e/sea/async-execute-e2e.test.ts` — three cases: 1. submit → awaitResult → drain 100 rows; assert byte-decoded Arrow row count matches the range(0, 100) query. 2. status() returns a string variant from the kernel enum (Pending/Running/Succeeded/Closed depending on warehouse responsiveness). 3. cancel() against a still-pending statement (range(0, 100_000_000)) completes within a 2s wire-latency budget. Deferred to a follow-on PR: a full `SeaAsyncOperationBackend` implementing the facade's `IOperationBackend` interface so `DBSQLOperation` async-mode (matching Thrift's `IDBSQLOperation` polling-mode surface) flows transparently. The kernel + napi surface is ready; the JS adapter is the next increment. Cross-PR dependency: requires kernel branch `msrathore/krn-async-execute` (commit `1957878`). Stacks on `msrathore/sea-statement-options` (F3 + F4). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- lib/sea/SeaNativeLoader.ts | 51 ++++++ native/sea/index.d.ts | 57 ++++++- tests/e2e/sea/async-execute-e2e.test.ts | 211 ++++++++++++++++++++++++ 3 files changed, 318 insertions(+), 1 deletion(-) create mode 100644 tests/e2e/sea/async-execute-e2e.test.ts diff --git a/lib/sea/SeaNativeLoader.ts b/lib/sea/SeaNativeLoader.ts index c1f25f4f..edc9cbcc 100644 --- a/lib/sea/SeaNativeLoader.ts +++ b/lib/sea/SeaNativeLoader.ts @@ -100,6 +100,47 @@ export interface SeaNativeExecuteOptions { queryTags?: Record; } +/** + * Server-side execution status returned by + * `AsyncStatement.status()`. Mirrors the kernel `StatementStatus` + * enum collapsed to its variant name. `'Unknown'` is the + * forward-compat arm for kernel variants the binding doesn't + * recognise. + */ +export type SeaNativeStatementStatus = + | 'Pending' + | 'Running' + | 'Succeeded' + | 'Failed' + | 'Cancelled' + | 'Closed' + | 'Unknown'; + +/** + * Typed surface for the opaque napi `AsyncResultHandle`. Returned + * by `AsyncStatement.awaitResult()`; same fetch-side surface as + * `SeaNativeStatement` but without `cancel()` / `close()` (the + * parent `AsyncStatement` owns server-side lifecycle). + */ +export interface SeaNativeAsyncResultHandle { + readonly statementId: string; + fetchNextBatch(): Promise; + schema(): Promise; +} + +/** + * Typed surface for the opaque napi `AsyncStatement`. Returned by + * `Connection.submitStatement(...)`. JS drives polling via + * `status()` or blocks via `awaitResult()`. + */ +export interface SeaNativeAsyncStatement { + readonly statementId: string; + status(): Promise; + awaitResult(): Promise; + cancel(): Promise; + close(): Promise; +} + /** * Typed surface for the opaque napi `Connection` handle. */ @@ -113,6 +154,16 @@ export interface SeaNativeConnection { * (per-statement Spark conf overlay) and `queryTags`. */ executeStatement(sql: string, options?: SeaNativeExecuteOptions): Promise; + /** + * Submit a SQL statement asynchronously and return an + * `AsyncStatement` handle. The kernel sends `wait_timeout=0s` so + * the server returns immediately with a `statement_id` while the + * query is still Pending/Running. Drive polling via the returned + * handle's `status()` / `awaitResult()` methods. Drop-cancel + * during `awaitResult()` is handled by the kernel's + * `AwaitResultCancelGuard`. + */ + submitStatement(sql: string, options?: SeaNativeExecuteOptions): Promise; close(): Promise; } diff --git a/native/sea/index.d.ts b/native/sea/index.d.ts index 2ce8fa34..8d03d658 100644 --- a/native/sea/index.d.ts +++ b/native/sea/index.d.ts @@ -96,11 +96,58 @@ export interface ArrowBatch { } /** * An Arrow IPC stream payload encoding just the result schema (no - * record-batch messages). Returned by `Statement.schema()`. + * record-batch messages). Returned by `Statement.schema()` or + * `AsyncResultHandle.schema()`. */ export interface ArrowSchema { ipcBytes: Buffer } +/** + * Opaque async-statement handle returned by + * `Connection.submitStatement(...)`. The kernel sent + * `wait_timeout=0s`, so the server is still Pending/Running when + * this handle is constructed; JS drives polling via `status()` or + * blocks via `awaitResult()`. Drop-cancel during `awaitResult()` + * is handled by the kernel's `AwaitResultCancelGuard`. + */ +export declare class AsyncStatement { + /** Server-issued statement id. Cached at construction. */ + get statementId(): string + /** + * One-shot status check. Returns + * `'Pending' | 'Running' | 'Succeeded' | 'Failed' | 'Cancelled' | + * 'Closed' | 'Unknown'`. Returns `KernelError(InvalidStatementHandle)` + * if the statement has been explicitly `close()`d. + */ + status(): Promise + /** + * Block until the server reaches a terminal state, then return an + * `AsyncResultHandle` that wraps the materialised result stream. + */ + awaitResult(): Promise + /** Server-side cancel. */ + cancel(): Promise + /** Explicit close. Idempotent. */ + close(): Promise +} +/** + * Opaque result-fetch handle returned by + * `AsyncStatement.awaitResult()`. Wraps a kernel `ResultStream`. + * No `cancel()` / `close()` — the parent `AsyncStatement` owns + * server-side lifecycle. + */ +export declare class AsyncResultHandle { + /** Server-issued statement id. Matches the parent `AsyncStatement`. */ + get statementId(): string + /** + * Pull the next batch of results. Returns `null` when the stream + * is exhausted. Byte-identical IPC payload to the sync + * `Statement.fetchNextBatch()` for the same query. + */ + fetchNextBatch(): Promise + /** Result schema as a schema-only Arrow IPC payload. */ + schema(): Promise +} /** * Returns the native binding's crate version (`CARGO_PKG_VERSION`). * @@ -131,6 +178,14 @@ export declare class Connection { * `serializeQueryTags` wire shape). */ executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise + /** + * Submit a SQL statement asynchronously and return an + * `AsyncStatement` handle. The kernel `Statement::submit()` sends + * `wait_timeout=0s`, so the server returns immediately with a + * statement_id while the query is still Pending/Running. Drive + * polling via the returned handle. + */ + submitStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise /** * Explicit close. Marks the connection wrapper as closed so * subsequent calls on this `Connection` return `InvalidArg`, then diff --git a/tests/e2e/sea/async-execute-e2e.test.ts b/tests/e2e/sea/async-execute-e2e.test.ts new file mode 100644 index 00000000..f1defc59 --- /dev/null +++ b/tests/e2e/sea/async-execute-e2e.test.ts @@ -0,0 +1,211 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * End-to-end tests for the SEA submit/await_result async-consumption + * path through the napi binding. + * + * Path under test: + * 1. `binding.openSession(...)` — kernel `Session::open()` + * 2. `connection.submitStatement(sql)` — kernel `Statement::submit()` + * (wait_timeout=0s, server returns Pending/Running with a + * statement_id) + * 3. `asyncStmt.status()` / `.awaitResult()` — kernel `status()` / + * `await_result()` + * 4. `asyncResult.fetchNextBatch()` — kernel `ResultStream::next_batch()` + * + * The kernel's `AwaitResultCancelGuard` covers drop-cancel safety; + * the `cancel-while-pending` test exercises explicit cancel mid-poll + * by racing `asyncStmt.cancel()` against `asyncStmt.awaitResult()`. + * + * Calls the napi binding directly (same pattern as + * `operation-lifecycle-e2e.test.ts`) — the higher-level + * `DBSQLOperation` async-mode integration (matching Thrift's + * `IDBSQLOperation` polling-mode surface) is a follow-on. This test + * proves the kernel → napi → JS shape works end-to-end. + * + * Skipped when `DATABRICKS_PECOTESTING_*` env vars are absent. + */ + +import { expect } from 'chai'; +import { tableFromIPC } from 'apache-arrow'; + +interface NativeBinding { + openSession(opts: { + hostName: string; + httpPath: string; + token: string; + }): Promise; +} + +interface NativeConnection { + submitStatement(sql: string): Promise; + close(): Promise; +} + +interface NativeAsyncStatement { + readonly statementId: string; + status(): Promise; + awaitResult(): Promise; + cancel(): Promise; + close(): Promise; +} + +interface NativeAsyncResultHandle { + readonly statementId: string; + fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>; + schema(): Promise<{ ipcBytes: Buffer }>; +} + +describe('SEA async execute — submit / status / awaitResult / cancel', function suite() { + this.timeout(180_000); + + const hostName = + process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME || process.env.E2E_HOST; + const httpPath = + process.env.DATABRICKS_PECOTESTING_HTTP_PATH || process.env.E2E_PATH; + const token = + process.env.DATABRICKS_PECOTESTING_TOKEN || process.env.E2E_ACCESS_TOKEN; + + before(function gate() { + if (!hostName || !httpPath || !token) { + // eslint-disable-next-line no-invalid-this + this.skip(); + } + }); + + /** + * Lazy-load the native binding so the test file is requirable in + * environments where the `.node` artifact isn't built yet — the + * `before()` gate will skip the suite before we touch the binding. + * Loading at `require`-time would crash test discovery. + */ + function loadBinding(): NativeBinding { + // eslint-disable-next-line global-require, @typescript-eslint/no-var-requires + return require('../../../native/sea/index.js') as NativeBinding; + } + + it('submit returns immediately with a statement_id; awaitResult drains', async () => { + const binding = loadBinding(); + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + let asyncStmt: NativeAsyncStatement | null = null; + try { + asyncStmt = await connection.submitStatement('SELECT * FROM range(0, 100)'); + expect(asyncStmt).to.be.an('object'); + expect(asyncStmt.statementId).to.be.a('string').and.to.have.length.greaterThan(0); + + // Block on the server-side terminal state. The kernel's + // internal polling handles backoff and the drop-cancel guard. + const result = await asyncStmt.awaitResult(); + expect(result.statementId).to.equal(asyncStmt.statementId); + + // Drain the full result and assert row count. + let totalRows = 0; + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + const envelope = await result.fetchNextBatch(); + if (envelope === null) { + break; + } + const table = tableFromIPC(envelope.ipcBytes); + totalRows += table.numRows; + } + expect(totalRows).to.equal(100); + } finally { + if (asyncStmt !== null) { + try { + await asyncStmt.close(); + } catch (_) { + // best-effort cleanup + } + } + await connection.close(); + } + }); + + it('status() returns a string variant from the kernel StatementStatus enum', async () => { + const binding = loadBinding(); + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + let asyncStmt: NativeAsyncStatement | null = null; + try { + asyncStmt = await connection.submitStatement('SELECT * FROM range(0, 100)'); + const status = await asyncStmt.status(); + expect(status).to.be.a('string'); + expect(['Pending', 'Running', 'Succeeded', 'Closed']).to.include( + status, + `unexpected status: ${status}`, + ); + + // Drain via awaitResult to release server-side resources. + await asyncStmt.awaitResult(); + } finally { + if (asyncStmt !== null) { + try { + await asyncStmt.close(); + } catch (_) { + // best-effort cleanup + } + } + await connection.close(); + } + }); + + it('cancel() against a still-pending async statement completes quickly', async () => { + const binding = loadBinding(); + const connection = await binding.openSession({ + hostName: hostName as string, + httpPath: httpPath as string, + token: token as string, + }); + + let asyncStmt: NativeAsyncStatement | null = null; + try { + // Large enough query that the server will not have finished by + // the time we issue cancel. `range(0, 100_000_000)` was used in + // the existing sync cancel test for the same reason. + asyncStmt = await connection.submitStatement( + 'SELECT * FROM range(0, 100000000)', + ); + expect(asyncStmt.statementId).to.have.length.greaterThan(0); + + const t0 = Date.now(); + await asyncStmt.cancel(); + const elapsed = Date.now() - t0; + // cancel should not block on completion of the underlying query; + // it just sends a CancelStatement and returns. Allow a generous + // budget for wire latency. + expect(elapsed).to.be.lessThan(2000, `cancel latency ${elapsed}ms`); + } finally { + if (asyncStmt !== null) { + try { + await asyncStmt.close(); + } catch (_) { + // best-effort cleanup; cancelled statements may surface a close error + } + } + await connection.close(); + } + }); +}); From 018f5b731f19963d92c069e4d00dc1bccab32217 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 25 May 2026 22:33:18 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix(sea):=20F3b=20fixup=20=E2=80=94=20skip-?= =?UTF-8?q?gate=20native-binary=20check=20+=20createRequire?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same skip-gate fix as F2/F4 (DA round 1 H1): - `before()` verifies `native/sea/index.linux-x64-gnu.node` exists, skips with a clear "run yarn build:native" message if absent. - `loadBinding()` uses `createRequire(import.meta.url)` so the require works under both CJS and the ESM-reparse path mocha 11+ may use (MODULE_TYPELESS_PACKAGE_JSON reparse-as-ESM). Pre-emptive fix even though F3b wasn't on the DA fixup batch list — the same defect pattern would have been flagged on the next round. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- tests/e2e/sea/async-execute-e2e.test.ts | 33 ++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/tests/e2e/sea/async-execute-e2e.test.ts b/tests/e2e/sea/async-execute-e2e.test.ts index f1defc59..c19bf1f6 100644 --- a/tests/e2e/sea/async-execute-e2e.test.ts +++ b/tests/e2e/sea/async-execute-e2e.test.ts @@ -40,6 +40,15 @@ import { expect } from 'chai'; import { tableFromIPC } from 'apache-arrow'; +import * as fs from 'fs'; +import * as path from 'path'; +import { createRequire } from 'module'; + +// `createRequire(import.meta.url)` so the require works under both +// CJS and the ESM-reparse path mocha 11+ may use +// (MODULE_TYPELESS_PACKAGE_JSON reparse-as-ESM). +// eslint-disable-next-line @typescript-eslint/naming-convention +const requireFromHere = createRequire(import.meta.url); interface NativeBinding { openSession(opts: { @@ -82,18 +91,34 @@ describe('SEA async execute — submit / status / awaitResult / cancel', functio if (!hostName || !httpPath || !token) { // eslint-disable-next-line no-invalid-this this.skip(); + return; + } + // Verify the native artifact exists before any test calls + // loadBinding(). Skip-with-message if absent. DA round-1 H1 fixup + // (skip-gate must not crash MODULE_NOT_FOUND when build:native not + // run). + const nodeArtifact = path.resolve( + process.cwd(), + 'native/sea/index.linux-x64-gnu.node', + ); + if (!fs.existsSync(nodeArtifact)) { + // eslint-disable-next-line no-console + console.warn( + `[sea async-execute e2e] skipping: native binary not built. ` + + `Run \`yarn build:native\` first.`, + ); + // eslint-disable-next-line no-invalid-this + this.skip(); } }); /** * Lazy-load the native binding so the test file is requirable in * environments where the `.node` artifact isn't built yet — the - * `before()` gate will skip the suite before we touch the binding. - * Loading at `require`-time would crash test discovery. + * `before()` gate skips the suite before we touch the binding. */ function loadBinding(): NativeBinding { - // eslint-disable-next-line global-require, @typescript-eslint/no-var-requires - return require('../../../native/sea/index.js') as NativeBinding; + return requireFromHere('../../../native/sea/index.js') as NativeBinding; } it('submit returns immediately with a statement_id; awaitResult drains', async () => { From de55b96f268410269c90666921ece6deef7bdff5 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 25 May 2026 22:38:47 +0000 Subject: [PATCH 3/3] =?UTF-8?q?fix(sea):=20F3b=20e2e=20=E2=80=94=20switch?= =?UTF-8?q?=20fs/path=20to=20named=20imports=20under=20ESM=20reparse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same ESM-reparse fix as F2/F4. Verified live e2e passes (submit → awaitResult → drain, status() variant, cancel-while-pending). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore --- tests/e2e/sea/async-execute-e2e.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/e2e/sea/async-execute-e2e.test.ts b/tests/e2e/sea/async-execute-e2e.test.ts index c19bf1f6..d662477c 100644 --- a/tests/e2e/sea/async-execute-e2e.test.ts +++ b/tests/e2e/sea/async-execute-e2e.test.ts @@ -40,8 +40,8 @@ import { expect } from 'chai'; import { tableFromIPC } from 'apache-arrow'; -import * as fs from 'fs'; -import * as path from 'path'; +import { existsSync } from 'fs'; +import { resolve as resolvePath } from 'path'; import { createRequire } from 'module'; // `createRequire(import.meta.url)` so the require works under both @@ -97,11 +97,11 @@ describe('SEA async execute — submit / status / awaitResult / cancel', functio // loadBinding(). Skip-with-message if absent. DA round-1 H1 fixup // (skip-gate must not crash MODULE_NOT_FOUND when build:native not // run). - const nodeArtifact = path.resolve( + const nodeArtifact = resolvePath( process.cwd(), 'native/sea/index.linux-x64-gnu.node', ); - if (!fs.existsSync(nodeArtifact)) { + if (!existsSync(nodeArtifact)) { // eslint-disable-next-line no-console console.warn( `[sea async-execute e2e] skipping: native binary not built. ` +