@solncebro/websocket-engine
Reliable WebSocket client for Node.js with automatic reconnection, a ping/pong heartbeat, typed messages, an optional authentication phase, and a request/response pattern.
Installation
yarn add @solncebro/websocket-engine
npm install @solncebro/websocket-engine
Features
- Automatic reconnection: exponential backoff with jitter; fast reconnect for specific close codes (1001, 1006, 1011–1014)
- Heartbeat: TCP ping/pong by default, or application-level JSON ping (e.g. Bybit `{ op: 'ping' }`)
- Connection timeout: handshake timeout with retry
- Auth phase: optional `onOpen` async callback for authentication before the connection is considered ready
- Send: `controller.sendToConnectedSocket(data)` for outbound messages
- Request/response: `controller.waitForMessage(predicate, timeout)` to await a specific incoming message by any criteria (e.g. `reqId`)
- Typed messages: optional `parseMessage` for type-safe payloads
- Notifications: `onNotify` callback for alerts; the process exits with code 1 after max retries (suitable for PM2 restart)
Requirements
- Node.js 16+
- TypeScript 5.x (optional, for types)
Usage
Basic (no auth)
import { createReliableWebSocket } from "@solncebro/websocket-engine";

// pinoLogger and sendTelegramAlert are assumed to be defined elsewhere in your application.
interface StreamMessage {
  type: string;
  data: unknown;
}

const controller = createReliableWebSocket<StreamMessage>({
  url: "wss://stream.example.com",
  label: "market-stream",
  logger: pinoLogger,
  parseMessage: (rawData) => JSON.parse(rawData.toString()) as StreamMessage,
  onMessage: (message) => {
    console.log(message.type, message.data);
  },
  onNotify: async (message) => {
    await sendTelegramAlert(message);
  },
});

// On shutdown: stop reconnecting and close the socket.
controller.close();

With authentication (e.g. Bybit trading WebSocket)
import crypto from "crypto";
import {
  createReliableWebSocket,
  WebSocketOpenContext,
} from "@solncebro/websocket-engine";

// API_KEY, SECRET, pinoLogger and sendTelegramAlert are assumed to be defined elsewhere.
interface BybitMessage {
  op?: string;
  retCode?: number;
  retMsg?: string;
  reqId?: string;
  data?: unknown;
}

const controller = createReliableWebSocket<BybitMessage>({
  url: "wss://stream.bybit.com/v5/trade",
  label: "bybit-trade",
  logger: pinoLogger,
  parseMessage: (rawData) => JSON.parse(rawData.toString()) as BybitMessage,
  // Runs after every (re)connect; the connection is not ready until this resolves.
  onOpen: async ({ send, waitForMessage }: WebSocketOpenContext<BybitMessage>) => {
    const expires = Date.now() + 10000;
    const signature = crypto
      .createHmac("sha256", SECRET)
      .update(`GET/realtime${expires}`)
      .digest("hex");
    send({ op: "auth", args: [API_KEY, expires, signature] });
    const response = await waitForMessage((message) => message.op === "auth", 10000);
    if (response.retMsg !== "OK") {
      throw new Error(`Auth failed: ${response.retMsg}`);
    }
  },
  // Application-level JSON ping/pong instead of the default TCP ping.
  heartbeat: {
    buildPayload: () => ({ op: "ping" }),
    isResponse: (msg) => msg.op === "pong",
  },
  onMessage: (message) => {
    if (message.op === "order.create") {
      // handle order response
    }
  },
  onReconnectSuccess: () => {
    console.log("Reconnected and re-authenticated");
  },
  onNotify: async (message) => {
    await sendTelegramAlert(message);
  },
});

// Send an order and await its specific response by reqId
const sendOrder = async (orderParams: Record<string, unknown>) => {
  const reqId = `req_${Date.now()}`;
  controller.sendToConnectedSocket({
    reqId,
    op: "order.create",
    args: [orderParams],
  });
  return controller.waitForMessage((message) => message.reqId === reqId, 30000);
};

API
createReliableWebSocket<TMessage>(args)
Returns a ReliableWebSocketController<TMessage>.
Arguments
| Property | Type | Required | Description |
|---|---|---|---|
| `url` | `string` | Yes | WebSocket URL |
| `label` | `string` | Yes | Identifier for logs and notifications |
| `logger` | `WebSocketLogger` | Yes | Logger with `debug`, `info`, `warn`, `error`, `fatal` |
| `onMessage` | `(message: TMessage) => void` | Yes | Called for each incoming message (not intercepted by `waitForMessage` or heartbeat) |
| `parseMessage` | `(rawData: RawData) => TMessage` | No | Parse raw data to `TMessage`; default: pass-through |
| `onOpen` | `(context: WebSocketOpenContext<TMessage>) => Promise<void>` | No | Async setup phase after connect (e.g. auth). Connection is not considered ready until this resolves. |
| `onReconnectSuccess` | `() => void` | No | Called after a successful reconnection (not on first connect) |
| `onNotify` | `(message: string) => void \| Promise<void>` | No | Called on connection issues and before process exit |
| `heartbeat` | `WebSocketHeartbeatOptions<TMessage>` | No | Application-level heartbeat (JSON ping/pong). When provided, TCP ping is disabled. |
| `configuration` | `Partial<WebSocketConfiguration>` | No | Override default timeouts and retry behaviour |

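
The `logger` argument only needs to expose the five methods listed in the table. As a rough sketch, a small console-backed logger could look like the following. The method signature `(message, ...details)`, the `MinimalLogger` interface, and the `createPrefixedLogger` helper are assumptions made for illustration; check the package's `WebSocketLogger` type for the exact shape.

```typescript
// Assumed shape: the README only states that the logger has these five methods.
type LogMethod = (message: string, ...details: unknown[]) => void;

interface MinimalLogger {
  debug: LogMethod;
  info: LogMethod;
  warn: LogMethod;
  error: LogMethod;
  fatal: LogMethod;
}

// Hypothetical helper: builds a logger that prefixes each line with level and label.
function createPrefixedLogger(
  label: string,
  write: (line: string) => void = console.log,
): MinimalLogger {
  const at = (level: string): LogMethod => (message, ...details) => {
    // Extra arguments are serialized as a JSON array after the message.
    const suffix = details.length > 0 ? " " + JSON.stringify(details) : "";
    write(`[${level}] ${label}: ${message}${suffix}`);
  };

  return {
    debug: at("debug"),
    info: at("info"),
    warn: at("warn"),
    error: at("error"),
    fatal: at("fatal"),
  };
}
```

An object like this could stand in for `pinoLogger` in the usage examples, provided its method signatures are compatible with `WebSocketLogger`.
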
Controller
| Method | Description |
|---|---|
| `close()` | Stops reconnection, clears timers, rejects pending waiters, closes the socket |
| `getStatus()` | Returns current `WebSocketStatus` |
| `sendToConnectedSocket(data)` | Send data; a string is sent as-is, anything else is `JSON.stringify`-ed. Throws if not connected. |
| `waitForMessage(predicate, timeoutMilliseconds)` | Returns a `Promise<TMessage>` that resolves with the first incoming message matching `predicate`. The message is not passed to `onMessage`. Rejects on timeout or connection close. |

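
To make the `waitForMessage` contract concrete, here is a minimal, self-contained sketch of a waiter registry with the same observable behaviour: resolve on the first matching message, reject on timeout, reject everything on close. The `MessageWaiters` class and its method names are hypothetical and are not the package's internals.

```typescript
type Predicate<T> = (message: T) => boolean;

interface Waiter<T> {
  predicate: Predicate<T>;
  resolve: (message: T) => void;
  reject: (error: Error) => void;
  timer: ReturnType<typeof setTimeout>;
}

// Hypothetical registry illustrating the documented behaviour.
class MessageWaiters<T> {
  private waiters: Waiter<T>[] = [];

  // Registers a waiter that rejects if no matching message arrives in time.
  wait(predicate: Predicate<T>, timeoutMilliseconds: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const waiter: Waiter<T> = {
        predicate,
        resolve,
        reject,
        timer: setTimeout(() => {
          this.remove(waiter);
          reject(new Error(`Timed out after ${timeoutMilliseconds} ms`));
        }, timeoutMilliseconds),
      };
      this.waiters.push(waiter);
    });
  }

  // Returns true if a waiter consumed the message (so onMessage should be skipped).
  dispatch(message: T): boolean {
    const waiter = this.waiters.find((candidate) => candidate.predicate(message));
    if (!waiter) return false;
    this.remove(waiter);
    clearTimeout(waiter.timer);
    waiter.resolve(message);
    return true;
  }

  // Rejects every pending waiter, e.g. on disconnect or close().
  rejectAll(reason: string): void {
    for (const waiter of this.waiters.splice(0)) {
      clearTimeout(waiter.timer);
      waiter.reject(new Error(reason));
    }
  }

  private remove(waiter: Waiter<T>): void {
    const index = this.waiters.indexOf(waiter);
    if (index !== -1) this.waiters.splice(index, 1);
  }
}
```

In this sketch each waiter is one-shot: once a message matches, the waiter is removed, so a second message with the same `reqId` would fall through to the normal handler.
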
WebSocketStatus
- `CONNECTING`: initial connection attempt
- `CONNECTED`: connected (and auth passed if `onOpen` was provided)
- `DISCONNECTED`: disrupted, reconnect scheduled
- `RECONNECTING`: reconnect in progress (includes the `onOpen` phase)
- `FAILED`: closed by user or max retries exceeded
WebSocketOpenContext
Passed to onOpen:
| Property | Description |
|---|---|
| `send` | Send to the open socket (for use during `onOpen`) |
| `waitForMessage` | Same as `controller.waitForMessage` |

WebSocketHeartbeatOptions
| Property | Description |
|---|---|
| `buildPayload` | Returns the JSON object to send as a ping |
| `isResponse` | Returns `true` if the message is a pong. Matching messages are not passed to `onMessage`. |

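
The heartbeat interacts with the `missedPongThreshold` option: the connection is terminated only after several pongs are missed. The sketch below shows one plausible way to count them. It assumes that misses are counted consecutively and that any pong resets the counter, which the README does not state; `PongTracker` is a hypothetical name.

```typescript
// Hypothetical counter; assumes consecutive misses and reset-on-pong semantics.
class PongTracker {
  private missed = 0;
  private readonly threshold: number;

  constructor(missedPongThreshold: number = 3) {
    this.threshold = missedPongThreshold;
  }

  // Call when a pong wait times out. Returns true when the connection
  // should be terminated because the threshold has been reached.
  recordMissedPong(): boolean {
    this.missed += 1;
    return this.missed >= this.threshold;
  }

  // Call when a pong (or a message matching isResponse) arrives.
  recordPong(): void {
    this.missed = 0;
  }
}
```

With the default threshold of 3, two missed pongs followed by a successful one leave the connection open, while three misses in a row would trigger termination in this model.
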
Configuration (configuration)
| Option | Default | Description |
|---|---|---|
| `maxRetryAttempts` | `15` | Max reconnection attempts before process exit |
| `initialRetryDelay` | `1000` | Initial delay (ms) for exponential backoff |
| `maxRetryDelay` | `30000` | Cap (ms) for backoff delay |
| `retryDelayMultiplier` | `1.8` | Backoff multiplier |
| `connectionTimeout` | `30000` | Handshake timeout (ms) |
| `pingInterval` | `15000` | Ping interval (ms) |
| `pongTimeout` | `10000` | Pong wait timeout (ms) |
| `heartbeatGracePeriod` | `3000` | Delay before first ping (ms) |
| `fastReconnectCodes` | `[1001, 1006, 1011, 1012, 1013, 1014]` | Close codes that use short reconnect delay |
| `missedPongThreshold` | `3` | Missed pongs before terminating connection |

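
To get a feel for the default backoff settings, the following sketch computes the delay for each retry attempt as `initialRetryDelay * retryDelayMultiplier^attempt`, capped at `maxRetryDelay`. The exact formula and the jitter range used by the package are not documented here, so both are assumptions: the ±20% jitter and the helper names `baseRetryDelay` and `withJitter` are illustrative only.

```typescript
interface BackoffOptions {
  initialRetryDelay: number;
  maxRetryDelay: number;
  retryDelayMultiplier: number;
}

// Defaults taken from the configuration table.
const defaultBackoff: BackoffOptions = {
  initialRetryDelay: 1000,
  maxRetryDelay: 30000,
  retryDelayMultiplier: 1.8,
};

// Assumed formula: exponential growth from a zero-based attempt index, then capped.
function baseRetryDelay(attempt: number, options: BackoffOptions = defaultBackoff): number {
  const raw = options.initialRetryDelay * Math.pow(options.retryDelayMultiplier, attempt);
  return Math.min(Math.round(raw), options.maxRetryDelay);
}

// Assumed jitter: a uniform offset within +/- ratio of the delay.
function withJitter(
  delay: number,
  random: () => number = Math.random,
  ratio: number = 0.2,
): number {
  const offset = (random() * 2 - 1) * ratio * delay;
  return Math.max(0, Math.round(delay + offset));
}

// Delays for the first seven attempts, before jitter.
const schedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => baseRetryDelay(attempt));
console.log(schedule); // [1000, 1800, 3240, 5832, 10498, 18896, 30000]
```

Under this model the cap of 30000 ms is reached on the seventh attempt, so the remaining attempts (up to `maxRetryAttempts`) would all wait roughly 30 seconds each.
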
Behaviour
- On close or error, reconnection is scheduled with exponential backoff.
- If `onOpen` throws, the connection is terminated and reconnect is triggered (with the same backoff logic). Retry counters are only reset after `onOpen` resolves successfully.
- After `maxRetryAttempts` failed attempts, `onNotify` is awaited with a critical message, then `process.exit(1)` runs (PM2 or similar will restart the app).
- `waitForMessage` pending promises are rejected when the connection is disrupted or `close()` is called.
- Heartbeat response messages and `waitForMessage`-intercepted messages are never passed to `onMessage`.
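
The last rule above implies a routing step for every parsed message. A minimal sketch of that step follows. The order of the checks (heartbeat first, then waiters) is an assumption, and `routeMessage` and `RoutingHooks` are hypothetical names rather than the package's internals.

```typescript
type RouteResult = "heartbeat" | "waiter" | "onMessage";

interface RoutingHooks<T> {
  // Corresponds to heartbeat.isResponse when an application-level heartbeat is configured.
  isHeartbeatResponse?: (message: T) => boolean;
  // Returns true if a pending waitForMessage predicate matched and consumed the message.
  tryResolveWaiter: (message: T) => boolean;
  // The user-supplied handler for everything else.
  onMessage: (message: T) => void;
}

// Hypothetical routing: a message reaches onMessage only if nothing intercepted it.
function routeMessage<T>(message: T, hooks: RoutingHooks<T>): RouteResult {
  if (hooks.isHeartbeatResponse?.(message)) return "heartbeat";
  if (hooks.tryResolveWaiter(message)) return "waiter";
  hooks.onMessage(message);
  return "onMessage";
}
```

For the Bybit example, a `{ op: "pong" }` message would stop at the first check, the auth response would be consumed by the waiter inside `onOpen`, and only the remaining messages would reach `onMessage`.
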
License
ISC