@dagengine/core
🚀 Type-Safe DAG Engine for AI Workflows
Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.
🚀 Quick Start • 📖 Documentation • 💬 Discussions • 🐛 Issues • 📦 Examples
🎯 What is dagengine?
dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.
The Problem
// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
const sentiment = await ai.analyze(item); // Wait...
const topics = await ai.extract(item); // Wait...
const summary = await ai.summarize(item); // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15
The Solution
// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5
10x faster. 67% cheaper. Zero orchestration code.
Define dependencies → get automatic parallelization.
🚀 5-Minute Quick Start
Install
npm install @dagengine/core
Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)
Example: Analyze Customer Reviews
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';
// Define result types (optional but helps with TypeScript)
interface SentimentResult {
sentiment: "positive" | "negative" | "neutral";
score: number;
}
interface TopicsResult {
topics: string[];
}
// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
constructor() {
super('analyzer', 'Review Analyzer', 'Analyzes reviews');
this.dimensions = ['sentiment', 'topics', 'summary'];
}
defineDependencies(): Record<string, string[]> {
return {
summary: ['sentiment', 'topics']
};
}
createPrompt(context: PromptContext): string {
const content = context.sections[0]?.content || '';
if (context.dimension === 'sentiment') {
return `Analyze sentiment: "${content}"
Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
}
if (context.dimension === 'topics') {
return `Extract topics: "${content}"
Return JSON: {"topics": ["topic1", "topic2"]}`;
}
if (context.dimension === 'summary') {
const sentiment = context.dependencies.sentiment?.data as SentimentResult;
const topics = context.dependencies.topics?.data as TopicsResult;
return `Create a ${sentiment.sentiment} summary covering ${topics.topics.join(', ')}:
"${content}"
Return JSON: {"summary": "summary text"}`;
}
throw new Error(`Unknown dimension: ${context.dimension}`);
}
selectProvider(): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' }
};
}
}
// 2. Process your data
async function main(): Promise<void> {
// Validate API key
if (!process.env.ANTHROPIC_API_KEY) {
console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
process.exit(1);
}
// Create engine
const engine = new DagEngine({
plugin: new ReviewAnalyzer(),
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
}
});
// Prepare reviews
const reviews = [
{ content: 'Great product!', metadata: { id: 1 } },
{ content: 'Not good.', metadata: { id: 2 } }
];
// Process
const result = await engine.process(reviews);
// Display results
console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}
// 3. Run with error handling
main().catch((error: Error) => {
console.error('❌ Processing failed:', error.message);
process.exit(1);
});
What just happened?
- ✅ sentiment and topics ran in parallel (both have no dependencies)
- ✅ summary waited for both to complete
- ✅ All sections processed in parallel
- ✅ 2 reviews × 3 dimensions = 6 AI calls, all optimized automatically
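The Quick Start prints the raw results object; for typed access you can cast each dimension's data to the interfaces defined earlier. A minimal, hedged sketch: the per-dimension keys and data fields on section.results are assumed here by analogy with context.dependencies.sentiment?.data, and may differ from the actual result shape.
// Hedged sketch: assumes each section's results are keyed by dimension name
// and expose a `data` field, mirroring context.dependencies.<dimension>?.data.
for (const section of result.sections) {
  const sentiment = section.results?.sentiment?.data as SentimentResult | undefined;
  const topics = section.results?.topics?.data as TopicsResult | undefined;
  console.log({
    id: section.metadata?.id, // assumption: input metadata is echoed back on results
    sentiment: sentiment?.sentiment,
    score: sentiment?.score,
    topics: topics?.topics
  });
}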
Next: Full Documentation • Examples • Production Guide
📊 Why Choose dagengine?
| Feature | DIY Code | LangChain | dagengine |
|---|---|---|---|
| Setup | Manual loops | Learn LCEL | 2 methods |
| Parallelization | Manual | Manual | Automatic |
| Cost Tracking | Manual calc | Manual calc | Built-in |
| TypeScript | ✅ Full | ⚠️ Partial | ✅ Full |
| Code (100 items) | 150 lines | 80 lines | 25 lines |
| Best For | Small scripts | RAG/Agents | Orchestration |
Use dagengine when:
- ✅ Processing 100+ items with multiple AI analyses
- ✅ Want automatic parallelization without complexity
- ✅ Need built-in cost tracking
- ✅ TypeScript projects
Skip dagengine when:
- ❌ Single AI calls (overkill)
- ❌ Need RAG/agents (use LangChain)
- ❌ Python projects (we're TypeScript-only)
⚡ Key Features
- 🎯 Zero Infrastructure: Define task dependencies once. Engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code.
- 💰 Cost Optimized: Skip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting.
- 🔄 Production Ready: Automatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability.
- 🌍 Multi-Provider Support: Use Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow.
- 🪝 18 Lifecycle Hooks: Full async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it.
- 📊 Real-Time Tracking: Built-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results.
💡 Core Concepts
1️⃣ Sections (Your Data)
const sections = [
{
content: 'Customer review text here',
metadata: { id: 1, userId: 123, productId: 'SKU-789' }
}
];
Sections are the pieces of data you analyze (reviews, emails, documents, etc.).
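If your data lives elsewhere (a database, a CSV, an API), mapping it into the { content, metadata } shape shown above is all the preparation needed. A minimal sketch, using a hypothetical ReviewRecord input type:
// ReviewRecord is hypothetical; only the { content, metadata } section shape
// comes from the example above. The mapping itself is up to you.
interface ReviewRecord {
  id: number;
  text: string;
  productId: string;
}

function toSections(records: ReviewRecord[]) {
  return records.map((r) => ({
    content: r.text,
    metadata: { id: r.id, productId: r.productId }
  }));
}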
2️⃣ Dimensions (Your Tasks)
this.dimensions = ['sentiment', 'topics', 'summary'];
Dimensions are the analyses you run. Each dimension processes all sections.
3️⃣ Dependencies (Execution Order)
defineDependencies() {
return {
sentiment: [], // No dependencies (runs first)
topics: [], // No dependencies (runs first)
summary: ['sentiment', 'topics'] // Waits for both
};
}
Dependencies control execution order. Engine automatically parallelizes independent tasks.
Execution Plan:
sentiment ──┐
            ├─→ Both run in parallel → summary
topics ─────┘
4️⃣ Two Dimension Types
Section Dimensions (default) - Analyze each item independently:
this.dimensions = ['sentiment']; // Runs once per section
Global Dimensions - Analyze all items together:
this.dimensions = [
{ name: 'categorize', scope: 'global' } // Runs once for all sections
];
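For a global dimension, a common pattern is to concatenate every section into a single prompt. A hedged sketch, assuming context.sections holds all sections when scope is 'global':
createPrompt(context: PromptContext): string {
  if (context.dimension === 'categorize') {
    // Assumption: for a global-scope dimension, context.sections contains
    // every section rather than a single item.
    const allReviews = context.sections
      .map((s, i) => `${i + 1}. ${s.content}`)
      .join('\n');
    return `Group these reviews into categories:\n${allReviews}\n` +
      `Return JSON: {"categories": [{"name": "...", "items": ["..."]}]}`;
  }
  throw new Error(`Unknown dimension: ${context.dimension}`);
}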
🎨 Advanced Features
Cost Optimization with Skip Logic
class SmartAnalyzer extends Plugin {
dimensions = ['quality_check', 'deep_analysis'];
defineDependencies() {
return { deep_analysis: ['quality_check'] };
}
shouldSkipSectionDimension(context) {
if (context.dimension === 'deep_analysis') {
const quality = context.dependencies.quality_check.data;
return quality.score < 0.7; // Skip low-quality items
}
return false;
}
selectProvider(dimension) {
if (dimension === 'quality_check') {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' } // Cheap model
};
}
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' } // Expensive model
};
}
}
Result: 100 items → 40 high-quality → 60% fewer expensive API calls
Provider Fallback Chains
selectProvider() {
return {
provider: 'anthropic',
options: { model: 'claude-sonnet-4-5-20250929' },
fallbacks: [
{ provider: 'openai', options: { model: 'gpt-4o' } },
{ provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
]
};
}
Automatic failover: If Anthropic fails, the engine automatically tries OpenAI, then Gemini.
Data Transformations
class CategoryAnalyzer extends Plugin {
dimensions = [
'classify',
{ name: 'group_by_category', scope: 'global' },
'analyze_category'
];
transformSections(context) {
if (context.dimension === 'group_by_category') {
const categories = context.result.data.categories;
// Transform: 100 sections → 5 category groups
return categories.map(cat => ({
content: cat.items.join('\n\n'),
metadata: { category: cat.name, count: cat.items.length }
}));
}
}
}
Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
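After the transform, the downstream analyze_category dimension sees the 5 grouped sections instead of the original 100 items. A hedged sketch of its prompt, assuming the metadata written in transformSections above is reachable from the prompt context:
createPrompt(context: PromptContext): string {
  if (context.dimension === 'analyze_category') {
    const section = context.sections[0];
    // Assumption: metadata set in transformSections is available here.
    const category = section?.metadata?.category ?? 'unknown';
    return `Analyze the "${category}" category:\n${section?.content ?? ''}\n` +
      `Return JSON: {"insights": ["..."]}`;
  }
  // ... prompts for 'classify' and 'group_by_category' omitted
  throw new Error(`Unknown dimension: ${context.dimension}`);
}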
Async Integration Hooks
class DatabaseIntegratedPlugin extends Plugin {
async beforeProcessStart(context) {
// Initialize connections
await this.db.connect();
}
async shouldSkipSectionDimension(context) {
// Check cache before processing
const cached = await this.redis.get(`${context.section.id}:${context.dimension}`);
if (cached) return true;
return false;
}
async afterDimensionExecute(context) {
// Save results to database
await this.db.results.insert({
section: context.section.id,
dimension: context.dimension,
data: context.result.data
});
}
async afterProcessComplete(context) {
// Cleanup
await this.db.disconnect();
}
}
All 18 hooks support async/await for seamless external service integration.
📚 Documentation
🎓 Learn
- Quick Start - Get started in 5 minutes
- Core Concepts - Understand sections, dimensions, dependencies
- Examples - Complete working examples
📖 Fundamentals (Step-by-Step Guides)
- Hello World - Your first plugin
- Dependencies - Control execution order
- Section vs Global - Two dimension types
- Transformations - Reshape data mid-pipeline
- Skip Logic - Optimize costs
- Multi-Provider - Route to different models
- Async Hooks - Database integration
- Error Handling - Graceful recovery
🚀 Advanced
- Portkey Gateway - Rate limit protection & caching
- Production Quickstart - Complete production workflow
🔧 API Reference
- Configuration - Engine config options
- Lifecycle Hooks - All 18 hooks explained
- Type Definitions - TypeScript interfaces
🌐 Supported Providers
| Provider | Description | Best For | Docs |
|---|---|---|---|
| Anthropic | Claude models for reasoning and analysis | Complex tasks, deep reasoning | Docs |
| OpenAI | GPT models for general-purpose tasks | Fast responses, versatile workflows | Docs |
| Google Gemini | Gemini models for high-speed processing | High throughput, multimodal inputs | Docs |
Mix and match: Route different dimensions to different providers in the same workflow.
selectProvider(dimension) {
if (dimension === 'quality_check') {
return { provider: 'gemini', options: { model: 'gemini-1.5-flash' } };
}
if (dimension === 'deep_analysis') {
return { provider: 'anthropic', options: { model: 'claude-sonnet-4-5-20250929' } };
}
}
🔄 Gateway Support
dagengine supports Portkey as a unified AI gateway for advanced features:
| Feature | Direct Mode | With Portkey Gateway |
|---|---|---|
| Automatic Retries | ✅ Engine-level | ✅ Gateway-level with smart backoff |
| Rate Limit Handling | ⚠️ Manual | ✅ Automatic with queuing |
| Semantic Caching | ❌ | ✅ Reduce costs and latency |
| Load Balancing | ❌ | ✅ Multi-provider routing |
| Observability | ✅ Basic | ✅ Full dashboard & analytics |
Enable Portkey:
providers: {
anthropic: {
apiKey: process.env.ANTHROPIC_API_KEY,
gateway: 'portkey',
gatewayApiKey: process.env.PORTKEY_API_KEY,
gatewayConfig: 'pc-my-config-id' // Optional: retry/cache config
}
}
Learn more: Portkey Integration Guide • Portkey Docs
📦 Configuration
const engine = new DagEngine({
plugin: new MyPlugin(),
// Provider credentials
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
openai: { apiKey: process.env.OPENAI_API_KEY }
},
// Execution settings
execution: {
concurrency: 10, // Max parallel operations
maxRetries: 3, // Retry attempts
retryDelay: 1000, // Base delay (ms)
timeout: 60000, // Default timeout
continueOnError: true // Process partial results
},
// Cost tracking
pricing: {
models: {
'claude-sonnet-4-5-20250929': {
inputPer1M: 3.00,
outputPer1M: 15.00
}
}
},
// Progress display
progressDisplay: {
display: 'bar', // 'simple' | 'bar' | 'multi' | 'none'
showDimensions: true
}
});
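The pricing block in the configuration above is what cost tracking uses: rates are dollars per one million input and output tokens. As a plain illustration of the arithmetic (not the library's internal API), a single call's cost works out like this:
// Illustrative only: dollars per 1M tokens, as configured in pricing.models.
const inputPer1M = 3.0;
const outputPer1M = 15.0;

function estimateCost(inputTokens: number, outputTokens: number): number {
  return (inputTokens / 1_000_000) * inputPer1M +
         (outputTokens / 1_000_000) * outputPer1M;
}

// e.g. 2,000 input tokens + 500 output tokens ≈ $0.0135
console.log(estimateCost(2000, 500).toFixed(4));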
🛠️ Development
# Clone repository
git clone https://github.com/dagengine/dagengine.git
cd dagengine
# Install dependencies
npm install
# Run tests
npm test
# Run tests with coverage
npm run test:coverage
# Type check
npm run type-check
# Build
npm run build
# Run all checks
npm run validate
🤝 Contributing
We welcome contributions! Here's how to get started:
Quick Start
- Fork the repository on GitHub
- Clone your fork: git clone https://github.com/YOUR_USERNAME/dagengine.git
- Create a branch: git checkout -b feature/your-feature-name
- Make your changes and add tests
- Run validation: npm run validate
- Commit: git commit -m "feat: add your feature"
- Push: git push origin feature/your-feature-name
- Open a Pull Request on GitHub
Development Guidelines
- Code Style: We use Prettier and ESLint (run npm run format && npm run lint:fix)
- Tests: Add tests for new features (run npm test)
- Types: Maintain full TypeScript coverage (run npm run type-check)
- Commits: Use Conventional Commits format
Need Help?
See CONTRIBUTING.md for detailed guidelines.
💬 Community & Support
🙋 Need Help?
- 📖 Check the docs - Documentation
- 🔍 Search existing Q&A - GitHub Discussions
- 💬 Ask a question - Start a discussion
- 🐛 Found a bug? - Open an issue
🚀 Stay Updated
- 📦 npm Package - Install and updates
- ⭐ GitHub - Star for updates
- 💬 Discussions - Feature requests, Q&A
🔒 Security
We take security seriously. See SECURITY.md for our security policy.
Reporting Vulnerabilities
Never report security issues through public GitHub issues.
Use GitHub's private vulnerability reporting or email the maintainers directly.
📜 License
Apache License 2.0 © dagengine contributors
Licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
Patent Protection
This license includes an explicit patent grant (Section 3), protecting users from patent litigation. See LICENSE for details.
🙏 Acknowledgments
Built with:
- TypeScript - Type safety
- @dagrejs/graphlib - DAG algorithms
- p-queue - Concurrency control
🔗 Links
- 🌐 Homepage: dagengine.ai
- 📖 Documentation: https://www.dagengine.ai/guide/quick-start.html
- 📦 npm Package: @dagengine/core
- 💬 Community: GitHub Discussions
- 🐛 Issue Tracker: GitHub Issues
- 🔐 Security: Security Policy
⭐ Star us on GitHub — it helps the project grow!
Made with ❤️ by the dagengine community