Skip to content

Commit 2538c19

Browse files
committed
feat(sea): type the napi async-execute surface + e2e test
Mirrors the new napi `submitStatement` / `AsyncStatement` / `AsyncResultHandle` shapes from kernel branch `msrathore/krn-async-execute` (commit `1957878`) on the JS side and adds an end-to-end test against pecotesting.

Plumbing:
- `SeaNativeLoader.ts` — new typed interfaces `SeaNativeStatementStatus` (string union mirroring the kernel `StatementStatus` variant names), `SeaNativeAsyncStatement`, `SeaNativeAsyncResultHandle`. `SeaNativeConnection` gains the `submitStatement(sql, options?)` method.
- `native/sea/index.d.ts` — adds `AsyncStatement` / `AsyncResultHandle` class declarations and the `Connection.submitStatement(...)` method. Mirrors the napi-rs codegen exactly.
- `tests/e2e/sea/async-execute-e2e.test.ts` — three cases:
  1. submit → awaitResult → drain 100 rows; assert byte-decoded Arrow row count matches the range(0, 100) query.
  2. status() returns a string variant from the kernel enum (Pending/Running/Succeeded/Closed depending on warehouse responsiveness).
  3. cancel() against a still-pending statement (range(0, 100_000_000)) completes within a 2s wire-latency budget.

Deferred to a follow-on PR: a full `SeaAsyncOperationBackend` implementing the facade's `IOperationBackend` interface so `DBSQLOperation` async-mode (matching Thrift's `IDBSQLOperation` polling-mode surface) flows transparently. The kernel + napi surface is ready; the JS adapter is the next increment.

Cross-PR dependency: requires kernel branch `msrathore/krn-async-execute` (commit `1957878`). Stacks on `msrathore/sea-statement-options` (F3 + F4).

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 1056dfa commit 2538c19

3 files changed

Lines changed: 318 additions & 1 deletion

File tree

lib/sea/SeaNativeLoader.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,47 @@ export interface SeaNativeExecuteOptions {
8888
queryTags?: Record<string, string>;
8989
}
9090

91+
/**
92+
* Server-side execution status returned by
93+
* `AsyncStatement.status()`. Mirrors the kernel `StatementStatus`
94+
* enum collapsed to its variant name. `'Unknown'` is the
95+
* forward-compat arm for kernel variants the binding doesn't
96+
* recognise.
*
* The native declaration (`native/sea/index.d.ts`) types `status()`
* as `Promise<string>` and lists these same seven names in its doc
* comment; this union is the narrower JS-side typing of that string
* and is not enforced by the binding itself.
97+
*/
98+
export type SeaNativeStatementStatus =
99+
| 'Pending'
100+
| 'Running'
101+
| 'Succeeded'
102+
| 'Failed'
103+
| 'Cancelled'
104+
| 'Closed'
105+
| 'Unknown';
106+
107+
/**
108+
* Typed surface for the opaque napi `AsyncResultHandle`. Returned
109+
* by `AsyncStatement.awaitResult()`; same fetch-side surface as
110+
* `SeaNativeStatement` but without `cancel()` / `close()` (the
111+
* parent `AsyncStatement` owns server-side lifecycle).
112+
*/
113+
export interface SeaNativeAsyncResultHandle {
114+
/** Server-issued statement id; matches the parent async statement's id. */
readonly statementId: string;
115+
/** Pulls the next result batch; resolves to `null` once the stream is exhausted. */
fetchNextBatch(): Promise<SeaArrowBatch | null>;
116+
/** Resolves to the result schema (a schema-only Arrow IPC payload per the native declaration). */
schema(): Promise<SeaArrowSchema>;
117+
}
118+
119+
/**
120+
* Typed surface for the opaque napi `AsyncStatement`. Returned by
121+
* `Connection.submitStatement(...)`. JS drives polling via
122+
* `status()` or blocks via `awaitResult()`.
123+
*/
124+
export interface SeaNativeAsyncStatement {
125+
/** Server-issued statement id. */
readonly statementId: string;
126+
/** One-shot status check; resolves to the current status variant name. */
status(): Promise<SeaNativeStatementStatus>;
127+
/** Resolves to a result-fetch handle once the server reaches a terminal state. */
awaitResult(): Promise<SeaNativeAsyncResultHandle>;
128+
/** Requests a server-side cancel of the statement. */
cancel(): Promise<void>;
129+
/** Explicitly closes the statement (documented as idempotent in the native declaration). */
close(): Promise<void>;
130+
}
131+
91132
/**
92133
* Typed surface for the opaque napi `Connection` handle.
93134
*/
@@ -101,6 +142,16 @@ export interface SeaNativeConnection {
101142
* (per-statement Spark conf overlay) and `queryTags`.
102143
*/
103144
executeStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeStatement>;
145+
/**
146+
* Submit a SQL statement asynchronously and return an
147+
* `AsyncStatement` handle. The kernel sends `wait_timeout=0s` so
148+
* the server returns immediately with a `statement_id` while the
149+
* query is still Pending/Running. Drive polling via the returned
150+
* handle's `status()` / `awaitResult()` methods. Drop-cancel
151+
* during `awaitResult()` is handled by the kernel's
152+
* `AwaitResultCancelGuard`.
153+
*/
154+
submitStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeAsyncStatement>;
104155
close(): Promise<void>;
105156
}
106157

