JSPM

  • Created
  • Published
  • Downloads 296760
  • Score
    100M100P100Q205520F
  • License MIT

A Node.js library for interacting with the Vercel Queue Service API

Package Exports

  • @vercel/queue
  • @vercel/queue/nextjs/pages
  • @vercel/queue/web

Readme

Vercel Queues

A TypeScript client library for interacting with the Vercel Queue Service API, designed for seamless integration with Vercel deployments.

Features

  • Automatic Queue Triggering: Vercel automatically triggers your API routes when messages are ready
  • Next.js Integration: Built-in support for Next.js App Router and Pages Router
  • Generic Payload Support: Send and receive any type of data with type safety
  • Pub/Sub Pattern: Topic-based messaging with consumer groups
  • Type Safety: Full TypeScript support with generic types
  • Streaming Support: Handle large payloads efficiently
  • Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
  • Framework Adapters: Web API, Next.js App Router, and Pages Router support

Installation

npm install @vercel/queue

Quick Start

For local development, you'll need to set up your Vercel project:

# Install Vercel CLI if you haven't already
npm i -g vercel

# Link your project to Vercel
vc link

# Pull environment variables from your Vercel project
vc env pull

Local Development

Queues just work locally. After you have setup your Vercel project, when you send() messages in development mode, they automatically trigger your handlers locally - no external queue infrastructure needed.

The library reads your vercel.json configuration, discovers your queue handlers, and triggers them automatically when messages are sent.

Note: Local dev mode is enabled when NODE_ENV=development. Most frameworks (Next.js, etc.) set this automatically when running npm run dev.

Example Workflow

# Start your dev server
npm run dev

# Send messages - they process locally automatically!

Publishing Messages

The send function can be used anywhere in your codebase to publish messages to a queue:

import { send } from "@vercel/queue";

// Send a message to a topic
await send("my-topic", {
  message: "Hello world",
});

// With additional options
await send(
  "my-topic",
  {
    message: "Hello world",
  },
  {
    idempotencyKey: "unique-key", // Optional: prevent duplicate messages
    retentionSeconds: 3600, // Optional: override retention time (defaults to 24 hours)
    delaySeconds: 60, // Optional: delay message delivery by N seconds
  },
);

Example usage in an API route:

// app/api/send-message/route.ts
import { send } from "@vercel/queue";

export async function POST(request: Request) {
  const body = await request.json();

  const { messageId } = await send("my-topic", {
    message: body.message,
  });

  return Response.json({ messageId });
}

Consuming Messages

Messages are consumed using API routes that Vercel automatically triggers when messages are available.

1. Create API Routes

Web API (@vercel/queue/web)

The handleCallback from @vercel/queue/web returns a standard (Request) => Promise<Response> handler. It works with any framework that uses the Web API Request/Response types, including Next.js App Router, Hono, and others.

Next.js App Router:

// app/api/queue/my-topic/route.ts
import { handleCallback } from "@vercel/queue/web";

export const POST = handleCallback(async (message, metadata) => {
  // metadata includes: { messageId, deliveryCount, createdAt, topicName, consumerGroup }
  console.log("Processing message:", message);

  // If this throws an error, the message will be automatically retried
  await processMessage(message);
});

Hono:

import { Hono } from "hono";
import { handleCallback } from "@vercel/queue/web";

const app = new Hono();

app.post(
  "/api/queue",
  handleCallback(async (message, metadata) => {
    console.log("Processing:", message);
  }),
);

export default app;

For multiple topics/consumers, create separate route files:

// app/api/queue/orders/fulfillment/route.ts
import { handleCallback } from "@vercel/queue/web";

export const POST = handleCallback(async (order, metadata) => {
  await processOrder(order);
});
// app/api/queue/orders/analytics/route.ts
import { handleCallback } from "@vercel/queue/web";

export const POST = handleCallback(async (order, metadata) => {
  await trackOrder(order);
});
Pages Router (@vercel/queue/nextjs/pages)

For Next.js Pages Router, import from @vercel/queue/nextjs/pages. This returns a (req, res) => Promise<void> handler:

// pages/api/queue/my-topic.ts
import { handleCallback } from "@vercel/queue/nextjs/pages";

