JSPM

  • Created
  • Published
  • Downloads 152497
  • Score
    100M100P100Q186283F
  • License MIT

A Node.js library for interacting with the Vercel Queue Service API

Package Exports

  • @vercel/queue

Readme

Vercel Queues

A TypeScript client library for interacting with the Vercel Queue Service API with customizable serialization/deserialization (transport) support, including streaming support for memory-efficient processing of large payloads.

Features

  • Generic Payload Support: Send and receive any type of data with type safety
  • Customizable Serialization: Use built-in transports (JSON, Buffer, Stream) or create your own
  • Streaming Support: Handle large payloads without loading them entirely into memory
  • Pub/Sub Pattern: Topic-based messaging with consumer groups
  • Type Safety: Full TypeScript support with generic types
  • Automatic Retries: Built-in visibility timeout management

Installation

npm install @vercel/queue

Quick Start

For local development, you'll need to pull your Vercel environment variables (including the OIDC token):

# Install Vercel CLI if you haven't already
npm i -g vercel

# Pull environment variables from your Vercel project
vc env pull

Publishing and consuming messages on a queue

// index.ts
import { send, receive } from "@vercel/queue";

type Message = {
  message: string;
  timestamp: number;
};

// Option 1: Using the send and receive helpers (simplest)
// Automatically uses default client and JSON transport
await send<Message>("my-topic", {
  message: "Hello, World!",
  timestamp: Date.now(),
});

// Consume a single message off the queue
// (Often wrapped in a loop to keep polling messages off the queue)
await receive<Message>("my-topic", "my-consumer-group", (m) => {
  console.log(m.message);
});

// Option 2: Using createTopic for more control

import { createTopic } from "@vercel/queue";

// Create a topic with JSON serialization (default)
// Uses default QueueClient automatically authenticated from Vercel environment
const topic = createTopic<Message>("my-topic");

// Publish a message
await topic.publish({
  message: "Hello, World!",
  timestamp: Date.now(),
});

// Create a consumer group
const consumer = topic.consumerGroup("my-consumer-group");

// Process next available message (one-shot processing)
try {
  await consumer.consume(async (message, metadata) => {
    console.log("Received:", message.message);
    console.log("Timestamp:", new Date(message.timestamp));
    console.log("Message Metadata", metadata);
    // => { messageId, deliveryCount, timestamp }
  });
} catch (error) {
  console.error("Processing error:", error);
}

Run the script

# Install dotenv-cli, ts-node and typescript if you don't have them yet
npm i -g dotenv-cli ts-node typescript

# Run the script with the OIDC token
dotenv -e .env.local ts-node index.ts

Usage with Vercel

When deploying on Vercel, rather than having a persistent server subscribed to a queue, Vercel can trigger a callback route when a message is ready for consumption.

To demonstrate using queues on Vercel, let's use a Next.js app. You can use an existing app or create one using create-next-app.

TypeScript Configuration

Update your tsconfig.json to use "bundler" module resolution for proper package export resolution:

{
  "compilerOptions": {
    "moduleResolution": "bundler"
    // ... other options
  }
}

Publishing messages to a queue

Create a new server function to publish messages

// app/actions.ts
"use server";

import { send } from "@vercel/queue";

/**
 * Server action that publishes a single test message to "my-topic" using the
 * `send` shorthand and logs the ID of the published message.
 *
 * @param message - Text to place in the message payload.
 */
export async function publishTestMessage(message: string) {
  // Option 1: Using simple send shorthand
  const payload = { message, timestamp: Date.now() };

  // Provide a callback URL to invoke a consumer when the message is ready to be processed
  const options = {
    callback: {
      url: getCallbackUrl(), // implementation below
    },
  };

  const published = await send("my-topic", payload, options);

  console.log(`Published message ${published.messageId}`);
}

// Option 2: Customize the topic, transport, consumer groups, etc.
import { createTopic } from "@vercel/queue";

/**
 * Server action that publishes a test message through an explicitly created
 * topic and registers a separate callback for each consumer group.
 *
 * @param message - Text to place in the message payload.
 */
export async function publishTestMessage(message: string) {
  // Create a topic with JSON serialization (default)
  const topic = createTopic<{ message: string; timestamp: number }>("my-topic");

  // Publish the message
  const { messageId } = await topic.publish(
    { message, timestamp: Date.now() },
    {
      // Provide multiple callback URLs to invoke multiple consumer groups.
      // Each key is a consumer group name; its value is that group's callback config.
      callback: {
        "consumer-group-1": {
          url: getCallbackUrl(),
        },
        "consumer-group-2": {
          url: getCallbackUrl(),
          delay: 5, // Delay callback by 5 seconds
        },
      },
    },
  );

  console.log(`Published message ${messageId}`);
}