native/sea/index.d.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,58 @@ export interface ArrowBatch {
9696
}
9797
/**
9898
* An Arrow IPC stream payload encoding just the result schema (no
99-
* record-batch messages). Returned by `Statement.schema()`.
99+
* record-batch messages). Returned by `Statement.schema()` or
100+
* `AsyncResultHandle.schema()`.
100101
*/
101102
export interface ArrowSchema {
102103
ipcBytes: Buffer
103104
}
105+
/**
106+
* Opaque async-statement handle returned by
107+
* `Connection.submitStatement(...)`. The kernel sent
108+
* `wait_timeout=0s`, so the statement may still be Pending/Running
109+
* when this handle is constructed; JS drives polling via `status()` or
110+
* blocks via `awaitResult()`. Drop-cancel during `awaitResult()`
111+
* is handled by the kernel's `AwaitResultCancelGuard`.
112+
*/
113+
export declare class AsyncStatement {
114+
/** Server-issued statement id. Cached at construction. */
115+
get statementId(): string
116+
/**
117+
* One-shot status check. Returns
118+
* `'Pending' | 'Running' | 'Succeeded' | 'Failed' | 'Cancelled' |
119+
* 'Closed' | 'Unknown'`. Returns `KernelError(InvalidStatementHandle)`
120+
* if the statement has been explicitly `close()`d.
*
* NOTE(review): this method returns a Promise, so the
* `KernelError` presumably surfaces as a rejection rather than a
* resolved value — confirm against the binding.
121+
*/
122+
status(): Promise<string>
123+
/**
124+
* Block until the server reaches a terminal state, then return an
125+
* `AsyncResultHandle` that wraps the materialised result stream.
*
* NOTE(review): the outcome for `Failed` / `Cancelled` terminal
* states is not stated here — confirm whether the promise rejects.
126+
*/
127+
awaitResult(): Promise<AsyncResultHandle>
128+
/** Server-side cancel. */
129+
cancel(): Promise<void>
130+
/** Explicit close. Idempotent. */
131+
close(): Promise<void>
132+
}
133+
/**
134+
* Opaque result-fetch handle returned by
135+
* `AsyncStatement.awaitResult()`. Wraps a kernel `ResultStream`.
136+
* No `cancel()` / `close()` — the parent `AsyncStatement` owns
137+
* server-side lifecycle.
*
* NOTE(review): behaviour of the methods below after the parent
* `AsyncStatement` has been closed is not documented here — confirm
* before relying on a handle that outlives its parent.
138+
*/
139+
export declare class AsyncResultHandle {
140+
/** Server-issued statement id. Matches the parent `AsyncStatement`. */
141+
get statementId(): string
142+
/**
143+
* Pull the next batch of results. Returns `null` when the stream
144+
* is exhausted. Byte-identical IPC payload to the sync
145+
* `Statement.fetchNextBatch()` for the same query.
146+
*/
147+
fetchNextBatch(): Promise<ArrowBatch | null>
148+
/** Result schema as a schema-only Arrow IPC payload. */
149+
schema(): Promise<ArrowSchema>
150+
}
104151
/**
105152
* Returns the native binding's crate version (`CARGO_PKG_VERSION`).
106153
*
@@ -131,6 +178,14 @@ export declare class Connection {
131178
* `serializeQueryTags` wire shape).
132179
*/
133180
executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<Statement>
181+
/**
182+
* Submit a SQL statement asynchronously and return an
183+
* `AsyncStatement` handle. The kernel `Statement::submit()` sends
184+
* `wait_timeout=0s`, so the server returns immediately with a
185+
* statement_id while the query is still Pending/Running. Drive
186+
* polling via the returned handle.
187+
*/
188+
submitStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<AsyncStatement>
134189
/**
135190
* Explicit close. Marks the connection wrapper as closed so
136191
* subsequent calls on this `Connection` return `InvalidArg`, then
tests/e2e/sea/async-execute-e2e.test.ts

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Copyright (c) 2026 Databricks, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* End-to-end tests for the SEA submit/await_result async-consumption
17+
* path through the napi binding.
18+
*
19+
* Path under test:
20+
* 1. `binding.openSession(...)` — kernel `Session::open()`
21+
* 2. `connection.submitStatement(sql)` — kernel `Statement::submit()`
22+
* (wait_timeout=0s, server returns Pending/Running with a
23+
* statement_id)
24+
* 3. `asyncStmt.status()` / `.awaitResult()` — kernel `status()` /
25+
* `await_result()`
26+
* 4. `asyncResult.fetchNextBatch()` — kernel `ResultStream::next_batch()`
27+
*
28+
* The kernel's `AwaitResultCancelGuard` covers drop-cancel safety;
29+
* the cancel test exercises an explicit `asyncStmt.cancel()` on a
30+
* long-running statement and times that call; it does not invoke `asyncStmt.awaitResult()`.
31+
*
32+
* Calls the napi binding directly (same pattern as
33+
* `operation-lifecycle-e2e.test.ts`) — the higher-level
34+
* `DBSQLOperation` async-mode integration (matching Thrift's
35+
* `IDBSQLOperation` polling-mode surface) is a follow-on. This test
36+
* proves the kernel → napi → JS shape works end-to-end.
37+
*
38+
* Skipped when `DATABRICKS_PECOTESTING_*` env vars are absent.
39+
*/
40+
41+
import { expect } from 'chai';
42+
import { tableFromIPC } from 'apache-arrow';
43+
44+
/**
 * Minimal structural view of the native module as this suite uses it:
 * only the `openSession` entry point. The `require`d binding is cast
 * to this shape in `loadBinding()`.
 */
