Package Exports
- kafka.do
- kafka.do/client
- kafka.do/errors
kafka.do
Kafka-compatible streaming platform on Cloudflare Workers with Durable Object SQLite.
kafka.do brings the familiar Kafka programming model to the edge, running entirely on Cloudflare's global network. Each topic partition is backed by a Durable Object with SQLite storage, which provides strong consistency and durability with no infrastructure for you to manage.
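How kafka.do names its partition objects internally is not documented here. As an illustration of the one-object-per-partition pattern described above, the sketch below derives a deterministic Durable Object name from a topic and partition number. Because Cloudflare's `idFromName` always maps the same name to the same object, every Worker that computes `orders/0` reaches the same SQLite-backed instance. The `topic/partition` naming scheme and the `partitionObjectName` and `partitionStub` helpers are hypothetical; the small interfaces stand in for Cloudflare's `DurableObjectNamespace` and stub types so the sketch runs on its own.

```typescript
// Structural stand-ins for Cloudflare's DurableObjectNamespace and stub types.
// Assumption: only the members used below are modeled.
interface StubLike {
  fetch(input: string, init?: RequestInit): Promise<Response>
}

interface NamespaceLike<Id> {
  idFromName(name: string): Id
  get(id: Id): StubLike
}

// Hypothetical naming scheme: one Durable Object per (topic, partition) pair.
function partitionObjectName(topic: string, partition: number): string {
  if (!Number.isInteger(partition) || partition < 0) {
    throw new RangeError(`Invalid partition: ${partition}`)
  }
  return `${topic}/${partition}`
}

// idFromName is deterministic, so every caller that computes the same name
// is routed to the same Durable Object instance.
function partitionStub<Id>(ns: NamespaceLike<Id>, topic: string, partition: number): StubLike {
  return ns.get(ns.idFromName(partitionObjectName(topic, partition)))
}
```

In a real Worker, `ns` would be a binding such as `env.TOPIC_PARTITION`.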
Features
- Kafka-compatible API - Familiar producer, consumer, and admin interfaces
- Edge-native - Runs on Cloudflare Workers with global distribution
- Durable storage - Messages stored in Durable Object SQLite
- Consumer groups - Coordinated consumption with automatic partition assignment
- Offset management - Automatic and manual offset commits
- HTTP Client SDK - Access kafka.do from any JavaScript runtime
- Partitioning - Key-based partitioning for message ordering
- Batch operations - Efficient batch produce and consume
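The partitioner kafka.do uses is not specified in this README. The sketch below only illustrates the general idea behind key-based partitioning: hash the key and take the result modulo the partition count, so every message with the same key lands on the same partition and keeps its relative order. It uses 32-bit FNV-1a as a stand-in hash (Apache Kafka's Java client uses murmur2 instead), and `fnv1a32` and `partitionFor` are hypothetical helpers, not kafka.do exports.

```typescript
// Hypothetical illustration of key-based partitioning; not kafka.do's actual partitioner.
// 32-bit FNV-1a hash over the UTF-8 bytes of the key.
function fnv1a32(key: string): number {
  let hash = 0x811c9dc5 // FNV offset basis
  for (const byte of new TextEncoder().encode(key)) {
    hash ^= byte
    // Multiply by the FNV prime modulo 2^32; Math.imul keeps the product in 32-bit range.
    hash = Math.imul(hash, 0x01000193)
  }
  return hash >>> 0 // reinterpret as an unsigned 32-bit integer
}

// Same key + same partition count => same partition, which preserves per-key ordering.
function partitionFor(key: string, numPartitions: number): number {
  if (!Number.isInteger(numPartitions) || numPartitions < 1) {
    throw new RangeError('numPartitions must be a positive integer')
  }
  return fnv1a32(key) % numPartitions
}
```

Note that with any hash-mod scheme, adding partitions changes where existing keys map, so ordering is only guaranteed while the partition count stays fixed.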
Installation
```bash
npm install kafka.do
```

Quick Start
Producer API
Send messages to topics using the Producer API within a Cloudflare Worker:
```typescript
import { createProducer } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const producer = createProducer(env)

    // Send a single message
    const metadata = await producer.send({
      topic: 'orders',
      key: 'order-123',
      value: { orderId: '123', amount: 99.99 },
      headers: { source: 'web' }
    })
    console.log(`Message sent to partition ${metadata.partition} at offset ${metadata.offset}`)

    // Send multiple messages in a batch
    const results = await producer.sendBatch([
      { topic: 'orders', key: 'order-124', value: { orderId: '124', amount: 49.99 } },
      { topic: 'orders', key: 'order-125', value: { orderId: '125', amount: 149.99 } }
    ])

    await producer.close()
    return new Response('Messages sent')
  }
}
```

Consumer API
Consume messages from topics using the Consumer API:
```typescript
import { createConsumer } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const consumer = createConsumer(env, {
      groupId: 'order-processor',
      clientId: 'worker-1',
      autoCommit: false, // offsets are committed manually below
      fromBeginning: false
    })

    // Subscribe to topics
    await consumer.subscribe(['orders'])

    // Poll for messages
    const records = await consumer.poll(1000)

    for (const record of records) {
      console.log(`Received: ${record.key} = ${JSON.stringify(record.value)}`)
      console.log(`Topic: ${record.topic}, Partition: ${record.partition}, Offset: ${record.offset}`)
    }

    // Commit offsets manually, since autoCommit is disabled above
    await consumer.commit()

    await consumer.close()
    return new Response(`Processed ${records.length} messages`)
  }
}
```

Using Async Iterator
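```typescript
import { createConsumer } from 'kafka.do'

// The code below runs inside a Worker handler, where `env` is available
const consumer = createConsumer(env, { groupId: 'my-group' })
await consumer.subscribe(['orders'])

for await (const record of consumer) {
  console.log(`Processing: ${record.key}`)
  // Process each record as it arrives; use `break` to stop consuming
}
```

Admin API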
Manage topics and consumer groups:
```typescript
import { createAdmin } from 'kafka.do'

export default {
  async fetch(request: Request, env: Env) {
    const admin = createAdmin(env)

    // Create a topic with 3 partitions
    await admin.createTopic({
      topic: 'orders',
      partitions: 3,
      config: {
        'retention.ms': '604800000' // 7 days
      }
    })

    // List all topics
    const topics = await admin.listTopics()
    console.log('Topics:', topics)

    // Describe a topic
    const metadata = await admin.describeTopic('orders')
    console.log('Partitions:', metadata.partitions.length)

    // Add more partitions
    await admin.createPartitions('orders', 6)

    // List consumer groups
    const groups = await admin.listGroups()

    // Describe a consumer group
    const groupInfo = await admin.describeGroup('order-processor')
    console.log('Group state:', groupInfo.state)
    console.log('Members:', groupInfo.members.length)

    // Get partition offsets
    const offsets = await admin.listOffsets('orders')
    for (const [partition, info] of offsets) {
      console.log(`Partition ${partition}: earliest=${info.earliest}, latest=${info.latest}`)
    }

    // Delete a topic
    await admin.deleteTopic('old-topic')

    // Delete a consumer group
    await admin.deleteGroup('old-group')

    await admin.close()
    return new Response('Admin operations complete')
  }
}
```

HTTP Client SDK
Access kafka.do from any JavaScript environment using the HTTP Client SDK:
```typescript
import { KafkaClient } from 'kafka.do/client'

// Create client pointing to your kafka.do deployment
const client = new KafkaClient({
  baseUrl: 'https://kafka.your-domain.workers.dev',
  clientId: 'my-app',
  timeout: 30000,
  headers: {
    'Authorization': 'Bearer your-token'
  }
})

// Check service health
const health = await client.health()
console.log('Status:', health.status)

// Producer operations
const producer = client.producer({ defaultTopic: 'events' })

await producer.send({
  key: 'user-123',
  value: { type: 'page_view', page: '/home' }
})

await producer.sendBatch([
  { key: 'user-123', value: { type: 'click', button: 'signup' } },
  { key: 'user-456', value: { type: 'page_view', page: '/about' } }
])

// Consumer operations
const consumer = client.consumer({
  groupId: 'analytics-processor',
  topics: ['events'],
  autoCommit: true
})

// Connect and join consumer group
const joinResult = await consumer.connect()
console.log('Member ID:', joinResult.memberId)

// Fetch messages from a partition
const { messages } = await consumer.fetch('events', 0, { offset: 0, limit: 100 })
for (const msg of messages) {
  console.log(`${msg.key}: ${JSON.stringify(msg.value)}`)
}

// Commit offsets
await consumer.commit()

// Get partition offsets
const offsets = await consumer.getOffsets('events', 0)
console.log(`Earliest: ${offsets.earliest}, Latest: ${offsets.latest}`)

// Disconnect from consumer group
await consumer.disconnect()

// Admin operations
const admin = client.admin()
await admin.createTopic({ topic: 'logs', partitions: 5 })
const topics = await admin.listTopics()
const topicInfo = await admin.describeTopic('logs')
const groups = await admin.listGroups()
const groupInfo = await admin.describeGroup('analytics-processor')
await admin.addPartitions('logs', 10)
await admin.deleteTopic('old-logs')
await admin.deleteGroup('old-group')
```

API Reference
Producer
createProducer(env, config?)
Creates a new producer instance.
| Config Option | Type | Default | Description |
|---|---|---|---|
| `clientId` | `string` | `undefined` | Client identifier for tracking |
| `batchSize` | `number` | `undefined` | Number of messages to batch |
| `lingerMs` | `number` | `undefined` | Time to wait for a batch to fill |
| `acks` | `0 \| 1 \| 'all'` | `undefined` | Acknowledgment mode |
| `retries` | `number` | `undefined` | Number of retry attempts |
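The table does not say how `retries` is applied or which errors count as retriable. As a generic illustration only, an application can also wrap a send in its own retry loop with exponential backoff. `withRetries` and its parameters are hypothetical and not part of kafka.do.

```typescript
// Hypothetical helper, not part of kafka.do: retry an async operation with exponential backoff.
async function withRetries<T>(
  operation: () => Promise<T>,
  retries: number, // extra attempts after the first failure
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation()
    } catch (err) {
      lastError = err
      if (attempt < retries) {
        // With the default base delay: wait 100ms, 200ms, 400ms, ... before the next attempt.
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt))
      }
    }
  }
  // Every attempt failed: surface the most recent error.
  throw lastError
}
```

For example, `await withRetries(() => producer.send({ topic: 'orders', value: order }), 3)`. Retrying a send that actually succeeded but whose response was lost can write a duplicate, so consumers should tolerate duplicate messages.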
Producer Methods
- `send(record)` - Send a single message; returns `RecordMetadata`
- `sendBatch(records)` - Send multiple messages; returns `RecordMetadata[]`
- `flush()` - Flush buffered messages
- `close()` - Close the producer
Consumer
createConsumer(env, config, rebalanceListener?)
Creates a new consumer instance.
| Config Option | Type | Default | Description |
|---|---|---|---|
| `groupId` | `string` | required | Consumer group ID |
| `clientId` | `string` | `'do-consumer'` | Client identifier |
| `sessionTimeoutMs` | `number` | `30000` | Session timeout |
| `heartbeatIntervalMs` | `number` | `3000` | Heartbeat interval |
| `maxPollRecords` | `number` | `500` | Maximum records per poll |
| `autoCommit` | `boolean` | `true` | Enable auto-commit |
| `autoCommitIntervalMs` | `number` | `5000` | Auto-commit interval |
| `fromBeginning` | `boolean` | `false` | Start from the beginning of the topic |
| `rebalanceTimeoutMs` | `number` | `60000` | Rebalance timeout |
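Assuming `sessionTimeoutMs` and `heartbeatIntervalMs` mean what their Apache Kafka counterparts do, a member is removed from the group when no heartbeat arrives within the session timeout. The sketch below shows that check from the coordinator's side, plus Kafka's guidance that the heartbeat interval should be no more than one third of the session timeout (the defaults, 3000 and 30000, satisfy it). `MemberState`, `expiredMembers`, and `validateTimeouts` are hypothetical names, not kafka.do APIs.

```typescript
// Hypothetical coordinator-side bookkeeping, assuming Kafka-style session semantics.
interface MemberState {
  memberId: string
  lastHeartbeatAt: number // epoch ms of the most recent heartbeat
}

// Members whose last heartbeat is older than the session timeout are considered dead.
function expiredMembers(members: MemberState[], now: number, sessionTimeoutMs = 30000): string[] {
  return members
    .filter((m) => now - m.lastHeartbeatAt > sessionTimeoutMs)
    .map((m) => m.memberId)
}

// Kafka's guidance: keep the heartbeat interval at most one third of the session timeout,
// so a member can miss a couple of heartbeats without being evicted.
function validateTimeouts(heartbeatIntervalMs = 3000, sessionTimeoutMs = 30000): void {
  if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
    throw new RangeError('heartbeatIntervalMs should be at most one third of sessionTimeoutMs')
  }
}
```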
Consumer Methods
- `subscribe(topics)` - Subscribe to topics
- `unsubscribe()` - Unsubscribe from all topics
- `poll(timeout?)` - Poll for records
- `commit()` - Commit current offsets
- `commitSync(offsets?)` - Commit specific offsets
- `seek(partition, offset)` - Seek to an offset
- `pause(partitions)` - Pause consumption
- `resume(partitions)` - Resume consumption
- `close()` - Close the consumer and leave the group
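The offset format that `commitSync(offsets?)` accepts is not specified here. In Apache Kafka, a committed offset is the offset of the *next* record to read, i.e. the last processed offset + 1. The sketch below tracks that position per partition so it could feed a manual commit. `OffsetTracker`, the `topic:partition` key format, and the assumption that kafka.do follows the same +1 convention are all hypothetical; it also assumes records within a partition are processed in order.

```typescript
// Hypothetical helper: tracks the next offset to commit for each topic-partition.
// Assumes Kafka's convention that a committed offset points at the next record to consume.
interface ProcessedRecord {
  topic: string
  partition: number
  offset: number
}

class OffsetTracker {
  private readonly next = new Map<string, number>()

  markProcessed(record: ProcessedRecord): void {
    const key = `${record.topic}:${record.partition}`
    const candidate = record.offset + 1
    // Never move a partition's commit position backwards.
    if ((this.next.get(key) ?? -1) < candidate) {
      this.next.set(key, candidate)
    }
  }

  // Snapshot of positions to commit, keyed by "topic:partition".
  pending(): Record<string, number> {
    return Object.fromEntries(this.next)
  }
}
```

Calling `markProcessed(record)` only after a record has been fully handled gives at-least-once delivery: a crash before the commit means those records are read again.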
Rebalance Listener
```typescript
const consumer = createConsumer(env, config, {
  async onPartitionsAssigned(partitions) {
    console.log('Assigned:', partitions)
  },
  async onPartitionsRevoked(partitions) {
    console.log('Revoked:', partitions)
  }
})
```

Admin
createAdmin(env, config?)
Creates a new admin client.
| Config Option | Type | Default | Description |
|---|---|---|---|
| `clientId` | `string` | `undefined` | Client identifier |
| `requestTimeoutMs` | `number` | `undefined` | Request timeout |
Admin Methods
- `createTopic(config)` - Create a new topic
- `deleteTopic(topic)` - Delete a topic
- `listTopics()` - List all topics
- `describeTopic(topic)` - Get topic metadata
- `createPartitions(topic, count)` - Add partitions
- `listGroups()` - List consumer groups
- `describeGroup(groupId)` - Get group details
- `deleteGroup(groupId)` - Delete a consumer group
- `listOffsets(topic)` - Get partition offsets
- `close()` - Close the admin client
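A common use of `listOffsets` is computing consumer lag: for each partition, the distance between the latest offset and the group's committed position. This sketch assumes the `Map` of `{ earliest, latest }` entries that the Admin example iterates over, that `latest` is the log-end offset (the next offset to be written, as in Kafka), and that committed offsets are obtained separately. `PartitionOffsets` and `computeLag` are hypothetical.

```typescript
// Hypothetical lag calculation. Assumes `latest` is the log-end offset and that a committed
// offset points at the next record the group will read (Kafka conventions).
interface PartitionOffsets {
  earliest: number
  latest: number
}

function computeLag(
  offsets: Map<number, PartitionOffsets>,
  committed: Map<number, number>
): Map<number, number> {
  const lag = new Map<number, number>()
  for (const [partition, { earliest, latest }] of offsets) {
    // With no commit yet, treat the group as starting at the earliest retained offset.
    // Clamp to `earliest` in case retention already deleted records past the commit.
    const position = Math.max(committed.get(partition) ?? earliest, earliest)
    lag.set(partition, Math.max(0, latest - position))
  }
  return lag
}
```

If kafka.do reports `latest` as the offset of the last written record instead, each lag value would be one higher than the true count.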
HTTP Client
KafkaClient
| Config Option | Type | Default | Description |
|---|---|---|---|
| `baseUrl` | `string` | required | kafka.do service URL |
| `clientId` | `string` | auto-generated | Client identifier |
| `timeout` | `number` | `30000` | Request timeout (ms) |
| `headers` | `object` | `{}` | Default headers |
| `fetch` | `function` | `globalThis.fetch` | Custom fetch implementation |
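The `fetch` option makes it possible to add cross-cutting behavior such as logging without changing the client. The sketch below wraps any fetch-compatible function and logs each request's method, URL, status, and duration. `FetchLike` and `withRequestLog` are hypothetical names, and the sketch assumes KafkaClient calls `fetch` with the standard `(input, init)` signature.

```typescript
// Hypothetical wrapper for KafkaClient's `fetch` option: logs method, URL, status, and duration.
type FetchLike = (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>

function withRequestLog(base: FetchLike, log: (line: string) => void): FetchLike {
  return async (input, init) => {
    // Normalize the three accepted input forms to a URL string.
    const url = typeof input === 'string' ? input : input instanceof URL ? input.href : input.url
    const method = init?.method ?? (input instanceof Request ? input.method : 'GET')
    const started = Date.now()
    const response = await base(input, init)
    log(`${method} ${url} -> ${response.status} (${Date.now() - started}ms)`)
    return response
  }
}
```

It could then be passed as `fetch: withRequestLog(fetch, console.log)` when constructing `KafkaClient`.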
Integrations
kafka.do includes pre-built integrations for common data sources.
```typescript
import {
  KafkaPipeline,
  createKafkaPipeline,
  R2EventBridge,
  createR2EventBridge
} from 'kafka.do/integrations'
```

MongoDB CDC
Stream MongoDB change events to kafka.do topics using the KafkaPipeline adapter. It integrates with MongoDB change streams to capture insert, update, and delete operations in real time.
```typescript
import { createKafkaPipeline, type CDCEvent } from 'kafka.do/integrations'

// The code below runs inside a Worker handler, where `env` is available.
// Create a pipeline that routes events to topics based on database/collection
const pipeline = createKafkaPipeline({
  env,
  topicPattern: 'cdc.{db}.{coll}' // e.g., cdc.mydb.users
})

// Send a CDC event (typically from a MongoDB change stream)
await pipeline.send({
  eventId: 'evt-123',
  operationType: 'insert',
  ns: { db: 'mydb', coll: 'users' },
  documentKey: { _id: 'user-456' },
  fullDocument: { _id: 'user-456', name: 'Alice', email: 'alice@example.com' },
  timestamp: new Date().toISOString()
})

// Send multiple events in a batch
const cdcEvents: CDCEvent[] = [/* events collected from the change stream */]
await pipeline.sendBatch(cdcEvents)
```

Factory functions:
- `createKafkaPipeline(config)` - Full configuration with custom topic patterns
- `createFixedTopicPipeline(env, topic)` - All events go to a single topic
- `createDatabaseTopicPipeline(env)` - One topic per database (`cdc.{db}`)
- `createCollectionTopicPipeline(env)` - One topic per collection (`cdc.{db}.{coll}`)
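The `{db}` and `{coll}` placeholders in a topic pattern correspond to the event's namespace (`ns.db`, `ns.coll`). A minimal sketch of that substitution, assuming plain string replacement; `resolveTopic` is hypothetical, and the real pipeline may validate or escape names differently.

```typescript
// Hypothetical: expand `{name}` placeholders in a topic pattern from a lookup table.
function resolveTopic(pattern: string, values: Record<string, string>): string {
  return pattern.replace(/\{(\w+)\}/g, (placeholder, name: string) => {
    const value = values[name]
    if (value === undefined) {
      // Fail loudly rather than producing a topic name with a literal "{...}" in it.
      throw new Error(`No value for placeholder ${placeholder} in pattern "${pattern}"`)
    }
    return value
  })
}
```

For a change event, this would be called as `resolveTopic('cdc.{db}.{coll}', { db: event.ns.db, coll: event.ns.coll })`.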
Consumer helpers:
```typescript
import { processCDCMessage, isInsertEvent } from 'kafka.do/integrations'

// Process CDC messages with typed handlers
await processCDCMessage(message, {
  database: 'mydb',
  collection: 'users',
  groupId: 'cdc-processor',
  onInsert: async (event) => console.log('New document:', event.fullDocument),
  onUpdate: async (event) => console.log('Updated:', event.updateDescription),
  onDelete: async (event) => console.log('Deleted:', event.documentKey)
})
```

R2 Event Bridge
Stream R2 object events (creates, deletes) to kafka.do topics. Use this as a Queue consumer to capture R2 event notifications.
```typescript
import { createR2EventBridge, R2EventBridge } from 'kafka.do/integrations'

// Process R2 events (typically in a Queue consumer)
export default {
  async queue(batch: MessageBatch, env: Env) {
    // Create the event bridge inside the handler, where `env` is available
    const bridge = createR2EventBridge({
      env,
      topicPattern: 'r2.{bucket}', // e.g., r2.my-bucket
      bucketFilter: 'my-bucket',   // Optional: filter by bucket
      keyPrefixFilter: 'uploads/'  // Optional: filter by key prefix
    })

    for (const message of batch.messages) {
      const event = R2EventBridge.parseQueueMessage(message)
      if (event) {
        await bridge.processEvent(event)
      }
    }
  }
}
```

Consumer helpers:
```typescript
import { processR2Event, isR2ObjectCreated } from 'kafka.do/integrations'

// Process R2 events with typed handlers
await processR2Event(message, {
  bucketFilter: 'my-bucket',
  onObjectCreated: async (event) => {
    console.log('Created:', event.object.key, event.object.size)
  },
  onObjectDeleted: async (event) => {
    console.log('Deleted:', event.key)
  }
})
```

Configuration
Wrangler Configuration
Add the following to your wrangler.toml:
```toml
name = "my-kafka-app"
main = "src/index.ts"
compatibility_date = "2024-01-01"
compatibility_flags = ["nodejs_compat"]

[durable_objects]
bindings = [
  { name = "TOPIC_PARTITION", class_name = "TopicPartitionDO" },
  { name = "CONSUMER_GROUP", class_name = "ConsumerGroupDO" },
  { name = "CLUSTER_METADATA", class_name = "ClusterMetadataDO" }
]

[[migrations]]
tag = "v1"
new_sqlite_classes = ["TopicPartitionDO", "ConsumerGroupDO", "ClusterMetadataDO"]
```

Environment Type
```typescript
interface Env {
  TOPIC_PARTITION: DurableObjectNamespace
  CONSUMER_GROUP: DurableObjectNamespace
  CLUSTER_METADATA: DurableObjectNamespace
}
```

HTTP API Endpoints
kafka.do exposes a REST API for external access:
Producer Endpoints
- `POST /topics/:topic/produce` - Produce a single message
- `POST /topics/:topic/produce-batch` - Produce multiple messages
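The request body these endpoints expect is not documented in this section. As an assumption, the sketch below posts the same `key`/`value`/`headers` fields that `producer.send` takes, encoded as JSON, with a bearer token like the one in the HTTP Client SDK example. `produceViaHttp`, `FetchFn`, `ProduceRequest`, and the body shape are hypothetical; the fetch function is injectable so the sketch can run without a live deployment.

```typescript
// Hypothetical raw-HTTP produce call; the JSON body shape is an assumption, not a documented contract.
type FetchFn = (input: string, init?: RequestInit) => Promise<Response>

interface ProduceRequest {
  key?: string
  value: unknown
  headers?: Record<string, string>
}

async function produceViaHttp(
  baseUrl: string,
  topic: string,
  message: ProduceRequest,
  token: string,
  fetchFn: FetchFn = fetch
): Promise<unknown> {
  // Strip trailing slashes and encode the topic so it stays a single path segment.
  const url = `${baseUrl.replace(/\/+$/, '')}/topics/${encodeURIComponent(topic)}/produce`
  const response = await fetchFn(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
    body: JSON.stringify(message)
  })
  if (!response.ok) {
    throw new Error(`Produce to ${topic} failed with HTTP ${response.status}`)
  }
  return response.json()
}
```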
Consumer Endpoints
- `GET /topics/:topic/partitions/:partition/messages` - Read messages
- `GET /topics/:topic/partitions/:partition/offsets` - Get partition offsets
Consumer Group Endpoints
- `POST /consumer-groups/:groupId/join` - Join a consumer group
- `POST /consumer-groups/:groupId/heartbeat` - Send a heartbeat
- `POST /consumer-groups/:groupId/commit` - Commit offsets
- `POST /consumer-groups/:groupId/leave` - Leave a consumer group
- `GET /consumer-groups/:groupId` - Describe a consumer group
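A member that uses these endpoints directly, instead of the SDK, presumably has to keep calling the heartbeat endpoint more often than the session timeout, for example every `heartbeatIntervalMs` (default 3000). This sketch runs an injected `send` function on an interval and returns a stop function. What `send` should post to `POST /consumer-groups/:groupId/heartbeat` (for example a member ID) is not documented here and is left to the caller; `startHeartbeats` is a hypothetical helper.

```typescript
// Hypothetical helper: call `send` every `intervalMs` until the returned stop function is called.
// Errors are reported rather than thrown, so one failed heartbeat does not stop the loop.
function startHeartbeats(
  send: () => Promise<void>,
  intervalMs = 3000,
  onError: (error: unknown) => void = (error) => console.error('heartbeat failed', error)
): () => void {
  let inFlight = false
  const timer = setInterval(() => {
    // Skip this tick instead of overlapping requests if the previous heartbeat is still pending.
    if (inFlight) return
    inFlight = true
    send()
      .catch(onError)
      .finally(() => {
        inFlight = false
      })
  }, intervalMs)
  return () => clearInterval(timer)
}
```

`send` could wrap a `fetch` POST to the heartbeat endpoint; calling the returned function before `POST /consumer-groups/:groupId/leave` stops heartbeats for a member that is leaving.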
Admin Endpoints
- `GET /admin/topics` - List topics
- `POST /admin/topics` - Create a topic
- `GET /admin/topics/:topic` - Describe a topic
- `DELETE /admin/topics/:topic` - Delete a topic
- `POST /admin/topics/:topic/partitions` - Add partitions
- `GET /admin/topics/:topic/offsets` - Get topic offsets
- `GET /admin/groups` - List consumer groups
- `GET /admin/groups/:groupId` - Describe a consumer group
- `DELETE /admin/groups/:groupId` - Delete a consumer group
Health Endpoints
- `GET /` - Service info
- `GET /health` - Health check
Requirements
- Cloudflare Workers environment
- Durable Objects with SQLite storage enabled
- Node.js 18+ (for local development)
Development
```bash
# Install dependencies
npm install

# Run locally
npm run dev

# Run tests
npm test

# Type check
npm run typecheck

# Deploy to Cloudflare
npm run deploy
```

License
MIT