Package Exports
- @message-in-the-middle/aws
Readme
@message-in-the-middle/aws
⚠️ Work in Progress Is this library production-ready? No. Is this library safe? No. When will it be ready? Soon™ (maybe tomorrow, maybe never). Why is it public? Experiment
message-in-the-middle is to Express.js what your message queue processing is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
AWS integration for message-in-the-middle - middleware for AWS messaging services (SQS, SNS, DynamoDB, EventBridge).
Features
- SQS Consumer & Publisher - Receive and send messages to Amazon SQS
- SNS Publisher - Publish messages to Amazon SNS topics
- EventBridge Publisher - Publish events to Amazon EventBridge
- DynamoDB Store - Deduplication store using DynamoDB
Installation
npm install @message-in-the-middle/aws @message-in-the-middle/core
# Install AWS SDK v3 clients (peer dependencies)
npm install @aws-sdk/client-sqs @aws-sdk/client-sns @aws-sdk/client-eventbridge @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodbQuick Start
SQS Consumer
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({
client: sqsClient,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
parseJson: true,
autoDelete: true
});
// Add your business logic
manager.use(async (context, next) => {
console.log('Processing message:', context.message);
// Your processing logic here
await next();
});
// IMPORTANT: Clean up resources on shutdown
process.on('SIGTERM', async () => {
await manager.destroy();
process.exit(0);
});
// Process SQS messages
const sqsMessage = {
Body: JSON.stringify({ orderId: '123' }),
ReceiptHandle: 'xxx',
MessageId: 'yyy'
};
await manager.processInbound(sqsMessage.Body, sqsMessage);SQS Publisher
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSPublisherPipeline } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSPublisherPipeline({
client: sqsClient,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
});
// Send a message
await manager.processOutbound({ orderId: '123', status: 'pending' });SNS Publisher
import { SNSClient } from '@aws-sdk/client-sns';
import { createSNSPublisherPipeline } from '@message-in-the-middle/aws';
const snsClient = new SNSClient({ region: 'us-east-1' });
const manager = createSNSPublisherPipeline({
client: snsClient,
topicArn: 'arn:aws:sns:us-east-1:123456789:my-topic',
subject: 'Order Notification'
});
// Publish a message
await manager.processOutbound({ orderId: '123', event: 'created' });EventBridge Publisher
import { EventBridgeClient } from '@aws-sdk/client-eventbridge';
import { createEventBridgePublisherPipeline } from '@message-in-the-middle/aws';
const ebClient = new EventBridgeClient({ region: 'us-east-1' });
const manager = createEventBridgePublisherPipeline({
client: ebClient,
source: 'myapp.orders',
detailType: 'Order Event'
});
// Publish an event
await manager.processOutbound({
eventType: 'created',
orderId: '123'
});DynamoDB Deduplication
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({
client: sqsClient,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
deduplication: {
dynamoClient: dynamoClient,
tableName: 'message-deduplication',
ttlSeconds: 86400 // 24 hours
}
});SQS Poller (Production-Ready Polling)
The SQSPoller provides production-ready SQS message polling with concurrency control, graceful shutdown, and event-driven lifecycle hooks.
Basic Usage
import { SQSClient } from '@aws-sdk/client-sqs';
import { SQSPoller } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const poller = new SQSPoller(sqsClient, { logger: console });
// Start polling - returns QueueController
const ordersQueue = poller.start({
queueUrl: process.env.ORDERS_QUEUE_URL,
manager: ordersManager,
name: 'orders',
concurrency: 10,
waitTimeSeconds: 20 // Long polling
});
// Per-queue event handlers (eliminates if-else chains!)
ordersQueue.on('message:processed', (message, duration) => {
console.log(`Order processed in ${duration}ms`, { messageId: message.MessageId });
});
ordersQueue.on('message:failed', (message, error) => {
console.error('Order processing failed', { messageId: message.MessageId, error });
});
// Graceful shutdown
process.on('SIGTERM', async () => {
await ordersQueue.stop();
});Two Event Systems: Semantic Separation
The poller provides two complementary event systems:
- Global Events (on
SQSPoller) - For cross-queue system concerns - Per-Queue Events (on
QueueController) - For queue-specific business logic
const poller = new SQSPoller(sqsClient, { logger });
// Global events: System-wide metrics across ALL queues
poller.on('message:processed', (queueName, message, duration) => {
metrics.timing('sqs.duration', duration, { queue: queueName });
apm.track({ event: 'sqs.processed', queue: queueName, duration });
});
// Per-queue events: Queue-specific business logic (no if-else chains!)
const ordersQueue = poller.start({ name: 'orders', ... });
ordersQueue.on('message:processed', async (message, duration) => {
const order = JSON.parse(message.Body);
await db.orders.update(order.orderId, { status: 'processed', duration });
await sendOrderConfirmation(order.customerId);
});
const notificationsQueue = poller.start({ name: 'notifications', ... });
notificationsQueue.on('message:processed', (message, duration) => {
logger.info('Notification sent', { messageId: message.MessageId, duration });
});Benefits:
- ✅ No more if-else chains for queue-specific handling
- ✅ Type-safe per-queue events with full IntelliSense
- ✅ Clean separation of concerns (system vs business logic)
- ✅ Easy to add new queues (no scattered updates)
For comprehensive documentation, see SQS Poller Guide.
⚠️ Best Practices
CRITICAL: Reuse AWS SDK Client Instances
Always reuse AWS SDK client instances across your application. Creating multiple client instances causes connection pool duplication, leading to memory leaks and resource exhaustion.
✅ CORRECT - Reuse clients:
// Create clients ONCE at application startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const snsClient = new SNSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });
// Reuse the same client instances for all pipelines
const consumerManager = createSQSConsumerPipeline({
client: sqsClient, // ✅ Reusing client
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/input-queue'
});
const publisherManager = createSQSPublisherPipeline({
client: sqsClient, // ✅ Same client instance
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
});
const snsManager = createSNSPublisherPipeline({
client: snsClient, // ✅ Reusing SNS client
topicArn: 'arn:aws:sns:us-east-1:123:my-topic'
});❌ WRONG - Creating new clients causes memory leaks:
// ❌ DON'T DO THIS - Creates new connection pool each time!
for (let i = 0; i < 1000; i++) {
const manager = createSQSConsumerPipeline({
client: new SQSClient({ region: 'us-east-1' }), // ❌ NEW CLIENT = MEMORY LEAK!
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
});
}
// Result: 1000 connection pools = file descriptor exhaustion = crash❌ WRONG - Creating clients in handlers:
// ❌ DON'T DO THIS
app.post('/process', async (req, res) => {
// ❌ Creating new client on every request = memory leak
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
// ...
});
// ✅ DO THIS INSTEAD - Create client once at startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
app.post('/process', async (req, res) => {
// ✅ Reuse existing manager/client
await manager.processInbound(...);
});Why This Matters
Each AWS SDK client instance:
- Opens its own HTTP connection pool (default: 50 connections)
- Allocates memory for request/response buffers
- Maintains separate credentials and configuration
Creating 100 clients = 5,000 open connections + significant memory overhead.
Recommended Pattern
// config/aws.ts - Create clients once at startup
export const awsClients = {
sqs: new SQSClient({ region: process.env.AWS_REGION }),
sns: new SNSClient({ region: process.env.AWS_REGION }),
dynamodb: new DynamoDBClient({ region: process.env.AWS_REGION }),
eventbridge: new EventBridgeClient({ region: process.env.AWS_REGION }),
};
// services/queue-processor.ts - Reuse clients
import { awsClients } from '../config/aws';
export const queueManager = createSQSConsumerPipeline({
client: awsClients.sqs, // ✅ Reuse shared client
queueUrl: process.env.QUEUE_URL,
});Manual Configuration
For more control, you can configure middlewares manually:
import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { SQSClient } from '@aws-sdk/client-sqs';
import {
SQSConsumerMiddleware,
SQSPublisherMiddleware
} from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = new MessageMiddlewareManager();
// Add middlewares
manager.useInbound(new SQSConsumerMiddleware({
client: sqsClient,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
autoDelete: true
}));
manager.useOutbound(new SQSPublisherMiddleware({
client: sqsClient,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
}));DynamoDB Table Setup
For deduplication, create a DynamoDB table with:
- Partition Key:
id(String) - TTL Attribute:
ttl(Number) - Enable TTL in table settings
Example CloudFormation:
DeduplicationTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: message-deduplication
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
BillingMode: PAY_PER_REQUEST
TimeToLiveSpecification:
Enabled: true
AttributeName: ttlTypeScript Support
Full TypeScript support with generics:
interface OrderMessage {
orderId: string;
status: 'pending' | 'completed';
}
const manager = createSQSConsumerPipeline<OrderMessage>({
client: sqsClient,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/orders'
});
// Type-safe message processing
manager.use(async (context, next) => {
const order = context.message; // Type: OrderMessage
console.log(order.orderId);
await next();
});License
MIT