Vercel Queues
A TypeScript client library for the Vercel Queue Service API, with customizable serialization/deserialization (transports) and streaming support for memory-efficient processing of large payloads.
Features
- Generic Payload Support: Send and receive any type of data with type safety
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
- Streaming Support: Handle large payloads without loading them entirely into memory
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Automatic Retries: Built-in visibility timeout management
Installation
npm install @vercel/queue
Quick Start
For local development, you'll need to pull your Vercel environment variables (including the OIDC token):
# Install Vercel CLI if you haven't already
npm i -g vercel
# Pull environment variables from your Vercel project
vc env pull
Publishing and consuming messages on a queue
// index.ts
import { createTopic, JsonTransport, QueueClient } from "@vercel/queue";
// Create a client - automatically authenticated using the OIDC token
const client = await QueueClient.fromVercelFunction();
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
client,
"my-topic",
);
// Publish a message
await topic.publish({
message: "Hello, World!",
timestamp: Date.now(),
});
// Create a consumer group
const consumer = topic.consumerGroup("my-processors");
// Process messages continuously with cancellation support
const controller = new AbortController();
// Start processing (blocks until aborted or error)
try {
await consumer.subscribe(controller.signal, async (message) => {
console.log("Received:", message.payload.message);
console.log("Timestamp:", new Date(message.payload.timestamp));
});
} catch (error) {
console.error("Processing stopped due to error:", error);
}
// Stop processing from elsewhere in your code
// controller.abort();
Run the script
# Using dotenv to load the OIDC token
dotenv -e .env.local node index.ts
Usage with Vercel
When deploying on Vercel, you don't need a persistent server subscribed to a queue; instead, Vercel can trigger a callback route when a message is ready for consumption.
To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.
TypeScript Configuration
Update your tsconfig.json to use "bundler" module resolution for proper
package export resolution:
{
"compilerOptions": {
"moduleResolution": "bundler"
// ... other options
}
}
Publishing messages to a queue
Create a new server function to publish messages
// app/actions.ts
"use server";
import { createTopic, QueueClient } from "@vercel/queue";
export async function publishTestMessage(message: string) {
// Initialize a queue client
const client = await QueueClient.fromVercelFunction();
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
client,
"my-topic",
);
// Publish the message
const { messageId } = await topic.publish(
{ message, timestamp: Date.now() },
{
// Provide a callback URL to invoke a consumer when the message is ready to be processed
callback: {
url: process.env.VERCEL_PROJECT_PRODUCTION_URL
? `https://${process.env.VERCEL_PROJECT_PRODUCTION_URL}/api/queue/handle`
: "http://localhost:3000/api/queue/handle",
},
},
);
console.log(`Published message ${messageId}`);
}
Now wire up the server function to your app
// app/some/page.tsx
"use client";
import { publishTestMessage } from "../actions";
export default function PublishButton() {
return (
// ...
<button onClick={() => publishTestMessage("Hello world")}>
Publish Test Message
</button>
);
}
Consuming the queue
Instead of running a persistent server that subscribes to the queue, we use the callback functionality of Vercel Queues to consume messages on the fly whenever a message is ready to be processed.
The handleCallback helper function simplifies queue callback handling in Next.js:
// app/api/queue/handle/route.ts
import { handleCallback } from "@vercel/queue";
export const POST = handleCallback({
// Handle messages sent on the "my-topic" topic (the consumer
// group "default" will be used)
"my-topic": (message, metadata) => {
console.log(`Received message:`, message, metadata);
// metadata: { messageId, deliveryCount, timestamp }
},
});
// Or, specify separate handlers for separate consumer groups
export const POST = handleCallback({
// topic: "my-topic"
"my-topic": {
// consumer group: "compress"
compress: (message, metadata) => {
console.log("Compressing image:", message);
},
// consumer group: "resize"
resize: (message, metadata) => {
console.log("Resizing image:", message);
},
// consumer group: "watermark"
watermark: (message, metadata) => {
console.log("Adding watermark:", message);
},
},
});
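Callback handlers can also defer a message instead of completing it by returning a timeout result, per the Handler and MessageHandlerResult types in the API reference below. A minimal sketch (isDownstreamReady and processMessage are hypothetical helpers):
// app/api/queue/handle/route.ts (alternative sketch)
import { handleCallback } from "@vercel/queue";

declare function isDownstreamReady(): boolean; // hypothetical helper
declare function processMessage(payload: unknown): Promise<void>; // hypothetical helper

export const POST = handleCallback({
  "my-topic": async (message, metadata) => {
    if (!isDownstreamReady()) {
      // Defer: the message becomes visible again in 60 seconds
      return { timeoutSeconds: 60 };
    }
    await processMessage(message);
    // Returning normally marks the message as processed
  },
});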
Key Features
Streaming Support
Handle large files and data streams without loading them into memory:
import { StreamTransport } from "@vercel/queue";
const videoTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"video-processing",
new StreamTransport(),
);
// Process large video files efficiently
const processor = videoTopic.consumerGroup("processors");
await processor.subscribe(signal, async (message) => {
const videoStream = message.payload;
// Process stream chunk by chunk
const reader = videoStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
await processChunk(value);
}
});
Consumer Groups
Multiple consumers can process messages from the same topic in parallel:
// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)
// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every message
Architecture
- Topics: Named message channels with configurable serialization
- Consumer Groups: Named groups of consumers that process messages in parallel, with several processing modes (see the sketch after this list):
  - subscribe(): Continuously process messages with automatic polling
  - receiveMessage(): Process a specific message by ID
  - receiveNextMessage(): Process the next available message (one-shot)
  - handleMessage(): Process message metadata only (without payload)
- Transports: Pluggable serialization/deserialization for different data types
- Streaming: Memory-efficient processing of large payloads
- Visibility Timeouts: Automatic message lifecycle management
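Of these modes, handleMessage() is not demonstrated elsewhere in this README. A minimal sketch, assuming topic and messageId from the examples above and that the handler receives the usual message metadata (messageId, deliveryCount):
// Sketch: act on a message without deserializing its payload
const auditor = topic.consumerGroup("auditors");
await auditor.handleMessage(messageId, async (message) => {
  // The payload is not loaded here; only metadata (assumed fields)
  console.log(`Message ${message.messageId}, delivery #${message.deliveryCount}`);
});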
Performance
The multipart parser is optimized for high-throughput scenarios:
- Streaming: Messages are yielded immediately as headers are parsed
- Memory Efficient: No buffering of complete payloads
- Fast Parsing: Native Buffer operations for ~50% performance improvement
- Scalable: Can handle arbitrarily large responses without memory constraints
Serialization (Transport) System
The queue client supports customizable serialization through the Transport
interface with streaming support for memory-efficient processing. Transport
can be configured at the topic level when creating a topic, or at the
consumer group level when creating a consumer group.
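For example, a consumer group could read the raw bytes of a topic whose publishers use JSON. A sketch, assuming client from the Quick Start; the transport option name is an assumption based on the consumerGroup<U>(groupName, options?) signature in the API reference:
import { BufferTransport, createTopic } from "@vercel/queue";

// Publishers on this topic use the default JsonTransport
const topic = createTopic<{ data: string }>(client, "mixed-topic");

// Assumed option shape: override the transport for this consumer group only
const rawConsumer = topic.consumerGroup<Buffer>("raw-readers", {
  transport: new BufferTransport(),
});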
Built-in Transports
JsonTransport (Default)
Buffers data for JSON parsing - suitable for structured data that fits in memory.
import { createTopic, JsonTransport } from "@vercel/queue";
const topic = createTopic<{ data: any }>(
client,
"json-topic",
new JsonTransport(),
);
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>(client, "json-topic");
BufferTransport
Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.
import { BufferTransport, createTopic } from "@vercel/queue";
const topic = createTopic<Buffer>(
client,
"binary-topic",
new BufferTransport(),
);
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);
StreamTransport
True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.
import { createTopic, StreamTransport } from "@vercel/queue";
const topic = createTopic<ReadableStream<Uint8Array>>(
client,
"streaming-topic",
new StreamTransport(),
);
// Send large file as stream without loading into memory
const fileStream = new ReadableStream<Uint8Array>({
start(controller) {
// Read file in chunks
for (const chunk of readFileInChunks("large-file.bin")) {
controller.enqueue(chunk);
}
controller.close();
},
});
await topic.publish(fileStream);
Custom Transport
You can create your own serialization format by implementing the Transport
interface:
import { Transport } from "@vercel/queue";
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}
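For instance, here is a minimal sketch of a transport that sends plain UTF-8 strings (the class name and contentType value are illustrative, not part of the library):
import { createTopic, Transport } from "@vercel/queue";

class StringTransport implements Transport<string> {
  contentType = "text/plain";

  serialize(value: string): Buffer {
    return Buffer.from(value, "utf8");
  }

  async deserialize(stream: ReadableStream<Uint8Array>): Promise<string> {
    // Buffer the stream, then decode it as UTF-8
    const chunks: Uint8Array[] = [];
    const reader = stream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
    return Buffer.concat(chunks).toString("utf8");
  }
}

const stringTopic = createTopic<string>(client, "string-topic", new StringTransport());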
Choosing the Right Transport
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport | Low | High |
| Binary files < 100MB | BufferTransport | Medium | High |
| Large files > 100MB | StreamTransport | Very Low | Medium |
| Real-time data streams | StreamTransport | Very Low | High |
| Custom protocols | Custom implementation | Varies | Varies |
API Reference
QueueClient
const client = new QueueClient({
token: string;
baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});
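Inside a Vercel Function, you can instead create a client from the ambient OIDC token, as shown in the Quick Start:
const client = await QueueClient.fromVercelFunction();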
Topic
const topic = createTopic<T>(client, topicName, transport?);
// Publish a message (uses topic's transport)
await topic.publish(payload, options?);
// Trigger a callback URL when the message is
// ready for consumption
await topic.publish(payload, {
callback: { url: "https://example.com/webhook" }
});
// Or provide multiple callbacks (each URL is called
// with a separate consumer group)
await topic.publish(payload, {
callback: {
group1: { url: "https://example.com/webhook1" },
group2: { url: "https://example.com/webhook2", delay: 30 }
}
});
// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);
ConsumerGroup
// Start continuous processing (blocks until signal is aborted or error occurs)
await consumer.subscribe(signal, handler, options?);
// Process a specific message by ID
await consumer.receiveMessage(messageId, handler);
// Process the next available message
await consumer.receiveNextMessage(handler);
// Handle a specific message by ID without payload
await consumer.handleMessage(messageId, handler);
Message Handler
// Handler function signature
type MessageHandler<T> = (
message: Message<T>,
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;
interface MessageTimeoutResult {
timeoutSeconds: number; // seconds before message becomes available again
}
Transport Interface
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}
Callback Utilities
// Parse queue callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;
// Callback options type
interface CallbackMessageOptions {
queueName: string;
consumerGroup: string;
messageId: string;
}
// Create a callback handler for Next.js route handlers
function handleCallback(handlers: CallbackHandlers): (request: Request) => Promise<Response>;
// Handler function signature for callbacks
type Handler<T = unknown> = (
payload: T,
metadata: MessageMetadata
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Message metadata provided to handlers
interface MessageMetadata {
messageId: string;
deliveryCount: number;
timestamp: string;
}
// Configuration object with handlers for different topics
type CallbackHandlers = {
[topicName: string]:
| Handler // Single handler (uses 'default' consumer group)
| { [consumerGroup: string]: Handler }; // Multiple consumer group handlers
};
// Example usage:
export const POST = handleCallback({
// Topic handler (uses 'default' consumer group)
"new-users": (message, metadata) => {
console.log(`New user event:`, message, metadata);
},
// Consumer group specific handlers
"image-processing": {
"compress": (message, metadata) => console.log("Compressing image", message),
"resize": (message, metadata) => console.log("Resizing image", message),
}
});
// Error thrown for invalid callback requests
class InvalidCallbackError extends Error {}
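A minimal sketch of a hand-rolled route handler built on parseCallbackRequest and InvalidCallbackError (both listed above); the response bodies and status codes are illustrative:
import { InvalidCallbackError, parseCallbackRequest } from "@vercel/queue";

export async function POST(request: Request) {
  try {
    const { queueName, consumerGroup, messageId } = parseCallbackRequest(request);
    // Receive and process the message by ID here, e.g. via
    // consumer.receiveMessage(messageId, handler)
    console.log(`Callback for ${queueName}/${consumerGroup}: ${messageId}`);
    return new Response("OK");
  } catch (error) {
    if (error instanceof InvalidCallbackError) {
      return new Response("Invalid callback request", { status: 400 });
    }
    throw error;
  }
}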
Examples
Basic JSON Processing
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
const userTopic = createTopic<UserEvent>(client, "user-events");
await userTopic.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
const consumer = userTopic.consumerGroup("processors");
const controller = new AbortController();
try {
await consumer.subscribe(controller.signal, async (message) => {
console.log(
`User ${message.payload.userId} performed ${message.payload.action}`,
);
});
} catch (error) {
console.error("Processing error:", error);
}
// Stop processing when needed
// controller.abort();
Processing Specific Messages by ID
const userTopic = createTopic<{ userId: string; action: string }>(
client,
"user-events",
);
const consumer = userTopic.consumerGroup("processors");
// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";
try {
await consumer.receiveMessage(messageId, async (message) => {
console.log(`Processing specific message: ${message.messageId}`);
console.log(
`User ${message.payload.userId} performed ${message.payload.action}`,
);
});
console.log("Message processed successfully");
} catch (error) {
if (error instanceof Error && error.message.includes("not found or not available")) {
console.log("Message was already processed or does not exist");
} else if (error instanceof Error && error.message.includes("FIFO ordering violation")) {
console.log("FIFO queue requires processing messages in order");
} else {
console.error("Error processing message:", error);
}
}
Processing Next Available Message
import { MessageLockedError, QueueEmptyError } from "@vercel/queue";
const workTopic = createTopic<{ taskType: string; data: any }>(
client,
"work-queue",
);
const worker = workTopic.consumerGroup("workers");
// Process the next available message (one-shot processing)
try {
await worker.receiveNextMessage(async (message) => {
console.log(`Processing task: ${message.payload.taskType}`);
await processTask(message.payload.taskType, message.payload.data);
});
console.log("Message processed successfully");
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("No messages available");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked (FIFO queue)");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
} else {
console.error("Error processing message:", error);
}
}
// You can also use it with timeout results
await worker.receiveNextMessage(async (message) => {
if (!canProcessTaskType(message.payload.taskType)) {
// Return timeout to retry later
return { timeoutSeconds: 60 };
}
await processTask(message.payload.taskType, message.payload.data);
});
Timing Out Messages
const workTopic = createTopic<{ taskType: string; data: any }>(
client,
"work-queue",
);
const worker = workTopic.consumerGroup("workers");
const controller = new AbortController();
try {
await worker.subscribe(controller.signal, async (message) => {
const { taskType, data } = message.payload;
// Check if we can process this task type right now
if (taskType === "heavy-computation" && isSystemOverloaded()) {
// Return timeout to retry later (5 minutes)
return { timeoutSeconds: 300 };
}
// Check if we have required resources
if (taskType === "external-api" && !isExternalServiceAvailable()) {
// Return timeout to retry in 1 minute
return { timeoutSeconds: 60 };
}
// Process the message normally
console.log(`Processing ${taskType} task`);
await processTask(taskType, data);
// Message will be automatically deleted on successful completion
});
} catch (error) {
console.error("Worker processing error:", error);
}
// Example with exponential backoff
const backoffController = new AbortController();
try {
await worker.subscribe(backoffController.signal, async (message) => {
const maxRetries = 3;
const deliveryCount = message.deliveryCount;
try {
await processMessage(message.payload);
// Successful processing - message will be deleted
} catch (error) {
if (deliveryCount < maxRetries) {
// Exponential backoff: 2^deliveryCount minutes
const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
console.log(
`Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
);
return { timeoutSeconds: timeoutSeconds };
} else {
// Max retries reached, let the message fail and be deleted
console.error("Max retries reached, message will be discarded:", error);
throw error;
}
}
});
} catch (error) {
console.error("Backoff processing error:", error);
}
Complete Example: Video Processing Pipeline
Here's a comprehensive example of a pipeline that optimizes videos with FFmpeg and stores the results in Vercel Blob:
import { createTopic, QueueClient, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";
const client = new QueueClient({
token: "your-vercel-oidc-token",
});
// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"unoptimized-videos",
new StreamTransport(),
);
// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"optimized-videos",
new StreamTransport(),
);
// Step 1: Process videos with FFmpeg. Note that subscribe() blocks until
// aborted, so in practice run each step in its own process or start the
// two subscriptions concurrently (e.g. with Promise.all).
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");
const processingController = new AbortController();
try {
await videoProcessor.subscribe(
processingController.signal,
async (message) => {
const inputVideoStream = message.payload;
console.log("Processing video...");
if (!ffmpeg) {
throw new Error("FFmpeg not available");
}
// Create optimized video stream using FFmpeg
const optimizedStream = new ReadableStream<Uint8Array>({
start(controller) {
const ffmpegProcess = spawn(
ffmpeg,
[
"-i",
"pipe:0", // Input from stdin
"-c:v",
"libvpx-vp9", // Video codec
"-c:a",
"libopus", // Audio codec
"-crf",
"23", // Quality
"-f",
"webm", // Output format
"pipe:1", // Output to stdout
],
{ stdio: ["pipe", "pipe", "pipe"] },
);
// Pipe input stream to FFmpeg
const reader = inputVideoStream.getReader();
const pipeInput = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
ffmpegProcess.stdin?.end();
break;
}
ffmpegProcess.stdin?.write(value);
}
};
pipeInput().catch((err) => controller.error(err));
// Stream FFmpeg output
ffmpegProcess.stdout?.on("data", (chunk) => {
controller.enqueue(new Uint8Array(chunk));
});
ffmpegProcess.on("close", (code) => {
if (code === 0) {
controller.close();
} else {
controller.error(new Error(`FFmpeg failed with code ${code}`));
}
});
},
});
// Publish optimized video to next topic
await optimizedVideosTopic.publish(optimizedStream);
console.log("Video optimized and published");
},
);
} catch (error) {
console.error("Video processing error:", error);
}
// Step 2: Store optimized videos in Vercel Blob
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");
const uploadController = new AbortController();
try {
await blobUploader.subscribe(uploadController.signal, async (message) => {
const optimizedVideo = message.payload;
// Upload to Vercel Blob storage
const filename = `optimized-${Date.now()}.webm`;
const blob = await put(filename, optimizedVideo, {
access: "public",
contentType: "video/webm",
});
console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
});
} catch (error) {
console.error("Blob upload error:", error);
}
// Graceful shutdown
process.on("SIGINT", () => {
processingController.abort();
uploadController.abort();
});
Error Handling
The queue client provides specific error types for different failure scenarios:
Error Types
- QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)
  - Only thrown when directly using client.receiveMessages(); ConsumerGroup.subscribe() handles this error internally and continues polling
- MessageLockedError: Thrown when a message is temporarily locked (423 status)
  - Contains optional retryAfter property with seconds to wait before retry
  - For receiveMessages() on FIFO queues: the next message in sequence is locked
  - For receiveMessageById(): the requested message is locked
  - ConsumerGroup.subscribe() handles this error internally when polling
- MessageNotFoundError: Message doesn't exist (404 status)
- MessageNotAvailableError: Message exists but isn't available for processing (409 status)
- FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)
  - Contains nextMessageId property indicating which message to process first
- FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)
  - Contains nextMessageId property indicating which message must be processed first
  - Similar to FifoOrderingViolationError but specifically for receive-by-ID operations
- MessageCorruptedError: Message data is corrupted or can't be parsed
- BadRequestError: Invalid request parameters (400 status)
  - Invalid queue names, FIFO limit violations, missing required parameters
- UnauthorizedError: Authentication failure (401 status)
  - Missing or invalid authentication token
- ForbiddenError: Access denied (403 status)
  - Queue environment doesn't match token environment
- InternalServerError: Server-side errors (500+ status codes)
  - Unexpected server errors, service unavailable, etc.
Error Handling Examples
import {
BadRequestError,
FailedDependencyError,
FifoOrderingViolationError,
ForbiddenError,
InternalServerError,
MessageLockedError,
QueueEmptyError,
UnauthorizedError,
} from "@vercel/queue";
// Handle empty queue or locked messages
try {
for await (const message of client.receiveMessages(options, transport)) {
// Process messages
}
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("Queue is empty, retry later");
} else if (error instanceof MessageLockedError) {
console.log("Next message in FIFO queue is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
}
}
// Handle locked message with retry
try {
await consumer.receiveMessage(messageId, handler);
} catch (error) {
if (error instanceof MessageLockedError) {
console.log("Message is locked by another consumer");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
setTimeout(() => retry(), error.retryAfter * 1000);
}
} else if (error instanceof FailedDependencyError) {
// FIFO ordering violation for receive by ID
console.log(`Must process ${error.nextMessageId} first`);
}
}
// Handle authentication and authorization errors
try {
await topic.publish(payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check token/queue configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
// Complete error handling pattern
function handleQueueError(error: unknown): void {
if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
// Transient errors - safe to retry
console.log("Temporary condition, will retry");
} else if (
error instanceof UnauthorizedError ||
error instanceof ForbiddenError
) {
// Authentication/authorization errors - need to fix configuration
console.log("Auth error - check credentials");
} else if (error instanceof BadRequestError) {
// Client error - fix the request
console.log("Invalid request:", error.message);
} else if (error instanceof InternalServerError) {
// Server error - implement exponential backoff
console.log("Server error - retry with backoff");
} else {
// Unknown error
console.error("Unexpected error:", error);
}
}
License
MIT