@aimf/event-mesh
Real-time Event Mesh for Pub/Sub Messaging in the AI-MCP Framework.
Overview
The Event Mesh package provides a high-performance, topic-based publish/subscribe messaging system with pattern matching, typed channels, request/reply, and event aggregation.
Features
- Topic-based Pub/Sub: Flexible topic patterns with wildcards (`*` and `#`)
- Typed Channels: Type-safe event channels with TypeScript generics
- Request/Reply: RPC-style communication over the event bus
- Event Aggregation: Batch and window events by count or time
- Dead Letter Queue: Capture and handle failed events
- Priority Handling: Execute handlers in priority order
- Event Filtering: Filter events before they reach handlers
Installation
pnpm add @aimf/event-mesh
Quick Start
import { createEventBus, createChannel } from "@aimf/event-mesh";
// Create an event bus
const bus = createEventBus();
// Simple pub/sub
bus.subscribe("users.created", (data) => {
  console.log("User created:", data);
});
await bus.publish("users.created", { id: 1, name: "Alice" });
// Pattern matching with wildcards
bus.subscribe("users.*", (data) => {
  console.log("User event:", data);
});
bus.subscribe("users.#", (data) => {
  console.log("Any user event:", data);
});
// Clean up
await bus.destroy();
Topic Patterns
The event mesh supports two types of wildcards:
- `*` - Matches exactly one segment
- `#` - Matches zero or more segments
// Match any direct child topic
bus.subscribe("users.*", handler);
// Matches: users.created, users.updated, users.deleted
// Does NOT match: users.profile.updated
// Match any descendant topic
bus.subscribe("users.#", handler);
// Matches: users, users.created, users.profile.updated
// Complex patterns
bus.subscribe("*.events.#", handler);
// Matches: users.events.login, orders.events.created.success
Typed Channels
Create type-safe channels for specific event types:
import { createChannel, createChannelRegistry } from "@aimf/event-mesh";
interface UserCreatedEvent {
  id: number;
  name: string;
  email: string;
}
// Create a typed channel
const userChannel = createChannel<UserCreatedEvent>(bus, "users.created");
// Subscribe with type safety
userChannel.subscribe((user) => {
  console.log(user.name); // TypeScript knows this is a string
});
// Publish with type checking
await userChannel.publish({
  id: 1,
  name: "Alice",
  email: "alice@example.com"
});
// Channel Registry for managing multiple channels
const registry = createChannelRegistry(bus);
registry.register<UserCreatedEvent>("users.created");
registry.register<{ orderId: string }>("orders.created");
const channel = registry.get("users.created");
Request/Reply Pattern
Implement RPC-style communication:
import { createRequestClient, createRequestResponder } from "@aimf/event-mesh";
// Server side - create responder
const responder = createRequestResponder(bus, "math.add");
responder.handle<{ a: number; b: number }, number>(({ a, b }) => a + b);
// Client side - send requests
const client = createRequestClient(bus);
const result = await client.request<{ a: number; b: number }, number>(
  "math.add",
  { a: 5, b: 3 }
);
console.log(result); // 8
// Clean up
responder.stop();
Event Aggregation
Batch events by count or time window:
import { createEventAggregator, aggregators } from "@aimf/event-mesh";
const aggregator = createEventAggregator(bus);
// Count-based aggregation
aggregator.addRule({
  id: "batch-orders",
  pattern: "orders.#",
  windowType: "count",
  windowSize: 10, // Emit after 10 events
  outputTopic: "orders.batch",
  aggregator: aggregators.collect,
});
// Time-based aggregation
aggregator.addRule({
  id: "metrics-window",
  pattern: "metrics.#",
  windowType: "time",
  windowSize: 5000, // Emit every 5 seconds
  outputTopic: "metrics.batch",
  aggregator: aggregators.sum,
});
// Subscribe to batched events
bus.subscribe("orders.batch", (batch) => {
  console.log(`Processing ${batch.length} orders`);
});
// Built-in aggregators
// - aggregators.collect - Collect into array
// - aggregators.sum - Sum numeric values
// - aggregators.average - Average numeric values
// - aggregators.count - Count events
// - aggregators.first - Get first value
// - aggregators.last - Get last value
// - aggregators.min - Get minimum value
// - aggregators.max - Get maximum value
// - aggregators.groupBy(key) - Group by object key
// Clean up
aggregator.destroy();
Event Filtering
Filter events before they reach handlers:
bus.subscribe(
  "users.#",
  (user) => {
    console.log("Admin user:", user);
  },
  {
    filter: (data) => data.role === "admin",
  }
);
Priority Handling
Control handler execution order:
// Higher priority handlers execute first
bus.subscribe("events.important", handler1, { priority: 1 });
bus.subscribe("events.important", handler2, { priority: 10 }); // Runs first
bus.subscribe("events.important", handler3, { priority: 5 });
Dead Letter Queue
Capture and handle failed events:
// Listen for dead letters
bus.onDeadLetter((deadLetter) => {
  console.error(`Event failed: ${deadLetter.topic}`, deadLetter.error);
  // Retry or log the failure
});
// Get dead letter queue
const deadLetters = bus.getDeadLetterQueue();
// Clear the queue
bus.clearDeadLetterQueue();
Waiting for Events
Wait for a specific event, with an optional timeout in milliseconds:
// Wait for an event (with timeout)
const envelope = await bus.waitFor("users.verified", 5000);
console.log("User verified:", envelope.data);
// One-time subscription
bus.once("startup.complete", () => {
  console.log("Application started");
});
Event Envelope
Access full event metadata:
bus.subscribe("events.test", (data, envelope) => {
  console.log("Event ID:", envelope.metadata.id);
  console.log("Timestamp:", envelope.metadata.timestamp);
  console.log("Correlation ID:", envelope.metadata.correlationId);
  console.log("Source:", envelope.metadata.source);
});
// Publish with metadata
await bus.publish("events.test", data, {
  correlationId: "request-123",
  source: "user-service",
});
Configuration
const bus = createEventBus({
  maxDeadLetterSize: 1000, // Maximum dead letters to keep
  generateId: () => crypto.randomUUID(), // Custom ID generator
});
Statistics
Monitor event bus activity:
const stats = bus.getStatistics();
console.log("Published:", stats.publishedCount);
console.log("Subscriptions:", stats.subscriptionCount);
console.log("Dead Letters:", stats.deadLetterCount);
// Aggregator statistics
const aggStats = aggregator.getStatistics("rule-id");
console.log("Events Processed:", aggStats.eventsProcessed);
console.log("Batches Emitted:", aggStats.batchesEmitted);
Namespaced Channels
Create isolated channel namespaces:
import { createNamespacedChannels } from "@aimf/event-mesh";
const appChannels = createNamespacedChannels(bus, "myapp");
const usersChannel = appChannels.channel<UserEvent>("users");
// Topic: myapp.users
const ordersChannel = appChannels.channel<OrderEvent>("orders");
// Topic: myapp.orders
API Reference
EventBus
| Method | Description |
|---|---|
| `subscribe(topic, handler, options?)` | Subscribe to events |
| `unsubscribe(subscriptionId)` | Unsubscribe by ID |
| `publish(topic, data, metadata?)` | Publish an event |
| `once(topic, handler)` | Subscribe for one event |
| `waitFor(topic, timeout?)` | Wait for an event |
| `getSubscriptions()` | Get all subscriptions |
| `getTopics()` | Get unique topics |
| `hasSubscribers(topic)` | Check for subscribers |
| `getDeadLetterQueue()` | Get failed events |
| `clearDeadLetterQueue()` | Clear dead letters |
| `onDeadLetter(handler)` | Handle dead letters |
| `getStatistics()` | Get bus statistics |
| `destroy()` | Clean up resources |
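To make the interplay of priorities, filters, and the dead letter queue concrete, here is a minimal sketch of a bus that behaves the way this README describes. `MiniBus` and its internals are hypothetical and are not the package's implementation: it matches exact topic names only and delivers synchronously, whereas the real `publish` is awaited.

```typescript
// Hypothetical, simplified stand-in for the bus: exact topic names only,
// synchronous delivery. Not the package's implementation.
type Handler = (data: any) => void;

interface SubscribeOptions {
  priority?: number;               // higher values run first; defaults to 0
  filter?: (data: any) => boolean; // handler is skipped when this returns false
}

interface Subscription {
  id: string;
  topic: string;
  handler: Handler;
  priority: number;
  filter: ((data: any) => boolean) | undefined;
}

interface DeadLetter {
  topic: string;
  data: unknown;
  error: unknown;
}

class MiniBus {
  private nextId = 1;
  private subscriptions: Subscription[] = [];
  private deadLetters: DeadLetter[] = [];
  private maxDeadLetterSize: number;

  constructor(maxDeadLetterSize = 1000) {
    this.maxDeadLetterSize = maxDeadLetterSize;
  }

  subscribe(topic: string, handler: Handler, options: SubscribeOptions = {}): string {
    const id = `sub-${this.nextId++}`;
    this.subscriptions.push({
      id,
      topic,
      handler,
      priority: options.priority ?? 0,
      filter: options.filter,
    });
    return id;
  }

  unsubscribe(subscriptionId: string): void {
    this.subscriptions = this.subscriptions.filter((s) => s.id !== subscriptionId);
  }

  publish(topic: string, data: unknown): void {
    // Copy the matching subscriptions, then order them by descending priority.
    const matching = this.subscriptions
      .filter((s) => s.topic === topic)
      .sort((a, b) => b.priority - a.priority);

    for (const sub of matching) {
      if (sub.filter && !sub.filter(data)) continue;
      try {
        sub.handler(data);
      } catch (error) {
        // A throwing handler does not stop delivery; the failure is recorded.
        this.deadLetters.push({ topic, data, error });
        if (this.deadLetters.length > this.maxDeadLetterSize) {
          this.deadLetters.shift(); // drop the oldest entry
        }
      }
    }
  }

  getDeadLetterQueue(): DeadLetter[] {
    return [...this.deadLetters];
  }

  clearDeadLetterQueue(): void {
    this.deadLetters = [];
  }
}
```

Sorting a filtered copy on every publish keeps the sketch short. A real bus would more likely keep each subscriber list ordered at subscribe time.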
Topic Matching
| Function | Description |
|---|---|
| `parseTopicPattern(pattern)` | Parse a topic pattern |
| `matchTopic(pattern, topic)` | Match topic against pattern |
| `validateTopic(topic, allowPatterns?)` | Validate topic format |
| `isPattern(topic)` | Check if topic contains wildcards |
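The wildcard rules from Topic Patterns (`*` matches exactly one segment, `#` matches zero or more) can be sketched as a small recursive matcher. The names `matchTopicSketch` and `isPatternSketch` are hypothetical and only illustrate the semantics. The package's own `matchTopic` and `isPattern` may be implemented differently and may handle edge cases (such as empty segments) that this sketch ignores.

```typescript
// Hypothetical helpers illustrating the documented wildcard semantics.
// They are not the package's implementation.

/** Returns true when any dot-separated segment is a `*` or `#` wildcard. */
function isPatternSketch(topic: string): boolean {
  return topic.split(".").some((segment) => segment === "*" || segment === "#");
}

/** Matches a dot-separated topic against a pattern using `*` and `#`. */
function matchTopicSketch(pattern: string, topic: string): boolean {
  const p = pattern.split(".");
  const t = topic.split(".");

  // Walk both segment lists; pi indexes the pattern, ti indexes the topic.
  function walk(pi: number, ti: number): boolean {
    if (pi === p.length) return ti === t.length; // both must be exhausted
    const segment = p[pi];
    if (segment === "#") {
      // `#` may consume zero or more of the remaining topic segments.
      for (let next = ti; next <= t.length; next++) {
        if (walk(pi + 1, next)) return true;
      }
      return false;
    }
    if (ti === t.length) return false; // pattern needs a segment, topic has none
    if (segment === "*" || segment === t[ti]) return walk(pi + 1, ti + 1);
    return false;
  }

  return walk(0, 0);
}
```

Run against the examples in Topic Patterns, `users.*` accepts `users.created` but rejects `users.profile.updated`, while `users.#` accepts `users`, `users.created`, and `users.profile.updated`.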
Channel Functions
| Function | Description |
|---|---|
| `createChannel(bus, topic)` | Create a typed channel |
| `createChannelRegistry(bus)` | Create a channel registry |
| `createTypedChannel(bus, topic, options)` | Create with options |
| `createNamespacedChannels(bus, namespace)` | Create namespaced factory |
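A typed channel can be thought of as a thin generic wrapper that pins one topic and one payload type onto an untyped bus, and a namespaced factory as the same wrapper with a topic prefix. The sketch below assumes a minimal `BusLike` interface and uses hypothetical names throughout (`createChannelSketch`, `createNamespacedChannelsSketch`, `createMapBus`). It is not the package's code.

```typescript
// Hypothetical minimal bus interface; the real EventBus has a richer API.
interface BusLike {
  subscribe(topic: string, handler: (data: unknown) => void): void;
  publish(topic: string, data: unknown): void;
}

interface TypedChannel<T> {
  readonly topic: string;
  subscribe(handler: (data: T) => void): void;
  publish(data: T): void;
}

/** Binds a topic and a payload type to an untyped bus. */
function createChannelSketch<T>(bus: BusLike, topic: string): TypedChannel<T> {
  return {
    topic,
    // The cast is compile-time only: nothing validates payloads at runtime.
    subscribe: (handler) => bus.subscribe(topic, (data) => handler(data as T)),
    publish: (data) => bus.publish(topic, data),
  };
}

/** Prefixes every channel topic with `<namespace>.`. */
function createNamespacedChannelsSketch(bus: BusLike, namespace: string) {
  return {
    channel<T>(name: string): TypedChannel<T> {
      return createChannelSketch<T>(bus, `${namespace}.${name}`);
    },
  };
}

/** Tiny in-memory bus (exact topics only) so the sketch can run on its own. */
function createMapBus(): BusLike {
  const handlers = new Map<string, Array<(data: unknown) => void>>();
  return {
    subscribe(topic, handler) {
      const list = handlers.get(topic) ?? [];
      list.push(handler);
      handlers.set(topic, list);
    },
    publish(topic, data) {
      for (const handler of handlers.get(topic) ?? []) handler(data);
    },
  };
}
```

Because the type parameter exists only at compile time, publishers that bypass the channel and call the bus directly can still send payloads of the wrong shape.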
Request/Reply
| Function | Description |
|---|---|
| `createRequestClient(bus, options?)` | Create request client |
| `createRequestResponder(bus, topic)` | Create responder |
| `createRequestReply(bus, topic)` | Create paired client/responder |
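Request/reply over a pub/sub bus is commonly built from a correlation ID plus a reply topic. The sketch below shows that general technique on a tiny in-memory bus. The message shape, the reply-topic naming scheme, the 1000 ms default timeout, and the names `TinyBus`, `respondSketch`, and `requestSketch` are all assumptions made for illustration. How the package implements this internally is not documented here.

```typescript
type Listener = (message: unknown) => void;

// Hypothetical in-memory bus with exact-topic delivery.
class TinyBus {
  private listeners = new Map<string, Listener[]>();

  /** Registers a listener and returns a function that removes it. */
  subscribe(topic: string, listener: Listener): () => void {
    const list = this.listeners.get(topic) ?? [];
    list.push(listener);
    this.listeners.set(topic, list);
    return () => {
      const current = this.listeners.get(topic) ?? [];
      this.listeners.set(topic, current.filter((l) => l !== listener));
    };
  }

  publish(topic: string, message: unknown): void {
    // Iterate over a copy so listeners may unsubscribe while being called.
    for (const listener of [...(this.listeners.get(topic) ?? [])]) listener(message);
  }
}

// Assumed wire format for this sketch.
interface RequestMessage<Req> { correlationId: string; replyTo: string; payload: Req; }
interface ReplyMessage<Res> { correlationId: string; payload: Res; }

let requestCounter = 0;

/** Answers every request on `topic`; returns a function that stops responding. */
function respondSketch<Req, Res>(
  bus: TinyBus,
  topic: string,
  handler: (payload: Req) => Res
): () => void {
  return bus.subscribe(topic, (message) => {
    const req = message as RequestMessage<Req>;
    const reply: ReplyMessage<Res> = {
      correlationId: req.correlationId,
      payload: handler(req.payload),
    };
    bus.publish(req.replyTo, reply);
  });
}

/** Publishes a request and resolves with the reply that carries the same ID. */
function requestSketch<Req, Res>(
  bus: TinyBus,
  topic: string,
  payload: Req,
  timeoutMs = 1000
): Promise<Res> {
  return new Promise<Res>((resolve, reject) => {
    const correlationId = `req-${++requestCounter}`;
    const replyTo = `${topic}.reply.${correlationId}`;
    const timer = setTimeout(() => {
      unsubscribe();
      reject(new Error(`Request to ${topic} timed out`));
    }, timeoutMs);
    const unsubscribe = bus.subscribe(replyTo, (message) => {
      const reply = message as ReplyMessage<Res>;
      if (reply.correlationId !== correlationId) return; // not our reply
      clearTimeout(timer);
      unsubscribe();
      resolve(reply.payload);
    });
    const message: RequestMessage<Req> = { correlationId, replyTo, payload };
    bus.publish(topic, message);
  });
}
```

The reply listener is registered before the request is published, so a responder that answers synchronously cannot race ahead of the client.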
Aggregator
| Function | Description |
|---|---|
| `createEventAggregator(bus)` | Create aggregator |
| `aggregators.collect` | Collect events into array |
| `aggregators.sum` | Sum numeric events |
| `aggregators.average` | Average numeric events |
| `aggregators.count` | Count events |
| `aggregators.first` | Get first event |
| `aggregators.last` | Get last event |
| `aggregators.min` | Get minimum |
| `aggregators.max` | Get maximum |
| `aggregators.groupBy(key)` | Group by object key |
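Conceptually, an aggregator is a function that reduces one window of events to a single value, and a count-based rule is a buffer that emits once it holds `windowSize` events. The sketch below illustrates that idea with hypothetical names (`Aggregator`, `collectAll`, `sumAll`, `averageAll`, `groupByKey`, `CountWindow`). The signatures of the package's built-in aggregators are not documented in this README and may differ, for example by receiving event envelopes rather than raw payloads.

```typescript
// Hypothetical reducer signature: a full window of events in, one result out.
type Aggregator<T, R> = (events: T[]) => R;

function collectAll<T>(events: T[]): T[] {
  return [...events];
}

function sumAll(events: number[]): number {
  return events.reduce((total, n) => total + n, 0);
}

function averageAll(events: number[]): number {
  // An empty window yields 0 here; that choice is an assumption of this sketch.
  return events.length === 0 ? 0 : sumAll(events) / events.length;
}

/** Builds an aggregator that buckets events by the string form of `key`. */
function groupByKey<T>(key: keyof T): Aggregator<T, Record<string, T[]>> {
  return (events) => {
    const groups: Record<string, T[]> = {};
    for (const item of events) {
      const bucket = String(item[key]);
      const group = groups[bucket] ?? [];
      group.push(item);
      groups[bucket] = group;
    }
    return groups;
  };
}

/** Buffers events and emits one aggregated result per `windowSize` events. */
class CountWindow<T, R> {
  private buffer: T[] = [];
  private readonly windowSize: number;
  private readonly aggregate: Aggregator<T, R>;
  private readonly emit: (result: R) => void;

  constructor(windowSize: number, aggregate: Aggregator<T, R>, emit: (result: R) => void) {
    this.windowSize = windowSize;
    this.aggregate = aggregate;
    this.emit = emit;
  }

  add(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.windowSize) {
      const batch = this.buffer;
      this.buffer = []; // start a fresh window before emitting
      this.emit(this.aggregate(batch));
    }
  }
}
```

A time-based window would follow the same shape, flushing the buffer from a timer rather than from a length check.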
License
MIT