interface NativeBinding {
45+
openSession(opts: {
46+
hostName: string;
47+
httpPath: string;
48+
token: string;
49+
}): Promise<NativeConnection>;
50+
}
51+
52+
/**
 * Subset of the native connection surface exercised here.
 * `submitStatement` is declared with the SQL argument only because
 * none of the tests in this file pass execute options.
 */
interface NativeConnection {
53+
submitStatement(sql: string): Promise<NativeAsyncStatement>;
54+
close(): Promise<void>;
55+
}
56+
57+
/**
 * Test-local view of the async statement handle returned by
 * `submitStatement`. `status()` is typed as a plain `string`; the
 * suite checks the concrete value against the expected variant names.
 */
interface NativeAsyncStatement {
58+
readonly statementId: string;
59+
status(): Promise<string>;
60+
awaitResult(): Promise<NativeAsyncResultHandle>;
61+
cancel(): Promise<void>;
62+
close(): Promise<void>;
63+
}
64+
65+
/**
 * Test-local view of the result handle returned by `awaitResult()`.
 * Batches and the schema are reduced to their `ipcBytes` buffer, which
 * is the only batch field this suite reads (fed to `tableFromIPC`).
 */
interface NativeAsyncResultHandle {
66+
readonly statementId: string;
67+
fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>;
68+
schema(): Promise<{ ipcBytes: Buffer }>;
69+
}
70+
71+
describe('SEA async execute — submit / status / awaitResult / cancel', function suite() {
  this.timeout(180_000);

  // Connection coordinates: pecotesting variables win, generic E2E_* ones are the fallback.
  const hostName =
    process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME || process.env.E2E_HOST;
  const httpPath =
    process.env.DATABRICKS_PECOTESTING_HTTP_PATH || process.env.E2E_PATH;
  const token =
    process.env.DATABRICKS_PECOTESTING_TOKEN || process.env.E2E_ACCESS_TOKEN;

  // Skip the whole suite when any coordinate is missing.
  before(function gate() {
    if (!hostName || !httpPath || !token) {
      // eslint-disable-next-line no-invalid-this
      this.skip();
    }
  });

  /**
   * Lazy-load the native binding so the test file is requirable in
   * environments where the `.node` artifact isn't built yet — the
   * `before()` gate will skip the suite before we touch the binding.
   * Loading at `require`-time would crash test discovery.
   */
  function loadBinding(): NativeBinding {
    // eslint-disable-next-line global-require, @typescript-eslint/no-var-requires
    return require('../../../native/sea/index.js') as NativeBinding;
  }

  /**
   * Open a native session using the suite's connection coordinates.
   *
   * The guard narrows the three env-derived values to `string` once,
   * replacing per-call `as string` assertions. It is not expected to
   * fire: the `before()` gate skips the suite when a value is missing.
   *
   * @throws Error if a coordinate is missing when called.
   */
  async function openConnection(): Promise<NativeConnection> {
    if (!hostName || !httpPath || !token) {
      throw new Error('SEA async e2e: connection environment variables are not set');
    }
    return loadBinding().openSession({ hostName, httpPath, token });
  }

  /**
   * Shared scaffold for every test: open a session, submit `sql`
   * asynchronously, hand the statement to `body`, then clean up.
   *
   * Cleanup mirrors what each test previously did inline: the
   * statement close is best-effort (its error is swallowed), while the
   * connection close is awaited and allowed to fail the test.
   *
   * @param sql  Statement text passed to `submitStatement`.
   * @param body Assertions to run against the submitted statement.
   */
  async function withAsyncStatement(
    sql: string,
    body: (asyncStmt: NativeAsyncStatement) => Promise<void>,
  ): Promise<void> {
    const connection = await openConnection();
    let asyncStmt: NativeAsyncStatement | null = null;
    try {
      asyncStmt = await connection.submitStatement(sql);
      await body(asyncStmt);
    } finally {
      if (asyncStmt !== null) {
        try {
          await asyncStmt.close();
        } catch (_) {
          // best-effort cleanup; cancelled statements may surface a close error
        }
      }
      await connection.close();
    }
  }

  it('submit returns immediately with a statement_id; awaitResult drains', () =>
    withAsyncStatement('SELECT * FROM range(0, 100)', async (asyncStmt) => {
      expect(asyncStmt).to.be.an('object');
      expect(asyncStmt.statementId).to.be.a('string').and.to.have.length.greaterThan(0);

      // Block on the server-side terminal state. The kernel's
      // internal polling handles backoff and the drop-cancel guard.
      const result = await asyncStmt.awaitResult();
      expect(result.statementId).to.equal(asyncStmt.statementId);

      // Drain the full result and assert row count.
      let totalRows = 0;
      // eslint-disable-next-line no-constant-condition
      while (true) {
        // eslint-disable-next-line no-await-in-loop
        const envelope = await result.fetchNextBatch();
        if (envelope === null) {
          break;
        }
        // Each batch is decoded on its own and contributes its row count.
        totalRows += tableFromIPC(envelope.ipcBytes).numRows;
      }
      expect(totalRows).to.equal(100);
    }));

  it('status() returns a string variant from the kernel StatementStatus enum', () =>
    withAsyncStatement('SELECT * FROM range(0, 100)', async (asyncStmt) => {
      const status = await asyncStmt.status();
      expect(status).to.be.a('string');
      // Which non-error variant we observe depends on how quickly the
      // warehouse responds, so any of these is acceptable.
      expect(['Pending', 'Running', 'Succeeded', 'Closed']).to.include(
        status,
        `unexpected status: ${status}`,
      );

      // Let the statement reach a terminal state before cleanup closes
      // it. Note this only awaits completion; no batches are fetched.
      await asyncStmt.awaitResult();
    }));

  it('cancel() against a still-pending async statement completes quickly', () =>
    // Large enough query that the server will not have finished by
    // the time we issue cancel. `range(0, 100_000_000)` was used in
    // the existing sync cancel test for the same reason.
    withAsyncStatement('SELECT * FROM range(0, 100000000)', async (asyncStmt) => {
      expect(asyncStmt.statementId).to.have.length.greaterThan(0);

      const t0 = Date.now();
      await asyncStmt.cancel();
      const elapsed = Date.now() - t0;
      // cancel should not block on completion of the underlying query;
      // it just sends a CancelStatement and returns. Allow a generous
      // budget for wire latency.
      expect(elapsed).to.be.lessThan(2000, `cancel latency ${elapsed}ms`);
    }));
});

0 commit comments

Comments
 (0)