JSPM

  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 244
  • Score
    100M100P100Q91893F
  • License LicenseRef-LICENSE

Application-level event sourcing toolkit for delta-base

Package Exports

  • @delta-base/toolkit

Readme

@delta-base/toolkit

Application-level event sourcing toolkit for delta-base. This package provides high-level patterns and utilities for building event-sourced applications with delta-base.

Features

  • 🏗️ Core Types - Strongly-typed events, commands, and projections
  • 💾 In-Memory Event Store - Complete event store implementation for testing
  • Command Handlers - Simple and decider pattern-based command handling with retry logic
  • 📊 Projection System - Interface-based projections with pluggable read model stores
  • 🔧 Error Handling - Comprehensive error types with type guards
  • 🛠️ Utility Functions - Event and command creation helpers

Installation

npm install @delta-base/toolkit
# or
pnpm add @delta-base/toolkit

Quick Start

Basic Event Store Usage

import { InMemoryEventStore, createEvent } from '@delta-base/toolkit';

const eventStore = new InMemoryEventStore();

// Create and append events
const event = createEvent('UserRegistered', {
  userId: 'user-123',
  email: 'user@example.com',
  name: 'John Doe'
});

await eventStore.appendToStream('user-123', [event]);

// Read events back
const events = await eventStore.readFromStream('user-123');
console.log(events.events); // [{ type: 'UserRegistered', ... }]

Command Handling with Decider Pattern

import { 
  handleCommandWithDecider,
  createCommand,
  InMemoryEventStore,
  type Decider,
  type ReadEvent
} from '@delta-base/toolkit';

// Define your aggregate state
interface UserState {
  id?: string;
  email?: string;
  isActive: boolean;
}

// Define your events
type UserEvent = 
  | { type: 'UserRegistered'; data: { userId: string; email: string; name: string } }
  | { type: 'EmailUpdated'; data: { userId: string; newEmail: string } };

// Define your commands  
type UserCommand = 
  | { type: 'RegisterUser'; data: { userId: string; email: string; name: string } }
  | { type: 'UpdateEmail'; data: { userId: string; newEmail: string } };

// Create your decider
/**
 * Decider for the user aggregate.
 *
 * - `initialState` yields an inactive user with no id or email.
 * - `decide` turns a command into the list of events it produces and throws
 *   when the command is not valid for the current state.
 * - `evolve` folds a single event into the state and returns the new state.
 */
const userDecider: Decider<UserState, UserCommand, UserEvent> = {
  initialState: () => {
    return { isActive: false };
  },

  decide: (state: UserState, command: UserCommand) => {
    if (command.type === 'RegisterUser') {
      // A state that already carries an id has been registered before.
      if (state.id) {
        throw new Error('User already exists');
      }
      return [{ type: 'UserRegistered', data: command.data }];
    }

    if (command.type === 'UpdateEmail') {
      // The email can only change once the user exists.
      if (!state.id) {
        throw new Error('User does not exist');
      }
      const { userId, newEmail } = command.data;
      return [{ type: 'EmailUpdated', data: { userId, newEmail } }];
    }

    // Commands this decider does not know produce no events.
    return [];
  },

  evolve: (state: UserState, event: ReadEvent<UserEvent>) => {
    if (event.type === 'UserRegistered') {
      const { userId, email } = event.data;
      return { ...state, id: userId, email, isActive: true };
    }

    if (event.type === 'EmailUpdated') {
      return { ...state, email: event.data.newEmail };
    }

    // Events this decider does not know leave the state untouched.
    return state;
  }
};

// Use the command handler
const eventStore = new InMemoryEventStore();

const command = createCommand('RegisterUser', {
  userId: '123',
  email: 'user@example.com', 
  name: 'John Doe'
});

const result = await handleCommandWithDecider(
  eventStore,
  'user-123',
  command,
  userDecider
);

console.log(result.newState); // { id: '123', email: 'user@example.com', isActive: true }
console.log(result.newEvents); // [UserRegistered event]

Building Projections with the New Interface System

The new projection system provides a powerful interface-based approach with pluggable read model stores:

import { 
  BaseProjection,
  InMemoryReadModelStore,
  KVReadModelStore,
  type ReadEvent,
  type IReadModelStore
} from '@delta-base/toolkit';

// Define your read model structure
interface UserReadModel {
  id: string;
  email: string;
  name: string;
  status: 'active' | 'inactive';
  registeredAt: string;
  lastUpdated: string;
  revision: number;
}

// Create a projection class extending BaseProjection
/**
 * Projection that maintains one `UserReadModel` per user, stored under the
 * key `user:<userId>`.
 *
 * Event payloads are narrowed to explicit shapes instead of `any`, so a typo
 * in a field name is caught by the compiler.
 *
 * NOTE(review): the event type names handled here ('UserRegisteredEvent', ...)
 * differ from the ones emitted by the decider example earlier in this document
 * ('UserRegistered', 'EmailUpdated'). Confirm they match the events in your
 * streams; presumably events with other type names are filtered out.
 */
class UserProjection extends BaseProjection {
  readonly supportedEventTypes = [
    'UserRegisteredEvent',
    'UserUpdatedEvent', 
    'UserStatusChangedEvent'
  ];

  constructor(store: IReadModelStore) {
    super(store);
  }

  /** Routes an event to the handler for its type; other types are ignored. */
  protected async processEvent(event: ReadEvent): Promise<void> {
    switch (event.type) {
      case 'UserRegisteredEvent':
        await this.handleUserRegistered(event);
        break;
      case 'UserUpdatedEvent':
        await this.handleUserUpdated(event);
        break;
      case 'UserStatusChangedEvent':
        await this.handleUserStatusChanged(event);
        break;
    }
  }

  /** Creates (or overwrites) the read model for a newly registered user. */
  private async handleUserRegistered(event: ReadEvent): Promise<void> {
    const eventData = event.data as { userId: string; email: string; name: string };
    const userId = `user:${eventData.userId}`;
    
    // Check if user already exists (idempotency)
    // NOTE(review): this handler awaits shouldProcessEvent() while the other
    // handlers call validateRevision() without await — confirm which helper
    // is intended and whether each one is async.
    const existingUser = await this.store.get<UserReadModel>(userId);
    if (existingUser && !(await this.shouldProcessEvent(event, existingUser.revision))) {
      return;
    }

    const userReadModel: UserReadModel = {
      id: eventData.userId,
      email: eventData.email,
      name: eventData.name,
      status: 'active',
      registeredAt: event.createdAt,
      lastUpdated: event.createdAt,
      revision: event.streamPosition
    };

    await this.store.put(userId, userReadModel);
  }

  /** Applies an email and/or name change to an existing read model. */
  private async handleUserUpdated(event: ReadEvent): Promise<void> {
    const eventData = event.data as { userId: string; email?: string; name?: string };
    const userId = `user:${eventData.userId}`;
    
    const existingUser = await this.store.get<UserReadModel>(userId);
    if (!existingUser) {
      console.warn(`User ${eventData.userId} not found for update`);
      return;
    }

    if (!this.validateRevision(event, existingUser.revision)) {
      return;
    }

    const updatedUser: UserReadModel = {
      ...existingUser,
      // `||` is deliberate: a missing OR empty value keeps the stored one.
      email: eventData.email || existingUser.email,
      name: eventData.name || existingUser.name,
      lastUpdated: event.createdAt,
      revision: event.streamPosition
    };

    await this.store.put(userId, updatedUser);
  }

  /** Updates the status of an existing read model; unknown users are skipped silently. */
  private async handleUserStatusChanged(event: ReadEvent): Promise<void> {
    const eventData = event.data as { userId: string; status: UserReadModel['status'] };
    const userId = `user:${eventData.userId}`;
    
    const existingUser = await this.store.get<UserReadModel>(userId);
    if (!existingUser || !this.validateRevision(event, existingUser.revision)) {
      return;
    }

    const updatedUser: UserReadModel = {
      ...existingUser,
      status: eventData.status,
      lastUpdated: event.createdAt,
      revision: event.streamPosition
    };

    await this.store.put(userId, updatedUser);
  }
}

// Usage with different store implementations
const inMemoryStore = new InMemoryReadModelStore();
const userProjection = new UserProjection(inMemoryStore);

// Process events
await userProjection.processEvents([userRegisteredEvent, userUpdatedEvent]);

// Query the read model
const user = await inMemoryStore.get<UserReadModel>('user:123');
console.log(user); // { id: '123', email: 'user@example.com', ... }

Read Model Store Implementations

The toolkit provides multiple store implementations:

In-Memory Store (for testing)

import { InMemoryReadModelStore } from '@delta-base/toolkit';

const store = new InMemoryReadModelStore();

// Supports all IReadModelStore operations
await store.put('key', { data: 'value' });
const value = await store.get('key');
const allItems = await store.getAll({ prefix: 'user:' });

// Advanced features
await store.batchPut([
  { key: 'user:1', value: { name: 'John' } },
  { key: 'user:2', value: { name: 'Jane' } }
]);

const users = await store.query({
  filter: { status: 'active' }
});

Cloudflare KV Store

import { KVReadModelStore } from '@delta-base/toolkit';

// In a Cloudflare Worker
const store = new KVReadModelStore(env.MY_KV_NAMESPACE);

// Supports native batch operations for performance
const users = await store.batchGet(['user:1', 'user:2', 'user:3']);

// TTL and metadata support
await store.put('session:123', sessionData, {
  expirationTtl: 3600, // 1 hour
  metadata: { userId: '123' }
});

HTTP Store (for external APIs)

import { HttpReadModelStore } from '@delta-base/toolkit';

const store = new HttpReadModelStore('https://api.example.com/readmodels', {
  'Authorization': 'Bearer token'
});

// Works with any HTTP API that follows REST conventions
await store.put('user:123', userData);
const user = await store.get('user:123');

Webhook Projections

import { 
  BaseProjection,
  createWebhookProjectionHandler,
  KVReadModelStore,
  type ReadEvent
} from '@delta-base/toolkit';

/** Aggregate counters persisted under the single key 'stats'. */
interface UserStats {
  totalUsers: number;
  activeUsers: number;
}

/** Worker bindings; the KV binding has whatever type KVReadModelStore accepts. */
interface Env {
  KV_NAMESPACE: ConstructorParameters<typeof KVReadModelStore>[0];
}

// Create projection for webhook deployment
/**
 * Keeps running totals of registered and active users.
 *
 * Each event triggers a read-modify-write of the 'stats' entry: the current
 * counters are loaded (defaulting to zero), adjusted, and written back.
 */
class UserStatsProjection extends BaseProjection {
  readonly supportedEventTypes = ['UserRegisteredEvent', 'UserStatusChangedEvent'];

  protected async processEvent(event: ReadEvent): Promise<void> {
    // `??` only falls back when nothing has been stored yet.
    const stats: UserStats =
      (await this.store.get<UserStats>('stats')) ?? { totalUsers: 0, activeUsers: 0 };

    switch (event.type) {
      case 'UserRegisteredEvent':
        stats.totalUsers++;
        stats.activeUsers++;
        break;
      case 'UserStatusChangedEvent': {
        // Braces give the declaration its own scope instead of leaking it
        // into the other case clauses.
        const { status } = event.data as { status?: string };
        if (status === 'inactive') stats.activeUsers--;
        else if (status === 'active') stats.activeUsers++;
        break;
      }
    }

    await this.store.put('stats', stats);
  }
}

// In Cloudflare Worker
export default {
  // With module syntax the bindings arrive as the second `fetch` argument;
  // there is no global `env`, so the projection is wired up per request.
  async fetch(request: Request, env: Env): Promise<Response> {
    const projection = new UserStatsProjection(new KVReadModelStore(env.KV_NAMESPACE));
    const webhookHandler = createWebhookProjectionHandler(projection);
    return await webhookHandler(request);
  }
};

Store Capabilities and Runtime Detection

// Check store capabilities at runtime
const capabilities = store.getCapabilities();

if (capabilities.features.ttl) {
  // Store supports TTL
  await store.put('temp-data', data, { expirationTtl: 300 });
}

if (capabilities.features.advancedQueries) {
  // Store supports complex queries
  const results = await store.query({
    filter: { 
      status: 'active',
      lastLogin: { $gte: new Date('2024-01-01') }
    }
  });
}

console.log(`Store type: ${capabilities.storeType}`);
console.log(`Max batch size: ${capabilities.limits.maxBatchSize}`);

Core Concepts

Events

Events represent something that happened in the past. They are immutable and contain:

  • eventId - Unique identifier (added by event store)
  • type - Event type (e.g., "UserRegistered")
  • streamId - Stream where the event belongs (added by event store)
  • streamPosition - Position within the stream (added by event store)
  • globalPosition - Global position across all streams (added by event store)
  • data - Event payload
  • metadata - Additional metadata (optional)
  • schemaVersion - Version of the event schema (added by event store)
  • transactionId - Transaction identifier (added by event store)
  • createdAt - When the event occurred (added by event store)

Commands

Commands represent an intention to do something. They contain:

  • commandId - Unique identifier (added automatically)
  • type - Command type (e.g., "RegisterUser")
  • data - Command payload
  • metadata - Additional metadata (optional)
  • createdAt - When the command was created (added automatically)

Decider Pattern

The decider pattern separates:

  1. Decide - Given current state and a command, what events should be produced?
  2. Evolve - Given current state and an event, what is the new state?
  3. Initial State - What is the starting state?

This pattern makes business logic pure and testable.

Read Model Projections

The new projection system provides a clean, interface-based approach:

IReadModelStore Interface

  • Unified API: Same interface works with in-memory, KV, HTTP, and other stores
  • Flexible Operations: get, put, delete, getAll, batchGet, batchPut, query
  • Store Capabilities: Runtime feature detection for optimal performance
  • Multi-table Support: Optional table/namespace isolation

Projection Interface

  • Event Filtering: Declare supported event types for automatic filtering
  • Batch Processing: Process multiple events efficiently in order
  • Type Safety: Strongly typed event handling

BaseProjection Class

  • Common Patterns: Built-in revision tracking and idempotency checking
  • Event Ordering: Sequential processing maintains consistency
  • Error Handling: Graceful handling of out-of-order events

Benefits

  • Developer Experience: Clean APIs with comprehensive TypeScript support
  • Performance: Native batch operations and store-optimized queries
  • Scalability: Pluggable stores from in-memory to distributed systems
  • Testing: Easy to test with in-memory stores
  • Production: Deploy to any platform with appropriate store implementation

Error Handling

The toolkit provides comprehensive error types with type guards:

import { 
  handleCommandWithDecider,
  isDeltaBaseError, 
  isVersionConflictError,
  StreamVersionConflictError 
} from '@delta-base/toolkit';

try {
  // A decider is passed, so the decider-aware handler is the matching call
  // (same argument order as in the Quick Start example).
  await handleCommandWithDecider(eventStore, streamId, command, decider);
} catch (error) {
  // Check the most specific guard first, then the general toolkit guard.
  if (isVersionConflictError(error)) {
    // Handle concurrency conflict
    console.log('Version conflict, retrying...');
  } else if (isDeltaBaseError(error)) {
    // Handle other toolkit errors
    console.log('Toolkit error:', error.message);
  } else {
    // Handle unexpected errors
    console.log('Unexpected error:', error);
  }
}

API Reference

Core Types

  • Event<TType, TData, TMetadata?> - Event interface
  • ReadEvent<TEvent> - Event as read from event store with system fields
  • Command<TType, TData, TMetadata?> - Command interface
  • StreamId - Stream identifier type
  • EventStore - Event persistence interface

Command Handling

  • handleCommand() - Handle command without decider pattern
  • handleCommandWithDecider() - Handle command with decider pattern
  • handleCommandWithRetry() - Handle command with automatic retry
  • handleCommandWithDeciderAndRetry() - Handle command with decider and retry
  • Decider<State, Command, Event> - Decider pattern interface

Read Model Projections

  • IReadModelStore - Unified interface for read model storage
  • InMemoryReadModelStore - In-memory implementation for testing
  • KVReadModelStore - Cloudflare KV implementation with native batch operations
  • HttpReadModelStore - HTTP API implementation for external services
  • Projection - Interface for event-driven projections
  • BaseProjection - Abstract base class with common projection patterns
  • createWebhookProjectionHandler() - Create HTTP webhook handlers

Database

  • InMemoryEventStore - Full event store implementation for testing
    • Implements all EventStore interface methods
    • Includes utility methods like getAllStreamIds(), clear(), etc.

Error Types

All errors inherit from DeltaBaseError and include specific types for:

  • VersionConflictError / StreamVersionConflictError - Concurrency conflicts
  • ValidationError - Request validation failures
  • AuthenticationError / AuthorizationError - Auth failures
  • NotFoundError / StreamNotFoundError - Resource not found
  • TimeoutError / RateLimitError - Operational failures

Utilities

  • createEvent(type, data, metadata?) - Create events with type inference
  • createCommand(type, data, metadata?) - Create commands with type inference
  • createReadEvent() - Create read events for testing

Testing

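The toolkit is designed for easy testing. The example below reuses userDecider and UserProjection from the Quick Start. A projection only reacts to the event types listed in its supportedEventTypes, so make sure these include the types your decider emits (here 'UserRegistered'); otherwise the read-model assertion will not pass: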

import { 
  InMemoryEventStore,
  InMemoryReadModelStore,
  createEvent, 
  createCommand,
  handleCommandWithDecider,
  BaseProjection,
  type ReadEvent
} from '@delta-base/toolkit';

describe('User Registration with Projections', () => {
  let eventStore: InMemoryEventStore;
  let readModelStore: InMemoryReadModelStore;
  let userProjection: UserProjection;
  
  beforeEach(() => {
    eventStore = new InMemoryEventStore();
    readModelStore = new InMemoryReadModelStore();
    userProjection = new UserProjection(readModelStore);
  });
  
  it('should register user and update read model', async () => {
    // Given - command
    const command = createCommand('RegisterUser', {
      userId: '123',
      email: 'user@example.com',
      name: 'John Doe'
    });
    
    // When - command is handled
    const result = await handleCommandWithDecider(
      eventStore,
      'user-123',
      command,
      userDecider
    );
    
    // And - projection processes the events
    await userProjection.processEvents(result.newEvents);
    
    // Then - verify command result
    expect(result.newState.id).toBe('123');
    expect(result.newEvents).toHaveLength(1);
    expect(result.newEvents[0].type).toBe('UserRegistered');
    
    // And - verify read model
    const userReadModel = await readModelStore.get('user:123');
    expect(userReadModel).toEqual({
      id: '123',
      email: 'user@example.com',
      name: 'John Doe',
      status: 'active',
      registeredAt: expect.any(String),
      lastUpdated: expect.any(String),
      revision: 1
    });
  });
});

Examples

See the /examples directory for complete working examples:

  • Basic User Management - Complete user lifecycle with commands and projections

Best Practices

  1. Keep Business Logic Pure: Use the decider pattern to separate decisions from effects
  2. Design for Idempotency: Commands and projections should be safe to retry
  3. Version Your Events: Include schema version in event metadata
  4. Test with Events: Write tests that verify event streams and read models
  5. Handle Errors Gracefully: Use provided error types and type guards
  6. Choose the Right Store: Use InMemory for testing, KV for serverless, HTTP for existing APIs
  7. Leverage Batch Operations: Use batchGet/batchPut for better performance
  8. Monitor Store Capabilities: Check capabilities at runtime for optimal behavior