@onlineapps/conn-infra-mq
Message queue connector with layered architecture for workflow orchestration, fork-join, and retry patterns. Built on top of RabbitMQ with clean separation of concerns. Only asynchronous workflow patterns are supported; synchronous RPC patterns are not, because they do not align with our architecture philosophy.
🚀 Features
- Layered Architecture: Clean separation into specialized layers (WorkflowRouter, QueueManager, ForkJoinHandler, RetryHandler)
- Workflow Orchestration: Decentralized workflow routing without central orchestrator
- Fork-Join Pattern: Parallel processing with result aggregation and built-in join strategies
- Asynchronous First: All communication is asynchronous (fire-and-forget), no synchronous blocking patterns
- Automatic Retry: Exponential backoff, dead letter queue management, configurable retry policies
- Queue Management: TTL, DLQ, auto-delete, temporary queues, exchange bindings
- Promise-based API: All operations return promises for clean async/await usage
- Built-in Serialization: JSON with custom error handling
- Config Validation: Strict schema validation via Ajv
- Extensible Transport: Clear separation between core logic and transport layer
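To make the fork-join feature concrete, here is a minimal sketch of the general pattern in plain JavaScript. It does not use ForkJoinHandler, whose API is documented in docs/api.md; the names forkJoin, joinAll, and joinSuccessful are hypothetical and exist only for illustration. It assumes a Node.js version that provides Promise.allSettled and Object.fromEntries.

```javascript
'use strict';

// Hypothetical helper (not part of the package): run every branch
// concurrently, then hand the settled outcomes to a join strategy.
async function forkJoin(payload, branches, joinStrategy) {
  const names = Object.keys(branches);
  const settled = await Promise.allSettled(
    // Promise.resolve().then(...) also captures synchronous throws.
    names.map((name) => Promise.resolve().then(() => branches[name](payload)))
  );
  const outcomes = settled.map((outcome, i) =>
    outcome.status === 'fulfilled'
      ? { branch: names[i], ok: true, value: outcome.value }
      : {
          branch: names[i],
          ok: false,
          error: outcome.reason instanceof Error ? outcome.reason.message : String(outcome.reason),
        }
  );
  return joinStrategy(outcomes);
}

// Join strategy: fail unless every branch succeeded, then merge by branch name.
function joinAll(outcomes) {
  const failed = outcomes.filter((o) => !o.ok);
  if (failed.length > 0) {
    throw new Error(`Branches failed: ${failed.map((o) => o.branch).join(', ')}`);
  }
  return Object.fromEntries(outcomes.map((o) => [o.branch, o.value]));
}

// Join strategy: keep whatever succeeded and report the branches that did not.
function joinSuccessful(outcomes) {
  return {
    results: Object.fromEntries(outcomes.filter((o) => o.ok).map((o) => [o.branch, o.value])),
    failed: outcomes.filter((o) => !o.ok).map((o) => o.branch),
  };
}

forkJoin(3, { double: async (n) => n * 2, square: (n) => n * n }, joinAll)
  .then((joined) => console.log(joined)); // { double: 6, square: 9 }
```

Passing the join strategy as a function keeps the fork step identical whether the caller wants all-or-nothing aggregation or a partial result.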
📦 Installation
npm install @onlineapps/conn-infra-mq
# or
yarn add @onlineapps/conn-infra-mq
Requires Node.js ≥12. For RabbitMQ usage, ensure an accessible AMQP server.
🏗️ Architecture
ConnectorMQClient (main orchestrator - for business services only)
├── BaseClient (core AMQP operations)
├── WorkflowRouter (workflow orchestration)
├── QueueManager (queue lifecycle management)
├── ForkJoinHandler (parallel processing)
└── RetryHandler (error recovery & DLQ)
WorkflowRouter - How It Works
Purpose: Handles workflow routing between services in a decentralized architecture.
Key Methods:
- publishWorkflowInit(workflow, options) - Publishes workflow to the workflow.init queue (entry point)
- publishToServiceWorkflow(serviceName, message, options) - Routes to a specific service's workflow queue
- publishWorkflowCompleted(result, options) - Publishes completed workflow to the workflow.completed queue
- consumeWorkflowInit(handler, options) - Consumes from workflow.init (competing consumers pattern)
- consumeServiceWorkflow(serviceName, handler, options) - Consumes from the service-specific workflow queue
How It Works:
- Gateway publishes to workflow.init via publishWorkflowInit()
- Business services (competing consumers) consume from workflow.init via consumeWorkflowInit()
- Services route to the next service via publishToServiceWorkflow()
- Final service publishes completion via publishWorkflowCompleted()
Note: WorkflowRouter is part of the conn-infra-mq connector (for business services). Infrastructure services (gateway) should use the underlying MQ client library directly, not the connector.
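The routing steps above can be traced with a small in-memory stand-in. This is a sketch under stated assumptions, not the connector's implementation: InMemoryWorkflowRouter replaces RabbitMQ with a Map, the service queue name `<serviceName>.workflow`, the message shape (id, steps, remaining, visited), and the onCompleted method are all hypothetical, and deliveries are awaited so the demo is deterministic, whereas the real connector is fire-and-forget.

```javascript
'use strict';

// Illustrative stand-in for the broker: queue name -> single consumer handler.
class InMemoryWorkflowRouter {
  constructor() {
    this.consumers = new Map();
  }

  async _publish(queue, message) {
    const handler = this.consumers.get(queue);
    if (!handler) throw new Error(`No consumer on queue "${queue}"`);
    // Deliver a serialized copy, as a broker would.
    await handler(JSON.parse(JSON.stringify(message)));
  }

  // Method names mirror the README; queue naming for services is assumed.
  publishWorkflowInit(workflow) { return this._publish('workflow.init', workflow); }
  consumeWorkflowInit(handler) { this.consumers.set('workflow.init', handler); }
  publishToServiceWorkflow(serviceName, message) {
    return this._publish(`${serviceName}.workflow`, message);
  }
  consumeServiceWorkflow(serviceName, handler) {
    this.consumers.set(`${serviceName}.workflow`, handler);
  }
  publishWorkflowCompleted(result) { return this._publish('workflow.completed', result); }
  // Hypothetical: the README lists no consumer method for workflow.completed.
  onCompleted(handler) { this.consumers.set('workflow.completed', handler); }
}

async function runDemo() {
  const router = new InMemoryWorkflowRouter();
  const completed = [];

  // Route to the next remaining step, or publish completion when none is left.
  const advance = async (message) => {
    const [next, ...rest] = message.remaining;
    if (next === undefined) {
      await router.publishWorkflowCompleted({ id: message.id, visited: message.visited });
    } else {
      await router.publishToServiceWorkflow(next, { ...message, remaining: rest });
    }
  };

  // Entry point: a business service picks the workflow up from workflow.init.
  router.consumeWorkflowInit((workflow) =>
    advance({ id: workflow.id, remaining: workflow.steps, visited: [] }));

  // Each service records its own name, then routes onward.
  for (const name of ['billing', 'shipping']) {
    router.consumeServiceWorkflow(name, (message) =>
      advance({ ...message, visited: [...message.visited, name] }));
  }

  router.onCompleted((result) => { completed.push(result); });

  await router.publishWorkflowInit({ id: 'wf-1', steps: ['billing', 'shipping'] });
  return completed;
}

runDemo().then((completed) => console.log(completed));
```

No component in the sketch knows the whole route; each handler only decides where the message goes next, which is the decentralized routing idea described above.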
🔧 Quick Start
'use strict';

const ConnectorMQClient = require('@onlineapps/conn-infra-mq');

(async () => {
  // 1. Create client with configuration
  const client = new ConnectorMQClient({
    host: 'amqp://localhost:5672',
    serviceName: 'my-service',
    queue: 'default-queue',
    durable: true,
    prefetch: 5, // Default prefetch count for consumers
    noAck: false, // Default auto-acknowledge = false
    retryPolicy: { // Optional reconnection policy (not enforced in v1.0.0)
      retries: 5,
      initialDelayMs: 1000,
      maxDelayMs: 30000,
      factor: 2
    }
  });

  // 2. Register a global error handler
  client.onError(err => {
    console.error('[AgentMQClient] Error:', err);
  });

  // 3. Connect to RabbitMQ
  try {
    await client.connect();
    console.log('Connected to broker');
  } catch (err) {
    console.error('Connection failed:', err);
    process.exit(1);
  }

  // 4. Publish a sample message
  const samplePayload = { taskId: 'abc123', action: 'processData', timestamp: Date.now() };
  try {
    await client.publish('job_queue', samplePayload, {
      persistent: true,
      headers: { origin: 'quickStart' }
    });
    console.log('Message published:', samplePayload);
  } catch (err) {
    console.error('Publish error:', err);
  }

  // 5. Consume messages
  try {
    await client.consume(
      'job_queue',
      async (msg) => {
        const data = JSON.parse(msg.content.toString('utf8'));
        console.log('Received:', data);
        // Process message...
        await client.ack(msg);
      },
      { prefetch: 5, noAck: false }
    );
    console.log('Consuming from "job_queue"...');
  } catch (err) {
    console.error('Consume error:', err);
  }

  // 6. Graceful shutdown on SIGINT
  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    try {
      await client.disconnect();
      console.log('Disconnected, exiting.');
      process.exit(0);
    } catch (discErr) {
      console.error('Error during disconnect:', discErr);
      process.exit(1);
    }
  });
})();
📄 Configuration
Configuration can be provided to the ConnectorMQClient constructor or as overrides to connect(). Below is a summary of supported fields (see docs/api.md for full details):
| Field | Type | Description | Default |
|---|---|---|---|
| type | string | Transport type: 'rabbitmq' | 'rabbitmq' |
| host | string | Connection URI or hostname. For RabbitMQ: e.g. 'amqp://user:pass@localhost:5672'. | Required |
| queue | string | Default queue name for publish/consume if not overridden per call. | '' |
| exchange | string | Default exchange name. Empty string uses the default direct exchange. | '' |
| durable | boolean | Declare queues/exchanges as durable. | true |
| prefetch | integer | Default prefetch count for consumers. | 1 |
| noAck | boolean | Default auto-acknowledge setting for consumers. If true, messages will be auto-acked. | false |
| logger | object | Custom logger with methods: info(), warn(), error(), debug(). If omitted, console is used. | null |
| retryPolicy | object | Reconnection policy with properties: retries (number), initialDelayMs (ms), maxDelayMs (ms), factor (multiplier). Not enforced in v1.0.0. | { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 } |
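As a rough illustration of how the four retryPolicy fields usually combine, the sketch below computes a capped exponential backoff schedule. This is the conventional formula, assumed here rather than taken from the package (which states the policy is not enforced in v1.0.0); backoffDelayMs and backoffSchedule are hypothetical helper names.

```javascript
'use strict';

// Assumed formula: delay = initialDelayMs * factor^attempt, capped at maxDelayMs.
// `attempt` is 0-based, so the first retry waits initialDelayMs.
function backoffDelayMs(attempt, policy) {
  const { initialDelayMs, maxDelayMs, factor } = policy;
  return Math.min(initialDelayMs * Math.pow(factor, attempt), maxDelayMs);
}

// One delay per allowed retry.
function backoffSchedule(policy) {
  return Array.from({ length: policy.retries }, (_, attempt) => backoffDelayMs(attempt, policy));
}

const defaultPolicy = { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 };
console.log(backoffSchedule(defaultPolicy)); // [ 1000, 2000, 4000, 8000, 16000 ]
```

With the default values the cap is never reached; a sixth attempt would compute 32000 ms and be clamped to 30000 ms.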
🛠️ API Reference
For full class and method documentation, including parameter descriptions, return values, and error details, see docs/api.md.
✅ Testing
npm test # All tests
npm run test:unit # Unit tests only
npm run test:component # Component tests
npm run test:integration # Integration tests
Test Coverage Status
- Overall Coverage: 24.52% (improving after refactoring)
- Passing Tests: 75/104 (72%)
- Test Suites: 10/14 passing
- Well Tested: Config, Transports, Error handling (100%)
- Needs Testing: New layers (1-5%)
See Test Report for detailed coverage analysis.
🎨 Coding Standards
- Linting: ESLint (eslint:recommended + Prettier).
- Formatting: Prettier; check with npm run prettier:check, fix with npm run prettier:fix.
- Testing: Jest, aiming for ≥90% coverage.
🎯 Refactoring Benefits
✅ What Was Achieved
- Clean Layered Architecture - Separated into specialized layers
- Removed Technical Debt - Replaced MQWrapper with cleaner design
- Improved Extensibility - Easy to add new patterns
- Better Developer Experience - Cleaner API and documentation
📊 Quality Improvements
- Separation of Concerns - Each layer has single responsibility
- Modular Design - Use individual layers independently
- Testability - Each layer can be tested in isolation
- Maintainability - Easier to understand and modify
- Backwards Compatibility - MQWrapper alias maintained
🤝 Contributing
Contributions welcome! Please see CONTRIBUTING.md for guidelines:
- Fork the repo.
- Create a feature branch: git checkout -b feature/your-feature.
- Run tests locally and ensure linting passes.
- Commit your changes and push to your branch.
- Open a Pull Request against main.
📜 License
This project is licensed under the MIT License. See LICENSE for details.