JSPM

kafka.do

0.0.4
  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 4
  • Score
    100M100P100Q31379F
  • License MIT

Kafka-compatible streaming platform on Cloudflare Workers with Durable Object SQLite

Package Exports

  • kafka.do
  • kafka.do/client
  • kafka.do/errors

Readme

kafka.do

Kafka-compatible streaming platform on Cloudflare Workers with Durable Object SQLite.

kafka.do brings the familiar Kafka programming model to the edge, running entirely on Cloudflare's global network. Each topic partition is backed by a Durable Object with SQLite storage, providing strong consistency and durability without managing any infrastructure.

Features

  • Kafka-compatible API - Familiar producer, consumer, and admin interfaces
  • Edge-native - Runs on Cloudflare Workers with global distribution
  • Durable storage - Messages stored in Durable Object SQLite
  • Consumer groups - Coordinated consumption with automatic partition assignment
  • Offset management - Automatic and manual offset commits
  • HTTP Client SDK - Access kafka.do from any JavaScript runtime
  • Partitioning - Key-based partitioning for message ordering
  • Batch operations - Efficient batch produce and consume

Installation

npm install kafka.do

Quick Start

Producer API

Send messages to topics using the Producer API within a Cloudflare Worker:

import { createProducer } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const producer = createProducer(env)

    // Send a single message
    const metadata = await producer.send({
      topic: 'orders',
      key: 'order-123',
      value: { orderId: '123', amount: 99.99 },
      headers: { source: 'web' }
    })

    console.log(`Message sent to partition ${metadata.partition} at offset ${metadata.offset}`)

    // Send multiple messages in a batch
    const results = await producer.sendBatch([
      { topic: 'orders', key: 'order-124', value: { orderId: '124', amount: 49.99 } },
      { topic: 'orders', key: 'order-125', value: { orderId: '125', amount: 149.99 } }
    ])

    await producer.close()

    return new Response('Messages sent')
  }
}

Consumer API

Consume messages from topics using the Consumer API:

import { createConsumer } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const consumer = createConsumer(env, {
      groupId: 'order-processor',
      clientId: 'worker-1',
      autoCommit: true,
      fromBeginning: false
    })

    // Subscribe to topics
    await consumer.subscribe(['orders'])

    // Poll for messages
    const records = await consumer.poll(1000)

    for (const record of records) {
      console.log(`Received: ${record.key} = ${JSON.stringify(record.value)}`)
      console.log(`Topic: ${record.topic}, Partition: ${record.partition}, Offset: ${record.offset}`)
    }

    // Manual commit (if autoCommit is false)
    await consumer.commit()

    await consumer.close()

    return new Response(`Processed ${records.length} messages`)
  }
}

Using Async Iterator

const consumer = createConsumer(env, { groupId: 'my-group' })
await consumer.subscribe(['orders'])

for await (const record of consumer) {
  console.log(`Processing: ${record.key}`)
  // Process each record as it arrives
}

Admin API

Manage topics and consumer groups:

import { createAdmin } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const admin = createAdmin(env)

    // Create a topic with 3 partitions
    await admin.createTopic({
      topic: 'orders',
      partitions: 3,
      config: {
        'retention.ms': '604800000' // 7 days
      }
    })

    // List all topics
    const topics = await admin.listTopics()
    console.log('Topics:', topics)

    // Describe a topic
    const metadata = await admin.describeTopic('orders')
    console.log('Partitions:', metadata.partitions.length)

    // Add more partitions
    await admin.createPartitions('orders', 6)

    // List consumer groups
    const groups = await admin.listGroups()

    // Describe a consumer group
    const groupInfo = await admin.describeGroup('order-processor')
    console.log('Group state:', groupInfo.state)
    console.log('Members:', groupInfo.members.length)

    // Get partition offsets
    const offsets = await admin.listOffsets('orders')
    for (const [partition, info] of offsets) {
      console.log(`Partition ${partition}: earliest=${info.earliest}, latest=${info.latest}`)
    }

    // Delete a topic
    await admin.deleteTopic('old-topic')

    // Delete a consumer group
    await admin.deleteGroup('old-group')

    await admin.close()

    return new Response('Admin operations complete')
  }
}

HTTP Client SDK

Access kafka.do from any JavaScript environment using the HTTP Client SDK:

import { KafkaClient } from 'kafka.do/client'

// Create client pointing to your kafka.do deployment
const client = new KafkaClient({
  baseUrl: 'https://kafka.your-domain.workers.dev',
  clientId: 'my-app',
  timeout: 30000,
  headers: {
    'Authorization': 'Bearer your-token'
  }
})

// Check service health
const health = await client.health()
console.log('Status:', health.status)

// Producer operations
const producer = client.producer({ defaultTopic: 'events' })

await producer.send({
  key: 'user-123',
  value: { type: 'page_view', page: '/home' }
})

