Package Exports
- @fluxgraph/core
- @fluxgraph/core/dist/index.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@fluxgraph/core) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
FluxGraph
🌊 Real-time graph-based stream processing and AI orchestration for Cloudflare Workers
FluxGraph is a lightweight, high-performance stream processing library with built-in AI workflow capabilities, designed specifically for edge computing environments. Build complex data pipelines and AI agents that run directly on Cloudflare's global network - combining the power of LangGraph-style orchestration with real-time stream processing in a package that's 10x smaller than alternatives.
Features
- 🚀 Real-time Processing - Process data streams with millisecond latency
- 🔀 Graph-based Architecture - Create complex topologies with parallel and conditional paths
- 🤖 AI-Native - Built-in LLM, tool calling, and memory nodes for AI workflows
- 📊 Built-in Aggregations - Time, count, and session-based windowing
- 🔄 Backpressure Handling - Automatic buffering and flow control
- 🛡️ Error Resilience - Retry policies and error recovery strategies
- 🎯 Type-safe - Full TypeScript support with comprehensive types
- ☁️ Edge-native - Optimized for Cloudflare Workers and Durable Objects
- 🔁 Agent Loops - Support for cyclic graphs enabling ReAct and autonomous agents
- 💾 State Management - Built-in memory and checkpointing for long-running workflows
Installation
npm install @fluxgraph/coreQuick Start
import { Graph, nodes } from '@fluxgraph/core';
// Define your graph
const graph = new Graph({
name: 'Transaction Processor',
nodes: [
nodes.source('webhook', {
type: 'websocket',
url: 'wss://api.example.com/transactions'
}),
nodes.transform('normalize', {
function: (data) => ({
...data,
amount: data.amount / 100
})
}),
nodes.filter('large-only', {
function: (data) => data.amount > 100
}),
nodes.aggregate('hourly-summary', {
window: 'time',
duration: 3600,
function: (packets) => ({
total: packets.reduce((sum, p) => sum + p.data.amount, 0),
count: packets.length
})
}),
nodes.sink('alerts', {
type: 'http',
url: 'https://alerts.example.com/webhook'
})
],
edges: [
['webhook', 'normalize'],
['normalize', 'large-only'],
['large-only', 'hourly-summary'],
['hourly-summary', 'alerts']
]
});
// Start processing
await graph.start();
// Inject data manually
await graph.inject('webhook', { amount: 15000, currency: 'USD' });
// Subscribe to outputs
graph.subscribe('alerts', (packet) => {
console.log('Alert triggered:', packet.data);
});AI Workflows Comparison
FluxGraph now includes powerful AI workflow capabilities, making it a lightweight alternative to popular AI orchestration frameworks:
| Feature | FluxGraph | LangGraph | Pydantic AI | LlamaIndex | CrewAI |
|---|---|---|---|---|---|
| Stream Processing | ✅ Excellent (RxJS-based) | ✅ Good | ⚠️ Limited | ⚠️ Limited | ❌ No |
| Graph Architecture | ✅ Yes | ✅ Yes | ⚠️ Chain-based | ⚠️ Chain-based | ✅ Yes |
| AI-specific Nodes | ✅ LLM, Tool, Memory | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Agents |
| State Management | ✅ Built-in + Durable Objects | ✅ Built-in | ✅ Built-in | ✅ Built-in | ✅ Yes |
| Cycles/Agent Loops | ✅ Yes (ReAct, etc.) | ✅ Yes | ✅ Yes | ⚠️ Limited | ✅ Yes |
| Tool Calling | ✅ Parallel + Sequential | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |
| Memory Types | ✅ Conversation, Semantic, Hybrid | ✅ Yes | ⚠️ Basic | ✅ Yes | ✅ Yes |
| Checkpointing | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Limited | ⚠️ Limited |
| Edge Runtime | ✅ Cloudflare Workers | ❌ No | ❌ No | ❌ No | ❌ No |
| Bundle Size | ✅ ~179KB | ❌ ~1.7MB | ❌ Python only | ❌ Python only | ❌ Python only |
| Streaming LLM | ✅ Native | ✅ Yes | ⚠️ Limited | ✅ Yes | ⚠️ Limited |
| TypeScript | ✅ First-class | ✅ Yes | ❌ Python | ❌ Python | ❌ Python |
| Real-time Data | ✅ Excellent | ⚠️ Limited | ❌ No | ❌ No | ❌ No |
| Production Ready | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Beta |
Why Choose FluxGraph for AI Workflows?
- 🚀 Edge-Native: Only framework that runs on Cloudflare Workers
- ⚡ Lightweight: ~179KB vs 1.7MB for LangGraph (10x smaller)
- 🌊 Streaming-First: Built on RxJS for excellent real-time performance
- 🔧 Flexible: Combine AI with real-time data processing
- 💾 Durable: Native integration with Durable Objects for persistence
AI Quick Start
ReAct Agent Example
import { GraphRunner } from '@fluxgraph/core';
import { reactAgentTemplate } from '@fluxgraph/core/templates';
// Create an autonomous agent
const agent = new GraphRunner(reactAgentTemplate);
await agent.initialize();
await agent.start();
// Give it a task
agent.inject('input', {
task: 'Research and summarize the latest AI trends'
});RAG Pipeline Example
import { GraphBuilder, LLMNode, MemoryNode } from '@fluxgraph/core';
const ragPipeline = GraphBuilder.create('RAG Pipeline')
.nodes(
{
id: 'vectorDB',
type: 'memory',
name: 'Vector Store',
memoryType: 'semantic',
embeddingDimension: 1536
},
{
id: 'llm',
type: 'llm',
name: 'GPT-4',
model: 'gpt-4',
systemPrompt: 'Answer based on the provided context.',
streaming: true
}
)
.flow('vectorDB', 'llm')
.build();Multi-Agent Collaboration
const multiAgent = GraphBuilder.create('Multi-Agent System')
.allowCycles() // Enable agent communication loops
.nodes(
{
id: 'coordinator',
type: 'llm',
name: 'Coordinator',
model: 'gpt-4',
systemPrompt: 'You coordinate multiple specialist agents.'
},
{
id: 'researcher',
type: 'llm',
name: 'Research Agent',
model: 'gpt-3.5-turbo',
systemPrompt: 'You are a research specialist.'
},
{
id: 'analyst',
type: 'llm',
name: 'Analysis Agent',
model: 'gpt-3.5-turbo',
systemPrompt: 'You analyze data and provide insights.'
}
)
.build();Use Cases
Financial Transaction Processing
const financialGraph = templates.financial.createAnomalyDetector({
thresholds: {
amount: 1000,
frequency: 10 // transactions per minute
},
alertUrl: 'https://your-webhook.com'
});IoT Data Aggregation
const iotGraph = templates.iot.createSensorAggregator({
sensors: ['temperature', 'humidity', 'pressure'],
aggregateWindow: 60, // seconds
outputFormat: 'prometheus'
});Real-time Analytics
const analyticsGraph = templates.analytics.createEventProcessor({
events: ['click', 'view', 'purchase'],
sessionTimeout: 1800, // 30 minutes
enrichment: {
geoip: true,
userAgent: true
}
});Durable Object Integration
export class StreamProcessor extends DurableObject {
private graph: Graph;
async fetch(request: Request) {
if (!this.graph) {
this.graph = new Graph(graphConfig);
await this.graph.start();
}
const url = new URL(request.url);
if (url.pathname === '/inject') {
const data = await request.json();
await this.graph.inject('input', data);
return new Response('OK');
}
if (url.pathname === '/metrics') {
return Response.json(this.graph.getMetrics());
}
return new Response('Not found', { status: 404 });
}
}Node Types
Source Nodes
- WebSocket - Real-time data streams
- HTTP - Polling or webhook endpoints
- Timer - Scheduled data generation
- Manual - Programmatic injection
Transform Nodes
- Data mapping and enrichment
- Format conversion
- Calculations and derived fields
Filter Nodes
- Conditional routing
- Data validation
- Sampling and rate limiting
Aggregate Nodes
- Time-based windows
- Count-based windows
- Session windows
- Custom aggregation functions
Sink Nodes
- WebSocket output
- HTTP webhooks
- Database writes
- Custom outputs
Advanced Features
Error Handling
const graph = new Graph({
// ...
errorStrategy: 'continue', // or 'stop', 'retry'
retryPolicy: {
maxRetries: 3,
backoffMultiplier: 2,
initialDelay: 1000
}
});Metrics and Monitoring
const metrics = graph.getMetrics();
console.log({
processed: metrics.packetsProcessed,
dropped: metrics.packetsDropped,
latency: metrics.averageLatency
});
graph.on('error', (event) => {
console.error('Graph error:', event);
});State Persistence
// Save graph state to Durable Object storage
const state = graph.getState();
await this.storage.put('graph-state', state);
// Restore on restart
const savedState = await this.storage.get('graph-state');
if (savedState) {
graph.restore(savedState);
}Performance
Streamflow is designed for high-throughput, low-latency processing:
- Process 10,000+ events/second per Durable Object
- Sub-millisecond processing latency
- Automatic backpressure handling
- Memory-efficient buffering
Contributing
We welcome contributions! Please see CONTRIBUTING.md for details.
License
MIT License - see LICENSE for details.
Support
Built with ❤️ for the edge computing community