/**
 * Builds the absolute URL of the queue callback route (`/api/queue/handle`).
 *
 * Uses `https://$VERCEL_URL` when `VERCEL_URL` is set and falls back to
 * `http://localhost:3000` otherwise.
 *
 * @returns The callback URL as a string.
 */
function getCallbackUrl() {
  const deploymentHost = process.env.VERCEL_URL;
  const origin = deploymentHost
    ? `https://${deploymentHost}`
    : "http://localhost:3000";
  const target = new URL(`${origin}/api/queue/handle`);

  // Add Vercel automation bypass secret if available (for preview deployments)
  const bypassSecret = process.env.VERCEL_AUTOMATION_BYPASS_SECRET;
  if (bypassSecret) {
    target.searchParams.set("x-vercel-protection-bypass", bypassSecret);
  }

  return target.toString();
}

Now wire up the server function to your app

// app/some/page.tsx
"use client";
// The server action lives in app/actions.ts, one directory above this page.
import { publishTestMessage } from "../actions";

/**
 * Client page with a single button that invokes the `publishTestMessage`
 * server action when clicked.
 */
export default function Page() {
  return (
    // ...
    <button onClick={() => publishTestMessage("Hello world")}>
      Publish Test Message
    </button>
  );
}

Consuming the queue

Instead of running a persistent server that subscribes to the queue, we use the callback functionality of Vercel queues to consume messages on the fly, when a message is ready to be processed.

The handleCallback helper function simplifies queue callback handling in Next.js:

// app/api/queue/handle/route.ts
import { handleCallback } from "@vercel/queue";

// Option 1: Specify a single handler for the topic
export const POST = handleCallback({
  "my-topic": (message, metadata) => {
    console.log(`Received message:`, message, metadata);
    // metadata: { messageId, deliveryCount, timestamp }
  },

  // .. more topic handlers can be provided here
});

// This consumes messages on the "default" consumer group, which is used when no consumer groups
// were specified in the publish `callback` earlier

// Option 2: Multiple consumer groups
// Route handler with one handler per consumer group. The group keys must match
// the consumer group names used as `callback` keys when publishing.
export const POST = handleCallback({
  // topic: "my-topic"
  "my-topic": {
    // consumer group: "consumer-group-1"
    "consumer-group-1": (message, metadata) => {
      console.log("Message:", message);
    },
    // consumer group: "consumer-group-2"
    "consumer-group-2": (message, metadata) => {
      console.log("Message", message);
    },
  },
});

Key Features

Streaming Support

Handle large files and data streams without loading them into memory:

import { createTopic, StreamTransport } from "@vercel/queue";

const videoTopic = createTopic<ReadableStream<Uint8Array>>(
  "video-processing",
  new StreamTransport(),
);

// Process large video files efficiently
const processor = videoTopic.consumerGroup("processors");
await processor.consume(async (videoStream) => {
  // Process stream chunk by chunk
  const reader = videoStream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    await processChunk(value);
  }
});

Consumer Groups

Multiple consumers can process messages from the same topic in parallel:

// Multiple workers in the same group - they share/split messages
const worker1 = topic.consumerGroup("workers");
const worker2 = topic.consumerGroup("workers"); // Same group name
// worker1 and worker2 will receive different messages (load balancing)

// Different consumer groups - each gets copies of ALL messages
const analytics = topic.consumerGroup("analytics");
const webhooks = topic.consumerGroup("webhooks");
// analytics and webhooks will both receive every message

Architecture

  • Topics: Named message channels with configurable serialization
  • Consumer Groups: Named groups of consumers that process messages in parallel
    • consume(): Process messages with flexible consumption patterns
      • No options: Process next available message
      • With messageId: Process specific message by ID
      • With skipPayload: true: Process message metadata only (without payload)
  • Transports: Pluggable serialization/deserialization for different data types
  • Streaming: Memory-efficient processing of large payloads
  • Visibility Timeouts: Automatic message lifecycle management

Performance

The multipart parser is optimized for high-throughput scenarios:

  • Streaming: Messages are yielded immediately as headers are parsed
  • Memory Efficient: No buffering of complete payloads
  • Fast Parsing: Native Buffer operations for ~50% performance improvement
  • Scalable: Can handle arbitrarily large responses without memory constraints

