Package Exports
- sveltekit-sse
Readme
SvelteKit SSE
This library provides an easy way to produce and consume server sent events.
Install with:
npm i -D sveltekit-sse
Create your server sent event with:
// src/routes/custom-event/+server.js
import { event } from 'sveltekit-sse'
/**
* @param {number} milliseconds
* @returns
*/
function delay(milliseconds) {
return new Promise(function run(r) {
setTimeout(r, milliseconds)
})
}
export function GET() {
return event(async function run(emit) {
while (true) {
emit(`${Date.now()}`)
await delay(1000)
}
}).toResponse()
}
(expand for more details)
The stream and http connection will end whenever the callback passed to event()
resolves.
The infinite loop is of great importance in this case because it prevents the callback from resolving and keeps the connection to the client open.
You can end the connection at any time by return
ing or break
ing the loop
export function GET() {
let i = 0
return event(async function run(emit){
while (true) {
emit(`${Date.now()}`)
await delay(1000)
if(9 === i) {
return
}
i++
}
}).toResponse()
}
This will complete the response after 10 iterations.
You don't have to use an infinite loop, you can achieve the same result using any other kind of semaphore-like behavior.
For example using Promise
export function GET() {
let i = 0
return event(async function run(emit){
await new Promise(function run(stop){ // this line will prevent the callback from resolving
while (i < 10) {
emit(`${Date.now()}`)
await delay(1000)
i++
}
stop() // if you omit this line, the promise will not resolve and the connection will remain open
})
}).toResponse()
}
Without calling stop()
, even though the while
loop is not infinite anymore, the connection will remain open and you will leak memory this way.
Read more about this topic in the locking section.
and consume the source on your client with:
<script>
// src/routes/+page.svelte
import { source } from 'sveltekit-sse'
const value = source('/custom-event')
</script>
{$value}
Multiple events
All major browsers will limit the number of parallel http connections.
One solution to this problem is using http2.
However, for various reasons not everyone can serve http2 responses, in that case you can use the same http1 connection to emit multiple events.
// src/routes/events/+server.js
import { events } from 'sveltekit-sse'
/**
* @param {number} milliseconds
* @returns
*/
function delay(milliseconds) {
return new Promise(function run(r) {
setTimeout(r, milliseconds)
})
}
export function GET() {
return events(async function run(emit) {
while (true) {
emit('event-1', `/events (1) says: ${Date.now()}`)
emit('event-2', `/events (2) says: ${Date.now()}`)
emit('event-3', `/events (3) says: ${Date.now()}`)
await delay(2000)
}
}).toResponse()
}
and consume it on your client with:
<script>
import { source } from 'sveltekit-sse'
const connection = source('/events')
const value1 = connection.select('event-1')
const value2 = connection.select('event-2')
const value3 = connection.select('event-3')
</script>
{$value1}
<br />
{$value2}
<br />
{$value3}
Transform
While on the client, you can transform the stream into any type of object you want by using source::select::transform
.
The transform
method receives a ReadableStream
, which you can use to read incoming messages from the source.
Here's an example how to use it.
<script>
import { source } from 'sveltekit-sse'
const connection = source('/custom-event')
const channel = connection.select('message')
const transformed = channel.transform(function start(stream) {
let state = {
/** @type {Array<function(string):void>}*/
listeners: [],
}
const reader = stream.getReader()
const store = {
subscribe(callback) {
if (!state.listeners.includes(callback)) {
state.listeners.push(callback)
}
return function stop() {
state.listeners = state.listeners.filter(function pass(value) {
return value !== callback
})
}
},
}
const listen = async function () {
let value = ''
while (({ value } = await reader.read())) {
state.listeners.forEach(function run(callback) {
callback(value)
})
}
}
listen()
return store
})
$: console.log({ $transformed })
</script>
Custom Headers
The standard EventSource
class does not permit setting custom headers or manipulating the underlying request options.
This library achieves client side event sourcing using fetch
.
[!NOTE] Custom headers are only available since version
0.4.0
.
The following will set a Authorization: Bearer ...
header to the underlying http request.
<script>
import { source } from 'sveltekit-sse'
const data = source('/event', {
headers: {
Authorization: 'Bearer ...',
},
})
</script>
{$data}
Reconnect
You can reconnect to the stream whenever the stream closes by invoking Event::connect
.
<script>
import { source } from 'sveltekit-sse'
const data = source('/custom-event').onClose(function stop({ connect }) {
connect()
console.log('reconnecting')
})
setTimeout(function run() {
data.close()
}, 5000)
</script>
{$data}
Json
You can parse incoming messages from the source as json using source::select::json
.
<script>
import { source } from 'sveltekit-sse'
const connection = source('/custom-event')
const json = connection.select('message').json(
function onJsonParseError({error, currentRawValue, previousParsedValue}){
console.error(`Could not parse "${currentRawValue}" as json.`, error)
return previousParsedValue // this will be the new value of the store
}
)
$: console.log({$json})
</svelte>
When a parsing error occurs, onJsonParseError
is invoked.
Whatever this function returns will become the new value of the store, in the example above previousParsedValue
, which is the previous (valid) value of the store.
Locking
More often than not for an SSE endpoint to be useful you usually need to keep the connection alive for a long periods of time.
The default behavior for any event is to close the connection immediately as soon as your event callback resolves.
For example
export function GET() {
return event(async function run(emit) {
emit('hello world')
}).toResponse()
}
This endpoint will emit the string hello world
and immediately close the connection.
This is not very useful.
Usually you need to consume data from your database or some other I/O resource over longer periods of time.
Regardless of the origin of your data, you most likely will need to keep the connection alive.
One way of doing that is using a Promise
export function GET() {
return event(async function run(emit) {
await new Promise(function run(stop){
// imagine `myIo` is some I/O data provider you need to consume
myIo.addEventListener("data", function run(data){
emit(data)
})
myIo.addEventListener("close", stop)
myIo.addEventListener("error", stop)
})
}).toResponse()
}
This looks fine to me personally, but some might prefer a more svelte-like
way of doing things, that is: using EventsOptions::locked
.
Both event
and events
take a second parameter, an EventsOptions
object with the following signature
/**
* @typedef EventsOptions
* @property {false|import("svelte/store").Writable<boolean>} locked
*/
When locked
is a valid writable store the event will automatically enter a locked state and will wait for the locked
store to be set to false
.
export function GET() {
const locked = writable(true)
return event(async function run(emit) {
myIo.addEventListener("data", function run(data){
emit(data)
})
myIo.addEventListener("close", function close(){
locked.set(false)
})
myIo.addEventListener("error", function error(){
locked.set(false)
})
},
{locked}
).toResponse()
}
Other notes
[!NOTE]
- Multiple sources connecting to the same path will use the same cached connection.
- When the readable store becomes inactive, meaning when the last subscriber unsubscribes from the store, the background connection is closed.
- (Then) When the first subscription is issued to the store, the store will attempt to connect (again) to the server.
- This note applies to single event sources, multiple events sources, transform and json modifiers.