Package Exports
- @delta-base/toolkit
Readme
@delta-base/toolkit
Application-level event sourcing toolkit for delta-base. This package provides high-level patterns and utilities for building event-sourced applications with delta-base.
Features
- 🏗️ Core Types - Strongly-typed events, commands, and projections
- 💾 In-Memory Event Store - Complete event store implementation for testing
- ⚡ Command Handlers - Simple and decider pattern-based command handling with retry logic
- 📊 Projection Processors - Build read models from event streams with pluggable storage
- 🔧 Error Handling - Comprehensive error types with type guards
- 🛠️ Utility Functions - Event and command creation helpers
Installation
npm install @delta-base/toolkit
# or
pnpm add @delta-base/toolkitQuick Start
Basic Event Store Usage
import { InMemoryEventStore, createEvent } from '@delta-base/toolkit';
const eventStore = new InMemoryEventStore();
// Create and append events
const event = createEvent('UserRegistered', {
userId: 'user-123',
email: 'user@example.com',
name: 'John Doe'
});
await eventStore.appendToStream('user-123', [event]);
// Read events back
const events = await eventStore.readFromStream('user-123');
console.log(events.events); // [{ type: 'UserRegistered', ... }]Command Handling with Decider Pattern
import {
handleCommandWithDecider,
createCommand,
InMemoryEventStore,
type Decider,
type ReadEvent
} from '@delta-base/toolkit';
// Define your aggregate state
interface UserState {
id?: string;
email?: string;
isActive: boolean;
}
// Define your events
type UserEvent =
| { type: 'UserRegistered'; data: { userId: string; email: string; name: string } }
| { type: 'EmailUpdated'; data: { userId: string; newEmail: string } };
// Define your commands
type UserCommand =
| { type: 'RegisterUser'; data: { userId: string; email: string; name: string } }
| { type: 'UpdateEmail'; data: { userId: string; newEmail: string } };
// Create your decider
const userDecider: Decider<UserState, UserCommand, UserEvent> = {
initialState: () => ({ isActive: false }),
decide: (state: UserState, command: UserCommand) => {
switch (command.type) {
case 'RegisterUser':
if (state.id) throw new Error('User already exists');
return [{
type: 'UserRegistered',
data: command.data
}];
case 'UpdateEmail':
if (!state.id) throw new Error('User does not exist');
return [{
type: 'EmailUpdated',
data: { userId: command.data.userId, newEmail: command.data.newEmail }
}];
default:
return [];
}
},
evolve: (state: UserState, event: ReadEvent<UserEvent>) => {
switch (event.type) {
case 'UserRegistered':
return {
...state,
id: event.data.userId,
email: event.data.email,
isActive: true
};
case 'EmailUpdated':
return {
...state,
email: event.data.newEmail
};
default:
return state;
}
}
};
// Use the command handler
const eventStore = new InMemoryEventStore();
const command = createCommand('RegisterUser', {
userId: '123',
email: 'user@example.com',
name: 'John Doe'
});
const result = await handleCommandWithDecider(
eventStore,
'user-123',
command,
userDecider
);
console.log(result.newState); // { id: '123', email: 'user@example.com', isActive: true }
console.log(result.newEvents); // [UserRegistered event]Building Projections
import {
createInMemoryProjectionHandler,
type ReadEvent
} from '@delta-base/toolkit';
// Define projection state
interface UserListProjection {
users: Array<{ id: string; email: string; registeredAt: string }>;
totalCount: number;
}
// Create projection handler
const userListProjection = createInMemoryProjectionHandler<ReadEvent, UserListProjection>({
projectionId: 'user-list',
applyFn: async (event, currentProjection = { users: [], totalCount: 0 }) => {
switch (event.type) {
case 'UserRegistered':
return {
users: [
...currentProjection.users,
{
id: event.data.userId,
email: event.data.email,
registeredAt: event.createdAt
}
],
totalCount: currentProjection.totalCount + 1
};
default:
return currentProjection;
}
}
});
// Process events to build projection
await userListProjection.processEvent(userRegisteredEvent);
const projection = await userListProjection.getProjection();
console.log(projection); // { users: [...], totalCount: 1 }Webhook Projections
import {
createWebhookProjectionHandler,
HttpProjectionStore
} from '@delta-base/toolkit';
// Create projection with external storage
const webhookHandler = createWebhookProjectionHandler({
projectionId: 'user-analytics',
applyFn: async (event, current = { totalUsers: 0, dailySignups: {} }) => {
if (event.type === 'UserRegistered') {
const day = new Date(event.createdAt).toISOString().split('T')[0];
return {
totalUsers: current.totalUsers + 1,
dailySignups: {
...current.dailySignups,
[day]: (current.dailySignups[day] || 0) + 1
}
};
}
return current;
},
store: new HttpProjectionStore('https://api.example.com/projections', {
'Authorization': 'Bearer token'
})
});
// Use in Cloudflare Worker, Vercel Edge Function, etc.
export default {
async fetch(request: Request): Promise<Response> {
return await webhookHandler(request);
}
};Custom Projection Stores
The toolkit uses a simple ProjectionStore interface, making it easy to implement custom storage backends:
import { ProjectionStore } from '@delta-base/toolkit';
// Example: Redis store
class RedisProjectionStore<T> implements ProjectionStore<T> {
constructor(private redis: Redis, private prefix = 'proj:') {}
async get(projectionId: string): Promise<T | null> {
const data = await this.redis.get(`${this.prefix}${projectionId}`);
return data ? JSON.parse(data) : null;
}
async set(projectionId: string, projection: T): Promise<void> {
await this.redis.set(`${this.prefix}${projectionId}`, JSON.stringify(projection));
}
async delete(projectionId: string): Promise<void> {
await this.redis.del(`${this.prefix}${projectionId}`);
}
}
// Example: Cloudflare KV store
class KVProjectionStore<T> implements ProjectionStore<T> {
constructor(private kv: KVNamespace, private prefix = 'proj:') {}
async get(projectionId: string): Promise<T | null> {
return await this.kv.get(`${this.prefix}${projectionId}`, 'json');
}
async set(projectionId: string, projection: T): Promise<void> {
await this.kv.put(`${this.prefix}${projectionId}`, JSON.stringify(projection));
}
async delete(projectionId: string): Promise<void> {
await this.kv.delete(`${this.prefix}${projectionId}`);
}
}Core Concepts
Events
Events represent something that happened in the past. They are immutable and contain:
eventId- Unique identifier (added by event store)type- Event type (e.g., "UserRegistered")streamId- Stream where the event belongs (added by event store)streamPosition- Position within the stream (added by event store)globalPosition- Global position across all streams (added by event store)data- Event payloadmetadata- Additional metadata (optional)schemaVersion- Version of the event schema (added by event store)transactionId- Transaction identifier (added by event store)createdAt- When the event occurred (added by event store)
Commands
Commands represent an intention to do something. They contain:
commandId- Unique identifier (added automatically)type- Command type (e.g., "RegisterUser")data- Command payloadmetadata- Additional metadata (optional)createdAt- When the command was created (added automatically)
Decider Pattern
The decider pattern separates:
- Decide - Given current state and a command, what events should be produced?
- Evolve - Given current state and an event, what is the new state?
- Initial State - What is the starting state?
This pattern makes business logic pure and testable.
Projections
Projections transform events into read models optimized for queries. They:
- Process events sequentially using async apply functions
- Build denormalized views optimized for specific use cases
- Can be rebuilt from events at any time
- Support eventual consistency
- Work with any storage backend via the
ProjectionStoreinterface
Error Handling
The toolkit provides comprehensive error types with type guards:
import {
isDeltaBaseError,
isVersionConflictError,
StreamVersionConflictError
} from '@delta-base/toolkit';
try {
await handleCommand(eventStore, streamId, command, decider);
} catch (error) {
if (isVersionConflictError(error)) {
// Handle concurrency conflict
console.log('Version conflict, retrying...');
} else if (isDeltaBaseError(error)) {
// Handle other toolkit errors
console.log('Toolkit error:', error.message);
} else {
// Handle unexpected errors
console.log('Unexpected error:', error);
}
}API Reference
Core Types
Event<TType, TData, TMetadata?>- Event interfaceReadEvent<TEvent>- Event as read from event store with system fieldsCommand<TType, TData, TMetadata?>- Command interfaceStreamId- Stream identifier typeEventStore- Event persistence interface
Command Handling
handleCommand()- Handle command without decider patternhandleCommandWithDecider()- Handle command with decider patternhandleCommandWithRetry()- Handle command with automatic retryhandleCommandWithDeciderAndRetry()- Handle command with decider and retryDecider<State, Command, Event>- Decider pattern interface
Projections
ProjectionProcessor<TEvent, TProjection>- Main projection processor classProjectionStore<TProjection>- Storage interface for projectionsInMemoryProjectionStore- In-memory storage implementationHttpProjectionStore- HTTP API storage implementationcreateInMemoryProjectionHandler()- Create in-memory projectioncreateWebhookProjectionHandler()- Create webhook handler
Database
InMemoryEventStore- Full event store implementation for testing- Implements all
EventStoreinterface methods - Includes utility methods like
getAllStreamIds(),clear(), etc.
- Implements all
Error Types
All errors inherit from DeltaBaseError and include specific types for:
VersionConflictError/StreamVersionConflictError- Concurrency conflictsValidationError- Request validation failuresAuthenticationError/AuthorizationError- Auth failuresNotFoundError/StreamNotFoundError- Resource not foundTimeoutError/RateLimitError- Operational failures
Utilities
createEvent(type, data, metadata?)- Create events with type inferencecreateCommand(type, data, metadata?)- Create commands with type inferencecreateReadEvent()- Create read events for testing
Testing
The toolkit is designed for easy testing:
import {
InMemoryEventStore,
createEvent,
createCommand,
handleCommandWithDecider
} from '@delta-base/toolkit';
describe('User Registration', () => {
let eventStore: InMemoryEventStore;
beforeEach(() => {
eventStore = new InMemoryEventStore();
});
it('should register a new user', async () => {
// Given - existing events (if any)
// When - command is handled
const command = createCommand('RegisterUser', {
userId: '123',
email: 'user@example.com',
name: 'John Doe'
});
const result = await handleCommandWithDecider(
eventStore,
'user-123',
command,
userDecider
);
// Then - verify results
expect(result.newState.id).toBe('123');
expect(result.newEvents).toHaveLength(1);
expect(result.newEvents[0].type).toBe('UserRegistered');
// Verify events were persisted
const events = await eventStore.readFromStream('user-123');
expect(events.events).toHaveLength(1);
});
});Examples
See the /examples directory for complete working examples:
- Basic User Management - Complete user lifecycle with commands and projections
Best Practices
- Keep Business Logic Pure: Use the decider pattern to separate decisions from effects
- Design for Idempotency: Commands should be safe to retry
- Version Your Events: Include schema version in event metadata
- Test with Events: Write tests that verify event streams, not just final state
- Handle Errors Gracefully: Use provided error types and type guards