JSPM

@fluxgraph/core

0.1.0
  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 4
  • Score
    100M100P100Q40260F
  • License MIT

Real-time graph-based stream processing for Cloudflare Workers and Durable Objects

Package Exports

  • @fluxgraph/core
  • @fluxgraph/core/dist/index.js

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@fluxgraph/core) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

FluxGraph

🌊 Real-time graph-based stream processing for Cloudflare Workers and Durable Objects

FluxGraph is a lightweight, high-performance stream processing library designed specifically for edge computing environments. Build complex data processing pipelines that run directly on Cloudflare's global network.

Features

  • 🚀 Real-time Processing - Process data streams with millisecond latency
  • 🔀 Graph-based Architecture - Create complex topologies with parallel and conditional paths
  • 📊 Built-in Aggregations - Time, count, and session-based windowing
  • 🔄 Backpressure Handling - Automatic buffering and flow control
  • 🛡️ Error Resilience - Retry policies and error recovery strategies
  • 🎯 Type-safe - Full TypeScript support with comprehensive types
  • ☁️ Edge-native - Optimized for Cloudflare Workers and Durable Objects

Installation

npm install @fluxgraph/core

Quick Start

import { Graph, nodes } from '@fluxgraph/core';

// Define your graph
const graph = new Graph({
  name: 'Transaction Processor',
  nodes: [
    nodes.source('webhook', {
      type: 'websocket',
      url: 'wss://api.example.com/transactions'
    }),
    
    nodes.transform('normalize', {
      function: (data) => ({
        ...data,
        amount: data.amount / 100
      })
    }),
    
    nodes.filter('large-only', {
      function: (data) => data.amount > 100
    }),
    
    nodes.aggregate('hourly-summary', {
      window: 'time',
      duration: 3600,
      function: (packets) => ({
        total: packets.reduce((sum, p) => sum + p.data.amount, 0),
        count: packets.length
      })
    }),
    
    nodes.sink('alerts', {
      type: 'http',
      url: 'https://alerts.example.com/webhook'
    })
  ],
  
  edges: [
    ['webhook', 'normalize'],
    ['normalize', 'large-only'],
    ['large-only', 'hourly-summary'],
    ['hourly-summary', 'alerts']
  ]
});

// Start processing
await graph.start();

// Inject data manually
await graph.inject('webhook', { amount: 15000, currency: 'USD' });

// Subscribe to outputs
graph.subscribe('alerts', (packet) => {
  console.log('Alert triggered:', packet.data);
});

Use Cases

Financial Transaction Processing

const financialGraph = templates.financial.createAnomalyDetector({
  thresholds: {
    amount: 1000,
    frequency: 10 // transactions per minute
  },
  alertUrl: 'https://your-webhook.com'
});

IoT Data Aggregation

const iotGraph = templates.iot.createSensorAggregator({
  sensors: ['temperature', 'humidity', 'pressure'],
  aggregateWindow: 60, // seconds
  outputFormat: 'prometheus'
});

Real-time Analytics

const analyticsGraph = templates.analytics.createEventProcessor({
  events: ['click', 'view', 'purchase'],
  sessionTimeout: 1800, // 30 minutes
  enrichment: {
    geoip: true,
    userAgent: true
  }
});

Durable Object Integration

export class StreamProcessor extends DurableObject {
  private graph: Graph;

  async fetch(request: Request) {
    if (!this.graph) {
      this.graph = new Graph(graphConfig);
      await this.graph.start();
    }

    const url = new URL(request.url);
    
    if (url.pathname === '/inject') {
      const data = await request.json();
      await this.graph.inject('input', data);
      return new Response('OK');
    }

    if (url.pathname === '/metrics') {
      return Response.json(this.graph.getMetrics());
    }

    return new Response('Not found', { status: 404 });
  }
}

Node Types

Source Nodes

  • WebSocket - Real-time data streams
  • HTTP - Polling or webhook endpoints
  • Timer - Scheduled data generation
  • Manual - Programmatic injection

Transform Nodes

  • Data mapping and enrichment
  • Format conversion
  • Calculations and derived fields

Filter Nodes

  • Conditional routing
  • Data validation
  • Sampling and rate limiting

Aggregate Nodes

  • Time-based windows
  • Count-based windows
  • Session windows
  • Custom aggregation functions

Sink Nodes

  • WebSocket output
  • HTTP webhooks
  • Database writes
  • Custom outputs

Advanced Features

Error Handling

const graph = new Graph({
  // ...
  errorStrategy: 'continue', // or 'stop', 'retry'
  retryPolicy: {
    maxRetries: 3,
    backoffMultiplier: 2,
    initialDelay: 1000
  }
});

Metrics and Monitoring

const metrics = graph.getMetrics();
console.log({
  processed: metrics.packetsProcessed,
  dropped: metrics.packetsDropped,
  latency: metrics.averageLatency
});

graph.on('error', (event) => {
  console.error('Graph error:', event);
});

State Persistence

// Save graph state to Durable Object storage
const state = graph.getState();
await this.storage.put('graph-state', state);

// Restore on restart
const savedState = await this.storage.get('graph-state');
if (savedState) {
  graph.restore(savedState);
}

Performance

Streamflow is designed for high-throughput, low-latency processing:

  • Process 10,000+ events/second per Durable Object
  • Sub-millisecond processing latency
  • Automatic backpressure handling
  • Memory-efficient buffering

Contributing

We welcome contributions! Please see CONTRIBUTING.md for details.

License

MIT License - see LICENSE for details.

Support


Built with ❤️ for the edge computing community