JSPM

  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 5
  • Score
    100M100P100Q107206F
  • License MIT

A minimal, type-safe, hierarchical pub-sub message bus for TypeScript and JavaScript

Package Exports

  • @lppedd/message-bus

Readme

message-bus

A minimal, type-safe, hierarchical pub-sub message bus for TypeScript and JavaScript

test npm npm gzipped size license

Hierarchical Bus

Table of Contents

Installation

npm i @lppedd/message-bus
pnpm add @lppedd/message-bus
yarn add @lppedd/message-bus

API reference

You can find the complete API reference at lppedd.github.io/message-bus.

Requirements

The JavaScript environment must support or polyfill Map, Set, WeakMap and WeakRef.

Quickstart

// Create a message bus
const bus = createMessageBus();

// Create a topic to publish messages to
const CommandTopic = createTopic<string>("Command");

// Subscribe to the topic using a message handler/callback
bus.subscribe(CommandTopic, (command) => {
  if (command === "shutdown") {
    /* ... */
  }
});

// Publish a new message to the topic
bus.publish(CommandTopic, "shutdown");

Creating a message bus

Use the createMessageBus factory function to create a new message bus:

const bus = createMessageBus();

You can customize the message bus behavior by also passing options:

const bus = createMessageBus({
  safePublishing: true,   // Prevents publishing from failing if a message handler throws
  errorHandler: () => {}, // Handles errors thrown by message handlers (requires safePublishing: true).
});                       // By default, caught unhandled errors are printed to console.error.

Child buses

A MessageBus can create child buses.
By default, subscriptions registered on a child bus will also receive messages published by its parent bus.

To create a child bus, call:

const childBus = bus.createChildBus();

Publishing messages

To publish a message via the message bus, the first step is to define a topic.
A topic is a typed object that uniquely identifies a message channel.

// Messages sent to the CommandTopic must be strings
const CommandTopic = createTopic<string>("Command");

Once you have a topic, you can publish a message:

bus.publish(CommandTopic, "shutdown");

Note that if the topic uses a void type, the value parameter can be omitted:

const PingTopic = createTopic<void>("Ping");

// No value needed
bus.publish(PingTopic);

Message ordering 🚥

The message bus guarantees that messages are always dispatched to handlers in the order they are published.

If a message is published from within a message handler, it is not dispatched immediately. Instead, it is enqueued and will only be processed after all previously published (but not yet dispatched) messages. This ensures consistent, FIFO-style message delivery, even across nested publish calls.

Subscribing to topics

There are multiple ways to subscribe to a topic, but the most straightforward is to provide a message handler (a callback):

const subscription = bus.subscribe(CommandTopic, (command) => {
  /* ... */
});

The handler is invoked each time a message is published to the topic, whether it is published on the current bus or any of its parent buses.

You can unsubscribe from the topic at any time by calling subscription.dispose().

Single-message subscription

If you're only interested in the single next message, use:

bus.subscribeOnce(CommandTopic, (command) => {
  /* ... */
});

This subscribes to the topic and automatically disposes the created Subscription after receiving a single message.

Asynchronous consumption ⚡

An alternative way to subscribe to a topic is using async iterations:

const subscription = bus.subscribe(CommandTopic); // AsyncIterableIterator<string>

This creates a lazy subscription: no actual subscription is made until you start consuming messages.
You can do that using a for await ... of loop:

for await (const command of subscription) {
  /* ... */
}

Or by awaiting the next message directly with subscription.single():

const command = await subscription.single(); // Promise<string>

Note that calling single() does not automatically dispose the subscription. In contrast, a for await ... of loop disposes it automatically when the iteration ends, whether normally or due to a break, a return, or an error.

If you use single() and no longer need the subscription afterward, remember to dispose it manually with subscription.dispose().

Asynchronous single-message subscription

The asynchronous alternative to bus.subscribeOnce(topic, handler) is:

const command = await bus.subscribeOnce(CommandTopic); // Promise<string>

[!TIP] If you are only interested in a single message, prefer using subscribeOnce(Topic) over subscribe(Topic) + subscription.single(). This avoids the need to manually dispose the subscription.

Decorator-based subscription

The library also supports a declarative way to subscribe to topics, by using TypeScript's experimental decorators.

When applied to a method parameter, a topic created with createTopic can act as a parameter decorator. This allows wiring up subscriptions directly inside class methods.

To enable this behavior, decorate the class with @AutoSubscribe and pass the target message bus, where subscriptions will be created:

@AutoSubscribe(messageBus) // or () => messageBus, if it needs to be lazily resolved
export class CommandProcessor {
  onCommand(@CommandTopic() command: string): void {
    if (command === "shutdown") {
      /* ... */
    }
  }
}

This automatically subscribes the onCommand method to CommandTopic, and unsubscribes it when the instance is garbage-collected.

[!NOTE] The class must be instantiated, either manually or via a third-party mechanism, for the subscription to be activated. Decorating the class alone does not trigger any subscriptions.

Unsubscribing programmatically

If you do not want to rely on garbage collection to clean up the subscriptions, you can unsubscribe manually. To do that, declare a Subscription parameter immediately after the decorated topic parameter. The runtime will automatically inject the corresponding subscription object:

@AutoSubscribe(messageBus)
export class CommandProcessor {
  onCommand(@CommandTopic() command: string, subscription: Subscription): void {
    if (command === "shutdown") {
      /* ... */
      subscription.dispose();
    }
  }
}

[!NOTE] Only one Subscription parameter is allowed per method, and it must follow the topic parameter.

License

MIT license

2025-present Edoardo Luppi