JSPM

  • Created
  • Published
  • Downloads 145654
  • Score
    100M100P100Q184419F
  • License MIT

A Node.js library for interacting with the Vercel Queue Service API

Package Exports

  • @vercel/queue

Readme

Vercel Queues

A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.

Features

  • Generic Payload Support: Send and receive any type of data with type safety
  • Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
  • Streaming Support: Handle large payloads without loading them entirely into memory
  • Pub/Sub Pattern: Topic-based messaging with consumer groups
  • Type Safety: Full TypeScript support with generic types
  • Automatic Retries: Built-in visibility timeout management

Installation

npm install @vercel/queue

Quick Start

For local development, you'll need to pull your Vercel environment variables (including the OIDC token):

# Install Vercel CLI if you haven't already
npm i -g vercel

# Pull environment variables from your Vercel project
vc env pull

Publishing and consuming messages on a queue

// index.ts
import { QueueClient, createTopic, JsonTransport } from "@vercel/queue";

// Create a client - automatically authenticated using the OIDC token
const client = QueueClient.fromVercelFunction();

// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
  client,
  "my-topic",
);

// Publish a message
await topic.publish({
  message: "Hello, World!",
  timestamp: Date.now(),
});

// Create a consumer group
const consumer = topic.consumerGroup("my-processors");

// Process messages continuously with cancellation support
const controller = new AbortController();

// Start processing (blocks until aborted or error)
try {
  await consumer.subscribe(controller.signal, async (message) => {
    console.log("Received:", message.payload.message);
    console.log("Timestamp:", new Date(message.payload.timestamp));
  });
} catch (error) {
  console.error("Processing stopped due to error:", error);
}

// Stop processing from elsewhere in your code
// controller.abort();

Run the script

# Using dotenv to load the OIDC token
dotenv -e .env.local node index.ts

Usage with Vercel

When deploying on Vercel, rather than having a persistent server subscribed to a queue, Vercel can trigger a callback route when a message is ready for consumption.

To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.

TypeScript Configuration

Update your tsconfig.json to use "bundler" module resolution for proper package export resolution:

{
  "compilerOptions": {
    "moduleResolution": "bundler"
    // ... other options
  }
}

Publishing messages to a queue

Create a new server function to publish messages

// app/action.ts
"use server";

import { QueueClient, createTopic } from "@vercel/queue";

export async function publishTestMessage(message: string) {
  // Initialize a queue client
  const client = await QueueClient.fromVercelFunction();

  // Create a topic with JSON serialization (default)
  const topic = createTopic<{ message: string; timestamp: number }>(
    client,
    "my-topic",
  );

  // Publish the message
  const { messageId } = await topic.publish(
    { message, timestamp: Date.now() },
    {
      // Provide a callback URL to invoke a consumer when the message is ready to be processed
      callbacks: {
        webhook: {
          url: process.env.VERCEL_PROJECT_PRODUCTION_URL
            ? `https://${process.env.VERCEL_PROJECT_PRODUCTION_URL}/api/queue/handle`
            : "http://localhost:3000/api/queue/handle",
        },
      },
    },
  );

  console.log(`Published message ${messageId}`);
}

Now wire up the server function to your app

// app/some/page.tsx
"use client";
import { publishTestMessage } from "./actions";

export default function Button() {
  return (
    // ...
    <Button onClick={() => publishTestMessage("Hello world")} >
      Publish Test Message
    </a>
  );
}

Consuming the queue

Instead of running a persistent server that subscribes to the queue, we use the callback functionality of Vercel queues to consume messages on the fly, when a message is ready to be processed.

// app/api/queue/handle/route.ts
import { QueueClient, Topic, parseCallbackRequest } from "@vercel/queue";
import { NextRequest } from "next/server";

