@delta-base/toolkit

Application-level event sourcing toolkit for delta-base. This package provides high-level patterns and utilities for building event-sourced applications with delta-base.

Features

  • 🏗️ Core Types - Strongly-typed events, commands, and projections
  • 💾 In-Memory Event Store - Complete event store implementation for testing
  • Command Handlers - Simple and decider pattern-based command handling with retry logic
  • 📊 Projection Processors - Build read models from event streams with pluggable storage
  • 🔧 Error Handling - Comprehensive error types with type guards
  • 🛠️ Utility Functions - Event and command creation helpers

Installation

npm install @delta-base/toolkit
# or
pnpm add @delta-base/toolkit

Quick Start

Basic Event Store Usage

import { InMemoryEventStore, createEvent } from '@delta-base/toolkit';

const eventStore = new InMemoryEventStore();

// Create and append events
const event = createEvent('UserRegistered', {
  userId: 'user-123',
  email: 'user@example.com',
  name: 'John Doe'
});

await eventStore.appendToStream('user-123', [event]);

// Read events back
const events = await eventStore.readFromStream('user-123');
console.log(events.events); // [{ type: 'UserRegistered', ... }]

Command Handling with Decider Pattern

import { 
  handleCommandWithDecider,
  createCommand,
  InMemoryEventStore,
  type Decider,
  type ReadEvent
} from '@delta-base/toolkit';

// Define your aggregate state
interface UserState {
  id?: string;
  email?: string;
  isActive: boolean;
}

// Define your events
type UserEvent = 
  | { type: 'UserRegistered'; data: { userId: string; email: string; name: string } }
  | { type: 'EmailUpdated'; data: { userId: string; newEmail: string } };

// Define your commands  
type UserCommand = 
  | { type: 'RegisterUser'; data: { userId: string; email: string; name: string } }
  | { type: 'UpdateEmail'; data: { userId: string; newEmail: string } };

// Create your decider
const userDecider: Decider<UserState, UserCommand, UserEvent> = {
  initialState: () => ({ isActive: false }),
  
  decide: (state: UserState, command: UserCommand) => {
    switch (command.type) {
      case 'RegisterUser':
        if (state.id) throw new Error('User already exists');
        return [{
          type: 'UserRegistered',
          data: command.data
        }];
      
      case 'UpdateEmail':
        if (!state.id) throw new Error('User does not exist');
        return [{
          type: 'EmailUpdated', 
          data: { userId: command.data.userId, newEmail: command.data.newEmail }
        }];
      
      default:
        return [];
    }
  },
  
  evolve: (state: UserState, event: ReadEvent<UserEvent>) => {
    switch (event.type) {
      case 'UserRegistered':
        return {
          ...state,
          id: event.data.userId,
          email: event.data.email,
          isActive: true
        };
      
      case 'EmailUpdated':
        return {
          ...state,
          email: event.data.newEmail
        };
      
      default:
        return state;
    }
  }
};

// Use the command handler
const eventStore = new InMemoryEventStore();

const command = createCommand('RegisterUser', {
  userId: '123',
  email: 'user@example.com', 
  name: 'John Doe'
});

const result = await handleCommandWithDecider(
  eventStore,
  'user-123',
  command,
  userDecider
);

console.log(result.newState); // { id: '123', email: 'user@example.com', isActive: true }
console.log(result.newEvents); // [UserRegistered event]

Building Projections

import { 
  createInMemoryProjectionHandler,
  type ReadEvent 
} from '@delta-base/toolkit';

// Define projection state
interface UserListProjection {
  users: Array<{ id: string; email: string; registeredAt: string }>;
  totalCount: number;
}

// Create projection handler
const userListProjection = createInMemoryProjectionHandler<ReadEvent, UserListProjection>({
  projectionId: 'user-list',
  applyFn: async (event, currentProjection = { users: [], totalCount: 0 }) => {
    switch (event.type) {
      case 'UserRegistered':
        return {
          users: [
            ...currentProjection.users,
            {
              id: event.data.userId,
              email: event.data.email,
              registeredAt: event.createdAt
            }
          ],
          totalCount: currentProjection.totalCount + 1
        };
      
      default:
        return currentProjection;
    }
  }
});

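// Process events to build the projection.
// userRegisteredEvent is assumed to be a ReadEvent of type 'UserRegistered',
// for example one of the entries in `events.events` returned by readFromStream().
await userListProjection.processEvent(userRegisteredEvent);
const projection = await userListProjection.getProjection();
console.log(projection); // { users: [...], totalCount: 1 }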

Webhook Projections

import { 
  createWebhookProjectionHandler,
  HttpProjectionStore
} from '@delta-base/toolkit';

// Create projection with external storage
const webhookHandler = createWebhookProjectionHandler({
  projectionId: 'user-analytics',
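  // Typing dailySignups lets it be indexed by a day string under strict TypeScript
  applyFn: async (event, current = { totalUsers: 0, dailySignups: {} as Record<string, number> }) => {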
    if (event.type === 'UserRegistered') {
      const day = new Date(event.createdAt).toISOString().split('T')[0];
      return {
        totalUsers: current.totalUsers + 1,
        dailySignups: {
          ...current.dailySignups,
          [day]: (current.dailySignups[day] || 0) + 1
        }
      };
    }
    return current;
  },
  store: new HttpProjectionStore('https://api.example.com/projections', {
    'Authorization': 'Bearer token'
  })
});

// Use in Cloudflare Worker, Vercel Edge Function, etc.
export default {
  async fetch(request: Request): Promise<Response> {
    return await webhookHandler(request);
  }
};

Custom Projection Stores

The toolkit uses a simple ProjectionStore interface, making it easy to implement custom storage backends:

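import type { ProjectionStore } from '@delta-base/toolkit';
// Assumption: an ioredis client; any client exposing get/set/del works the same way
import type { Redis } from 'ioredis';

// Example: Redis store
class RedisProjectionStore<T> implements ProjectionStore<T> {
  constructor(private redis: Redis, private prefix = 'proj:') {}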
  
  async get(projectionId: string): Promise<T | null> {
    const data = await this.redis.get(`${this.prefix}${projectionId}`);
    return data ? JSON.parse(data) : null;
  }
  
  async set(projectionId: string, projection: T): Promise<void> {
    await this.redis.set(`${this.prefix}${projectionId}`, JSON.stringify(projection));
  }
  
  async delete(projectionId: string): Promise<void> {
    await this.redis.del(`${this.prefix}${projectionId}`);
  }
}

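// Example: Cloudflare KV store
// (KVNamespace is the global type provided by @cloudflare/workers-types)
class KVProjectionStore<T> implements ProjectionStore<T> {
  constructor(private kv: KVNamespace, private prefix = 'proj:') {}
  
  async get(projectionId: string): Promise<T | null> {
    // The type argument tells the KV binding what shape the parsed JSON has
    return await this.kv.get<T>(`${this.prefix}${projectionId}`, 'json');
  }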
  
  async set(projectionId: string, projection: T): Promise<void> {
    await this.kv.put(`${this.prefix}${projectionId}`, JSON.stringify(projection));
  }
  
  async delete(projectionId: string): Promise<void> {
    await this.kv.delete(`${this.prefix}${projectionId}`);
  }
}
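
For unit tests or local development, the same three-method shape can be backed by a plain Map. The sketch below is self-contained: it declares a local ProjectionStoreLike interface that mirrors the get/set/delete methods used in the examples above instead of importing the toolkit's type. MapProjectionStore is a hypothetical name used for illustration; the toolkit ships its own InMemoryProjectionStore, which is the better choice in practice.

```typescript
// Local stand-in for the toolkit's ProjectionStore interface (assumption:
// same get/set/delete shape as the Redis and KV examples above).
interface ProjectionStoreLike<T> {
  get(projectionId: string): Promise<T | null>;
  set(projectionId: string, projection: T): Promise<void>;
  delete(projectionId: string): Promise<void>;
}

// Hypothetical Map-backed store for illustration only.
class MapProjectionStore<T> implements ProjectionStoreLike<T> {
  private readonly items = new Map<string, string>();

  async get(projectionId: string): Promise<T | null> {
    const raw = this.items.get(projectionId);
    // Round-trip through JSON so callers never share a mutable reference
    return raw === undefined ? null : (JSON.parse(raw) as T);
  }

  async set(projectionId: string, projection: T): Promise<void> {
    this.items.set(projectionId, JSON.stringify(projection));
  }

  async delete(projectionId: string): Promise<void> {
    this.items.delete(projectionId);
  }
}
```

Storing serialized JSON keeps the behavior close to the Redis and KV stores, where a projection read back is always a fresh copy.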

Core Concepts

Events

Events represent something that happened in the past. They are immutable and contain:

  • eventId - Unique identifier (added by event store)
  • type - Event type (e.g., "UserRegistered")
  • streamId - Stream where the event belongs (added by event store)
  • streamPosition - Position within the stream (added by event store)
  • globalPosition - Global position across all streams (added by event store)
  • data - Event payload
  • metadata - Additional metadata (optional)
  • schemaVersion - Version of the event schema (added by event store)
  • transactionId - Transaction identifier (added by event store)
  • createdAt - When the event occurred (added by event store)
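
To make the split between application fields and store-added fields concrete, here is a minimal sketch. AppEvent, StoredEvent, and withSystemFields are hypothetical names, and the field types (numeric positions, string identifiers, ISO timestamp) are assumptions; the toolkit's own Event and ReadEvent types are the authoritative definitions.

```typescript
// Event as written by the application: no system fields yet.
interface AppEvent<TType extends string, TData> {
  type: TType;
  data: TData;
  metadata?: Record<string, unknown>;
}

// Hypothetical shape of the same event once a store has persisted it.
// Field names follow the list above; the field types are assumptions.
interface StoredEvent<TType extends string, TData> extends AppEvent<TType, TData> {
  eventId: string;
  streamId: string;
  streamPosition: number;
  globalPosition: number;
  schemaVersion: string;
  transactionId: string;
  createdAt: string;
}

// Toy enrichment step: shows which fields a store adds, not how delta-base does it.
function withSystemFields<TType extends string, TData>(
  event: AppEvent<TType, TData>,
  streamId: string,
  streamPosition: number,
  globalPosition: number
): StoredEvent<TType, TData> {
  return {
    ...event,
    eventId: `${streamId}-${streamPosition}`,
    streamId,
    streamPosition,
    globalPosition,
    schemaVersion: '1.0.0',
    transactionId: `tx-${globalPosition}`,
    createdAt: new Date().toISOString(),
  };
}
```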

Commands

Commands represent an intention to do something. They contain:

  • commandId - Unique identifier (added automatically)
  • type - Command type (e.g., "RegisterUser")
  • data - Command payload
  • metadata - Additional metadata (optional)
  • createdAt - When the command was created (added automatically)

Decider Pattern

The decider pattern separates:

  1. Decide - Given current state and a command, what events should be produced?
  2. Evolve - Given current state and an event, what is the new state?
  3. Initial State - What is the starting state?

This pattern makes business logic pure and testable.
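
Because decide and evolve are plain functions, the whole decision cycle can be exercised without an event store. The following sketch assumes a simplified local decider shape (SimpleDecider) and a hypothetical helper (decideFromHistory); it illustrates the idea and is not the toolkit's implementation of handleCommandWithDecider.

```typescript
// Simplified local decider shape mirroring the three parts listed above.
interface SimpleDecider<S, C, E> {
  initialState: () => S;
  decide: (state: S, command: C) => E[];
  evolve: (state: S, event: E) => S;
}

interface CounterState { value: number }
type CounterCommand = { type: 'Increment'; data: { by: number } };
type CounterEvent = { type: 'Incremented'; data: { by: number } };

const counterDecider: SimpleDecider<CounterState, CounterCommand, CounterEvent> = {
  initialState: () => ({ value: 0 }),
  decide: (_state, command) => {
    if (command.data.by <= 0) throw new Error('Increment must be positive');
    return [{ type: 'Incremented', data: { by: command.data.by } }];
  },
  evolve: (state, event) => ({ value: state.value + event.data.by }),
};

// Hypothetical helper: fold past events into the current state, decide,
// then fold the new events to get the resulting state. No I/O involved.
function decideFromHistory<S, C, E>(
  decider: SimpleDecider<S, C, E>,
  history: readonly E[],
  command: C
): { newEvents: E[]; newState: S } {
  const current = history.reduce<S>(
    (state, event) => decider.evolve(state, event),
    decider.initialState()
  );
  const newEvents = decider.decide(current, command);
  const newState = newEvents.reduce<S>(
    (state, event) => decider.evolve(state, event),
    current
  );
  return { newEvents, newState };
}
```

A test then only needs a list of past events and a command: given one Incremented event with by: 2 and an Increment command with by: 3, the helper yields one new event and a state whose value is 5.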

Projections

Projections transform events into read models optimized for queries. They:

  • Process events sequentially using async apply functions
  • Build denormalized views optimized for specific use cases
  • Can be rebuilt from events at any time
  • Support eventual consistency
  • Work with any storage backend via the ProjectionStore interface
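
The "can be rebuilt from events at any time" property follows from sequential application: replaying the same events in the same order through the same apply function produces the same read model. A minimal sketch, assuming a hypothetical rebuild helper and a simplified event type (neither is part of the toolkit):

```typescript
type SignupEvent =
  | { type: 'UserRegistered'; data: { userId: string } }
  | { type: 'EmailUpdated'; data: { userId: string; newEmail: string } };

interface SignupCount { totalUsers: number }

// Async apply function in the same style as the applyFn examples above.
async function applySignup(
  event: SignupEvent,
  current: SignupCount = { totalUsers: 0 }
): Promise<SignupCount> {
  return event.type === 'UserRegistered'
    ? { totalUsers: current.totalUsers + 1 }
    : current;
}

// Hypothetical helper: rebuild a read model from scratch by replaying events.
async function rebuild<E, P>(
  events: readonly E[],
  applyFn: (event: E, current?: P) => Promise<P>
): Promise<P | undefined> {
  let projection: P | undefined;
  for (const event of events) {
    // Await each step so events are applied strictly one after another
    projection = await applyFn(event, projection);
  }
  return projection;
}
```

Calling rebuild(events, applySignup) with an empty list returns undefined, which a caller can treat as "no projection yet".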

Error Handling

The toolkit provides comprehensive error types with type guards:

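import { 
  handleCommandWithDecider,
  isDeltaBaseError, 
  isVersionConflictError,
  StreamVersionConflictError 
} from '@delta-base/toolkit';

try {
  // A decider is passed here, so the decider-based handler is the matching call
  await handleCommandWithDecider(eventStore, streamId, command, decider);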
} catch (error) {
  if (isVersionConflictError(error)) {
    // Handle concurrency conflict
    console.log('Version conflict, retrying...');
  } else if (isDeltaBaseError(error)) {
    // Handle other toolkit errors
    console.log('Toolkit error:', error.message);
  } else {
    // Handle unexpected errors
    console.log('Unexpected error:', error);
  }
}
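
A version conflict usually means another writer appended to the stream first, so the usual response is to reload and try again a bounded number of times. The toolkit's handleCommandWithRetry() and handleCommandWithDeciderAndRetry() are meant for this; the sketch below only illustrates the idea with hypothetical local names (ConflictError, isConflictError, retryOnConflict) and does not reflect their implementation.

```typescript
// Hypothetical stand-in for a version conflict error; not the toolkit's class.
class ConflictError extends Error {
  constructor(message = 'Version conflict') {
    super(message);
    this.name = 'ConflictError';
  }
}

// Type guard in the same spirit as isVersionConflictError.
// Checking `name` also works when the error crosses module boundaries.
function isConflictError(error: unknown): error is ConflictError {
  return error instanceof Error && error.name === 'ConflictError';
}

async function retryOnConflict<T>(
  operation: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Rethrow non-conflict errors at once, and conflicts once attempts run out
      if (!isConflictError(error) || attempt >= maxAttempts) throw error;
    }
  }
}
```

Retrying is only safe when the operation re-reads the stream on each attempt, so that the decision is made against the latest state.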

API Reference

Core Types

  • Event<TType, TData, TMetadata?> - Event interface
  • ReadEvent<TEvent> - Event as read from event store with system fields
  • Command<TType, TData, TMetadata?> - Command interface
  • StreamId - Stream identifier type
  • EventStore - Event persistence interface

Command Handling

  • handleCommand() - Handle command without decider pattern
  • handleCommandWithDecider() - Handle command with decider pattern
  • handleCommandWithRetry() - Handle command with automatic retry
  • handleCommandWithDeciderAndRetry() - Handle command with decider and retry
  • Decider<State, Command, Event> - Decider pattern interface

Projections

  • ProjectionProcessor<TEvent, TProjection> - Main projection processor class
  • ProjectionStore<TProjection> - Storage interface for projections
  • InMemoryProjectionStore - In-memory storage implementation
  • HttpProjectionStore - HTTP API storage implementation
  • createInMemoryProjectionHandler() - Create in-memory projection
  • createWebhookProjectionHandler() - Create webhook handler

Database

  • InMemoryEventStore - Full event store implementation for testing
    • Implements all EventStore interface methods
    • Includes utility methods like getAllStreamIds(), clear(), etc.

Error Types

All errors inherit from DeltaBaseError and include specific types for:

  • VersionConflictError / StreamVersionConflictError - Concurrency conflicts
  • ValidationError - Request validation failures
  • AuthenticationError / AuthorizationError - Auth failures
  • NotFoundError / StreamNotFoundError - Resource not found
  • TimeoutError / RateLimitError - Operational failures

Utilities

  • createEvent(type, data, metadata?) - Create events with type inference
  • createCommand(type, data, metadata?) - Create commands with type inference
  • createReadEvent() - Create read events for testing

Testing

The toolkit is designed for easy testing:

import { 
  InMemoryEventStore, 
  createEvent, 
  createCommand,
  handleCommandWithDecider
} from '@delta-base/toolkit';

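// Assumes a Jest- or Vitest-style runner (describe/beforeEach/it/expect)
// and the userDecider defined in the Quick Start section.
describe('User Registration', () => {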
  let eventStore: InMemoryEventStore;
  
  beforeEach(() => {
    eventStore = new InMemoryEventStore();
  });
  
  it('should register a new user', async () => {
    // Given - existing events (if any)
    // When - command is handled
    const command = createCommand('RegisterUser', {
      userId: '123',
      email: 'user@example.com',
      name: 'John Doe'
    });
    
    const result = await handleCommandWithDecider(
      eventStore,
      'user-123',
      command,
      userDecider
    );
    
    // Then - verify results
    expect(result.newState.id).toBe('123');
    expect(result.newEvents).toHaveLength(1);
    expect(result.newEvents[0].type).toBe('UserRegistered');
    
    // Verify events were persisted
    const events = await eventStore.readFromStream('user-123');
    expect(events.events).toHaveLength(1);
  });
});

Examples

See the /examples directory for complete working examples:

  • Basic User Management - Complete user lifecycle with commands and projections

Best Practices

  1. Keep Business Logic Pure: Use the decider pattern to separate decisions from effects
  2. Design for Idempotency: Commands should be safe to retry
  3. Version Your Events: Include schema version in event metadata
  4. Test with Events: Write tests that verify event streams, not just final state
  5. Handle Errors Gracefully: Use provided error types and type guards
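
As an illustration of practice 2, one common way to make retries safe is to remember the result of each command by its commandId and return that result when the same command arrives again. This is a minimal in-memory sketch with hypothetical names (IdentifiedCommand, makeIdempotent); it is not part of the toolkit, and a real system would keep the processed IDs in durable storage.

```typescript
interface IdentifiedCommand {
  commandId: string;
  type: string;
  data: unknown;
}

// Hypothetical wrapper: a retried command returns the first result
// instead of running the handler a second time.
function makeIdempotent<C extends IdentifiedCommand, R>(
  handler: (command: C) => R
): (command: C) => R {
  const seen = new Map<string, R>();
  return (command) => {
    if (seen.has(command.commandId)) {
      return seen.get(command.commandId) as R;
    }
    const result = handler(command);
    seen.set(command.commandId, result);
    return result;
  };
}
```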