A Node.js library for interacting with the Vercel Queue Service API

VQS - Vercel Queue Service Client

A TypeScript client library for the Vercel Queue Service API. It supports customizable serialization and deserialization through transports, including streaming transports that process large payloads without holding them fully in memory.

Features

  • Generic Payload Support: Send and receive any type of data with type safety
  • Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
  • Streaming Support: Handle large payloads without loading them entirely into memory
  • Pub/Sub Pattern: Topic-based messaging with consumer groups
  • Type Safety: Full TypeScript support with generic types
  • Automatic Retries: Built-in visibility timeout management

Installation

npm install @vercel/queue

Quick Start

import { VQSClient, createTopic } from "@vercel/queue";

const client = new VQSClient({
  token: "your-vercel-oidc-token",
});

// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
  client,
  "my-topic",
);

// Publish a message
await topic.publish({
  message: "Hello, World!",
  timestamp: Date.now(),
});

// Create a consumer group
const consumer = topic.consumerGroup("my-processors");

// Process messages continuously with cancellation support
const controller = new AbortController();

// Start processing (blocks until aborted or error)
try {
  await consumer.subscribe(controller.signal, async (message) => {
    console.log("Received:", message.payload.message);
    console.log("Timestamp:", new Date(message.payload.timestamp));
  });
} catch (error) {
  console.error("Processing stopped due to error:", error);
}

// Stop processing from elsewhere in your code
// controller.abort();

Serialization (Transport) System

The VQS client supports customizable serialization through the Transport interface, including streaming for memory-efficient processing. A transport can be configured at the topic level when creating a topic, or overridden at the consumer group level when creating a consumer group.

Built-in Transports

JsonTransport (Default)

Buffers data for JSON parsing - suitable for structured data that fits in memory.

import { JsonTransport, createTopic } from "@vercel/queue";

const topic = createTopic<{ data: any }>(
  client,
  "json-topic",
  new JsonTransport(),
);
// or simply (JsonTransport is the default):
const defaultTopic = createTopic<{ data: any }>(client, "json-topic");

BufferTransport

Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.

import { BufferTransport, createTopic } from "@vercel/queue";

const topic = createTopic<Buffer>(
  client,
  "binary-topic",
  new BufferTransport(),
);
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);

StreamTransport

True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.

import { StreamTransport, createTopic } from "@vercel/queue";

const topic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "streaming-topic",
  new StreamTransport(),
);

// Send large file as stream without loading into memory
// (readFileInChunks is a placeholder for your own chunked file reader)
const fileStream = new ReadableStream<Uint8Array>({
  start(controller) {
    // Read file in chunks
    for (const chunk of readFileInChunks("large-file.bin")) {
      controller.enqueue(chunk);
    }
    controller.close();
  },
});

await topic.publish(fileStream);
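
The start() callback above enqueues every chunk up front, so the stream's internal queue can still grow to the size of the file. The following is a minimal sketch of a pull-based alternative that produces a chunk only when the consumer asks for one. The helper streamFromIterable is hypothetical and not part of @vercel/queue.

```typescript
// Hypothetical helper: wrap a sync or async iterable of chunks in a
// ReadableStream that advances the iterable one step per pull() call.
function streamFromIterable(
  chunks: Iterable<Uint8Array> | AsyncIterable<Uint8Array>,
): ReadableStream<Uint8Array> {
  // yield* inside an async generator accepts both sync and async iterables.
  const iterator = (async function* () {
    yield* chunks;
  })();

  return new ReadableStream<Uint8Array>({
    // pull() runs only while the stream's queue has room, so chunks are
    // produced roughly as fast as the consumer reads them.
    async pull(controller) {
      const { done, value } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    // Stop the underlying iterable if the consumer cancels early.
    async cancel() {
      await iterator.return(undefined);
    },
  });
}
```

Assuming readFileInChunks returns an iterable of Uint8Array chunks, the publish call could then become topic.publish(streamFromIterable(readFileInChunks("large-file.bin"))).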

Custom Transport

You can create your own serialization format by implementing the Transport interface:

// Shape of the Transport interface exported by "@vercel/queue":
interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}
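
As an illustration of the interface above, here is a minimal sketch of a plain-text transport. It is written against a local stand-in interface, TransportLike, that uses Uint8Array in place of Buffer so the snippet runs without Node typings; TextTransport is a hypothetical name, and code targeting the real Transport type would return a Buffer from serialize instead.

```typescript
// Local stand-in mirroring the Transport shape shown above (assumption:
// Uint8Array is used here where the real interface says Buffer).
interface TransportLike<T> {
  serialize(value: T): Uint8Array | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

// Hypothetical transport that sends strings as UTF-8 text.
class TextTransport implements TransportLike<string> {
  contentType = "text/plain; charset=utf-8";

  serialize(value: string): Uint8Array {
    return new TextEncoder().encode(value);
  }

  async deserialize(stream: ReadableStream<Uint8Array>): Promise<string> {
    const decoder = new TextDecoder();
    const reader = stream.getReader();
    let text = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters intact across chunk borders
      text += decoder.decode(value, { stream: true });
    }
    // Flush any bytes the decoder is still holding
    return text + decoder.decode();
  }
}
```

A transport like this would be passed as the third argument to createTopic, in the same position as new JsonTransport() in the earlier examples.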

Choosing the Right Transport

Use Case                 Recommended Transport   Memory Usage   Performance
-----------------------  ----------------------  -------------  -----------
Small JSON objects       JsonTransport           Low            High
Binary files < 100 MB    BufferTransport         Medium         High
Large files > 100 MB     StreamTransport         Very Low       Medium
Real-time data streams   StreamTransport         Very Low       High
Custom protocols         Custom implementation   Varies         Varies

Complete Example: Video Processing Pipeline

The following example implements a two-step pipeline: it transcodes incoming videos with FFmpeg, then stores the results in Vercel Blob:

import { VQSClient, createTopic, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";

const client = new VQSClient({
  token: process.env.VQS_TOKEN!,
});

// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "unoptimized-videos",
  new StreamTransport(),
);

// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
  client,
  "optimized-videos",
  new StreamTransport(),
);

// Step 1: Process videos with FFmpeg
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");
const processingController = new AbortController();

try {
  await videoProcessor.subscribe(
    processingController.signal,
    async (message) => {
      const inputVideoStream = message.payload;
      console.log("Processing video...");

      if (!ffmpeg) {
        throw new Error("FFmpeg not available");
      }

      // Create optimized video stream using FFmpeg
      const optimizedStream = new ReadableStream<Uint8Array>({
        start(controller) {
          const ffmpegProcess = spawn(
            ffmpeg,
            [
              "-i",
              "pipe:0", // Input from stdin
              "-c:v",
              "libvpx-vp9", // Video codec
              "-c:a",
              "libopus", // Audio codec
              "-crf",
              "23", // Quality
              "-f",
              "webm", // Output format
              "pipe:1", // Output to stdout
            ],
            { stdio: ["pipe", "pipe", "pipe"] },
          );

          // Pipe input stream to FFmpeg
          const reader = inputVideoStream.getReader();
          const pipeInput = async () => {
            while (true) {
              const { done, value } = await reader.read();
              if (done) {
                ffmpegProcess.stdin?.end();
                break;
              }
              ffmpegProcess.stdin?.write(value);
            }
          };
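          // Report read/write failures on the output stream instead of
          // leaving an unhandled promise rejection
          pipeInput().catch((err) => controller.error(err));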

          // Stream FFmpeg output
          ffmpegProcess.stdout?.on("data", (chunk) => {
            controller.enqueue(new Uint8Array(chunk));
          });

          ffmpegProcess.on("close", (code) => {
            if (code === 0) {
              controller.close();
            } else {
              controller.error(new Error(`FFmpeg failed with code ${code}`));
            }
          });
        },
      });

      // Publish optimized video to next topic
      await optimizedVideosTopic.publish(optimizedStream);
      console.log("Video optimized and published");
    },
  );
} catch (error) {
  console.error("Video processing error:", error);
}

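// Step 2: Store optimized videos in Vercel Blob
// Note: subscribe() blocks until its signal is aborted, so the Step 1 call
// above must finish before this code runs. In practice, run each consumer in
// its own process or start both subscriptions concurrently.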
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");
const uploadController = new AbortController();

try {
  await blobUploader.subscribe(uploadController.signal, async (message) => {
    const optimizedVideo = message.payload;

    // Upload to Vercel Blob storage
    const filename = `optimized-${Date.now()}.webm`;
    const blob = await put(filename, optimizedVideo, {
      access: "public",
      contentType: "video/webm",
    });

    console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
  });
} catch (error) {
  console.error("Blob upload error:", error);
}

// Graceful shutdown
process.on("SIGINT", () => {
  processingController.abort();
  uploadController.abort();
});
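
Because subscribe() blocks until its signal is aborted, the two steps above would run one after the other if placed in a single script, and the SIGINT handler would be registered only after both had finished. The following is a minimal sketch of starting both steps concurrently under one AbortController. Here subscribeStandIn is a hypothetical placeholder for the real consumer.subscribe(signal, handler) calls.

```typescript
// Hypothetical stand-in for consumer.subscribe(signal, handler): it resolves
// once the signal is aborted, mimicking a call that blocks until cancellation.
function subscribeStandIn(name: string, signal: AbortSignal): Promise<string> {
  return new Promise((resolve) => {
    const stop = () => resolve(`${name} stopped`);
    if (signal.aborted) {
      stop();
    } else {
      signal.addEventListener("abort", stop, { once: true });
    }
  });
}

// Start both steps at once. Promise.all resolves when both have stopped and
// rejects as soon as either one fails.
function runPipeline(signal: AbortSignal): Promise<string[]> {
  return Promise.all([
    subscribeStandIn("processors", signal),
    subscribeStandIn("blob-uploaders", signal),
  ]);
}

// Create the controller (and register any shutdown hook) before awaiting.
const pipelineController = new AbortController();
const pipelineDone = runPipeline(pipelineController.signal);
// A shutdown hook, for example a SIGINT handler, would then call:
// pipelineController.abort();
```

Sharing a single controller means one abort() call stops both consumers. Separate controllers, as in the example above, remain useful when the steps need to be stopped independently.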

API Reference

VQSClient

const client = new VQSClient({
  token: string;
  baseUrl?: string; // defaults to 'https://@vercel/queue.vercel.sh'
});

Topic

const topic = createTopic<T>(client, topicName, transport?);

// Publish a message (uses topic's transport)
await topic.publish(payload, options?);

// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);

ConsumerGroup

// Start continuous processing (blocks until signal is aborted or error occurs)
await consumer.subscribe(signal, handler, options?);

// Process a specific message by ID
await consumer.receiveMessage(messageId, handler);

// Process the next available message
await consumer.receiveNextMessage(handler);

// Handle a specific message by ID without payload
await consumer.handleMessage(messageId, handler);

Message Handler

// Handler function signature
type MessageHandler<T> = (
  message: Message<T>,
) => Promise<MessageHandlerResult> | MessageHandlerResult;

// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;

interface MessageTimeoutResult {
  timeoutSeconds: number; // seconds before message becomes available again
}

Transport Interface

interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

Callback Utilities

// Parse VQS callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;

// Callback options type
interface CallbackMessageOptions {
  queueName: string;
  consumerGroup: string;
  messageId: string;
}

// Error thrown for invalid callback requests
declare class InvalidCallbackError extends Error {}
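
The callback utilities have no usage example here, so the following is a rough sketch of how a route handler might be wired around them. To keep it runnable without the package, the parser and the message processor are passed in as functions: parse stands in for parseCallbackRequest and processMessage for your own logic. The name createCallbackRoute and the 400/200 status codes are assumptions, not documented behavior.

```typescript
interface CallbackMessageOptions {
  queueName: string;
  consumerGroup: string;
  messageId: string;
}

// Hypothetical factory that builds a Request -> Response handler.
function createCallbackRoute(
  parse: (request: Request) => CallbackMessageOptions,
  processMessage: (options: CallbackMessageOptions) => Promise<void>,
): (request: Request) => Promise<Response> {
  return async (request) => {
    let options: CallbackMessageOptions;
    try {
      options = parse(request);
    } catch {
      // The real parser throws InvalidCallbackError for bad requests; this
      // sketch answers any parse failure with 400 (an assumed choice).
      return new Response("Invalid callback request", { status: 400 });
    }
    await processMessage(options);
    return new Response("OK", { status: 200 });
  };
}
```

In an application, processMessage could look up the consumer group named in the options and call consumer.receiveMessage or consumer.handleMessage with the message ID.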

Examples

Basic JSON Processing

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

const userTopic = createTopic<UserEvent>(client, "user-events");

await userTopic.publish({
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

const consumer = userTopic.consumerGroup("processors");
const controller = new AbortController();

try {
  await consumer.subscribe(controller.signal, async (message) => {
    console.log(
      `User ${message.payload.userId} performed ${message.payload.action}`,
    );
  });
} catch (error) {
  console.error("Processing error:", error);
}

// Stop processing when needed
// controller.abort();

Processing Specific Messages by ID

const userTopic = createTopic<{ userId: string; action: string }>(
  client,
  "user-events",
);
const consumer = userTopic.consumerGroup("processors");

// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";

try {
  await consumer.receiveMessage(messageId, async (message) => {
    console.log(`Processing specific message: ${message.messageId}`);
    console.log(
      `User ${message.payload.userId} performed ${message.payload.action}`,
    );
  });
  console.log("Message processed successfully");
} catch (error) {
  // Under strict TypeScript the caught value is `unknown`, so narrow it first
  const reason = error instanceof Error ? error.message : String(error);
  if (reason.includes("not found or not available")) {
    console.log("Message was already processed or does not exist");
  } else if (reason.includes("FIFO ordering violation")) {
    console.log("FIFO queue requires processing messages in order");
  } else {
    console.error("Error processing message:", error);
  }
}

Processing Next Available Message

import { QueueEmptyError, MessageLockedError } from "@vercel/queue";

const workTopic = createTopic<{ taskType: string; data: any }>(
  client,
  "work-queue",
);
const worker = workTopic.consumerGroup("workers");

// Process the next available message (one-shot processing)
try {
  await worker.receiveNextMessage(async (message) => {
    console.log(`Processing task: ${message.payload.taskType}`);
    await processTask(message.payload.taskType, message.payload.data);
  });
  console.log("Message processed successfully");
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("No messages available");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message is locked (FIFO queue)");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  } else {
    console.error("Error processing message:", error);
  }
}

// You can also use it with timeout results
await worker.receiveNextMessage(async (message) => {
  if (!canProcessTaskType(message.payload.taskType)) {
    // Return timeout to retry later
    return { timeoutSeconds: 60 };
  }

  await processTask(message.payload.taskType, message.payload.data);
});

Timing Out Messages

const workTopic = createTopic<{ taskType: string; data: any }>(
  client,
  "work-queue",
);
const worker = workTopic.consumerGroup("workers");
const controller = new AbortController();

try {
  await worker.subscribe(controller.signal, async (message) => {
    const { taskType, data } = message.payload;

    // Check if we can process this task type right now
    if (taskType === "heavy-computation" && isSystemOverloaded()) {
      // Return timeout to retry later (5 minutes)
      return { timeoutSeconds: 300 };
    }

    // Check if we have required resources
    if (taskType === "external-api" && !isExternalServiceAvailable()) {
      // Return timeout to retry in 1 minute
      return { timeoutSeconds: 60 };
    }

    // Process the message normally
    console.log(`Processing ${taskType} task`);
    await processTask(taskType, data);
    // Message will be automatically deleted on successful completion
  });
} catch (error) {
  console.error("Worker processing error:", error);
}

// Example with exponential backoff
const backoffController = new AbortController();

try {
  await worker.subscribe(backoffController.signal, async (message) => {
    const maxRetries = 3;
    const deliveryCount = message.deliveryCount;

    try {
      await processMessage(message.payload);
      // Successful processing - message will be deleted
    } catch (error) {
      if (deliveryCount < maxRetries) {
        // Exponential backoff: 2^deliveryCount minutes
        const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
        console.log(
          `Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
        );
        return { timeoutSeconds };
      } else {
        // Max retries reached, let the message fail and be deleted
        console.error("Max retries reached, message will be discarded:", error);
        throw error;
      }
    }
  });
} catch (error) {
  console.error("Backoff processing error:", error);
}
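
The backoff arithmetic in the handler above can be pulled into a small pure function, which makes it easy to check in isolation. This is a sketch: backoffTimeout is a hypothetical helper, and the upper bound maxTimeoutSeconds is an assumption added here (the example above has no cap).

```typescript
interface MessageTimeoutResult {
  timeoutSeconds: number;
}

// Returns the timeout to hand back to the queue, or null once the retry
// budget is used up and the caller should give up on the message.
function backoffTimeout(
  deliveryCount: number,
  maxRetries = 3,
  maxTimeoutSeconds = 3600, // assumed cap, not taken from the example above
): MessageTimeoutResult | null {
  if (deliveryCount >= maxRetries) return null;
  // Same formula as above: 2^deliveryCount minutes, expressed in seconds
  const timeoutSeconds = Math.min(
    Math.pow(2, deliveryCount) * 60,
    maxTimeoutSeconds,
  );
  return { timeoutSeconds };
}

console.log(backoffTimeout(1)); // { timeoutSeconds: 120 }
console.log(backoffTimeout(2)); // { timeoutSeconds: 240 }
console.log(backoffTimeout(3)); // null
```

Inside a handler, a non-null result can be returned directly as the timeout result, and a null result can be turned into a rethrow of the original error.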

License

MIT

Error Handling

The VQS client provides specific error types for different failure scenarios:

Error Types

  • QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)
    • Only thrown when directly using client.receiveMessages()
    • ConsumerGroup.subscribe() handles this error internally and continues polling
  • MessageLockedError: Thrown when a message is temporarily locked (423 status)
    • Contains optional retryAfter property with seconds to wait before retry
    • For receiveMessages() on FIFO queues: the next message in sequence is locked
    • For receiveMessageById(): the requested message is locked
    • ConsumerGroup.subscribe() handles this error internally when polling
  • MessageNotFoundError: Message doesn't exist (404 status)
  • MessageNotAvailableError: Message exists but isn't available for processing (409 status)
  • FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)
    • Contains nextMessageId property indicating which message to process first
  • FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)
    • Contains nextMessageId property indicating which message must be processed first
    • Similar to FifoOrderingViolationError but specifically for receive-by-ID operations
  • MessageCorruptedError: Message data is corrupted or can't be parsed
  • BadRequestError: Invalid request parameters (400 status)
    • Invalid queue names, FIFO limit violations, missing required parameters
  • UnauthorizedError: Authentication failure (401 status)
    • Missing or invalid authentication token
  • ForbiddenError: Access denied (403 status)
    • Queue environment doesn't match token environment
  • InternalServerError: Server-side errors (500+ status codes)
    • Unexpected server errors, service unavailable, etc.
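
For quick reference, the status codes in the list above can be summarized as a lookup. This is only a sketch of the correspondence as documented here; errorNameForStatus is a hypothetical helper, and how the client performs the mapping internally is not shown in this README.

```typescript
// Maps an HTTP status to the error class name listed above.
// hasNextMessageId distinguishes the two 409 cases. Statuses that the list
// does not mention yield null.
function errorNameForStatus(
  status: number,
  hasNextMessageId = false,
): string | null {
  if (status >= 500) return "InternalServerError";
  switch (status) {
    case 204:
      return "QueueEmptyError";
    case 400:
      return "BadRequestError";
    case 401:
      return "UnauthorizedError";
    case 403:
      return "ForbiddenError";
    case 404:
      return "MessageNotFoundError";
    case 409:
      return hasNextMessageId
        ? "FifoOrderingViolationError"
        : "MessageNotAvailableError";
    case 423:
      return "MessageLockedError";
    case 424:
      return "FailedDependencyError";
    default:
      return null;
  }
}
```

MessageCorruptedError is absent from the lookup because the list gives no status code for it.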

Error Handling Examples

import {
  QueueEmptyError,
  MessageLockedError,
  FifoOrderingViolationError,
  FailedDependencyError,
  BadRequestError,
  UnauthorizedError,
  ForbiddenError,
  InternalServerError,
} from "@vercel/queue";

// Handle empty queue or locked messages
try {
  for await (const message of client.receiveMessages(options, transport)) {
    // Process messages
  }
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("Queue is empty, retry later");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message in FIFO queue is locked");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  }
}

// Handle locked message with retry
try {
  await consumer.receiveMessage(messageId, handler);
} catch (error) {
  if (error instanceof MessageLockedError) {
    console.log("Message is locked by another consumer");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
      setTimeout(() => retry(), error.retryAfter * 1000);
    }
  } else if (error instanceof FailedDependencyError) {
    // FIFO ordering violation for receive by ID
    console.log(`Must process ${error.nextMessageId} first`);
  }
}

// Handle authentication and authorization errors
try {
  await topic.publish(payload);
} catch (error) {
  if (error instanceof UnauthorizedError) {
    console.log("Invalid token - refresh authentication");
  } else if (error instanceof ForbiddenError) {
    console.log("Environment mismatch - check token/queue configuration");
  } else if (error instanceof BadRequestError) {
    console.log("Invalid parameters:", error.message);
  } else if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
  }
}

// Complete error handling pattern
function handleVQSError(error: unknown): void {
  if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
    // Transient errors - safe to retry
    console.log("Temporary condition, will retry");
  } else if (
    error instanceof UnauthorizedError ||
    error instanceof ForbiddenError
  ) {
    // Authentication/authorization errors - need to fix configuration
    console.log("Auth error - check credentials");
  } else if (error instanceof BadRequestError) {
    // Client error - fix the request
    console.log("Invalid request:", error.message);
  } else if (error instanceof InternalServerError) {
    // Server error - implement exponential backoff
    console.log("Server error - retry with backoff");
  } else {
    // Unknown error
    console.error("Unexpected error:", error);
  }
}
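
The examples above log "retry with backoff" and read retryAfter, but stop short of computing a delay. The following is a minimal sketch of one way to combine the two. The helper retryDelayMs and its baseMs and maxMs defaults are assumptions for illustration, not part of @vercel/queue.

```typescript
// Picks a delay before the next attempt. A positive numeric retryAfter
// (seconds) on the error takes priority; otherwise the delay doubles with
// each attempt, starting at baseMs and capped at maxMs.
function retryDelayMs(
  error: unknown,
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
): number {
  // Duck-typed read, so this works for MessageLockedError or any other
  // object that happens to carry a retryAfter field.
  const retryAfter = (error as { retryAfter?: unknown } | null)?.retryAfter;
  if (typeof retryAfter === "number" && retryAfter > 0) {
    return retryAfter * 1000;
  }
  return Math.min(baseMs * Math.pow(2, attempt), maxMs);
}

console.log(retryDelayMs({ retryAfter: 5 }, 0)); // 5000
console.log(retryDelayMs(new Error("server error"), 3)); // 8000
```

A caller would typically pass the result to setTimeout, as the locked-message example does with error.retryAfter * 1000.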