JSPM

  • Created
  • Published
  • Downloads 152497
  • Score
    100M100P100Q186465F
  • License MIT

A Node.js library for interacting with the Vercel Queue Service API

Package Exports

  • @vercel/queue

Readme

Vercel Queues

A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.

Features

  • Generic Payload Support: Send and receive any type of data with type safety
  • Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
  • Streaming Support: Handle large payloads without loading them entirely into memory
  • Pub/Sub Pattern: Topic-based messaging with consumer groups
  • Type Safety: Full TypeScript support with generic types
  • Automatic Retries: Built-in visibility timeout management

Installation

npm install @vercel/queue

Quick Start

For local development, you'll need to pull your Vercel environment variables (including the OIDC token):

# Install Vercel CLI if you haven't already
npm i -g vercel

# Pull environment variables from your Vercel project
vc env pull

Publishing and consuming messages on a queue

// index.ts
import { send, receive } from "@vercel/queue";

type Message = {
  message: string;
  timestamp: number;
};

// Option 1: Using the send and receive helpers (simplest)
// Automatically uses default client and JSON transport
await send<Message>("my-topic", {
  message: "Hello, World!",
  timestamp: Date.now(),
});

// Consume a single message off the queue
// (Often wrapped in a loop to keep polling messages off the queue)
await receive<Message>("my-topic", "my-consumer-group", (m) => {
  console.log(m.message);
});

// Option 2: Using createTopic for more control

import { createTopic } from "@vercel/queue";

// Create a topic with JSON serialization (default)
// Uses default QueueClient automatically authenticated from Vercel environment
const topic = createTopic<Message>("my-topic");

// Publish a message
await topic.publish({
  message: "Hello, World!",
  timestamp: Date.now(),
});

// Create a consumer group
const consumer = topic.consumerGroup("my-consumer-group");

// Process next available message (one-shot processing)
try {
  await consumer.consume(async (message, metadata) => {
    console.log("Received:", message.message);
    console.log("Timestamp:", new Date(message.timestamp));
    console.log("Message Metadata", metadata);
    // => { messageId, deliveryCount, timestamp }
  });
} catch (error) {
  console.error("Processing error:", error);
}

Usage with Vercel

When deploying on Vercel, rather than having a persistent server subscribed to a queue, Vercel can automatically trigger your API routes when messages are ready for consumption based on your vercel.json configuration.

To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.

TypeScript Configuration

Update your tsconfig.json to use "bundler" module resolution for proper package export resolution:

{
  "compilerOptions": {
    "moduleResolution": "bundler"
    // ... other options
  }
}

Publishing messages to a queue

Create a new server function to publish messages

// app/actions.ts
"use server";

import { send } from "@vercel/queue";

export async function publishTestMessage(message: string) {
  // Option 1: Using simple send shorthand
  const { messageId } = await send(
    "my-topic",
    { message, timestamp: Date.now() },
    },
  );

  console.log(`Published message ${messageId}`);
}

// Option 2: Customize the topic, transport, consumer groups, etc.
import { createTopic } from "@vercel/queue";

export async function publishTestMessage(message: string) {
  // Create a topic with JSON serialization (default)
  const topic = createTopic<{ message: string; timestamp: number }>("my-topic");

  // Publish the message
  const { messageId } = await topic.publish(
    { message, timestamp: Date.now() },
  );

  console.log(`Published message ${messageId}`);
}

Now wire up the server function to your app

// app/some/page.tsx
"use client";
import { publishTestMessage } from "./actions";

export default function Page() {
  return (
    // ...
    <button onClick={() => publishTestMessage("Hello world")}>
      Publish Test Message
    </button>
  );
}

Consuming the queue

Messages are consumed using consumer groups, which provide load balancing and parallel processing capabilities.

Usage with Vercel

To consume queue messages in a Vercel deployment, you need to create (Next.js) API routes and configure them in your vercel.json file.

1. Create API Routes

Create API routes to handle incoming queue messages using the handleCallback helper:

// app/api/queue/handle/route.ts
import { handleCallback } from "@vercel/queue";

// Option 1: Single topic with multiple consumer groups
export const POST = handleCallback({
  "my-topic": {
    "consumer-group-1": async (message, metadata) => {
      console.log(`Consumer group 1 processing:`, message, metadata);
      // Handle consumer group 1 logic
      await processGroup1(message);
    },
    "consumer-group-2": async (message, metadata) => {
      console.log(`Consumer group 2 processing:`, message, metadata);
      // Handle consumer group 2 logic
      await processGroup2(message);
    },
  },
});

