Package Exports
- @vercel/queue
Readme
Vercel Queues
A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.
Features
- Generic Payload Support: Send and receive any type of data with type safety
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
- Streaming Support: Handle large payloads without loading them entirely into memory
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Automatic Retries: Built-in visibility timeout management
Installation
npm install @vercel/queueQuick Start
For local development, you'll need to pull your Vercel environment variables (including the OIDC token):
# Install Vercel CLI if you haven't already
npm i -g vercel
# Pull environment variables from your Vercel project
vc env pullPublishing and consuming messages on a queue
// index.ts
import { send, receive } from "@vercel/queue";
type Message = {
message: string;
timestamp: number;
};
// Send a message to a topic
await send<Message>("my-topic", {
message: "Hello, World!",
timestamp: Date.now(),
});
// Consume a single message off the queue
// (Often wrapped in a loop to keep polling messages off the queue)
await receive<Message>("my-topic", "my-consumer-group", (message, metadata) => {
console.log("Received:", message.message);
console.log("Timestamp:", new Date(message.timestamp));
console.log("Message Metadata", metadata);
// => { messageId, deliveryCount, timestamp }
});Usage with Vercel
When deploying on Vercel, rather than having a persistent server subscribed to a queue, Vercel can automatically trigger your API routes when messages are ready for consumption based on your vercel.json configuration.
To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.
TypeScript Configuration
Update your tsconfig.json to use "bundler" module resolution for proper
package export resolution:
{
"compilerOptions": {
"moduleResolution": "bundler"
// ... other options
}
}Publishing messages to a queue
Create a new server function to publish messages
// app/actions.ts
"use server";
import { send } from "@vercel/queue";
export async function publishTestMessage(message: string) {
const { messageId } = await send("my-topic", {
message,
timestamp: Date.now(),
});
console.log(`Published message ${messageId}`);
}Now wire up the server function to your app
// app/some/page.tsx
"use client";
import { publishTestMessage } from "./actions";
export default function Page() {
return (
// ...
<button onClick={() => publishTestMessage("Hello world")}>
Publish Test Message
</button>
);
}Consuming the queue
Messages are consumed using consumer groups, which provide load balancing and parallel processing capabilities.
Usage with Vercel
To consume queue messages in a Vercel deployment, you need to create (Next.js) API routes and configure them in your vercel.json file.
1. Create API Routes
Create API routes to handle incoming queue messages using the handleCallback helper:
// app/api/queue/handle/route.ts
import { handleCallback } from "@vercel/queue";
// Option 1: Single topic with multiple consumer groups
export const POST = handleCallback({
"my-topic": {
"consumer-group-1": async (message, metadata) => {
console.log(`Consumer group 1 processing:`, message, metadata);
// Handle consumer group 1 logic
await processGroup1(message);
},
"consumer-group-2": async (message, metadata) => {
console.log(`Consumer group 2 processing:`, message, metadata);
// Handle consumer group 2 logic
await processGroup2(message);
},
},
});
async function processGroup1(message: any) {
// Consumer group 1 specific logic
}
async function processGroup2(message: any) {
// Consumer group 2 specific logic
}// Alternative: Multiple topics in one handler
export const POST = handleCallback({
"user-events": {
welcome: async (message, metadata) => {
console.log(`New user event:`, message, metadata);
await sendWelcomeEmail(message.email);
},
},
"order-events": {
fulfillment: async (order, metadata) => {
console.log(`Processing order:`, order, metadata);
await fulfillOrder(order);
},
analytics: async (order, metadata) => {
console.log(`Tracking order:`, order, metadata);
await trackOrder(order);
},
},
});2. Configure vercel.json
Create a vercel.json file in your project root to declare which topics and consumer groups each API route handles:
{
"functions": {
"app/api/queue/handle/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "my-topic",
"consumer": "consumer-group-1"
},
{
"type": "queue/v1beta",
"topic": "my-topic",
"consumer": "consumer-group-2"
}
]
}
}
}3. Multiple API Routes
You can also create separate API routes for different topics:
// app/api/queue/users/route.ts - Handle user events
import { handleCallback } from "@vercel/queue";
export const POST = handleCallback({
"user-events": {
processors: async (user, metadata) => {
console.log(`Processing user event:`, user, metadata);
await sendWelcomeEmail(user.email);
},
},
});// app/api/queue/orders/route.ts - Handle order events
import { handleCallback } from "@vercel/queue";
export const POST = handleCallback({
"order-events": {
fulfillment: async (order, metadata) => {
console.log(`Processing order:`, order, metadata);
await fulfillOrder(order);
},
},
});With corresponding vercel.json:
{
"functions": {
"app/api/queue/users/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "user-events",
"consumer": "processors"
}
]
},
"app/api/queue/orders/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "order-events",
"consumer": "fulfillment"
}
]
}
}
}Key Points
- Automatic Triggering: Vercel automatically triggers your API routes when messages are available for the configured topic/consumer combinations
- Message Processing: Your API routes receive the message ID and other metadata via headers, then use the queue client to process the specific message
- Configuration Required: The
vercel.jsonfile is essential - it tells Vercel which topics and consumers each route should handle - No Polling: Unlike traditional queue consumers, you don't need to poll for messages - Vercel handles the triggering automatically
Key Features
Consumer Groups
Multiple consumers can process messages from the same topic in parallel:
// Multiple workers in the same group - they share/split messages
// Using the same consumer group name means they will load balance messages
await receive("my-topic", "workers", handler1);
await receive("my-topic", "workers", handler2);
// handler1 and handler2 will receive different messages (load balancing)
// Different consumer groups - each gets copies of ALL messages
await receive("my-topic", "analytics", analyticsHandler);
await receive("my-topic", "webhooks", webhooksHandler);
// analyticsHandler and webhooksHandler will both receive every messageArchitecture
- Topics: Named message channels with configurable serialization
- Consumer Groups: Named groups of consumers that process messages in
parallel
receive(): Process messages with flexible consumption patterns- Basic usage: Process next available message
- With
messageId: Process specific message by ID - With
skipPayload: true: Process message metadata only (without payload)
- Transports: Pluggable serialization/deserialization for different data types
- Streaming: Memory-efficient processing of large payloads
- Visibility Timeouts: Automatic message lifecycle management
Performance
The multipart parser is optimized for high-throughput scenarios:
- Streaming: Messages are yielded immediately as headers are parsed
- Memory Efficient: No buffering of complete payloads
- Fast Parsing: Native Buffer operations for ~50% performance improvement
- Scalable: Can handle arbitrarily large responses without memory constraints
Serialization (Transport) System
The queue client supports customizable serialization through the Transport
interface with streaming support for memory-efficient processing. Transport
can be configured using the transport option when calling send() or receive().
Built-in Transports
JsonTransport (Default)
Buffers data for JSON parsing - suitable for structured data that fits in memory.
import { send, JsonTransport } from "@vercel/queue";
// JsonTransport is the default, so these are equivalent:
await send("json-topic", { data: "example" });
await send(
"json-topic",
{ data: "example" },
{ transport: new JsonTransport() },
);BufferTransport
Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.
StreamTransport
True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.
Custom Transport
You can create your own serialization format by implementing the Transport
interface.
Choosing the Right Transport
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport |
Low | High |
| Binary files < 100MB | BufferTransport |
Medium | High |
| Large files > 100MB | StreamTransport |
Very Low | Medium |
| Real-time data streams | StreamTransport |
Very Low | High |
| Custom protocols | Custom implementation | Varies | Varies |
API Reference
Send Function
// Simple send - automatically uses default client and JSON transport
await send<T>(topicName, payload);
// Send with options including custom transport
await send<T>(topicName, payload, {
transport?: Transport<T>;
idempotencyKey?: string;
retentionSeconds?: number;
});
// Examples:
await send("notifications", { userId: "123", message: "Welcome!" });
await send("images", imageBuffer, {
transport: new BufferTransport(),
});
await send("events", eventData, {
idempotencyKey: "unique-key-123",
retentionSeconds: 3600,
});Receive Function
// Process next available message (simplest form)
await receive("topic-name", "consumer-group", handler);
// Process specific message by ID with payload
await receive("topic-name", "consumer-group", handler, {
messageId: "message-id",
});
// Process specific message by ID without payload (metadata only)
// handler will be called with `undefined` as the payload
await receive("topic-name", "consumer-group", handler, {
messageId: "message-id",
skipPayload: true,
});Message Handler
// Handler function signature
type MessageHandler<T = unknown> = (
message: T,
metadata: MessageMetadata,
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;
interface MessageTimeoutResult {
timeoutSeconds: number; // seconds before message becomes available again
}
// Message Metadata
interface MessageMetadata {
messageId: string;
deliveryCount: number;
timestamp: string;
}Receive Options
// Options for the receive function
interface ReceiveOptions<T = unknown> {
messageId?: string; // Process specific message by ID
skipPayload?: boolean; // Skip payload download (requires messageId)
transport?: Transport<T>; // Custom transport (defaults to JsonTransport)
visibilityTimeoutSeconds?: number; // Message visibility timeout
refreshInterval?: number; // Refresh interval for long-running operations
}Transport Interface
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}Callback Handler
// Create a callback handler for Next.js route handlers
function handleCallback(
handlers: CallbackHandlers,
): (request: Request) => Promise<Response>;
// Configuration object with handlers for different topics and consumer groups
type CallbackHandlers = {
[topicName: string]: { [consumerGroup: string]: MessageHandler };
};
// Example usage:
export const POST = handleCallback({
"user-events": {
welcome: (message, metadata) => {
console.log(`New user event:`, message, metadata);
},
},
// Multiple consumer groups per topic
"image-processing": {
compress: (message, metadata) => console.log("Compressing image", message),
resize: (message, metadata) => console.log("Resizing image", message),
},
});Examples
Basic JSON Processing
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
// Send a message
await send<UserEvent>("user-events", {
userId: "123",
action: "login",
timestamp: Date.now(),
});
// Receive and process a message
try {
await receive<UserEvent>("user-events", "processors", async (message) => {
console.log(`User ${message.userId} performed ${message.action}`);
});
} catch (error) {
console.error("Processing error:", error);
}Processing Specific Messages by ID
// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";
try {
await receive<{ userId: string; action: string }>(
"user-events",
"processors",
async (message, { messageId }) => {
console.log(`Processing specific message: ${messageId}`);
console.log(`User ${message.userId} performed ${message.action}`);
},
{ messageId },
);
console.log("Message processed successfully");
} catch (error) {
if (error.message.includes("not found or not available")) {
console.log("Message was already processed or does not exist");
} else {
console.error("Error processing message:", error);
}
}Processing Next Available Message
// Process the next available message (one-shot processing)
try {
await receive<{ taskType: string; data: any }>(
"work-queue",
"workers",
async (message) => {
console.log(`Processing task: ${message.taskType}`);
await processTask(message.taskType, message.data);
},
);
console.log("Message processed successfully");
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("No messages available");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
} else {
console.error("Error processing message:", error);
}
}
// Handle conditional timeouts
await receive<{ taskType: string; data: any }>(
"work-queue",
"workers",
async (message) => {
if (!canProcessTaskType(message.taskType)) {
// Return timeout to retry later
return { timeoutSeconds: 60 };
}
await processTask(message.taskType, message.data);
},
);
// Process specific message metadata only (no payload download)
await receive<{ taskType: string; data: any }>(
"work-queue",
"workers",
async (_, metadata) => {
console.log(`Message ID: ${metadata.messageId}`);
console.log(`Delivery count: ${metadata.deliveryCount}`);
console.log(`Timestamp: ${metadata.timestamp}`);
// _ is undefined - no payload was downloaded
},
{ messageId: "specific-message-id", skipPayload: true },
);Timing Out Messages
// Process a message with conditional timeout
try {
await receive<{ taskType: string; data: any }>(
"work-queue",
"workers",
async ({ taskType, data }) => {
// Check if we can process this task type right now
if (taskType === "heavy-computation" && isSystemOverloaded()) {
// Return timeout to retry later (5 minutes)
return { timeoutSeconds: 300 };
}
// Check if we have required resources
if (taskType === "external-api" && !isExternalServiceAvailable()) {
// Return timeout to retry in 1 minute
return { timeoutSeconds: 60 };
}
// Process the message normally
console.log(`Processing ${taskType} task`);
await processTask(taskType, data);
// Message will be automatically deleted on successful completion
},
);
} catch (error) {
console.error("Worker processing error:", error);
}
// Example with exponential backoff
try {
await receive<{ taskType: string; data: any }>(
"work-queue",
"workers",
async (message, { deliveryCount }) => {
const maxRetries = 3;
try {
await processMessage(message);
// Successful processing - message will be deleted
} catch (error) {
if (deliveryCount < maxRetries) {
// Exponential backoff: 2^deliveryCount minutes
const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
console.log(
`Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
);
return { timeoutSeconds: timeoutSeconds };
} else {
// Max retries reached, let the message fail and be deleted
console.error(
"Max retries reached, message will be discarded:",
error,
);
throw error;
}
}
},
);
} catch (error) {
console.error("Backoff processing error:", error);
}Error Handling
The queue client provides specific error types for different failure scenarios:
Error Types
QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)- Thrown by
receive()when no messages are available
- Thrown by
MessageLockedError: Thrown when a message is temporarily locked (423 status)- Contains optional
retryAfterproperty with seconds to wait before retry - For
receive()without options: the next message is locked - For
receive()with messageId: the requested message is locked
- Contains optional
MessageNotFoundError: Message doesn't exist (404 status)MessageNotAvailableError: Message exists but isn't available for processing (409 status)MessageCorruptedError: Message data is corrupted or can't be parsedBadRequestError: Invalid request parameters (400 status)- Invalid queue names, missing required parameters
UnauthorizedError: Authentication failure (401 status)- Missing or invalid authentication token
ForbiddenError: Access denied (403 status)- Queue environment doesn't match token environment
InternalServerError: Server-side errors (500+ status codes)- Unexpected server errors, service unavailable, etc.
Error Handling Examples
import {
BadRequestError,
ForbiddenError,
InternalServerError,
MessageLockedError,
QueueEmptyError,
UnauthorizedError,
} from "@vercel/queue";
// Handle empty queue or locked messages
try {
await receive("my-topic", "my-consumer", async (message) => {
// Process message
console.log("Processing message:", message);
});
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("Queue is empty, retry later");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
}
}
// Handle locked message with retry
try {
await receive("my-topic", "my-consumer", handler, { messageId });
} catch (error) {
if (error instanceof MessageLockedError) {
console.log("Message is locked by another consumer");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
setTimeout(() => retry(), error.retryAfter * 1000);
}
}
}
// Handle authentication and authorization errors
try {
await send("my-topic", payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check token/queue configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
// Complete error handling pattern
function handleQueueError(error: unknown): void {
if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
// Transient errors - safe to retry
console.log("Temporary condition, will retry");
} else if (
error instanceof UnauthorizedError ||
error instanceof ForbiddenError
) {
// Authentication/authorization errors - need to fix configuration
console.log("Auth error - check credentials");
} else if (error instanceof BadRequestError) {
// Client error - fix the request
console.log("Invalid request:", error.message);
} else if (error instanceof InternalServerError) {
// Server error - implement exponential backoff
console.log("Server error - retry with backoff");
} else {
// Unknown error
console.error("Unexpected error:", error);
}
}License
MIT