Package Exports
- @igoforth/ws-rpc
- @igoforth/ws-rpc/adapters
- @igoforth/ws-rpc/adapters/client
- @igoforth/ws-rpc/adapters/cloudflare-do
- @igoforth/ws-rpc/adapters/server
- @igoforth/ws-rpc/codecs
- @igoforth/ws-rpc/codecs/cbor
- @igoforth/ws-rpc/codecs/json
- @igoforth/ws-rpc/codecs/msgpack
- @igoforth/ws-rpc/errors
- @igoforth/ws-rpc/peer
- @igoforth/ws-rpc/protocol
- @igoforth/ws-rpc/schema
- @igoforth/ws-rpc/storage
- @igoforth/ws-rpc/types
- @igoforth/ws-rpc/utils
Readme
@igoforth/ws-rpc
Bidirectional RPC over WebSocket with Zod schema validation and full TypeScript inference.
Features
- Bidirectional RPC - Both client and server can call methods on each other
- Schema-first - Define your API with Zod schemas, get full TypeScript inference
- Multiple codecs - JSON (built-in), MessagePack, and CBOR support
- Cloudflare Durable Objects - First-class support with hibernation-safe persistence
- Auto-reconnect - Client automatically reconnects with exponential backoff
- Fire-and-forget events - Decoupled from request/response pattern
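To illustrate how request/response calls and fire-and-forget events can share one connection in both directions, here is a minimal in-memory sketch. The `Endpoint` class, the `Message` shapes, and the `connect` helper are hypothetical names invented for this illustration. They are not the library's API or wire format, and no schema validation is performed.

```typescript
// Hypothetical message shapes for illustration only; NOT the library's wire format.
type Message =
  | { kind: "request"; id: number; method: string; params: unknown }
  | { kind: "response"; id: number; result: unknown }
  | { kind: "event"; name: string; data: unknown };

type Handler = (params: unknown) => unknown;
type EventListener = (name: string, data: unknown) => void;

class Endpoint {
  peer?: Endpoint;
  private nextId = 1;
  private pending = new Map<number, (result: unknown) => void>();
  private methods: Record<string, Handler>;
  private onEvent: EventListener;

  constructor(methods: Record<string, Handler>, onEvent: EventListener = () => {}) {
    this.methods = methods;
    this.onEvent = onEvent;
  }

  // Request/response: the id correlates the reply with the waiting promise.
  call(method: string, params: unknown): Promise<unknown> {
    const id = this.nextId++;
    return new Promise((resolve) => {
      this.pending.set(id, resolve);
      this.send({ kind: "request", id, method, params });
    });
  }

  // Fire-and-forget: no id is allocated and no reply is expected.
  emit(name: string, data: unknown): void {
    this.send({ kind: "event", name, data });
  }

  receive(msg: Message): void {
    switch (msg.kind) {
      case "request": {
        const result = this.methods[msg.method]?.(msg.params);
        this.send({ kind: "response", id: msg.id, result });
        break;
      }
      case "response":
        this.pending.get(msg.id)?.(msg.result);
        this.pending.delete(msg.id);
        break;
      case "event":
        this.onEvent(msg.name, msg.data);
        break;
    }
  }

  private send(msg: Message): void {
    // Deliver asynchronously, standing in for a socket.
    queueMicrotask(() => this.peer?.receive(msg));
  }
}

function connect(a: Endpoint, b: Endpoint): void {
  a.peer = b;
  b.peer = a;
}

// Both sides expose methods, so either side can call the other.
const received: string[] = [];
const serverEnd = new Endpoint(
  { getUser: (p) => ({ name: `user-${(p as { id: string }).id}` }) },
  (name) => { received.push(name); },
);
const clientEnd = new Endpoint({ ping: () => ({ pong: true }) });
connect(clientEnd, serverEnd);

clientEnd.call("getUser", { id: "123" }).then((r) => console.log("client got", r));
serverEnd.call("ping", {}).then((r) => console.log("server got", r));
clientEnd.emit("orderViewed", { orderId: "123" });
```

The pending-call map is what ties a response to its request. Events skip it entirely, which is why they stay decoupled from the request/response path.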
Installation
npm install @igoforth/ws-rpc zod
# or
pnpm add @igoforth/ws-rpc zod
Optional codecs
# MessagePack (faster, smaller)
pnpm add @msgpack/msgpack
# CBOR (binary, compact)
pnpm add cbor-x
Cloudflare Durable Objects
pnpm add @cloudflare/actors
Quick Start
1. Define your schema
import { z } from "zod";
import { method, event, type RpcSchema } from "@igoforth/ws-rpc/schema";
// Server schema - methods the server implements
export const ServerSchema = {
methods: {
getUser: method({
input: z.object({ id: z.string() }),
output: z.object({ name: z.string(), email: z.string() }),
}),
createOrder: method({
input: z.object({ product: z.string(), quantity: z.number() }),
output: z.object({ orderId: z.string() }),
}),
},
events: {
orderUpdated: event({
data: z.object({ orderId: z.string(), status: z.string() }),
}),
},
} satisfies RpcSchema;
// Client schema - methods the client implements (for bidirectional RPC)
export const ClientSchema = {
methods: {
ping: method({
input: z.object({}),
output: z.object({ pong: z.boolean() }),
}),
},
events: {},
} satisfies RpcSchema;
2. Create a client
import { RpcClient } from "@igoforth/ws-rpc/adapters/client";
import { ServerSchema, ClientSchema } from "./schemas";
const client = new RpcClient({
url: "wss://your-server.com/ws",
localSchema: ClientSchema,
remoteSchema: ServerSchema,
provider: {
// Implement methods the server can call on us
ping: async () => ({ pong: true }),
},
reconnect: {
initialDelay: 1000,
maxDelay: 30000,
maxAttempts: 10,
},
autoConnect: true, // Connect immediately (or call client.connect() manually)
onConnect: () => console.log("Connected"),
onDisconnect: (code, reason) => console.log("Disconnected:", code, reason),
});
// Call server methods with full type safety
const user = await client.driver.getUser({ id: "123" });
console.log(user.name, user.email);
const order = await client.driver.createOrder({ product: "widget", quantity: 5 });
console.log(order.orderId);
// Emit events to the server ("someEvent" is a placeholder name; declare the event under `events` in your schema first)
client.emit("someEvent", { data: "value" });
// Disconnect when done
client.disconnect();
3. Create a server (Node.js)
import { WebSocketServer } from "ws";
import { RpcServer } from "@igoforth/ws-rpc/adapters/server";
import { ServerSchema, ClientSchema } from "./schemas";
const server = new RpcServer({
wss: { port: 8080 },
WebSocketServer,
localSchema: ServerSchema,
remoteSchema: ClientSchema,
provider: {
getUser: async ({ id }) => {
return { name: "John Doe", email: "john@example.com" };
},
createOrder: async ({ product, quantity }) => {
return { orderId: crypto.randomUUID() };
},
},
hooks: {
onConnect: (peer) => {
console.log(`Client ${peer.id} connected`);
// Call methods on this specific client
peer.driver.ping({}).then((result) => {
console.log("Client responded:", result.pong);
});
},
onDisconnect: (peer) => {
console.log(`Client ${peer.id} disconnected`);
},
onError: (peer, error) => {
console.error(`Error from ${peer?.id}:`, error);
},
},
});
// Emit to all connected clients
server.emit("orderUpdated", { orderId: "123", status: "shipped" });
// Emit to specific clients by ID
server.emit("orderUpdated", { orderId: "456", status: "delivered" }, ["peer-id-1", "peer-id-2"]);
// Call methods on all clients
const results = await server.driver.ping({});
for (const { id, result } of results) {
if (result.success) {
console.log(`Peer ${id}:`, result.value);
}
}
// Get connection info
console.log("Connected clients:", server.getConnectionCount());
console.log("Client IDs:", server.getConnectionIds());
// Close a specific client
server.closePeer("peer-id", 1000, "Goodbye");
// Graceful shutdown
process.on("SIGTERM", () => server.close());
4. Cloudflare Durable Object
import { Actor } from "@cloudflare/actors";
import { withRpc } from "@igoforth/ws-rpc/adapters/cloudflare-do";
import { ServerSchema, ClientSchema } from "./schemas";
// The mixin adds RPC capabilities to your Actor
// Your class must implement the methods defined in localSchema
export class GameRoom extends withRpc(Actor, {
localSchema: ServerSchema,
remoteSchema: ClientSchema,
}) {
private gameState = { players: [] as string[] };
// Implement methods from ServerSchema
async getUser({ id }: { id: string }) {
return { name: `Player ${id}`, email: `${id}@game.com` };
}
async createOrder({ product, quantity }: { product: string; quantity: number }) {
return { orderId: crypto.randomUUID() };
}
// Use this.driver to call methods on connected clients
async notifyAllPlayers() {
// Call ping on all connected clients
const results = await this.driver.ping({});
console.log("Ping results:", results);
}
// Use this.emit to send events to clients
broadcastUpdate() {
this.emit("orderUpdated", { orderId: "123", status: "updated" });
}
// Check connection status
getPlayerCount() {
return this.getConnectionCount();
}
}
API Reference
Schema Definition
import { method, event, type RpcSchema } from "@igoforth/ws-rpc/schema";
import { z } from "zod";
// Define a method with input/output validation
const myMethod = method({
input: z.object({ /* ... */ }),
output: z.object({ /* ... */ }),
});
// Define an event (fire-and-forget)
const myEvent = event({
data: z.object({ /* ... */ }),
});
// Combine into a schema
const MySchema = {
methods: { myMethod },
events: { myEvent },
} satisfies RpcSchema;
Type Inference
import type { Provider, Driver, InferInput, InferOutput } from "@igoforth/ws-rpc/schema";
// Get the provider type (what you implement)
type MyProvider = Provider<typeof MySchema>;
// Get the driver type (what you call)
type MyDriver = Driver<typeof MySchema>;
// Get input/output types for a specific method
type MyMethodInput = InferInput<typeof MySchema["methods"]["myMethod"]>;
type MyMethodOutput = InferOutput<typeof MySchema["methods"]["myMethod"]>;
Codecs
import { z } from "zod";
import { createMsgpackCodec } from "@igoforth/ws-rpc/codecs/msgpack";
import { createCborCodec } from "@igoforth/ws-rpc/codecs/cbor";
import { createJsonCodec } from "@igoforth/ws-rpc/codecs/json";
// JSON (default)
const jsonCodec = createJsonCodec(z.unknown());
// MessagePack (requires @msgpack/msgpack)
const msgpackCodec = createMsgpackCodec(z.unknown());
// CBOR (requires cbor-x)
const cborCodec = createCborCodec(z.unknown());
Error Handling
import {
RpcError,
RpcTimeoutError,
RpcRemoteError,
RpcConnectionClosed,
RpcValidationError,
RpcMethodNotFoundError,
} from "@igoforth/ws-rpc/errors";
try {
await client.driver.someMethod({ /* ... */ });
} catch (error) {
if (error instanceof RpcTimeoutError) {
console.log(`Request '${error.method}' timed out after ${error.timeoutMs}ms`);
} else if (error instanceof RpcValidationError) {
console.log("Invalid input/output:", error.message);
} else if (error instanceof RpcRemoteError) {
console.log("Server error:", error.message, error.code);
} else if (error instanceof RpcMethodNotFoundError) {
console.log(`Method '${error.method}' not found`);
} else if (error instanceof RpcConnectionClosed) {
console.log("Connection closed");
}
}
Client Options
const client = new RpcClient({
url: "wss://...",
localSchema: MyLocalSchema,
remoteSchema: MyRemoteSchema,
provider: { /* method implementations */ },
// Reconnection options (set to false to disable)
reconnect: {
initialDelay: 1000, // First retry delay (ms)
maxDelay: 30000, // Maximum retry delay (ms)
backoffMultiplier: 2, // Exponential backoff multiplier
maxAttempts: 0, // Max attempts (0 = unlimited)
jitter: 0.1, // Random jitter factor (0-1)
},
// Request timeout (ms)
timeout: 30000,
// Auto-connect on creation (default: false)
autoConnect: true,
// WebSocket options
protocols: ["v1"], // Subprotocols
headers: { Authorization: "Bearer ..." }, // Headers (Node.js/Bun only)
// Event handlers
onConnect: () => console.log("Connected"),
onDisconnect: (code, reason) => console.log("Disconnected"),
onReconnect: (attempt, delay) => console.log(`Reconnecting in ${delay}ms`),
onReconnectFailed: () => console.log("Reconnection failed"),
onEvent: (event, data) => console.log("Event:", event, data),
});
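As a rough sketch of how the reconnect options above could combine into a retry schedule, the hypothetical `reconnectDelay` helper below grows the delay by `backoffMultiplier` per attempt, caps it at `maxDelay`, and spreads it by the `jitter` fraction. This reflects a common exponential-backoff formulation and is an assumption, not the library's actual implementation.

```typescript
// Hypothetical helper for illustration; the library's real reconnect logic may differ.
interface ReconnectOptions {
  initialDelay: number;      // first retry delay (ms)
  maxDelay: number;          // upper bound on any single delay (ms)
  backoffMultiplier: number; // growth factor applied per attempt
  jitter: number;            // fraction of the delay to randomize (0-1)
}

function reconnectDelay(
  attempt: number, // 0-based retry counter
  opts: ReconnectOptions,
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  // Exponential growth, capped at maxDelay.
  const base = Math.min(
    opts.initialDelay * opts.backoffMultiplier ** attempt,
    opts.maxDelay,
  );
  // Shift the delay by up to +/- (jitter * base) so clients do not retry in lockstep.
  const offset = (random() * 2 - 1) * opts.jitter * base;
  // Keep the final value within [0, maxDelay].
  return Math.min(opts.maxDelay, Math.max(0, Math.round(base + offset)));
}

const options: ReconnectOptions = {
  initialDelay: 1000,
  maxDelay: 30000,
  backoffMultiplier: 2,
  jitter: 0.1,
};

// Pin the random source to its midpoint to see the schedule without jitter.
const midpoint = () => 0.5;
for (let attempt = 0; attempt < 7; attempt++) {
  console.log(`attempt ${attempt}: ${reconnectDelay(attempt, options, midpoint)} ms`);
}
```

With the random source pinned to its midpoint, the schedule for these options is 1000, 2000, 4000, 8000, 16000, 30000, 30000 ms.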
// Connection state
client.state; // "disconnected" | "connecting" | "connected" | "reconnecting"
client.isConnected; // boolean
Hibernation-Safe Durable Objects
For Cloudflare Durable Objects that need hibernation-safe outgoing calls, use DurableRpcPeer with continuation-passing style:
import { Actor } from "@cloudflare/actors";
import { DurableRpcPeer } from "@igoforth/ws-rpc/peer";
import { SqlPendingCallStorage } from "@igoforth/ws-rpc/storage";
class MyDO extends Actor<Env> {
private peer!: DurableRpcPeer<LocalSchema, RemoteSchema, this>;
onWebSocketConnect(ws: WebSocket) {
this.peer = new DurableRpcPeer({
ws,
localSchema: LocalSchema,
remoteSchema: RemoteSchema,
provider: this,
storage: new SqlPendingCallStorage(this.ctx.storage.sql),
actor: this,
});
}
onWebSocketMessage(ws: WebSocket, message: string | ArrayBuffer) {
this.peer.handleMessage(message);
}
async doSomething() {
// Promise-based (NOT hibernation-safe: the in-memory promise does not survive if the DO hibernates)
const result = await this.peer.driver.someMethod({ data: "value" });
// Continuation-based (hibernation-safe)
this.peer.callWithCallback("someMethod", { data: "value" }, "onResult");
}
// Callback receives result even after hibernation
onResult(result: SomeResult, context: CallContext) {
console.log("Result:", result);
console.log("Latency:", context.latencyMs, "ms");
}
}
Performance
Real WebSocket RPC round-trip benchmarks (localhost, Node.js):
Wire sizes:
| Payload | JSON | MessagePack | CBOR |
|---|---|---|---|
| Small | 93 B | 71 B | 112 B |
| Medium | 3.5 KB | 2.1 KB | 1.4 KB |
| Large | 24.5 KB | 19.6 KB | 14.1 KB |
Throughput (ops/sec):
| Payload | JSON | MessagePack | CBOR |
|---|---|---|---|
| Small | 1,371 | 2,208 | 2,423 |
| Medium | 2,218 | 2,221 | 2,249 |
| Large | 1,334 | 1,245 | 1,562 |
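In these runs, CBOR had the highest throughput at every payload size and the smallest wire size for medium and large payloads, but the largest wire size for small payloads (112 B vs. 93 B for JSON). MessagePack produced the smallest small payloads (71 B) yet trailed JSON on large-payload throughput. JSON is the most portable and the largest on the wire for medium and large payloads.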
Run benchmarks yourself: pnpm bench
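To make the wire-size comparison easier to read, the sketch below expresses each codec's size as a percentage change relative to JSON. The numbers are copied from the wire-size table; the `changeVsJson` helper and the `wireSizes` constant are hypothetical names used only for this calculation.

```typescript
type Codec = "json" | "msgpack" | "cbor";
type Payload = "small" | "medium" | "large";

// Values copied from the wire-size table. Units differ per row (B vs. KB),
// which is fine because each ratio is computed within a single row.
const wireSizes: Record<Payload, Record<Codec, number>> = {
  small: { json: 93, msgpack: 71, cbor: 112 },      // bytes
  medium: { json: 3.5, msgpack: 2.1, cbor: 1.4 },   // KB
  large: { json: 24.5, msgpack: 19.6, cbor: 14.1 }, // KB
};

// Percentage change vs. JSON, rounded to one decimal place.
// Negative means smaller than JSON, positive means larger.
function changeVsJson(payload: Payload, codec: Codec): number {
  const row = wireSizes[payload];
  return Math.round(((row[codec] - row.json) / row.json) * 1000) / 10;
}

for (const payload of ["small", "medium", "large"] as const) {
  console.log(
    payload,
    `msgpack ${changeVsJson(payload, "msgpack")}%`,
    `cbor ${changeVsJson(payload, "cbor")}%`,
  );
}
```

For small payloads this gives MessagePack at -23.7% and CBOR at +20.4% relative to JSON; for medium payloads, -40% and -60%; for large payloads, -20% and -42.4%.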
Multi-Peer Driver Results
When calling methods via server.driver or this.driver in a Durable Object, results are returned as an array:
// Call all connected peers
const results = await server.driver.getData({});
// Each result contains the peer ID and success/error
for (const { id, result } of results) {
if (result.success) {
console.log(`Peer ${id} returned:`, result.value);
} else {
console.error(`Peer ${id} failed:`, result.error.message);
}
}
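When many peers respond, it can help to separate successes from failures before acting on them. The sketch below does that with a hypothetical `partitionResults` helper. The `PeerResult` and `PeerResponse` types are assumptions inferred from the loop above (`id`, `result.success`, `result.value`, `result.error`); the library's exported types may be named or shaped differently.

```typescript
// Assumed result shape, inferred from the loop above; not the library's exported types.
type PeerResult<T> =
  | { success: true; value: T }
  | { success: false; error: Error };

interface PeerResponse<T> {
  id: string;
  result: PeerResult<T>;
}

// Split multi-peer results into values and errors, both keyed by peer ID.
function partitionResults<T>(results: PeerResponse<T>[]) {
  const ok = new Map<string, T>();
  const failed = new Map<string, Error>();
  for (const { id, result } of results) {
    if (result.success) {
      ok.set(id, result.value);
    } else {
      failed.set(id, result.error);
    }
  }
  return { ok, failed };
}

// Sample data standing in for `await server.driver.getData({})`.
const sample: PeerResponse<{ count: number }>[] = [
  { id: "peer-1", result: { success: true, value: { count: 3 } } },
  { id: "peer-2", result: { success: false, error: new Error("timed out") } },
];

const { ok, failed } = partitionResults(sample);
console.log("succeeded:", [...ok.keys()]);
console.log("failed:", [...failed.entries()].map(([id, e]) => `${id}: ${e.message}`));
```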
// Call specific peers
const singleResult = await server.driver.getData({}, { ids: "peer-123" });
const multiResult = await server.driver.getData({}, {
ids: ["peer-1", "peer-2"],
timeout: 5000,
});
License
MIT