JSPM

@aimf/event-mesh

0.1.1
    • ESM via JSPM
    • ES Module Entrypoint
    • Export Map
    • Keywords
    • License
    • Repository URL
    • TypeScript Types
    • README
    • Created
    • Published
    • Downloads 5
    • Score
      100M100P100Q40025F
    • License MIT

    Real-time Event Mesh for Pub/Sub Messaging in AIMF

    Package Exports

    • @aimf/event-mesh

    Readme

    @aimf/event-mesh

    Real-time Event Mesh for Pub/Sub Messaging in the AI-MCP Framework.

    Overview

    The Event Mesh package provides a high-performance, topic-based publish/subscribe messaging system with advanced features like pattern matching, typed channels, request/reply, and event aggregation.

    Features

    • Topic-based Pub/Sub: Flexible topic patterns with wildcards (* and #)
    • Typed Channels: Type-safe event channels with TypeScript generics
    • Request/Reply: RPC-style communication over the event bus
    • Event Aggregation: Batch and window events by count or time
    • Dead Letter Queue: Capture and handle failed events
    • Priority Handling: Execute handlers in priority order
    • Event Filtering: Filter events before they reach handlers

    Installation

    pnpm add @aimf/event-mesh

    Quick Start

    import { createEventBus, createChannel } from "@aimf/event-mesh";
    
    // Create an event bus
    const bus = createEventBus();
    
    // Simple pub/sub
    bus.subscribe("users.created", (data) => {
      console.log("User created:", data);
    });
    
    await bus.publish("users.created", { id: 1, name: "Alice" });
    
    // Pattern matching with wildcards
    bus.subscribe("users.*", (data) => {
      console.log("User event:", data);
    });
    
    bus.subscribe("users.#", (data) => {
      console.log("Any user event:", data);
    });
    
    // Clean up
    await bus.destroy();

    Topic Patterns

    The event mesh supports two types of wildcards:

    • * - Matches exactly one segment
    • # - Matches zero or more segments
    // Match any direct child topic
    bus.subscribe("users.*", handler);
    // Matches: users.created, users.updated, users.deleted
    // Does NOT match: users.profile.updated
    
    // Match any descendant topic
    bus.subscribe("users.#", handler);
    // Matches: users, users.created, users.profile.updated
    
    // Complex patterns
    bus.subscribe("*.events.#", handler);
    // Matches: users.events.login, orders.events.created.success

    Typed Channels

    Create type-safe channels for specific event types:

    import { createChannel, createChannelRegistry } from "@aimf/event-mesh";
    
    interface UserCreatedEvent {
      id: number;
      name: string;
      email: string;
    }
    
    // Create a typed channel
    const userChannel = createChannel<UserCreatedEvent>(bus, "users.created");
    
    // Subscribe with type safety
    userChannel.subscribe((user) => {
      console.log(user.name); // TypeScript knows this is a string
    });
    
    // Publish with type checking
    await userChannel.publish({
      id: 1,
      name: "Alice",
      email: "alice@example.com"
    });
    
    // Channel Registry for managing multiple channels
    const registry = createChannelRegistry(bus);
    
    registry.register<UserCreatedEvent>("users.created");
    registry.register<{ orderId: string }>("orders.created");
    
    const channel = registry.get("users.created");

    Request/Reply Pattern

    Implement RPC-style communication:

    import { createRequestClient, createRequestResponder } from "@aimf/event-mesh";
    
    // Server side - create responder
    const responder = createRequestResponder(bus, "math.add");
    responder.handle<{ a: number; b: number }, number>(({ a, b }) => a + b);
    
    // Client side - send requests
    const client = createRequestClient(bus);
    
    const result = await client.request<{ a: number; b: number }, number>(
      "math.add",
      { a: 5, b: 3 }
    );
    console.log(result); // 8
    
    // Clean up
    responder.stop();

    Event Aggregation

    Batch events by count or time window:

    import { createEventAggregator, aggregators } from "@aimf/event-mesh";
    
    const aggregator = createEventAggregator(bus);
    
    // Count-based aggregation
    aggregator.addRule({
      id: "batch-orders",
      pattern: "orders.#",
      windowType: "count",
      windowSize: 10, // Emit after 10 events
      outputTopic: "orders.batch",
      aggregator: aggregators.collect,
    });
    
    // Time-based aggregation
    aggregator.addRule({
      id: "metrics-window",
      pattern: "metrics.#",
      windowType: "time",
      windowSize: 5000, // Emit every 5 seconds
      outputTopic: "metrics.batch",
      aggregator: aggregators.sum,
    });
    
    // Subscribe to batched events
    bus.subscribe("orders.batch", (batch) => {
      console.log(`Processing ${batch.length} orders`);
    });
    
    // Built-in aggregators
    // - aggregators.collect - Collect into array
    // - aggregators.sum - Sum numeric values
    // - aggregators.average - Average numeric values
    // - aggregators.count - Count events
    // - aggregators.first - Get first value
    // - aggregators.last - Get last value
    // - aggregators.min - Get minimum value
    // - aggregators.max - Get maximum value
    // - aggregators.groupBy(key) - Group by object key
    
    // Clean up
    aggregator.destroy();

    Event Filtering

    Filter events before they reach handlers:

    bus.subscribe(
      "users.#",
      (user) => {
        console.log("Admin user:", user);
      },
      {
        filter: (data) => data.role === "admin",
      }
    );

    Priority Handling

    Control handler execution order:

    // Higher priority handlers execute first
    bus.subscribe("events.important", handler1, { priority: 1 });
    bus.subscribe("events.important", handler2, { priority: 10 }); // Runs first
    bus.subscribe("events.important", handler3, { priority: 5 });

    Dead Letter Queue

    Capture and handle failed events:

    // Listen for dead letters
    bus.onDeadLetter((deadLetter) => {
      console.error(`Event failed: ${deadLetter.topic}`, deadLetter.error);
      // Retry or log the failure
    });
    
    // Get dead letter queue
    const deadLetters = bus.getDeadLetterQueue();
    
    // Clear the queue
    bus.clearDeadLetterQueue();

    Waiting for Events

    Wait for specific events with timeout:

    // Wait for an event (with timeout)
    const envelope = await bus.waitFor("users.verified", 5000);
    console.log("User verified:", envelope.data);
    
    // One-time subscription
    bus.once("startup.complete", () => {
      console.log("Application started");
    });

    Event Envelope

    Access full event metadata:

    bus.subscribe("events.test", (data, envelope) => {
      console.log("Event ID:", envelope.metadata.id);
      console.log("Timestamp:", envelope.metadata.timestamp);
      console.log("Correlation ID:", envelope.metadata.correlationId);
      console.log("Source:", envelope.metadata.source);
    });
    
    // Publish with metadata
    await bus.publish("events.test", data, {
      correlationId: "request-123",
      source: "user-service",
    });

    Configuration

    const bus = createEventBus({
      maxDeadLetterSize: 1000, // Maximum dead letters to keep
      generateId: () => crypto.randomUUID(), // Custom ID generator
    });

    Statistics

    Monitor event bus activity:

    const stats = bus.getStatistics();
    console.log("Published:", stats.publishedCount);
    console.log("Subscriptions:", stats.subscriptionCount);
    console.log("Dead Letters:", stats.deadLetterCount);
    
    // Aggregator statistics
    const aggStats = aggregator.getStatistics("rule-id");
    console.log("Events Processed:", aggStats.eventsProcessed);
    console.log("Batches Emitted:", aggStats.batchesEmitted);

    Namespaced Channels

    Create isolated channel namespaces:

    import { createNamespacedChannels } from "@aimf/event-mesh";
    
    const appChannels = createNamespacedChannels(bus, "myapp");
    
    const usersChannel = appChannels.channel<UserEvent>("users");
    // Topic: myapp.users
    
    const ordersChannel = appChannels.channel<OrderEvent>("orders");
    // Topic: myapp.orders

    API Reference

    EventBus

    Method Description
    subscribe(topic, handler, options?) Subscribe to events
    unsubscribe(subscriptionId) Unsubscribe by ID
    publish(topic, data, metadata?) Publish an event
    once(topic, handler) Subscribe for one event
    waitFor(topic, timeout?) Wait for an event
    getSubscriptions() Get all subscriptions
    getTopics() Get unique topics
    hasSubscribers(topic) Check for subscribers
    getDeadLetterQueue() Get failed events
    clearDeadLetterQueue() Clear dead letters
    onDeadLetter(handler) Handle dead letters
    getStatistics() Get bus statistics
    destroy() Clean up resources

    Topic Matching

    Function Description
    parseTopicPattern(pattern) Parse a topic pattern
    matchTopic(pattern, topic) Match topic against pattern
    validateTopic(topic, allowPatterns?) Validate topic format
    isPattern(topic) Check if topic contains wildcards

    Channel Functions

    Function Description
    createChannel(bus, topic) Create a typed channel
    createChannelRegistry(bus) Create a channel registry
    createTypedChannel(bus, topic, options) Create with options
    createNamespacedChannels(bus, namespace) Create namespaced factory

    Request/Reply

    Function Description
    createRequestClient(bus, options?) Create request client
    createRequestResponder(bus, topic) Create responder
    createRequestReply(bus, topic) Create paired client/responder

    Aggregator

    Function Description
    createEventAggregator(bus) Create aggregator
    aggregators.collect Collect events into array
    aggregators.sum Sum numeric events
    aggregators.average Average numeric events
    aggregators.count Count events
    aggregators.first Get first event
    aggregators.last Get last event
    aggregators.min Get minimum
    aggregators.max Get maximum
    aggregators.groupBy(key) Group by object key

    License

    MIT