@message-in-the-middle/aws

⚠️ Work in Progress. Is this library production-ready? No. Is this library safe? No. When will it be ready? Soon™ (maybe tomorrow, maybe never). Why is it public? It's an experiment.

message-in-the-middle is to queue message processing what Express.js is to HTTP request processing: just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.

Why This Exists

Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
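For instance, each cross-cutting concern can live in its own middleware and run in registration order. A minimal sketch using the manager.use pattern from the Quick Start below (the validation rule and the handleOrder handler are hypothetical):

// Sketch only: each concern is its own middleware, executed in registration order
manager.use(async (context, next) => {
  console.log('Received message', context.message); // logging
  await next();
});

manager.use(async (context, next) => {
  if (!context.message?.orderId) {
    throw new Error('missing orderId'); // validation (hypothetical rule)
  }
  await next();
});

manager.use(async (context, next) => {
  await handleOrder(context.message); // business logic (hypothetical handler)
  await next();
});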


AWS integration for message-in-the-middle - middleware for AWS messaging services (SQS, SNS, DynamoDB, EventBridge).

Features

  • SQS Consumer & Publisher - Receive and send messages to Amazon SQS
  • SNS Publisher - Publish messages to Amazon SNS topics
  • EventBridge Publisher - Publish events to Amazon EventBridge
  • DynamoDB Store - Deduplication store using DynamoDB

Installation

npm install @message-in-the-middle/aws @message-in-the-middle/core
# Install AWS SDK v3 clients (peer dependencies)
npm install @aws-sdk/client-sqs @aws-sdk/client-sns @aws-sdk/client-eventbridge @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb

Quick Start

SQS Consumer

import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  parseJson: true,
  autoDelete: true
});

// Add your business logic
manager.use(async (context, next) => {
  console.log('Processing message:', context.message);
  // Your processing logic here
  await next();
});

// IMPORTANT: Clean up resources on shutdown
process.on('SIGTERM', async () => {
  await manager.destroy();
  process.exit(0);
});

// Process an incoming SQS message (a hand-built example record here)
const sqsMessage = {
  Body: JSON.stringify({ orderId: '123' }),
  ReceiptHandle: 'xxx',
  MessageId: 'yyy'
};

await manager.processInbound(sqsMessage.Body, sqsMessage);

SQS Publisher

import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSPublisherPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

const manager = createSQSPublisherPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
});

// Send a message
await manager.processOutbound({ orderId: '123', status: 'pending' });

SNS Publisher

import { SNSClient } from '@aws-sdk/client-sns';
import { createSNSPublisherPipeline } from '@message-in-the-middle/aws';

const snsClient = new SNSClient({ region: 'us-east-1' });

const manager = createSNSPublisherPipeline({
  client: snsClient,
  topicArn: 'arn:aws:sns:us-east-1:123456789:my-topic',
  subject: 'Order Notification'
});

// Publish a message
await manager.processOutbound({ orderId: '123', event: 'created' });

EventBridge Publisher

import { EventBridgeClient } from '@aws-sdk/client-eventbridge';
import { createEventBridgePublisherPipeline } from '@message-in-the-middle/aws';

const ebClient = new EventBridgeClient({ region: 'us-east-1' });

const manager = createEventBridgePublisherPipeline({
  client: ebClient,
  source: 'myapp.orders',
  detailType: 'Order Event'
});

// Publish an event
await manager.processOutbound({
  eventType: 'created',
  orderId: '123'
});

DynamoDB Deduplication

import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });

const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  deduplication: {
    dynamoClient: dynamoClient,
    tableName: 'message-deduplication',
    ttlSeconds: 86400 // 24 hours
  }
});

SQS Poller (Production-Ready Polling)

The SQSPoller provides production-ready SQS message polling with concurrency control, graceful shutdown, and event-driven lifecycle hooks.

Basic Usage

import { SQSClient } from '@aws-sdk/client-sqs';
import { SQSPoller } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const poller = new SQSPoller(sqsClient, { logger: console });

// Start polling - returns a QueueController
// (ordersManager is a pipeline manager, e.g. created with createSQSConsumerPipeline as in the Quick Start)
const ordersQueue = poller.start({
  queueUrl: process.env.ORDERS_QUEUE_URL,
  manager: ordersManager,
  name: 'orders',
  concurrency: 10,
  waitTimeSeconds: 20 // Long polling
});

// Per-queue event handlers (eliminates if-else chains!)
ordersQueue.on('message:processed', (message, duration) => {
  console.log(`Order processed in ${duration}ms`, { messageId: message.MessageId });
});

ordersQueue.on('message:failed', (message, error) => {
  console.error('Order processing failed', { messageId: message.MessageId, error });
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await ordersQueue.stop();
});

Two Event Systems: Semantic Separation

The poller provides two complementary event systems:

  1. Global Events (on SQSPoller) - For cross-queue system concerns
  2. Per-Queue Events (on QueueController) - For queue-specific business logic

const poller = new SQSPoller(sqsClient, { logger });

// Global events: System-wide metrics across ALL queues
poller.on('message:processed', (queueName, message, duration) => {
  metrics.timing('sqs.duration', duration, { queue: queueName });
  apm.track({ event: 'sqs.processed', queue: queueName, duration });
});

// Per-queue events: Queue-specific business logic (no if-else chains!)
const ordersQueue = poller.start({ name: 'orders', ... });
ordersQueue.on('message:processed', async (message, duration) => {
  const order = JSON.parse(message.Body);
  await db.orders.update(order.orderId, { status: 'processed', duration });
  await sendOrderConfirmation(order.customerId);
});

const notificationsQueue = poller.start({ name: 'notifications', ... });
notificationsQueue.on('message:processed', (message, duration) => {
  logger.info('Notification sent', { messageId: message.MessageId, duration });
});

Benefits:

  • ✅ No more if-else chains for queue-specific handling
  • ✅ Type-safe per-queue events with full IntelliSense
  • ✅ Clean separation of concerns (system vs business logic)
  • ✅ Easy to add new queues (no scattered updates)

For comprehensive documentation, see SQS Poller Guide.

⚠️ Best Practices

CRITICAL: Reuse AWS SDK Client Instances

Always reuse AWS SDK client instances across your application. Creating multiple client instances causes connection pool duplication, leading to memory leaks and resource exhaustion.

✅ CORRECT - Reuse clients:

// Create clients ONCE at application startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const snsClient = new SNSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });

// Reuse the same client instances for all pipelines
const consumerManager = createSQSConsumerPipeline({
  client: sqsClient, // ✅ Reusing client
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/input-queue'
});

const publisherManager = createSQSPublisherPipeline({
  client: sqsClient, // ✅ Same client instance
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
});

const snsManager = createSNSPublisherPipeline({
  client: snsClient, // ✅ Reusing SNS client
  topicArn: 'arn:aws:sns:us-east-1:123:my-topic'
});

❌ WRONG - Creating new clients causes memory leaks:

// ❌ DON'T DO THIS - Creates new connection pool each time!
for (let i = 0; i < 1000; i++) {
  const manager = createSQSConsumerPipeline({
    client: new SQSClient({ region: 'us-east-1' }), // ❌ NEW CLIENT = MEMORY LEAK!
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
  });
}
// Result: 1000 connection pools = file descriptor exhaustion = crash

❌ WRONG - Creating clients in handlers:

// ❌ DON'T DO THIS
app.post('/process', async (req, res) => {
  // ❌ Creating new client on every request = memory leak
  const sqsClient = new SQSClient({ region: 'us-east-1' });
  const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
  // ...
});

// ✅ DO THIS INSTEAD - Create client once at startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });

app.post('/process', async (req, res) => {
  // ✅ Reuse existing manager/client
  await manager.processInbound(...);
});

Why This Matters

Each AWS SDK client instance:

  • Opens its own HTTP connection pool (default: 50 connections)
  • Allocates memory for request/response buffers
  • Maintains separate credentials and configuration

Creating 100 clients means up to 5,000 open connections plus significant memory overhead.
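
A common pattern is to create each client once in a shared module and import it wherever a pipeline is built: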

// config/aws.ts - Create clients once at startup
import { SQSClient } from '@aws-sdk/client-sqs';
import { SNSClient } from '@aws-sdk/client-sns';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { EventBridgeClient } from '@aws-sdk/client-eventbridge';

export const awsClients = {
  sqs: new SQSClient({ region: process.env.AWS_REGION }),
  sns: new SNSClient({ region: process.env.AWS_REGION }),
  dynamodb: new DynamoDBClient({ region: process.env.AWS_REGION }),
  eventbridge: new EventBridgeClient({ region: process.env.AWS_REGION }),
};

// services/queue-processor.ts - Reuse clients
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
import { awsClients } from '../config/aws';

export const queueManager = createSQSConsumerPipeline({
  client: awsClients.sqs, // ✅ Reuse shared client
  queueUrl: process.env.QUEUE_URL,
});

Manual Configuration

For more control, you can configure middlewares manually:

import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { SQSClient } from '@aws-sdk/client-sqs';
import {
  SQSConsumerMiddleware,
  SQSPublisherMiddleware
} from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = new MessageMiddlewareManager();

// Add middlewares
manager.useInbound(new SQSConsumerMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  autoDelete: true
}));

manager.useOutbound(new SQSPublisherMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
}));

DynamoDB Table Setup

For deduplication, create a DynamoDB table with:

  • Partition Key: id (String)
  • TTL Attribute: ttl (Number) - Enable TTL in table settings

Example CloudFormation:

DeduplicationTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: message-deduplication
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    BillingMode: PAY_PER_REQUEST
    TimeToLiveSpecification:
      Enabled: true
      AttributeName: ttl
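
The same table can also be provisioned from code with the DynamoDB SDK. A sketch (the table and attribute names must match your deduplication configuration; adjust region and billing mode to your environment):

import {
  DynamoDBClient,
  CreateTableCommand,
  UpdateTimeToLiveCommand,
  waitUntilTableExists
} from '@aws-sdk/client-dynamodb';

const client = new DynamoDBClient({ region: 'us-east-1' });

// Create the table with a single string partition key named "id"
await client.send(new CreateTableCommand({
  TableName: 'message-deduplication',
  AttributeDefinitions: [{ AttributeName: 'id', AttributeType: 'S' }],
  KeySchema: [{ AttributeName: 'id', KeyType: 'HASH' }],
  BillingMode: 'PAY_PER_REQUEST'
}));

// TTL can only be enabled once the table is active
await waitUntilTableExists({ client, maxWaitTime: 60 }, { TableName: 'message-deduplication' });
await client.send(new UpdateTimeToLiveCommand({
  TableName: 'message-deduplication',
  TimeToLiveSpecification: { Enabled: true, AttributeName: 'ttl' }
}));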

TypeScript Support

Full TypeScript support with generics:

interface OrderMessage {
  orderId: string;
  status: 'pending' | 'completed';
}

const manager = createSQSConsumerPipeline<OrderMessage>({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/orders'
});

// Type-safe message processing
manager.use(async (context, next) => {
  const order = context.message; // Type: OrderMessage
  console.log(order.orderId);
  await next();
});

License

MIT