Serialization (Transport) System

The queue client supports customizable serialization through the Transport interface with streaming support for memory-efficient processing. Transport can be configured at the topic level when creating a topic, or at the consumer group level when creating a consumer group.

Built-in Transports

JsonTransport (Default)

Buffers data for JSON parsing - suitable for structured data that fits in memory.

import { createTopic, JsonTransport } from "@vercel/queue";

const topic = createTopic<{ data: any }>("json-topic", new JsonTransport());
// or simply (JsonTransport is the default):
const topic = createTopic<{ data: any }>("json-topic");

BufferTransport

Buffers the entire payload into memory as a Buffer - suitable for binary data that fits in memory.

import { BufferTransport, createTopic } from "@vercel/queue";

const topic = createTopic<Buffer>("binary-topic", new BufferTransport());
const binaryData = Buffer.from("Binary data", "utf8");
await topic.publish(binaryData);

StreamTransport

True streaming support - passes ReadableStream directly without buffering. Ideal for large files and memory-efficient processing.

import { createTopic, StreamTransport } from "@vercel/queue";

const topic = createTopic<ReadableStream<Uint8Array>>(
  "streaming-topic",
  new StreamTransport(),
);

// Send large file as stream without loading into memory
const fileStream = new ReadableStream<Uint8Array>({
  start(controller) {
    // Read file in chunks
    for (const chunk of readFileInChunks("large-file.bin")) {
      controller.enqueue(chunk);
    }
    controller.close();
  },
});

await topic.publish(fileStream);

Custom Transport

You can create your own serialization format by implementing the Transport interface:

import { Transport } from "@vercel/queue";

interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

Choosing the Right Transport

| Use Case               | Recommended Transport | Memory Usage | Performance |
| ---------------------- | --------------------- | ------------ | ----------- |
| Small JSON objects     | JsonTransport         | Low          | High        |
| Binary files < 100MB   | BufferTransport       | Medium       | High        |
| Large files > 100MB    | StreamTransport       | Very Low     | Medium      |
| Real-time data streams | StreamTransport       | Very Low     | High        |
| Custom protocols       | Custom implementation | Varies       | Varies      |

API Reference

QueueClient

// Simple usage - automatically gets OIDC token from Vercel environment
const client = new QueueClient();

// Or with options
const client = new QueueClient({
  token?: string; // Optional - will auto-detect if not provided
  baseUrl?: string; // defaults to 'https://vqs.vercel.sh'
});

Topic

// Simple usage with default client
const topic = createTopic<T>(topicName, transport?);

// For custom client configuration, use Topic constructor directly
const customClient = new QueueClient({ baseUrl: "https://custom.vqs.vercel.sh" });
const topic = new Topic<T>(customClient, topicName, transport?);

// Publish a message (uses topic's transport)
await topic.publish(payload, options?);

// Trigger a callback URL when the message is
// ready for consumption
await topic.publish(payload, {
  callback: { url: "https://example.com/webhook" }
});

// Or provide multiple callbacks (each URL is called
// with a separate consumer group)
await topic.publish(payload, {
  callback: {
    group1: { url: "https://example.com/webhook1" },
    group2: { url: "https://example.com/webhook2", delay: 30 }
  }
});

// Create a consumer group (can override transport)
const consumer = topic.consumerGroup<U>(groupName, options?);

Send (Shorthand)

// Simple send - automatically uses default client and JSON transport
await send<T>(topicName, payload);

// Send with options including custom transport
await send<T>(topicName, payload, {
  transport?: Transport<T>;
  idempotencyKey?: string;
  retentionSeconds?: number;
  callback?: Record<string, CallbackConfig> | CallbackConfig;
});

// Examples:
await send("notifications", { userId: "123", message: "Welcome!" });

await send("images", imageBuffer, {
  transport: new BufferTransport(),
  callback: { url: "https://example.com/process-image" }
});

await send("events", eventData, {
  idempotencyKey: "unique-key-123",
  retentionSeconds: 3600,
  callback: {
    analytics: { url: "https://analytics.example.com/webhook" },
    notifications: { url: "https://notifications.example.com/webhook", delay: 30 }
  }
});

ConsumerGroup

// Process next available message (simplest form)
await consumer.consume(handler);

// Process specific message by ID with payload
await consumer.consume(handler, { messageId: "message-id" });

// Process specific message by ID without payload (metadata only)
// handler will be called with `undefined` as the payload
await consumer.consume(handler, { messageId: "message-id", skipPayload: true });