export default handleCallback(async (message, metadata) => {
  console.log("Processing message:", message);
  await processMessage(message);
});

2. Configure vercel.json

Configure which topics and consumers your API routes handle.

{
  "functions": {
    "app/api/queue/my-topic/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v2beta",
          "topic": "my-topic",
          "retryAfterSeconds": 60,
          "initialDelaySeconds": 0
        }
      ]
    },
    "app/api/queue/orders/fulfillment/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v2beta",
          "topic": "order-events"
        }
      ]
    },
    "app/api/queue/orders/analytics/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v2beta",
          "topic": "order-events",
          "retryAfterSeconds": 300
        }
      ]
    }
  }
}

Key Concepts

  • Topics: Named message channels that can have multiple consumer groups
  • Consumer Groups: Named groups of consumers that process messages in parallel
    • Different consumer groups for the same topic each get a copy of every message
    • Multiple consumers in the same group share/split messages for load balancing
  • Automatic Triggering: Vercel triggers your API routes when messages are available
  • Message Processing: Your API routes receive message metadata via headers
  • Configuration: The vercel.json file tells Vercel which routes handle which topics/consumers
  • Delivery Modes: The server uses CloudEvents binary content mode to deliver messages. For small messages, the full payload and receipt handle are pushed directly in the HTTP body and headers, avoiding an extra API fetch. For large messages, only the message ID is sent and the SDK fetches the payload.

Advanced Features

Custom Client Configuration

For custom configuration (tokens, headers, transport), create a QueueClient and pass it via options:

import { QueueClient, send } from "@vercel/queue";
import { handleCallback } from "@vercel/queue/web";

const client = new QueueClient({
  token: "my-token",
  headers: { "X-Custom": "header" },
});

// Send with custom client
await send("my-topic", { hello: "world" }, { client });

// Handle callbacks with custom client
export const POST = handleCallback(async (msg, meta) => console.log(msg), {
  client,
});

Core Handler (Framework Agnostic)

For custom framework integration, use the core handleCallback from @vercel/queue. It takes parsed request data and throws on errors:

import { handleCallback, parseRawCallback } from "@vercel/queue";

// In your framework handler:
const parsed = parseRawCallback(body, headers);
try {
  await handleCallback(async (msg, meta) => {
    console.log("Processing:", msg);
  }, parsed);
  // success
} catch (error) {
  // handle error → 500
}

Serialization (Transport) System

The queue client supports customizable serialization through the Transport interface:

Built-in Transports

  1. JsonTransport (Default): For structured data that fits in memory
  2. BufferTransport: For binary data that fits in memory
  3. StreamTransport: For large files and memory-efficient processing

Example:

import { send, JsonTransport } from "@vercel/queue";

// JsonTransport is the default
await send("json-topic", { data: "example" });

// Explicit transport configuration
await send(
  "json-topic",
  { data: "example" },
  { transport: new JsonTransport() },
);

// JsonTransport with custom serialization
const transport = new JsonTransport({
  replacer: (key, value) => (key === "password" ? undefined : value),
  reviver: (key, value) => (key === "date" ? new Date(value) : value),
});
await send("json-topic", { data: "example" }, { transport });

Transport Selection Guide

Use Case Recommended Transport Memory Usage Performance
Small JSON objects JsonTransport Low High
Binary data BufferTransport Medium High
Large payloads StreamTransport Very Low Medium
Real-time streams StreamTransport Very Low High

Handling Empty Queues

When no messages are available in the queue, the handler receives null for both the message and metadata parameters. This allows graceful handling without exceptions:

await receive("my-topic", "my-consumer", async (message, metadata) => {
  if (!message) {
    console.log("No message received - queue is empty");
    return;
  }

  // Process the message
  console.log("Processing:", message);
  console.log("Message ID:", metadata.messageId);
});

The same pattern works with handleCallback:

import { handleCallback } from "@vercel/queue/web";

export const POST = handleCallback(async (message, metadata) => {
  if (!message) {
    // No message available - handle gracefully
    return;
  }
  await processMessage(message);
});

Error Handling

