@message-in-the-middle/aws

0.0.1

AWS integration for message-in-the-middle - middleware for AWS messaging services (SQS, SNS, DynamoDB, EventBridge).

Features

  • SQS Consumer & Publisher - Receive messages from and send messages to Amazon SQS
  • SNS Publisher - Publish messages to Amazon SNS topics
  • EventBridge Publisher - Publish events to Amazon EventBridge
  • DynamoDB Store - Deduplication store using DynamoDB

Installation

npm install @message-in-the-middle/aws @message-in-the-middle/core
# Install AWS SDK v3 clients (peer dependencies)
npm install @aws-sdk/client-sqs @aws-sdk/client-sns @aws-sdk/client-eventbridge @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb

Quick Start

SQS Consumer

import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  parseJson: true,
  autoDelete: true
});

// Add your business logic
manager.use(async (context, next) => {
  console.log('Processing message:', context.message);
  // Your processing logic here
  await next();
});

// IMPORTANT: Clean up resources on shutdown
process.on('SIGTERM', async () => {
  await manager.destroy();
  process.exit(0);
});

// Process SQS messages
const sqsMessage = {
  Body: JSON.stringify({ orderId: '123' }),
  ReceiptHandle: 'xxx',
  MessageId: 'yyy'
};

await manager.processInbound(sqsMessage.Body, sqsMessage);
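The example above feeds a hand-built message into the pipeline. In a running service the messages usually come from a polling loop. The following is a minimal sketch of such a loop, not part of this package: `pollQueue`, `ReceiveBatch`, and `ProcessInbound` are hypothetical names, and the receive and process steps are injected as plain functions so the loop itself stays independent of the AWS SDK.

```typescript
// Minimal shape of an SQS message, mirroring the fields used in the example above.
interface RawSqsMessage {
  Body?: string;
  ReceiptHandle?: string;
  MessageId?: string;
}

// Hypothetical helper types (assumptions, not exported by @message-in-the-middle/aws).
type ReceiveBatch = () => Promise<RawSqsMessage[]>;
type ProcessInbound = (body: string, raw: RawSqsMessage) => Promise<unknown>;

// Polls until shouldStop() returns true; resolves to the number of
// messages that were processed without throwing.
async function pollQueue(
  receive: ReceiveBatch,
  processInbound: ProcessInbound,
  shouldStop: () => boolean,
  onError: (error: unknown, raw: RawSqsMessage) => void = () => {}
): Promise<number> {
  let handled = 0;
  while (!shouldStop()) {
    const batch = await receive();
    for (const raw of batch) {
      if (raw.Body === undefined) continue; // nothing to process
      try {
        await processInbound(raw.Body, raw);
        handled++;
      } catch (error) {
        // Report the failure and keep going with the rest of the batch.
        onError(error, raw);
      }
    }
  }
  return handled;
}
```

In a real application, `receive` might wrap `sqsClient.send(new ReceiveMessageCommand({ QueueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 20 }))` and return `Messages ?? []`, `processInbound` would be `(body, raw) => manager.processInbound(body, raw)`, and `shouldStop` could read a flag set by the SIGTERM handler.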

SQS Publisher

import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSPublisherPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

const manager = createSQSPublisherPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
});

// Send a message
await manager.processOutbound({ orderId: '123', status: 'pending' });

SNS Publisher

import { SNSClient } from '@aws-sdk/client-sns';
import { createSNSPublisherPipeline } from '@message-in-the-middle/aws';

const snsClient = new SNSClient({ region: 'us-east-1' });

const manager = createSNSPublisherPipeline({
  client: snsClient,
  topicArn: 'arn:aws:sns:us-east-1:123456789:my-topic',
  subject: 'Order Notification'
});

// Publish a message
await manager.processOutbound({ orderId: '123', event: 'created' });

EventBridge Publisher

import { EventBridgeClient } from '@aws-sdk/client-eventbridge';
import { createEventBridgePublisherPipeline } from '@message-in-the-middle/aws';

const ebClient = new EventBridgeClient({ region: 'us-east-1' });

const manager = createEventBridgePublisherPipeline({
  client: ebClient,
  source: 'myapp.orders',
  detailType: 'Order Event'
});

// Publish an event
await manager.processOutbound({
  eventType: 'created',
  orderId: '123'
});
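For orientation, an EventBridge `PutEvents` entry carries a `Source`, a `DetailType`, and a `Detail` field that must be a JSON string. The README does not show how this package builds the entry, so the sketch below is an assumption about the general shape only; `toPutEventsEntry` and `PutEventsEntry` are hypothetical names defined locally.

```typescript
// Local, simplified model of one EventBridge PutEvents entry.
interface PutEventsEntry {
  Source: string;
  DetailType: string;
  Detail: string; // JSON-encoded payload
  EventBusName?: string; // omitted means the default event bus
}

// Hypothetical helper: maps a payload object to an entry.
function toPutEventsEntry(
  source: string,
  detailType: string,
  payload: Record<string, unknown>,
  eventBusName?: string
): PutEventsEntry {
  const entry: PutEventsEntry = {
    Source: source,
    DetailType: detailType,
    Detail: JSON.stringify(payload),
  };
  if (eventBusName !== undefined) {
    entry.EventBusName = eventBusName;
  }
  return entry;
}

const exampleEntry = toPutEventsEntry('myapp.orders', 'Order Event', {
  eventType: 'created',
  orderId: '123',
});
```

EventBridge rules can then match on `source` and `detail-type`, which is why those two values are worth choosing deliberately.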

DynamoDB Deduplication

import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });

const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  deduplication: {
    dynamoClient: dynamoClient,
    tableName: 'message-deduplication',
    ttlSeconds: 86400 // 24 hours
  }
});
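To make the deduplication semantics concrete, here is a minimal in-memory model of a "claim this message ID once per TTL window" store. It is a sketch under stated assumptions, not the package's implementation: `InMemoryDedupStore` and `claim` are hypothetical names. With DynamoDB, the same check-and-set is typically expressed as a single `PutItem` with the condition `attribute_not_exists(id)`.

```typescript
// Hypothetical in-memory stand-in for a DynamoDB-backed deduplication store.
class InMemoryDedupStore {
  // Maps message id -> expiry in epoch seconds (the role of the `ttl` attribute).
  private readonly expiries = new Map<string, number>();
  private readonly ttlSeconds: number;
  private readonly nowSeconds: () => number;

  constructor(
    ttlSeconds: number,
    nowSeconds: () => number = () => Math.floor(Date.now() / 1000)
  ) {
    this.ttlSeconds = ttlSeconds;
    this.nowSeconds = nowSeconds;
  }

  // Returns true if the id is new (the caller should process the message),
  // false if it was already claimed and has not expired yet.
  claim(id: string): boolean {
    const now = this.nowSeconds();
    const expiry = this.expiries.get(id);
    if (expiry !== undefined && expiry > now) {
      return false; // duplicate within the TTL window
    }
    this.expiries.set(id, now + this.ttlSeconds);
    return true;
  }
}
```

With `ttlSeconds: 86400`, a redelivered message with the same ID would be skipped for 24 hours and accepted again afterwards.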

⚠️ Best Practices

CRITICAL: Reuse AWS SDK Client Instances

Always reuse AWS SDK client instances across your application. Each client instance maintains its own connection pool, so creating clients repeatedly multiplies open sockets and memory use, which can lead to memory leaks and resource exhaustion.

✅ CORRECT - Reuse clients:

// Create clients ONCE at application startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const snsClient = new SNSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });

// Reuse the same client instances for all pipelines
const consumerManager = createSQSConsumerPipeline({
  client: sqsClient, // ✅ Reusing client
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/input-queue'
});

const publisherManager = createSQSPublisherPipeline({
  client: sqsClient, // ✅ Same client instance
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
});

const snsManager = createSNSPublisherPipeline({
  client: snsClient, // ✅ Reusing SNS client
  topicArn: 'arn:aws:sns:us-east-1:123:my-topic'
});

❌ WRONG - Creating new clients causes memory leaks:

// ❌ DON'T DO THIS - Creates new connection pool each time!
for (let i = 0; i < 1000; i++) {
  const manager = createSQSConsumerPipeline({
    client: new SQSClient({ region: 'us-east-1' }), // ❌ NEW CLIENT = MEMORY LEAK!
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
  });
}
// Result: 1000 connection pools = file descriptor exhaustion = crash

❌ WRONG - Creating clients in handlers:

// ❌ DON'T DO THIS
app.post('/process', async (req, res) => {
  // ❌ Creating new client on every request = memory leak
  const sqsClient = new SQSClient({ region: 'us-east-1' });
  const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
  // ...
});

// ✅ DO THIS INSTEAD - Create client once at startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });

app.post('/process', async (req, res) => {
  // ✅ Reuse existing manager/client
  await manager.processInbound(...);
});

Why This Matters

Each AWS SDK client instance:

  • Maintains its own HTTP connection pool (default: up to 50 sockets)
  • Allocates memory for request/response buffers
  • Maintains separate credentials and configuration

Creating 100 clients = up to 5,000 open connections + significant memory overhead.

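A common pattern is to create the clients once in a shared module and import them wherever a pipeline is built:

// config/aws.ts - Create clients once at startup
import { SQSClient } from '@aws-sdk/client-sqs';
import { SNSClient } from '@aws-sdk/client-sns';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { EventBridgeClient } from '@aws-sdk/client-eventbridge';

export const awsClients = {
  sqs: new SQSClient({ region: process.env.AWS_REGION }),
  sns: new SNSClient({ region: process.env.AWS_REGION }),
  dynamodb: new DynamoDBClient({ region: process.env.AWS_REGION }),
  eventbridge: new EventBridgeClient({ region: process.env.AWS_REGION }),
};

// services/queue-processor.ts - Reuse clients
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';
import { awsClients } from '../config/aws';

export const queueManager = createSQSConsumerPipeline({
  client: awsClients.sqs, // ✅ Reuse shared client
  queueUrl: process.env.QUEUE_URL,
});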
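When an application talks to more than one region, a fixed object of clients is not always enough. One option is a small cache that builds a client the first time a key is requested and returns the same instance afterwards. This is a generic sketch; `createCachedFactory` is a hypothetical helper, not something this package exports.

```typescript
// Generic helper: builds one instance per key and reuses it on later calls.
function createCachedFactory<T>(build: (key: string) => T): (key: string) => T {
  const cache = new Map<string, T>();
  return (key: string): T => {
    let instance = cache.get(key);
    if (instance === undefined) {
      instance = build(key);
      cache.set(key, instance);
    }
    return instance;
  };
}

// Demonstration with a stand-in object instead of a real SDK client.
let clientsBuilt = 0;
const getFakeClient = createCachedFactory((region) => {
  clientsBuilt++;
  return { region };
});

const first = getFakeClient('us-east-1');
const second = getFakeClient('us-east-1'); // same instance, no new build
const other = getFakeClient('eu-west-1'); // different key, one more build
```

With the real SDK this might look like `const getSqsClient = createCachedFactory((region) => new SQSClient({ region }));`, so that every pipeline for a given region shares one client and one connection pool.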

Manual Configuration

For more control, you can configure middlewares manually:

import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { SQSClient } from '@aws-sdk/client-sqs';
import {
  SQSConsumerMiddleware,
  SQSPublisherMiddleware
} from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = new MessageMiddlewareManager();

// Add middlewares
manager.useInbound(new SQSConsumerMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  autoDelete: true
}));

manager.useOutbound(new SQSPublisherMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
}));
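The `(context, next)` signature used throughout this README follows the familiar onion-style middleware pattern: each middleware runs code before `await next()`, hands control to the rest of the chain, and may run more code afterwards. The sketch below illustrates that pattern in general. It is not the implementation of `MessageMiddlewareManager`, and `compose`, `Next`, and `Middleware` are hypothetical names.

```typescript
type Next = () => Promise<void>;
type Middleware<C> = (context: C, next: Next) => Promise<void>;

// Chains middlewares so that each one decides when the rest of the chain runs.
function compose<C>(middlewares: Middleware<C>[]): (context: C) => Promise<void> {
  return async (context: C): Promise<void> => {
    const dispatch = async (index: number): Promise<void> => {
      const current = middlewares[index];
      if (current === undefined) return; // end of the chain
      await current(context, () => dispatch(index + 1));
    };
    await dispatch(0);
  };
}
```

Registration order matters in this model: a middleware added first wraps everything added after it, so a cross-cutting concern such as logging or deduplication would normally be registered before business logic.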

DynamoDB Table Setup

For deduplication, create a DynamoDB table with:

  • Partition Key: id (String)
  • TTL Attribute: ttl (Number) - Enable TTL in table settings

Example CloudFormation:

DeduplicationTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: message-deduplication
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    BillingMode: PAY_PER_REQUEST
    TimeToLiveSpecification:
      Enabled: true
      AttributeName: ttl
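DynamoDB TTL expects the attribute to hold a Number containing a Unix epoch timestamp in seconds, so a value in milliseconds would not expire when intended. The following sketch shows how an item matching the table above could be built; `buildDedupItem` and `DedupItem` are hypothetical names, and the exact item shape this package writes is an assumption based on the key and TTL attribute listed above.

```typescript
// Item shape matching the table definition: partition key `id`, TTL attribute `ttl`.
interface DedupItem {
  id: string;
  ttl: number; // expiry as Unix epoch time in seconds
}

// Builds an item that expires ttlSeconds after nowMs (defaults to the current time).
function buildDedupItem(
  id: string,
  ttlSeconds: number,
  nowMs: number = Date.now()
): DedupItem {
  return { id, ttl: Math.floor(nowMs / 1000) + ttlSeconds };
}
```

Note that DynamoDB removes expired items in the background rather than at the exact expiry time, so an expired item can remain readable for a while.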

TypeScript Support

Full TypeScript support with generics:

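import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

interface OrderMessage {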
  orderId: string;
  status: 'pending' | 'completed';
}

const manager = createSQSConsumerPipeline<OrderMessage>({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/orders'
});

// Type-safe message processing
manager.use(async (context, next) => {
  const order = context.message; // Type: OrderMessage
  console.log(order.orderId);
  await next();
});
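TypeScript generics are erased at compile time, so the type parameter describes the expected message shape but cannot check what actually arrives on the queue. Assuming the pipeline does not validate payloads itself (the README does not say either way), a runtime type guard is one way to close that gap. `isOrderMessage` is a hypothetical helper written for this example.

```typescript
interface OrderMessage {
  orderId: string;
  status: 'pending' | 'completed';
}

// Runtime check that narrows an unknown value to OrderMessage.
function isOrderMessage(value: unknown): value is OrderMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.orderId === 'string' &&
    (candidate.status === 'pending' || candidate.status === 'completed')
  );
}
```

Inside a middleware, a check such as `if (!isOrderMessage(context.message)) throw new Error('Invalid order message');` before `await next()` would reject malformed payloads early.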

License

MIT