JSPM

@fluxgraph/core

0.1.2
  • License MIT

Real-time graph-based stream processing for Cloudflare Workers and Durable Objects

Package Exports

  • @fluxgraph/core
  • @fluxgraph/core/dist/index.js

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@fluxgraph/core) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

FluxGraph

🌊 Real-time graph-based stream processing and AI orchestration for Cloudflare Workers

FluxGraph is a lightweight, high-performance stream processing library with built-in AI workflow capabilities, designed specifically for edge computing environments. Build complex data pipelines and AI agents that run directly on Cloudflare's global network, combining LangGraph-style orchestration with real-time stream processing in a package roughly 10x smaller than LangGraph (~179KB vs. ~1.7MB).

Features

  • 🚀 Real-time Processing - Process data streams with millisecond latency
  • 🔀 Graph-based Architecture - Create complex topologies with parallel and conditional paths
  • 🤖 AI-Native - Built-in LLM, tool calling, and memory nodes for AI workflows
  • 📊 Built-in Aggregations - Time, count, and session-based windowing
  • 🔄 Backpressure Handling - Automatic buffering and flow control
  • 🛡️ Error Resilience - Retry policies and error recovery strategies
  • 🎯 Type-safe - Full TypeScript support with comprehensive types
  • ☁️ Edge-native - Optimized for Cloudflare Workers and Durable Objects
  • 🔁 Agent Loops - Support for cyclic graphs enabling ReAct and autonomous agents
  • 💾 State Management - Built-in memory and checkpointing for long-running workflows

Installation

npm install @fluxgraph/core

Quick Start

import { Graph, nodes } from '@fluxgraph/core';

// Define your graph
const graph = new Graph({
  name: 'Transaction Processor',
  nodes: [
    nodes.source('webhook', {
      type: 'websocket',
      url: 'wss://api.example.com/transactions'
    }),
    
    nodes.transform('normalize', {
      function: (data) => ({
        ...data,
        amount: data.amount / 100
      })
    }),
    
    nodes.filter('large-only', {
      function: (data) => data.amount > 100
    }),
    
    nodes.aggregate('hourly-summary', {
      window: 'time',
      duration: 3600,
      function: (packets) => ({
        total: packets.reduce((sum, p) => sum + p.data.amount, 0),
        count: packets.length
      })
    }),
    
    nodes.sink('alerts', {
      type: 'http',
      url: 'https://alerts.example.com/webhook'
    })
  ],
  
  edges: [
    ['webhook', 'normalize'],
    ['normalize', 'large-only'],
    ['large-only', 'hourly-summary'],
    ['hourly-summary', 'alerts']
  ]
});

// Start processing
await graph.start();

// Subscribe to outputs before injecting so no packets are missed
graph.subscribe('alerts', (packet) => {
  console.log('Alert triggered:', packet.data);
});

// Inject data manually
await graph.inject('webhook', { amount: 15000, currency: 'USD' });
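
To see what the transform, filter, and aggregate functions above do to the injected data, here is a plain-TypeScript trace that does not use FluxGraph at all. The `Packet` shape is inferred from the `p.data.amount` access, the `runPipeline` helper and the extra sample transactions are hypothetical, and reading `amount` as minor units (cents) is an assumption based on the division by 100.

```typescript
// Library-independent trace of the Quick Start functions (illustrative only).
interface Txn { amount: number; currency: string }
interface Packet<T> { data: T } // assumed packet shape: payload under `data`

// Same logic as the 'normalize', 'large-only', and 'hourly-summary' nodes
const normalize = (data: Txn): Txn => ({ ...data, amount: data.amount / 100 });
const largeOnly = (data: Txn): boolean => data.amount > 100;
const summarize = (packets: Packet<Txn>[]) => ({
  total: packets.reduce((sum, p) => sum + p.data.amount, 0),
  count: packets.length,
});

// Hypothetical helper: applies the three steps to one window of inputs
function runPipeline(inputs: Txn[]) {
  const kept = inputs
    .map(normalize)
    .filter(largeOnly)
    .map((data): Packet<Txn> => ({ data }));
  return summarize(kept);
}

const summary = runPipeline([
  { amount: 15000, currency: 'USD' }, // 150 after normalize, kept
  { amount: 5000, currency: 'USD' },  // 50 after normalize, filtered out
  { amount: 25000, currency: 'USD' }, // 250 after normalize, kept
]);
console.log(summary); // { total: 400, count: 2 }
```

In the real graph the aggregate step would only emit once the one-hour window closes, so the injected transaction alone does not produce an immediate alert.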

AI Workflows Comparison

FluxGraph now includes powerful AI workflow capabilities, making it a lightweight alternative to popular AI orchestration frameworks:

| Feature | FluxGraph | LangGraph | Pydantic AI | LlamaIndex | CrewAI |
|---|---|---|---|---|---|
| Stream Processing | ✅ Excellent (RxJS-based) | ✅ Good | ⚠️ Limited | ⚠️ Limited | ❌ No |
| Graph Architecture | ✅ Yes | ✅ Yes | ⚠️ Chain-based | ⚠️ Chain-based | ✅ Yes |
| AI-specific Nodes | ✅ LLM, Tool, Memory | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Agents |
| State Management | ✅ Built-in + Durable Objects | ✅ Built-in | ✅ Built-in | ✅ Built-in | ✅ Yes |
| Cycles/Agent Loops | ✅ Yes (ReAct, etc.) | ✅ Yes | ✅ Yes | ⚠️ Limited | ✅ Yes |
| Tool Calling | ✅ Parallel + Sequential | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |
| Memory Types | ✅ Conversation, Semantic, Hybrid | ✅ Yes | ⚠️ Basic | ✅ Yes | ✅ Yes |
| Checkpointing | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Limited | ⚠️ Limited |
| Edge Runtime | ✅ Cloudflare Workers | ❌ No | ❌ No | ❌ No | ❌ No |
| Bundle Size | ✅ ~179KB | ❌ ~1.7MB | ❌ Python only | ❌ Python only | ❌ Python only |
| Streaming LLM | ✅ Native | ✅ Yes | ⚠️ Limited | ✅ Yes | ⚠️ Limited |
| TypeScript | ✅ First-class | ✅ Yes | ❌ Python | ❌ Python | ❌ Python |
| Real-time Data | ✅ Excellent | ⚠️ Limited | ❌ No | ❌ No | ❌ No |
| Production Ready | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes | ⚠️ Beta |

Why Choose FluxGraph for AI Workflows?

  • 🚀 Edge-Native: The only framework in the comparison above that runs on Cloudflare Workers
  • ⚡ Lightweight: ~179KB vs. ~1.7MB for LangGraph (roughly 10x smaller)
  • 🌊 Streaming-First: Built on RxJS for excellent real-time performance
  • 🔧 Flexible: Combine AI with real-time data processing
  • 💾 Durable: Native integration with Durable Objects for persistence

AI Quick Start

ReAct Agent Example

import { GraphRunner } from '@fluxgraph/core';
import { reactAgentTemplate } from '@fluxgraph/core/templates';

// Create an autonomous agent
const agent = new GraphRunner(reactAgentTemplate);
await agent.initialize();
await agent.start();

// Give it a task
await agent.inject('input', { 
  task: 'Research and summarize the latest AI trends' 
});

RAG Pipeline Example

import { GraphBuilder, LLMNode, MemoryNode } from '@fluxgraph/core';

const ragPipeline = GraphBuilder.create('RAG Pipeline')
  .nodes(
    {
      id: 'vectorDB',
      type: 'memory',
      name: 'Vector Store',
      memoryType: 'semantic',
      embeddingDimension: 1536
    },
    {
      id: 'llm',
      type: 'llm',
      name: 'GPT-4',
      model: 'gpt-4',
      systemPrompt: 'Answer based on the provided context.',
      streaming: true
    }
  )
  .flow('vectorDB', 'llm')
  .build();

Multi-Agent Collaboration

const multiAgent = GraphBuilder.create('Multi-Agent System')
  .allowCycles() // Enable agent communication loops
  .nodes(
    {
      id: 'coordinator',
      type: 'llm',
      name: 'Coordinator',
      model: 'gpt-4',
      systemPrompt: 'You coordinate multiple specialist agents.'
    },
    {
      id: 'researcher',
      type: 'llm',
      name: 'Research Agent',
      model: 'gpt-3.5-turbo',
      systemPrompt: 'You are a research specialist.'
    },
    {
      id: 'analyst',
      type: 'llm',
      name: 'Analysis Agent',
      model: 'gpt-3.5-turbo',
      systemPrompt: 'You analyze data and provide insights.'
    }
  )
  .build();
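
The example above enables cycles but does not show any edges between the agents. As a library-independent sketch of what "cyclic" means for a graph definition, the function below checks whether a list of `[from, to]` edges (the same shape as in the Quick Start) contains a loop. The `hasCycle` helper and the sample edge lists are hypothetical and are not part of FluxGraph's API; how the library itself validates cycles is not documented here.

```typescript
type Edge = [string, string]; // [from, to], as in the Quick Start `edges` array

// Depth-first search: a cycle exists if we reach a node that is still
// on the current traversal path.
function hasCycle(edges: Edge[]): boolean {
  const adjacency = new Map<string, string[]>();
  const neighbors = (node: string): string[] => {
    let list = adjacency.get(node);
    if (!list) {
      list = [];
      adjacency.set(node, list);
    }
    return list;
  };
  edges.forEach(([from, to]) => {
    neighbors(from).push(to);
    neighbors(to); // make sure sink-only nodes are registered too
  });

  const state = new Map<string, 'visiting' | 'done'>();
  const visit = (node: string): boolean => {
    const current = state.get(node);
    if (current === 'visiting') return true; // back edge: cycle found
    if (current === 'done') return false;    // already fully explored
    state.set(node, 'visiting');
    const found = neighbors(node).some((next) => visit(next));
    state.set(node, 'done');
    return found;
  };
  return Array.from(adjacency.keys()).some((node) => visit(node));
}

// A linear pipeline has no cycle
const pipelineEdges: Edge[] = [['webhook', 'normalize'], ['normalize', 'large-only']];
// A coordinator that hands work to a researcher and gets results back does
const agentEdges: Edge[] = [['coordinator', 'researcher'], ['researcher', 'coordinator']];

console.log(hasCycle(pipelineEdges)); // false
console.log(hasCycle(agentEdges));    // true
```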

Use Cases

Financial Transaction Processing

const financialGraph = templates.financial.createAnomalyDetector({
  thresholds: {
    amount: 1000,
    frequency: 10 // transactions per minute
  },
  alertUrl: 'https://your-webhook.com'
});

IoT Data Aggregation

const iotGraph = templates.iot.createSensorAggregator({
  sensors: ['temperature', 'humidity', 'pressure'],
  aggregateWindow: 60, // seconds
  outputFormat: 'prometheus'
});

Real-time Analytics

const analyticsGraph = templates.analytics.createEventProcessor({
  events: ['click', 'view', 'purchase'],
  sessionTimeout: 1800, // 30 minutes
  enrichment: {
    geoip: true,
    userAgent: true
  }
});

Durable Object Integration

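import { DurableObject } from 'cloudflare:workers';
import { Graph } from '@fluxgraph/core';

// graphConfig is your graph definition (name, nodes, edges), defined elsewhere.
// It needs a node with id 'input' for the /inject route below.

export class StreamProcessor extends DurableObject {
  private graph?: Graph;

  async fetch(request: Request) {
    // Lazily create and start the graph on the first request
    if (!this.graph) {
      this.graph = new Graph(graphConfig);
      await this.graph.start();
    }

    const url = new URL(request.url);
    
    if (url.pathname === '/inject') {
      const data = await request.json();
      await this.graph.inject('input', data);
      return new Response('OK');
    }

    if (url.pathname === '/metrics') {
      return Response.json(this.graph.getMetrics());
    }

    return new Response('Not found', { status: 404 });
  }
}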

Node Types

Source Nodes

  • WebSocket - Real-time data streams
  • HTTP - Polling or webhook endpoints
  • Timer - Scheduled data generation
  • Manual - Programmatic injection

Transform Nodes

  • Data mapping and enrichment
  • Format conversion
  • Calculations and derived fields

Filter Nodes

  • Conditional routing
  • Data validation
  • Sampling and rate limiting
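
As a rough illustration of the sampling and rate-limiting bullet, the predicates below could be passed as a filter's `function`. They are plain TypeScript and make no claim about how FluxGraph implements these features; `sampleEvery` and `rateLimit` are hypothetical helper names, and the rate limiter takes timestamps from the caller so that it stays deterministic.

```typescript
// Deterministic sampling: keep the 1st, (n+1)th, (2n+1)th, ... item
function sampleEvery<T>(n: number): (item: T) => boolean {
  let seen = 0;
  return () => seen++ % n === 0;
}

// Fixed-window rate limiting: allow at most `limit` items per `windowMs`.
// The caller supplies the timestamp (in milliseconds) of each item.
function rateLimit(limit: number, windowMs: number): (timestampMs: number) => boolean {
  let windowStart = -Infinity;
  let count = 0;
  return (timestampMs: number): boolean => {
    if (timestampMs - windowStart >= windowMs) {
      windowStart = timestampMs; // start a new window
      count = 0;
    }
    if (count >= limit) return false; // over the limit: drop
    count++;
    return true;
  };
}

const sampled = [1, 2, 3, 4, 5, 6].filter(sampleEvery<number>(3));
console.log(sampled); // [1, 4]

const allow = rateLimit(2, 1000);
const decisions = [0, 100, 200, 1000, 1100, 1200].map((t) => allow(t));
console.log(decisions); // [true, true, false, true, true, false]
```

A fixed window is the simplest scheme; a sliding window or token bucket smooths bursts at window boundaries at the cost of more state.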

Aggregate Nodes

  • Time-based windows
  • Count-based windows
  • Session windows
  • Custom aggregation functions
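
The three window types differ only in how they decide where one group of packets ends and the next begins. The sketch below shows that grouping logic in plain TypeScript, without FluxGraph. The `Reading` type and the three helper functions are hypothetical, timestamps are assumed to be in seconds (matching `duration: 3600` in the Quick Start), and the input is assumed to be sorted by time.

```typescript
interface Reading { timestamp: number; value: number } // timestamp in seconds

// Time-based (tumbling) windows: bucket by floor(timestamp / duration)
function timeWindows(readings: Reading[], duration: number): Reading[][] {
  const buckets = new Map<number, Reading[]>();
  readings.forEach((r) => {
    const key = Math.floor(r.timestamp / duration);
    const bucket = buckets.get(key);
    if (bucket) bucket.push(r);
    else buckets.set(key, [r]);
  });
  return Array.from(buckets.keys())
    .sort((a, b) => a - b)
    .map((key) => buckets.get(key) as Reading[]);
}

// Count-based windows: fixed-size chunks, last one may be shorter
function countWindows(readings: Reading[], size: number): Reading[][] {
  const windows: Reading[][] = [];
  for (let i = 0; i < readings.length; i += size) {
    windows.push(readings.slice(i, i + size));
  }
  return windows;
}

// Session windows: start a new window when the gap since the
// previous reading exceeds `timeout`
function sessionWindows(readings: Reading[], timeout: number): Reading[][] {
  const windows: Reading[][] = [];
  readings.forEach((r) => {
    const current = windows.length > 0 ? windows[windows.length - 1] : undefined;
    const last = current ? current[current.length - 1] : undefined;
    if (current && last && r.timestamp - last.timestamp <= timeout) current.push(r);
    else windows.push([r]);
  });
  return windows;
}

const readings: Reading[] = [0, 10, 50, 70, 200].map((timestamp, i) => ({ timestamp, value: i + 1 }));
const sizes = (windows: Reading[][]) => windows.map((w) => w.length);

console.log(sizes(timeWindows(readings, 60)));    // [3, 1, 1]
console.log(sizes(countWindows(readings, 2)));    // [2, 2, 1]
console.log(sizes(sessionWindows(readings, 30))); // [2, 2, 1]
```

A custom aggregation function, like the one in the Quick Start, would then be applied to each inner array.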

Sink Nodes

  • WebSocket output
  • HTTP webhooks
  • Database writes
  • Custom outputs

Advanced Features

Error Handling

const graph = new Graph({
  // ...
  errorStrategy: 'continue', // or 'stop', 'retry'
  retryPolicy: {
    maxRetries: 3,
    backoffMultiplier: 2,
    initialDelay: 1000
  }
});
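
The README does not spell out how the three `retryPolicy` fields combine. A common reading is exponential backoff, where the wait before retry k is `initialDelay * backoffMultiplier^k`. The sketch below works under that assumption, and also assumes `initialDelay` is in milliseconds; `retryDelays` and `withRetry` are hypothetical helpers, not FluxGraph functions.

```typescript
interface RetryPolicy {
  maxRetries: number;
  backoffMultiplier: number;
  initialDelay: number; // assumed to be milliseconds
}

// Wait time before each retry under exponential backoff (assumed semantics)
function retryDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < policy.maxRetries; attempt++) {
    delays.push(policy.initialDelay * Math.pow(policy.backoffMultiplier, attempt));
  }
  return delays;
}

// Run `task`, retrying after each failure until the delays are used up
async function withRetry<T>(task: () => Promise<T>, policy: RetryPolicy): Promise<T> {
  const delays = retryDelays(policy);
  for (let attempt = 0; ; attempt++) {
    try {
      return await task();
    } catch (err) {
      if (attempt >= delays.length) throw err; // retries exhausted
      const delay = delays[attempt];
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
}

const examplePolicy: RetryPolicy = { maxRetries: 3, backoffMultiplier: 2, initialDelay: 1000 };
console.log(retryDelays(examplePolicy)); // [1000, 2000, 4000]
```

With the policy shown above, a packet that keeps failing would be attempted four times in total (one initial try plus three retries) over about seven seconds.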

Metrics and Monitoring

const metrics = graph.getMetrics();
console.log({
  processed: metrics.packetsProcessed,
  dropped: metrics.packetsDropped,
  latency: metrics.averageLatency
});

graph.on('error', (event) => {
  console.error('Graph error:', event);
});

State Persistence

// Inside a DurableObject subclass: save graph state to Durable Object storage
const state = graph.getState();
await this.ctx.storage.put('graph-state', state);

// Restore on restart
const savedState = await this.ctx.storage.get('graph-state');
if (savedState) {
  graph.restore(savedState);
}

Performance

FluxGraph is designed for high-throughput, low-latency processing:

  • Process 10,000+ events/second per Durable Object
  • Sub-millisecond processing latency
  • Automatic backpressure handling
  • Memory-efficient buffering
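
The README does not say which backpressure strategy FluxGraph uses. One simple approach, sketched here in plain TypeScript, is a bounded buffer that drops the oldest item when full and counts the drops, which is one way a `packetsDropped`-style metric could arise. The `BoundedBuffer` class is hypothetical and the drop-oldest policy is an assumption for illustration only.

```typescript
// Bounded FIFO buffer with a drop-oldest policy (illustrative assumption)
class BoundedBuffer<T> {
  private items: T[] = [];
  private capacity: number;
  dropped = 0; // how many items were discarded because the buffer was full

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  push(item: T): void {
    if (this.items.length >= this.capacity) {
      this.items.shift(); // discard the oldest item to make room
      this.dropped++;
    }
    this.items.push(item);
  }

  // Remove and return up to `max` items, oldest first
  drain(max: number): T[] {
    return this.items.splice(0, max);
  }

  size(): number {
    return this.items.length;
  }
}

const queue = new BoundedBuffer<number>(3);
[1, 2, 3, 4, 5].forEach((n) => queue.push(n));
const drained = queue.drain(2);
console.log(queue.dropped, drained, queue.size()); // dropped = 2, drained = [3, 4], size = 1
```

Dropping the oldest data favors freshness, which suits metrics and sensor streams; a pipeline that must not lose packets would instead block or reject the producer when the buffer is full.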

Contributing

We welcome contributions! Please see CONTRIBUTING.md for details.

License

MIT License - see LICENSE for details.


Built with ❤️ for the edge computing community