The queue client provides specific error types:

  • MessageLockedError: Message is being processed by another consumer
  • MessageNotFoundError: Message doesn't exist or has expired
  • MessageNotAvailableError: Message exists but cannot be claimed
  • MessageAlreadyProcessedError: Message was already successfully processed
  • MessageCorruptedError: Message data could not be parsed
  • BadRequestError: Invalid request parameters
  • UnauthorizedError: Authentication failed (invalid or missing token)
  • ForbiddenError: Access denied (wrong environment or project)
  • DuplicateMessageError: Idempotency key was already used
  • ConsumerDiscoveryError: Could not reach the consumer deployment
  • ConsumerRegistryNotConfiguredError: Project not configured for queues
  • InternalServerError: Unexpected server error
  • InvalidLimitError: Batch limit outside valid range (1-10)

Example error handling:

import {
  BadRequestError,
  DuplicateMessageError,
  ForbiddenError,
  InternalServerError,
  UnauthorizedError,
} from "@vercel/queue";

try {
  await send("my-topic", payload);
} catch (error) {
  if (error instanceof UnauthorizedError) {
    console.log("Invalid token - refresh authentication");
  } else if (error instanceof ForbiddenError) {
    console.log("Environment mismatch - check configuration");
  } else if (error instanceof BadRequestError) {
    console.log("Invalid parameters:", error.message);
  } else if (error instanceof DuplicateMessageError) {
    console.log("Duplicate message:", error.idempotencyKey);
  } else if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
  }
}

Environment Variables

The following environment variables can be used to configure the queue client:

Variable Description Default
VERCEL_QUEUE_BASE_URL Override the queue service URL https://vercel-queue.com
VERCEL_QUEUE_BASE_PATH Override the API base path /api/v3/topic
VERCEL_QUEUE_DEBUG Enable debug logging (1 or true) -
VERCEL_DEPLOYMENT_ID Deployment ID (auto-set by Vercel) -

Advanced Usage

Direct Message Processing

Note: The receive function is for advanced use cases where you need direct message processing control outside of Vercel's automatic triggering.

import { receive } from "@vercel/queue";

// Process next available message (or null if queue is empty)
await receive<T>(topicName, consumerGroup, async (message, metadata) => {
  if (!message) {
    console.log("Queue is empty");
    return;
  }
  // Process message
});

// Batch processing: fetch up to 10 messages in one request
await receive<T>(topicName, consumerGroup, handler, {
  limit: 10, // Default: 1, Min: 1, Max: 10
});

// Process specific message by ID
await receive<T>(topicName, consumerGroup, handler, {
  messageId: "message-id",
});

// Note: limit and messageId are mutually exclusive options

// Handler function signature
// When queue is empty, both message and metadata are null
type MessageHandler<T = unknown> = (
  message: T | null,
  metadata: MessageMetadata | null,
) => Promise<void> | void;

// MessageMetadata type
interface MessageMetadata {
  messageId: string;
  deliveryCount: number;
  createdAt: Date;
  topicName: string;
  consumerGroup: string;
}

Service Limits & Constraints

Throughput & Storage

Limit Value Notes
Message throughput 10,000+ msg/sec/topic Scales horizontally
Payload size 1 GB Smaller messages have lower latency
Number of topics Unlimited No hard limit
Consumer groups per message ~4,000 Per-message limit
Messages per queue Unlimited No hard limit

Parameter Constraints

Publishing Messages

Parameter Default Min Max Notes
retentionSeconds 86,400 (24h) 60 86,400 Message TTL
delaySeconds 0 0 ≤ retention Cannot exceed retention
idempotencyKey Dedup window: min(retention, 24h)

Receiving Messages

Parameter Default Min Max Notes
visibilityTimeoutSeconds 30 0 3,600 0 = immediate re-visibility
limit 1 1 10 Messages per request

Visibility Extension

Constraint Value
visibilityTimeoutSeconds 0 - 3,600 seconds
Cannot extend beyond Message's original expiration time
Receipt handle Must match the receive operation

Identifier Formats

Identifier Pattern Example
Topic/Queue name [A-Za-z0-9_-]+ my-queue, task_queue_v2
Consumer group [A-Za-z0-9_-]+ worker-1, analytics_consumer
Message ID Opaque string 0-1, 3-7K9mNpQrS
Receipt handle Opaque string Used for delete/visibility ops

