Vercel Queues
A TypeScript client library for interacting with the Vercel Queue Service API, with customizable serialization/deserialization (transports) and streaming support for memory-efficient processing of large payloads.
Features
- Generic Payload Support: Send and receive any type of data with type safety
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
- Streaming Support: Handle large payloads without loading them entirely into memory
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Automatic Retries: Built-in visibility timeout management
Installation
npm install @vercel/queue
Quick Start
For local development, you'll need to pull your Vercel environment variables (including the OIDC token):
# Install Vercel CLI if you haven't already
npm i -g vercel
# Pull environment variables from your Vercel project
vc env pull
Publishing and consuming messages on a queue
// index.ts
import { send, receive } from "@vercel/queue";
type Message = {
message: string;
timestamp: number;
};
// Option 1: Using the send and receive helpers (simplest)
// Automatically uses default client and JSON transport
await send<Message>("my-topic", {
message: "Hello, World!",
timestamp: Date.now(),
});
// Consume a single message off the queue
// (Often wrapped in a loop to keep polling messages off the queue)
await receive<Message>("my-topic", "my-consumer-group", (m) => {
console.log(m.message);
});
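// Optional polling loop - a hedged sketch, assuming receive (like consume())
// throws QueueEmptyError when no messages are available (see "Error Handling" below)
import { QueueEmptyError } from "@vercel/queue";
while (true) {
try {
await receive<Message>("my-topic", "my-consumer-group", (m) => {
console.log(m.message);
});
} catch (error) {
if (error instanceof QueueEmptyError) break; // queue drained, stop polling
throw error;
}
}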
// Option 2: Using createTopic for more control
import { createTopic } from "@vercel/queue";
// Create a topic with JSON serialization (default)
// Uses default QueueClient automatically authenticated from Vercel environment
const topic = createTopic<Message>("my-topic");
// Publish a message
await topic.publish({
message: "Hello, World!",
timestamp: Date.now(),
});
// Create a consumer group
const consumer = topic.consumerGroup("my-consumer-group");
// Process next available message (one-shot processing)
try {
await consumer.consume(async (message, metadata) => {
console.log("Received:", message.message);
console.log("Timestamp:", new Date(message.timestamp));
console.log("Message Metadata", metadata);
// => { messageId, deliveryCount, timestamp }
});
} catch (error) {
console.error("Processing error:", error);
}
Run the script
# Install dotenv-cli and ts-node if you need it
npm i -g dotenv-cli ts-node typescript
# Run the script with the OIDC token
dotenv -e .env.local ts-node index.ts
Usage with Vercel
When deploying on Vercel, rather than running a persistent server subscribed to a queue, you can have Vercel trigger a callback route when a message is ready for consumption.
To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.
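If you're starting fresh, you can scaffold one, for example (the app name is just a placeholder):
npx create-next-app@latest my-queue-app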
TypeScript Configuration
Update your tsconfig.json to use "bundler" module resolution for proper
package export resolution:
{
"compilerOptions": {
"moduleResolution": "bundler"
// ... other options
}
}
Publishing messages to a queue
Create a new server function to publish messages:
// app/actions.ts
"use server";
import { send } from "@vercel/queue";
export async function publishTestMessage(message: string) {
// Option 1: Using simple send shorthand
const { messageId } = await send(
"my-topic",
{ message, timestamp: Date.now() },
{
// Provide a callback URL to invoke a consumer when the message is ready to be processed
callback: {
url: getCallbackUrl() // implementation below
},
},
);
console.log(`Published message ${messageId}`);
}
// Option 2: Customize the topic, transport, consumer groups, etc.
import { createTopic } from "@vercel/queue";
export async function publishTestMessage(message: string) {
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>("my-topic");
// Publish the message
const { messageId } = await topic.publish(
{ message, timestamp: Date.now() },
{
// Provide multiple callback URLs to invoke multiple consumer groups
callback: {
"consumer-group-1": {
url: getCallbackUrl(),
},
"consumer-group-2": {
url: getCallbackUrl(),
delay: 5, // Delay callback by 5 seconds
},
},
},
);
console.log(`Published message ${messageId}`);
}
// Helper function to generate a local callback URL
function getCallbackUrl() {
const callbackUrl = new URL(
process.env.VERCEL_URL
? `https://${process.env.VERCEL_URL}/api/queue/handle`
: "http://localhost:3000/api/queue/handle"
);
// Add Vercel automation bypass secret if available (for preview deployments)
if (process.env.VERCEL_AUTOMATION_BYPASS_SECRET) {
callbackUrl.searchParams.set(
"x-vercel-protection-bypass",
process.env.VERCEL_AUTOMATION_BYPASS_SECRET
);
}
return callbackUrl.toString();
}
Now wire up the server function to your app:
// app/some/page.tsx
"use client";
import { publishTestMessage } from "./actions";
export default function Page() {
return (
// ...
<button onClick={() => publishTestMessage("Hello world")}>
Publish Test Message
</button>
);
}
Consuming the queue
Instead of running a persistent server that subscribes to the queue, we use the callback functionality of Vercel Queues to consume messages on the fly as they become ready for processing.
The handleCallback helper function simplifies queue callback handling in Next.js:
// app/api/queue/handle/route.ts
import { handleCallback } from "@vercel/queue";
// Option 1: Specify a single handler for the topic
export const POST = handleCallback({
"my-topic": (message, metadata) => {
console.log(`Received message:`, message, metadata);
// metadata: { messageId, deliveryCount, timestamp }
},
// .. more topic handlers can be provided here
});
// This consumes messages on the "default" consumer group, which is used when no consumer groups
// were specified in the publish `callback` earlier
// Option 2: Multiple consumer groups
export const POST = handleCallback({
// topic: "my-topic"
"my-topic": {
// consumer group: "consumer-group-1"
"consumer-group-1": (message, metadata) => {
console.log("Message:", message);
},
// consumer group: "consumer-group-2"
"consumer-group-2": (message, metadata) => {
console.log("Message", message);
},
},
});
Key Features
Streaming Support
Handle large files and data streams without loading them into memory:
import { createTopic, StreamTransport } from "@vercel/queue";
const videoTopic = createTopic<ReadableStream<Uint8Array>>(
"video-processing",
new StreamTransport(),
);
// Process large video files efficiently
const processor = videoTopic.consumerGroup("processors");
await processor.consume(async (videoStream) => {
// Process stream chunk by chunk
const reader = videoStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
await processChunk(value);
}
});
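// Sketch (assumption): the publishing side of the same topic - pipe a fetch
// response body straight into the queue without buffering
const response = await fetch("https://example.com/large-video.webm");
if (response.body) {
await videoTopic.publish(response.body); // ReadableStream<Uint8Array>
}
Consumer Groups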
Multiple consumers can process messages from the same topic in parallel:
// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)
// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every message
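// Sketch: a single publish is delivered to BOTH groups above (fan-out), while
// workers within one group split messages between them (assumes the Message
// topic from the Quick Start)
await topic.publish({ message: "fan-out demo", timestamp: Date.now() });
await analytics.consume(async (m) => console.log("analytics got:", m.message));
await webhooks.consume(async (m) => console.log("webhooks got:", m.message));
Architecture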
- Topics: Named message channels with configurable serialization
- Consumer Groups: Named groups of consumers that process messages in parallel
- consume(): Process messages with flexible consumption patterns
  - No options: Process next available message
  - With messageId: Process a specific message by ID
  - With skipPayload: true: Process message metadata only (without payload)
- Transports: Pluggable serialization/deserialization for different data types
- Streaming: Memory-efficient processing of large payloads
- Visibility Timeouts: Automatic message lifecycle management
Performance
The multipart parser is optimized for high-throughput scenarios:
- Streaming: Messages are yielded immediately as headers are parsed
- Memory Efficient: No buffering of complete payloads
- Fast Parsing: Native Buffer operations for ~50% performance improvement
- Scalable: Can handle arbitrarily large responses without memory constraints
Serialization (Transport) System
The queue client supports customizable serialization through the Transport
interface with streaming support for memory-efficient processing. Transport
can be configured at the topic level when creating a topic, or at the
consumer group level when creating a consumer group.
Built-in Transports
JsonTransport (Default)
Buffers data for JSON parsing - suitable for structured data that fits in memory.
import { createTopic, JsonTransport } from "@vercel/queue";
const topic = createTopic<{ data: any }>("json-topic", new JsonTransport());
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>("json-topic");
BufferTransport
Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.
import { BufferTransport, createTopic } from "@vercel/queue";
const topic = createTopic<Buffer>("binary-topic", new BufferTransport());
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);
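// Sketch (assumption): consuming the same topic - with BufferTransport the
// handler receives the payload as a Buffer ("binary-workers" is illustrative)
const binaryConsumer = topic.consumerGroup("binary-workers");
await binaryConsumer.consume(async (payload) => {
console.log(`Received ${payload.length} bytes`);
});
StreamTransport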
True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.
import { createTopic, StreamTransport } from "@vercel/queue";
const topic = createTopic<ReadableStream<Uint8Array>>(
"streaming-topic",
new StreamTransport(),
);
// Send large file as stream without loading into memory
const fileStream = new ReadableStream<Uint8Array>({
start(controller) {
// Read file in chunks
for (const chunk of readFileInChunks("large-file.bin")) {
controller.enqueue(chunk);
}
controller.close();
},
});
await topic.publish(fileStream);
Custom Transport
You can create your own serialization format by implementing the Transport
interface:
import { Transport } from "@vercel/queue";
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}
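For example, a minimal custom transport for plain UTF-8 text might look like the sketch below (TextTransport and "text-topic" are illustrative names, not part of the library):
import { createTopic, Transport } from "@vercel/queue";
class TextTransport implements Transport<string> {
contentType = "text/plain";
serialize(value: string): Buffer {
return Buffer.from(value, "utf8");
}
async deserialize(stream: ReadableStream<Uint8Array>): Promise<string> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
return Buffer.concat(chunks).toString("utf8");
}
}
const textTopic = createTopic<string>("text-topic", new TextTransport());
await textTopic.publish("Hello as plain text");
Choosing the Right Transport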
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport | Low | High |
| Binary files < 100MB | BufferTransport | Medium | High |
| Large files > 100MB | StreamTransport | Very Low | Medium |
| Real-time data streams | StreamTransport | Very Low | High |
| Custom protocols | Custom implementation | Varies | Varies |
API Reference
QueueClient
// Simple usage - automatically gets OIDC token from Vercel environment
const client = new QueueClient();
// Or with options
const client = new QueueClient({
token?: string; // Optional - will auto-detect if not provided
baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});
Topic
// Simple usage with default client
const topic = createTopic<T>(topicName, transport?);
// For custom client configuration, use Topic constructor directly
const customClient = new QueueClient({ baseUrl: "https://custom.vqs.vercel.sh" });
const topic = new Topic<T>(customClient, topicName, transport?);
// Publish a message (uses topic's transport)
await topic.publish(payload, options?);
// Trigger a callback URL when the message is
// ready for consumption
await topic.publish(payload, {
callback: { url: "https://example.com/webhook" }
});
// Or provide multiple callbacks (each URL is called
// with a separate consumer group)
await topic.publish(payload, {
callback: {
group1: { url: "https://example.com/webhook1" },
group2: { url: "https://example.com/webhook2", delay: 30 }
}
});
// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);
Send (Shorthand)
// Simple send - automatically uses default client and JSON transport
await send<T>(topicName, payload);
// Send with options including custom transport
await send<T>(topicName, payload, {
transport?: Transport<T>;
idempotencyKey?: string;
retentionSeconds?: number;
callback?: Record<string, CallbackConfig> | CallbackConfig;
});
// Examples:
await send("notifications", { userId: "123", message: "Welcome!" });
await send("images", imageBuffer, {
transport: new BufferTransport(),
callback: { url: "https://example.com/process-image" }
});
await send("events", eventData, {
idempotencyKey: "unique-key-123",
retentionSeconds: 3600,
callback: {
analytics: { url: "https://analytics.example.com/webhook" },
notifications: { url: "https://notifications.example.com/webhook", delay: 30 }
}
});
ConsumerGroup
// Process next available message (simplest form)
await consumer.consume(handler);
// Process specific message by ID with payload
await consumer.consume(handler, { messageId: "message-id" });
// Process specific message by ID without payload (metadata only)
// handler will be called with `undefined` as the payload
await consumer.consume(handler, { messageId: "message-id", skipPayload: true });
Message Handler
// Handler function signature
type MessageHandler<T = unknown> = (
message: T,
metadata: MessageMetadata,
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;
interface MessageTimeoutResult {
timeoutSeconds: number; // seconds before message becomes available again
}
// Message Metadata
interface MessageMetadata {
messageId: string;
deliveryCount: number;
timestamp: string;
}
ConsumeOptions Interface
interface ConsumeOptions {
messageId?: string; // Process specific message by ID
skipPayload?: boolean; // Skip payload download (requires messageId)
}
Transport Interface
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}
Callback Utilities
// Parse queue callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;
// Callback options type
interface CallbackMessageOptions {
queueName: string;
consumerGroup: string;
messageId: string;
}
// Create a callback handler for Next.js route handlers
function handleCallback(handlers: CallbackHandlers): (request: Request) => Promise<Response>;
// Configuration object with handlers for different topics
type CallbackHandlers = {
[topicName: string]:
| MessageHandler // Single handler (uses 'default' consumer group)
| { [consumerGroup: string]: MessageHandler }; // Multiple consumer group handlers
};
// Example usage:
export const POST = handleCallback({
// Topic handler (uses 'default' consumer group)
"new-users": (message, metadata) => {
console.log(`New user event:`, message, metadata);
},
// Consumer group specific handlers
"image-processing": {
"compress": (message, metadata) => console.log("Compressing image", message),
"resize": (message, metadata) => console.log("Resizing image", message),
}
});
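// A hedged sketch (assumption, not part of the library docs): a manual route
// handler built on parseCallbackRequest instead of handleCallback
import { createTopic, parseCallbackRequest, InvalidCallbackError } from "@vercel/queue";
export async function POST(request: Request) {
try {
const { queueName, consumerGroup, messageId } = parseCallbackRequest(request);
await createTopic(queueName)
.consumerGroup(consumerGroup)
.consume(async (message, metadata) => {
console.log(`Handling ${metadata.messageId} from ${queueName}`, message);
}, { messageId });
return new Response("OK");
} catch (error) {
if (error instanceof InvalidCallbackError) {
return new Response("Invalid callback request", { status: 400 });
}
throw error;
}
}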
// Error thrown for invalid callback requests
class InvalidCallbackError extends Error;
Examples
Basic JSON Processing
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
// Option 1: Using send shorthand
await send<UserEvent>("user-events", {
userId: "123",
action: "login",
timestamp: Date.now(),
});
// Option 2: Using createTopic for consumers
const userTopic = createTopic<UserEvent>("user-events");
await userTopic.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
const consumer = userTopic.consumerGroup("processors");
// Process next available message
try {
await consumer.consume(async (message) => {
console.log(`User ${message.userId} performed ${message.action}`);
});
} catch (error) {
console.error("Processing error:", error);
}
Processing Specific Messages by ID
const userTopic = createTopic<{ userId: string; action: string }>(
"user-events",
);
const consumer = userTopic.consumerGroup("processors");
// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";
try {
await consumer.consume(
async (message, { messageId }) => {
console.log(`Processing specific message: ${messageId}`);
console.log(`User ${message.userId} performed ${message.action}`);
},
{ messageId },
);
console.log("Message processed successfully");
} catch (error) {
if (error.message.includes("not found or not available")) {
console.log("Message was already processed or does not exist");
} else if (error.message.includes("FIFO ordering violation")) {
console.log("FIFO queue requires processing messages in order");
} else {
console.error("Error processing message:", error);
}
}
Processing Next Available Message
const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");
// Process the next available message (one-shot processing)
try {
await worker.consume(async (message) => {
console.log(`Processing task: ${message.taskType}`);
await processTask(message.taskType, message.data);
});
console.log("Message processed successfully");
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("No messages available");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked (FIFO queue)");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
} else {
console.error("Error processing message:", error);
}
}
// Handle conditional timeouts
await worker.consume(async (message) => {
if (!canProcessTaskType(message.taskType)) {
// Return timeout to retry later
return { timeoutSeconds: 60 };
}
await processTask(message.taskType, message.data);
});
// Process specific message metadata only (no payload download)
await worker.consume(
async (_, metadata) => {
console.log(`Message ID: ${metadata.messageId}`);
console.log(`Delivery count: ${metadata.deliveryCount}`);
console.log(`Timestamp: ${metadata.timestamp}`);
// _ is undefined - no payload was downloaded
},
{ messageId: "specific-message-id", skipPayload: true },
);
Timing Out Messages
const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");
// Process a message with conditional timeout
try {
await worker.consume(async ({ taskType, data }) => {
// Check if we can process this task type right now
if (taskType === "heavy-computation" && isSystemOverloaded()) {
// Return timeout to retry later (5 minutes)
return { timeoutSeconds: 300 };
}
// Check if we have required resources
if (taskType === "external-api" && !isExternalServiceAvailable()) {
// Return timeout to retry in 1 minute
return { timeoutSeconds: 60 };
}
// Process the message normally
console.log(`Processing ${taskType} task`);
await processTask(taskType, data);
// Message will be automatically deleted on successful completion
});
} catch (error) {
console.error("Worker processing error:", error);
}
// Example with exponential backoff
try {
await worker.consume(async (message, { deliveryCount }) => {
const maxRetries = 3;
try {
await processMessage(message);
// Successful processing - message will be deleted
} catch (error) {
if (deliveryCount < maxRetries) {
// Exponential backoff: 2^deliveryCount minutes
const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
console.log(
`Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
);
return { timeoutSeconds: timeoutSeconds };
} else {
// Max retries reached, let the message fail and be deleted
console.error("Max retries reached, message will be discarded:", error);
throw error;
}
}
});
} catch (error) {
console.error("Backoff processing error:", error);
}
Complete Example: Video Processing Pipeline
Here's a comprehensive example of a video processing pipeline that optimizes videos with FFmpeg and stores the results in Vercel Blob:
import { createTopic, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";
// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
"unoptimized-videos",
new StreamTransport(),
);
// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
"optimized-videos",
new StreamTransport(),
);
// Step 1: Process videos with FFmpeg
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");
try {
await videoProcessor.consume(async (inputVideoStream) => {
console.log("Processing video...");
if (!ffmpeg) {
throw new Error("FFmpeg not available");
}
// Create optimized video stream using FFmpeg
const optimizedStream = new ReadableStream<Uint8Array>({
start(controller) {
const ffmpegProcess = spawn(
ffmpeg,
[
"-i",
"pipe:0", // Input from stdin
"-c:v",
"libvpx-vp9", // Video codec
"-c:a",
"libopus", // Audio codec
"-crf",
"23", // Quality
"-f",
"webm", // Output format
"pipe:1", // Output to stdout
],
{ stdio: ["pipe", "pipe", "pipe"] },
);
// Pipe input stream to FFmpeg
const reader = inputVideoStream.getReader();
const pipeInput = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
ffmpegProcess.stdin?.end();
break;
}
ffmpegProcess.stdin?.write(value);
}
};
pipeInput().catch((err) => controller.error(err)); // surface stdin/read errors
// Stream FFmpeg output
ffmpegProcess.stdout?.on("data", (chunk) => {
controller.enqueue(new Uint8Array(chunk));
});
ffmpegProcess.on("close", (code) => {
if (code === 0) {
controller.close();
} else {
controller.error(new Error(`FFmpeg failed with code ${code}`));
}
});
},
});
// Publish optimized video to next topic
await optimizedVideosTopic.publish(optimizedStream);
console.log("Video optimized and published");
});
} catch (error) {
console.error("Video processing error:", error);
}
// Step 2: Store optimized videos in Vercel Blob
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");
try {
await blobUploader.consume(async (optimizedVideo) => {
// Upload to Vercel Blob storage
const filename = `optimized-${Date.now()}.webm`;
const blob = await put(filename, optimizedVideo, {
access: "public",
contentType: "video/webm",
});
console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
});
} catch (error) {
console.error("Blob upload error:", error);
}
Error Handling
The queue client provides specific error types for different failure scenarios:
Error Types
- QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)
  - Thrown by consume() when no messages are available
  - Also thrown when directly using client.receiveMessages()
- MessageLockedError: Thrown when a message is temporarily locked (423 status)
  - Contains optional retryAfter property with seconds to wait before retry
  - For consume() without options: the next message in FIFO sequence is locked
  - For consume() with messageId: the requested message is locked
- MessageNotFoundError: Message doesn't exist (404 status)
- MessageNotAvailableError: Message exists but isn't available for processing (409 status)
- FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)
  - Contains nextMessageId property indicating which message to process first
- FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)
  - Contains nextMessageId property indicating which message must be processed first
  - Similar to FifoOrderingViolationError but specifically for receive-by-ID operations
- MessageCorruptedError: Message data is corrupted or can't be parsed
- BadRequestError: Invalid request parameters (400 status)
  - Invalid queue names, FIFO limit violations, missing required parameters
- UnauthorizedError: Authentication failure (401 status)
  - Missing or invalid authentication token
- ForbiddenError: Access denied (403 status)
  - Queue environment doesn't match token environment
- InternalServerError: Server-side errors (500+ status codes)
  - Unexpected server errors, service unavailable, etc.
Error Handling Examples
import {
BadRequestError,
FailedDependencyError,
FifoOrderingViolationError,
ForbiddenError,
InternalServerError,
MessageLockedError,
QueueEmptyError,
UnauthorizedError,
} from "@vercel/queue";
// Handle empty queue or locked messages
try {
for await (const message of client.receiveMessages(options, transport)) {
// Process messages
}
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("Queue is empty, retry later");
} else if (error instanceof MessageLockedError) {
console.log("Next message in FIFO queue is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
}
}
// Handle locked message with retry
try {
await consumer.consume(handler, { messageId });
} catch (error) {
if (error instanceof MessageLockedError) {
console.log("Message is locked by another consumer");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
setTimeout(() => retry(), error.retryAfter * 1000);
}
} else if (error instanceof FailedDependencyError) {
// FIFO ordering violation for receive by ID
console.log(`Must process ${error.nextMessageId} first`);
}
}
// Handle authentication and authorization errors
try {
await topic.publish(payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check token/queue configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
// Complete error handling pattern
function handleQueueError(error: unknown): void {
if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
// Transient errors - safe to retry
console.log("Temporary condition, will retry");
} else if (
error instanceof UnauthorizedError ||
error instanceof ForbiddenError
) {
// Authentication/authorization errors - need to fix configuration
console.log("Auth error - check credentials");
} else if (error instanceof BadRequestError) {
// Client error - fix the request
console.log("Invalid request:", error.message);
} else if (error instanceof InternalServerError) {
// Server error - implement exponential backoff
console.log("Server error - retry with backoff");
} else {
// Unknown error
console.error("Unexpected error:", error);
}
}
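// Hypothetical usage of the pattern above with one-shot consumption:
try {
await consumer.consume(handler);
} catch (error) {
handleQueueError(error);
}
License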
MIT