async function processGroup1(message: any) {
  // Consumer group 1 specific logic
}

async function processGroup2(message: any) {
  // Consumer group 2 specific logic
}
// Alternative: Multiple topics in one handler
export const POST = handleCallback({
  "user-events": {
    welcome: async (message, metadata) => {
      console.log(`New user event:`, message, metadata);
      await sendWelcomeEmail(message.email);
    },
  },
  "order-events": {
    fulfillment: async (order, metadata) => {
      console.log(`Processing order:`, order, metadata);
      await fulfillOrder(order);
    },
    analytics: async (order, metadata) => {
      console.log(`Tracking order:`, order, metadata);
      await trackOrder(order);
    },
  },
});

2. Configure vercel.json

Create a vercel.json file in your project root to declare which topics and consumer groups each API route handles:

{
  "functions": {
    "app/api/queue/handle/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v1beta",
          "topic": "my-topic",
          "consumer": "consumer-group-1"
        },
        {
          "type": "queue/v1beta",
          "topic": "my-topic",
          "consumer": "consumer-group-2"
        }
      ]
    }
  }
}

3. Multiple API Routes

You can also create separate API routes for different topics:

// app/api/queue/users/route.ts - Handle user events
import { handleCallback } from "@vercel/queue";

export const POST = handleCallback({
  "user-events": {
    processors: async (user, metadata) => {
      console.log(`Processing user event:`, user, metadata);
      await sendWelcomeEmail(user.email);
    },
  },
});
// app/api/queue/orders/route.ts - Handle order events
import { handleCallback } from "@vercel/queue";

export const POST = handleCallback({
  "order-events": {
    fulfillment: async (order, metadata) => {
      console.log(`Processing order:`, order, metadata);
      await fulfillOrder(order);
    },
  },
});

With corresponding vercel.json:

{
  "functions": {
    "app/api/queue/users/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v1beta",
          "topic": "user-events",
          "consumer": "processors"
        }
      ]
    },
    "app/api/queue/orders/route.ts": {
      "experimentalTriggers": [
        {
          "type": "queue/v1beta",
          "topic": "order-events",
          "consumer": "fulfillment"
        }
      ]
    }
  }
}

Key Points

  • Automatic Triggering: Vercel automatically triggers your API routes when messages are available for the configured topic/consumer combinations
  • Message Processing: Your API routes receive the message ID and other metadata via headers, then use the queue client to process the specific message
  • Configuration Required: The vercel.json file is essential - it tells Vercel which topics and consumers each route should handle
  • No Polling: Unlike traditional queue consumers, you don't need to poll for messages - Vercel handles the triggering automatically

Key Features

Consumer Groups

Multiple consumers can process messages from the same topic in parallel:

// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)

// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every message

Architecture

  • Topics: Named message channels with configurable serialization
  • Consumer Groups: Named groups of consumers that process messages in parallel
    • consume(): Process messages with flexible consumption patterns
      • No options: Process next available message
      • With messageId: Process specific message by ID
      • With skipPayload: true: Process message metadata only (without payload)
  • Transports: Pluggable serialization/deserialization for different data types
  • Streaming: Memory-efficient processing of large payloads
  • Visibility Timeouts: Automatic message lifecycle management

Performance

The multipart parser is optimized for high-throughput scenarios:

  • Streaming: Messages are yielded immediately as headers are parsed
  • Memory Efficient: No buffering of complete payloads
  • Fast Parsing: Native Buffer operations for ~50% performance improvement
  • Scalable: Can handle arbitrarily large responses without memory constraints

Serialization (Transport) System

The queue client supports customizable serialization through the Transport interface with streaming support for memory-efficient processing. Transport can be configured at the topic level when creating a topic, or at the consumer group level when creating a consumer group.

Built-in Transports

JsonTransport (Default)

Buffers data for JSON parsing - suitable for structured data that fits in memory.

import { createTopic, JsonTransport } from "@vercel/queue";

const topic = createTopic<{ data: any }>("json-topic", new JsonTransport());
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>("json-topic");

BufferTransport

Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.

StreamTransport

True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.

Custom Transport

You can create your own serialization format by implementing the Transport interface.

Choosing the Right Transport

Use Case Recommended Transport Memory Usage Performance
Small JSON objects JsonTransport Low High
Binary files < 100MB BufferTransport Medium High
Large files > 100MB StreamTransport Very Low Medium
Real-time data streams StreamTransport Very Low High
Custom protocols Custom implementation Varies Varies

API Reference

QueueClient

