Package Exports
- @vercel/queue
- @vercel/queue/nextjs/pages
Vercel Queues
A TypeScript client library for interacting with the Vercel Queue Service API, designed for seamless integration with Vercel deployments.
Features
- Automatic Queue Triggering: Vercel automatically triggers your API routes when messages are ready
- Next.js Integration: Built-in support for Next.js API routes and Server Actions
- Generic Payload Support: Send and receive any type of data with type safety
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Streaming Support: Handle large payloads efficiently
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
Installation
npm install @vercel/queue
Quick Start
For local development, you'll need to set up your Vercel project:
# Install Vercel CLI if you haven't already
npm i -g vercel
# Link your project to Vercel
vc link
# Pull environment variables from your Vercel project
vc env pull
Local Development
Queues just work locally. Once you have set up your Vercel project, messages you send() in development mode automatically trigger your handlers locally - no external queue infrastructure needed.
The library reads your vercel.json configuration, discovers your queue handlers, and triggers them automatically when messages are sent.
Example Workflow
# Start your dev server
npm run dev
# Send messages - they process locally automatically!
TypeScript Configuration
Update your tsconfig.json to use "bundler" module resolution for proper package export resolution:
{
"compilerOptions": {
"moduleResolution": "bundler"
}
}
Publishing Messages
The send function can be used anywhere in your codebase to publish messages to a queue:
import { send } from "@vercel/queue";
// Send a message to a topic
await send("my-topic", {
message: "Hello world",
});
// With additional options
await send(
"my-topic",
{
message: "Hello world",
},
{
idempotencyKey: "unique-key", // Optional: prevent duplicate messages
retentionSeconds: 3600, // Optional: override retention time (defaults to 24 hours)
delaySeconds: 60, // Optional: delay message delivery by N seconds
},
);
Example usage in an API route:
// app/api/send-message/route.ts
import { send } from "@vercel/queue";
export async function POST(request: Request) {
const body = await request.json();
const { messageId } = await send("my-topic", {
message: body.message,
});
return Response.json({ messageId });
}
Consuming Messages
Messages are consumed using API routes that Vercel automatically triggers when messages are available.
1. Create API Routes
App Router (Recommended)
The recommended approach is to handle multiple topics and consumers in a single API route to keep your vercel.json configuration simple:
// app/api/queue/route.ts
import { handleCallback } from "@vercel/queue";
export const POST = handleCallback({
// Single topic with one consumer
"my-topic": {
"my-consumer": async (message, metadata) => {
// metadata includes: { messageId, deliveryCount, createdAt, topicName, consumerGroup }
console.log("Processing message:", message);
// If this throws an error, the message will be automatically retried
await processMessage(message);
},
},
// Multiple consumers for different purposes
"order-events": {
fulfillment: async (order, metadata) => {
await processOrder(order);
},
analytics: async (order, metadata) => {
await trackOrder(order);
},
},
});
While you can split handlers into separate routes if needed (e.g., for code organization or deployment flexibility), consolidating them in one route is recommended for simpler configuration.
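Handlers signal success by returning and failure by throwing, which triggers a retry. To stop retrying after a fixed number of deliveries, you can branch on metadata.deliveryCount. The wrapper and MAX_ATTEMPTS cap below are hypothetical helpers, not part of @vercel/queue; only the MessageMetadata shape comes from the library:

```typescript
// Sketch only: processWithRetryCap and MAX_ATTEMPTS are hypothetical,
// not part of @vercel/queue. MessageMetadata mirrors what handlers receive.
interface MessageMetadata {
  messageId: string;
  deliveryCount: number;
  createdAt: Date;
  topicName: string;
  consumerGroup: string;
}

const MAX_ATTEMPTS = 5; // arbitrary cap; tune for your workload

async function processWithRetryCap<T>(
  message: T,
  metadata: MessageMetadata,
  process: (m: T) => Promise<void>,
): Promise<void> {
  try {
    await process(message);
  } catch (err) {
    if (metadata.deliveryCount >= MAX_ATTEMPTS) {
      // Returning normally acknowledges the message, so it is not retried again.
      console.warn(`Giving up on ${metadata.messageId} after ${metadata.deliveryCount} deliveries`);
      return;
    }
    throw err; // rethrow so the queue redelivers the message
  }
}
```

Inside a handleCallback handler you would call processWithRetryCap(message, metadata, processMessage) instead of processMessage directly.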
Pages Router
For Next.js Pages Router, import from @vercel/queue/nextjs/pages to get a handler compatible with the Pages Router API (NextApiRequest/NextApiResponse):
// pages/api/queue.ts
import { handleCallback } from "@vercel/queue/nextjs/pages";
export default handleCallback({
"my-topic": {
"my-consumer": async (message, metadata) => {
console.log("Processing message:", message);
await processMessage(message);
},
},
"order-events": {
fulfillment: async (order, metadata) => {
await processOrder(order);
},
analytics: async (order, metadata) => {
await trackOrder(order);
},
},
});
The /nextjs/pages subpath export automatically adapts the handler to work with the Pages Router API.
2. Configure vercel.json
Configure which topics and consumers your API route handles.
For App Router:
{
"functions": {
"app/api/queue/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "my-topic",
"consumer": "my-consumer",
"retryAfterSeconds": 60,
"initialDelaySeconds": 0
},
{
"type": "queue/v1beta",
"topic": "order-events",
"consumer": "fulfillment"
},
{
"type": "queue/v1beta",
"topic": "order-events",
"consumer": "analytics",
"retryAfterSeconds": 300
}
]
}
}
}
For Pages Router:
{
"functions": {
"pages/api/queue.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "my-topic",
"consumer": "my-consumer",
"retryAfterSeconds": 60,
"initialDelaySeconds": 0
},
{
"type": "queue/v1beta",
"topic": "order-events",
"consumer": "fulfillment"
},
{
"type": "queue/v1beta",
"topic": "order-events",
"consumer": "analytics",
"retryAfterSeconds": 300
}
]
}
}
}
Key Concepts
- Topics: Named message channels that can have multiple consumer groups
- Consumer Groups: Named groups of consumers that process messages in parallel
- Different consumer groups for the same topic each get a copy of every message
- Multiple consumers in the same group share/split messages for load balancing
- Automatic Triggering: Vercel triggers your API routes when messages are available
- Message Processing: Your API routes receive message metadata via headers
- Configuration: The vercel.json file tells Vercel which routes handle which topics/consumers
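The fan-out vs. load-balancing distinction can be sketched with a tiny model. This is an illustration of the delivery semantics only, not library code:

```typescript
// Illustration of delivery semantics, not @vercel/queue code:
// each consumer group gets a copy of every message, while consumers
// within one group split messages (here, round-robin) for load balancing.
function simulateDeliveries(
  messageIds: string[],
  groups: Record<string, string[]>,
): Record<string, string[]> {
  const perConsumer: Record<string, string[]> = {};
  for (const [group, consumers] of Object.entries(groups)) {
    messageIds.forEach((id, i) => {
      const key = `${group}/${consumers[i % consumers.length]}`;
      (perConsumer[key] ??= []).push(id);
    });
  }
  return perConsumer;
}
```

With groups { fulfillment: ["a", "b"], analytics: ["x"] } and messages m1 and m2, consumer x in the analytics group sees both messages, while a and b in the fulfillment group get one each.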
Advanced Features
Client Class
For custom configuration (tokens, headers, etc.), use the Client class:
import { Client } from "@vercel/queue";
const client = new Client({
token: "my-token", // Optional: custom auth token
headers: { "X-Custom": "header" }, // Optional: custom headers
pinToDeployment: false, // Optional: disable deployment pinning (default: true)
});
// Send a message
await client.send("my-topic", { hello: "world" });
// Handle callbacks using the same client
export const POST = client.handleCallback({
"my-topic": {
"my-group": async (msg, meta) => console.log(msg),
},
});
Parsing Callback Requests
For custom webhook handling, use parseCallback to extract queue information from CloudEvent requests:
import { parseCallback } from "@vercel/queue";
export async function POST(request: Request) {
const { queueName, consumerGroup, messageId } = await parseCallback(request);
// Use the parsed information for custom processing...
await myWorkflow.handleWebhook(queueName, consumerGroup, messageId);
return Response.json({ status: "success" });
}
Serialization (Transport) System
The queue client supports customizable serialization through the Transport interface:
Built-in Transports
- JsonTransport (Default): For structured data that fits in memory
- BufferTransport: For binary data that fits in memory
- StreamTransport: For large files and memory-efficient processing
Example:
import { send, JsonTransport } from "@vercel/queue";
// JsonTransport is the default
await send("json-topic", { data: "example" });
// Explicit transport configuration
await send(
"json-topic",
{ data: "example" },
{ transport: new JsonTransport() },
);
// JsonTransport with custom serialization
const transport = new JsonTransport({
replacer: (key, value) => (key === "password" ? undefined : value),
reviver: (key, value) => (key === "date" ? new Date(value) : value),
});
await send("json-topic", { data: "example" }, { transport });
Transport Selection Guide
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport | Low | High |
| Binary data | BufferTransport | Medium | High |
| Large payloads | StreamTransport | Very Low | Medium |
| Real-time streams | StreamTransport | Very Low | High |
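A custom transport pairs a serializer with a matching deserializer. The real Transport interface from @vercel/queue is not reproduced here; the sketch below assumes a symmetric serialize/deserialize shape purely to show the round-trip contract any custom transport must satisfy:

```typescript
// Assumed shape for illustration; the actual Transport interface may differ.
interface SketchTransport<T> {
  serialize(payload: T): Uint8Array;
  deserialize(bytes: Uint8Array): T;
}

// Minimal JSON-over-UTF-8 transport: deserialize(serialize(x)) must
// reproduce x for any payload the transport claims to support.
const jsonSketch: SketchTransport<unknown> = {
  serialize: (payload) => new TextEncoder().encode(JSON.stringify(payload)),
  deserialize: (bytes) => JSON.parse(new TextDecoder().decode(bytes)),
};
```

The same symmetry requirement is what makes the replacer/reviver pair in the JsonTransport example above work: whatever the sender strips or encodes, the receiver must be able to reverse.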
Error Handling
The queue client provides specific error types:
- QueueEmptyError: No messages available in the queue
- MessageLockedError: Message is being processed by another consumer
- MessageNotFoundError: Message doesn't exist or has expired
- MessageNotAvailableError: Message exists but cannot be claimed
- MessageAlreadyProcessedError: Message was already successfully processed
- MessageCorruptedError: Message data could not be parsed
- BadRequestError: Invalid request parameters
- UnauthorizedError: Authentication failed (invalid or missing token)
- ForbiddenError: Access denied (wrong environment or project)
- DuplicateMessageError: Idempotency key was already used
- ConcurrencyLimitError: Too many in-flight messages
- ConsumerDiscoveryError: Could not reach the consumer deployment
- ConsumerRegistryNotConfiguredError: Project not configured for queues
- InternalServerError: Unexpected server error
- InvalidLimitError: Batch limit outside valid range (1-10)
Example error handling:
import {
BadRequestError,
ConcurrencyLimitError,
DuplicateMessageError,
ForbiddenError,
InternalServerError,
UnauthorizedError,
} from "@vercel/queue";
try {
await send("my-topic", payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof DuplicateMessageError) {
console.log("Duplicate message:", error.idempotencyKey);
} else if (error instanceof ConcurrencyLimitError) {
console.log(
"Rate limited:",
error.currentInflight,
"/",
error.maxConcurrency,
);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
Environment Variables
The following environment variables can be used to configure the queue client:
| Variable | Description | Default |
|---|---|---|
| VERCEL_QUEUE_BASE_URL | Override the queue service URL | https://vercel-queue.com |
| VERCEL_QUEUE_BASE_PATH | Override the API base path | /api/v3/topic |
| VERCEL_QUEUE_DEBUG | Enable debug logging (1 or true) | - |
| VERCEL_DEPLOYMENT_ID | Deployment ID (auto-set by Vercel) | - |
Advanced Usage
Direct Message Processing
Note: The receive function is for advanced use cases where you need direct message processing control outside of Vercel's automatic triggering.
import { receive } from "@vercel/queue";
// Process next available message
await receive<T>(topicName, consumerGroup, handler);
// Process specific message by ID
await receive<T>(topicName, consumerGroup, handler, {
messageId: "message-id",
});
// Process message with options
await receive<T>(topicName, consumerGroup, handler, {
messageId: "message-id", // Optional: process specific message by ID
transport: new JsonTransport(), // Optional: custom transport (defaults to JsonTransport)
visibilityTimeoutSeconds: 30, // Optional: message visibility timeout
visibilityRefreshInterval: 10, // Optional: how often to refresh the lock
});
// Handler function signature
type MessageHandler<T = unknown> = (
message: T,
metadata: MessageMetadata,
) => Promise<void> | void;
// MessageMetadata type
interface MessageMetadata {
messageId: string;
deliveryCount: number;
createdAt: Date;
topicName: string;
consumerGroup: string;
}
Service Limits & Constraints
Throughput & Storage
| Limit | Value | Notes |
|---|---|---|
| Message throughput | 10,000s msg/sec/topic | Scales horizontally |
| Payload size | 1 GB | Smaller messages have lower latency |
| Number of topics | Unlimited | No hard limit |
| Consumer groups per message | ~4,000 | Per-message limit |
| Messages per queue | Unlimited | No hard limit |
Parameter Constraints
Publishing Messages
| Parameter | Default | Min | Max | Notes |
|---|---|---|---|---|
| retentionSeconds | 86,400 (24h) | 60 | 86,400 | Message TTL |
| delaySeconds | 0 | 0 | ≤ retention | Cannot exceed retention |
| idempotencyKey | — | — | — | Dedup window: min(retention, 24h) |
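One way to use the dedup window is to derive the idempotencyKey deterministically from your domain data, so a retried request produces the same key. The helper below is a hypothetical example; the key scheme (including the "order:" prefix) is not prescribed by the library:

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper: derive a stable idempotency key from domain
// identifiers so client retries of the same logical event are
// deduplicated by the queue instead of enqueued twice.
function orderIdempotencyKey(orderId: string, event: string): string {
  return createHash("sha256").update(`order:${orderId}:${event}`).digest("hex");
}
```

You would then pass it when publishing, e.g. await send("order-events", order, { idempotencyKey: orderIdempotencyKey(order.id, "created") }).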
Receiving Messages
| Parameter | Default | Min | Max | Notes |
|---|---|---|---|---|
| visibilityTimeoutSeconds | 30 | 0 | 3,600 | 0 = immediate re-visibility |
| limit | 1 | 1 | 10 | Messages per request |
| maxConcurrency | unlimited | 1 | 10,000 | In-flight message limit |
Visibility Extension
| Constraint | Value |
|---|---|
| visibilityTimeoutSeconds | 0 - 3,600 seconds |
| Cannot extend beyond | Message's original expiration time |
| Receipt handle | Must match the receive operation |
Identifier Formats
| Identifier | Pattern | Example |
|---|---|---|
| Topic/Queue name | [A-Za-z0-9_-]+ | my-queue, task_queue_v2 |
| Consumer group | [A-Za-z0-9_-]+ | worker-1, analytics_consumer |
| Message ID | Opaque string | 0-1, 3-7K9mNpQrS |
| Receipt handle | Opaque string | Used for delete/visibility ops |
Content-Type Handling
| Scenario | Result |
|---|---|
| Client provides Content-Type | Used as-is |
| No header, magic bytes detected | Auto-detected MIME type |
| No header, detection fails | application/octet-stream |
Wildcard Topics
Topic patterns support wildcards for flexible routing:
{
"functions": {
"app/api/queue/route.ts": {
"experimentalTriggers": [
{
"type": "queue/v1beta",
"topic": "user-*",
"consumer": "processor"
}
]
}
}
}
Wildcard Rules:
- * may only appear once in the pattern
- * must be at the end of the topic name
- Valid: user-*, orders-*
- Invalid: *-events, user-*-data
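These rules can be checked mechanically. The validator below is a hypothetical helper, not part of @vercel/queue:

```typescript
// Hypothetical helper implementing the wildcard rules above;
// not part of @vercel/queue.
function isValidTopicPattern(pattern: string): boolean {
  if (pattern.length === 0) return false;
  const stars = (pattern.match(/\*/g) ?? []).length;
  if (stars > 1) return false; // "*" may only appear once
  if (stars === 1 && !pattern.endsWith("*")) return false; // "*" must be at the end
  const literal = stars === 1 ? pattern.slice(0, -1) : pattern;
  return /^[A-Za-z0-9_-]*$/.test(literal); // topic names use [A-Za-z0-9_-]+
}
```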
API Reference
Client Configuration
import { Client } from "@vercel/queue";
const client = new Client({
// Base URL for the queue service
// Default: "https://vercel-queue.com"
// Env: VERCEL_QUEUE_BASE_URL
baseUrl: "https://vercel-queue.com",
// API path prefix
// Default: "/api/v3/topic"
// Env: VERCEL_QUEUE_BASE_PATH
basePath: "/api/v3/topic",
// Auth token (auto-fetched via OIDC if not provided)
token: "my-token",
// Custom headers for all requests
headers: { "X-Custom": "value" },
// Deployment ID for message routing
// Default: process.env.VERCEL_DEPLOYMENT_ID
deploymentId: "dpl_xxx",
// Pin messages to current deployment when publishing
// Default: true
pinToDeployment: true,
});
Send Options
await send("my-topic", payload, {
// Deduplication key
// Dedup window: min(retentionSeconds, 24 hours)
idempotencyKey: "unique-key",
// Message TTL in seconds
// Default: 86400, Min: 60, Max: 86400
retentionSeconds: 3600,
// Delay before message becomes visible
// Default: 0, Min: 0, Max: retentionSeconds
delaySeconds: 60,
// Custom serializer (default: JsonTransport)
transport: new JsonTransport(),
});
Receive Options
await receive("my-topic", "my-consumer", handler, {
// Specific message ID to consume (optional)
messageId: "0-1",
// Message lock duration in seconds
// Default: 30, Min: 0, Max: 3600
visibilityTimeoutSeconds: 60,
// How often to refresh the lock during processing
// Default: visibilityTimeoutSeconds / 3
visibilityRefreshInterval: 15,
// Custom deserializer (default: JsonTransport)
transport: new JsonTransport(),
});
Receive Options (Advanced)
await receive("my-topic", "my-consumer", handler, {
// Payload deserializer
// Default: JsonTransport
transport: new JsonTransport(),
// Message lock duration
// Default: 30, Min: 0, Max: 3600
visibilityTimeoutSeconds: 60,
// How often to refresh the lock during processing
// Default: visibilityTimeoutSeconds / 3
visibilityRefreshInterval: 20,
});
handleCallback Options
export const POST = handleCallback(
{
"my-topic": {
"my-consumer": async (message, metadata) => {
await processMessage(message);
},
},
},
{
// Message lock duration for long-running handlers
// Default: 30, Min: 0, Max: 3600
// visibilityRefreshInterval defaults to visibilityTimeoutSeconds / 3
visibilityTimeoutSeconds: 300, // 5 minutes
},
);
License
MIT