// Handle Vercel Queue callback requests
export async function POST(request: NextRequest) {
  try {
    // Parse the queue callback information
    const { queueName, consumerGroup, messageId } =
      parseCallbackRequest(request);

    // Create client
    const client = await QueueClient.fromVercelFunction();

    // Create topic and consumer group from the callback info
    const topic = new Topic(client, queueName);
    const cg = topic.consumerGroup(consumerGroup);

    // Process the message
    await cg.receiveMessage(messageId, async (message) => {
      const payload = message.payload as { message: string; timestamp: number };
      console.log(
        `Received message "${payload.message}" (Sent at: ${payload.timestamp})`,
      );
    });

    return Response.json({ status: "success" });
  } catch (error) {
    console.error("Webhook error:", error);
    return Response.json(
      { error: "Failed to process webhook" },
      { status: 500 },
    );
  }
}

![NOTE] A single webhook handle can be used to process messages across various queues and consumer groups. Use the values of queueName and consumerGroup from parseCallbackRequest() to dynamically handle different code paths:

if (queueName === "upload-queue") {
  processImageQueue(consumerGroup, message);
}
// ...
function processImageQueue(consumerGroup, message) {
  if (consumerGroup === "compress") {
    // handle image compression
  }
  // ...
}
// ...

We are building an SDK to make queues and workflow easier to use. Reach out if you're interested.

Key Features

Streaming Support

Handle large files and data streams without loading them into memory:

import { StreamTransport } from "@vercel/queue";

const videoTopic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "video-processing",
  new StreamTransport(),
);

// Process large video files efficiently
const processor = videoTopic.consumerGroup("processors");
await processor.subscribe(signal, async (message) => {
  const videoStream = message.payload;
  // Process stream chunk by chunk
  const reader = videoStream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    await processChunk(value);
  }
});

Consumer Groups

Multiple consumers can process messages from the same topic in parallel:

// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)

// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every message

Architecture

  • Topics: Named message channels with configurable serialization
  • Consumer Groups: Named groups of consumers that process messages in parallel
    • subscribe(): Continuously process messages with automatic polling
    • receiveMessage(): Process a specific message by ID
    • receiveNextMessage(): Process the next available message (one-shot)
    • handleMessage(): Process message metadata only (without payload)
  • Transports: Pluggable serialization/deserialization for different data types
  • Streaming: Memory-efficient processing of large payloads
  • Visibility Timeouts: Automatic message lifecycle management

Performance

The multipart parser is optimized for high-throughput scenarios:

  • Streaming: Messages are yielded immediately as headers are parsed
  • Memory Efficient: No buffering of complete payloads
  • Fast Parsing: Native Buffer operations for ~50% performance improvement
  • Scalable: Can handle arbitrarily large responses without memory constraints

Serialization (Transport) System

The queue client supports customizable serialization through the Transport interface with streaming support for memory-efficient processing. Transport can be configured at the topic level when creating a topic, or at the consumer group level when creating a consumer group.

Built-in Transports

JsonTransport (Default)

Buffers data for JSON parsing - suitable for structured data that fits in memory.

import { JsonTransport, createTopic } from "@vercel/queue";

const topic = createTopic<{ data: any }>(
  client,
  "json-topic",
  new JsonTransport(),
);
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>(client, "json-topic");

BufferTransport

Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.

import { BufferTransport, createTopic } from "@vercel/queue";

const topic = createTopic<Buffer>(
  client,
  "binary-topic",
  new BufferTransport(),
);
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);

StreamTransport

True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.

import { StreamTransport, createTopic } from "@vercel/queue";

const topic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "streaming-topic",
  new StreamTransport(),
);

// Send large file as stream without loading into memory
const fileStream = new ReadableStream<Uint8Array>({
  start(controller) {
    // Read file in chunks
    for (const chunk of readFileInChunks("large-file.bin")) {
      controller.enqueue(chunk);
    }
    controller.close();
  },
});

await topic.publish(fileStream);

Custom Transport