await producer.sendBatch([
  { key: 'user-123', value: { type: 'click', button: 'signup' } },
  { key: 'user-456', value: { type: 'page_view', page: '/about' } }
])

// Consumer operations
const consumer = client.consumer({
  groupId: 'analytics-processor',
  topics: ['events'],
  autoCommit: true
})

// Connect and join consumer group
const joinResult = await consumer.connect()
console.log('Member ID:', joinResult.memberId)

// Fetch messages from a partition
const { messages } = await consumer.fetch('events', 0, { offset: 0, limit: 100 })

for (const msg of messages) {
  console.log(`${msg.key}: ${JSON.stringify(msg.value)}`)
}

// Commit offsets
await consumer.commit()

// Get partition offsets
const offsets = await consumer.getOffsets('events', 0)
console.log(`Earliest: ${offsets.earliest}, Latest: ${offsets.latest}`)

// Disconnect from consumer group
await consumer.disconnect()

// Admin operations
const admin = client.admin()

await admin.createTopic({ topic: 'logs', partitions: 5 })

const topics = await admin.listTopics()
const topicInfo = await admin.describeTopic('logs')
const groups = await admin.listGroups()
const groupInfo = await admin.describeGroup('analytics-processor')

await admin.addPartitions('logs', 10)
await admin.deleteTopic('old-logs')
await admin.deleteGroup('old-group')

API Reference

Producer

createProducer(env, config?)

Creates a new producer instance.

Config Option Type Default Description
clientId string undefined Client identifier for tracking
batchSize number undefined Number of messages to batch
lingerMs number undefined Time to wait for batch to fill
acks 0 | 1 | 'all' undefined Acknowledgment mode
retries number undefined Number of retry attempts

Producer Methods

  • send(record) - Send a single message, returns RecordMetadata
  • sendBatch(records) - Send multiple messages, returns RecordMetadata[]
  • flush() - Flush buffered messages
  • close() - Close the producer

Consumer

createConsumer(env, config, rebalanceListener?)

Creates a new consumer instance.

Config Option Type Default Description
groupId string required Consumer group ID
clientId string 'do-consumer' Client identifier
sessionTimeoutMs number 30000 Session timeout
heartbeatIntervalMs number 3000 Heartbeat interval
maxPollRecords number 500 Max records per poll
autoCommit boolean true Enable auto-commit
autoCommitIntervalMs number 5000 Auto-commit interval
fromBeginning boolean false Start from beginning
rebalanceTimeoutMs number 60000 Rebalance timeout

Consumer Methods

  • subscribe(topics) - Subscribe to topics
  • unsubscribe() - Unsubscribe from all topics
  • poll(timeout?) - Poll for records
  • commit() - Commit current offsets
  • commitSync(offsets?) - Commit specific offsets
  • seek(partition, offset) - Seek to offset
  • pause(partitions) - Pause consumption
  • resume(partitions) - Resume consumption
  • close() - Close consumer and leave group

Rebalance Listener

const consumer = createConsumer(env, config, {
  async onPartitionsAssigned(partitions) {
    console.log('Assigned:', partitions)
  },
  async onPartitionsRevoked(partitions) {
    console.log('Revoked:', partitions)
  }
})

Admin

createAdmin(env, config?)

Creates a new admin client.

Config Option Type Default Description
clientId string undefined Client identifier
requestTimeoutMs number undefined Request timeout

Admin Methods

  • createTopic(config) - Create a new topic
  • deleteTopic(topic) - Delete a topic
  • listTopics() - List all topics
  • describeTopic(topic) - Get topic metadata
  • createPartitions(topic, count) - Add partitions
  • listGroups() - List consumer groups
  • describeGroup(groupId) - Get group details
  • deleteGroup(groupId) - Delete a consumer group
  • listOffsets(topic) - Get partition offsets
  • close() - Close admin client

HTTP Client

KafkaClient

Config Option Type Default Description
baseUrl string required kafka.do service URL
clientId string auto-generated Client identifier
timeout number 30000 Request timeout (ms)
headers object {} Default headers
fetch function globalThis.fetch Custom fetch implementation

Integrations

kafka.do includes pre-built integrations for common data sources.

import {
  KafkaPipeline,
  createKafkaPipeline,
  R2EventBridge,
  createR2EventBridge
} from 'kafka.do/integrations'

MongoDB CDC

Stream MongoDB change events to kafka.do topics using the KafkaPipeline adapter. This integrates with MongoDB change streams to capture insert, update, and delete operations in real-time.

import { createKafkaPipeline, type CDCEvent } from 'kafka.do/integrations'

// Create a pipeline that routes events to topics based on database/collection
const pipeline = createKafkaPipeline({
  env,
  topicPattern: 'cdc.{db}.{coll}' // e.g., cdc.mydb.users
})

// Send a CDC event (typically from a MongoDB change stream)
await pipeline.send({
  eventId: 'evt-123',
  operationType: 'insert',
  ns: { db: 'mydb', coll: 'users' },
  documentKey: { _id: 'user-456' },
  fullDocument: { _id: 'user-456', name: 'Alice', email: 'alice@example.com' },
  timestamp: new Date().toISOString()
})

