Package Exports
- @fastify/sse
- @fastify/sse/index.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@fastify/sse) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
@fastify/sse
Server-Sent Events plugin for Fastify. Provides first-class SSE support with clean API integration, session management, and streaming capabilities.
Features
- 🚀 Clean API: Route-level SSE support with
{ sse: true }
- 📡 Streaming: Native support for Node.js streams and async iterators
- 🔄 Reconnection: Built-in message replay with
Last-Event-ID
- 💓 Heartbeat: Configurable keep-alive mechanism
- 🎯 Lifecycle: Full integration with Fastify hooks and error handling
- 📝 TypeScript: Complete type definitions included
- ⚡ Performance: Efficient backpressure handling
Install
npm i @fastify/sse
Quick Start
const fastify = require('fastify')({ logger: true })
// Register the plugin
await fastify.register(require('@fastify/sse'))
// Create an SSE endpoint
fastify.get('/events', { sse: true }, async (request, reply) => {
// Send a message
await reply.sse.send({ data: 'Hello SSE!' })
// Send with full options
await reply.sse.send({
id: '123',
event: 'update',
data: { message: 'Hello World' },
retry: 1000
})
})
await fastify.listen({ port: 3000 })
API
Plugin Registration
await fastify.register(require('@fastify/sse'), {
// Optional: heartbeat interval in milliseconds (default: 30000)
heartbeatInterval: 30000,
// Optional: default serializer (default: JSON.stringify)
serializer: (data) => JSON.stringify(data)
})
Route Configuration
// Enable SSE for a route
fastify.get('/events', { sse: true }, handler)
// With options
fastify.get('/events', {
sse: {
heartbeat: false, // Disable heartbeat for this route
serializer: customSerializer // Custom serializer for this route
}
}, handler)
reply.sse.send(source)
Send SSE messages. Accepts various source types:
Single Message
// Simple data
await reply.sse.send({ data: 'hello' })
// Full SSE message
await reply.sse.send({
id: '123',
event: 'update',
data: { message: 'Hello' },
retry: 1000
})
// Plain string
await reply.sse.send('plain text message')
Streaming Sources
// Async generator
async function* generateEvents() {
for (let i = 0; i < 10; i++) {
yield { id: i, data: `Event ${i}` }
await sleep(1000)
}
}
await reply.sse.send(generateEvents())
// Node.js Readable stream
const stream = fs.createReadStream('data.jsonl')
await reply.sse.send(stream)
// Transform existing stream
const transformStream = new Transform({
transform(chunk, encoding, callback) {
callback(null, { data: chunk.toString() })
}
})
someSource.pipe(transformStream)
await reply.sse.send(transformStream)
reply.sse.stream()
Create a transform stream for use in pipeline operations:
// Use with pipeline for efficient streaming
const { pipeline } = require('stream/promises')
const fs = require('fs')
fastify.get('/file-stream', { sse: true }, async (request, reply) => {
const fileStream = fs.createReadStream('data.jsonl')
// Parse each line as JSON and convert to SSE format
const parseTransform = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n').filter(Boolean)
for (const line of lines) {
try {
const data = JSON.parse(line)
this.push({ id: data.id, data })
} catch (err) {
// Skip invalid JSON lines
}
}
callback()
}
})
// Stream file data through SSE
await pipeline(
fileStream,
parseTransform,
reply.sse.stream(),
reply.raw,
{ end: false }
)
})
Connection Management
fastify.get('/live', { sse: true }, async (request, reply) => {
// Keep connection alive (prevents automatic close)
reply.sse.keepAlive()
// Send initial message
await reply.sse.send({ data: 'Connected' })
// Set up periodic updates
const interval = setInterval(async () => {
if (reply.sse.isConnected) {
await reply.sse.send({ data: 'ping' })
} else {
clearInterval(interval)
}
}, 1000)
// Clean up when connection closes
reply.sse.onClose(() => {
clearInterval(interval)
console.log('Connection closed')
})
})
Message Replay
Handle client reconnections with Last-Event-ID
:
const messageHistory = []
fastify.get('/events', { sse: true }, async (request, reply) => {
// Handle replay on reconnection
await reply.sse.replay(async (lastEventId) => {
// Find messages after the last received ID
const startIndex = messageHistory.findIndex(msg => msg.id === lastEventId)
const messagesToReplay = startIndex !== -1
? messageHistory.slice(startIndex + 1)
: messageHistory
// Send missed messages
for (const message of messagesToReplay) {
await reply.sse.send(message)
}
})
// Send new message
const newMessage = { id: Date.now(), data: 'New event' }
messageHistory.push(newMessage)
await reply.sse.send(newMessage)
})
Properties and Methods
reply.sse.lastEventId
: Client's last received event IDreply.sse.isConnected
: Connection status (boolean)reply.sse.keepAlive()
: Prevent connection from auto-closingreply.sse.close()
: Manually close the connectionreply.sse.replay(callback)
: Handle message replayreply.sse.onClose(callback)
: Register close callback
Advanced Usage
Fallback to Regular Responses
Routes with { sse: true }
automatically fall back to regular handlers when the client doesn't request SSE:
fastify.get('/data', { sse: true }, async (request, reply) => {
const data = await getData()
// Check if this is an SSE request
if (request.headers.accept?.includes('text/event-stream')) {
// SSE client - stream the data
await reply.sse.send({ data })
} else {
// Regular client - return JSON
return { data }
}
})
Error Handling
fastify.get('/stream', { sse: true }, async (request, reply) => {
try {
async function* riskyGenerator() {
yield { data: 'before error' }
throw new Error('Something went wrong')
}
await reply.sse.send(riskyGenerator())
} catch (error) {
// Handle errors gracefully
await reply.sse.send({
event: 'error',
data: { message: 'Stream error occurred' }
})
}
})
Custom Serialization
// Plugin-level serializer
await fastify.register(require('@fastify/sse'), {
serializer: (data) => {
// Custom serialization logic
return typeof data === 'string' ? data : JSON.stringify(data)
}
})
// Route-level serializer
fastify.get('/custom', {
sse: {
serializer: (data) => `CUSTOM:${JSON.stringify(data)}`
}
}, async (request, reply) => {
await reply.sse.send({ data: 'test' }) // Outputs: "CUSTOM:\"test\""
})
Testing
Testing SSE endpoints is simplified with standard Fastify injection:
const response = await fastify.inject({
method: 'GET',
url: '/events',
headers: {
accept: 'text/event-stream'
}
})
assert.strictEqual(response.statusCode, 200)
assert.strictEqual(response.headers['content-type'], 'text/event-stream')
assert.ok(response.body.includes('data: "Hello SSE!"'))
Client-Side Usage
<!DOCTYPE html>
<html>
<head>
<title>SSE Client</title>
</head>
<body>
<div id="messages"></div>
<script>
const eventSource = new EventSource('/events')
const messagesDiv = document.getElementById('messages')
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data)
messagesDiv.innerHTML += '<div>' + JSON.stringify(data) + '</div>'
}
eventSource.addEventListener('update', function(event) {
console.log('Update event:', JSON.parse(event.data))
})
eventSource.onerror = function(event) {
console.error('SSE error:', event)
}
</script>
</body>
</html>
TypeScript
Full TypeScript support included:
import fastify from 'fastify'
import fastifySSE, { SSEMessage } from '@fastify/sse'
const app = fastify()
await app.register(fastifySSE)
app.get('/events', { sse: true }, async (request, reply) => {
const message: SSEMessage = {
id: '123',
event: 'test',
data: { hello: 'world' }
}
await reply.sse.send(message)
// TypeScript knows about SSE properties
console.log(reply.sse.isConnected) // boolean
console.log(reply.sse.lastEventId) // string | null
})
Examples
See the examples directory for complete working examples:
- Basic Usage - Simple SSE endpoints
- More examples coming soon...
Comparison with fastify-sse-v2
Feature | fastify-sse-v2 | @fastify/sse |
---|---|---|
Basic SSE | ✅ | ✅ |
Async Iterators | ✅ | ✅ |
Stream Support | ✅ | ✅ Enhanced |
Session Management | ❌ | ✅ |
Last-Event-ID | ❌ | ✅ |
Connection Health | ❌ | ✅ |
Fastify Integration | ⚠️ Limited | ✅ Full |
Testing Support | ❌ | ✅ |