Package Exports
- @ahoo-wang/fetcher-eventstream
Readme
@ahoo-wang/fetcher-eventstream
Power your real-time applications with Server-Sent Events support, specially designed for Large Language Model streaming APIs.
๐ Features
- ๐ก Event Stream Conversion: Converts
text/event-streamresponses to async generators ofServerSentEventobjects - ๐ Interceptor Integration: Automatically adds
eventStream()andjsonEventStream()methods to responses withtext/event-streamcontent type - ๐ SSE Parsing: Parses Server-Sent Events according to the specification, including data, event, id, and retry fields
- ๐ Streaming Support: Handles chunked data and multi-line events correctly
- ๐ฌ Comment Handling: Properly ignores comment lines (lines starting with
:) as per SSE specification - ๐ก๏ธ TypeScript Support: Complete TypeScript type definitions
- โก Performance Optimized: Efficient parsing and streaming for high-performance applications
- ๐ค LLM Streaming Ready: Native support for streaming responses from popular LLM APIs like OpenAI GPT, Claude, etc.
๐ Quick Start
Installation
# Using npm
npm install @ahoo-wang/fetcher-eventstream
# Using pnpm
pnpm add @ahoo-wang/fetcher-eventstream
# Using yarn
yarn add @ahoo-wang/fetcher-eventstreamModule Import
To use the event stream functionality, you need to import the module for its side effects:
import '@ahoo-wang/fetcher-eventstream';This import automatically extends the Response interface with methods for handling Server-Sent Events streams:
eventStream()- Converts a Response withtext/event-streamcontent type to aServerSentEventStreamjsonEventStream<DATA>()- Converts a Response withtext/event-streamcontent type to aJsonServerSentEventStream<DATA>isEventStreamgetter - Checks if the Response has atext/event-streamcontent typerequiredEventStream()- Gets aServerSentEventStream, throwing an error if not availablerequiredJsonEventStream<DATA>()- Gets aJsonServerSentEventStream<DATA>, throwing an error if not available
This is a common pattern in JavaScript/TypeScript for extending existing types with additional functionality without modifying the original type definitions.
Integration Test Example: LLM Client with Event Stream
The following example shows how to create an LLM client with event stream support, similar to the integration test in the Fetcher project. You can find the complete implementation in integration-test/src/eventstream/llmClient.ts.
This example demonstrates how to interact with popular LLM APIs like OpenAI's GPT models using Fetcher's streaming capabilities.
import {
BaseURLCapable,
ContentTypeValues,
FetchExchange,
NamedFetcher,
REQUEST_BODY_INTERCEPTOR_ORDER,
RequestInterceptor,
} from '@ahoo-wang/fetcher';
import {
api,
autoGeneratedError,
body,
post,
ResultExtractors,
} from '@ahoo-wang/fetcher-decorator';
import '@ahoo-wang/fetcher-eventstream';
import { JsonServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
import { ChatRequest, ChatResponse } from './types';
export const llmFetcherName = 'llm';
export interface LlmOptions extends BaseURLCapable {
apiKey: string;
model?: string;
}
export class LlmRequestInterceptor implements RequestInterceptor {
readonly name: string = 'LlmRequestInterceptor';
readonly order: number = REQUEST_BODY_INTERCEPTOR_ORDER - 1;
constructor(private llmOptions: LlmOptions) {}
intercept(exchange: FetchExchange): void {
const chatRequest = exchange.request.body as ChatRequest;
if (!chatRequest.model) {
chatRequest.model = this.llmOptions.model;
}
}
}
export function createLlmFetcher(options: LlmOptions): NamedFetcher {
const llmFetcher = new NamedFetcher(llmFetcherName, {
baseURL: options.baseURL,
headers: {
Authorization: `Bearer ${options.apiKey}`,
'Content-Type': ContentTypeValues.APPLICATION_JSON,
},
});
llmFetcher.interceptors.request.use(new LlmRequestInterceptor(options));
return llmFetcher;
}
@api('/chat', {
fetcher: llmFetcherName,
resultExtractor: ResultExtractors.JsonEventStream,
})
export class LlmClient {
@post('/completions')
streamChat(
@body() body: ChatRequest,
): Promise<JsonServerSentEventStream<ChatResponse>> {
throw autoGeneratedError(body);
}
@post('/completions', { resultExtractor: ResultExtractors.Json })
chat(@body() body: ChatRequest): Promise<ChatResponse> {
throw autoGeneratedError(body);
}
}Using streamChat for Real-time Responses
Here's how to use the streamChat method to get real-time responses from an LLM API:
import { createLlmFetcher, LlmClient } from './llmClient';
// Initialize the LLM client with your API configuration
const llmFetcher = createLlmFetcher({
baseURL: 'https://api.openai.com/v1', // Example for OpenAI
apiKey: process.env.OPENAI_API_KEY || 'your-api-key',
model: 'gpt-3.5-turbo', // Default model
});
// Create the client instance
const llmClient = new LlmClient();
// Example: Stream a chat completion response in real-time
async function streamChatExample() {
try {
// Stream the response token by token
const stream = await llmClient.streamChat({
messages: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'Explain quantum computing in simple terms.' },
],
model: 'gpt-3.5-turbo', // Override default model if needed
stream: true, // Enable streaming
});
// Process the streamed response
let fullResponse = '';
for await (const event of stream) {
// Each event contains a partial response
if (event.data) {
const chunk = event.data;
const content = chunk.choices[0]?.delta?.content || '';
fullResponse += content;
console.log('New token:', content);
// Update UI in real-time as tokens arrive
updateUI(content);
}
}
console.log('Full response:', fullResponse);
} catch (error) {
console.error('Error streaming chat:', error);
}
}
// Helper function to simulate UI updates
function updateUI(content: string) {
// In a real application, this would update your UI
process.stdout.write(content);
}Manual Conversion
import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
// Convert a Response object manually
const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);
// Read events from the stream
const reader = eventStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log('Received event:', value);
}
} finally {
reader.releaseLock();
}Basic Usage
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// In responses with text/event-stream content type,
// Response objects will automatically have eventStream() and jsonEventStream() methods
const response = await fetcher.get('/events');
for await (const event of response.requiredEventStream()) {
console.log('Received event:', event);
}
// Using jsonEventStream for JSON data
const jsonResponse = await fetcher.get('/json-events');
for await (const event of response.requiredJsonEventStream<MyDataType>()) {
console.log('Received JSON event:', event.data);
}Manual Conversion
import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
// Convert a Response object manually
const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);
// Read events from the stream
const reader = eventStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log('Received event:', value);
}
} finally {
reader.releaseLock();
}๐ API Reference
Module Import
To use the event stream functionality, you need to import the module for its side effects:
import '@ahoo-wang/fetcher-eventstream';This import automatically extends the global Response interface with methods for handling Server-Sent Events streams:
eventStream()- Converts a Response withtext/event-streamcontent type to aServerSentEventStreamjsonEventStream<DATA>()- Converts a Response withtext/event-streamcontent type to aJsonServerSentEventStream<DATA>isEventStreamgetter - Checks if the Response has atext/event-streamcontent typerequiredEventStream()- Gets aServerSentEventStream, throwing an error if not availablerequiredJsonEventStream<DATA>()- Gets aJsonServerSentEventStream<DATA>, throwing an error if not available
This is a common pattern in JavaScript/TypeScript for extending existing types with additional functionality without modifying the original type definitions.
In integration tests and real applications, this import is essential for working with event streams. For example:
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// Response objects will automatically have eventStream() and jsonEventStream() methods
const response = await fetcher.get('/events');
// Handle event stream
for await (const event of response.requiredEventStream()) {
console.log('Received event:', event);
}toJsonServerSentEventStream
Converts a ServerSentEventStream to a JsonServerSentEventStream<DATA> for handling Server-Sent Events with JSON
data.
Signature
function toJsonServerSentEventStream<DATA>(
serverSentEventStream: ServerSentEventStream,
): JsonServerSentEventStream<DATA>;Parameters
serverSentEventStream: TheServerSentEventStreamto convert
Returns
JsonServerSentEventStream<DATA>: A readable stream ofJsonServerSentEvent<DATA>objects
JsonServerSentEvent
Interface defining the structure of a Server-Sent Event with JSON data.
interface JsonServerSentEvent<DATA> extends Omit<ServerSentEvent, 'data'> {
data: DATA; // The event data parsed as JSON
}JsonServerSentEventStream
Type alias for a readable stream of JsonServerSentEvent<DATA> objects.
type JsonServerSentEventStream<DATA> = ReadableStream<
JsonServerSentEvent<DATA>
>;toServerSentEventStream
Converts a Response object with a text/event-stream body to a ServerSentEventStream.
Signature
function toServerSentEventStream(response: Response): ServerSentEventStream;Parameters
response: An HTTP response withtext/event-streamcontent type
Returns
ServerSentEventStream: A readable stream ofServerSentEventobjects
ServerSentEvent
Interface defining the structure of a Server-Sent Event.
interface ServerSentEvent {
data: string; // The event data (required)
event?: string; // The event type (optional, defaults to 'message')
id?: string; // The event ID (optional)
retry?: number; // The reconnection time in milliseconds (optional)
}ServerSentEventStream
Type alias for a readable stream of ServerSentEvent objects.
type ServerSentEventStream = ReadableStream<ServerSentEvent>;๐ ๏ธ Examples
Real-time Notifications
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// Listen for real-time notifications
const response = await fetcher.get('/notifications');
for await (const event of response.requiredEventStream()) {
switch (event.event) {
case 'message':
showNotification('Message', event.data);
break;
case 'alert':
showAlert('Alert', event.data);
break;
case 'update':
handleUpdate(JSON.parse(event.data));
break;
default:
console.log('Unknown event:', event);
}
}Progress Updates
import { Fetcher } from '@ahoo-wang/fetcher';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// Track long-running task progress
const response = await fetcher.get('/tasks/123/progress');
for await (const event of response.requiredEventStream()) {
if (event.event === 'progress') {
const progress = JSON.parse(event.data);
updateProgressBar(progress.percentage);
} else if (event.event === 'complete') {
showCompletionMessage(event.data);
break;
}
}Chat Application
import { Fetcher } from '@ahoo-wang/fetcher';
const fetcher = new Fetcher({
baseURL: 'https://chat-api.example.com',
});
// Real-time chat messages
const response = await fetcher.get('/rooms/123/messages');
for await (const event of response.requiredEventStream()) {
if (event.event === 'message') {
const message = JSON.parse(event.data);
displayMessage(message);
} else if (event.event === 'user-joined') {
showUserJoined(event.data);
} else if (event.event === 'user-left') {
showUserLeft(event.data);
}
}๐งช Testing
# Run tests
pnpm test
# Run tests with coverage
pnpm test --coverageThe test suite includes:
- Event stream conversion tests
- Interceptor functionality tests
- Edge case handling (malformed events, chunked data, etc.)
- Performance tests for large event streams
๐ Server-Sent Events Specification Compliance
This package fully implements the Server-Sent Events specification:
- Data field: Supports multi-line data fields
- Event field: Custom event types
- ID field: Last event ID tracking
- Retry field: Automatic reconnection timeout
- Comment lines: Lines starting with
:are ignored - Event dispatching: Proper event dispatching with default event type 'message'
๐ค Contributing
Contributions are welcome! Please see the contributing guide for more details.
๐ License
This project is licensed under the Apache-2.0 License.
Part of the Fetcher ecosystem