Package Exports
- @ozritesh/queue-agnostic
- @ozritesh/queue-agnostic/src/index.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@ozritesh/queue-agnostic) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
Queue-Agnostic Document Processing System
A flexible, queue-agnostic Node.js solution for processing documents (PDFs, images) that can seamlessly work with different queue providers including RabbitMQ, AWS SQS, Azure Service Bus, and Google Cloud Pub/Sub.
🚀 Features
- Provider Agnostic: Single interface for multiple queue providers
- Environment-Based Configuration: Easy deployment across different clients
- Support for Multiple Providers:
- RabbitMQ
- AWS SQS
- Azure Service Bus
- Google Cloud Pub/Sub
- Automatic Connection Management: Built-in connection handling and graceful shutdown
- Error Handling: Automatic message retry/requeue on failures
- Easy to Extend: Add new queue providers by implementing the QueueInterface
📦 Installation
npm install
🔧 Configuration
Environment Variables
Copy the example environment file and configure it:
cp .env.example .env
Edit .env
and set the appropriate values for your queue provider:
# Set your queue provider
QUEUE_PROVIDER=rabbitmq # or aws-sqs, azure-servicebus, gcp-pubsub
# Set your queue/topic name
QUEUE_NAME=document-processing-queue
# Provider-specific configuration (see .env.example for details)
Provider-Specific Setup
RabbitMQ
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://localhost:5672
AWS SQS
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_key
AWS_SECRET_ACCESS_KEY=your_secret
Azure Service Bus
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
Google Cloud Pub/Sub
QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=your-project-id
GCP_KEY_FILENAME=./service-account-key.json
📖 Usage
Quick Start - Subscriber
const QueueFactory = require('./src/queue/QueueFactory');
// Create queue from environment variables
const queue = QueueFactory.createFromEnv();
await queue.connect();
// Subscribe and process messages
await queue.subscribe('document-processing-queue', async (message) => {
console.log('Processing document:', message);
// Your document processing logic here
});
Run the subscriber:
npm run start:subscriber
Quick Start - Publisher
const QueueFactory = require('./src/queue/QueueFactory');
const queue = QueueFactory.createFromEnv();
await queue.connect();
// Publish a message
await queue.publish('document-processing-queue', {
documentId: 'doc-123',
documentUrl: 'https://example.com/doc.pdf',
documentType: 'pdf'
});
await queue.disconnect();
Run the publisher:
npm run start:publisher
Direct Usage (Without Environment Variables)
const QueueFactory = require('./src/queue/QueueFactory');
// Create queue with explicit configuration
const queue = QueueFactory.create({
provider: 'rabbitmq',
options: {
url: 'amqp://localhost:5672'
}
});
await queue.connect();
// ... use the queue
await queue.disconnect();
🏗️ Architecture
Project Structure
├── src/
│ ├── queue/
│ │ ├── QueueInterface.js # Abstract interface
│ │ ├── QueueFactory.js # Factory for creating adapters
│ │ └── adapters/
│ │ ├── RabbitMQAdapter.js # RabbitMQ implementation
│ │ ├── AWSSQSAdapter.js # AWS SQS implementation
│ │ ├── AzureServiceBusAdapter.js # Azure implementation
│ │ └── GCPPubSubAdapter.js # GCP implementation
│ └── index.js # Main entry point
├── examples/
│ ├── subscriber.js # Example subscriber
│ ├── publisher.js # Example publisher
│ └── direct-usage.js # Direct usage examples
├── .env.example # Environment variables template
└── package.json
Design Pattern
The system uses the Adapter Pattern to provide a unified interface across different queue providers:
QueueInterface (Abstract)
↓
├── RabbitMQAdapter
├── AWSSQSAdapter
├── AzureServiceBusAdapter
└── GCPPubSubAdapter
🔌 Queue Interface
All adapters implement the following interface:
Methods
connect()
Connect to the queue service.
await queue.connect();
disconnect()
Disconnect from the queue service.
await queue.disconnect();
publish(queueName, message, options)
Publish a message to a queue/topic.
await queue.publish('my-queue', {
documentId: '123',
type: 'pdf'
}, {
// Provider-specific options
});
subscribe(queueName, handler, options)
Subscribe to a queue and process messages.
await queue.subscribe('my-queue', async (message) => {
// Process message
}, {
// Provider-specific options
});
isConnected()
Check if the connection is active.
if (queue.isConnected()) {
// Do something
}
⚙️ Provider-Specific Options
RabbitMQ Options
Subscribe Options:
{
prefetch: 1, // Number of messages to prefetch
durable: true, // Queue durability
requeue: true // Requeue on failure
}
AWS SQS Options
Subscribe Options:
{
pollingInterval: 1000, // Polling interval in ms
maxMessages: 10, // Max messages per poll
waitTimeSeconds: 20, // Long polling wait time
visibilityTimeout: 30 // Message visibility timeout
}
Azure Service Bus Options
Subscribe Options:
{
receiveMode: 'peekLock', // or 'receiveAndDelete'
maxConcurrentCalls: 1 // Concurrent message processing
}
Google Cloud Pub/Sub Options
Subscribe Options:
{
topicName: 'my-topic', // Required for new subscriptions
createIfNotExists: true, // Auto-create topic/subscription
flowControl: {
maxMessages: 100
}
}
🔄 Deployment Scenarios
Scenario 1: Client using RabbitMQ
# .env
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://prod-rabbitmq:5672
QUEUE_NAME=client-a-documents
Scenario 2: Client using AWS
# .env
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-west-2
QUEUE_NAME=client-b-documents
Scenario 3: Client using Azure
# .env
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
QUEUE_NAME=client-c-documents
Scenario 4: Client using Google Cloud
# .env
QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=client-d-project
QUEUE_NAME=client-d-documents
🛡️ Error Handling
All adapters include built-in error handling:
- Message Processing Errors: Messages are automatically requeued/nacked on handler errors
- Connection Errors: Logged and can be handled with reconnection logic
- Graceful Shutdown: SIGINT/SIGTERM handlers for clean disconnection
🔍 Monitoring & Logging
All adapters include console logging for:
- Connection status
- Message publishing
- Message receiving
- Error conditions
Example output:
✓ Connected to RabbitMQ
✓ Subscribed to RabbitMQ queue: document-processing-queue
📄 Received document for processing: { documentId: '123', ... }
✓ Successfully processed document: 123
🚧 Extending with New Providers
To add a new queue provider:
- Create a new adapter in
src/queue/adapters/
- Extend
QueueInterface
- Implement all required methods
- Add to
QueueFactory.js
- Update environment variable handling
Example skeleton:
const QueueInterface = require('../QueueInterface');
class NewProviderAdapter extends QueueInterface {
async connect() { /* ... */ }
async disconnect() { /* ... */ }
async publish(queueName, message, options) { /* ... */ }
async subscribe(queueName, handler, options) { /* ... */ }
isConnected() { /* ... */ }
}
module.exports = NewProviderAdapter;
📝 License
MIT
🤝 Contributing
Contributions are welcome! Feel free to submit issues or pull requests.
📞 Support
For issues or questions, please open an issue on the repository.