Message Handler

// Handler function signature
type MessageHandler<T = unknown> = (
  message: T,
  metadata: MessageMetadata,
) => Promise<MessageHandlerResult> | MessageHandlerResult;

// Handler result types
type MessageHandlerResult = void | MessageTimeoutResult;

interface MessageTimeoutResult {
  timeoutSeconds: number; // seconds before message becomes available again
}

// Message Metadata
interface MessageMetadata {
  messageId: string;
  deliveryCount: number;
  timestamp: string;
}

ConsumeOptions Interface

interface ConsumeOptions {
  messageId?: string; // Process specific message by ID
  skipPayload?: boolean; // Skip payload download (requires messageId)
}

Transport Interface

interface Transport<T = unknown> {
  serialize(value: T): Buffer | ReadableStream<Uint8Array>;
  deserialize(stream: ReadableStream<Uint8Array>): Promise<T>;
  contentType: string;
}

Callback Utilities

// Parse queue callback request headers
function parseCallbackRequest(request: Request): CallbackMessageOptions;

// Callback options type
interface CallbackMessageOptions {
  queueName: string;
  consumerGroup: string;
  messageId: string;
}
// Create a callback handler for NextJS route handlers
function handleCallback(handlers: CallbackHandlers): (request: Request) => Promise<Response>;

// Configuration object with handlers for different topics
type CallbackHandlers = {
  [topicName: string]:
    | MessageHandler // Single handler (uses 'default' consumer group)
    | { [consumerGroup: string]: MessageHandler }; // Multiple consumer group handlers
};

// Example usage:
export const POST = handleCallback({
  // Topic handler (uses 'default' consumer group)
  "new-users": (message, metadata) => {
    console.log(`New user event:`, message, metadata);
  },

  // Consumer group specific handlers
  "image-processing": {
    "compress": (message, metadata) => console.log("Compressing image", message),
    "resize": (message, metadata) => console.log("Resizing image", message),
  }
});

// Error thrown for invalid callback requests
class InvalidCallbackError extends Error;

Examples

Basic JSON Processing

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

