JSPM

@ozritesh/queue-agnostic

1.0.2

Universal queue abstraction library supporting RabbitMQ, AWS SQS, Azure Service Bus, and GCP Pub/Sub with a single unified interface

Package Exports

  • @ozritesh/queue-agnostic
  • @ozritesh/queue-agnostic/src/index.js

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@ozritesh/queue-agnostic) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

Queue-Agnostic Document Processing System

A queue-agnostic Node.js solution for processing documents (PDFs, images). The same application code works with any of the supported queue providers: RabbitMQ, AWS SQS, Azure Service Bus, and Google Cloud Pub/Sub.

🚀 Features

  • Provider Agnostic: Single interface for multiple queue providers
  • Environment-Based Configuration: Easy deployment across different clients
  • Support for Multiple Providers:
    • RabbitMQ
    • AWS SQS
    • Azure Service Bus
    • Google Cloud Pub/Sub
  • Automatic Connection Management: Built-in connection handling and graceful shutdown
  • Error Handling: Automatic message retry/requeue on failures
  • Easy to Extend: Add new queue providers by implementing the QueueInterface

📦 Installation

npm install

🔧 Configuration

Environment Variables

Copy the example environment file and configure it:

cp .env.example .env

Edit .env and set the appropriate values for your queue provider:

# Set your queue provider
QUEUE_PROVIDER=rabbitmq  # or aws-sqs, azure-servicebus, gcp-pubsub

# Set your queue/topic name
QUEUE_NAME=document-processing-queue

# Provider-specific configuration (see .env.example for details)

Provider-Specific Setup

RabbitMQ

QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://localhost:5672

AWS SQS

QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_key
AWS_SECRET_ACCESS_KEY=your_secret

Azure Service Bus

QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...

Google Cloud Pub/Sub

QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=your-project-id
GCP_KEY_FILENAME=./service-account-key.json
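
As a rough illustration of how the variables above could be turned into a provider configuration, the sketch below maps an environment object onto the `{ provider, options }` shape that `QueueFactory.create` accepts elsewhere in this README. The helper name `configFromEnv` and every option key other than `url` are assumptions made for this example; the library's own `createFromEnv` may read or name things differently.

```javascript
// Hypothetical helper: builds a { provider, options } object from env vars.
// Variable names come from this README; option key names (other than `url`)
// are illustrative assumptions, not the library's confirmed schema.
function configFromEnv(env = process.env) {
  const provider = env.QUEUE_PROVIDER;
  switch (provider) {
    case 'rabbitmq':
      return { provider, options: { url: env.RABBITMQ_URL } };
    case 'aws-sqs':
      return {
        provider,
        options: {
          region: env.AWS_REGION,
          accessKeyId: env.AWS_ACCESS_KEY_ID,
          secretAccessKey: env.AWS_SECRET_ACCESS_KEY,
        },
      };
    case 'azure-servicebus':
      return {
        provider,
        options: { connectionString: env.AZURE_SERVICEBUS_CONNECTION_STRING },
      };
    case 'gcp-pubsub':
      return {
        provider,
        options: {
          projectId: env.GCP_PROJECT_ID,
          keyFilename: env.GCP_KEY_FILENAME,
        },
      };
    default:
      // Fail fast on a missing or misspelled provider name.
      throw new Error(`Unsupported QUEUE_PROVIDER: ${provider}`);
  }
}
```

Failing fast on an unknown provider keeps a typo in `.env` from surfacing later as a confusing connection error.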

📖 Usage

Quick Start - Subscriber

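const QueueFactory = require('./src/queue/QueueFactory');

// CommonJS modules cannot use top-level await, so wrap the calls in a function
async function main() {
  // Create queue from environment variables
  const queue = QueueFactory.createFromEnv();

  await queue.connect();

  // Subscribe and process messages
  await queue.subscribe('document-processing-queue', async (message) => {
    console.log('Processing document:', message);
    // Your document processing logic here
  });
}

main().catch((err) => {
  console.error('Subscriber failed:', err);
  process.exit(1);
});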

Run the subscriber:

npm run start:subscriber

Quick Start - Publisher

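const QueueFactory = require('./src/queue/QueueFactory');

// CommonJS modules cannot use top-level await, so wrap the calls in a function
async function main() {
  const queue = QueueFactory.createFromEnv();
  await queue.connect();

  try {
    // Publish a message
    await queue.publish('document-processing-queue', {
      documentId: 'doc-123',
      documentUrl: 'https://example.com/doc.pdf',
      documentType: 'pdf'
    });
  } finally {
    // Always release the connection, even if publishing fails
    await queue.disconnect();
  }
}

main().catch((err) => {
  console.error('Publisher failed:', err);
  process.exit(1);
});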

Run the publisher:

npm run start:publisher

Direct Usage (Without Environment Variables)

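const QueueFactory = require('./src/queue/QueueFactory');

// CommonJS modules cannot use top-level await, so wrap the calls in a function
async function main() {
  // Create queue with explicit configuration
  const queue = QueueFactory.create({
    provider: 'rabbitmq',
    options: {
      url: 'amqp://localhost:5672'
    }
  });

  await queue.connect();
  // ... use the queue
  await queue.disconnect();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});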

🏗️ Architecture

Project Structure

├── src/
│   ├── queue/
│   │   ├── QueueInterface.js          # Abstract interface
│   │   ├── QueueFactory.js            # Factory for creating adapters
│   │   └── adapters/
│   │       ├── RabbitMQAdapter.js     # RabbitMQ implementation
│   │       ├── AWSSQSAdapter.js       # AWS SQS implementation
│   │       ├── AzureServiceBusAdapter.js  # Azure implementation
│   │       └── GCPPubSubAdapter.js    # GCP implementation
│   └── index.js                       # Main entry point
├── examples/
│   ├── subscriber.js                  # Example subscriber
│   ├── publisher.js                   # Example publisher
│   └── direct-usage.js                # Direct usage examples
├── .env.example                       # Environment variables template
└── package.json

Design Pattern

The system uses the Adapter Pattern to provide a unified interface across different queue providers:

QueueInterface (Abstract)
    ↓
    ├── RabbitMQAdapter
    ├── AWSSQSAdapter
    ├── AzureServiceBusAdapter
    └── GCPPubSubAdapter
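
The relationship in the diagram above can be sketched as an abstract base class plus a factory that selects a concrete adapter by provider name. This is a minimal illustration, not the library's source: `NoopAdapter`, the `adapters` registry, and `createQueue` are hypothetical names, and the real `QueueInterface` and `QueueFactory` may be structured differently.

```javascript
// Abstract base: every method throws until a concrete adapter overrides it.
class QueueInterface {
  async connect() { throw new Error('connect() not implemented'); }
  async disconnect() { throw new Error('disconnect() not implemented'); }
  async publish(queueName, message, options) { throw new Error('publish() not implemented'); }
  async subscribe(queueName, handler, options) { throw new Error('subscribe() not implemented'); }
  isConnected() { throw new Error('isConnected() not implemented'); }
}

// Hypothetical stand-in adapter so the sketch runs without a broker.
class NoopAdapter extends QueueInterface {
  constructor(options = {}) {
    super();
    this.options = options;
    this.connected = false;
  }
  async connect() { this.connected = true; }
  async disconnect() { this.connected = false; }
  async publish() { /* discard the message */ }
  async subscribe() { /* never delivers anything */ }
  isConnected() { return this.connected; }
}

// Hypothetical registry: provider name -> adapter class.
const adapters = { noop: NoopAdapter };

// Factory: callers depend only on QueueInterface, never on a concrete adapter.
function createQueue({ provider, options }) {
  const Adapter = adapters[provider];
  if (!Adapter) throw new Error(`Unknown queue provider: ${provider}`);
  return new Adapter(options);
}
```

Because callers only ever hold a `QueueInterface`, switching providers becomes a configuration change rather than a code change.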

🔌 Queue Interface

All adapters implement the following interface:

Methods

connect()

Connect to the queue service.

await queue.connect();

disconnect()

Disconnect from the queue service.

await queue.disconnect();

publish(queueName, message, options)

Publish a message to a queue/topic.

await queue.publish('my-queue', {
  documentId: '123',
  type: 'pdf'
}, {
  // Provider-specific options
});

subscribe(queueName, handler, options)

Subscribe to a queue and process messages.

await queue.subscribe('my-queue', async (message) => {
  // Process message
}, {
  // Provider-specific options
});

isConnected()

Check if the connection is active.

if (queue.isConnected()) {
  // Do something
}

⚙️ Provider-Specific Options

RabbitMQ Options

Subscribe Options:

{
  prefetch: 1,           // Number of messages to prefetch
  durable: true,         // Queue durability
  requeue: true          // Requeue on failure
}
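
To make the three options concrete, here is one way they could map onto an amqplib channel. This assumes the adapter is built on amqplib and that message bodies are JSON, neither of which this README confirms; `consumeRabbit` is a hypothetical helper, not part of the package.

```javascript
// Hypothetical helper showing how the options could map onto an amqplib
// Channel (for example one obtained from connection.createChannel()).
// The adapter's real implementation may differ.
async function consumeRabbit(channel, queueName, handler, options = {}) {
  const { prefetch = 1, durable = true, requeue = true } = options;

  await channel.assertQueue(queueName, { durable }); // declare the queue
  await channel.prefetch(prefetch);                  // cap unacknowledged messages

  return channel.consume(queueName, async (msg) => {
    if (msg === null) return; // the broker cancelled this consumer
    try {
      await handler(JSON.parse(msg.content.toString()));
      channel.ack(msg);
    } catch (err) {
      // allUpTo = false rejects only this message; `requeue` decides whether
      // the broker puts it back on the queue.
      channel.nack(msg, false, requeue);
    }
  });
}
```

With `requeue: true`, a message whose handler always throws will be redelivered indefinitely, so a dead-letter exchange or a retry limit is worth considering.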

AWS SQS Options

Subscribe Options:

{
  pollingInterval: 1000,    // Polling interval in ms
  maxMessages: 10,          // Max messages per poll
  waitTimeSeconds: 20,      // Long polling wait time
  visibilityTimeout: 30     // Message visibility timeout
}
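
Three of these options correspond to parameters of the SQS ReceiveMessage API, while `pollingInterval` has no SQS counterpart and is presumably a client-side delay between polls. The sketch below shows that translation; `toReceiveMessageParams` is a hypothetical helper, and whether the adapter clamps values this way is an assumption.

```javascript
// Hypothetical helper: translates the subscribe options into the parameter
// names used by the SQS ReceiveMessage API.
function toReceiveMessageParams(queueUrl, options = {}) {
  const { maxMessages = 10, waitTimeSeconds = 20, visibilityTimeout = 30 } = options;
  return {
    QueueUrl: queueUrl,
    // SQS returns between 1 and 10 messages per receive call.
    MaxNumberOfMessages: Math.min(Math.max(maxMessages, 1), 10),
    // Long polling waits at most 20 seconds; 0 means short polling.
    WaitTimeSeconds: Math.min(Math.max(waitTimeSeconds, 0), 20),
    // Seconds a received message stays hidden from other consumers.
    VisibilityTimeout: visibilityTimeout,
  };
}
```

With the AWS SDK for JavaScript v3, an object of this shape can be passed to `new ReceiveMessageCommand(params)` from `@aws-sdk/client-sqs`.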

Azure Service Bus Options

Subscribe Options:

{
  receiveMode: 'peekLock',     // or 'receiveAndDelete'
  maxConcurrentCalls: 1        // Concurrent message processing
}
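
For orientation, the sketch below shows where these two options would go if the adapter uses the `@azure/service-bus` client, which is an assumption. `subscribeServiceBus` is a hypothetical helper; `client` is expected to be a `ServiceBusClient` created from the connection string.

```javascript
// Hypothetical helper: receiveMode is chosen when the receiver is created,
// maxConcurrentCalls when the message handlers are registered.
function subscribeServiceBus(client, queueName, handler, options = {}) {
  const { receiveMode = 'peekLock', maxConcurrentCalls = 1 } = options;

  const receiver = client.createReceiver(queueName, { receiveMode });

  // Returns an object with close() that stops message delivery.
  return receiver.subscribe(
    {
      // Called once per message; msg.body holds the payload.
      processMessage: async (msg) => handler(msg.body),
      // Called for receiver or handler errors.
      processError: async (args) => console.error('Service Bus error:', args.error),
    },
    { maxConcurrentCalls }
  );
}
```

In `peekLock` mode a message stays locked until it is settled, whereas `receiveAndDelete` removes it from the queue on delivery, so a handler failure in that mode loses the message.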

Google Cloud Pub/Sub Options

Subscribe Options:

{
  topicName: 'my-topic',       // Required for new subscriptions
  createIfNotExists: true,      // Auto-create topic/subscription
  flowControl: {
    maxMessages: 100
  }
}
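
The following sketch shows how these options might be applied with the `@google-cloud/pubsub` client, assuming the adapter uses it and that payloads are JSON. `subscribePubSub` is a hypothetical helper; it creates a missing subscription but, unlike the option description above, does not create the topic.

```javascript
// Hypothetical helper: `pubsub` is expected to be a PubSub client instance.
async function subscribePubSub(pubsub, subscriptionName, handler, options = {}) {
  const {
    topicName,
    createIfNotExists = false,
    flowControl = { maxMessages: 100 },
  } = options;

  // flowControl.maxMessages caps how many messages are outstanding at once.
  const subscription = pubsub.subscription(subscriptionName, { flowControl });

  if (createIfNotExists) {
    const [exists] = await subscription.exists();
    if (!exists) {
      if (!topicName) {
        throw new Error('topicName is required to create a subscription');
      }
      await pubsub.topic(topicName).createSubscription(subscriptionName);
    }
  }

  subscription.on('message', async (message) => {
    try {
      await handler(JSON.parse(message.data.toString()));
      message.ack();
    } catch (err) {
      message.nack(); // ask Pub/Sub to redeliver
    }
  });
  subscription.on('error', (err) => console.error('Pub/Sub error:', err));

  return subscription;
}
```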

🔄 Deployment Scenarios

Scenario 1: Client using RabbitMQ

# .env
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://prod-rabbitmq:5672
QUEUE_NAME=client-a-documents

Scenario 2: Client using AWS

# .env
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-west-2
QUEUE_NAME=client-b-documents

Scenario 3: Client using Azure

# .env
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
QUEUE_NAME=client-c-documents

Scenario 4: Client using Google Cloud

# .env
QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=client-d-project
QUEUE_NAME=client-d-documents

🛡️ Error Handling

All adapters include built-in error handling:

  • Message Processing Errors: Messages are automatically requeued/nacked on handler errors
  • Connection Errors: Logged; reconnection logic can be added to handle them
  • Graceful Shutdown: SIGINT/SIGTERM handlers for clean disconnection
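
The connection and shutdown behaviour listed above could be approximated in application code as follows. Both helpers are hypothetical and rely only on the `connect()` and `disconnect()` methods documented in this README; the retry count, backoff schedule, and exit codes are assumptions, not the adapters' built-in behaviour.

```javascript
// Hypothetical helper: retries queue.connect() with exponential backoff.
// Resolves with the number of attempts that were needed.
async function connectWithRetry(queue, { maxAttempts = 5, delayMs = 1000 } = {}) {
  for (let attempt = 1; ; attempt++) {
    try {
      await queue.connect();
      return attempt;
    } catch (err) {
      if (attempt >= maxAttempts) throw err;
      // Wait delayMs, then 2 * delayMs, then 4 * delayMs, and so on.
      const wait = delayMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, wait));
    }
  }
}

// Hypothetical helper: disconnects the queue once on SIGINT or SIGTERM.
// `proc` defaults to the Node.js process and is a parameter only so the
// behaviour can be exercised without sending real signals.
function installShutdownHandlers(queue, proc = process) {
  let shuttingDown = false;

  const shutdown = async (signal) => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    console.log(`Received ${signal}, disconnecting...`);
    try {
      await queue.disconnect();
      proc.exitCode = 0;
    } catch (err) {
      console.error('Error during disconnect:', err);
      proc.exitCode = 1;
    }
  };

  for (const signal of ['SIGINT', 'SIGTERM']) {
    proc.once(signal, () => shutdown(signal));
  }
  return shutdown;
}
```

Setting `exitCode` instead of calling `process.exit()` lets in-flight work finish: Node.js exits on its own once the connection is closed and the event loop is empty.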

🔍 Monitoring & Logging

All adapters include console logging for:

  • Connection status
  • Message publishing
  • Message receiving
  • Error conditions

Example output:

✓ Connected to RabbitMQ
✓ Subscribed to RabbitMQ queue: document-processing-queue
📄 Received document for processing: { documentId: '123', ... }
✓ Successfully processed document: 123

🚧 Extending with New Providers

To add a new queue provider:

  1. Create a new adapter in src/queue/adapters/
  2. Extend QueueInterface
  3. Implement all required methods
  4. Add to QueueFactory.js
  5. Update environment variable handling

Example skeleton:

const QueueInterface = require('../QueueInterface');

class NewProviderAdapter extends QueueInterface {
  async connect() { /* ... */ }
  async disconnect() { /* ... */ }
  async publish(queueName, message, options) { /* ... */ }
  async subscribe(queueName, handler, options) { /* ... */ }
  isConnected() { /* ... */ }
}

module.exports = NewProviderAdapter;
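
As a filled-in version of the skeleton, the sketch below implements the five methods with an in-process queue, which can be handy for unit tests. `InMemoryAdapter` is hypothetical and not shipped with the package; the empty `QueueInterface` class stands in for `require('../QueueInterface')` so the example runs on its own.

```javascript
// Stand-in for require('../QueueInterface') so this sketch is self-contained.
class QueueInterface {}

// Hypothetical adapter that delivers messages within the current process.
class InMemoryAdapter extends QueueInterface {
  constructor() {
    super();
    this.connected = false;
    this.handlers = new Map(); // queueName -> array of handlers
  }

  async connect() {
    this.connected = true;
  }

  async disconnect() {
    this.handlers.clear();
    this.connected = false;
  }

  async publish(queueName, message) {
    if (!this.connected) throw new Error('Not connected');
    // Round-trip through JSON so handlers get a copy, as a real broker would deliver.
    for (const handler of this.handlers.get(queueName) || []) {
      await handler(JSON.parse(JSON.stringify(message)));
    }
  }

  async subscribe(queueName, handler) {
    if (!this.connected) throw new Error('Not connected');
    const list = this.handlers.get(queueName) || [];
    list.push(handler);
    this.handlers.set(queueName, list);
  }

  isConnected() {
    return this.connected;
  }
}
```

Unlike the adapters described in this README, this sketch does not retry or requeue when a handler throws; the error propagates to the caller of `publish`.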

📝 License

MIT

🤝 Contributing

Contributions are welcome! Feel free to submit issues or pull requests.

📞 Support

For issues or questions, please open an issue on the repository.