Bidirectional RPC over WebSocket with Zod schema validation, TypeScript inference, and Cloudflare Durable Object support

Package Exports

  • @igoforth/ws-rpc
  • @igoforth/ws-rpc/adapters
  • @igoforth/ws-rpc/adapters/client
  • @igoforth/ws-rpc/adapters/cloudflare-do
  • @igoforth/ws-rpc/adapters/server
  • @igoforth/ws-rpc/codecs
  • @igoforth/ws-rpc/codecs/cbor
  • @igoforth/ws-rpc/codecs/json
  • @igoforth/ws-rpc/codecs/msgpack
  • @igoforth/ws-rpc/errors
  • @igoforth/ws-rpc/peer
  • @igoforth/ws-rpc/protocol
  • @igoforth/ws-rpc/schema
  • @igoforth/ws-rpc/storage
  • @igoforth/ws-rpc/types
  • @igoforth/ws-rpc/utils

Readme

@igoforth/ws-rpc

Bidirectional RPC over WebSocket with Zod schema validation and full TypeScript inference.

Features

  • Bidirectional RPC - Both client and server can call methods on each other
  • Schema-first - Define your API with Zod schemas, get full TypeScript inference
  • Multiple codecs - JSON (built-in), MessagePack, and CBOR support
  • Cloudflare Durable Objects - First-class support with hibernation-safe persistence
  • Auto-reconnect - Client automatically reconnects with exponential backoff
  • Fire-and-forget events - Decoupled from request/response pattern

Installation

npm install @igoforth/ws-rpc zod
# or
pnpm add @igoforth/ws-rpc zod

Optional codecs

# MessagePack (faster, smaller)
pnpm add @msgpack/msgpack

# CBOR (binary, compact)
pnpm add cbor-x

Cloudflare Durable Objects

pnpm add @cloudflare/actors

Quick Start

1. Define your schema

import { z } from "zod";
import { method, event, type RpcSchema } from "@igoforth/ws-rpc/schema";

// Server schema - methods the server implements
export const ServerSchema = {
  methods: {
    getUser: method({
      input: z.object({ id: z.string() }),
      output: z.object({ name: z.string(), email: z.string() }),
    }),
    createOrder: method({
      input: z.object({ product: z.string(), quantity: z.number() }),
      output: z.object({ orderId: z.string() }),
    }),
  },
  events: {
    orderUpdated: event({
      data: z.object({ orderId: z.string(), status: z.string() }),
    }),
  },
} satisfies RpcSchema;

// Client schema - methods the client implements (for bidirectional RPC)
export const ClientSchema = {
  methods: {
    ping: method({
      input: z.object({}),
      output: z.object({ pong: z.boolean() }),
    }),
  },
  events: {},
} satisfies RpcSchema;

2. Create a client

import { RpcClient } from "@igoforth/ws-rpc/adapters/client";
import { ServerSchema, ClientSchema } from "./schemas";

const client = new RpcClient({
  url: "wss://your-server.com/ws",
  localSchema: ClientSchema,
  remoteSchema: ServerSchema,
  provider: {
    // Implement methods the server can call on us
    ping: async () => ({ pong: true }),
  },
  reconnect: {
    initialDelay: 1000,
    maxDelay: 30000,
    maxAttempts: 10,
  },
  autoConnect: true, // Connect immediately (or call client.connect() manually)
  onConnect: () => console.log("Connected"),
  onDisconnect: (code, reason) => console.log("Disconnected:", code, reason),
});

// Call server methods with full type safety
const user = await client.driver.getUser({ id: "123" });
console.log(user.name, user.email);

const order = await client.driver.createOrder({ product: "widget", quantity: 5 });
console.log(order.orderId);

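// Emit events to the server. The event name must be declared under `events` in
// ClientSchema; "someEvent" is a placeholder, since the ClientSchema above declares none.
client.emit("someEvent", { data: "value" });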

// Disconnect when done
client.disconnect();

3. Create a server (Node.js)

import { WebSocketServer } from "ws";
import { RpcServer } from "@igoforth/ws-rpc/adapters/server";
import { ServerSchema, ClientSchema } from "./schemas";

const server = new RpcServer({
  wss: { port: 8080 },
  WebSocketServer,
  localSchema: ServerSchema,
  remoteSchema: ClientSchema,
  provider: {
    getUser: async ({ id }) => {
      return { name: "John Doe", email: "john@example.com" };
    },
    createOrder: async ({ product, quantity }) => {
      return { orderId: crypto.randomUUID() };
    },
  },
  hooks: {
    onConnect: (peer) => {
      console.log(`Client ${peer.id} connected`);

      // Call methods on this specific client
      peer.driver.ping({}).then((result) => {
        console.log("Client responded:", result.pong);
      });
    },
    onDisconnect: (peer) => {
      console.log(`Client ${peer.id} disconnected`);
    },
    onError: (peer, error) => {
      console.error(`Error from ${peer?.id}:`, error);
    },
  },
});

// Emit to all connected clients
server.emit("orderUpdated", { orderId: "123", status: "shipped" });

// Emit to specific clients by ID
server.emit("orderUpdated", { orderId: "456", status: "delivered" }, ["peer-id-1", "peer-id-2"]);

// Call methods on all clients
const results = await server.driver.ping({});
for (const { id, result } of results) {
  if (result.success) {
    console.log(`Peer ${id}:`, result.value);
  }
}

// Get connection info
console.log("Connected clients:", server.getConnectionCount());
console.log("Client IDs:", server.getConnectionIds());

// Close a specific client
server.closePeer("peer-id", 1000, "Goodbye");

// Graceful shutdown
process.on("SIGTERM", () => server.close());

4. Cloudflare Durable Object

import { Actor } from "@cloudflare/actors";
import { withRpc } from "@igoforth/ws-rpc/adapters/cloudflare-do";
import { ServerSchema, ClientSchema } from "./schemas";

// The mixin adds RPC capabilities to your Actor
// Your class must implement the methods defined in localSchema
export class GameRoom extends withRpc(Actor, {
  localSchema: ServerSchema,
  remoteSchema: ClientSchema,
}) {
  private gameState = { players: [] as string[] };

  // Implement methods from ServerSchema
  async getUser({ id }: { id: string }) {
    return { name: `Player ${id}`, email: `${id}@game.com` };
  }

  async createOrder({ product, quantity }: { product: string; quantity: number }) {
    return { orderId: crypto.randomUUID() };
  }

  // Use this.driver to call methods on connected clients
  async notifyAllPlayers() {
    // Call ping on all connected clients
    const results = await this.driver.ping({});
    console.log("Ping results:", results);
  }

  // Use this.emit to send events to clients
  broadcastUpdate() {
    this.emit("orderUpdated", { orderId: "123", status: "updated" });
  }

  // Check connection status
  getPlayerCount() {
    return this.getConnectionCount();
  }
}

API Reference

Schema Definition

import { method, event, type RpcSchema } from "@igoforth/ws-rpc/schema";
import { z } from "zod";

// Define a method with input/output validation
const myMethod = method({
  input: z.object({ /* ... */ }),
  output: z.object({ /* ... */ }),
});

// Define an event (fire-and-forget)
const myEvent = event({
  data: z.object({ /* ... */ }),
});

// Combine into a schema
const MySchema = {
  methods: { myMethod },
  events: { myEvent },
} satisfies RpcSchema;

Type Inference

import type { Provider, Driver, InferInput, InferOutput } from "@igoforth/ws-rpc/schema";

// Get the provider type (what you implement)
type MyProvider = Provider<typeof MySchema>;

// Get the driver type (what you call)
type MyDriver = Driver<typeof MySchema>;

// Get input/output types for a specific method
type MyMethodInput = InferInput<typeof MySchema["methods"]["myMethod"]>;
type MyMethodOutput = InferOutput<typeof MySchema["methods"]["myMethod"]>;
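
To make the provider/driver split concrete, here is a self-contained sketch of how such types could be derived from a schema object. It uses neither this library nor Zod: `Validator`, `MethodDef`, `ProviderOf`, `DriverOf`, and `loopbackDriver` are hypothetical names invented for illustration, and the real `Provider` and `Driver` types may be defined differently.

```typescript
// Hypothetical stand-in for a Zod schema: anything with a parse() that throws on bad input.
interface Validator<T> {
  parse(value: unknown): T;
}

interface MethodDef<I, O> {
  input: Validator<I>;
  output: Validator<O>;
}

type Methods = Record<string, MethodDef<any, any>>;

// What the implementing side provides: one async function per declared method.
type ProviderOf<M extends Methods> = {
  [K in keyof M]: M[K] extends MethodDef<infer I, infer O>
    ? (input: I) => Promise<O>
    : never;
};

// What the calling side sees: the same signatures, reached through a transport.
type DriverOf<M extends Methods> = ProviderOf<M>;

// Loopback "transport": validates the input, invokes the provider, validates the output.
function loopbackDriver<M extends Methods>(methods: M, provider: ProviderOf<M>): DriverOf<M> {
  const defs: Methods = methods;
  const impls = provider as unknown as Record<string, (input: unknown) => Promise<unknown>>;
  const driver: Record<string, (input: unknown) => Promise<unknown>> = {};
  for (const name of Object.keys(defs)) {
    const def = defs[name];
    const impl = impls[name];
    driver[name] = async (input) => def.output.parse(await impl(def.input.parse(input)));
  }
  return driver as unknown as DriverOf<M>;
}

// Minimal hand-written validators for the demo.
const str: Validator<string> = {
  parse(v) {
    if (typeof v !== "string") throw new TypeError("expected string");
    return v;
  },
};
const idObject: Validator<{ id: string }> = {
  parse(v) {
    const id = (v as { id?: unknown } | null)?.id;
    return { id: str.parse(id) };
  },
};

const methods = {
  getName: { input: idObject, output: str },
};

// The parameter type of getName is inferred from the schema, not annotated here.
const driver = loopbackDriver(methods, {
  getName: async ({ id }) => `user-${id}`,
});

driver.getName({ id: "42" }).then((name) => console.log(name)); // logs "user-42"
```

The point of the sketch is that a single schema object drives both the compile-time signatures and the runtime checks, which is the property the `Provider` and `Driver` types above rely on.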

Codecs

import { z } from "zod";
import { createMsgpackCodec } from "@igoforth/ws-rpc/codecs/msgpack";
import { createCborCodec } from "@igoforth/ws-rpc/codecs/cbor";
import { createJsonCodec } from "@igoforth/ws-rpc/codecs/json";

// JSON (default)
const jsonCodec = createJsonCodec(z.unknown());

// MessagePack (requires @msgpack/msgpack)
const msgpackCodec = createMsgpackCodec(z.unknown());

// CBOR (requires cbor-x)
const cborCodec = createCborCodec(z.unknown());
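
Each codec factory above takes a schema, which suggests that a codec pairs serialization with validation. The following is a rough sketch of that idea only; `Codec` and `createJsonCodecSketch` are hypothetical names, a plain function stands in for the Zod schema, and the library's actual codec interface may look different.

```typescript
// Hypothetical codec shape: turn a value into a wire string and back.
interface Codec<T> {
  encode(value: T): string;
  decode(raw: string): T;
}

// Build a JSON codec around a validation function (a stand-in for a Zod schema).
function createJsonCodecSketch<T>(validate: (value: unknown) => T): Codec<T> {
  return {
    // Validate before sending so malformed data never reaches the wire.
    encode: (value) => JSON.stringify(validate(value)),
    // Validate after parsing so callers only ever see well-formed data.
    decode: (raw) => validate(JSON.parse(raw)),
  };
}

interface OrderUpdate {
  orderId: string;
  status: string;
}

// Hand-written validator for the demo payload.
function validateOrderUpdate(value: unknown): OrderUpdate {
  const v = value as Partial<OrderUpdate> | null;
  if (typeof v?.orderId !== "string" || typeof v?.status !== "string") {
    throw new TypeError("expected { orderId: string, status: string }");
  }
  return { orderId: v.orderId, status: v.status };
}

const orderCodec = createJsonCodecSketch(validateOrderUpdate);
const wire = orderCodec.encode({ orderId: "123", status: "shipped" });
const roundTripped = orderCodec.decode(wire);
```

A binary codec would follow the same pattern with `Uint8Array` in place of `string`.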

Error Handling

import {
  RpcError,
  RpcTimeoutError,
  RpcRemoteError,
  RpcConnectionClosed,
  RpcValidationError,
  RpcMethodNotFoundError,
} from "@igoforth/ws-rpc/errors";

try {
  await client.driver.someMethod({ /* ... */ });
} catch (error) {
  if (error instanceof RpcTimeoutError) {
    console.log(`Request '${error.method}' timed out after ${error.timeoutMs}ms`);
  } else if (error instanceof RpcValidationError) {
    console.log("Invalid input/output:", error.message);
  } else if (error instanceof RpcRemoteError) {
    console.log("Server error:", error.message, error.code);
  } else if (error instanceof RpcMethodNotFoundError) {
    console.log(`Method '${error.method}' not found`);
  } else if (error instanceof RpcConnectionClosed) {
    console.log("Connection closed");
  }
}

Client Options

const client = new RpcClient({
  url: "wss://...",
  localSchema: MyLocalSchema,
  remoteSchema: MyRemoteSchema,
  provider: { /* method implementations */ },

  // Reconnection options (set to false to disable)
  reconnect: {
    initialDelay: 1000,    // First retry delay (ms)
    maxDelay: 30000,       // Maximum retry delay (ms)
    backoffMultiplier: 2,  // Exponential backoff multiplier
    maxAttempts: 0,        // Max attempts (0 = unlimited)
    jitter: 0.1,           // Random jitter factor (0-1)
  },

  // Request timeout (ms)
  timeout: 30000,

  // Auto-connect on creation (default: false)
  autoConnect: true,

  // WebSocket options
  protocols: ["v1"],                    // Subprotocols
  headers: { Authorization: "Bearer ..." }, // Headers (Node.js/Bun only)

  // Event handlers
  onConnect: () => console.log("Connected"),
  onDisconnect: (code, reason) => console.log("Disconnected"),
  onReconnect: (attempt, delay) => console.log(`Reconnecting in ${delay}ms`),
  onReconnectFailed: () => console.log("Reconnection failed"),
  onEvent: (event, data) => console.log("Event:", event, data),
});

// Connection state
client.state;       // "disconnected" | "connecting" | "connected" | "reconnecting"
client.isConnected; // boolean
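
The reconnect options describe an exponential backoff schedule. The sketch below shows one common way such options combine into a delay. The formula, the symmetric jitter, and the `reconnectDelay` helper are assumptions made for illustration, not the library's documented behavior.

```typescript
interface ReconnectOptions {
  initialDelay: number;      // First retry delay (ms)
  maxDelay: number;          // Upper bound on the delay before jitter (ms)
  backoffMultiplier: number; // Growth factor per attempt
  jitter: number;            // Fraction of the delay to randomize (0-1)
}

// Delay before the given retry attempt (1-based).
// `random` is injectable so the schedule can be made deterministic in tests.
function reconnectDelay(
  attempt: number,
  opts: ReconnectOptions,
  random: () => number = Math.random,
): number {
  const base = Math.min(
    opts.maxDelay,
    opts.initialDelay * opts.backoffMultiplier ** (attempt - 1),
  );
  // Assumed jitter model: shift the delay by up to ±(jitter * base).
  const spread = base * opts.jitter * (random() * 2 - 1);
  return Math.round(base + spread);
}

const opts: ReconnectOptions = {
  initialDelay: 1000,
  maxDelay: 30000,
  backoffMultiplier: 2,
  jitter: 0.1,
};

// With random() fixed at 0.5 the jitter term is zero:
// 1000, 2000, 4000, 8000, 16000, 30000, 30000
const schedule = [1, 2, 3, 4, 5, 6, 7].map((n) => reconnectDelay(n, opts, () => 0.5));
```

Jitter matters when many clients lose the same server at once: without it they would all retry at the same instants.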

Hibernation-Safe Durable Objects

For Cloudflare Durable Objects that need hibernation-safe outgoing calls, use DurableRpcPeer with continuation-passing style:

import { Actor } from "@cloudflare/actors";
import { DurableRpcPeer } from "@igoforth/ws-rpc/peer";
import { SqlPendingCallStorage } from "@igoforth/ws-rpc/storage";

class MyDO extends Actor<Env> {
  private peer!: DurableRpcPeer<LocalSchema, RemoteSchema, this>;

  onWebSocketConnect(ws: WebSocket) {
    this.peer = new DurableRpcPeer({
      ws,
      localSchema: LocalSchema,
      remoteSchema: RemoteSchema,
      provider: this,
      storage: new SqlPendingCallStorage(this.ctx.storage.sql),
      actor: this,
    });
  }

  onWebSocketMessage(ws: WebSocket, message: string | ArrayBuffer) {
    this.peer.handleMessage(message);
  }

  async doSomething() {
    // Promise-based (NOT hibernation-safe: the pending promise is lost if the DO hibernates before the response arrives)
    const result = await this.peer.driver.someMethod({ data: "value" });

    // Continuation-based (hibernation-safe)
    this.peer.callWithCallback("someMethod", { data: "value" }, "onResult");
  }

  // Callback receives result even after hibernation
  onResult(result: SomeResult, context: CallContext) {
    console.log("Result:", result);
    console.log("Latency:", context.latencyMs, "ms");
  }
}
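
Why passing a method name survives hibernation while a promise does not can be shown with a small self-contained sketch. Everything here is hypothetical (`PendingCallStore`, `deliver`, `DemoActor`, and a `Map` standing in for SQL storage); it illustrates the continuation-passing idea rather than how `DurableRpcPeer` is implemented.

```typescript
// Record kept for each in-flight call. It holds only plain data, so it can be
// written to durable storage and read back by a new instance.
interface PendingCall {
  id: number;
  method: string;
  callback: string; // name of a method on the actor, not a function reference
  sentAt: number;
}

class PendingCallStore {
  private nextId = 1;
  private rows = new Map<number, string>(); // id -> JSON, standing in for a SQL table

  add(method: string, callback: string, now: number): number {
    const call: PendingCall = { id: this.nextId++, method, callback, sentAt: now };
    this.rows.set(call.id, JSON.stringify(call));
    return call.id;
  }

  // Remove and return the record so each response is delivered at most once.
  take(id: number): PendingCall | undefined {
    const raw = this.rows.get(id);
    if (raw === undefined) return undefined;
    this.rows.delete(id);
    return JSON.parse(raw) as PendingCall;
  }
}

type Handler = (result: unknown, context: { latencyMs: number }) => void;

// Route a response by looking the callback up by name on whichever actor
// instance is alive when the response arrives.
function deliver(actor: object, store: PendingCallStore, id: number, result: unknown, now: number): boolean {
  const call = store.take(id);
  if (!call) return false;
  const handler = (actor as Record<string, unknown>)[call.callback];
  if (typeof handler !== "function") return false;
  (handler as Handler).call(actor, result, { latencyMs: now - call.sentAt });
  return true;
}

class DemoActor {
  seen: string[] = [];
  onResult(result: unknown, context: { latencyMs: number }) {
    this.seen.push(`${JSON.stringify(result)} after ${context.latencyMs}ms`);
  }
}

const store = new PendingCallStore();
const callId = store.add("someMethod", "onResult", 1000);

// Simulate hibernation: the instance that made the call is gone, only the store survives.
const wokenActor = new DemoActor();
const delivered = deliver(wokenActor, store, callId, { ok: true }, 1250);
```

A promise or closure lives only in the memory of the instance that created it, whereas the string `"onResult"` can be persisted and resolved against a fresh instance.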

Performance

Real WebSocket RPC round-trip benchmarks (localhost, Node.js):

Wire sizes:

Payload   JSON      MessagePack   CBOR
Small     93 B      71 B          112 B
Medium    3.5 KB    2.1 KB        1.4 KB
Large     24.5 KB   19.6 KB       14.1 KB

Throughput (ops/sec):

Payload   JSON    MessagePack   CBOR
Small     1,371   2,208         2,423
Medium    2,218   2,221         2,249
Large     1,334   1,245         1,562

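In these runs, CBOR had the highest throughput at every payload size and the smallest wire size for medium and large payloads. MessagePack produced the smallest encoding for small payloads, where CBOR was larger than JSON. JSON is the most portable, but it is the largest format for medium and large payloads and the slowest for small ones.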

Run benchmarks yourself: pnpm bench
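
To put the wire sizes in relative terms, the short calculation below expresses each binary format as a percentage saving against JSON. The numbers are transcribed from the wire-size figures above; `savingVsJson` is a helper written for this example.

```typescript
// Wire sizes in bytes, taken from the figures above (1 KB treated as 1000 B;
// the ratios within a row are the same under either convention).
const wireSizes = {
  small: { json: 93, msgpack: 71, cbor: 112 },
  medium: { json: 3500, msgpack: 2100, cbor: 1400 },
  large: { json: 24500, msgpack: 19600, cbor: 14100 },
};

// Percentage saved relative to JSON, to one decimal place.
// A negative value means the format is larger than JSON.
function savingVsJson(size: number, json: number): number {
  return Math.round((1 - size / json) * 1000) / 10;
}

const savings = Object.entries(wireSizes).map(([payload, s]) => ({
  payload,
  msgpack: savingVsJson(s.msgpack, s.json),
  cbor: savingVsJson(s.cbor, s.json),
}));

// small:  msgpack 23.7, cbor -20.4
// medium: msgpack 40,   cbor 60
// large:  msgpack 20,   cbor 42.4
console.table(savings);
```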

Multi-Peer Driver Results

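When calling methods via server.driver or this.driver in a Durable Object, the call goes to every connected peer (or only to the peers selected with ids), and the results are returned as an array with one entry per peer: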

// Call all connected peers
const results = await server.driver.getData({});

// Each result contains the peer ID and success/error
for (const { id, result } of results) {
  if (result.success) {
    console.log(`Peer ${id} returned:`, result.value);
  } else {
    console.error(`Peer ${id} failed:`, result.error.message);
  }
}

// Call specific peers
const singleResult = await server.driver.getData({}, { ids: "peer-123" });
const multiResult = await server.driver.getData({}, {
  ids: ["peer-1", "peer-2"],
  timeout: 5000,
});
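
Since every multi-peer call yields a mix of successes and failures, a small helper that splits them can keep call sites tidy. This is a sketch: the `PeerResult` and `PeerResponse` types are inferred from the loop above rather than imported from the library, and `partitionResults` is a hypothetical helper.

```typescript
// Result shape assumed from the README loop: a success flag with either a value or an error.
type PeerResult<T> =
  | { success: true; value: T }
  | { success: false; error: Error };

interface PeerResponse<T> {
  id: string;
  result: PeerResult<T>;
}

// Split per-peer responses into values and failures, both keyed by peer ID.
function partitionResults<T>(responses: PeerResponse<T>[]) {
  const values = new Map<string, T>();
  const failures = new Map<string, Error>();
  for (const { id, result } of responses) {
    if (result.success) {
      values.set(id, result.value);
    } else {
      failures.set(id, result.error);
    }
  }
  return { values, failures };
}

// Sample data standing in for `await server.driver.ping({})`.
const sample: PeerResponse<{ pong: boolean }>[] = [
  { id: "peer-1", result: { success: true, value: { pong: true } } },
  { id: "peer-2", result: { success: false, error: new Error("timed out") } },
];

const { values, failures } = partitionResults(sample);
```

Keying both maps by peer ID makes it straightforward to retry only the failed peers through the `ids` option.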

License

MIT