diff --git a/lib/DBSQLParameter.ts b/lib/DBSQLParameter.ts index 5e7f0abc..786db676 100644 --- a/lib/DBSQLParameter.ts +++ b/lib/DBSQLParameter.ts @@ -8,6 +8,14 @@ export enum DBSQLParameterType { STRING = 'STRING', DATE = 'DATE', TIMESTAMP = 'TIMESTAMP', + // Timezone-explicit timestamp variants. A bare `Date` value defaults to + // `TIMESTAMP`; set one of these explicitly to bind a TIMESTAMP_NTZ (no + // timezone, wall-clock) or TIMESTAMP_LTZ (local timezone) parameter. These + // are SEA-path types the kernel param codec accepts; the Thrift wire only + // has `TIMESTAMP`, so on the Thrift backend they degrade to a plain + // TIMESTAMP bind. + TIMESTAMP_NTZ = 'TIMESTAMP_NTZ', + TIMESTAMP_LTZ = 'TIMESTAMP_LTZ', FLOAT = 'FLOAT', DECIMAL = 'DECIMAL', DOUBLE = 'DOUBLE', diff --git a/lib/contracts/IDBSQLSession.ts b/lib/contracts/IDBSQLSession.ts index 392f3108..fd0dda16 100644 --- a/lib/contracts/IDBSQLSession.ts +++ b/lib/contracts/IDBSQLSession.ts @@ -27,6 +27,18 @@ export type ExecuteStatementOptions = { * These tags apply only to this statement and do not persist across queries. */ queryTags?: Record; + /** + * SEA-only: server-side row cap for this statement (kernel `row_limit`). The + * Thrift backend has no execute-time server cap, so this is a no-op there; + * use `maxRows` for the cross-backend client-side fetch limit. + */ + rowLimit?: number; + /** + * SEA-only: per-statement Spark conf overlay (kernel `statement_conf`). + * Merged with the serialized `queryTags` (which land under the reserved + * `query_tags` key). Ignored by the Thrift backend. + */ + statementConf?: Record; }; export type TypeInfoRequest = { diff --git a/lib/contracts/InternalConnectionOptions.ts b/lib/contracts/InternalConnectionOptions.ts index a115aa47..24575984 100644 --- a/lib/contracts/InternalConnectionOptions.ts +++ b/lib/contracts/InternalConnectionOptions.ts @@ -18,4 +18,27 @@ export interface InternalConnectionOptions { * @internal Not stable; M0 stub only. */ useSEA?: boolean; + + /** + * SEA-only: kernel connection-pool size (`ConnectionOptions.max_connections`). + * Validated as a positive integer within the napi `u32` range. + * @internal SEA path only. + */ + maxConnections?: number; + + /** + * SEA-only: verify the server's TLS certificate. Secure-by-default — omit + * to keep full chain + hostname verification; set `false` only to opt into + * the insecure accept-anything mode. + * @internal SEA path only. + */ + checkServerCertificate?: boolean; + + /** + * SEA-only: PEM-encoded CA certificate (string or `Buffer`) added to the + * trust store on top of the system roots — for TLS-inspecting proxies or + * on-prem internal CAs. Honoured regardless of `checkServerCertificate`. + * @internal SEA path only. + */ + customCaCert?: Buffer | string; } diff --git a/lib/sea/SeaAuth.ts b/lib/sea/SeaAuth.ts index bdfabf3d..3dd62cef 100644 --- a/lib/sea/SeaAuth.ts +++ b/lib/sea/SeaAuth.ts @@ -13,6 +13,7 @@ // limitations under the License. import { ConnectionOptions } from '../contracts/IDBSQLClient'; +import { InternalConnectionOptions } from '../contracts/InternalConnectionOptions'; import AuthenticationError from '../errors/AuthenticationError'; import HiveDriverError from '../errors/HiveDriverError'; @@ -66,9 +67,58 @@ export interface SeaSessionDefaults { catalog?: string; schema?: string; sessionConf?: Record; + /** + * Render `INTERVAL` / `DURATION` result columns as strings + * (kernel `ResultConfig.intervals_as_string`). The kernel default is + * native Arrow `month_interval` / `duration[us]`, but the NodeJS + * Thrift driver surfaces intervals as strings — so the SEA path sets + * this `true` so its result shape is a byte-compatible drop-in for the + * Thrift backend. Omitting it falls back to the kernel's native types. + */ + intervalsAsString?: boolean; + /** + * Render complex (`ARRAY` / `MAP` / `STRUCT` / `VARIANT`) result + * columns as JSON strings (kernel `ResultConfig.complex_types_as_json`). + * Left unset on the SEA path: native Arrow nested types already decode + * identically to the Thrift backend through the shared Arrow converter, + * so forcing JSON here would *introduce* a divergence rather than + * remove one. + */ + complexTypesAsJson?: boolean; + /** + * Per-session kernel connection-pool size + * (kernel `ConnectionOptions.max_connections`). Validated as a positive + * integer within the napi `u32` range by `buildSeaConnectionOptions`. + */ + maxConnections?: number; +} + +/** + * TLS options shared across all auth-mode variants. Mirror the napi + * binding's `ConnectionOptions.checkServerCertificate` / `.customCaCert` + * (kernel `Session::builder().tls(TlsConfig)`). + * + * The napi shape takes `customCaCert` as a `Buffer` only; the public + * `ConnectionOptions` additionally accepts a PEM string, which + * `buildSeaConnectionOptions` normalises to a `Buffer` before crossing + * the FFI boundary. + */ +export interface SeaTlsOptions { + /** + * Verify the server's TLS certificate. The SEA backend is + * **secure-by-default**: omitting this leaves the kernel default of + * `true` (full chain + hostname verification). Set `false` only to opt + * into the insecure, accept-anything mode (analogous to Thrift's + * `rejectUnauthorized: false`); prefer pairing strict checking with + * `customCaCert` over disabling verification entirely. + */ + checkServerCertificate?: boolean; + /** PEM-encoded CA bytes to add to the trust store. */ + customCaCert?: Buffer; } export type SeaNativeConnectionOptions = SeaSessionDefaults & + SeaTlsOptions & ( | { hostName: string; @@ -114,6 +164,64 @@ export function isBlankOrReserved(s: string): boolean { return normalized.length === 0 || normalized === 'undefined' || normalized === 'null'; } +/** napi-rs marshals `maxConnections` as a `u32`; reject values it can't hold. */ +const MAX_U32 = 0xffffffff; + +/** + * Normalise the public TLS options (`checkServerCertificate` / + * `customCaCert`) into the napi shape. + * + * - `checkServerCertificate` passes through verbatim (only when set; an + * absent value leaves the kernel default, which is secure — verify on). + * - `customCaCert` accepts a PEM string or `Buffer` on the public + * surface; we convert a string to a `Buffer` here and do a light PEM + * sanity check. The bytes are NOT parsed in JS — the kernel returns a + * meaningful error if the PEM is malformed. + * + * Throws `HiveDriverError` when `customCaCert` is supplied but empty or + * (for strings) lacks a PEM certificate header. + */ +export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions { + // Read the SEA-only fields through the purpose-built internal options type + // rather than an ad-hoc inline cast, so the shape can't silently drift from + // its declaration and a typo'd key fails to compile. + const { checkServerCertificate, customCaCert } = options as ConnectionOptions & InternalConnectionOptions; + + const tls: SeaTlsOptions = {}; + + if (checkServerCertificate !== undefined) { + tls.checkServerCertificate = checkServerCertificate; + } + + if (customCaCert !== undefined) { + if (typeof customCaCert === 'string') { + // Light PEM sanity check — require both the BEGIN and END markers so a + // truncated/headerless cert is rejected here rather than surfacing as an + // opaque kernel TLS error. Full parsing is deferred to the kernel. + if ( + !customCaCert.includes('-----BEGIN CERTIFICATE-----') || + !customCaCert.includes('-----END CERTIFICATE-----') + ) { + throw new HiveDriverError( + 'SEA backend: `customCaCert` string does not look like a PEM certificate ' + + "(missing the '-----BEGIN CERTIFICATE-----' / '-----END CERTIFICATE-----' markers). " + + 'Pass PEM text or a Buffer of PEM bytes.', + ); + } + tls.customCaCert = Buffer.from(customCaCert, 'utf8'); + } else if (Buffer.isBuffer(customCaCert)) { + if (customCaCert.length === 0) { + throw new HiveDriverError('SEA backend: `customCaCert` Buffer is empty.'); + } + tls.customCaCert = customCaCert; + } else { + throw new HiveDriverError('SEA backend: `customCaCert` must be a PEM string or a Buffer.'); + } + } + + return tls; +} + /** * Validate the user-supplied `ConnectionOptions` and build the * napi-binding's connection-options shape. @@ -170,11 +278,43 @@ export function isBlankOrReserved(s: string): boolean { export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNativeConnectionOptions { const { authType } = options as { authType?: string }; - const base = { + const base: { + hostName: string; + httpPath: string; + intervalsAsString: boolean; + maxConnections?: number; + } & SeaTlsOptions = { hostName: options.host, httpPath: prependSlash(options.path), + // Match the NodeJS Thrift driver, which surfaces INTERVAL columns as + // strings. The kernel defaults to native Arrow interval/duration types; + // forcing the string rendering here keeps the SEA path a byte-compatible + // drop-in. Complex types are intentionally left at the kernel default + // (native Arrow) — they already decode identically to Thrift via the + // shared Arrow converter, so `complexTypesAsJson` is not forced on. + intervalsAsString: true, + // TLS knobs (server-cert verification toggle + custom CA). Validated and + // normalised (string PEM → Buffer) here so the napi shape only sees a Buffer. + ...buildSeaTlsOptions(options), }; + // SEA-only pool sizing; read via cast to match how this function reads the + // other SEA-specific options (TLS) — they live on the internal options + // surface, not the published public `ConnectionOptions` `.d.ts`. + const { maxConnections } = options as ConnectionOptions & InternalConnectionOptions; + if (maxConnections !== undefined) { + if (!Number.isInteger(maxConnections) || maxConnections < 1) { + throw new HiveDriverError(`SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`); + } + if (maxConnections > MAX_U32) { + throw new HiveDriverError( + `SEA backend: \`maxConnections\` exceeds the napi u32 limit (${MAX_U32}); got ${maxConnections}. ` + + 'Typical pool sizes are 10-500.', + ); + } + base.maxConnections = maxConnections; + } + const oauth = options as { oauthClientId?: string; oauthClientSecret?: string; diff --git a/lib/sea/SeaNativeLoader.ts b/lib/sea/SeaNativeLoader.ts index 8eb36f6a..75004e51 100644 --- a/lib/sea/SeaNativeLoader.ts +++ b/lib/sea/SeaNativeLoader.ts @@ -36,6 +36,8 @@ import type { ExecuteOptions as NativeExecuteOptions, TypedValueInput as NativeTypedValueInput, NamedTypedValueInput as NativeNamedTypedValueInput, + AsyncStatement as NativeAsyncStatement, + AsyncResultHandle as NativeAsyncResultHandle, } from '../../native/sea'; // SEA-prefixed re-exports. The kernel-generated `.d.ts` keeps the @@ -59,6 +61,14 @@ export type SeaNativeExecuteOptions = NativeExecuteOptions; export type SeaNativeTypedValueInput = NativeTypedValueInput; export type SeaNativeNamedTypedValueInput = NativeNamedTypedValueInput; +// Async-submit surface: `Connection.submitStatement` returns an +// `AsyncStatement` (status / awaitResult / cancel / close); `awaitResult` +// yields an `AsyncResultHandle` whose `fetchNextBatch` / `schema` match the +// blocking `Statement`'s fetch surface, so the results pipeline consumes +// either interchangeably. +export type SeaNativeAsyncStatement = NativeAsyncStatement; +export type SeaNativeAsyncResultHandle = NativeAsyncResultHandle; + /** * The full native binding surface, derived from the generated module * so it can never drift from the `.d.ts` contract: when the kernel diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index eb002870..0831d9ea 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -45,12 +45,13 @@ import IClientContext from '../contracts/IClientContext'; import { LogLevel } from '../contracts/IDBSQLLogger'; import Status from '../dto/Status'; import HiveDriverError from '../errors/HiveDriverError'; +import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; import ArrowResultConverter from '../result/ArrowResultConverter'; import ResultSlicer from '../result/ResultSlicer'; import SeaResultsProvider from './SeaResultsProvider'; import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './SeaArrowIpc'; import { decodeNapiKernelError } from './SeaErrorMapping'; -import { SeaStatement } from './SeaNativeLoader'; +import { SeaStatement, SeaNativeAsyncStatement, SeaNativeAsyncResultHandle } from './SeaNativeLoader'; import { SeaStatementHandle, SeaOperationLifecycleState, @@ -71,23 +72,78 @@ import { export type SeaOperationStatement = SeaStatementHandle & Partial; /** - * Constructor options for `SeaOperationBackend`. + * The fetch surface shared by the blocking metadata `Statement` and the async + * query path's `AsyncResultHandle` (from `awaitResult()`): both expose + * `fetchNextBatch()` + a synchronous `schema()`, so the results pipeline + * (`SeaResultsProvider` → `ArrowResultConverter` → `ResultSlicer`) consumes + * either interchangeably. + */ +type SeaFetchHandle = Pick; + +/** Poll cadence for the async `status()` loop — matches the Thrift backend's 100ms. */ +const STATUS_POLL_INTERVAL_MS = 100; + +function delay(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +/** + * Map a kernel `AsyncStatement.status()` string to the backend-neutral + * `OperationState`. The kernel variant names (`Pending` / `Running` / + * `Succeeded` / `Failed` / `Cancelled` / `Closed` / `Unknown`) line up 1:1 + * with the enum; `Canceled` (one-L spelling) is mapped defensively, and any + * unrecognised value collapses to `Unknown`. + */ +function statusStringToOperationState(state: string): OperationState { + if (state === 'Canceled') { + return OperationState.Cancelled; + } + if ((Object.values(OperationState) as string[]).includes(state)) { + return state as OperationState; + } + return OperationState.Unknown; +} + +/** + * Constructor options for `SeaOperationBackend`. Exactly one of + * `asyncStatement` (query path — `Connection.submitStatement`) or `statement` + * (metadata path — `Connection.list*` / `get*`, already terminal) must be set. */ export interface SeaOperationBackendOptions { - /** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */ - statement: SeaOperationStatement; + /** The pending napi `AsyncStatement` from `Connection.submitStatement(...)`. */ + asyncStatement?: SeaNativeAsyncStatement; + /** The terminal napi `Statement` from a metadata call. */ + statement?: SeaOperationStatement; context: IClientContext; /** - * Optional override for `id`. When not provided a fresh UUIDv4 is - * generated upstream (in `SeaSessionBackend.executeStatement`); the - * kernel does not yet surface its internal statement-id at the napi - * boundary. Once it does, the JS layer can thread it through here. + * Optional override for `id`. Defaults to the napi statement-id when the + * handle exposes one, else a fresh UUIDv4. */ id?: string; + /** + * Client-side query timeout in whole seconds (the public `queryTimeout`). + * The kernel ignores `queryTimeoutSecs` on the async submit path + * (`submitStatement` always sends `wait_timeout=0s`), so the JS poll loop + * enforces it as a deadline — on expiry it best-effort cancels the statement + * and throws `OperationStateError(Timeout)`, matching the Thrift path's + * server-side TIMEDOUT outcome. Omitted ⇒ no client-side deadline. + */ + queryTimeoutSecs?: number; } export default class SeaOperationBackend implements IOperationBackend { - private readonly statement: SeaOperationStatement; + // Query path: pending async statement we poll to terminal. Undefined on the + // metadata path. + private readonly asyncStatement?: SeaNativeAsyncStatement; + + // Metadata path: terminal statement. Undefined on the query path. + private readonly blockingStatement?: SeaOperationStatement; + + // The cancel/close surface — whichever handle backs this operation. Both + // `AsyncStatement` and `Statement` expose `cancel()` / `close()`. + private readonly lifecycleHandle: SeaStatementHandle; private readonly context: IClientContext; @@ -103,10 +159,25 @@ export default class SeaOperationBackend implements IOperationBackend { private metadataPromise?: Promise; - constructor({ statement, context, id }: SeaOperationBackendOptions) { - this.statement = statement; + // Memoised fetch handle: on the async path it is `awaitResult()`'s result + // (resolved once the statement is terminal); on the metadata path it is the + // already-terminal statement. Drives both fetch and result-metadata. + private fetchHandlePromise?: Promise; + + // Client-side query-timeout deadline in ms (the public `queryTimeout`), + // undefined when unset. Enforced in the async poll loop. + private readonly queryTimeoutMs?: number; + + constructor({ asyncStatement, statement, context, id, queryTimeoutSecs }: SeaOperationBackendOptions) { + if ((asyncStatement === undefined) === (statement === undefined)) { + throw new HiveDriverError('SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided'); + } + this.asyncStatement = asyncStatement; + this.blockingStatement = statement; + this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle; this.context = context; - this._id = id ?? uuidv4(); + this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4(); + this.queryTimeoutMs = queryTimeoutSecs !== undefined && queryTimeoutSecs > 0 ? queryTimeoutSecs * 1000 : undefined; } public get id(): string { @@ -162,7 +233,7 @@ export default class SeaOperationBackend implements IOperationBackend { // wedged, so nothing downstream forces another close). We still don't // mask the original fetch error, but log the close failure at warn so // the leak is diagnosable rather than completely invisible. - await seaClose(this.lifecycle, this.statement, this.context, this._id).catch((closeErr) => { + await seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id).catch((closeErr) => { const cause = closeErr instanceof Error ? closeErr.message : String(closeErr); this.context .getLogger() @@ -191,12 +262,16 @@ export default class SeaOperationBackend implements IOperationBackend { return this.metadataPromise; } this.metadataPromise = (async () => { - if (!this.statement.schema) { - throw new HiveDriverError('SeaOperationBackend: statement.schema() is not available on this handle'); + // The schema lives on the fetch handle: the metadata `Statement` + // directly, or the async path's `AsyncResultHandle` (materialised by + // `getFetchHandle()` once the statement is terminal). + const handle = await this.getFetchHandle(); + if (!handle.schema) { + throw new HiveDriverError('SeaOperationBackend: schema() is not available on this handle'); } // `schema()` is a synchronous napi getter (returns `ArrowSchema`, not a // Promise) — no `await` needed. - const arrowSchemaIpc = this.statement.schema(); + const arrowSchemaIpc = handle.schema(); const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes); // `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for // back-compat with the public `IOperation.getSchema()` surface. @@ -229,60 +304,166 @@ export default class SeaOperationBackend implements IOperationBackend { // --------------------------------------------------------------------------- public async status(_progress: boolean): Promise { - // Synthesised — the kernel resolves `Statement::execute().await` before - // it hands back a Statement handle, so by the time a SeaOperationBackend - // exists the statement is terminal. Note there is intentionally no - // `Failed` arm: a failed execution rejects inside `executeStatement` - // (the kernel surfaces the error at submit), so a `Failed` statement - // never becomes a SeaOperationBackend — `status()` only ever observes - // Succeeded, or Cancelled/Closed from a client-side lifecycle call. - // Report Cancelled/Closed if the lifecycle flag is set, else Succeeded. - // Returns the backend-neutral OperationStatus the IOperationBackend - // contract expects, so the DBSQLOperation facade switches on `state` - // identically across backends. + // A client-side cancel/close wins over any server state. if (this.lifecycle.isCancelled) { return { state: OperationState.Cancelled, hasResultSet: true }; } if (this.lifecycle.isClosed) { return { state: OperationState.Closed, hasResultSet: true }; } + if (this.asyncStatement) { + // Query path: report the real kernel state (single GetStatementStatus + // RPC — no polling here; `waitUntilReady` owns the poll loop). + const state = statusStringToOperationState(await this.asyncStatement.status()); + return { state, hasResultSet: true }; + } + // Metadata path: the kernel statement is already terminal. return { state: OperationState.Succeeded, hasResultSet: true }; } public async waitUntilReady(options?: IOperationBackendWaitOptions): Promise { - // Kernel's `Statement::execute().await` has already resolved by the - // time we hold a Statement handle — there is no pending/running - // state to poll for M0. seaFinished fires the progress callback - // once with a synthesised FINISHED response so progress-UI callers - // see the same one-shot completion tick the Thrift path emits at - // the end of its polling loop. + if (this.asyncStatement) { + return this.waitUntilReadyAsync(options); + } + // Metadata path: the kernel statement has already resolved, so there is + // nothing to poll. seaFinished fires the progress callback once with a + // synthesised completion tick, matching the Thrift path's final tick. return seaFinished(this.lifecycle, options); } public async cancel(): Promise { - return seaCancel(this.lifecycle, this.statement, this.context, this._id); + return seaCancel(this.lifecycle, this.lifecycleHandle, this.context, this._id); } public async close(): Promise { - return seaClose(this.lifecycle, this.statement, this.context, this._id); + return seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id); } // --------------------------------------------------------------------------- // Internals. // --------------------------------------------------------------------------- + /** + * Poll the kernel `AsyncStatement` to a terminal state on a fixed 100ms + * cadence, mirroring the Thrift backend's `waitUntilReady` loop. We poll + * `status()` (a cheap GetStatementStatus RPC) rather than awaiting + * `awaitResult()` directly so that `status()` reports the real + * Pending/Running/Succeeded state to a progress callback each tick, and so a + * JS-initiated `cancel()`/`close()` is observed between ticks via + * `failIfNotActive`. On success it materialises the result handle (so the + * first fetch is free); on a server-driven terminal state it throws the typed + * error the `IOperationBackend` contract requires. + * + * Terminal errors are thrown as `OperationStateError` (NOT plain + * `HiveDriverError`) for Cancelled/Closed/Unknown, because the DBSQLOperation + * facade only mirrors its `cancelled`/`closed` flags when + * `err instanceof OperationStateError` — exactly as the Thrift backend does. + * The Failed branch surfaces the kernel's typed SQL-error envelope via + * `awaitResult()`. + */ + private async waitUntilReadyAsync(options?: IOperationBackendWaitOptions): Promise { + // Already materialised → terminal-and-ready, nothing to wait for. + if (this.fetchHandlePromise) { + return; + } + // Client-side timeout deadline: the kernel ignores queryTimeoutSecs on the + // async submit path, so we enforce the public `queryTimeout` here. + const deadline = this.queryTimeoutMs !== undefined ? Date.now() + this.queryTimeoutMs : undefined; + for (;;) { + // A JS-initiated cancel/close short-circuits before the next poll. + failIfNotActive(this.lifecycle); + + // eslint-disable-next-line no-await-in-loop + const state = statusStringToOperationState(await this.asyncStatement!.status()); + + if (options?.callback) { + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(options.callback({ state, hasResultSet: true })); + } + + switch (state) { + case OperationState.Pending: + case OperationState.Running: + break; + case OperationState.Succeeded: + // Materialise the result stream now so the first fetch/metadata call + // doesn't pay an extra await_result round-trip. + // eslint-disable-next-line no-await-in-loop + await this.getFetchHandle(); + return; + case OperationState.Failed: + // `status()` collapses Failed to the variant name only; the real + // SQL-error envelope (sql_state / error_code / query_id) rides on + // `awaitResult()`'s rejection — drive it to surface the typed error. + // eslint-disable-next-line no-await-in-loop + await this.throwAsyncError(); + break; + case OperationState.Cancelled: + throw new OperationStateError(OperationStateErrorCode.Canceled); + case OperationState.Closed: + throw new OperationStateError(OperationStateErrorCode.Closed); + default: + throw new OperationStateError(OperationStateErrorCode.Unknown); + } + + // Still Pending/Running — enforce the client-side timeout before sleeping. + if (deadline !== undefined && Date.now() >= deadline) { + // Best-effort server-side cancel so the statement doesn't keep running + // after we stop waiting; never mask the timeout with a cancel failure. + // eslint-disable-next-line no-await-in-loop + await this.cancel().catch(() => undefined); + throw new OperationStateError(OperationStateErrorCode.Timeout); + } + + // eslint-disable-next-line no-await-in-loop + await delay(STATUS_POLL_INTERVAL_MS); + } + } + + /** + * Drive `awaitResult()` on a Failed statement to surface the kernel's typed + * SQL-error envelope. Falls back to a generic error if `awaitResult()` + * unexpectedly resolves instead of rejecting. + */ + private async throwAsyncError(): Promise { + try { + await this.asyncStatement!.awaitResult(); + } catch (err) { + throw decodeNapiKernelError(err); + } + throw new HiveDriverError(`SEA operation ${this._id} reported Failed but produced a result.`); + } + + /** + * Resolve (and memoise) the fetch handle: `awaitResult()`'s `AsyncResultHandle` + * on the query path, or the already-terminal `Statement` on the metadata path. + */ + private getFetchHandle(): Promise { + if (!this.fetchHandlePromise) { + if (this.asyncStatement) { + this.fetchHandlePromise = this.asyncStatement.awaitResult().catch((err) => { + throw decodeNapiKernelError(err); + }) as Promise; + } else { + const stmt = this.blockingStatement!; + if (!stmt.fetchNextBatch) { + throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); + } + this.fetchHandlePromise = Promise.resolve(stmt as unknown as SeaFetchHandle); + } + } + return this.fetchHandlePromise; + } + private async getResultSlicer(): Promise> { if (this.resultSlicer) { return this.resultSlicer; } - if (!this.statement.fetchNextBatch) { - throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); - } const metadata = await this.getResultMetadata(); - // The lifecycle subset has cancel/close only; fetch methods exist on - // the full napi Statement. Cast is safe here because we've just - // verified `fetchNextBatch` is callable. - this.resultsProvider = new SeaResultsProvider(this.statement as SeaStatement); + const handle = await this.getFetchHandle(); + // SeaResultsProvider consumes only `fetchNextBatch`; both the async result + // handle and the blocking statement satisfy that surface. + this.resultsProvider = new SeaResultsProvider(handle as unknown as SeaStatement); const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata); this.resultSlicer = new ResultSlicer(this.context, converter); return this.resultSlicer; diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index e12e1d60..ad95ec11 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -38,6 +38,7 @@ import { decodeNapiKernelError } from './SeaErrorMapping'; import SeaOperationBackend from './SeaOperationBackend'; import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams'; import { seaServerInfoValue } from './SeaServerInfo'; +import { serializeQueryTags } from '../utils'; export interface SeaSessionBackendOptions { /** The opaque napi `Connection` handle returned by `openSession`. */ @@ -116,51 +117,30 @@ export default class SeaSessionBackend implements ISessionBackend { /** * Execute a SQL statement through the napi binding. * - * Catalog / schema / sessionConf were applied at session open, so - * there are no per-statement options to thread through. + * Catalog / schema / sessionConf are session-level (applied at open). + * Per-statement options forwarded to the kernel `ExecuteOptions`: + * - `ordinalParameters` / `namedParameters` → bound params (mutually + * exclusive — the kernel binds one placeholder style per statement); + * - `queryTimeout` → `queryTimeoutSecs` (SEA server wait timeout); + * - `rowLimit` → `rowLimit` (SEA-only server-side row cap); + * - `queryTags` → serialised into the conf overlay's reserved + * `query_tags` key (the same wire shape Thrift's `serializeQueryTags` + * produces), merged with any explicit `statementConf`. * - * M0 intentionally rejects `queryTimeout`, `namedParameters`, and - * `ordinalParameters` with explicit deferred-to-M1 errors. `useCloudFetch` - * is a no-op on the SEA path — the kernel hardcodes the SEA - * `disposition` to `INLINE_OR_EXTERNAL_LINKS`, and per-statement - * conf overrides have no reader on the kernel; cloud-fetch behaviour - * is governed entirely by the kernel's `ResultConfig` (M1 binding - * surface). - * - * The Thrift backend remains the path for consumers that need any - * of those today. + * Still rejected (genuinely unsupported on SEA, rather than silently + * dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a + * per-statement knob), `useLZ4Compression` (kernel owns result compression), + * and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by + * the facade at fetch time, so it is intentionally not handled here. */ public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { this.failIfClosed(); - // Positional (`?`) and named (`:name`) parameters are mutually exclusive — - // the kernel param codec binds exactly one placeholder style per statement. - // Use the SAME error type and message as the Thrift backend - // (`ThriftSessionBackend.getQueryParameters`) so a caller catching - // `ParameterError` for this case behaves identically across backends. - const positionalParams = buildSeaPositionalParams(options.ordinalParameters); - const namedParams = buildSeaNamedParams(options.namedParameters); - if (positionalParams !== undefined && namedParams !== undefined) { - throw new ParameterError('Driver does not support both ordinal and named parameters.'); - } - - if (options.queryTimeout !== undefined) { - throw new HiveDriverError('SEA executeStatement: queryTimeout is not supported in M0 (deferred to M1)'); - } if (options.useCloudFetch !== undefined) { throw new HiveDriverError( 'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA', ); } - // Reject — rather than silently ignore — the remaining Thrift-path - // options the SEA M0 backend does not honor. Silently dropping them - // is the worst failure mode for an agent/caller: passing e.g. - // `queryTags` or `useLZ4Compression` would no-op with zero signal. - // (`maxRows` is intentionally NOT here — the facade applies it at - // fetch time.) - if (options.queryTags !== undefined) { - throw new HiveDriverError('SEA executeStatement: queryTags is not supported in M0 (deferred to M1)'); - } if (options.useLZ4Compression !== undefined) { throw new HiveDriverError( 'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)', @@ -168,30 +148,88 @@ export default class SeaSessionBackend implements ISessionBackend { } if (options.stagingAllowedLocalPath !== undefined) { throw new HiveDriverError( - 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported in M0 (deferred to M1)', + 'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA', ); } - // Only build the napi options object when there is something to send — - // the no-params path keeps the minimal call shape (`executeStatement(sql)`). - let execOptions: SeaNativeExecuteOptions | undefined; - if (positionalParams !== undefined || namedParams !== undefined) { - execOptions = { positionalParams, namedParams }; - } + const execOptions = this.buildExecuteOptions(options); - let nativeStatement: SeaStatement; + // Submit asynchronously (kernel `wait_timeout=0s`): the server returns a + // pending `AsyncStatement` immediately while the query runs, matching the + // Thrift backend's always-async (`runAsync: true`) path. The operation + // backend polls `status()` to terminal in `waitUntilReady()` and + // materialises results via `awaitResult()`, so a long-running query stays + // cancellable mid-flight and `status()` reports real Pending/Running states. + let asyncStatement; try { - nativeStatement = + asyncStatement = execOptions === undefined - ? await this.connection.executeStatement(statement) - : await this.connection.executeStatement(statement, execOptions); + ? await this.connection.submitStatement(statement) + : await this.connection.submitStatement(statement, execOptions); } catch (err) { throw this.logAndMapError('executeStatement', err); } - return this.wrapStatement(nativeStatement); + // `queryTimeout` is enforced client-side by the operation backend's poll + // loop: the kernel ignores `queryTimeoutSecs` on the async submit path + // (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward + // it to the napi options — passing it there would be a silent no-op. + return new SeaOperationBackend({ + asyncStatement: asyncStatement!, + context: this.context, + queryTimeoutSecs: options.queryTimeout !== undefined ? Number(options.queryTimeout) : undefined, + }); + } + + /** + * Translate the public `ExecuteStatementOptions` into the kernel napi + * `ExecuteOptions`, returning `undefined` when nothing is set so the + * no-options call shape (`executeStatement(sql)`) is preserved. + */ + private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined { + // Positional (`?`) and named (`:name`) parameters are mutually exclusive — + // the kernel binds one placeholder style per statement. Use the SAME error + // type and message as the Thrift backend (`ThriftSessionBackend`) so a + // caller catching `ParameterError` behaves identically across backends. + const positionalParams = buildSeaPositionalParams(options.ordinalParameters); + const namedParams = buildSeaNamedParams(options.namedParameters); + if (positionalParams !== undefined && namedParams !== undefined) { + throw new ParameterError('Driver does not support both ordinal and named parameters.'); + } + + const execOptions: SeaNativeExecuteOptions = {}; + if (positionalParams !== undefined) { + execOptions.positionalParams = positionalParams; + } + if (namedParams !== undefined) { + execOptions.namedParams = namedParams; + } + // `queryTimeout` is intentionally NOT forwarded here — the kernel ignores + // `queryTimeoutSecs` on `submitStatement`, so it is enforced client-side by + // the operation backend's poll-loop deadline instead (see executeStatement). + if (options.rowLimit !== undefined) { + execOptions.rowLimit = Number(options.rowLimit); + } + // Per-statement conf overlay plus query tags. Tags are serialised JS-side + // into the reserved `query_tags` key (the same wire shape the Thrift + // backend produces via `serializeQueryTags` → `confOverlay`), rather than + // via the napi `queryTags` field: napi's `HashMap` can't + // represent a null-valued tag, and the kernel rejects setting both the + // `queryTags` field and a `query_tags` conf key. + const serializedQueryTags = serializeQueryTags(options.queryTags); + if (options.statementConf !== undefined || serializedQueryTags !== undefined) { + const statementConf: Record = { ...(options.statementConf ?? {}) }; + if (serializedQueryTags !== undefined) { + statementConf.query_tags = serializedQueryTags; + } + if (Object.keys(statementConf).length > 0) { + execOptions.statementConf = statementConf; + } + } + + return Object.keys(execOptions).length > 0 ? execOptions : undefined; } - /** Wrap a napi `Statement` (from execute or a metadata call) as an operation backend. */ + /** Wrap a napi metadata `Statement` (already terminal) as an operation backend. */ private wrapStatement(nativeStatement: SeaStatement): IOperationBackend { return new SeaOperationBackend({ statement: nativeStatement, diff --git a/tests/unit/sea/auth-m2m.test.ts b/tests/unit/sea/auth-m2m.test.ts index a4f90ed5..159afe1d 100644 --- a/tests/unit/sea/auth-m2m.test.ts +++ b/tests/unit/sea/auth-m2m.test.ts @@ -35,6 +35,7 @@ describe('SeaAuth + SeaBackend — OAuth M2M auth flow', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthM2m', oauthClientId: 'client-uuid', oauthClientSecret: 'dose-fake-secret', @@ -165,6 +166,7 @@ describe('SeaAuth + SeaBackend — OAuth M2M auth flow', () => { expect(calls[0].args[0]).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthM2m', oauthClientId: 'client-uuid', oauthClientSecret: 'dose-fake-secret', diff --git a/tests/unit/sea/auth-pat.test.ts b/tests/unit/sea/auth-pat.test.ts index f59b445c..bd82eb87 100644 --- a/tests/unit/sea/auth-pat.test.ts +++ b/tests/unit/sea/auth-pat.test.ts @@ -31,6 +31,7 @@ describe('SeaAuth — PAT auth options builder', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'Pat', token: 'dapi-fake-pat', }); diff --git a/tests/unit/sea/auth-u2m.test.ts b/tests/unit/sea/auth-u2m.test.ts index c8f63fef..828ca961 100644 --- a/tests/unit/sea/auth-u2m.test.ts +++ b/tests/unit/sea/auth-u2m.test.ts @@ -33,6 +33,7 @@ describe('SeaAuth + SeaBackend — OAuth U2M auth flow', () => { expect(native).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthU2m', oauthRedirectPort: 8030, }); @@ -132,6 +133,7 @@ describe('SeaAuth + SeaBackend — OAuth U2M auth flow', () => { expect(calls[0].args[0]).to.deep.equal({ hostName: 'example.cloud.databricks.com', httpPath: '/sql/1.0/warehouses/abc', + intervalsAsString: true, authMode: 'OAuthU2m', oauthRedirectPort: 8030, }); diff --git a/tests/unit/sea/connectionOptions.test.ts b/tests/unit/sea/connectionOptions.test.ts new file mode 100644 index 00000000..96e3f87a --- /dev/null +++ b/tests/unit/sea/connectionOptions.test.ts @@ -0,0 +1,110 @@ +// Copyright (c) 2026 Databricks, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { expect } from 'chai'; +import { buildSeaConnectionOptions, buildSeaTlsOptions } from '../../../lib/sea/SeaAuth'; +import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; + +const PAT = { host: 'h.databricks.com', path: '/sql/1.0/warehouses/abc', token: 'dapi-x' }; + +// Cast helper: the SEA connection-tuning/TLS options live on the internal +// surface, so tests build untyped option literals. +const opts = (extra: Record) => ({ ...PAT, ...extra } as unknown as ConnectionOptions); + +describe('SeaAuth connection options — intervalsAsString default', () => { + it('always sets intervalsAsString:true (thrift-compatible interval rendering)', () => { + const native = buildSeaConnectionOptions(opts({})) as { intervalsAsString?: boolean }; + expect(native.intervalsAsString).to.equal(true); + }); + + it('does NOT force complexTypesAsJson (native Arrow nested types match Thrift)', () => { + const native = buildSeaConnectionOptions(opts({})) as { complexTypesAsJson?: boolean }; + expect(native.complexTypesAsJson).to.equal(undefined); + }); +}); + +describe('SeaAuth connection options — maxConnections', () => { + it('forwards a valid positive integer', () => { + const native = buildSeaConnectionOptions(opts({ maxConnections: 10 })) as { maxConnections?: number }; + expect(native.maxConnections).to.equal(10); + }); + + it('omits maxConnections when unset', () => { + const native = buildSeaConnectionOptions(opts({})) as { maxConnections?: number }; + expect(native.maxConnections).to.equal(undefined); + }); + + for (const bad of [0, -1, 1.5]) { + it(`rejects non-positive-integer maxConnections (${bad})`, () => { + expect(() => buildSeaConnectionOptions(opts({ maxConnections: bad }))).to.throw( + HiveDriverError, + /positive integer/, + ); + }); + } + + it('rejects maxConnections beyond the u32 limit', () => { + expect(() => buildSeaConnectionOptions(opts({ maxConnections: 0x1_0000_0000 }))).to.throw( + HiveDriverError, + /u32 limit/, + ); + }); +}); + +describe('SeaAuth TLS options (buildSeaTlsOptions)', () => { + it('is empty by default (secure-by-default — kernel default verify-on)', () => { + expect(buildSeaTlsOptions(opts({}))).to.deep.equal({}); + }); + + it('passes checkServerCertificate through verbatim (including false)', () => { + expect(buildSeaTlsOptions(opts({ checkServerCertificate: false }))).to.deep.equal({ + checkServerCertificate: false, + }); + expect(buildSeaTlsOptions(opts({ checkServerCertificate: true }))).to.deep.equal({ + checkServerCertificate: true, + }); + }); + + it('normalises a PEM string to a Buffer', () => { + const pem = '-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n'; + const tls = buildSeaTlsOptions(opts({ customCaCert: pem })); + expect(Buffer.isBuffer(tls.customCaCert)).to.equal(true); + expect(tls.customCaCert?.toString('utf8')).to.equal(pem); + }); + + it('passes a Buffer customCaCert through unchanged', () => { + const buf = Buffer.from('-----BEGIN CERTIFICATE-----\nx\n-----END CERTIFICATE-----'); + expect(buildSeaTlsOptions(opts({ customCaCert: buf })).customCaCert).to.equal(buf); + }); + + it('rejects a non-PEM string', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: 'not-a-pem' }))).to.throw(HiveDriverError, /PEM certificate/); + }); + + it('rejects an empty Buffer', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: Buffer.alloc(0) }))).to.throw(HiveDriverError, /empty/); + }); + + it('rejects a non-string, non-Buffer customCaCert', () => { + expect(() => buildSeaTlsOptions(opts({ customCaCert: 123 }))).to.throw(HiveDriverError, /PEM string or a Buffer/); + }); + + it('folds TLS options into the full connection options', () => { + const native = buildSeaConnectionOptions(opts({ checkServerCertificate: false })) as { + checkServerCertificate?: boolean; + }; + expect(native.checkServerCertificate).to.equal(false); + }); +}); diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index fefc79f7..5b34e419 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -22,7 +22,9 @@ import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientCont import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; import HiveDriverError from '../../../lib/errors/HiveDriverError'; import ParameterError from '../../../lib/errors/ParameterError'; +import OperationStateError, { OperationStateErrorCode } from '../../../lib/errors/OperationStateError'; import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient'; +import { OperationState } from '../../../lib/contracts/OperationStatus'; // ----------------------------------------------------------------------------- // Fakes — minimal stand-ins for the napi-rs generated surface and the @@ -73,6 +75,53 @@ class FakeNativeStatement implements SeaStatement { } } +/** + * Fake `AsyncStatement` (the `submitStatement` return). `status()` reports a + * configurable state (default Succeeded); `awaitResult()` yields a fetch handle + * (reuses `FakeNativeStatement`'s fetchNextBatch/schema surface). + */ +class FakeAsyncStatement { + public cancelled = false; + + public closed = false; + + public statusCalls = 0; + + public awaitResultError: Error | null = null; + + // Successive status() returns drain this queue; the last value sticks. + private readonly states: string[]; + + public readonly statementId = '01ef-fake-async-id'; + + constructor( + statusValue: string | string[] = 'Succeeded', + public readonly resultHandle: FakeNativeStatement = new FakeNativeStatement(), + ) { + this.states = Array.isArray(statusValue) ? [...statusValue] : [statusValue]; + } + + public async status(): Promise { + this.statusCalls += 1; + return this.states.length > 1 ? (this.states.shift() as string) : this.states[0]; + } + + public async awaitResult(): Promise { + if (this.awaitResultError) { + throw this.awaitResultError; + } + return this.resultHandle; + } + + public async cancel(): Promise { + this.cancelled = true; + } + + public async close(): Promise { + this.closed = true; + } +} + class FakeNativeConnection implements SeaConnection { public closed = false; @@ -93,8 +142,14 @@ class FakeNativeConnection implements SeaConnection { // Mirrors the kernel `Connection.sessionId` getter. public readonly sessionId = '01ef-fake-session-id'; - // The merged kernel binding takes an optional per-statement `ExecuteOptions` - // (positional/named params, statementConf, …). Record it for assertions. + // Last AsyncStatement handed out by submitStatement (the query path). + public lastAsyncStatement?: FakeAsyncStatement; + + // The async submit state(s) the next FakeAsyncStatement should report. + public submitStatusValue: string | string[] = 'Succeeded'; + + // The blocking executeStatement path is no longer used by the SEA backend + // (queries go through submitStatement), but the binding still exposes it. public async executeStatement(sql: string, options?: unknown): Promise { if (this.throwOnExecute) { throw this.throwOnExecute; @@ -104,10 +159,16 @@ class FakeNativeConnection implements SeaConnection { return this.statementToReturn; } - // Async-submit path (PR 2 territory); present only so the fake satisfies - // the full `Connection` surface. Not exercised by these tests. - public submitStatement(): Promise { - throw new Error('submitStatement not used in this test'); + // Async-submit path: records sql + per-statement options (for forwarding + // assertions) and returns a pending AsyncStatement. + public async submitStatement(sql: string, options?: unknown): Promise { + if (this.throwOnExecute) { + throw this.throwOnExecute; + } + this.lastSql = sql; + this.lastOptions = options; + this.lastAsyncStatement = new FakeAsyncStatement(this.submitStatusValue); + return this.lastAsyncStatement; } private recordMetadata(method: string, args: unknown[]): Promise { @@ -312,11 +373,14 @@ describe('SeaBackend', () => { const args = binding.openSessionStub.firstCall.args[0]; // sea-auth-u2m introduced the discriminated SeaNativeConnectionOptions // shape with a leading `authMode` tag — `'Pat'` for the PAT branch. + // `intervalsAsString: true` is always set so the SEA result shape is a + // byte-compatible drop-in for the Thrift backend (interval-as-string). expect(args).to.deep.equal({ hostName: 'workspace.example', httpPath: '/sql/1.0/warehouses/xyz', authMode: 'Pat', token: 'dapi-token', + intervalsAsString: true, }); }); @@ -452,24 +516,49 @@ describe('SeaSessionBackend', () => { expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.'); }); - it('executeStatement rejects queryTimeout (M1)', async () => { + it('executeStatement does NOT forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => { const connection = new FakeNativeConnection(); const session = makeSession(connection); - let thrown: unknown; - try { - await session.executeStatement('SELECT 1', { queryTimeout: 30 }); - } catch (err) { - thrown = err; - } - expect(thrown).to.be.instanceOf(HiveDriverError); - expect((thrown as Error).message).to.match(/queryTimeout/); + await session.executeStatement('SELECT 1', { queryTimeout: 30 }); + // queryTimeout alone must not produce napi submit options — the kernel + // ignores queryTimeoutSecs on submitStatement, so it's enforced client-side + // by the operation backend's poll deadline instead (covered below). + expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined); + }); + + it('executeStatement forwards rowLimit', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { rowLimit: 100 }); + expect((connection.lastOptions as { rowLimit?: number }).rowLimit).to.equal(100); }); - // These Thrift-path options are not honored on SEA M0. Rejecting them - // (rather than silently ignoring) is the contract a caller/agent needs: - // a silent no-op gives zero signal to debug. + it('executeStatement serialises queryTags into statementConf.query_tags', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { queryTags: { team: 'x', env: 'prod' } }); + const conf = (connection.lastOptions as { statementConf?: Record }).statementConf; + expect(conf).to.have.property('query_tags'); + expect(conf?.query_tags).to.contain('team:x').and.to.contain('env:prod'); + }); + + it('executeStatement merges explicit statementConf with serialised queryTags', async () => { + const connection = new FakeNativeConnection(); + const session = makeSession(connection); + await session.executeStatement('SELECT 1', { + statementConf: { 'spark.sql.ansi.enabled': 'true' }, + queryTags: { team: 'x' }, + }); + const conf = (connection.lastOptions as { statementConf?: Record }).statementConf; + expect(conf?.['spark.sql.ansi.enabled']).to.equal('true'); + expect(conf?.query_tags).to.contain('team:x'); + }); + + // Genuinely unsupported on SEA — rejected (rather than silently ignored) so + // a caller/agent gets signal instead of a no-op. queryTags / queryTimeout / + // rowLimit are NOT here — they are forwarded (asserted above). for (const { name, options, re } of [ - { name: 'queryTags', options: { queryTags: { team: 'x' } }, re: /queryTags/ }, + { name: 'useCloudFetch', options: { useCloudFetch: true }, re: /useCloudFetch/ }, { name: 'useLZ4Compression', options: { useLZ4Compression: true }, re: /useLZ4Compression/ }, { name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' }, re: /stagingAllowedLocalPath/ }, ] as const) { @@ -647,3 +736,134 @@ describe('SeaOperationBackend', () => { // tests/unit/sea/SeaOperationBackend.test.ts and the parity-gate e2e // at tests/e2e/sea/results-e2e.test.ts. }); + +describe('SeaOperationBackend — async (submitStatement) path', () => { + const makeAsyncOp = (asyncStatement: FakeAsyncStatement, queryTimeoutSecs?: number) => + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext(), queryTimeoutSecs }); + + it('rejects when neither asyncStatement nor statement is provided', () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect(() => new SeaOperationBackend({ context: makeContext() } as any)).to.throw(HiveDriverError, /exactly one/); + }); + + it('rejects when BOTH asyncStatement and statement are provided', () => { + expect( + () => + new SeaOperationBackend({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + asyncStatement: new FakeAsyncStatement() as any, + statement: new FakeNativeStatement(), + context: makeContext(), + }), + ).to.throw(HiveDriverError, /exactly one/); + }); + + it('id defaults to the async statement id', () => { + const op = makeAsyncOp(new FakeAsyncStatement()); + expect(op.id).to.equal('01ef-fake-async-id'); + }); + + it('status() reports the real kernel state', async () => { + const running = makeAsyncOp(new FakeAsyncStatement('Running')); + expect((await running.status(false)).state).to.equal(OperationState.Running); + const ok = makeAsyncOp(new FakeAsyncStatement('Succeeded')); + expect((await ok.status(false)).state).to.equal(OperationState.Succeeded); + }); + + it('waitUntilReady() polls status() until terminal, firing the progress callback each tick', async () => { + const stmt = new FakeAsyncStatement(['Pending', 'Running', 'Succeeded']); + const op = makeAsyncOp(stmt); + const states: OperationState[] = []; + await op.waitUntilReady({ callback: (s) => states.push(s.state) }); + expect(stmt.statusCalls).to.equal(3); + expect(states).to.deep.equal([OperationState.Pending, OperationState.Running, OperationState.Succeeded]); + }); + + it('waitUntilReady() surfaces the kernel error envelope on a Failed statement', async () => { + const stmt = new FakeAsyncStatement('Failed'); + // The kernel rejects awaitResult() with a sentinel-framed structured error; + // decodeNapiKernelError turns it into a typed HiveDriverError. + stmt.awaitResultError = new Error( + `__databricks_error__:${JSON.stringify({ code: 'SqlError', message: 'TABLE_OR_VIEW_NOT_FOUND' })}`, + ); + const op = makeAsyncOp(stmt); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(HiveDriverError); + expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/); + }); + + // A server-driven terminal state MUST throw OperationStateError (not a plain + // HiveDriverError) so the DBSQLOperation facade — which only mirrors its + // cancelled/closed flags when `err instanceof OperationStateError` — stays in + // sync. Asserting the subclass + errorCode is what catches a regression to + // the bare HiveDriverError (which would pass an `instanceOf HiveDriverError` + // check since OperationStateError extends it). + it('waitUntilReady() throws OperationStateError(Canceled) on a server-side Cancelled statement', async () => { + const op = makeAsyncOp(new FakeAsyncStatement('Cancelled')); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled); + }); + + it('waitUntilReady() throws OperationStateError(Closed) on a server-side Closed statement', async () => { + const op = makeAsyncOp(new FakeAsyncStatement('Closed')); + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Closed); + }); + + it('waitUntilReady() enforces queryTimeout client-side: throws Timeout and cancels a stuck Running statement', async function timeoutTest() { + // eslint-disable-next-line no-invalid-this + this.timeout(5000); + const stmt = new FakeAsyncStatement('Running'); // never reaches a terminal state + const op = makeAsyncOp(stmt, 0.05); // 50ms client-side deadline + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(OperationStateError); + expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout); + // Best-effort server-side cancel fired so the statement doesn't keep running. + expect(stmt.cancelled).to.equal(true); + }); + + it('cancel() forwards to the async statement and short-circuits a subsequent poll', async () => { + const stmt = new FakeAsyncStatement(['Running', 'Running', 'Succeeded']); + const op = makeAsyncOp(stmt); + await op.cancel(); + expect(stmt.cancelled).to.equal(true); + // A JS-side cancel makes waitUntilReady fail fast without further polling. + let thrown: unknown; + try { + await op.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.an('error'); + }); + + it('close() forwards to the async statement', async () => { + const stmt = new FakeAsyncStatement(); + const op = makeAsyncOp(stmt); + await op.close(); + expect(stmt.closed).to.equal(true); + }); +}); diff --git a/tests/unit/sea/positionalParams.test.ts b/tests/unit/sea/positionalParams.test.ts index ab0f065c..a3389fcc 100644 --- a/tests/unit/sea/positionalParams.test.ts +++ b/tests/unit/sea/positionalParams.test.ts @@ -90,6 +90,18 @@ describe('SeaPositionalParams.buildSeaPositionalParams', () => { { sqlType: 'TIMESTAMP', value: '2024-01-15 10:30:00' }, ]); }); + + it('honours explicit TIMESTAMP_NTZ / TIMESTAMP_LTZ types (kernel param codec)', () => { + expect( + buildSeaPositionalParams([ + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_NTZ, value: '2024-01-15 10:30:00' }), + new DBSQLParameter({ type: DBSQLParameterType.TIMESTAMP_LTZ, value: '2024-01-15 10:30:00' }), + ]), + ).to.deep.equal([ + { sqlType: 'TIMESTAMP_NTZ', value: '2024-01-15 10:30:00' }, + { sqlType: 'TIMESTAMP_LTZ', value: '2024-01-15 10:30:00' }, + ]); + }); }); describe('SeaPositionalParams.buildSeaNamedParams', () => {