Package Exports
- @vercel/queue
Readme
Vercel Queues
A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.
Features
- Generic Payload Support: Send and receive any type of data with type safety
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
- Streaming Support: Handle large payloads without loading them entirely into memory
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Automatic Retries: Built-in visibility timeout management
Installation
npm install @vercel/queueQuick Start
For local development, you'll need to pull your Vercel environment variables (including the OIDC token):
# Install Vercel CLI if you haven't already
npm i -g vercel
# Pull environment variables from your Vercel project
vc env pullPublishing and consuming messages on a queue
// index.ts
import { send, receive } from "@vercel/queue";
type Message = {
message: string;
timestamp: number;
};
// Option 1: Using the send and receive helpers (simplest)
// Automatically uses default client and JSON transport
await send<Message>("my-topic", {
message: "Hello, World!",
timestamp: Date.now(),
});
// Consume a single message off the queue
// (Often wrapped in a loop to keep polling messages off the queue)
await receive<Message>("my-topic", "my-consumer-group", (m) => {
console.log(m.message);
});
// Option 2: Using createTopic for more control
import { createTopic } from "@vercel/queue";
// Create a topic with JSON serialization (default)
// Uses default QueueClient automatically authenticated from Vercel environment
const topic = createTopic<Message>("my-topic");
// Publish a message
await topic.publish({
message: "Hello, World!",
timestamp: Date.now(),
});
// Create a consumer group
const consumer = topic.consumerGroup("my-consumer-group");
// Process next available message (one-shot processing)
try {
await consumer.consume(async (message, metadata) => {
console.log("Received:", message.message);
console.log("Timestamp:", new Date(message.timestamp));
console.log("Message Metadata", metadata);
// => { messageId, deliveryCount, timestamp }
});
} catch (error) {
console.error("Processing error:", error);
}Usage with Vercel
When deploying on Vercel, rather than having a persistent server subscribed to a queue, Vercel can automatically trigger your API routes when messages are ready for consumption based on your vercel.json configuration.
To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.
TypeScript Configuration
Update your tsconfig.json to use "bundler" module resolution for proper
package export resolution:
{
"compilerOptions": {
"moduleResolution": "bundler"
// ... other options
}
}Publishing messages to a queue
Create a new server function to publish messages
// app/actions.ts
"use server";
import { send } from "@vercel/queue";
export async function publishTestMessage(message: string) {
// Option 1: Using simple send shorthand
const { messageId } = await send(
"my-topic",
{ message, timestamp: Date.now() },
},
);
console.log(`Published message ${messageId}`);
}
// Option 2: Customize the topic, transport, consumer groups, etc.
import { createTopic } from "@vercel/queue";
export async function publishTestMessage(message: string) {
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>("my-topic");
// Publish the message
const { messageId } = await topic.publish(
{ message, timestamp: Date.now() },
);
console.log(`Published message ${messageId}`);
}
Now wire up the server function to your app
// app/some/page.tsx
"use client";
import { publishTestMessage } from "./actions";
export default function Page() {
return (
// ...
<button onClick={() => publishTestMessage("Hello world")}>
Publish Test Message
</button>
);
}Consuming the queue
Messages are consumed using consumer groups, which provide load balancing and parallel processing capabilities.
Usage with Vercel
To consume queue messages in a Vercel deployment, you need to create (Next.js) API routes and configure them in your vercel.json file.
1. Create API Routes
Create API routes to handle incoming queue messages using the handleCallback helper:
// app/api/queue/handle/route.ts
import { handleCallback } from "@vercel/queue";
// Option 1: Single topic with multiple consumer groups
export const POST = handleCallback({
"my-topic": {
"consumer-group-1": async (message, metadata) => {
console.log(`Consumer group 1 processing:`, message, metadata);
// Handle consumer group 1 logic
await processGroup1(message);
},
"consumer-group-2": async (message, metadata) => {
console.log(`Consumer group 2 processing:`, message, metadata);
// Handle consumer group 2 logic
await processGroup2(message);
},
},
});
async function processGroup1(message: any) {
// Consumer group 1 specific logic
}
async function processGroup2(message: any) {
// Consumer group 2 specific logic
}// Alternative: Multiple topics in one handler
export const POST = handleCallback({
"user-events": {
welcome: async (message, metadata) => {
console.log(`New user event:`, message, metadata);
await sendWelcomeEmail(message.email);
},
},
"order-events": {
fulfillment: async (order, metadata) => {
console.log(`Processing order:`, order, metadata);
await fulfillOrder(order);
},
analytics: async (order, metadata) => {
console.log(`Tracking order:`, order, metadata);
await trackOrder(order);
},
},
});2. Configure vercel.json
Create a vercel.json file in your project root to declare which topics and consumer groups each API route handles:
{
"functions": {
"app/api/queue/handle/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "my-topic",
"consumer": "consumer-group-1"
},
{
"type": "queue/v1beta",
"topic": "my-topic",
"consumer": "consumer-group-2"
}
]
}
}
}3. Multiple API Routes
You can also create separate API routes for different topics:
// app/api/queue/users/route.ts - Handle user events
import { handleCallback } from "@vercel/queue";
export const POST = handleCallback({
"user-events": {
processors: async (user, metadata) => {
console.log(`Processing user event:`, user, metadata);
await sendWelcomeEmail(user.email);
},
},
});// app/api/queue/orders/route.ts - Handle order events
import { handleCallback } from "@vercel/queue";
export const POST = handleCallback({
"order-events": {
fulfillment: async (order, metadata) => {
console.log(`Processing order:`, order, metadata);
await fulfillOrder(order);
},
},
});With corresponding vercel.json:
{
"functions": {
"app/api/queue/users/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "user-events",
"consumer": "processors"
}
]
},
"app/api/queue/orders/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "order-events",
"consumer": "fulfillment"
}
]
}
}
}Key Points
- Automatic Triggering: Vercel automatically triggers your API routes when messages are available for the configured topic/consumer combinations
- Message Processing: Your API routes receive the message ID and other metadata via headers, then use the queue client to process the specific message
- Configuration Required: The
vercel.jsonfile is essential - it tells Vercel which topics and consumers each route should handle - No Polling: Unlike traditional queue consumers, you don't need to poll for messages - Vercel handles the triggering automatically
Key Features
Consumer Groups
Multiple consumers can process messages from the same topic in parallel:
// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)
// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every messageArchitecture
- Topics: Named message channels with configurable serialization
- Consumer Groups: Named groups of consumers that process messages in
parallel
consume(): Process messages with flexible consumption patterns- No options: Process next available message
- With
messageId: Process specific message by ID - With
skipPayload: true: Process message metadata only (without payload)
- Transports: Pluggable serialization/deserialization for different data types
- Streaming: Memory-efficient processing of large payloads
- Visibility Timeouts: Automatic message lifecycle management
Performance
The multipart parser is optimized for high-throughput scenarios:
- Streaming: Messages are yielded immediately as headers are parsed
- Memory Efficient: No buffering of complete payloads
- Fast Parsing: Native Buffer operations for ~50% performance improvement
- Scalable: Can handle arbitrarily large responses without memory constraints
Serialization (Transport) System
The queue client supports customizable serialization through the Transport
interface with streaming support for memory-efficient processing. Transport
can be configured at the topic level when creating a topic, or at the
consumer group level when creating a consumer group.
Built-in Transports
JsonTransport (Default)
Buffers data for JSON parsing - suitable for structured data that fits in memory.
import { createTopic, JsonTransport } from "@vercel/queue";
const topic = createTopic<{ data: any }>("json-topic", new JsonTransport());
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>("json-topic");BufferTransport
Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.
StreamTransport
True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.
Custom Transport
You can create your own serialization format by implementing the Transport
interface.
Choosing the Right Transport
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport |
Low | High |
| Binary files < 100MB | BufferTransport |
Medium | High |
| Large files > 100MB | StreamTransport |
Very Low | Medium |
| Real-time data streams | StreamTransport |
Very Low | High |
| Custom protocols | Custom implementation | Varies | Varies |
API Reference
QueueClient
// Simple usage - automatically gets OIDC token from Vercel environment
const client = new QueueClient();
// Or with options
const client = new QueueClient({
token?: string; // Optional - will auto-detect if not provided
baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});Topic
// Simple usage with default client
const topic = createTopic<T>(topicName, transport?);
// For custom client configuration, use Topic constructor directly
const customClient = new QueueClient({ baseUrl: "https://custom.vqs.vercel.sh" });
const topic = new Topic<T>(customClient, topicName, transport?);
// Publish a message (uses topic's transport)
await topic.publish(payload, options?);
// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);Send (Shorthand)
// Simple send - automatically uses default client and JSON transport
await send<T>(topicName, payload);
// Send with options including custom transport
await send<T>(topicName, payload, {
transport?: Transport<T>;
idempotencyKey?: string;
retentionSeconds?: number;
});
// Examples:
await send("notifications", { userId: "123", message: "Welcome!" });
await send("images", imageBuffer, {
transport: new BufferTransport(),
});
await send("events", eventData, {
idempotencyKey: "unique-key-123",
retentionSeconds: 3600,
});ConsumerGroup
// Process next available message (simplest form)
await consumer.consume(handler);
// Process specific message by ID with payload
await consumer.consume(handler, { messageId: "message-id" });
// Process specific message by ID without payload (metadata only)
// handler will be called with `undefined` as the payload
await consumer.consume(handler, { messageId: "message-id", skipPayload: true });Message Handler
// Handler function signature
type MessageHandler<T = unknown> = (
message: T,
metadata: MessageMetadata,
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;
interface MessageTimeoutResult {
timeoutSeconds: number; // seconds before message becomes available again
}
// Message Metadata
interface MessageMetadata {
messageId: string;
deliveryCount: number;
timestamp: string;
}ConsumeOptions Interface
interface ConsumeOptions {
messageId?: string; // Process specific message by ID
skipPayload?: boolean; // Skip payload download (requires messageId)
}Transport Interface
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}Callback Handler
// Create a callback handler for Next.js route handlers
function handleCallback(
handlers: CallbackHandlers,
): (request: Request) => Promise<Response>;
// Configuration object with handlers for different topics and consumer groups
type CallbackHandlers = {
[topicName: string]: { [consumerGroup: string]: MessageHandler };
};
// Example usage:
export const POST = handleCallback({
"user-events": {
welcome: (message, metadata) => {
console.log(`New user event:`, message, metadata);
},
},
// Multiple consumer groups per topic
"image-processing": {
compress: (message, metadata) => console.log("Compressing image", message),
resize: (message, metadata) => console.log("Resizing image", message),
},
});Examples
Basic JSON Processing
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
// Option 1: Using send shorthand
await send<UserEvent>("user-events", {
userId: "123",
action: "login",
timestamp: Date.now(),
});
// Option 2: Using createTopic for consumers
const userTopic = createTopic<UserEvent>("user-events");
await userTopic.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
const consumer = userTopic.consumerGroup("processors");
// Process next available message
try {
await consumer.consume(async (message) => {
console.log(`User ${message.userId} performed ${message.action}`);
});
} catch (error) {
console.error("Processing error:", error);
}Processing Specific Messages by ID
const userTopic = createTopic<{ userId: string; action: string }>(
"user-events",
);
const consumer = userTopic.consumerGroup("processors");
// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";
try {
await consumer.consume(
async (message, { messageId }) => {
console.log(`Processing specific message: ${messageId}`);
console.log(`User ${message.userId} performed ${message.action}`);
},
{ messageId },
);
console.log("Message processed successfully");
} catch (error) {
if (error.message.includes("not found or not available")) {
console.log("Message was already processed or does not exist");
} else {
console.error("Error processing message:", error);
}
}Processing Next Available Message
const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");
// Process the next available message (one-shot processing)
try {
await worker.consume(async (message) => {
console.log(`Processing task: ${message.taskType}`);
await processTask(message.taskType, message.data);
});
console.log("Message processed successfully");
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("No messages available");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
} else {
console.error("Error processing message:", error);
}
}
// Handle conditional timeouts
await worker.consume(async (message) => {
if (!canProcessTaskType(message.taskType)) {
// Return timeout to retry later
return { timeoutSeconds: 60 };
}
await processTask(message.taskType, message.data);
});
// Process specific message metadata only (no payload download)
await worker.consume(
async (_, metadata) => {
console.log(`Message ID: ${metadata.messageId}`);
console.log(`Delivery count: ${metadata.deliveryCount}`);
console.log(`Timestamp: ${metadata.timestamp}`);
// _ is undefined - no payload was downloaded
},
{ messageId: "specific-message-id", skipPayload: true },
);Timing Out Messages
const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");
// Process a message with conditional timeout
try {
await worker.consume(async ({ taskType, data }) => {
// Check if we can process this task type right now
if (taskType === "heavy-computation" && isSystemOverloaded()) {
// Return timeout to retry later (5 minutes)
return { timeoutSeconds: 300 };
}
// Check if we have required resources
if (taskType === "external-api" && !isExternalServiceAvailable()) {
// Return timeout to retry in 1 minute
return { timeoutSeconds: 60 };
}
// Process the message normally
console.log(`Processing ${taskType} task`);
await processTask(taskType, data);
// Message will be automatically deleted on successful completion
});
} catch (error) {
console.error("Worker processing error:", error);
}
// Example with exponential backoff
try {
await worker.consume(async (message, { deliveryCount }) => {
const maxRetries = 3;
try {
await processMessage(message);
// Successful processing - message will be deleted
} catch (error) {
if (deliveryCount < maxRetries) {
// Exponential backoff: 2^deliveryCount minutes
const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
console.log(
`Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
);
return { timeoutSeconds: timeoutSeconds };
} else {
// Max retries reached, let the message fail and be deleted
console.error("Max retries reached, message will be discarded:", error);
throw error;
}
}
});
} catch (error) {
console.error("Backoff processing error:", error);
}Error Handling
The queue client provides specific error types for different failure scenarios:
Error Types
QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)- Thrown by
consume()when no messages are available - Also thrown when directly using
client.receiveMessages()
- Thrown by
MessageLockedError: Thrown when a message is temporarily locked (423 status)- Contains optional
retryAfterproperty with seconds to wait before retry - For
consume()without options: the next message is locked - For
consume()with messageId: the requested message is locked
- Contains optional
MessageNotFoundError: Message doesn't exist (404 status)MessageNotAvailableError: Message exists but isn't available for processing (409 status)MessageCorruptedError: Message data is corrupted or can't be parsedBadRequestError: Invalid request parameters (400 status)- Invalid queue names, missing required parameters
UnauthorizedError: Authentication failure (401 status)- Missing or invalid authentication token
ForbiddenError: Access denied (403 status)- Queue environment doesn't match token environment
InternalServerError: Server-side errors (500+ status codes)- Unexpected server errors, service unavailable, etc.
Error Handling Examples
import {
BadRequestError,
ForbiddenError,
InternalServerError,
MessageLockedError,
QueueEmptyError,
UnauthorizedError,
} from "@vercel/queue";
// Handle empty queue or locked messages
try {
for await (const message of client.receiveMessages(options, transport)) {
// Process messages
}
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("Queue is empty, retry later");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
}
}
// Handle locked message with retry
try {
await consumer.consume(handler, { messageId });
} catch (error) {
if (error instanceof MessageLockedError) {
console.log("Message is locked by another consumer");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
setTimeout(() => retry(), error.retryAfter * 1000);
}
}
// Handle authentication and authorization errors
try {
await topic.publish(payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check token/queue configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
// Complete error handling pattern
function handleQueueError(error: unknown): void {
if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
// Transient errors - safe to retry
console.log("Temporary condition, will retry");
} else if (
error instanceof UnauthorizedError ||
error instanceof ForbiddenError
) {
// Authentication/authorization errors - need to fix configuration
console.log("Auth error - check credentials");
} else if (error instanceof BadRequestError) {
// Client error - fix the request
console.log("Invalid request:", error.message);
} else if (error instanceof InternalServerError) {
// Server error - implement exponential backoff
console.log("Server error - retry with backoff");
} else {
// Unknown error
console.error("Unexpected error:", error);
}
}License
MIT