A Node.js library for interacting with the Vercel Queue Service API

Package Exports

  • @vercel/queue
  • @vercel/queue/nextjs/pages

Vercel Queues

A TypeScript client library for interacting with the Vercel Queue Service API, designed for seamless integration with Vercel deployments.

Features

  • Automatic Queue Triggering: Vercel automatically triggers your API routes when messages are ready
  • Next.js Integration: Built-in support for Next.js API routes and Server Actions
  • Generic Payload Support: Send and receive any type of data with type safety
  • Pub/Sub Pattern: Topic-based messaging with consumer groups
  • Type Safety: Full TypeScript support with generic types
  • Streaming Support: Handle large payloads efficiently
  • Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own

Installation

npm install @vercel/queue

Quick Start

For local development, you'll need to set up your Vercel project:

# Install Vercel CLI if you haven't already
npm i -g vercel

# Link your project to Vercel
vc link

# Pull environment variables from your Vercel project
vc env pull

Local Development

Queues work locally without extra infrastructure. Once you have set up your Vercel project, messages you send() in development mode automatically trigger your handlers locally, so no external queue service is needed.

The library reads your vercel.json configuration, discovers your queue handlers, and triggers them automatically when messages are sent.

Example Workflow

# Start your dev server
npm run dev

# Send messages - they process locally automatically!

TypeScript Configuration

Update your tsconfig.json to use "bundler" module resolution for proper package export resolution:

{
  "compilerOptions": {
    "moduleResolution": "bundler"
  }
}

Publishing Messages

The send function can be used anywhere in your codebase to publish messages to a queue:

import { send } from "@vercel/queue";

// Send a message to a topic
await send("my-topic", {
  message: "Hello world",
});

// With additional options
await send(
  "my-topic",
  {
    message: "Hello world",
  },
  {
    idempotencyKey: "unique-key", // Optional: prevent duplicate messages
    retentionSeconds: 3600, // Optional: override retention time (defaults to 24 hours)
    delaySeconds: 60, // Optional: delay message delivery by N seconds
  },
);

Example usage in an API route:

// app/api/send-message/route.ts
import { send } from "@vercel/queue";

export async function POST(request: Request) {
  const body = await request.json();

  const { messageId } = await send("my-topic", {
    message: body.message,
  });

  return Response.json({ messageId });
}

Consuming Messages

Messages are consumed using API routes that Vercel automatically triggers when messages are available.

1. Create API Routes

The recommended approach is to handle multiple topics and consumers in a single API route to keep your vercel.json configuration simple:

// app/api/queue/route.ts
import { handleCallback } from "@vercel/queue";

export const POST = handleCallback({
  // Single topic with one consumer
  "my-topic": {
    "my-consumer": async (message, metadata) => {
      // metadata includes: { messageId, deliveryCount, createdAt, topicName, consumerGroup }
      console.log("Processing message:", message);

      // If this throws an error, the message will be automatically retried
      await processMessage(message);
    },
  },

  // Multiple consumers for different purposes
  "order-events": {
    fulfillment: async (order, metadata) => {
      await processOrder(order);
    },
    analytics: async (order, metadata) => {
      await trackOrder(order);
    },
  },
});

While you can split handlers into separate routes if needed (e.g., for code organization or deployment flexibility), consolidating them in one route is recommended for simpler configuration.

Pages Router

For Next.js Pages Router, import from @vercel/queue/nextjs/pages to get a handler compatible with the Pages Router API (NextApiRequest/NextApiResponse):

// pages/api/queue.ts
import { handleCallback } from "@vercel/queue/nextjs/pages";

export default handleCallback({
  "my-topic": {
    "my-consumer": async (message, metadata) => {
      console.log("Processing message:", message);
      await processMessage(message);
    },
  },
  "order-events": {
    fulfillment: async (order, metadata) => {
      await processOrder(order);
    },
    analytics: async (order, metadata) => {
      await trackOrder(order);
    },
  },
});

The /nextjs/pages subpath export automatically adapts the handler to work with the Pages Router API.

2. Configure vercel.json

Configure which topics and consumers your API route handles.

For App Router:

{
  "functions": {
    "app/api/queue/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v1beta",
          "topic": "my-topic",
          "consumer": "my-consumer",
          "retryAfterSeconds": 60,
          "initialDelaySeconds": 0
        },
        {
          "type": "queue/v1beta",
          "topic": "order-events",
          "consumer": "fulfillment"
        },
        {
          "type": "queue/v1beta",
          "topic": "order-events",
          "consumer": "analytics",
          "retryAfterSeconds": 300
        }
      ]
    }
  }
}

For Pages Router:

{
  "functions": {
    "pages/api/queue.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v1beta",
          "topic": "my-topic",
          "consumer": "my-consumer",
          "retryAfterSeconds": 60,
          "initialDelaySeconds": 0
        },
        {
          "type": "queue/v1beta",
          "topic": "order-events",
          "consumer": "fulfillment"
        },
        {
          "type": "queue/v1beta",
          "topic": "order-events",
          "consumer": "analytics",
          "retryAfterSeconds": 300
        }
      ]
    }
  }
}
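
A trigger in vercel.json with no matching handler in the route is easy to miss. The sketch below compares the two. Note that `Trigger`, `HandlerMap`, and `findUnhandledTriggers` are hypothetical names for illustration, not part of @vercel/queue, and the check is exact-match only (it does not expand wildcard topics).

```typescript
// Hypothetical helper, not part of @vercel/queue.
// Shape of one "experimentalTriggers" entry, as shown in the config above.
interface Trigger {
  type: string;
  topic: string;
  consumer: string;
  retryAfterSeconds?: number;
  initialDelaySeconds?: number;
}

// Same nesting as the object passed to handleCallback: topic -> consumer -> handler.
type HandlerMap = Record<string, Record<string, unknown>>;

// Returns "topic/consumer" for every trigger that has no handler registered.
function findUnhandledTriggers(triggers: Trigger[], handlers: HandlerMap): string[] {
  return triggers
    .filter((t) => handlers[t.topic]?.[t.consumer] === undefined)
    .map((t) => `${t.topic}/${t.consumer}`);
}

const triggers: Trigger[] = [
  { type: "queue/v1beta", topic: "my-topic", consumer: "my-consumer", retryAfterSeconds: 60 },
  { type: "queue/v1beta", topic: "order-events", consumer: "fulfillment" },
  { type: "queue/v1beta", topic: "order-events", consumer: "analytics", retryAfterSeconds: 300 },
];

const handlers: HandlerMap = {
  "my-topic": { "my-consumer": async () => {} },
  "order-events": { fulfillment: async () => {} }, // "analytics" handler is missing
};

// Logs the one trigger without a handler: order-events/analytics
console.log(findUnhandledTriggers(triggers, handlers));
```

A check like this could run in a unit test so that configuration drift is caught before deployment.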

Key Concepts

  • Topics: Named message channels that can have multiple consumer groups
  • Consumer Groups: Named groups of consumers that process messages in parallel
    • Different consumer groups for the same topic each get a copy of every message
    • Multiple consumers in the same group share/split messages for load balancing
  • Automatic Triggering: Vercel triggers your API routes when messages are available
  • Message Processing: Your API routes receive message metadata via headers
  • Configuration: The vercel.json file tells Vercel which routes handle which topics/consumers
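
The difference between "each group gets a copy" and "consumers in a group split messages" can be sketched with a small in-memory model. This is an illustration only: `simulateDelivery` is a hypothetical function, and round-robin distribution inside a group is an assumption, since the service may balance load differently.

```typescript
// In-memory model of the delivery semantics described above.
// Illustration only; it does not use or mirror @vercel/queue internals.
type Groups = Record<string, string[]>; // consumer group name -> consumer instance names

// Returns, per "group/consumer", the messages that instance would receive.
// Round-robin within a group is an assumption made for this sketch.
function simulateDelivery(messages: string[], groups: Groups): Record<string, string[]> {
  const received: Record<string, string[]> = {};
  for (const [group, consumers] of Object.entries(groups)) {
    for (const consumer of consumers) {
      received[`${group}/${consumer}`] = [];
    }
    if (consumers.length === 0) continue;
    // Every group iterates over ALL messages (fan-out across groups)...
    messages.forEach((msg, i) => {
      // ...but each message goes to only ONE consumer in the group (load balancing).
      const consumer = consumers[i % consumers.length];
      received[`${group}/${consumer}`].push(msg);
    });
  }
  return received;
}

const result = simulateDelivery(["m1", "m2", "m3", "m4"], {
  fulfillment: ["a", "b"], // two consumers share the work
  analytics: ["x"], // a single consumer sees every message
});

console.log(result);
```

Here the two fulfillment consumers each receive half of the messages, while the analytics consumer receives all four.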

Advanced Features

Client Class

For custom configuration (tokens, headers, etc.), use the Client class:

import { Client } from "@vercel/queue";

const client = new Client({
  token: "my-token", // Optional: custom auth token
  headers: { "X-Custom": "header" }, // Optional: custom headers
  pinToDeployment: false, // Optional: disable deployment pinning (default: true)
});

// Send a message
await client.send("my-topic", { hello: "world" });

// Handle callbacks using the same client
export const POST = client.handleCallback({
  "my-topic": {
    "my-group": async (msg, meta) => console.log(msg),
  },
});

Parsing Callback Requests

For custom webhook handling, use parseCallback to extract queue information from CloudEvent requests:

import { parseCallback } from "@vercel/queue";

export async function POST(request: Request) {
  const { queueName, consumerGroup, messageId } = await parseCallback(request);

  // Use the parsed information for custom processing...
  await myWorkflow.handleWebhook(queueName, consumerGroup, messageId);

  return Response.json({ status: "success" });
}

Serialization (Transport) System

The queue client supports customizable serialization through the Transport interface:

Built-in Transports

  1. JsonTransport (Default): For structured data that fits in memory
  2. BufferTransport: For binary data that fits in memory
  3. StreamTransport: For large files and memory-efficient processing

Example:

import { send, JsonTransport } from "@vercel/queue";

// JsonTransport is the default
await send("json-topic", { data: "example" });

// Explicit transport configuration
await send(
  "json-topic",
  { data: "example" },
  { transport: new JsonTransport() },
);

// JsonTransport with custom serialization
const transport = new JsonTransport({
  replacer: (key, value) => (key === "password" ? undefined : value),
  reviver: (key, value) => (key === "date" ? new Date(value) : value),
});
await send("json-topic", { data: "example" }, { transport });

Transport Selection Guide

Use Case           | Recommended Transport | Memory Usage | Performance
-------------------|-----------------------|--------------|------------
Small JSON objects | JsonTransport         | Low          | High
Binary data        | BufferTransport       | Medium       | High
Large payloads     | StreamTransport       | Very Low     | Medium
Real-time streams  | StreamTransport       | Very Low     | High

Error Handling

The queue client provides specific error types:

  • QueueEmptyError: No messages available in the queue
  • MessageLockedError: Message is being processed by another consumer
  • MessageNotFoundError: Message doesn't exist or has expired
  • MessageNotAvailableError: Message exists but cannot be claimed
  • MessageAlreadyProcessedError: Message was already successfully processed
  • MessageCorruptedError: Message data could not be parsed
  • BadRequestError: Invalid request parameters
  • UnauthorizedError: Authentication failed (invalid or missing token)
  • ForbiddenError: Access denied (wrong environment or project)
  • DuplicateMessageError: Idempotency key was already used
  • ConcurrencyLimitError: Too many in-flight messages
  • ConsumerDiscoveryError: Could not reach the consumer deployment
  • ConsumerRegistryNotConfiguredError: Project not configured for queues
  • InternalServerError: Unexpected server error
  • InvalidLimitError: Batch limit outside valid range (1-10)

Example error handling:

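import {
  BadRequestError,
  ConcurrencyLimitError,
  DuplicateMessageError,
  ForbiddenError,
  InternalServerError,
  UnauthorizedError,
  send,
} from "@vercel/queue";

// Example payload; any JSON-serializable value works with the default JsonTransport
const payload = { message: "Hello world" };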

try {
  await send("my-topic", payload);
} catch (error) {
  if (error instanceof UnauthorizedError) {
    console.log("Invalid token - refresh authentication");
  } else if (error instanceof ForbiddenError) {
    console.log("Environment mismatch - check configuration");
  } else if (error instanceof BadRequestError) {
    console.log("Invalid parameters:", error.message);
  } else if (error instanceof DuplicateMessageError) {
    console.log("Duplicate message:", error.idempotencyKey);
  } else if (error instanceof ConcurrencyLimitError) {
    console.log(
      "Rate limited:",
      error.currentInflight,
      "/",
      error.maxConcurrency,
    );
  } else if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
  }
}
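
The "retry with backoff" suggestion for server errors could look like the sketch below. It is a generic helper, not something exported by @vercel/queue: the names `backoffDelayMs` and `retryWithBackoff`, the 200 ms base, the 5 second cap, and the four-attempt limit are all assumptions chosen for illustration.

```typescript
// Generic retry helper. Not part of @vercel/queue; names and defaults are assumptions.

// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 200, maxMs = 5000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Calls `fn` until it succeeds, `isRetryable` returns false, or attempts run out.
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 4,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Give up on the last attempt or on errors that retrying cannot fix.
      if (attempt + 1 >= maxAttempts || !isRetryable(error)) throw error;
      const delay = backoffDelayMs(attempt);
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

For instance, retryWithBackoff(() => send("my-topic", payload), (e) => e instanceof InternalServerError) would retry only on server errors and surface authentication or validation errors immediately.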

Environment Variables

The following environment variables can be used to configure the queue client:

Variable               | Description                        | Default
-----------------------|------------------------------------|-------------------------
VERCEL_QUEUE_BASE_URL  | Override the queue service URL     | https://vercel-queue.com
VERCEL_QUEUE_BASE_PATH | Override the API base path         | /api/v3/topic
VERCEL_QUEUE_DEBUG     | Enable debug logging (1 or true)   | -
VERCEL_DEPLOYMENT_ID   | Deployment ID (auto-set by Vercel) | -

Advanced Usage

Direct Message Processing

Note: The receive function is for advanced use cases where you need direct message processing control outside of Vercel's automatic triggering.

import { JsonTransport, receive } from "@vercel/queue";

// Process next available message
await receive<T>(topicName, consumerGroup, handler);

// Process specific message by ID
await receive<T>(topicName, consumerGroup, handler, {
  messageId: "message-id",
});

// Process message with options
await receive<T>(topicName, consumerGroup, handler, {
  messageId: "message-id", // Optional: process specific message by ID
  transport: new JsonTransport(), // Optional: custom transport (defaults to JsonTransport)
  visibilityTimeoutSeconds: 30, // Optional: message visibility timeout
  visibilityRefreshInterval: 10, // Optional: how often to refresh the lock
});

// Handler function signature
type MessageHandler<T = unknown> = (
  message: T,
  metadata: MessageMetadata,
) => Promise<void> | void;

// MessageMetadata type
interface MessageMetadata {
  messageId: string;
  deliveryCount: number;
  createdAt: Date;
  topicName: string;
  consumerGroup: string;
}
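
Because a thrown error causes a retry, a handler may want to stop retrying a message that keeps failing. The sketch below uses deliveryCount from the metadata for that. The `MAX_DELIVERIES` threshold, the `shouldGiveUp` helper, and the `handleOrder` handler are hypothetical, and the sketch assumes that returning without throwing ends processing for the message.

```typescript
// Local copy of the MessageMetadata shape shown above, so the sketch is self-contained.
interface MessageMetadata {
  messageId: string;
  deliveryCount: number;
  createdAt: Date;
  topicName: string;
  consumerGroup: string;
}

// Hypothetical policy threshold; this is not a library default.
const MAX_DELIVERIES = 5;

// True once a message has been delivered at least `maxDeliveries` times.
function shouldGiveUp(metadata: MessageMetadata, maxDeliveries = MAX_DELIVERIES): boolean {
  return metadata.deliveryCount >= maxDeliveries;
}

// Handler matching the MessageHandler signature above.
async function handleOrder(order: { id: string }, metadata: MessageMetadata): Promise<void> {
  if (shouldGiveUp(metadata)) {
    console.error(`Dropping ${metadata.messageId} after ${metadata.deliveryCount} deliveries`);
    return; // return instead of throwing, so no further retry is requested
  }
  console.log("Processing order", order.id);
  // Real processing would go here and may throw to request a retry.
}
```

In practice, a dropped message would usually be logged or forwarded somewhere durable rather than discarded silently.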

Service Limits & Constraints

Throughput & Storage

Limit                       | Value                 | Notes
----------------------------|-----------------------|------------------------------------
Message throughput          | 10,000s msg/sec/topic | Scales horizontally
Payload size                | 1 GB                  | Smaller messages have lower latency
Number of topics            | Unlimited             | No hard limit
Consumer groups per message | ~4,000                | Per-message limit
Messages per queue          | Unlimited             | No hard limit

Parameter Constraints

Publishing Messages

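Parameter        | Default      | Min | Max         | Notes
-----------------|--------------|-----|-------------|----------------------------------
retentionSeconds | 86,400 (24h) | 60  | 86,400      | Message TTL
delaySeconds     | 0            | 0   | ≤ retention | Cannot exceed retention
idempotencyKey   | -            | -   | -           | Dedup window: min(retention, 24h)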
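
These publishing constraints can be checked on the client before calling send. The following is a minimal sketch with hypothetical names (`SendOptionsSketch`, `validateSendOptions`); it is not the library's own validation, and it only encodes the limits listed in the table above.

```typescript
// Hypothetical pre-flight check for the publishing limits listed above.
interface SendOptionsSketch {
  retentionSeconds?: number;
  delaySeconds?: number;
}

const MIN_RETENTION_SECONDS = 60;
const MAX_RETENTION_SECONDS = 86400; // 24 hours, also the default

// Returns a list of human-readable problems; an empty list means the options pass.
function validateSendOptions(options: SendOptionsSketch): string[] {
  const errors: string[] = [];
  const retention = options.retentionSeconds ?? MAX_RETENTION_SECONDS;
  const delay = options.delaySeconds ?? 0;

  if (retention < MIN_RETENTION_SECONDS || retention > MAX_RETENTION_SECONDS) {
    errors.push(
      `retentionSeconds must be between ${MIN_RETENTION_SECONDS} and ${MAX_RETENTION_SECONDS}`,
    );
  }
  // A message cannot be delayed for longer than it is retained.
  if (delay < 0 || delay > retention) {
    errors.push("delaySeconds must be between 0 and retentionSeconds");
  }
  return errors;
}

console.log(validateSendOptions({ retentionSeconds: 3600, delaySeconds: 7200 }));
```

The last call reports one problem, because a 7,200 second delay exceeds the 3,600 second retention.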

Receiving Messages

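Parameter                | Default   | Min | Max    | Notes
-------------------------|-----------|-----|--------|----------------------------
visibilityTimeoutSeconds | 30        | 0   | 3,600  | 0 = immediate re-visibility
limit                    | 1         | 1   | 10     | Messages per request
maxConcurrency           | unlimited | 1   | 10,000 | In-flight message limit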

Visibility Extension

Constraint               | Value
-------------------------|-----------------------------------
visibilityTimeoutSeconds | 0 - 3,600 seconds
Cannot extend beyond     | Message's original expiration time
Receipt handle           | Must match the receive operation

Identifier Formats

Identifier       | Pattern        | Example
-----------------|----------------|-------------------------------
Topic/Queue name | [A-Za-z0-9_-]+ | my-queue, task_queue_v2
Consumer group   | [A-Za-z0-9_-]+ | worker-1, analytics_consumer
Message ID       | Opaque string  | 0-1, 3-7K9mNpQrS
Receipt handle   | Opaque string  | Used for delete/visibility ops
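
The topic and consumer group pattern can be checked locally before a name reaches the service. A minimal sketch follows; `isValidIdentifier` is a hypothetical helper, and anchoring the pattern to the whole string is an assumption about how the service applies it.

```typescript
// Pattern from the table above, anchored so the whole name must match (assumption).
const IDENTIFIER_PATTERN = /^[A-Za-z0-9_-]+$/;

// Hypothetical helper: true when `name` is usable as a topic or consumer group name.
function isValidIdentifier(name: string): boolean {
  return IDENTIFIER_PATTERN.test(name);
}

console.log(isValidIdentifier("task_queue_v2")); // true
console.log(isValidIdentifier("user.events")); // false: "." is not in the allowed set
```

Message IDs and receipt handles are opaque strings, so they should be passed through unchanged rather than validated this way.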

Content-Type Handling

Scenario                        | Result
--------------------------------|-------------------------
Client provides Content-Type    | Used as-is
No header, magic bytes detected | Auto-detected MIME type
No header, detection fails      | application/octet-stream
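
The three-step fallback in the table can be sketched as follows. This is not the service's actual detection code: `resolveContentType` is a hypothetical function, and the two signatures (PNG and PDF) are only examples of what "magic bytes" means.

```typescript
// Hypothetical sketch of the Content-Type fallback described above.
// The real detection logic and its list of signatures are not documented here.
const SIGNATURES: Array<{ mime: string; bytes: number[] }> = [
  { mime: "image/png", bytes: [0x89, 0x50, 0x4e, 0x47] }, // "\x89PNG"
  { mime: "application/pdf", bytes: [0x25, 0x50, 0x44, 0x46] }, // "%PDF"
];

function resolveContentType(header: string | undefined, body: Uint8Array): string {
  // 1. A client-provided Content-Type is used as-is.
  if (header) return header;
  // 2. Otherwise, look for a known signature at the start of the payload.
  for (const signature of SIGNATURES) {
    if (signature.bytes.every((byte, i) => body[i] === byte)) return signature.mime;
  }
  // 3. If nothing matches, fall back to a generic binary type.
  return "application/octet-stream";
}

const pngStart = new Uint8Array([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]);
console.log(resolveContentType(undefined, pngStart)); // image/png
```

Setting Content-Type explicitly avoids relying on detection at all.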

Wildcard Topics

Topic patterns support wildcards for flexible routing:

{
  "functions": {
    "app/api/queue/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v1beta",
          "topic": "user-*",
          "consumer": "processor"
        }
      ]
    }
  }
}

Wildcard Rules:

  • * may only appear once in the pattern
  • * must be at the end of the topic name
  • Valid: user-*, orders-*
  • Invalid: *-events, user-*-data
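
The two wildcard rules can be expressed as a short check. In this sketch, `isValidTopicPattern` and `topicMatches` are hypothetical helpers, and treating the wildcard as a plain prefix match is an assumption about how the service routes messages.

```typescript
// Checks the rules listed above: at most one "*", and only as the final character.
// A bare "*" passes both rules; whether the service accepts it is not stated above.
function isValidTopicPattern(pattern: string): boolean {
  const firstStar = pattern.indexOf("*");
  if (firstStar === -1) return pattern.length > 0; // plain topic name, no wildcard
  // If the first "*" is the last character, it is also the only one.
  return firstStar === pattern.length - 1;
}

// Assumption: "prefix-*" matches any topic name that starts with "prefix-".
function topicMatches(pattern: string, topic: string): boolean {
  if (!isValidTopicPattern(pattern)) return false;
  if (pattern.endsWith("*")) {
    return topic.startsWith(pattern.slice(0, -1));
  }
  return topic === pattern;
}

console.log(isValidTopicPattern("user-*")); // true
console.log(isValidTopicPattern("user-*-data")); // false: "*" is not at the end
console.log(topicMatches("user-*", "user-created")); // true
```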

API Reference

Client Configuration

import { Client } from "@vercel/queue";

const client = new Client({
  // Base URL for the queue service
  // Default: "https://vercel-queue.com"
  // Env: VERCEL_QUEUE_BASE_URL
  baseUrl: "https://vercel-queue.com",

  // API path prefix
  // Default: "/api/v3/topic"
  // Env: VERCEL_QUEUE_BASE_PATH
  basePath: "/api/v3/topic",

  // Auth token (auto-fetched via OIDC if not provided)
  token: "my-token",

  // Custom headers for all requests
  headers: { "X-Custom": "value" },

  // Deployment ID for message routing
  // Default: process.env.VERCEL_DEPLOYMENT_ID
  deploymentId: "dpl_xxx",

  // Pin messages to current deployment when publishing
  // Default: true
  pinToDeployment: true,
});

Send Options

import { JsonTransport, send } from "@vercel/queue";

await send("my-topic", payload, {
  // Deduplication key
  // Dedup window: min(retentionSeconds, 24 hours)
  idempotencyKey: "unique-key",

  // Message TTL in seconds
  // Default: 86400, Min: 60, Max: 86400
  retentionSeconds: 3600,

  // Delay before message becomes visible
  // Default: 0, Min: 0, Max: retentionSeconds
  delaySeconds: 60,

  // Custom serializer (default: JsonTransport)
  transport: new JsonTransport(),
});

Receive Options

import { JsonTransport, receive } from "@vercel/queue";

await receive("my-topic", "my-consumer", handler, {
  // Specific message ID to consume (optional)
  messageId: "0-1",

  // Message lock duration in seconds
  // Default: 30, Min: 0, Max: 3600
  visibilityTimeoutSeconds: 60,

  // How often to refresh the lock during processing
  // Default: visibilityTimeoutSeconds / 3
  visibilityRefreshInterval: 15,

  // Custom deserializer (default: JsonTransport)
  transport: new JsonTransport(),
});

Receive Options (Advanced)

import { JsonTransport, receive } from "@vercel/queue";

await receive("my-topic", "my-consumer", handler, {
  // Payload deserializer
  // Default: JsonTransport
  transport: new JsonTransport(),

  // Message lock duration
  // Default: 30, Min: 0, Max: 3600
  visibilityTimeoutSeconds: 60,

  // How often to refresh the lock during processing
  // Default: visibilityTimeoutSeconds / 3
  visibilityRefreshInterval: 20,
});

handleCallback Options

import { handleCallback } from "@vercel/queue";

export const POST = handleCallback(
  {
    "my-topic": {
      "my-consumer": async (message, metadata) => {
        await processMessage(message);
      },
    },
  },
  {
    // Message lock duration for long-running handlers
    // Default: 30, Min: 0, Max: 3600
    // visibilityRefreshInterval defaults to visibilityTimeoutSeconds / 3
    visibilityTimeoutSeconds: 300, // 5 minutes
  },
);
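
To see what the refresh default means for a long-running handler, the arithmetic can be written out. The sketch below is illustrative only: `resolveRefreshInterval` and `refreshTimes` are hypothetical helpers, and the assumption that the lock is refreshed at fixed multiples of the interval is not confirmed by the library.

```typescript
// Default described above: visibilityRefreshInterval = visibilityTimeoutSeconds / 3.
function resolveRefreshInterval(
  visibilityTimeoutSeconds: number,
  visibilityRefreshInterval?: number,
): number {
  return visibilityRefreshInterval ?? visibilityTimeoutSeconds / 3;
}

// Seconds after receipt at which the lock would be refreshed while a handler
// runs for `handlerSeconds`. Fixed-interval refreshing is an assumption.
function refreshTimes(handlerSeconds: number, intervalSeconds: number): number[] {
  if (intervalSeconds <= 0) return []; // a timeout of 0 leaves nothing to refresh
  const times: number[] = [];
  for (let t = intervalSeconds; t < handlerSeconds; t += intervalSeconds) {
    times.push(t);
  }
  return times;
}

const interval = resolveRefreshInterval(300); // 100 seconds for a 5 minute timeout
console.log(refreshTimes(250, interval)); // refreshes at 100s and 200s
```

With a 300 second timeout, a handler that runs for 250 seconds would see its lock refreshed twice under these assumptions, well before the timeout would expire.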

License

MIT