Content-Type Handling

Scenario Result
Client provides Content-Type Used as-is
No header, magic bytes detected Auto-detected MIME type
No header, detection fails application/octet-stream

Wildcard Topics

Topic patterns support wildcards for flexible routing:

{
  "functions": {
    "app/api/queue/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v2beta",
          "topic": "user-*"
        }
      ]
    }
  }
}

Wildcard Rules:

  • * may only appear once in the pattern
  • * must be at the end of the topic name
  • Valid: user-*, orders-*
  • Invalid: *-events, user-*-data

API Reference

Export Structure

Import Path handleCallback
@vercel/queue Core async function: (handler, parsed, opts?) => Promise<void>
@vercel/queue/web Returns (request: Request) => Promise<Response>
@vercel/queue/nextjs/pages Returns (req, res) => Promise<void>

Additional exports from @vercel/queue:

Export Description
parseCallback Parse a Web API Request into a ParsedCallbackRequest
parseRawCallback Parse a pre-parsed body + headers (e.g. Pages Router)
CLOUD_EVENT_TYPE_V2BETA "com.vercel.queue.v2beta" — binary CloudEvent type constant

QueueClient Configuration

import { QueueClient } from "@vercel/queue";

const client = new QueueClient({
  // Base URL for the queue service
  // Default: "https://vercel-queue.com"
  // Env: VERCEL_QUEUE_BASE_URL
  baseUrl: "https://vercel-queue.com",

  // API path prefix
  // Default: "/api/v3/topic"
  // Env: VERCEL_QUEUE_BASE_PATH
  basePath: "/api/v3/topic",

  // Auth token (auto-fetched via OIDC if not provided)
  token: "my-token",

  // Custom headers for all requests
  headers: { "X-Custom": "value" },

  // Deployment ID for message routing
  // Default: process.env.VERCEL_DEPLOYMENT_ID
  deploymentId: "dpl_xxx",

  // Pin messages to current deployment when publishing
  // Default: true
  pinToDeployment: true,
});

// Pass to any function via options
await send("my-topic", payload, { client });
export const POST = handleCallback(handler, { client });

Send Options

await send("my-topic", payload, {
  // Deduplication key
  // Dedup window: min(retentionSeconds, 24 hours)
  idempotencyKey: "unique-key",

  // Message TTL in seconds
  // Default: 86400, Min: 60, Max: 86400
  retentionSeconds: 3600,

  // Delay before message becomes visible
  // Default: 0, Min: 0, Max: retentionSeconds
  delaySeconds: 60,

  // Custom serializer (default: JsonTransport)
  transport: new JsonTransport(),
});

Receive Options

The receive function supports two mutually exclusive modes:

// Batch mode: receive multiple messages
await receive("my-topic", "my-consumer", handler, {
  // Maximum messages to retrieve in a single request
  // Default: 1, Min: 1, Max: 10
  limit: 10,

  // Message lock duration in seconds
  // Default: 300, Min: 30, Max: 3600
  visibilityTimeoutSeconds: 60,
});

// By-ID mode: receive a specific message
await receive("my-topic", "my-consumer", handler, {
  // Specific message ID to consume
  messageId: "0-1",

  // Message lock duration in seconds
  // Default: 300, Min: 30, Max: 3600
  visibilityTimeoutSeconds: 60,
});

Note: limit and messageId cannot be used together - they are mutually exclusive options.

handleCallback Options

import { handleCallback } from "@vercel/queue/web";

export const POST = handleCallback(
  async (message, metadata) => {
    await processMessage(message);
  },
  {
    // Message lock duration for long-running handlers
    // Default: 300, Min: 30, Max: 3600
    visibilityTimeoutSeconds: 300, // 5 minutes
  },
);

Core handleCallback

The core handleCallback is an async function that takes already-parsed request data. Use it to build custom framework integrations:

import { handleCallback, parseCallback, parseRawCallback } from "@vercel/queue";

// Web API Request
const parsed = await parseCallback(request);

// Or, for frameworks that pre-parse the body (e.g. Pages Router)
const parsed = parseRawCallback(req.body, req.headers);

try {
  await handleCallback(handler, parsed);
} catch (error) {
  // handle error → 500
}

License

MIT