Package Exports
- @vercel/queue
Readme
Vercel Queues
A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.
Features
- Generic Payload Support: Send and receive any type of data with type safety
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
- Streaming Support: Handle large payloads without loading them entirely into memory
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Automatic Retries: Built-in visibility timeout management
Installation
npm install @vercel/queueQuick Start
For local development, you'll need to pull your Vercel environment variables (including the OIDC token):
# Install Vercel CLI if you haven't already
npm i -g vercel
# Pull environment variables from your Vercel project
vc env pullPublishing and consuming messages on a queue
// index.ts
import { QueueClient, createTopic, JsonTransport } from "@vercel/queue";
// Create a client - automatically authenticated using the OIDC token
const client = QueueClient.fromVercelFunction();
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
client,
"my-topic",
);
// Publish a message
await topic.publish({
message: "Hello, World!",
timestamp: Date.now(),
});
// Create a consumer group
const consumer = topic.consumerGroup("my-processors");
// Process messages continuously with cancellation support
const controller = new AbortController();
// Start processing (blocks until aborted or error)
try {
await consumer.subscribe(controller.signal, async (message) => {
console.log("Received:", message.payload.message);
console.log("Timestamp:", new Date(message.payload.timestamp));
});
} catch (error) {
console.error("Processing stopped due to error:", error);
}
// Stop processing from elsewhere in your code
// controller.abort();Run the script
# Using dotenv to load the OIDC token
dotenv -e .env.local node index.tsUsage with Vercel
When deploying on Vercel, rather than having a persistent server subscribed to a queue, Vercel can trigger a callback route when a message is ready for consumption.
To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.
TypeScript Configuration
Update your tsconfig.json to use "bundler" module resolution for proper package export resolution:
{
"compilerOptions": {
"moduleResolution": "bundler"
// ... other options
}
}Publishing messages to a queue
Create a new server function to publish messages
// app/action.ts
"use server";
import { QueueClient, createTopic } from "@vercel/queue";
export async function publishTestMessage(message: string) {
// Initialize a queue client
const client = await QueueClient.fromVercelFunction();
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
client,
"my-topic",
);
// Publish the message
const { messageId } = await topic.publish(
{ message, timestamp: Date.now() },
{
// Provide a callback URL to invoke a consumer when the message is ready to be processed
callbacks: {
webhook: {
url: process.env.VERCEL_PROJECT_PRODUCTION_URL
? `https://${process.env.VERCEL_PROJECT_PRODUCTION_URL}/api/queue/handle`
: "http://localhost:3000/api/queue/handle",
},
},
},
);
console.log(`Published message ${messageId}`);
}Now wire up the server function to your app
// app/some/page.tsx
"use client";
import { publishTestMessage } from "./actions";
export default function Button() {
return (
// ...
<Button onClick={() => publishTestMessage("Hello world")} >
Publish Test Message
</a>
);
}Consuming the queue
Instead of running a persistent server that subscribes to the queue, we use the callback functionality of Vercel queues to consume messages on the fly, when a message is ready to be processed.
// app/api/queue/handle/route.ts
import { QueueClient, Topic, parseCallbackRequest } from "@vercel/queue";
import { NextRequest } from "next/server";
// Handle Vercel Queue callback requests
export async function POST(request: NextRequest) {
try {
// Parse the queue callback information
const { queueName, consumerGroup, messageId } =
parseCallbackRequest(request);
// Create client
const client = await QueueClient.fromVercelFunction();
// Create topic and consumer group from the callback info
const topic = new Topic(client, queueName);
const cg = topic.consumerGroup(consumerGroup);
// Process the message
await cg.receiveMessage(messageId, async (message) => {
const payload = message.payload as { message: string; timestamp: number };
console.log(
`Received message "${payload.message}" (Sent at: ${payload.timestamp})`,
);
});
return Response.json({ status: "success" });
} catch (error) {
console.error("Webhook error:", error);
return Response.json(
{ error: "Failed to process webhook" },
{ status: 500 },
);
}
}![NOTE] A single webhook handle can be used to process messages across various queues and consumer groups. Use the values of
queueNameandconsumerGroupfromparseCallbackRequest()to dynamically handle different code paths:if (queueName === "upload-queue") { processImageQueue(consumerGroup, message); } // ... function processImageQueue(consumerGroup, message) { if (consumerGroup === "compress") { // handle image compression } // ... } // ...We are building an SDK to make queues and workflow easier to use. Reach out if you're interested.
Key Features
Streaming Support
Handle large files and data streams without loading them into memory:
import { StreamTransport } from "@vercel/queue";
const videoTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"video-processing",
new StreamTransport(),
);
// Process large video files efficiently
const processor = videoTopic.consumerGroup("processors");
await processor.subscribe(signal, async (message) => {
const videoStream = message.payload;
// Process stream chunk by chunk
const reader = videoStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
await processChunk(value);
}
});Consumer Groups
Multiple consumers can process messages from the same topic in parallel:
// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)
// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every messageArchitecture
- Topics: Named message channels with configurable serialization
- Consumer Groups: Named groups of consumers that process messages in parallel
subscribe(): Continuously process messages with automatic pollingreceiveMessage(): Process a specific message by IDreceiveNextMessage(): Process the next available message (one-shot)handleMessage(): Process message metadata only (without payload)
- Transports: Pluggable serialization/deserialization for different data types
- Streaming: Memory-efficient processing of large payloads
- Visibility Timeouts: Automatic message lifecycle management
Performance
The multipart parser is optimized for high-throughput scenarios:
- Streaming: Messages are yielded immediately as headers are parsed
- Memory Efficient: No buffering of complete payloads
- Fast Parsing: Native Buffer operations for ~50% performance improvement
- Scalable: Can handle arbitrarily large responses without memory constraints
Serialization (Transport) System
The queue client supports customizable serialization through the Transport interface with streaming support for memory-efficient processing. Transport can be configured at the topic level when creating a topic, or at the consumer group level when creating a consumer group.
Built-in Transports
JsonTransport (Default)
Buffers data for JSON parsing - suitable for structured data that fits in memory.
import { JsonTransport, createTopic } from "@vercel/queue";
const topic = createTopic<{ data: any }>(
client,
"json-topic",
new JsonTransport(),
);
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>(client, "json-topic");BufferTransport
Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.
import { BufferTransport, createTopic } from "@vercel/queue";
const topic = createTopic<Buffer>(
client,
"binary-topic",
new BufferTransport(),
);
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);StreamTransport
True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.
import { StreamTransport, createTopic } from "@vercel/queue";
const topic = createTopic<ReadableStream<Uint8Array>>(
client,
"streaming-topic",
new StreamTransport(),
);
// Send large file as stream without loading into memory
const fileStream = new ReadableStream<Uint8Array>({
start(controller) {
// Read file in chunks
for (const chunk of readFileInChunks("large-file.bin")) {
controller.enqueue(chunk);
}
controller.close();
},
});
await topic.publish(fileStream);Custom Transport
You can create your own serialization format by implementing the Transport interface:
import { Transport } from "@vercel/queue";
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}Choosing the Right Transport
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport |
Low | High |
| Binary files < 100MB | BufferTransport |
Medium | High |
| Large files > 100MB | StreamTransport |
Very Low | Medium |
| Real-time data streams | StreamTransport |
Very Low | High |
| Custom protocols | Custom implementation | Varies | Varies |
API Reference
QueueClient
const client = new QueueClient({
token: string;
baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});Topic
const topic = createTopic<T>(client, topicName, transport?);
// Publish a message (uses topic's transport)
await topic.publish(payload, options?);
// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);ConsumerGroup
// Start continuous processing (blocks until signal is aborted or error occurs)
await consumer.subscribe(signal, handler, options?);
// Process a specific message by ID
await consumer.receiveMessage(messageId, handler);
// Process the next available message
await consumer.receiveNextMessage(handler);
// Handle a specific message by ID without payload
await consumer.handleMessage(messageId, handler);Message Handler
// Handler function signature
type MessageHandler<T> = (
message: Message<T>,
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;
interface MessageTimeoutResult {
timeoutSeconds: number; // seconds before message becomes available again
}Transport Interface
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}Callback Utilities
// Parse queue callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;
// Callback options type
interface CallbackMessageOptions {
queueName: string;
consumerGroup: string;
messageId: string;
}
// Error thrown for invalid callback requests
class InvalidCallbackError extends Error;Examples
Basic JSON Processing
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
const userTopic = createTopic<UserEvent>(client, "user-events");
await userTopic.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
const consumer = userTopic.consumerGroup("processors");
const controller = new AbortController();
try {
await consumer.subscribe(controller.signal, async (message) => {
console.log(
`User ${message.payload.userId} performed ${message.payload.action}`,
);
});
} catch (error) {
console.error("Processing error:", error);
}
// Stop processing when needed
// controller.abort();Processing Specific Messages by ID
const userTopic = createTopic<{ userId: string; action: string }>(
client,
"user-events",
);
const consumer = userTopic.consumerGroup("processors");
// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";
try {
await consumer.receiveMessage(messageId, async (message) => {
console.log(`Processing specific message: ${message.messageId}`);
console.log(
`User ${message.payload.userId} performed ${message.payload.action}`,
);
});
console.log("Message processed successfully");
} catch (error) {
if (error.message.includes("not found or not available")) {
console.log("Message was already processed or does not exist");
} else if (error.message.includes("FIFO ordering violation")) {
console.log("FIFO queue requires processing messages in order");
} else {
console.error("Error processing message:", error);
}
}Processing Next Available Message
const workTopic = createTopic<{ taskType: string; data: any }>(
client,
"work-queue",
);
const worker = workTopic.consumerGroup("workers");
// Process the next available message (one-shot processing)
try {
await worker.receiveNextMessage(async (message) => {
console.log(`Processing task: ${message.payload.taskType}`);
await processTask(message.payload.taskType, message.payload.data);
});
console.log("Message processed successfully");
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("No messages available");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked (FIFO queue)");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
} else {
console.error("Error processing message:", error);
}
}
// You can also use it with timeout results
await worker.receiveNextMessage(async (message) => {
if (!canProcessTaskType(message.payload.taskType)) {
// Return timeout to retry later
return { timeoutSeconds: 60 };
}
await processTask(message.payload.taskType, message.payload.data);
});Timing Out Messages
const workTopic = createTopic<{ taskType: string; data: any }>(
client,
"work-queue",
);
const worker = workTopic.consumerGroup("workers");
const controller = new AbortController();
try {
await worker.subscribe(controller.signal, async (message) => {
const { taskType, data } = message.payload;
// Check if we can process this task type right now
if (taskType === "heavy-computation" && isSystemOverloaded()) {
// Return timeout to retry later (5 minutes)
return { timeoutSeconds: 300 };
}
// Check if we have required resources
if (taskType === "external-api" && !isExternalServiceAvailable()) {
// Return timeout to retry in 1 minute
return { timeoutSeconds: 60 };
}
// Process the message normally
console.log(`Processing ${taskType} task`);
await processTask(taskType, data);
// Message will be automatically deleted on successful completion
});
} catch (error) {
console.error("Worker processing error:", error);
}
// Example with exponential backoff
const backoffController = new AbortController();
try {
await worker.subscribe(backoffController.signal, async (message) => {
const maxRetries = 3;
const deliveryCount = message.deliveryCount;
try {
await processMessage(message.payload);
// Successful processing - message will be deleted
} catch (error) {
if (deliveryCount < maxRetries) {
// Exponential backoff: 2^deliveryCount minutes
const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
console.log(
`Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
);
return { timeoutSeconds: timeoutSeconds };
} else {
// Max retries reached, let the message fail and be deleted
console.error("Max retries reached, message will be discarded:", error);
throw error;
}
}
});
} catch (error) {
console.error("Backoff processing error:", error);
}Complete Example: Video Processing Pipeline
Here's a comprehensive example showing a video processing pipeline that processes videos with FFmpeg and stores the results in Vercel Blob:
import { QueueClient, createTopic, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";
const client = new QueueClient({
token: "your-vercel-oidc-token",
});
// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"unoptimized-videos",
new StreamTransport(),
);
// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"optimized-videos",
new StreamTransport(),
);
// Step 1: Process videos with FFmpeg
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");
const processingController = new AbortController();
try {
await videoProcessor.subscribe(
processingController.signal,
async (message) => {
const inputVideoStream = message.payload;
console.log("Processing video...");
if (!ffmpeg) {
throw new Error("FFmpeg not available");
}
// Create optimized video stream using FFmpeg
const optimizedStream = new ReadableStream<Uint8Array>({
start(controller) {
const ffmpegProcess = spawn(
ffmpeg,
[
"-i",
"pipe:0", // Input from stdin
"-c:v",
"libvpx-vp9", // Video codec
"-c:a",
"libopus", // Audio codec
"-crf",
"23", // Quality
"-f",
"webm", // Output format
"pipe:1", // Output to stdout
],
{ stdio: ["pipe", "pipe", "pipe"] },
);
// Pipe input stream to FFmpeg
const reader = inputVideoStream.getReader();
const pipeInput = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
ffmpegProcess.stdin?.end();
break;
}
ffmpegProcess.stdin?.write(value);
}
};
pipeInput();
// Stream FFmpeg output
ffmpegProcess.stdout?.on("data", (chunk) => {
controller.enqueue(new Uint8Array(chunk));
});
ffmpegProcess.on("close", (code) => {
if (code === 0) {
controller.close();
} else {
controller.error(new Error(`FFmpeg failed with code ${code}`));
}
});
},
});
// Publish optimized video to next topic
await optimizedVideosTopic.publish(optimizedStream);
console.log("Video optimized and published");
},
);
} catch (error) {
console.error("Video processing error:", error);
}
// Step 2: Store optimized videos in Vercel Blob
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");
const uploadController = new AbortController();
try {
await blobUploader.subscribe(uploadController.signal, async (message) => {
const optimizedVideo = message.payload;
// Upload to Vercel Blob storage
const filename = `optimized-${Date.now()}.webm`;
const blob = await put(filename, optimizedVideo, {
access: "public",
contentType: "video/webm",
});
console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
});
} catch (error) {
console.error("Blob upload error:", error);
}
// Graceful shutdown
process.on("SIGINT", () => {
processingController.abort();
uploadController.abort();
});Error Handling
The queue client provides specific error types for different failure scenarios:
Error Types
QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)- Only thrown when directly using
client.receiveMessages() ConsumerGroup.subscribe()handles this error internally and continues polling
- Only thrown when directly using
MessageLockedError: Thrown when a message is temporarily locked (423 status)- Contains optional
retryAfterproperty with seconds to wait before retry - For
receiveMessages()on FIFO queues: the next message in sequence is locked - For
receiveMessageById(): the requested message is locked ConsumerGroup.subscribe()handles this error internally when polling
- Contains optional
MessageNotFoundError: Message doesn't exist (404 status)MessageNotAvailableError: Message exists but isn't available for processing (409 status)FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)- Contains
nextMessageIdproperty indicating which message to process first
- Contains
FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)- Contains
nextMessageIdproperty indicating which message must be processed first - Similar to
FifoOrderingViolationErrorbut specifically for receive-by-ID operations
- Contains
MessageCorruptedError: Message data is corrupted or can't be parsedBadRequestError: Invalid request parameters (400 status)- Invalid queue names, FIFO limit violations, missing required parameters
UnauthorizedError: Authentication failure (401 status)- Missing or invalid authentication token
ForbiddenError: Access denied (403 status)- Queue environment doesn't match token environment
InternalServerError: Server-side errors (500+ status codes)- Unexpected server errors, service unavailable, etc.
Error Handling Examples
import {
QueueEmptyError,
MessageLockedError,
FifoOrderingViolationError,
FailedDependencyError,
BadRequestError,
UnauthorizedError,
ForbiddenError,
InternalServerError,
} from "@vercel/queue";
// Handle empty queue or locked messages
try {
for await (const message of client.receiveMessages(options, transport)) {
// Process messages
}
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("Queue is empty, retry later");
} else if (error instanceof MessageLockedError) {
console.log("Next message in FIFO queue is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
}
}
// Handle locked message with retry
try {
await consumer.receiveMessage(messageId, handler);
} catch (error) {
if (error instanceof MessageLockedError) {
console.log("Message is locked by another consumer");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
setTimeout(() => retry(), error.retryAfter * 1000);
}
} else if (error instanceof FailedDependencyError) {
// FIFO ordering violation for receive by ID
console.log(`Must process ${error.nextMessageId} first`);
}
}
// Handle authentication and authorization errors
try {
await topic.publish(payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check token/queue configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
// Complete error handling pattern
function handleQueueError(error: unknown): void {
if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
// Transient errors - safe to retry
console.log("Temporary condition, will retry");
} else if (
error instanceof UnauthorizedError ||
error instanceof ForbiddenError
) {
// Authentication/authorization errors - need to fix configuration
console.log("Auth error - check credentials");
} else if (error instanceof BadRequestError) {
// Client error - fix the request
console.log("Invalid request:", error.message);
} else if (error instanceof InternalServerError) {
// Server error - implement exponential backoff
console.log("Server error - retry with backoff");
} else {
// Unknown error
console.error("Unexpected error:", error);
}
}License
MIT