@message-in-the-middle/aws
AWS integration for message-in-the-middle - middleware for Amazon SQS, SNS, and EventBridge, plus a DynamoDB-backed deduplication store.
Features
- SQS Consumer & Publisher - Receive messages from and send messages to Amazon SQS
- SNS Publisher - Publish messages to Amazon SNS topics
- EventBridge Publisher - Publish events to Amazon EventBridge
- DynamoDB Store - Deduplication store using DynamoDB
Installation
npm install @message-in-the-middle/aws @message-in-the-middle/core
# Install AWS SDK v3 clients (peer dependencies)
npm install @aws-sdk/client-sqs @aws-sdk/client-sns @aws-sdk/client-eventbridge @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb
Quick Start
SQS Consumer
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  parseJson: true,
  autoDelete: true
});
// Add your business logic
manager.use(async (context, next) => {
  console.log('Processing message:', context.message);
  // Your processing logic here
  await next();
});
// IMPORTANT: Clean up resources on shutdown
process.on('SIGTERM', async () => {
  await manager.destroy();
  process.exit(0);
});
// Process SQS messages
const sqsMessage = {
  Body: JSON.stringify({ orderId: '123' }),
  ReceiptHandle: 'xxx',
  MessageId: 'yyy'
};
await manager.processInbound(sqsMessage.Body, sqsMessage);
SQS Publisher
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSPublisherPipeline } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSPublisherPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
});
// Send a message
await manager.processOutbound({ orderId: '123', status: 'pending' });
SNS Publisher
import { SNSClient } from '@aws-sdk/client-sns';
import { createSNSPublisherPipeline } from '@message-in-the-middle/aws';
const snsClient = new SNSClient({ region: 'us-east-1' });
const manager = createSNSPublisherPipeline({
  client: snsClient,
  topicArn: 'arn:aws:sns:us-east-1:123456789:my-topic',
  subject: 'Order Notification'
});
// Publish a message
await manager.processOutbound({ orderId: '123', event: 'created' });
EventBridge Publisher
import { EventBridgeClient } from '@aws-sdk/client-eventbridge';
import { createEventBridgePublisherPipeline } from '@message-in-the-middle/aws';
const ebClient = new EventBridgeClient({ region: 'us-east-1' });
const manager = createEventBridgePublisherPipeline({
  client: ebClient,
  source: 'myapp.orders',
  detailType: 'Order Event'
});
// Publish an event
await manager.processOutbound({
  eventType: 'created',
  orderId: '123'
});
DynamoDB Deduplication
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  deduplication: {
    dynamoClient: dynamoClient,
    tableName: 'message-deduplication',
    ttlSeconds: 86400 // 24 hours
  }
});
⚠️ Best Practices
CRITICAL: Reuse AWS SDK Client Instances
Always reuse AWS SDK client instances across your application. Each client instance brings its own connection pool, so creating clients repeatedly (per request, per message, or in a loop) multiplies open sockets and memory use and can lead to memory leaks and resource exhaustion.
✅ CORRECT - Reuse clients:
// Create clients ONCE at application startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const snsClient = new SNSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });
// Reuse the same client instances for all pipelines
const consumerManager = createSQSConsumerPipeline({
  client: sqsClient, // ✅ Reusing client
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/input-queue'
});
const publisherManager = createSQSPublisherPipeline({
  client: sqsClient, // ✅ Same client instance
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
});
const snsManager = createSNSPublisherPipeline({
  client: snsClient, // ✅ Reusing SNS client
  topicArn: 'arn:aws:sns:us-east-1:123:my-topic'
});
❌ WRONG - Creating new clients causes memory leaks:
// ❌ DON'T DO THIS - Creates new connection pool each time!
for (let i = 0; i < 1000; i++) {
  const manager = createSQSConsumerPipeline({
    client: new SQSClient({ region: 'us-east-1' }), // ❌ NEW CLIENT = MEMORY LEAK!
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
  });
}
// Result: 1000 connection pools = file descriptor exhaustion = crash
❌ WRONG - Creating clients in handlers:
// ❌ DON'T DO THIS
app.post('/process', async (req, res) => {
  // ❌ Creating new client on every request = memory leak
  const sqsClient = new SQSClient({ region: 'us-east-1' });
  const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
  // ...
});
// ✅ DO THIS INSTEAD - Create client once at startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
app.post('/process', async (req, res) => {
  // ✅ Reuse existing manager/client
  await manager.processInbound(...);
});
Why This Matters
Each AWS SDK client instance:
- Maintains its own HTTP connection pool (default: up to 50 connections)
- Allocates memory for request/response buffers
- Maintains separate credentials and configuration
Creating 100 clients can therefore mean up to 5,000 open connections (100 × 50) plus significant memory overhead.
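One way to make accidental re-creation harder is to hand out clients through a memoizing getter, so that code running inside handlers can ask for a client freely without ever building a second one. The sketch below is illustrative only: `FakeClient` and `memoizeClient` are hypothetical names that are not part of this package or the AWS SDK, and `FakeClient` stands in for a real SDK client so the example runs without AWS dependencies. The final estimate reuses the 50-connection default quoted above.

```typescript
// Hypothetical stand-in for an AWS SDK client. In a real application the
// factory passed to memoizeClient would build e.g. `new SQSClient({ region })`.
class FakeClient {
  static created = 0;
  readonly region: string;
  constructor(region: string) {
    this.region = region;
    FakeClient.created += 1;
  }
}

// Hypothetical helper: returns a getter that builds at most one client per
// region and hands back the cached instance on every later call.
function memoizeClient<T>(factory: (region: string) => T): (region: string) => T {
  const cache = new Map<string, T>();
  return (region: string): T => {
    if (!cache.has(region)) {
      cache.set(region, factory(region));
    }
    return cache.get(region) as T;
  };
}

const getClient = memoizeClient((region) => new FakeClient(region));

// Simulate 1000 handler invocations that each ask for a client.
for (let i = 0; i < 1000; i++) {
  getClient('us-east-1');
}

// Worst-case socket estimate: clients created × per-client pool size.
const DEFAULT_POOL_SIZE = 50;
const worstCaseSockets = FakeClient.created * DEFAULT_POOL_SIZE;
console.log(FakeClient.created, worstCaseSockets); // prints: 1 50
```

Without the cache, the same loop would construct 1000 clients, for a worst case of 50,000 sockets.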
Recommended Pattern
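// config/aws.ts - Create clients once at startup
import { SQSClient } from '@aws-sdk/client-sqs';
import { SNSClient } from '@aws-sdk/client-sns';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { EventBridgeClient } from '@aws-sdk/client-eventbridge';
export const awsClients = {
  sqs: new SQSClient({ region: process.env.AWS_REGION }),
  sns: new SNSClient({ region: process.env.AWS_REGION }),
  dynamodb: new DynamoDBClient({ region: process.env.AWS_REGION }),
  eventbridge: new EventBridgeClient({ region: process.env.AWS_REGION }),
};
// services/queue-processor.ts - Reuse clients
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
import { awsClients } from '../config/aws';
export const queueManager = createSQSConsumerPipeline({
  client: awsClients.sqs, // ✅ Reuse shared client
  queueUrl: process.env.QUEUE_URL,
});
Manual Configuration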
For more control, you can configure middlewares manually:
import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { SQSClient } from '@aws-sdk/client-sqs';
import {
  SQSConsumerMiddleware,
  SQSPublisherMiddleware
} from '@message-in-the-middle/aws';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = new MessageMiddlewareManager();
// Add middlewares
manager.useInbound(new SQSConsumerMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  autoDelete: true
}));
manager.useOutbound(new SQSPublisherMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
}));
DynamoDB Table Setup
For deduplication, create a DynamoDB table with:
- Partition Key: id (String)
- TTL Attribute: ttl (Number)
- Enable TTL in table settings
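DynamoDB's TTL feature reads the TTL attribute as a Unix epoch timestamp in whole seconds, which is easy to get wrong from JavaScript because `Date.now()` returns milliseconds. The sketch below shows how an item for the table layout above could be derived from a message ID and a `ttlSeconds` value. `DedupItem` and `buildDedupItem` are hypothetical names used for illustration; how the package's own store actually writes items is not documented here.

```typescript
// Hypothetical item shape matching the table layout described above:
// `id` is the partition key, `ttl` is the expiry time in Unix epoch seconds.
interface DedupItem {
  id: string;
  ttl: number;
}

// Hypothetical helper: build the item for a message first seen at `nowMs`
// (milliseconds since the epoch, as returned by Date.now()).
function buildDedupItem(
  messageId: string,
  ttlSeconds: number,
  nowMs: number = Date.now()
): DedupItem {
  if (!Number.isInteger(ttlSeconds) || ttlSeconds <= 0) {
    throw new RangeError('ttlSeconds must be a positive integer');
  }
  // Convert milliseconds to whole seconds before adding the retention window.
  const nowSeconds = Math.floor(nowMs / 1000);
  return { id: messageId, ttl: nowSeconds + ttlSeconds };
}

// A message seen at 1,700,000,000 s with a 24-hour window expires 86,400 s later.
const item = buildDedupItem('yyy', 86400, 1_700_000_000_000);
console.log(item.ttl); // prints: 1700086400
```

DynamoDB removes expired items in the background rather than at the exact expiry second, so a record may remain readable for some time after its `ttl` has passed.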
Example CloudFormation:
DeduplicationTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: message-deduplication
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    BillingMode: PAY_PER_REQUEST
    TimeToLiveSpecification:
      Enabled: true
      AttributeName: ttl
TypeScript Support
Full TypeScript support with generics:
interface OrderMessage {
  orderId: string;
  status: 'pending' | 'completed';
}
const manager = createSQSConsumerPipeline<OrderMessage>({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/orders'
});
// Type-safe message processing
manager.use(async (context, next) => {
  const order = context.message; // Type: OrderMessage
  console.log(order.orderId);
  await next();
});
License
MIT