// Send multiple events in a batch
await pipeline.sendBatch(cdcEvents)

Factory functions:

  • createKafkaPipeline(config) - Full configuration with custom topic patterns
  • createFixedTopicPipeline(env, topic) - All events go to a single topic
  • createDatabaseTopicPipeline(env) - Topics per database (cdc.{db})
  • createCollectionTopicPipeline(env) - Topics per collection (cdc.{db}.{coll})

Consumer helpers:

import { processCDCMessage, isInsertEvent } from 'kafka.do/integrations'

// Process CDC messages with typed handlers
await processCDCMessage(message, {
  database: 'mydb',
  collection: 'users',
  groupId: 'cdc-processor',
  onInsert: async (event) => console.log('New document:', event.fullDocument),
  onUpdate: async (event) => console.log('Updated:', event.updateDescription),
  onDelete: async (event) => console.log('Deleted:', event.documentKey)
})

R2 Event Bridge

Stream R2 object events (creates, deletes) to kafka.do topics. Use this as a Queue consumer to capture R2 event notifications.

import { createR2EventBridge, R2EventBridge } from 'kafka.do/integrations'

// Create an event bridge
const bridge = createR2EventBridge({
  env,
  topicPattern: 'r2.{bucket}', // e.g., r2.my-bucket
  bucketFilter: 'my-bucket',   // Optional: filter by bucket
  keyPrefixFilter: 'uploads/'  // Optional: filter by key prefix
})

// Process R2 events (typically in a Queue consumer)
export default {
  async queue(batch: MessageBatch, env: Env) {
    const bridge = createR2EventBridge({ env })

    for (const message of batch.messages) {
      const event = R2EventBridge.parseQueueMessage(message)
      if (event) {
        await bridge.processEvent(event)
      }
    }
  }
}

Consumer helpers:

import { processR2Event, isR2ObjectCreated } from 'kafka.do/integrations'

// Process R2 events with typed handlers
await processR2Event(message, {
  bucketFilter: 'my-bucket',
  onObjectCreated: async (event) => {
    console.log('Created:', event.object.key, event.object.size)
  },
  onObjectDeleted: async (event) => {
    console.log('Deleted:', event.key)
  }
})

Configuration

Wrangler Configuration

Add the following to your wrangler.toml:

name = "my-kafka-app"
main = "src/index.ts"
compatibility_date = "2024-01-01"
compatibility_flags = ["nodejs_compat"]

[durable_objects]
bindings = [
  { name = "TOPIC_PARTITION", class_name = "TopicPartitionDO" },
  { name = "CONSUMER_GROUP", class_name = "ConsumerGroupDO" },
  { name = "CLUSTER_METADATA", class_name = "ClusterMetadataDO" }
]

[[migrations]]
tag = "v1"
new_sqlite_classes = ["TopicPartitionDO", "ConsumerGroupDO", "ClusterMetadataDO"]

Environment Type

interface Env {
  TOPIC_PARTITION: DurableObjectNamespace
  CONSUMER_GROUP: DurableObjectNamespace
  CLUSTER_METADATA: DurableObjectNamespace
}

HTTP API Endpoints

kafka.do exposes a REST API for external access:

Producer Endpoints

  • POST /topics/:topic/produce - Produce a single message
  • POST /topics/:topic/produce-batch - Produce multiple messages

Consumer Endpoints

  • GET /topics/:topic/partitions/:partition/messages - Read messages
  • GET /topics/:topic/partitions/:partition/offsets - Get partition offsets

Consumer Group Endpoints

  • POST /consumer-groups/:groupId/join - Join a consumer group
  • POST /consumer-groups/:groupId/heartbeat - Send heartbeat
  • POST /consumer-groups/:groupId/commit - Commit offsets
  • POST /consumer-groups/:groupId/leave - Leave consumer group
  • GET /consumer-groups/:groupId - Describe consumer group

Admin Endpoints

  • GET /admin/topics - List topics
  • POST /admin/topics - Create topic
  • GET /admin/topics/:topic - Describe topic
  • DELETE /admin/topics/:topic - Delete topic
  • POST /admin/topics/:topic/partitions - Add partitions
  • GET /admin/topics/:topic/offsets - Get topic offsets
  • GET /admin/groups - List consumer groups
  • GET /admin/groups/:groupId - Describe consumer group
  • DELETE /admin/groups/:groupId - Delete consumer group

Health Endpoints

  • GET / - Service info
  • GET /health - Health check

Requirements

  • Cloudflare Workers environment
  • Durable Objects with SQLite storage enabled
  • Node.js 18+ (for local development)

Development

# Install dependencies
npm install

# Run locally
npm run dev

# Run tests
npm test

# Type check
npm run typecheck

# Deploy to Cloudflare
npm run deploy

License

MIT