Package Exports
- @qualithm/arrow-flight-sql-js
- @qualithm/arrow-flight-sql-js/runtime
- @qualithm/arrow-flight-sql-js/transport
- @qualithm/arrow-flight-sql-js/transport/grpc-js
- @qualithm/arrow-flight-sql-js/transport/grpc-web
Readme
Arrow Flight SQL JS
Arrow Flight SQL client for JavaScript and TypeScript runtimes.
📦 Package:
@qualithm/arrow-flight-sql-js
Overview
This library provides a native JavaScript implementation for communicating with Arrow Flight SQL servers. It handles the complete protocol stack:
- gRPC/HTTP2 Transport – Standards-based communication layer
- Protocol Buffers – Full Flight SQL message serialization
- Connection Pooling – Efficient connection reuse for high throughput
- Arrow IPC Streaming – Native Arrow record batch handling
- Authentication – Bearer tokens, basic auth, and custom handlers
Design Goals
Arrow Flight SQL JS is modeled on the canonical implementations:
| Reference | What We Adopt |
|---|---|
| Java (reference impl) | Comprehensive API surface, error handling patterns |
| C++ | Streaming-first patterns, performance considerations |
| Go | Connection pooling, context/cancellation model |
We aim for API parity with the official clients where JavaScript idioms allow.
Installation
# npm
npm install @qualithm/arrow-flight-sql-js
# bun
bun add @qualithm/arrow-flight-sql-js
# pnpm
pnpm add @qualithm/arrow-flight-sql-jsQuick Start
import { FlightSqlClient } from "@qualithm/arrow-flight-sql-js"
// Create a client
const client = new FlightSqlClient({
host: "localhost",
port: 31337,
tls: true,
auth: { type: "bearer", token: "your-bearer-token" }
})
// Connect to the server
await client.connect()
// Execute a query
const result = await client.query("SELECT * FROM my_table LIMIT 100")
// Process Arrow record batches
for await (const batch of result.stream()) {
console.log(`Received ${batch.numRows} rows`)
// batch is an Arrow RecordBatch
}
// Or collect all results into a table
const table = await result.collect()
console.log(`Total rows: ${table.numRows}`)
// Clean up
client.close()Connection Pooling
import { FlightSqlPool } from "@qualithm/arrow-flight-sql-js"
// Create a pool with client and pool configuration
const pool = new FlightSqlPool(
// Client options
{
host: "localhost",
port: 31337,
tls: false,
auth: { type: "basic", username: "admin", password: "secret" }
},
// Pool options
{
minConnections: 2,
maxConnections: 10,
idleTimeoutMs: 30_000
}
)
// Initialize the pool (creates minConnections)
await pool.initialize()
// Acquire a client from the pool
const client = await pool.acquire()
try {
const result = await client.query("SELECT 1")
const table = await result.collect()
// ... process results
} finally {
// Return to pool
pool.release(client)
}
// Or use the convenience method (handles acquire/release automatically)
await pool.withConnection(async (client) => {
const result = await client.query("SELECT * FROM users")
return result.collect()
})
// Graceful shutdown
await pool.close()Real-Time Subscriptions
Subscribe to live data updates using the DoExchange bidirectional streaming protocol:
import { FlightSqlClient, SubscriptionMode } from "@qualithm/arrow-flight-sql-js"
const client = new FlightSqlClient({
host: "localhost",
port: 31337,
tls: true,
auth: { type: "bearer", token: "your-bearer-token" }
})
await client.connect()
// Subscribe to real-time updates
const subscription = client.subscribe("SELECT * FROM events WHERE status = 'pending'", {
mode: SubscriptionMode.ChangesOnly, // Full | ChangesOnly | Tail
heartbeatMs: 30_000 // Server heartbeat interval
})
// Consume batches as they arrive
for await (const batch of subscription) {
console.log(`Received ${batch.numRows} rows`)
}
// Or with cancellation
const controller = new AbortController()
const cancelableSubscription = client.subscribe(query, {
signal: controller.signal,
autoReconnect: true,
maxReconnectAttempts: 10
})
// Later: cancel the subscription
controller.abort()
// Or manually unsubscribe
await cancelableSubscription.unsubscribe()Subscription Options
| Option | Default | Description |
|---|---|---|
mode |
ChangesOnly |
Subscription mode (Full, ChangesOnly, Tail) |
heartbeatMs |
30000 |
Server heartbeat interval in milliseconds |
signal |
- | AbortSignal for cancellation |
autoReconnect |
true |
Auto-reconnect on connection loss |
maxReconnectAttempts |
10 |
Maximum reconnection attempts |
reconnectDelayMs |
1000 |
Initial reconnect delay |
maxReconnectDelayMs |
30000 |
Maximum reconnect delay (with exponential backoff) |
Low-Level DoExchange
For custom bidirectional protocols:
const exchange = client.doExchange({
type: DescriptorType.CMD,
cmd: new TextEncoder().encode("CUSTOM_COMMAND")
})
// Send data to server
await exchange.send({
dataHeader: new Uint8Array(),
dataBody: new Uint8Array(),
appMetadata: new TextEncoder().encode(JSON.stringify({ action: "subscribe" }))
})
// Receive data from server
for await (const data of exchange) {
console.log("Received:", data)
}
// Half-close (signal end of client stream)
await exchange.end()Observability & Metrics
Integrate with your observability stack using the metrics handler interface:
import {
FlightSqlClient,
ConsoleMetricsHandler,
InMemoryMetricsHandler,
MetricNames,
type MetricsHandler
} from "@qualithm/arrow-flight-sql-js"
// Console handler for development
const client = new FlightSqlClient({
host: "localhost",
port: 31337,
tls: false,
auth: { type: "none" },
metrics: new ConsoleMetricsHandler()
})
// Output: [FlightSQL Metrics] ✓ query success (42ms)
// In-memory handler for testing
const metricsHandler = new InMemoryMetricsHandler()
const testClient = new FlightSqlClient({
host: "localhost",
port: 31337,
tls: false,
auth: { type: "none" },
metrics: metricsHandler
})
// Query metrics after operations
await testClient.connect()
const result = await testClient.query("SELECT 1")
await result.collect()
console.log(metricsHandler.getAverageDuration("query"))
console.log(metricsHandler.getErrorRate("query"))
console.log(metricsHandler.getSummary())
// Custom handler for OpenTelemetry, Prometheus, etc.
class OpenTelemetryHandler implements MetricsHandler {
recordOperation(event) {
// Record to your tracing/metrics backend
tracer.startSpan(event.operation).end()
histogram.record(event.durationMs, { operation: event.operation })
}
recordGauge(event) {
/* ... */
}
recordCounter(event) {
/* ... */
}
}Standard Metric Names
Use MetricNames for consistent metric naming:
MetricNames.poolTotalConnections // "flight_sql.pool.total_connections"
MetricNames.poolActiveConnections // "flight_sql.pool.active_connections"
MetricNames.queriesExecuted // "flight_sql.queries.executed"
MetricNames.bytesReceived // "flight_sql.bytes.received"
MetricNames.retriesAttempted // "flight_sql.retries.attempted"Error Handling
The library provides a comprehensive error hierarchy:
import {
FlightSqlError, // Base error class
ConnectionError, // Network/connection issues
AuthenticationError, // Auth failures (401, 403)
QueryError, // SQL syntax or execution errors
TimeoutError, // Operation timeouts
ProtocolError, // Protocol/encoding issues
NotFoundError, // Resource not found
CancelledError // Operation cancelled
} from "@qualithm/arrow-flight-sql-js"
try {
const result = await client.query("SELECT * FROM missing_table")
await result.collect()
} catch (error) {
if (error instanceof QueryError) {
console.error("SQL Error:", error.message)
console.error("SQL State:", error.sqlState)
} else if (error instanceof ConnectionError) {
console.error("Connection lost, will retry...")
} else if (error instanceof TimeoutError) {
console.error(`Operation timed out after ${error.timeoutMs}ms`)
}
}Retry Configuration
Configure automatic retries for transient failures:
import { FlightSqlClient, RetryPolicy, retryPolicies } from "@qualithm/arrow-flight-sql-js"
// Use pre-configured policies
const client = new FlightSqlClient({
host: "localhost",
port: 31337,
tls: false,
auth: { type: "bearer", token: "my-token" },
retry: retryPolicies.default // 3 retries, exponential backoff
})
// Available policies
retryPolicies.none // No retries
retryPolicies.fast // 3 retries, 50ms initial, 500ms max
retryPolicies.default // 3 retries, 100ms initial, 10s max
retryPolicies.aggressive // 5 retries, 200ms initial, 30s max
retryPolicies.reconnection // 10 retries, 1s initial, 60s max
// Custom retry configuration
const customPolicy = new RetryPolicy({
maxRetries: 5,
initialDelayMs: 100,
maxDelayMs: 5000,
backoffMultiplier: 2,
jitter: true, // Adds ±25% variance to prevent thundering herd
isRetryable: (error) => {
// Custom logic for which errors to retry
return error.code === 14 || error.message.includes("timeout")
}
})
const clientWithCustomRetry = new FlightSqlClient({
host: "localhost",
port: 31337,
tls: false,
auth: { type: "bearer", token: "my-token" },
retry: customPolicy
})Catalog Introspection
Explore database metadata with the catalog API:
// List all catalogs
const catalogs = await client.getCatalogs()
console.log("Catalogs:", catalogs)
// List schemas in a catalog
const schemas = await client.getSchemas("my_catalog", "public%")
// List tables with filtering
const tables = await client.getTables({
catalog: "my_catalog",
dbSchemaFilterPattern: "public",
tableNameFilterPattern: "user%",
tableTypes: ["TABLE", "VIEW"],
includeSchema: true // Include Arrow schema for each table
})
// Get table types supported by the server
const tableTypes = await client.getTableTypes()
// ["TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE", ...]
// Get primary key information
const primaryKeys = await client.getPrimaryKeys("users", "my_catalog", "public")
for (const pk of primaryKeys) {
console.log(`Column ${pk.columnName} at position ${pk.keySequence}`)
}
// Get foreign key relationships
const exportedKeys = await client.getExportedKeys("users") // Keys referencing this table
const importedKeys = await client.getImportedKeys("orders") // Keys this table referencesPrepared Statements
Use prepared statements for parameterized queries:
// Create a prepared statement
const stmt = await client.prepare("SELECT * FROM users WHERE id = ? AND status = ?")
try {
// Execute the query (returns a QueryResult)
const result = await stmt.executeQuery()
for await (const batch of result.stream()) {
console.log(`Received ${batch.numRows} rows`)
}
// Or collect all results
const table = await result.collect()
} finally {
// Always close prepared statements
await stmt.close()
}API Reference
FlightSqlClient
The main client for interacting with Flight SQL servers.
Constructor Options
| Option | Type | Default | Description |
|---|---|---|---|
host |
string |
— | Server hostname |
port |
number |
— | Server port |
tls |
boolean |
true |
Enable TLS |
auth |
AuthConfig |
— | Authentication configuration |
credentials |
ChannelCredentials |
— | Custom gRPC channel credentials |
metadata |
Record<string, string> |
— | Custom metadata headers |
connectTimeoutMs |
number |
30000 |
Connection timeout in ms |
requestTimeoutMs |
number |
60000 |
Request timeout in ms |
AuthConfig
type AuthConfig =
| { type: "bearer"; token: string }
| { type: "basic"; username: string; password: string }
| { type: "none" }Methods
Query Execution
query(query: string, options?): Promise<QueryResult>– Execute SQL, returns result withstream()andcollect()methodsexecute(query: string, options?): Promise<FlightInfo>– (deprecated) Execute SQL, return flight infoexecuteUpdate(query: string): Promise<bigint>– Execute DML, return affected rowsprepare(query: string): Promise<PreparedStatement>– Create prepared statement
Catalog Introspection
getCatalogs(): Promise<string[]>– List available catalogsgetSchemas(catalog?, schemaPattern?): Promise<Schema[]>– List schemasgetTables(options?): Promise<Table[]>– List tables with filtersgetTableTypes(): Promise<string[]>– List table type namesgetPrimaryKeys(table, catalog?, schema?): Promise<PrimaryKey[]>– Get primary keysgetExportedKeys(table, catalog?, schema?): Promise<ForeignKey[]>– Get exported foreign keysgetImportedKeys(table, catalog?, schema?): Promise<ForeignKey[]>– Get imported foreign keys
Low-Level Flight Operations
getFlightInfo(descriptor): Promise<FlightInfo>– Get flight metadatadoGet(ticket): AsyncIterable<RecordBatch>– Fetch data by ticketdoPut(descriptor, stream): Promise<void>– Upload Arrow datadoAction(type, body?): AsyncIterable<Result>– Execute custom action
Connection Management
connect(): Promise<void>– Establish connection and authenticateclose(): void– Close connection and release resourcesisConnected(): boolean– Check connection status
Architecture
┌─────────────────────────────────────────────────────────┐
│ FlightSqlClient │
├─────────────────────────────────────────────────────────┤
│ Query Builder │ Prepared Statements │ Catalog API │
├─────────────────────────────────────────────────────────┤
│ Flight SQL Protocol │
│ (GetFlightInfo, DoGet, DoPut, DoAction) │
├─────────────────────────────────────────────────────────┤
│ Protocol Buffers Layer │
│ (FlightDescriptor, FlightInfo, etc.) │
├─────────────────────────────────────────────────────────┤
│ gRPC Transport │
│ (HTTP/2 + TLS + Auth Headers) │
├─────────────────────────────────────────────────────────┤
│ Connection Pool │
│ (Health checks, reconnection, backoff) │
└─────────────────────────────────────────────────────────┘Compatibility
Runtime Support
| Runtime | Status | Transport | Notes |
|---|---|---|---|
| Node.js 20+ | ✅ Supported | gRPC-JS | Full feature support |
| Bun | ✅ Supported | gRPC-JS | Development runtime |
| Deno | ✅ Supported | gRPC-Web | Requires gRPC-web proxy |
| Browser | ✅ Supported | gRPC-Web | Requires gRPC-web proxy |
| Cloudflare Workers | ✅ Supported | gRPC-Web | Requires gRPC-web proxy |
Browser & Deno Usage
Browser and Deno environments use the gRPC-Web transport, which requires a gRPC-Web proxy (like Envoy) in front of your Flight SQL server.
// Browser or Deno
import { FlightSqlClient, createGrpcWebTransport } from "@qualithm/arrow-flight-sql-js"
// Create a gRPC-Web transport explicitly
const transport = createGrpcWebTransport({
host: "your-grpc-web-proxy.example.com",
port: 8080,
tls: true
})
// Create client with custom transport
const client = new FlightSqlClient({
host: "your-grpc-web-proxy.example.com",
port: 8080,
tls: true,
transport
})
await client.connect()
const result = await client.query("SELECT * FROM my_table")gRPC-Web Limitations:
- Client streaming (
DoPut) is not supported - Bidirectional streaming (
DoExchange,Handshake) is not supported - Use bearer token auth via
setAuthToken()instead ofHandshake
Envoy gRPC-Web Proxy Example:
# envoy.yaml
static_resources:
listeners:
- address:
socket_address:
address: 0.0.0.0
port_value: 8080
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
codec_type: AUTO
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: backend
domains: ["*"]
routes:
- match: { prefix: "/" }
route:
cluster: flight_sql_backend
cors:
allow_origin_string_match:
- prefix: "*"
allow_methods: "GET, PUT, DELETE, POST, OPTIONS"
allow_headers: "content-type,x-grpc-web,x-user-agent"
expose_headers: "grpc-status,grpc-message"
http_filters:
- name: envoy.filters.http.grpc_web
- name: envoy.filters.http.cors
- name: envoy.filters.http.router
clusters:
- name: flight_sql_backend
connect_timeout: 0.25s
type: LOGICAL_DNS
http2_protocol_options: {}
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: flight_sql_backend
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: your-flight-sql-server
port_value: 50051Flight SQL Servers
Tested against:
- Apache Arrow Flight SQL reference server
- DuckDB Flight SQL extension
- DataFusion Ballista
- Custom lakehouse implementations
Development
# Install dependencies
bun install
# Run tests
bun test
# Run unit tests only
bun test:unit
# Run integration tests (requires Flight SQL server)
bun test:integration
# Lint and format
bun run lint:fix
bun run format:fix
# Generate API documentation
bun run docsFlight SQL Protocol Reference
This client implements the Arrow Flight SQL specification:
- Flight SQL 13.0 – Current target version
- Full protobuf message support
- All standard actions (CreatePreparedStatement, ClosePreparedStatement, etc.)
- Catalog introspection commands
- Transaction support (where server supports it)
License
MIT
Related Projects
- Apache Arrow – The Arrow columnar format
- Arrow Flight – High-performance data transport
- Arrow Flight SQL – SQL over Flight