Package Exports
- @vercel/queue
Readme
VQS - Vercel Queue Service Client
A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.
Features
- Generic Payload Support: Send and receive any type of data with type safety
- Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
- Streaming Support: Handle large payloads without loading them entirely into memory
- Pub/Sub Pattern: Topic-based messaging with consumer groups
- Type Safety: Full TypeScript support with generic types
- Automatic Retries: Built-in visibility timeout management
Installation
npm install @vercel/queueQuick Start
import { VQSClient, createTopic, JsonTransport } from "@vercel/queue";
const client = new VQSClient({
token: "your-vercel-oidc-token",
});
// Create a topic with JSON serialization (default)
const topic = createTopic<{ message: string; timestamp: number }>(
client,
"my-topic",
);
// Publish a message
await topic.publish({
message: "Hello, World!",
timestamp: Date.now(),
});
// Create a consumer group
const consumer = topic.consumerGroup("my-processors");
// Process messages continuously with cancellation support
const controller = new AbortController();
// Start processing (blocks until aborted or error)
try {
await consumer.subscribe(controller.signal, async (message) => {
console.log("Received:", message.payload.message);
console.log("Timestamp:", new Date(message.payload.timestamp));
});
} catch (error) {
console.error("Processing stopped due to error:", error);
}
// Stop processing from elsewhere in your code
// controller.abort();Serialization (Transport) System
The VQS client supports customizable serialization through the Transport interface with streaming support for memory-efficient processing. Transport can be configured at the topic level when creating a topic, or at the consumer group level when creating a consumer group.
Built-in Transports
JsonTransport (Default)
Buffers data for JSON parsing - suitable for structured data that fits in memory.
import { JsonTransport, createTopic } from "@vercel/queue";
const topic = createTopic<{ data: any }>(
client,
"json-topic",
new JsonTransport(),
);
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>(client, "json-topic");BufferTransport
Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.
import { BufferTransport, createTopic } from "@vercel/queue";
const topic = createTopic<Buffer>(
client,
"binary-topic",
new BufferTransport(),
);
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);StreamTransport
True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.
import { StreamTransport, createTopic } from "@vercel/queue";
const topic = createTopic<ReadableStream<Uint8Array>>(
client,
"streaming-topic",
new StreamTransport(),
);
// Send large file as stream without loading into memory
const fileStream = new ReadableStream<Uint8Array>({
start(controller) {
// Read file in chunks
for (const chunk of readFileInChunks("large-file.bin")) {
controller.enqueue(chunk);
}
controller.close();
},
});
await topic.publish(fileStream);Custom Transport
You can create your own serialization format by implementing the Transport interface:
import { Transport } from "@vercel/queue";
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}Choosing the Right Transport
| Use Case | Recommended Transport | Memory Usage | Performance |
|---|---|---|---|
| Small JSON objects | JsonTransport |
Low | High |
| Binary files < 100MB | BufferTransport |
Medium | High |
| Large files > 100MB | StreamTransport |
Very Low | Medium |
| Real-time data streams | StreamTransport |
Very Low | High |
| Custom protocols | Custom implementation | Varies | Varies |
Complete Example: Video Processing Pipeline
Here's a comprehensive example showing a video processing pipeline that processes videos with FFmpeg and stores the results in Vercel Blob:
import { VQSClient, createTopic, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";
const client = new VQSClient({
token: process.env.VQS_TOKEN!,
});
// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"unoptimized-videos",
new StreamTransport(),
);
// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
client,
"optimized-videos",
new StreamTransport(),
);
// Step 1: Process videos with FFmpeg
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");
const processingController = new AbortController();
try {
await videoProcessor.subscribe(
processingController.signal,
async (message) => {
const inputVideoStream = message.payload;
console.log("Processing video...");
if (!ffmpeg) {
throw new Error("FFmpeg not available");
}
// Create optimized video stream using FFmpeg
const optimizedStream = new ReadableStream<Uint8Array>({
start(controller) {
const ffmpegProcess = spawn(
ffmpeg,
[
"-i",
"pipe:0", // Input from stdin
"-c:v",
"libvpx-vp9", // Video codec
"-c:a",
"libopus", // Audio codec
"-crf",
"23", // Quality
"-f",
"webm", // Output format
"pipe:1", // Output to stdout
],
{ stdio: ["pipe", "pipe", "pipe"] },
);
// Pipe input stream to FFmpeg
const reader = inputVideoStream.getReader();
const pipeInput = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
ffmpegProcess.stdin?.end();
break;
}
ffmpegProcess.stdin?.write(value);
}
};
pipeInput();
// Stream FFmpeg output
ffmpegProcess.stdout?.on("data", (chunk) => {
controller.enqueue(new Uint8Array(chunk));
});
ffmpegProcess.on("close", (code) => {
if (code === 0) {
controller.close();
} else {
controller.error(new Error(`FFmpeg failed with code ${code}`));
}
});
},
});
// Publish optimized video to next topic
await optimizedVideosTopic.publish(optimizedStream);
console.log("Video optimized and published");
},
);
} catch (error) {
console.error("Video processing error:", error);
}
// Step 2: Store optimized videos in Vercel Blob
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");
const uploadController = new AbortController();
try {
await blobUploader.subscribe(uploadController.signal, async (message) => {
const optimizedVideo = message.payload;
// Upload to Vercel Blob storage
const filename = `optimized-${Date.now()}.webm`;
const blob = await put(filename, optimizedVideo, {
access: "public",
contentType: "video/webm",
});
console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
});
} catch (error) {
console.error("Blob upload error:", error);
}
// Graceful shutdown
process.on("SIGINT", () => {
processingController.abort();
uploadController.abort();
});API Reference
VQSClient
const client = new VQSClient({
token: string;
baseUrl?: string; // defaults to 'https://@vercel/queue.vercel.sh'
});Topic
const topic = createTopic<T>(client, topicName, transport?);
// Publish a message (uses topic's transport)
await topic.publish(payload, options?);
// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);ConsumerGroup
// Start continuous processing (blocks until signal is aborted or error occurs)
await consumer.subscribe(signal, handler, options?);
// Process a specific message by ID
await consumer.receiveMessage(messageId, handler);
// Process the next available message
await consumer.receiveNextMessage(handler);
// Handle a specific message by ID without payload
await consumer.handleMessage(messageId, handler);Message Handler
// Handler function signature
type MessageHandler<T> = (
message: Message<T>,
) => Promise<MessageHandlerResult> | MessageHandlerResult;
// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;
interface MessageTimeoutResult {
timeoutSeconds: number; // seconds before message becomes available again
}Transport Interface
interface Transport<T = unknown> {
serialize(value: T): Buffer | ReadableStream<Uint8Array>;
deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
contentType: string;
}Callback Utilities
// Parse VQS callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;
// Callback options type
interface CallbackMessageOptions {
queueName: string;
consumerGroup: string;
messageId: string;
}
// Error thrown for invalid callback requests
class InvalidCallbackError extends Error;Examples
Basic JSON Processing
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
const userTopic = createTopic<UserEvent>(client, "user-events");
await userTopic.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
const consumer = userTopic.consumerGroup("processors");
const controller = new AbortController();
try {
await consumer.subscribe(controller.signal, async (message) => {
console.log(
`User ${message.payload.userId} performed ${message.payload.action}`,
);
});
} catch (error) {
console.error("Processing error:", error);
}
// Stop processing when needed
// controller.abort();Processing Specific Messages by ID
const userTopic = createTopic<{ userId: string; action: string }>(
client,
"user-events",
);
const consumer = userTopic.consumerGroup("processors");
// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";
try {
await consumer.receiveMessage(messageId, async (message) => {
console.log(`Processing specific message: ${message.messageId}`);
console.log(
`User ${message.payload.userId} performed ${message.payload.action}`,
);
});
console.log("Message processed successfully");
} catch (error) {
if (error.message.includes("not found or not available")) {
console.log("Message was already processed or does not exist");
} else if (error.message.includes("FIFO ordering violation")) {
console.log("FIFO queue requires processing messages in order");
} else {
console.error("Error processing message:", error);
}
}Processing Next Available Message
const workTopic = createTopic<{ taskType: string; data: any }>(
client,
"work-queue",
);
const worker = workTopic.consumerGroup("workers");
// Process the next available message (one-shot processing)
try {
await worker.receiveNextMessage(async (message) => {
console.log(`Processing task: ${message.payload.taskType}`);
await processTask(message.payload.taskType, message.payload.data);
});
console.log("Message processed successfully");
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("No messages available");
} else if (error instanceof MessageLockedError) {
console.log("Next message is locked (FIFO queue)");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
} else {
console.error("Error processing message:", error);
}
}
// You can also use it with timeout results
await worker.receiveNextMessage(async (message) => {
if (!canProcessTaskType(message.payload.taskType)) {
// Return timeout to retry later
return { timeoutSeconds: 60 };
}
await processTask(message.payload.taskType, message.payload.data);
});Timing Out Messages
const workTopic = createTopic<{ taskType: string; data: any }>(
client,
"work-queue",
);
const worker = workTopic.consumerGroup("workers");
const controller = new AbortController();
try {
await worker.subscribe(controller.signal, async (message) => {
const { taskType, data } = message.payload;
// Check if we can process this task type right now
if (taskType === "heavy-computation" && isSystemOverloaded()) {
// Return timeout to retry later (5 minutes)
return { timeoutSeconds: 300 };
}
// Check if we have required resources
if (taskType === "external-api" && !isExternalServiceAvailable()) {
// Return timeout to retry in 1 minute
return { timeoutSeconds: 60 };
}
// Process the message normally
console.log(`Processing ${taskType} task`);
await processTask(taskType, data);
// Message will be automatically deleted on successful completion
});
} catch (error) {
console.error("Worker processing error:", error);
}
// Example with exponential backoff
const backoffController = new AbortController();
try {
await worker.subscribe(backoffController.signal, async (message) => {
const maxRetries = 3;
const deliveryCount = message.deliveryCount;
try {
await processMessage(message.payload);
// Successful processing - message will be deleted
} catch (error) {
if (deliveryCount < maxRetries) {
// Exponential backoff: 2^deliveryCount minutes
const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
console.log(
`Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
);
return { timeoutSeconds: timeoutSeconds };
} else {
// Max retries reached, let the message fail and be deleted
console.error("Max retries reached, message will be discarded:", error);
throw error;
}
}
});
} catch (error) {
console.error("Backoff processing error:", error);
}License
MIT
Error Handling
The VQS client provides specific error types for different failure scenarios:
Error Types
QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)- Only thrown when directly using
client.receiveMessages() ConsumerGroup.subscribe()handles this error internally and continues polling
- Only thrown when directly using
MessageLockedError: Thrown when a message is temporarily locked (423 status)- Contains optional
retryAfterproperty with seconds to wait before retry - For
receiveMessages()on FIFO queues: the next message in sequence is locked - For
receiveMessageById(): the requested message is locked ConsumerGroup.subscribe()handles this error internally when polling
- Contains optional
MessageNotFoundError: Message doesn't exist (404 status)MessageNotAvailableError: Message exists but isn't available for processing (409 status)FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)- Contains
nextMessageIdproperty indicating which message to process first
- Contains
FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)- Contains
nextMessageIdproperty indicating which message must be processed first - Similar to
FifoOrderingViolationErrorbut specifically for receive-by-ID operations
- Contains
MessageCorruptedError: Message data is corrupted or can't be parsedBadRequestError: Invalid request parameters (400 status)- Invalid queue names, FIFO limit violations, missing required parameters
UnauthorizedError: Authentication failure (401 status)- Missing or invalid authentication token
ForbiddenError: Access denied (403 status)- Queue environment doesn't match token environment
InternalServerError: Server-side errors (500+ status codes)- Unexpected server errors, service unavailable, etc.
Error Handling Examples
import {
QueueEmptyError,
MessageLockedError,
FifoOrderingViolationError,
FailedDependencyError,
BadRequestError,
UnauthorizedError,
ForbiddenError,
InternalServerError,
} from "@vercel/queue";
// Handle empty queue or locked messages
try {
for await (const message of client.receiveMessages(options, transport)) {
// Process messages
}
} catch (error) {
if (error instanceof QueueEmptyError) {
console.log("Queue is empty, retry later");
} else if (error instanceof MessageLockedError) {
console.log("Next message in FIFO queue is locked");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
}
}
}
// Handle locked message with retry
try {
await consumer.receiveMessage(messageId, handler);
} catch (error) {
if (error instanceof MessageLockedError) {
console.log("Message is locked by another consumer");
if (error.retryAfter) {
console.log(`Retry after ${error.retryAfter} seconds`);
setTimeout(() => retry(), error.retryAfter * 1000);
}
} else if (error instanceof FailedDependencyError) {
// FIFO ordering violation for receive by ID
console.log(`Must process ${error.nextMessageId} first`);
}
}
// Handle authentication and authorization errors
try {
await topic.publish(payload);
} catch (error) {
if (error instanceof UnauthorizedError) {
console.log("Invalid token - refresh authentication");
} else if (error instanceof ForbiddenError) {
console.log("Environment mismatch - check token/queue configuration");
} else if (error instanceof BadRequestError) {
console.log("Invalid parameters:", error.message);
} else if (error instanceof InternalServerError) {
console.log("Server error - retry with backoff");
}
}
// Complete error handling pattern
function handleVQSError(error: unknown): void {
if (error instanceof QueueEmptyError || error instanceof MessageLockedError) {
// Transient errors - safe to retry
console.log("Temporary condition, will retry");
} else if (
error instanceof UnauthorizedError ||
error instanceof ForbiddenError
) {
// Authentication/authorization errors - need to fix configuration
console.log("Auth error - check credentials");
} else if (error instanceof BadRequestError) {
// Client error - fix the request
console.log("Invalid request:", error.message);
} else if (error instanceof InternalServerError) {
// Server error - implement exponential backoff
console.log("Server error - retry with backoff");
} else {
// Unknown error
console.error("Unexpected error:", error);
}
}