// Simple usage - automatically gets OIDC token from Vercel environment
const client = new QueueClient();

// Or with options
const client = new QueueClient({
  token?: string; // Optional - will auto-detect if not provided
  baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});

Topic

// Simple usage with default client
const topic = createTopic<T>(topicName, transport?);

// For custom client configuration, use Topic constructor directly
const customClient = new QueueClient({ baseUrl: "https://custom.vqs.vercel.sh" });
const topic = new Topic<T>(customClient, topicName, transport?);

// Publish a message (uses topic's transport)
await topic.publish(payload, options?);

// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);

Send (Shorthand)

// Simple send - automatically uses default client and JSON transport
await send<T>(topicName, payload);

// Send with options including custom transport
await send<T>(topicName, payload, {
  transport?: Transport<T>;
  idempotencyKey?: string;
  retentionSeconds?: number;
});

// Examples:
await send("notifications", { userId: "123", message: "Welcome!" });

await send("images", imageBuffer, {
  transport: new BufferTransport(),
});

await send("events", eventData, {
  idempotencyKey: "unique-key-123",
  retentionSeconds: 3600,
});

ConsumerGroup

// Process next available message (simplest form)
await consumer.consume(handler);

// Process specific message by ID with payload
await consumer.consume(handler, { messageId: "message-id" });

// Process specific message by ID without payload (metadata only)
// handler will be called with `undefined` as the payload
await consumer.consume(handler, { messageId: "message-id", skipPayload: true });

Message Handler

// Handler function signature
type MessageHandler<T = unknown> = (
  message: T,
  metadata: MessageMetadata,
) => Promise<MessageHandlerResult> | MessageHandlerResult;

// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;

interface MessageTimeoutResult {
  timeoutSeconds: number; // seconds before message becomes available again
}

// Message Metadata
interface MessageMetadata {
  messageId: string;
  deliveryCount: number;
  timestamp: string;
}

ConsumeOptions Interface

interface ConsumeOptions {
  messageId?: string; // Process specific message by ID
  skipPayload?: boolean; // Skip payload download (requires messageId)
}

Transport Interface

interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

Callback Handler

// Create a callback handler for Next.js route handlers
function handleCallback(
  handlers: CallbackHandlers,
): (request: Request) => Promise<Response>;

// Configuration object with handlers for different topics and consumer groups
type CallbackHandlers = {
  [topicName: string]: { [consumerGroup: string]: MessageHandler };
};

// Example usage:
export const POST = handleCallback({
  "user-events": {
    welcome: (message, metadata) => {
      console.log(`New user event:`, message, metadata);
    },
  },

  // Multiple consumer groups per topic
  "image-processing": {
    compress: (message, metadata) => console.log("Compressing image", message),
    resize: (message, metadata) => console.log("Resizing image", message),
  },
});

Examples

Basic JSON Processing

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

