JSPM

  • Created
  • Published
  • Downloads 172
  • Score
    100M100P100Q116513F
  • License MIT

A promise-based, broker-agnostic client for sending and receiving messages via RabbitMQ

Package Exports

  • @onlineapps/conn-infra-mq
  • @onlineapps/conn-infra-mq/src/config/queueConfig
  • @onlineapps/conn-infra-mq/src/config/queueConfig.js
  • @onlineapps/conn-infra-mq/src/index.js

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@onlineapps/conn-infra-mq) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

@onlineapps/conn-infra-mq

Build Status Coverage Status npm version

Message queue connector with layered architecture for workflow orchestration, RPC, fork-join, and retry patterns. Built on top of RabbitMQ with clean separation of concerns.


🚀 Features

  • Layered Architecture: Clean separation into specialized layers (WorkflowRouter, QueueManager, ForkJoinHandler, RPCHandler, RetryHandler)
  • Workflow Orchestration: Decentralized workflow routing without central orchestrator
  • Fork-Join Pattern: Parallel processing with result aggregation and built-in join strategies
  • RPC Support: Request-response communication with correlation IDs and timeouts
  • Automatic Retry: Exponential backoff, dead letter queue management, configurable retry policies
  • Queue Management: TTL, DLQ, auto-delete, temporary queues, exchange bindings
  • Promise-based API: All operations return promises for clean async/await usage
  • Built-in Serialization: JSON with custom error handling
  • Config Validation: Strict schema validation via Ajv
  • Extensible Transport: Clear separation between core logic and transport layer

📦 Installation

npm install @onlineapps/conn-infra-mq
# or
yarn add @onlineapps/conn-infra-mq

Requires Node.js ≥12. For RabbitMQ usage, ensure an accessible AMQP server.


🏗️ Architecture

ConnectorMQClient (main orchestrator)
    ├── BaseClient (core AMQP operations)
    ├── WorkflowRouter (workflow orchestration)
    ├── QueueManager (queue lifecycle management)
    ├── ForkJoinHandler (parallel processing)
    ├── RPCHandler (request-response patterns)
    └── RetryHandler (error recovery & DLQ)

🔧 Quick Start

'use strict';

const ConnectorMQClient = require('@onlineapps/conn-infra-mq');

(async () => {
  // 1. Create client with configuration
  const client = new ConnectorMQClient({
    host: 'amqp://localhost:5672',
    serviceName: 'my-service',
    queue: 'default-queue',
    durable: true,
    prefetch: 5,                        // Default prefetch count for consumers
    noAck: false,                       // Default auto-acknowledge = false
    retryPolicy: {                      // Optional reconnection policy (not enforced in v1.0.0)
      retries: 5,
      initialDelayMs: 1000,
      maxDelayMs: 30000,
      factor: 2
    }
  });

  // 2. Register a global error handler
  client.onError(err => {
    console.error('[AgentMQClient] Error:', err);
  });

  // 3. Connect to RabbitMQ
  try {
    await client.connect();
    console.log('Connected to broker');
  } catch (err) {
    console.error('Connection failed:', err);
    process.exit(1);
  }

  // 4. Publish a sample message
  const samplePayload = { taskId: 'abc123', action: 'processData', timestamp: Date.now() };
  try {
    await client.publish('job_queue', samplePayload, {
      persistent: true,
      headers: { origin: 'quickStart' }
    });
    console.log('Message published:', samplePayload);
  } catch (err) {
    console.error('Publish error:', err);
  }

  // 5. Consume messages
  try {
    await client.consume(
      'job_queue',
      async (msg) => {
        const data = JSON.parse(msg.content.toString('utf8'));
        console.log('Received:', data);
        // Process message...
        await client.ack(msg);
      },
      { prefetch: 5, noAck: false }
    );
    console.log('Consuming from "job_queue"...');
  } catch (err) {
    console.error('Consume error:', err);
  }

  // 6. Graceful shutdown on SIGINT
  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    try {
      await client.disconnect();
      console.log('Disconnected, exiting.');
      process.exit(0);
    } catch (discErr) {
      console.error('Error during disconnect:', discErr);
      process.exit(1);
    }
  });
})();

📄 Configuration

Configuration can be provided to the AgentMQClient constructor or as overrides to connect(). Below is a summary of supported fields (see docs/api.md for full details):

Field Type Description Default
type string Transport type: 'rabbitmq' 'rabbitmq'
host string Connection URI or hostname. For RabbitMQ: e.g. 'amqp://user:pass@localhost:5672'. Required
queue string Default queue name for publish/consume if not overridden per call. ''
exchange string Default exchange name. Empty string uses the default direct exchange. ''
durable boolean Declare queues/exchanges as durable. true
prefetch integer Default prefetch count for consumers. 1
noAck boolean Default auto-acknowledge setting for consumers. If true, messages will be auto-acked. false
logger object Custom logger with methods: info(), warn(), error(), debug(). If omitted, console is used. null
retryPolicy object Reconnection policy with properties:
retries (number)
initialDelayMs (ms)
maxDelayMs (ms)
factor (multiplier). Not enforced in v1.0.0.
{ retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 }

🛠️ API Reference

For full class and method documentation, including parameter descriptions, return values, and error details, see docs/api.md.


✅ Testing

npm test                  # All tests
npm run test:unit         # Unit tests only
npm run test:component    # Component tests
npm run test:integration  # Integration tests

Test Coverage Status

  • Overall Coverage: 24.52% (improving after refactoring)
  • Passing Tests: 75/104 (72%)
  • Test Suites: 10/14 passing
  • Well Tested: Config, Transports, Error handling (100%)
  • Needs Testing: New layers (1-5%)

See Test Report for detailed coverage analysis.


🎨 Coding Standards

  • Linting: ESLint (eslint:recommended + Prettier).
  • Formatting: Prettier — check with npm run prettier:check, fix with npm run prettier:fix.
  • Testing: Jest, aiming for ≥90% coverage.

🎯 Refactoring Benefits

✅ What Was Achieved

  1. Clean Layered Architecture - Separated into specialized layers
  2. Removed Technical Debt - Replaced MQWrapper with cleaner design
  3. Improved Extensibility - Easy to add new patterns
  4. Better Developer Experience - Cleaner API and documentation

📊 Quality Improvements

  • Separation of Concerns - Each layer has single responsibility
  • Modular Design - Use individual layers independently
  • Testability - Each layer can be tested in isolation
  • Maintainability - Easier to understand and modify
  • Backwards Compatibility - MQWrapper alias maintained

🤝 Contributing

Contributions welcome! Please see CONTRIBUTING.md for guidelines:

  1. Fork the repo.
  2. Create a feature branch: git checkout -b feature/your-feature.
  3. Run tests locally and ensure linting passes.
  4. Commit your changes and push to your branch.
  5. Open a Pull Request against main.

📜 License

This project is licensed under the MIT License. See LICENSE for details.

📚 Documentation