// Option 1: Using send shorthand
await send<UserEvent>("user-events", {
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

// Option 2: Using createTopic for consumers
const userTopic = createTopic<UserEvent>("user-events");

await userTopic.publish({
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

const consumer = userTopic.consumerGroup("processors");

// Process next available message
try {
  await consumer.consume(async (message) => {
    console.log(`User ${message.userId} performed ${message.action}`);
  });
} catch (error) {
  console.error("Processing error:", error);
}

Processing Specific Messages by ID

const userTopic = createTopic<{ userId: string; action: string }>(
  "user-events",
);
const consumer = userTopic.consumerGroup("processors");

// Process a specific message if you know its ID
const messageId = "01234567-89ab-cdef-0123-456789abcdef";

// Consume the message with the given ID and report why it could not be
// processed when the attempt fails.
try {
  await consumer.consume(
    async (message, { messageId }) => {
      console.log(`Processing specific message: ${messageId}`);
      console.log(`User ${message.userId} performed ${message.action}`);
    },
    { messageId },
  );
  console.log("Message processed successfully");
} catch (error) {
  // Catch variables are `unknown` under `strict`, so narrow before reading
  // `.message`; non-Error throwables fall back to their string form.
  const reason = error instanceof Error ? error.message : String(error);

  if (reason.includes("not found or not available")) {
    console.log("Message was already processed or does not exist");
  } else if (reason.includes("FIFO ordering violation")) {
    console.log("FIFO queue requires processing messages in order");
  } else {
    console.error("Error processing message:", error);
  }
}

Processing Next Available Message

const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");

// Process the next available message (one-shot processing)
try {
  await worker.consume(async (message) => {
    console.log(`Processing task: ${message.taskType}`);
    await processTask(message.taskType, message.data);
  });
  console.log("Message processed successfully");
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("No messages available");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message is locked (FIFO queue)");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  } else {
    console.error("Error processing message:", error);
  }
}

// Handle conditional timeouts
await worker.consume(async (message) => {
  if (!canProcessTaskType(message.taskType)) {
    // Return timeout to retry later
    return { timeoutSeconds: 60 };
  }

  await processTask(message.taskType, message.data);
});

// Process specific message metadata only (no payload download)
await worker.consume(
  async (_, metadata) => {
    console.log(`Message ID: ${metadata.messageId}`);
    console.log(`Delivery count: ${metadata.deliveryCount}`);
    console.log(`Timestamp: ${metadata.timestamp}`);
    // _ is undefined - no payload was downloaded
  },
  { messageId: "specific-message-id", skipPayload: true },
);

Timing Out Messages

const workTopic = createTopic<{ taskType: string; data: any }>("work-queue");
const worker = workTopic.consumerGroup("workers");

// Process a message with conditional timeout
try {
  await worker.consume(async ({ taskType, data }) => {
    // Check if we can process this task type right now
    if (taskType === "heavy-computation" && isSystemOverloaded()) {
      // Return timeout to retry later (5 minutes)
      return { timeoutSeconds: 300 };
    }

    // Check if we have required resources
    if (taskType === "external-api" && !isExternalServiceAvailable()) {
      // Return timeout to retry in 1 minute
      return { timeoutSeconds: 60 };
    }

    // Process the message normally
    console.log(`Processing ${taskType} task`);
    await processTask(taskType, data);
    // Message will be automatically deleted on successful completion
  });
} catch (error) {
  console.error("Worker processing error:", error);
}

// Example with exponential backoff
try {
  await worker.consume(async (message, { deliveryCount }) => {
    const maxRetries = 3;

    try {
      await processMessage(message);
      // Successful processing - message will be deleted
    } catch (error) {
      if (deliveryCount < maxRetries) {
        // Exponential backoff: 2^deliveryCount minutes
        const timeoutSeconds = Math.pow(2, deliveryCount) * 60;
        console.log(
          `Retrying message in ${timeoutSeconds} seconds (attempt ${deliveryCount})`,
        );
        return { timeoutSeconds: timeoutSeconds };
      } else {
        // Max retries reached, let the message fail and be deleted
        console.error("Max retries reached, message will be discarded:", error);
        throw error;
      }
    }
  });
} catch (error) {
  console.error("Backoff processing error:", error);
}

Complete Example: Video Processing Pipeline

Here's a comprehensive example showing a video processing pipeline that processes videos with FFmpeg and stores the results in Vercel Blob:

import { createTopic, StreamTransport } from "@vercel/queue";
import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static";
import { put } from "@vercel/blob";

// Input topic with unoptimized videos
const unoptimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
  "unoptimized-videos",
  new StreamTransport(),
);

// Output topic for optimized videos
const optimizedVideosTopic = createTopic<ReadableStream<Uint8Array>>(
  "optimized-videos",
  new StreamTransport(),
);

// Step 1: Process videos with FFmpeg
const videoProcessor = unoptimizedVideosTopic.consumerGroup("processors");

try {
  await videoProcessor.consume(async (inputVideoStream) => {
    console.log("Processing video...");

    if (!ffmpeg) {
      throw new Error("FFmpeg not available");
    }

    // Create optimized video stream using FFmpeg
    const optimizedStream = new ReadableStream<Uint8Array>({
      start(controller) {
        const ffmpegProcess = spawn(
          ffmpeg,
          [
            "-i",
            "pipe:0", // Input from stdin
            "-c:v",
            "libvpx-vp9", // Video codec
            "-c:a",
            "libopus", // Audio codec
            "-crf",
            "23", // Quality
            "-f",
            "webm", // Output format
            "pipe:1", // Output to stdout
          ],
          { stdio: ["pipe", "pipe", "pipe"] },
        );

        // Pipe input stream to FFmpeg
        const reader = inputVideoStream.getReader();
        const pipeInput = async () => {
          while (true) {
            const { done, value } = await reader.read();
            if (done) {
              ffmpegProcess.stdin?.end();
              break;
            }
            ffmpegProcess.stdin?.write(value);
          }
        };
        pipeInput();

        // Stream FFmpeg output
        ffmpegProcess.stdout?.on("data", (chunk) => {
          controller.enqueue(new Uint8Array(chunk));
        });

        ffmpegProcess.on("close", (code) => {
          if (code === 0) {
            controller.close();
          } else {
            controller.error(new Error(`FFmpeg failed with code ${code}`));
          }
        });
      },
    });

    // Publish optimized video to next topic
    await optimizedVideosTopic.publish(optimizedStream);
    console.log("Video optimized and published");
  });
} catch (error) {
  console.error("Video processing error:", error);
}

// Step 2: Store optimized videos in Vercel Blob
const blobUploader = optimizedVideosTopic.consumerGroup("blob-uploaders");

try {
  await blobUploader.consume(async (optimizedVideo) => {
    // Upload to Vercel Blob storage
    const filename = `optimized-${Date.now()}.webm`;
    const blob = await put(filename, optimizedVideo, {
      access: "public",
      contentType: "video/webm",
    });

    console.log(`Video uploaded to blob: ${blob.url} (${blob.size} bytes)`);
  });
} catch (error) {
  console.error("Blob upload error:", error);
}

Error Handling

The queue client provides specific error types for different failure scenarios:

Error Types

  • QueueEmptyError: Thrown when attempting to receive messages from an empty queue (204 status)

    • Thrown by consume() when no messages are available
    • Also thrown when directly using client.receiveMessages()
  • MessageLockedError: Thrown when a message is temporarily locked (423 status)

    • Contains optional retryAfter property with seconds to wait before retry
    • For consume() without options: the next message in FIFO sequence is locked
    • For consume() with messageId: the requested message is locked
  • MessageNotFoundError: Message doesn't exist (404 status)

  • MessageNotAvailableError: Message exists but isn't available for processing (409 status)

  • FifoOrderingViolationError: FIFO queue ordering violation (409 status with nextMessageId)

    • Contains nextMessageId property indicating which message to process first
  • FailedDependencyError: FIFO ordering violation when receiving by ID (424 status)

    • Contains nextMessageId property indicating which message must be processed first
    • Similar to FifoOrderingViolationError but specifically for receive-by-ID operations
  • MessageCorruptedError: Message data is corrupted or can't be parsed

  • BadRequestError: Invalid request parameters (400 status)

    • Invalid queue names, FIFO limit violations, missing required parameters
  • UnauthorizedError: Authentication failure (401 status)

    • Missing or invalid authentication token
  • ForbiddenError: Access denied (403 status)

    • Queue environment doesn't match token environment
  • InternalServerError: Server-side errors (500+ status codes)

    • Unexpected server errors, service unavailable, etc.

Error Handling Examples

import {
  BadRequestError,
  FailedDependencyError,
  FifoOrderingViolationError,
  ForbiddenError,
  InternalServerError,
  MessageLockedError,
  QueueEmptyError,
  UnauthorizedError,
} from "@vercel/queue";

// Handle empty queue or locked messages
try {
  for await (const message of client.receiveMessages(options, transport)) {
    // Process messages
  }
} catch (error) {
  if (error instanceof QueueEmptyError) {
    console.log("Queue is empty, retry later");
  } else if (error instanceof MessageLockedError) {
    console.log("Next message in FIFO queue is locked");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
    }
  }
}

// Handle locked message with retry
try {
  await consumer.consume(handler, { messageId });
} catch (error) {
  if (error instanceof MessageLockedError) {
    console.log("Message is locked by another consumer");
    if (error.retryAfter) {
      console.log(`Retry after ${error.retryAfter} seconds`);
      setTimeout(() => retry(), error.retryAfter * 1000);
    }
  } else if (error instanceof FailedDependencyError) {
    // FIFO ordering violation for receive by ID
    console.log(`Must process ${error.nextMessageId} first`);
  }
}

// Handle authentication and authorization errors
try {
  await topic.publish(payload);
} catch (error) {
  if (error instanceof UnauthorizedError) {
    console.log("Invalid token - refresh authentication");
  } else if (error instanceof ForbiddenError) {
    console.log("Environment mismatch - check token/queue configuration");
  } else if (error instanceof BadRequestError) {
    console.log("Invalid parameters:", error.message);
  } else if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
  }
}

// Complete error handling pattern
/**
 * Classifies an error raised by a queue operation and logs a matching
 * recovery hint. Errors that match none of the known queue error classes are
 * reported through `console.error`.
 *
 * @param error - The value caught from a queue operation.
 */
function handleQueueError(error: unknown): void {
  // Transient errors - safe to retry
  const isTransient =
    error instanceof QueueEmptyError || error instanceof MessageLockedError;
  if (isTransient) {
    console.log("Temporary condition, will retry");
    return;
  }

  // Authentication/authorization errors - need to fix configuration
  const isAuthFailure =
    error instanceof UnauthorizedError || error instanceof ForbiddenError;
  if (isAuthFailure) {
    console.log("Auth error - check credentials");
    return;
  }

  // Client error - fix the request
  if (error instanceof BadRequestError) {
    console.log("Invalid request:", error.message);
    return;
  }

  // Server error - implement exponential backoff
  if (error instanceof InternalServerError) {
    console.log("Server error - retry with backoff");
    return;
  }

  // Unknown error
  console.error("Unexpected error:", error);
}

License

MIT