You can create your own serialization format by implementing the Transport interface:

import { Transport } from "@vercel/queue";

interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

Choosing the Right Transport

Use Case Recommended Transport Memory Usage Performance
Small JSON objects JsonTransport Low High
Binary files < 100MB BufferTransport Medium High
Large files > 100MB StreamTransport Very Low Medium
Real-time data streams StreamTransport Very Low High
Custom protocols Custom implementation Varies Varies

API Reference

QueueClient

const client = new QueueClient({
  token: string;
  baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});

Topic

const topic = createTopic<T>(client, topicName, transport?);

// Publish a message (uses topic's transport)
await topic.publish(payload, options?);

// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);

ConsumerGroup

// Start continuous processing (blocks until signal is aborted or error occurs)
await consumer.subscribe(signal, handler, options?);

// Process a specific message by ID
await consumer.receiveMessage(messageId, handler);

// Process the next available message
await consumer.receiveNextMessage(handler);

// Handle a specific message by ID without payload
await consumer.handleMessage(messageId, handler);

Message Handler

// Handler function signature
type MessageHandler<T> = (
  message: Message<T>,
) => Promise<MessageHandlerResult> | MessageHandlerResult;

// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;

interface MessageTimeoutResult {
  timeoutSeconds: number; // seconds before message becomes available again
}

Transport Interface

interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

Callback Utilities

// Parse queue callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;

// Callback options type
interface CallbackMessageOptions {
  queueName: string;
  consumerGroup: string;
  messageId: string;
}

// Error thrown for invalid callback requests
class InvalidCallbackError extends Error;

Examples

Basic JSON Processing

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

const userTopic = createTopic<UserEvent>(client, "user-events");

