Bidirectional RPC over WebSocket with Zod schema validation, TypeScript inference, and Cloudflare Durable Object support

Package Exports

  • @igoforth/ws-rpc
  • @igoforth/ws-rpc/adapters
  • @igoforth/ws-rpc/adapters/client
  • @igoforth/ws-rpc/adapters/cloudflare-do
  • @igoforth/ws-rpc/adapters/server
  • @igoforth/ws-rpc/codecs
  • @igoforth/ws-rpc/codecs/cbor
  • @igoforth/ws-rpc/codecs/json
  • @igoforth/ws-rpc/codecs/msgpack
  • @igoforth/ws-rpc/errors
  • @igoforth/ws-rpc/peer
  • @igoforth/ws-rpc/protocol
  • @igoforth/ws-rpc/schema
  • @igoforth/ws-rpc/storage
  • @igoforth/ws-rpc/types
  • @igoforth/ws-rpc/utils

Readme

@igoforth/ws-rpc

Bidirectional RPC over WebSocket with Zod schema validation and full TypeScript inference.

Features

  • Bidirectional RPC - Both client and server can call methods on each other
  • Schema-first - Define your API with Zod schemas, get full TypeScript inference
  • Multiple codecs - JSON (built-in), MessagePack, and CBOR support
  • Cloudflare Durable Objects - First-class support with hibernation-safe persistence
  • Auto-reconnect - Client automatically reconnects with exponential backoff
  • Fire-and-forget events - Decoupled from request/response pattern

Installation

npm install @igoforth/ws-rpc zod
# or
pnpm add @igoforth/ws-rpc zod

Optional codecs

# MessagePack (faster, smaller)
pnpm add @msgpack/msgpack

# CBOR (binary, compact)
pnpm add cbor-x

Cloudflare Durable Objects

pnpm add @cloudflare/actors

Quick Start

1. Define your schema

import { z } from "zod";
import { method, event, type RpcSchema } from "@igoforth/ws-rpc/schema";

// Server schema - methods the server implements
export const ServerSchema = {
  methods: {
    getUser: method({
      input: z.object({ id: z.string() }),
      output: z.object({ name: z.string(), email: z.string() }),
    }),
    createOrder: method({
      input: z.object({ product: z.string(), quantity: z.number() }),
      output: z.object({ orderId: z.string() }),
    }),
  },
  events: {
    orderUpdated: event({
      data: z.object({ orderId: z.string(), status: z.string() }),
    }),
  },
} satisfies RpcSchema;

// Client schema - methods the client implements (for bidirectional RPC)
export const ClientSchema = {
  methods: {
    ping: method({
      input: z.object({}),
      output: z.object({ pong: z.boolean() }),
    }),
  },
  events: {},
} satisfies RpcSchema;

2. Create a client

import { RpcClient } from "@igoforth/ws-rpc/adapters/client";
import { ServerSchema, ClientSchema } from "./schemas";

const client = new RpcClient({
  url: "wss://your-server.com/ws",
  localSchema: ClientSchema,
  remoteSchema: ServerSchema,
  provider: {
    // Implement methods the server can call on us
    ping: async () => ({ pong: true }),
  },
  reconnect: {
    initialDelay: 1000,
    maxDelay: 30000,
    maxAttempts: 10,
  },
  autoConnect: true, // Connect immediately (or call client.connect() manually)
  onConnect: () => console.log("Connected"),
  onDisconnect: (code, reason) => console.log("Disconnected:", code, reason),
});

// Call server methods with full type safety
const user = await client.driver.getUser({ id: "123" });
console.log(user.name, user.email);

const order = await client.driver.createOrder({ product: "widget", quantity: 5 });
console.log(order.orderId);

// Emit events to the server.
// "someEvent" is a placeholder; the ClientSchema above declares no events.
client.emit("someEvent", { data: "value" });

// Disconnect when done
client.disconnect();

3. Create a server (Node.js)

import { WebSocketServer } from "ws";
import { RpcServer } from "@igoforth/ws-rpc/adapters/server";
import { ServerSchema, ClientSchema } from "./schemas";

const server = new RpcServer({
  wss: { port: 8080 },
  WebSocketServer,
  localSchema: ServerSchema,
  remoteSchema: ClientSchema,
  provider: {
    getUser: async ({ id }) => {
      return { name: "John Doe", email: "john@example.com" };
    },
    createOrder: async ({ product, quantity }) => {
      return { orderId: crypto.randomUUID() };
    },
  },
  hooks: {
    onConnect: (peer) => {
      console.log(`Client ${peer.id} connected`);

      // Call methods on this specific client
      peer.driver.ping({}).then((result) => {
        console.log("Client responded:", result.pong);
      });
    },
    onDisconnect: (peer) => {
      console.log(`Client ${peer.id} disconnected`);
    },
    onError: (peer, error) => {
      console.error(`Error from ${peer?.id}:`, error);
    },
  },
});

// Emit to all connected clients
server.emit("orderUpdated", { orderId: "123", status: "shipped" });

// Emit to specific clients by ID
server.emit("orderUpdated", { orderId: "456", status: "delivered" }, ["peer-id-1", "peer-id-2"]);

// Call methods on all clients
const results = await server.driver.ping({});
for (const { id, result } of results) {
  if (result.success) {
    console.log(`Peer ${id}:`, result.value);
  }
}

// Get connection info
console.log("Connected clients:", server.getConnectionCount());
console.log("Client IDs:", server.getConnectionIds());

// Close a specific client
server.closePeer("peer-id", 1000, "Goodbye");

// Graceful shutdown
process.on("SIGTERM", () => server.close());

4. Cloudflare Durable Object

import { Actor } from "@cloudflare/actors";
import { withRpc } from "@igoforth/ws-rpc/adapters/cloudflare-do";
import { ServerSchema, ClientSchema } from "./schemas";

// First, create an Actor with the RPC method implementations
// Methods from localSchema MUST be defined here for type checking
class GameRoomActor extends Actor<Env> {
  protected gameState = { players: [] as string[] };

  // Implement methods from ServerSchema
  async getUser({ id }: { id: string }) {
    return { name: `Player ${id}`, email: `${id}@game.com` };
  }

  async createOrder({ product, quantity }: { product: string; quantity: number }) {
    return { orderId: crypto.randomUUID() };
  }
}

// Then apply the RPC mixin to get driver, emit, etc.
export class GameRoom extends withRpc(GameRoomActor, {
  localSchema: ServerSchema,
  remoteSchema: ClientSchema,
}) {
  // Use this.driver to call methods on connected clients
  async notifyAllPlayers() {
    const results = await this.driver.ping({});
    console.log("Ping results:", results);
  }

  // Use this.emit to send events to clients
  broadcastUpdate() {
    this.emit("orderUpdated", { orderId: "123", status: "updated" });
  }

  // Check connection status
  getPlayerCount() {
    return this.getConnectionCount();
  }
}

API Reference

Schema Definition

import { method, event, type RpcSchema } from "@igoforth/ws-rpc/schema";
import { z } from "zod";

// Define a method with input/output validation
const myMethod = method({
  input: z.object({ /* ... */ }),
  output: z.object({ /* ... */ }),
});

// Define an event (fire-and-forget)
const myEvent = event({
  data: z.object({ /* ... */ }),
});

// Combine into a schema
const MySchema = {
  methods: { myMethod },
  events: { myEvent },
} satisfies RpcSchema;

Type Inference

import type { Provider, Driver, InferInput, InferOutput } from "@igoforth/ws-rpc/schema";

// Get the provider type (what you implement)
type MyProvider = Provider<typeof MySchema>;

// Get the driver type (what you call)
type MyDriver = Driver<typeof MySchema>;

// Get input/output types for a specific method
type MyMethodInput = InferInput<typeof MySchema["methods"]["myMethod"]>;
type MyMethodOutput = InferOutput<typeof MySchema["methods"]["myMethod"]>;
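
To give a feel for how a driver type can be derived from a schema object, here is a minimal self-contained sketch using mapped and conditional types. It is not the library's actual definition of Driver or InferInput: SchemaLike, MethodDef, DriverOf and makeDriver are hypothetical names, and SchemaLike stands in for a Zod schema (anything with a parse method).

```typescript
// Minimal stand-in for a Zod schema: anything with a parse() returning T.
interface SchemaLike<T> {
  parse(value: unknown): T;
}

// Hypothetical shape of a method definition: one validator per direction.
interface MethodDef<I, O> {
  input: SchemaLike<I>;
  output: SchemaLike<O>;
}

type MethodMap = Record<string, MethodDef<any, any>>;

// Pull the input/output types back out of a method definition.
type InputOf<M> = M extends MethodDef<infer I, any> ? I : never;
type OutputOf<M> = M extends MethodDef<any, infer O> ? O : never;

// Map every method definition to an async function signature.
type DriverOf<M extends MethodMap> = {
  [K in keyof M]: (input: InputOf<M[K]>) => Promise<OutputOf<M[K]>>;
};

// Build a driver that validates input and output around a transport function.
function makeDriver<M extends MethodMap>(
  methods: M,
  send: (method: string, input: unknown) => Promise<unknown>,
): DriverOf<M> {
  const driver: Record<string, (input: unknown) => Promise<unknown>> = {};
  for (const [name, def] of Object.entries(methods)) {
    driver[name] = async (input) => {
      const validInput = def.input.parse(input); // reject bad input locally
      const raw = await send(name, validInput);
      return def.output.parse(raw); // reject bad responses from the remote
    };
  }
  return driver as unknown as DriverOf<M>;
}

// Tiny hand-written validators used only for this demonstration.
const stringSchema: SchemaLike<string> = {
  parse(value) {
    if (typeof value !== "string") throw new TypeError("expected string");
    return value;
  },
};
const numberSchema: SchemaLike<number> = {
  parse(value) {
    if (typeof value !== "number") throw new TypeError("expected number");
    return value;
  },
};

const sketchMethods = {
  length: { input: stringSchema, output: numberSchema },
};

// The fake transport just computes the answer in-process.
const sketchDriver = makeDriver(sketchMethods, async (_method, input) =>
  String(input).length,
);
```

Here sketchDriver.length is typed as (input: string) => Promise<number>, inferred entirely from sketchMethods.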

Codecs

import { z } from "zod";
import { createMsgpackCodec } from "@igoforth/ws-rpc/codecs/msgpack";
import { createCborCodec } from "@igoforth/ws-rpc/codecs/cbor";
import { createJsonCodec } from "@igoforth/ws-rpc/codecs/json";

// JSON (default)
const jsonCodec = createJsonCodec(z.unknown());

// MessagePack (requires @msgpack/msgpack)
const msgpackCodec = createMsgpackCodec(z.unknown());

// CBOR (requires cbor-x)
const cborCodec = createCborCodec(z.unknown());
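
The codec factories above take a schema argument. As a rough illustration of what "a codec bound to a schema" can mean, the sketch below pairs JSON serialization with a validation function on both encode and decode. The Codec interface, createJsonCodecSketch and validateCount are hypothetical names for this example; the package's real codec interface is not shown in this README, and binary codecs would presumably work with bytes rather than strings.

```typescript
// Hypothetical codec shape: turn a value into a wire string and back.
interface Codec<T> {
  encode(value: T): string;
  decode(data: string): T;
}

// Build a JSON codec that validates in both directions.
// `validate` stands in for a schema's parse function and must throw on bad data.
function createJsonCodecSketch<T>(validate: (value: unknown) => T): Codec<T> {
  return {
    encode: (value) => JSON.stringify(validate(value)),
    decode: (data) => validate(JSON.parse(data)),
  };
}

// Hand-written validator for { n: number }, used only for this demonstration.
function validateCount(value: unknown): { n: number } {
  if (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { n?: unknown }).n === "number"
  ) {
    return { n: (value as { n: number }).n };
  }
  throw new TypeError("expected { n: number }");
}

const countCodec = createJsonCodecSketch(validateCount);
const wire = countCodec.encode({ n: 3 });
const roundTripped = countCodec.decode(wire);
```

Validating on decode means malformed messages from the remote side fail at the boundary instead of deeper in application code.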

Error Handling

import {
  RpcError,
  RpcTimeoutError,
  RpcRemoteError,
  RpcConnectionClosed,
  RpcValidationError,
  RpcMethodNotFoundError,
} from "@igoforth/ws-rpc/errors";

try {
  await client.driver.someMethod({ /* ... */ });
} catch (error) {
  if (error instanceof RpcTimeoutError) {
    console.log(`Request '${error.method}' timed out after ${error.timeoutMs}ms`);
  } else if (error instanceof RpcValidationError) {
    console.log("Invalid input/output:", error.message);
  } else if (error instanceof RpcRemoteError) {
    console.log("Server error:", error.message, error.code);
  } else if (error instanceof RpcMethodNotFoundError) {
    console.log(`Method '${error.method}' not found`);
  } else if (error instanceof RpcConnectionClosed) {
    console.log("Connection closed");
  }
}
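
One common use of typed errors is retrying only the failures that are worth retrying. The helper below is a generic sketch, not part of this package: callWithRetry and its parameters are hypothetical, and it takes the "is this retryable?" decision as a predicate so it has no dependency on the library's error classes.

```typescript
// Retry `call` up to `maxAttempts` times, but only while `isRetryable`
// returns true for the thrown error. Any other error is rethrown at once.
async function callWithRetry<T>(
  call: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 3,
): Promise<T> {
  if (maxAttempts < 1) throw new RangeError("maxAttempts must be at least 1");
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await call();
    } catch (error) {
      lastError = error;
      // Stop on non-retryable errors or when attempts are exhausted.
      if (!isRetryable(error) || attempt === maxAttempts) break;
    }
  }
  throw lastError;
}
```

With this package you would likely pass something like (error) => error instanceof RpcTimeoutError as the predicate. Retrying is only safe for idempotent methods, since a timed-out request may still have been processed by the remote side.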

Client Options

const client = new RpcClient({
  url: "wss://...",
  localSchema: MyLocalSchema,
  remoteSchema: MyRemoteSchema,
  provider: { /* method implementations */ },

  // Reconnection options (set to false to disable)
  reconnect: {
    initialDelay: 1000,    // First retry delay (ms)
    maxDelay: 30000,       // Maximum retry delay (ms)
    backoffMultiplier: 2,  // Exponential backoff multiplier
    maxAttempts: 0,        // Max attempts (0 = unlimited)
    jitter: 0.1,           // Random jitter factor (0-1)
  },

  // Request timeout (ms)
  timeout: 30000,

  // Auto-connect on creation (default: false)
  autoConnect: true,

  // WebSocket options
  protocols: ["v1"],                    // Subprotocols
  headers: { Authorization: "Bearer ..." }, // Headers (Node.js/Bun only)

  // Event handlers
  onConnect: () => console.log("Connected"),
  onDisconnect: (code, reason) => console.log("Disconnected"),
  onReconnect: (attempt, delay) => console.log(`Reconnecting in ${delay}ms`),
  onReconnectFailed: () => console.log("Reconnection failed"),
  onEvent: (event, data) => console.log("Event:", event, data),
});

// Connection state
client.state;       // "disconnected" | "connecting" | "connected" | "reconnecting"
client.isConnected; // boolean
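
To make the reconnect options concrete, here is a sketch of how a delay could be computed from them. The README does not document the exact formula the client uses, so treat this as a common formulation of exponential backoff with jitter rather than the library's implementation; ReconnectOptionsSketch and reconnectDelay are hypothetical names, and applying jitter symmetrically around the base delay is an assumption.

```typescript
// Mirrors the option names shown above; the type name itself is hypothetical.
interface ReconnectOptionsSketch {
  initialDelay: number;      // first retry delay (ms)
  maxDelay: number;          // upper bound on any delay (ms)
  backoffMultiplier: number; // growth factor per attempt
  jitter: number;            // fraction of the delay to randomize (0-1)
}

// Compute the delay before reconnect attempt `attempt` (1-based).
// `random` is injectable so the result can be made deterministic in tests.
function reconnectDelay(
  attempt: number,
  opts: ReconnectOptionsSketch,
  random: () => number = Math.random,
): number {
  // Exponential growth, capped at maxDelay.
  const base = Math.min(
    opts.maxDelay,
    opts.initialDelay * Math.pow(opts.backoffMultiplier, attempt - 1),
  );
  // Assumed jitter model: shift by up to +/- (jitter * base).
  const spread = base * opts.jitter * (random() * 2 - 1);
  // Keep the final value within [0, maxDelay].
  return Math.min(opts.maxDelay, Math.max(0, Math.round(base + spread)));
}
```

With initialDelay 1000, backoffMultiplier 2 and jitter 0, the first four attempts under this model wait 1000, 2000, 4000 and 8000 ms. Jitter exists to keep many clients from reconnecting in lockstep after a server restart.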

Hibernation-Safe Durable Objects

For Cloudflare Durable Objects that need hibernation-safe outgoing calls, use DurableRpcPeer with continuation-passing style:

import { Actor } from "@cloudflare/actors";
import { DurableRpcPeer } from "@igoforth/ws-rpc/peer";
import { SqlPendingCallStorage } from "@igoforth/ws-rpc/storage";

class MyDO extends Actor<Env> {
  private peer!: DurableRpcPeer<LocalSchema, RemoteSchema, this>;

  onWebSocketConnect(ws: WebSocket) {
    this.peer = new DurableRpcPeer({
      ws,
      localSchema: LocalSchema,
      remoteSchema: RemoteSchema,
      provider: this,
      storage: new SqlPendingCallStorage(this.ctx.storage.sql),
      actor: this,
    });
  }

  onWebSocketMessage(ws: WebSocket, message: string | ArrayBuffer) {
    this.peer.handleMessage(message);
  }

  async doSomething() {
    // Promise-based (NOT hibernation-safe: the pending promise is lost if the DO hibernates)
    const result = await this.peer.driver.someMethod({ data: "value" });

    // Continuation-based (hibernation-safe)
    this.peer.callWithCallback("someMethod", { data: "value" }, "onResult");
  }

  // Callback receives result even after hibernation
  onResult(result: SomeResult, context: CallContext) {
    console.log("Result:", result);
    console.log("Latency:", context.latencyMs, "ms");
  }
}
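
The reason a callback name survives hibernation where a promise does not is that a name is plain data and can be persisted. The sketch below illustrates that idea in isolation. It is not the internals of DurableRpcPeer or SqlPendingCallStorage: PendingCallSketch, MemoryPendingStore, CallContextSketch and dispatchResponse are hypothetical, and an in-memory Map stands in for durable SQL storage.

```typescript
// Record persisted for each outgoing call (hypothetical shape).
interface PendingCallSketch {
  id: string;
  method: string;
  callback: string; // name of the method to invoke with the result
  sentAt: number;   // epoch milliseconds
}

interface CallContextSketch {
  method: string;
  latencyMs: number;
}

// In-memory stand-in for durable storage; a real Durable Object would
// write these rows somewhere that survives eviction from memory.
class MemoryPendingStore {
  private calls = new Map<string, PendingCallSketch>();

  save(call: PendingCallSketch): void {
    this.calls.set(call.id, call);
  }

  // Remove and return the record so each response is handled at most once.
  take(id: string): PendingCallSketch | undefined {
    const call = this.calls.get(id);
    this.calls.delete(id);
    return call;
  }
}

// Route a response to the stored callback name on `target`.
// No closure or promise from before hibernation is needed.
function dispatchResponse(
  store: MemoryPendingStore,
  target: object,
  response: { id: string; result: unknown },
  now: number = Date.now(),
): boolean {
  const call = store.take(response.id);
  if (!call) return false; // unknown or already handled
  const handler = (target as Record<string, unknown>)[call.callback];
  if (typeof handler !== "function") return false;
  const context: CallContextSketch = {
    method: call.method,
    latencyMs: now - call.sentAt,
  };
  handler.call(target, response.result, context);
  return true;
}
```

Because the lookup happens by name at response time, the object that receives the callback can be a freshly constructed instance, which is what a Durable Object is after waking up.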

Performance

Real WebSocket RPC round-trip benchmarks (GitHub Actions runner, Node.js 22):

Wire sizes:

Payload   JSON      MessagePack   CBOR
Small     93 B      71 B          112 B
Medium    3.4 KB    2.1 KB        1.3 KB
Large     24.4 KB   19.5 KB       14.1 KB

Throughput (ops/sec):

Payload   JSON   MessagePack   CBOR   Fastest
Small     0      0             0      JSON
Medium    0      0             0      JSON
Large     0      0             0      JSON

Benchmarks run automatically via GitHub Actions. Results may vary based on runner load. Run locally with pnpm bench for your environment.

Multi-Peer Driver Results

Calls made through server.driver (or this.driver in a Durable Object) can reach multiple peers, so they resolve to an array with one entry per peer:

// Call all connected peers
const results = await server.driver.getData({});

// Each result contains the peer ID and success/error
for (const { id, result } of results) {
  if (result.success) {
    console.log(`Peer ${id} returned:`, result.value);
  } else {
    console.error(`Peer ${id} failed:`, result.error.message);
  }
}

// Call specific peers
const singleResult = await server.driver.getData({}, { ids: "peer-123" });
const multiResult = await server.driver.getData({}, {
  ids: ["peer-1", "peer-2"],
  timeout: 5000,
});
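
When many peers respond, it is often convenient to split the array into successes and failures once and work with each group separately. The helper below is a sketch under assumptions: PeerResultSketch and PeerResponseSketch mirror the { id, result } shape used in the loop above, but they are hypothetical local types rather than the package's exported ones, and partitionResults is not a library function.

```typescript
// Local stand-ins for the result shape shown in the example above.
type PeerResultSketch<T> =
  | { success: true; value: T }
  | { success: false; error: Error };

interface PeerResponseSketch<T> {
  id: string;
  result: PeerResultSketch<T>;
}

// Split per-peer results into values and errors, keyed by peer ID.
function partitionResults<T>(responses: PeerResponseSketch<T>[]): {
  values: Map<string, T>;
  errors: Map<string, Error>;
} {
  const values = new Map<string, T>();
  const errors = new Map<string, Error>();
  for (const { id, result } of responses) {
    if (result.success) {
      values.set(id, result.value);
    } else {
      errors.set(id, result.error);
    }
  }
  return { values, errors };
}

// Example with hand-built data in place of a real driver call.
const partitioned = partitionResults<number>([
  { id: "peer-1", result: { success: true, value: 1 } },
  { id: "peer-2", result: { success: false, error: new Error("boom") } },
]);
```

Keying by peer ID makes it straightforward to follow up with only the peers that failed, for example by passing their IDs back through the ids option.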

License

MIT