@onlineapps/conn-infra-mq
Message queue connector with layered architecture for workflow orchestration, RPC, fork-join, and retry patterns. Built on top of RabbitMQ with clean separation of concerns.
🚀 Features
- Layered Architecture: Clean separation into specialized layers (WorkflowRouter, QueueManager, ForkJoinHandler, RPCHandler, RetryHandler)
- Workflow Orchestration: Decentralized workflow routing without central orchestrator
- Fork-Join Pattern: Parallel processing with result aggregation and built-in join strategies
- RPC Support: Request-response communication with correlation IDs and timeouts
- Automatic Retry: Exponential backoff, dead letter queue management, configurable retry policies
- Queue Management: TTL, DLQ, auto-delete, temporary queues, exchange bindings
- Promise-based API: All operations return promises for clean async/await usage
- Built-in Serialization: JSON with custom error handling
- Config Validation: Strict schema validation via Ajv
- Extensible Transport: Clear separation between core logic and transport layer
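The RPC feature above relies on correlation IDs and timeouts. As a rough illustration of that general pattern only (not the package's RPCHandler, whose API is documented in docs/api.md), the following self-contained sketch tracks in-flight requests in memory. The class name `PendingRequests` and its methods are hypothetical.

```javascript
'use strict';
const crypto = require('crypto');

// Hypothetical helper, NOT part of @onlineapps/conn-infra-mq.
// Tracks in-flight requests by correlation ID and rejects them on timeout.
class PendingRequests {
  constructor() {
    this.pending = new Map(); // correlationId -> { resolve, timer }
  }

  // Registers a request; returns its correlation ID and a promise for the reply.
  create(timeoutMs) {
    const correlationId = crypto.randomBytes(16).toString('hex');
    const promise = new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`RPC timeout after ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
    });
    return { correlationId, promise };
  }

  // Resolves the matching request; returns false for unknown or expired IDs.
  settle(correlationId, reply) {
    const entry = this.pending.get(correlationId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    entry.resolve(reply);
    return true;
  }
}
```

In a broker-backed setup, the correlation ID would typically travel in the message properties and `settle` would be called from the reply-queue consumer; that wiring is omitted here.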
📦 Installation
npm install @onlineapps/conn-infra-mq
# or
yarn add @onlineapps/conn-infra-mq
Requires Node.js ≥12. For RabbitMQ usage, ensure an accessible AMQP server.
🏗️ Architecture
ConnectorMQClient (main orchestrator)
├── BaseClient (core AMQP operations)
├── WorkflowRouter (workflow orchestration)
├── QueueManager (queue lifecycle management)
├── ForkJoinHandler (parallel processing)
├── RPCHandler (request-response patterns)
└── RetryHandler (error recovery & DLQ)
🔧 Quick Start
'use strict';
const ConnectorMQClient = require('@onlineapps/conn-infra-mq');

(async () => {
  // 1. Create client with configuration
  const client = new ConnectorMQClient({
    host: 'amqp://localhost:5672',
    serviceName: 'my-service',
    queue: 'default-queue',
    durable: true,
    prefetch: 5, // Default prefetch count for consumers
    noAck: false, // Default auto-acknowledge = false
    retryPolicy: { // Optional reconnection policy (not enforced in v1.0.0)
      retries: 5,
      initialDelayMs: 1000,
      maxDelayMs: 30000,
      factor: 2
    }
  });

  // 2. Register a global error handler
  client.onError(err => {
    console.error('[ConnectorMQClient] Error:', err);
  });

  // 3. Connect to RabbitMQ
  try {
    await client.connect();
    console.log('Connected to broker');
  } catch (err) {
    console.error('Connection failed:', err);
    process.exit(1);
  }

  // 4. Publish a sample message
  const samplePayload = { taskId: 'abc123', action: 'processData', timestamp: Date.now() };
  try {
    await client.publish('job_queue', samplePayload, {
      persistent: true,
      headers: { origin: 'quickStart' }
    });
    console.log('Message published:', samplePayload);
  } catch (err) {
    console.error('Publish error:', err);
  }

  // 5. Consume messages
  try {
    await client.consume(
      'job_queue',
      async (msg) => {
        const data = JSON.parse(msg.content.toString('utf8'));
        console.log('Received:', data);
        // Process message...
        await client.ack(msg);
      },
      { prefetch: 5, noAck: false }
    );
    console.log('Consuming from "job_queue"...');
  } catch (err) {
    console.error('Consume error:', err);
  }

  // 6. Graceful shutdown on SIGINT
  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    try {
      await client.disconnect();
      console.log('Disconnected, exiting.');
      process.exit(0);
    } catch (discErr) {
      console.error('Error during disconnect:', discErr);
      process.exit(1);
    }
  });
})();
📄 Configuration
Configuration can be provided to the ConnectorMQClient constructor or as overrides to connect(). Below is a summary of supported fields (see docs/api.md for full details):
| Field | Type | Description | Default |
|---|---|---|---|
| type | string | Transport type: 'rabbitmq' | 'rabbitmq' |
| host | string | Connection URI or hostname. For RabbitMQ: e.g. 'amqp://user:pass@localhost:5672'. | Required |
| queue | string | Default queue name for publish/consume if not overridden per call. | '' |
| exchange | string | Default exchange name. Empty string uses the default direct exchange. | '' |
| durable | boolean | Declare queues/exchanges as durable. | true |
| prefetch | integer | Default prefetch count for consumers. | 1 |
| noAck | boolean | Default auto-acknowledge setting for consumers. If true, messages will be auto-acked. | false |
| logger | object | Custom logger with methods: info(), warn(), error(), debug(). If omitted, console is used. | null |
| retryPolicy | object | Reconnection policy with properties: retries (number), initialDelayMs (ms), maxDelayMs (ms), factor (multiplier). Not enforced in v1.0.0. | { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 } |
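To make the defaults and the retryPolicy fields more concrete, here is a minimal sketch of how constructor config and connect() overrides might be layered over the documented defaults, and how the four retryPolicy numbers could translate into a delay schedule. The helpers `resolveConfig` and `backoffDelays` are hypothetical and not part of the package, and the capped exponential formula is an assumption about how the fields combine. The README notes the policy is not enforced in v1.0.0.

```javascript
'use strict';

// Defaults copied from the configuration table in this README.
const DEFAULTS = {
  type: 'rabbitmq',
  queue: '',
  exchange: '',
  durable: true,
  prefetch: 1,
  noAck: false,
  logger: null,
  retryPolicy: { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 }
};

// Hypothetical helper: later sources win (defaults < constructor < connect()).
function resolveConfig(constructorConfig = {}, connectOverrides = {}) {
  const merged = { ...DEFAULTS, ...constructorConfig, ...connectOverrides };
  // Merge retryPolicy field by field so a partial override keeps the other defaults.
  merged.retryPolicy = {
    ...DEFAULTS.retryPolicy,
    ...constructorConfig.retryPolicy,
    ...connectOverrides.retryPolicy
  };
  // "host" is the only field the table marks as required.
  if (typeof merged.host !== 'string' || merged.host === '') {
    throw new Error('Config field "host" is required');
  }
  return merged;
}

// Assumed interpretation: the delay before retry n (starting at 0) is
// initialDelayMs * factor^n, capped at maxDelayMs.
function backoffDelays({ retries, initialDelayMs, maxDelayMs, factor }) {
  const delays = [];
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(initialDelayMs * Math.pow(factor, attempt), maxDelayMs));
  }
  return delays;
}
```

Under these assumptions, `backoffDelays(resolveConfig({ host: 'amqp://localhost:5672' }).retryPolicy)` yields delays of 1000, 2000, 4000, 8000, and 16000 ms.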
🛠️ API Reference
For full class and method documentation, including parameter descriptions, return values, and error details, see docs/api.md.
✅ Testing
npm test # All tests
npm run test:unit # Unit tests only
npm run test:component # Component tests
npm run test:integration # Integration tests
Test Coverage Status
- Overall Coverage: 24.52% (improving after refactoring)
- Passing Tests: 75/104 (72%)
- Test Suites: 10/14 passing
- Well Tested: Config, Transports, Error handling (100%)
- Needs Testing: New layers (1-5%)
See Test Report for detailed coverage analysis.
🎨 Coding Standards
- Linting: ESLint (eslint:recommended + Prettier).
- Formatting: Prettier; check with npm run prettier:check, fix with npm run prettier:fix.
- Testing: Jest, aiming for ≥90% coverage.
🎯 Refactoring Benefits
✅ What Was Achieved
- Clean Layered Architecture - Separated into specialized layers
- Removed Technical Debt - Replaced MQWrapper with cleaner design
- Improved Extensibility - Easy to add new patterns
- Better Developer Experience - Cleaner API and documentation
📊 Quality Improvements
- Separation of Concerns - Each layer has single responsibility
- Modular Design - Use individual layers independently
- Testability - Each layer can be tested in isolation
- Maintainability - Easier to understand and modify
- Backwards Compatibility - MQWrapper alias maintained
🤝 Contributing
Contributions welcome! Please see CONTRIBUTING.md for guidelines:
- Fork the repo.
- Create a feature branch: git checkout -b feature/your-feature.
- Run tests locally and ensure linting passes.
- Commit your changes and push to your branch.
- Open a Pull Request against main.
📜 License
This project is licensed under the MIT License. See LICENSE for details.