await userTopic.publish({
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

const consumer = userTopic.consumerGroup("processors");
const controller = new AbortController();

try {
  await consumer.subscribe(controller.signal, async (message) => {
    console.log(
      `User ${message.payload.userId} performed ${message.payload.action}`,
    );
  });
} catch (error) {
  console.error("Processing error:", error);
}

// Stop processing when needed
// controller.abort();

Processing Specific Messages by ID

const userTopic = createTopic<{ userId: string; action: string }>(
  client,
  "user-events",
);
const consumer = userTopic.consumerGroup("processors");

// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";

try {
  await consumer.receiveMessage(messageId, async (message) => {
    console.log(`Processing specific message: ${message.messageId}`);
    console.log(
      `User ${message.payload.userId} performed ${message.payload.action}`,
    );
  });
  console.log("Message processed successfully");
} catch (error) {
  if (error.message.includes("not found or not available")) {
    console.log("Message was already processed or does not exist");
  } else if (error.message.includes("FIFO ordering violation")) {
    console.log("FIFO queue requires processing messages in order");
  } else {
    console.error("Error processing message:", error);
  }
}

Processing Next Available Message

const workTopic = createTopic<{ taskType: string; data: any }>(
  client,
  "work-queue",
);
const worker = workTopic.consumerGroup("workers");

// Process the next available message (one-shot processing)
try {
  await worker.receiveNextMessage(async (message) => {
    console.log(`Processing task: ${message.payload.taskType}`);
    await processTask(message.payload.taskType, message.payload.data);
  });
  console.log("Message processed successfully");
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("No messages available");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message is locked (FIFO queue)");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  } else {
    console.error("Error processing message:", error);
  }
}

// You can also use it with timeout results
await worker.receiveNextMessage(async (message) => {
  if (!canProcessTaskType(message.payload.taskType)) {
    // Return timeout to retry later
    return { timeoutSeconds: 60 };
  }

  await processTask(message.payload.taskType, message.payload.data);
});

Timing Out Messages

const workTopic = createTopic<{ taskType: string; data: any }>(
  client,
  "work-queue",
);
const worker = workTopic.consumerGroup("workers");
const controller = new AbortController();

try {
  await worker.subscribe(controller.signal, async (message) => {
    const { taskType, data } = message.payload;

    // Check if we can process this task type right now
    if (taskType === "heavy-computation" && isSystemOverloaded()) {
      // Return timeout to retry later (5 minutes)
      return { timeoutSeconds: 300 };
    }

    // Check if we have required resources
    if (taskType === "external-api" && !isExternalServiceAvailable()) {
      // Return timeout to retry in 1 minute
      return { timeoutSeconds: 60 };
    }

    // Process the message normally
    console.log(`Processing ${taskType} task`);
    await processTask(taskType, data);
    // Message will be automatically deleted on successful completion
  });
} catch (error) {
  console.error("Worker processing error:", error);
}

// Example with exponential backoff
const backoffController = new AbortController();

try {
  await worker.subscribe(backoffController.signal, async (message) => {
    const maxRetries = 3;
    const deliveryCount = message.deliveryCount;

    try {
      await processMessage(message.payload);
      // Successful processing - message will be deleted
    } catch (error) {
      if (deliveryCount < maxRetries) {
        // Exponential backoff: 2^deliveryCount minutes
        const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
        console.log(
          `Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
        );
        return { timeoutSeconds: timeoutSeconds };
      } else {
        // Max retries reached, let the message fail and be deleted
        console.error("Max retries reached, message will be discarded:", error);
        throw error;
      }
    }
  });
} catch (error) {
  console.error("Backoff processing error:", error);
}

Complete Example: Video Processing Pipeline

Here's a comprehensive example showing a video processing pipeline that processes videos with FFmpeg and stores the results in Vercel Blob:

import { QueueClient, createTopic, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";

const client = new QueueClient({
  token: "your-vercel-oidc-token",
});

// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "unoptimized-videos",
  new StreamTransport(),
);

// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "optimized-videos",
  new StreamTransport(),
);

// Step 1: Process videos with FFmpeg
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");
const processingController = new AbortController();

try {
  await videoProcessor.subscribe(
    processingController.signal,
    async (message) => {
      const inputVideoStream = message.payload;
      console.log("Processing video...");

      if (!ffmpeg) {
        throw new Error("FFmpeg not available");
      }

      // Create optimized video stream using FFmpeg
      const optimizedStream = new ReadableStream<Uint8Array>({
        start(controller) {
          const ffmpegProcess = spawn(
            ffmpeg,
            [
              "-i",
              "pipe:0", // Input from stdin
              "-c:v",
              "libvpx-vp9", // Video codec
              "-c:a",
              "libopus", // Audio codec
              "-crf",
              "23", // Quality
              "-f",
              "webm", // Output format
              "pipe:1", // Output to stdout
            ],
            { stdio: ["pipe", "pipe", "pipe"] },
          );

          // Pipe input stream to FFmpeg
          const reader = inputVideoStream.getReader();
          const pipeInput = async () => {
            while (true) {
              const { done, value } = await reader.read();
              if (done) {
                ffmpegProcess.stdin?.end();
                break;
              }
              ffmpegProcess.stdin?.write(value);
            }
          };
          pipeInput();

          // Stream FFmpeg output
          ffmpegProcess.stdout?.on("data", (chunk) => {
            controller.enqueue(new Uint8Array(chunk));
          });

          ffmpegProcess.on("close", (code) => {
            if (code === 0) {
              controller.close();
            } else {
              controller.error(new Error(`FFmpeg failed with code ${code}`));
            }
          });
        },
      });

      // Publish optimized video to next topic
      await optimizedVideosTopic.publish(optimizedStream);
      console.log("Video optimized and published");
    },
  );
} catch (error) {
  console.error("Video processing error:", error);
}

// Step 2: Store optimized videos in Vercel Blob
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");
const uploadController = new AbortController();

try {
  await blobUploader.subscribe(uploadController.signal, async (message) => {
    const optimizedVideo = message.payload;

    // Upload to Vercel Blob storage
    const filename = `optimized-${Date.now()}.webm`;
    const blob = await put(filename, optimizedVideo, {
      access: "public",
      contentType: "video/webm",
    });

    console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
  });
} catch (error) {
  console.error("Blob upload error:", error);
}

// Graceful shutdown
process.on("SIGINT", () => {
  processingController.abort();
  uploadController.abort();
});

Error Handling

The queue client provides specific error types for different failure scenarios:

Error Types

  • QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)

    • Only thrown when directly using client.receiveMessages()
    • ConsumerGroup.subscribe() handles this error internally and continues polling
  • MessageLockedError: Thrown when a message is temporarily locked (423 status)

    • Contains optional retryAfter property with seconds to wait before retry
    • For receiveMessages() on FIFO queues: the next message in sequence is locked
    • For receiveMessageById(): the requested message is locked
    • ConsumerGroup.subscribe() handles this error internally when polling
  • MessageNotFoundError: Message doesn't exist (404 status)

  • MessageNotAvailableError: Message exists but isn't available for processing (409 status)

  • FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)

    • Contains nextMessageId property indicating which message to process first
  • FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)

    • Contains nextMessageId property indicating which message must be processed first
    • Similar to FifoOrderingViolationError but specifically for receive-by-ID operations
  • MessageCorruptedError: Message data is corrupted or can't be parsed

  • BadRequestError: Invalid request parameters (400 status)

    • Invalid queue names, FIFO limit violations, missing required parameters
  • UnauthorizedError: Authentication failure (401 status)

    • Missing or invalid authentication token
  • ForbiddenError: Access denied (403 status)

    • Queue environment doesn't match token environment
  • InternalServerError: Server-side errors (500+ status codes)

    • Unexpected server errors, service unavailable, etc.

Error Handling Examples

import {
  QueueEmptyError,
  MessageLockedError,
  FifoOrderingViolationError,
  FailedDependencyError,
  BadRequestError,
  UnauthorizedError,
  ForbiddenError,
  InternalServerError,
} from "@vercel/queue";

// Handle empty queue or locked messages
try {
  for await (const message of client.receiveMessages(options, transport)) {
    // Process messages
  }
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("Queue is empty, retry later");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message in FIFO queue is locked");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  }
}

// Handle locked message with retry
try {
  await consumer.receiveMessage(messageId, handler);
} catch (error) {
  if (error instanceof MessageLockedError) {
    console.log("Message is locked by another consumer");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
      setTimeout(() => retry(), error.retryAfter * 1000);
    }
  } else if (error instanceof FailedDependencyError) {
    // FIFO ordering violation for receive by ID
    console.log(`Must process ${error.nextMessageId} first`);
  }
}

// Handle authentication and authorization errors
try {
  await topic.publish(payload);
} catch (error) {
  if (error instanceof UnauthorizedError) {
    console.log("Invalid token - refresh authentication");
  } else if (error instanceof ForbiddenError) {
    console.log("Environment mismatch - check token/queue configuration");
  } else if (error instanceof BadRequestError) {
    console.log("Invalid parameters:", error.message);
  } else if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
  }
}

// Complete error handling pattern
function handleQueueError(error: unknown): void {
  if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
    // Transient errors - safe to retry
    console.log("Temporary condition, will retry");
  } else if (
    error instanceof UnauthorizedError ||
    error instanceof ForbiddenError
  ) {
    // Authentication/authorization errors - need to fix configuration
    console.log("Auth error - check credentials");
  } else if (error instanceof BadRequestError) {
    // Client error - fix the request
    console.log("Invalid request:", error.message);
  } else if (error instanceof InternalServerError) {
    // Server error - implement exponential backoff
    console.log("Server error - retry with backoff");
  } else {
    // Unknown error
    console.error("Unexpected error:", error);
  }
}

License

MIT