// Option 1: Using send shorthand
await send<UserEvent>("user-events", {
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

// Option 2: Using createTopic for consumers
const userTopic = createTopic<UserEvent>("user-events");

await userTopic.publish({
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

const consumer = userTopic.consumerGroup("processors");

// Process next available message
try {
  await consumer.consume(async (message) => {
    console.log(`User ${message.userId} performed ${message.action}`);
  });
} catch (error) {
  console.error("Processing error:", error);
}

Processing Specific Messages by ID

const userTopic = createTopic<{ userId: string; action: string }>(
  "user-events",
);
const consumer = userTopic.consumerGroup("processors");

// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";

try {
  await consumer.consume(
    async (message, { messageId }) => {
      console.log(`Processing specific message: ${messageId}`);
      console.log(`User ${message.userId} performed ${message.action}`);
    },
    { messageId },
  );
  console.log("Message processed successfully");
} catch (error) {
  if (error.message.includes("not found or not available")) {
    console.log("Message was already processed or does not exist");
  } else {
    console.error("Error processing message:", error);
  }
}

Processing Next Available Message

const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");

// Process the next available message (one-shot processing)
try {
  await worker.consume(async (message) => {
    console.log(`Processing task: ${message.taskType}`);
    await processTask(message.taskType, message.data);
  });
  console.log("Message processed successfully");
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("No messages available");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message is locked");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  } else {
    console.error("Error processing message:", error);
  }
}

// Handle conditional timeouts
await worker.consume(async (message) => {
  if (!canProcessTaskType(message.taskType)) {
    // Return timeout to retry later
    return { timeoutSeconds: 60 };
  }

  await processTask(message.taskType, message.data);
});

// Process specific message metadata only (no payload download)
await worker.consume(
  async (_, metadata) => {
    console.log(`Message ID: ${metadata.messageId}`);
    console.log(`Delivery count: ${metadata.deliveryCount}`);
    console.log(`Timestamp: ${metadata.timestamp}`);
    // _ is undefined - no payload was downloaded
  },
  { messageId: "specific-message-id", skipPayload: true },
);

Timing Out Messages

const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");

// Process a message with conditional timeout
try {
  await worker.consume(async ({ taskType, data }) => {
    // Check if we can process this task type right now
    if (taskType === "heavy-computation" && isSystemOverloaded()) {
      // Return timeout to retry later (5 minutes)
      return { timeoutSeconds: 300 };
    }

    // Check if we have required resources
    if (taskType === "external-api" && !isExternalServiceAvailable()) {
      // Return timeout to retry in 1 minute
      return { timeoutSeconds: 60 };
    }

    // Process the message normally
    console.log(`Processing ${taskType} task`);
    await processTask(taskType, data);
    // Message will be automatically deleted on successful completion
  });
} catch (error) {
  console.error("Worker processing error:", error);
}

// Example with exponential backoff
try {
  await worker.consume(async (message, { deliveryCount }) => {
    const maxRetries = 3;

    try {
      await processMessage(message);
      // Successful processing - message will be deleted
    } catch (error) {
      if (deliveryCount < maxRetries) {
        // Exponential backoff: 2^deliveryCount minutes
        const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
        console.log(
          `Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
        );
        return { timeoutSeconds: timeoutSeconds };
      } else {
        // Max retries reached, let the message fail and be deleted
        console.error("Max retries reached, message will be discarded:", error);
        throw error;
      }
    }
  });
} catch (error) {
  console.error("Backoff processing error:", error);
}

Error Handling

The queue client provides specific error types for different failure scenarios:

Error Types

  • QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)

    • Thrown by consume() when no messages are available
    • Also thrown when directly using client.receiveMessages()
  • MessageLockedError: Thrown when a message is temporarily locked (423 status)

    • Contains optional retryAfter property with seconds to wait before retry
    • For consume() without options: the next message is locked
    • For consume() with messageId: the requested message is locked
  • MessageNotFoundError: Message doesn't exist (404 status)

  • MessageNotAvailableError: Message exists but isn't available for processing (409 status)

  • MessageCorruptedError: Message data is corrupted or can't be parsed

  • BadRequestError: Invalid request parameters (400 status)

    • Invalid queue names, missing required parameters
  • UnauthorizedError: Authentication failure (401 status)

    • Missing or invalid authentication token
  • ForbiddenError: Access denied (403 status)

    • Queue environment doesn't match token environment
  • InternalServerError: Server-side errors (500+ status codes)

    • Unexpected server errors, service unavailable, etc.

Error Handling Examples

import {
  BadRequestError,

  ForbiddenError,
  InternalServerError,
  MessageLockedError,
  QueueEmptyError,
  UnauthorizedError,
} from "@vercel/queue";

// Handle empty queue or locked messages
try {
  for await (const message of client.receiveMessages(options, transport)) {
    // Process messages
  }
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("Queue is empty, retry later");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message is locked");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  }
}

// Handle locked message with retry
try {
  await consumer.consume(handler, { messageId });
} catch (error) {
  if (error instanceof MessageLockedError) {
    console.log("Message is locked by another consumer");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
      setTimeout(() => retry(), error.retryAfter * 1000);
    }

}

// Handle authentication and authorization errors
try {
  await topic.publish(payload);
} catch (error) {
  if (error instanceof UnauthorizedError) {
    console.log("Invalid token - refresh authentication");
  } else if (error instanceof ForbiddenError) {
    console.log("Environment mismatch - check token/queue configuration");
  } else if (error instanceof BadRequestError) {
    console.log("Invalid parameters:", error.message);
  } else if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
  }
}

// Complete error handling pattern
function handleQueueError(error: unknown): void {
  if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
    // Transient errors - safe to retry
    console.log("Temporary condition, will retry");
  } else if (
    error instanceof UnauthorizedError ||
    error instanceof ForbiddenError
  ) {
    // Authentication/authorization errors - need to fix configuration
    console.log("Auth error - check credentials");
  } else if (error instanceof BadRequestError) {
    // Client error - fix the request
    console.log("Invalid request:", error.message);
  } else if (error instanceof InternalServerError) {
    // Server error - implement exponential backoff
    console.log("Server error - retry with backoff");
  } else {
    // Unknown error
    console.error("Unexpected error:", error);
  }
}

License

MIT