Package Exports
- busmq
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (busmq) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
node-busmq
A high performance, highly available and scalable, message bus and queueing system for node.js. Message queues are backed by Redis, a high performance, in-memory key/value store.
The Basics
- Event based message queues
- Event based bi-directional channels for peer-to-peer communication (backed by message queues)
- Reliable delivery of messages (AKA Guarantee Delivery)
- Persistent Publish/Subscribe
- Federation over distributed data centers
- Auto expiration of queues after a pre-defined idle time
- Scalability through the use of multiple redis instances and node processes
- High availability through redis master-slave setup and stateless node processes
- Tolerance to dynamic addition of redis instances during scale out
- Fast
Why Yet Another "Queue-Backed-by-Redis" Module?
There are several exiting node modules that provide great queues-backed-by-redis functionality, such as Kue, Bull and Convoy, so what's so special about busmq?
Although seemingly the other modules provide similar features, they lack a very specific feature that's required for a reliable message queue: guaranteed order. Jobs is the main focus for these modules, whereas busmq focuses on messages.
Inherently, job processing does not require a certain order - if a job fails it can simply be retried at a later time with (usually) no ill-effects. However, message processing is very order dependant - if you receive a message out of order then there's no telling what the consequences may be. A good example is TCP message order importance - clients are guaranteed that TCP packets are always received in the correct order. That's what busmq focuses on, which makes busmq much more like RabbitMQ rather than a generic queueing system.
Of course, the other modules may double as messages queues, but that's just not their main focus. In addition, busmq provides built-in features for peer-to-peer communication, scaling, high-availability and federation which are extremely important for a reliable messaging system.
High Availability and Scaling
Scaling is achieved by spreading queues and channels between multiple redis instances. The redis instance is selected by performing a calculation on the queue/channel name. If the redis instances are added and the designated redis instance of a queue changes because of it then the bus will still find the correct redis instance. There will be some time penalty until the system stabilizes after the addition.
High availability for redis is achieved by using standard redis high availability setups, such as Redis Sentinal or AWS ElasticCache
Bus
The bus holds connections to one or more redis instances and is used
to create queues, channels and persistent objects.
Node processes connecting to the same bus have access to and can use all queues, channels and persistent objects.
node-busmq uses the great node_redis module to communicate with the redis instances, so it is highly recommended to also install hiredis to achieve the best performance.
Connecting to a bus
var Bus = require('busmq');
var bus = Bus.create({redis: ['redis://192.168.0.1:6359', 'redis://192.168.0.2:6359']);
bus.on('error', function(err) {
// an error has occurred
});
bus.on('online', function() {
// the bus is online - we can use queues, channels ans persistent objects
});
bus.on('offline', function() {
// the bus is offline - redis is down...
});
// connect the redis instances
bus.connect();Queue
A queue of messages.
Messages are pushed to the queue and consumed from it in they order that they were pushed.
Any number of clients can produce messages to a queue, and any number of consumers can consume messages from a queue.
Attach and Detach
Pushing messages and consuming them requires attaching to the queue. The queue will remain in existence for as long as it has at least one client attached to it.
To stop using a queue, detach from it. Once a queue has no more clients attached, it will automatically expire after a predefined ttl (also losing any messages in it).
Using a queue
Producer:
bus.on('online', function() {
var q = bus.queue('foo');
q.on('attached', function() {
console.log('attached to queue');
});
q.attach();
q.push({hello: 'world'});
q.push('my name if foo');
});Consumer:
bus.on('online', function() {
var q = bus.queue('foo');
q.on('attached', function() {
console.log('attached to queue. messages will soon start flowing in...');
});
q.on('message', function(message, id) {
if (message === 'my name if foo') {
q.detach();
}
});
q.attach();
q.consume(); // the 'message' event will be fired when a message is retrieved
});Consumption Modes
There are three modes that messages can be consumed from a queue, with various degrees of flexibility for each mode.
Unreliable Delivery
This is a Zero-or-Once message delivery mode, which is also the default mode. Messages are consumed from the queue by one consumer only and will not be consumed again by that consumer or any other consumer. This method of consumption is unreliable in a sense that if the consumer crashes before being able to handle the message, it is lost forever.
// consume with default settings
q.consume();
// this is the same as the default settings
q.consume({reliable: false, remove: true});Reliable Delivery (Guarantee Delivery)
This is a Once-or-More message delivery mode, where it is guaranteed that messages will be delivered at least once. Every consumed message enters a 'waiting for ack' state. The consumer should call 'ack' on a message in order to mark it as handled. When the client issues an 'ack' on the message, the message is permanently discarded from the queue and will not be consumed again.
If a client crashes when consuming in this mode, any messages that have not been ACKed will be delivered once more when a client starts to consume again.
Note: This mode does not work well with multiple consumers. The behavior of multiple clients consuming in reliable mode from the same queue is undefined.
// consume message reliably. message with id 3 is the last acked message
q.consume({reliable: true, last: 3});Persistent Publish/Subscribe
This is a form of publish/subscribe, where all consumers receive all messages, even if they were not consuming at the time messages were being pushed to the queue. A consumer can also specify the index of the message to start consuming from.
// consume message without removing them from the queue. start consuming from message at index 0.
q.consume({remove: false, index: 0});Channel
A bi-directional channel for peer-to-peer communication. Under the hood, a channel uses two message queues, where each peer pushes messages to one queue and consumes messages from the other queue. It does not matter which peer connects to the channel first.
Each peer in the channel has a role. For all purposes roles are the same, except that the roles determine to which queue messages will be pushed and from which queue they will be consumed. To peers to communicate over the channel, they must have opposite roles.
By default, a channel uses role local to consume messages and remote to push messages.
Since peers must have opposite roles, if using the default roles, one peer must call channel#listen and the other peer must call channel#connect.
It is also possible to specify other roles explicitly, such as client and server.
This enables specifying the local role and the remote role, and just connecting the channel without calling listen.
Specifying roles explicitly may add to readability, but not much more than that.
A channel supports the same consumption modes as a queue does. See Consumption Modes for details.
Using a channel (default roles)
Server endpoint:
bus.on('online', function() {
var c = bus.channel('bar'); // use default names for the endpoints
c.on('connected', function() {
// connected to the channel
});
c.on('remote:connected', function() {
// the client is connected to the channel
c.send('hello client!');
});
c.on('message', function(message) {
// received a message from the client
});
c.listen(); // reverse the endpoint roles and connect to the channel
});Client endpoint:
bus.on('online', function() {
var c = bus.channel('bar'); // use default names for the endpoints
c.on('connected', function() {
// connected to the channel
});
c.on('remote:connected', function() {
// the server is connected to the channel
c.send('hello server!');
});
c.on('message', function(message) {
// received a message from the server
});
c.connect(); // connect to the channel
});Using a channel (explicit roles)
Server endpoint:
bus.on('online', function() {
// local role is server, remote role is client
var c = bus.channel('zoo', 'server', 'client');
c.on('connected', function() {
// connected to the channel
});
c.on('remote:connected', function() {
// the client is connected to the channel
c.send('hello client!');
});
c.on('message', function(message) {
// received a message from the client
});
c.connect(); // connect to the channel
});Client endpoint:
bus.on('online', function() {
// notice the reverse order of roles
// local role is client, remote role is server
var c = bus.channel('zoo', 'client', 'server');
c.on('connected', function() {
// connected to the channel
});
c.on('remote:connected', function() {
// the server is connected to the channel
c.send('hello server!');
});
c.on('message', function(message) {
// received a message from the server
});
c.connect(); // connect to the channel
});Persistable
It is possible to persist arbitrary objects to the bus. A persistable object defines a set of properties on the object that are tracked for modification. When saving a dirty object (where dirty means that some tracked properties have changed) only those dirty properties are persisted to the bus. Loading a persistable object reads all of the persisted properties.
bus.on('online', function() {
var object = {field: 'this field is not persisted'};
var p = bus.persistify('obj', object, ['foo', 'bar', 'zoo']);
p.foo = 'hello';
p.bar = 1;
p.zoo = true;
p.save(function(err) {
// foo, bar and zoo fields have been saved
});
p.foo = 'world';
p.save(function(err) {
// only foo has been saved
});
// load the persistified properties
var p2 = bus.persistify('obj', {}, ['foo', 'bar', 'zoo']);
p2.load(function(err, exists) {
// exists == true
// p2.foo == 'world'
// p2.bar == 2
// p2.zpp == true'
});
});Federation
It is sometimes desirable to setup bus instances in different locations, where redis servers of one location are not directly accessible to other locations. This setup is very common when building a bus that spans several data centers, where each data center is isolated behind a firewall.
Federation enables using queues, channels and persisted objects of a bus without access to the redis servers themselves. When federating an object, the federating bus uses web sockets to the target bus as the federation channel, and the target bus manages the object on its redis servers on behalf of the federating bus. The federating bus does not host the federated objects on the local redis servers.
Federation is done over web sockets since they are firewall and proxy friendly.
The federating bus utilizes a simple pool of hot-connected web sockets. When a bus is initialized, it immediately spins up an initial number of web sockets that connect to other bus instances. When federating an object, the bus selects a web socket from the pool and starts federating the object over it. Once a web socket is given for federation from the pool, the pool immediately opens a new web socket to replace the one that was just given. This way, the pool always contains a minimum number of available web sockets for immediate use. This behavior provides the best performance by eliminating the need to wait for the web socket to open when starting to federate.
The API and events of a federated objects are exactly the same as a non-federated objects. This is achieved using the awesome dnode module for RPCing the object API.
Opening a bus with a federation server
var http = require('http');
var httpServer = http.createServer(); // create the http server to serve as the federation server. you can also use express if you like...
httpServer.listen(8881);
var Bus = require('busmq');
var options = {
redis: 'redis://127.0.0.1', // connect this bus to a local running redis
federate: { // also open a federation server
server: httpServer, // use the provided http server as the federation server
secret: 'mysecret', // a secret key for authorizing clients
path: '/my/fed/path'
}
};
var bus = Bus.create(options);
bus.on('online', function() {
// the bus is now ready to receive federation requests
});
bus.connect();Federating a queue
var Bus = require('busmq');
var options = {
redis: 'http://127.0.0.1', // connect this bus to a local running redis
federate: { // also connect to a federate bus
poolSize: 5, // keep the pool size with 5 web sockets
urls: ['http://my.other.bus:8881'], // pre-connect to these urls, 5 web sockets to each url
secret: 'mysecret', // the secret ket to authorize with the federation server
path: '/my/fed/path'
}
};
var bus = Bus.create(options);
bus.on('online', function() {
// federate the queue to a bus located at a different data center
var fed = bus.federate(bus.queue('foo'), 'http://my.other.bus');
fed.on('ready', function(q) {
// federation is ready - we can start using the queue
q.on('attached', function() {
// do whatever
});
q.attach();
});
});Federating a channel
var Bus = require('busmq');
var options = {
redis: 'redis://127.0.0.1', // connect this bus to a local running redis
federate: { // also connect to a federate bus
poolSize: 5, // keep the pool size with 5 web sockets
urls: ['http://my.other.bus:8881'], // pre-connect to these urls, 5 web sockets to each url
secret: 'mysecret', // the secret ket to authorize with the federation server
path: '/my/fed/path'
}
};
var bus = Bus.create(options);
bus.on('online', function() {
// federate the channel to a bus located at a different data center
var fed = bus.federate(bus.channel('bar'), 'http://my.other.bus');
fed.on('ready', function(c) {
// federation is ready - we can start using the channel
c.on('message', function(message) {
// do whatever
});
c.attach();
});
});Federating a persistable object
var Bus = require('busmq');
var options = {
redis: 'http://127.0.0.1', // connect this bus to a local running redis
federate: { // also connect to a federate bus
poolSize: 5, // keep the pool size with 5 web sockets
urls: ['http://my.other.bus:8881'], // pre-connect to these urls, 5 web sockets to each url
secret: 'mysecret', // the secret ket to authorize with the federation server
path: '/my/fed/path'
}
};
var bus = Bus.create(options);
bus.on('online', function() {
// federate the persistent object to a bus located at a different data center
var fed = bus.federate(bus.persistify('bar', object, ['field1', 'field2']), 'http://my.other.bus');
fed.on('ready', function(p) {
// federation is ready - we can start using the persisted object
p.load(function(err, exists) {
// do whatever
});
});
});Performance
Performance was measured with two key indicators in mind:
- Message Throughput - the number of messages per second that can be pushed and consumed from a queue
- Message Throughout Consistency - how consistent the throughput is over time
There is also a third indicator that might be interesting to examine and that is "Queue Open/Close Throughout". I'm pretty sure there's place for improvement there, so no benchmarking was performed in that area.
Environment
The benchmark was performed on two c3.xlarge AWS machines running Debian 7. Each machine has 4 Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz and 7.5GB of RAM.
One machine was setup to run 4 instances of redis 2.8.19. Redis is single threaded so it can only utilize one CPU.
A second machine was setup to run 4 node busmq processes executing the benchmarking code. Each one of the 4 node processes connected to all of the 4 redis instances running on the first machine.
Benchmark
The benchmark flow is as follows:
- start up 4 node processes (one per cpu)
- on startup, the node process creates 100 queues
- once all the nodes of all the processes have been created, every process performs:
- initiate a report cycle of 2 seconds
- push/consume 32 byte messages as fast as possible to/from all queues
- report the number of pushed and consumed messages per cycle
- reset the pushed and consumed message counters at the end of every cycle
- run a total of 100 cycles
Results
Benchmarks are only good for what they actually measure. There are always use cases that do no align with the results so be careful with any conclusions. It's advised to perform your own performance tests with your use cases and setups in mind.
On average, the benchmark shows every second about 10400 messages were pushed and 9973 messages were consumed. It is also apparent that the push/consume throughput is quite consistent over time. (The X-axis shows the cycle number, the Y-axis shows the number of messages)

Additional testing indicates that the size of the messages has little to no impact on the throughput. However, increasing the number of queues by an order of magnitude does effect the performance.
API
Enough with examples. Let's see the API.
Bus API
bus#create([options])
Create a new bus instance. Options:
redis- specified the redis servers to connect to. Can be a string or an array of string urls. A valid url has the formredis://<host_or_ip>[:port].federate- an object defining federation options:server- an http/https server object to listen for incoming federation connections. if undefined then federation server will not be openpath- the path within the server to accept federation requests onurls- an array of urls of the formhttp[s]://<ip-or-host>[:port]of other bus instances that this bus can federate to. default is an empty array.poolSize- the number of web sockets to keep open and idle at all times to federated bus instances. default is 10.secret- a secret key to be shared among all bus instances that can federate to each other. default isnotsosecret.
logger- the logger that the bus should use
Call bus#connect to connect to the redis instances and to open the federation server.
bus#withLog(log)
Attach a logger to the bus instance. Returns the bus instance.
bus#withRedis(redis)
Use the provided node_redis client to create connections. Returns the bus instance.
bus#connect()
Connect to the redis servers and start the federation server (if one was specified). Once connected to all redis instances, the online will be emitted.
If the bus gets disconnected from the the redis instances, the offline event will be emitted.
bus#disconnect()
Disconnect from the redis instances and stop the federation server. Once disconnected, the offline event will be emitted.
bus#isOnline()
Return true if the bus is online, false if the bus offline.
bus#queue(name)
Create a new Queue instance.
name- the name of the queue.
Returns a new Queue instance. Call queue#attach before using the queue.
bus#channel(name [, local, remote])
Create a new Channel instance.
name- the name of the channel.local- [optional] specifies the local role. default islocal.remote- [optional] specifies the remote role. default isremote.
bus#persistify(name, object, properties)
Create a new Persistable object. Persistifying an object adds additional methods to the persistified object. See the API for more details.
name- the name of the persisted object.object- the object to persistify.properties- an array of property names to persist.
bus#federate(object, target)
Federate object to the specified target instead of hosting the object on the local redis servers.
Do not use any of the object API's before federation setup is complete.
object-queue,channelorpersistedobjects to federate. These are created normally throughbus#queue,bus#channelandbus#persistify.target- the target bus url or an already open websocket to the target bus. The url has the formhttp[s]://<location>[:<port>]
Bus Events
online- emitted when the bus has successfully connected to all of the specified redis instancesoffline- emitted when the bus loses connections to the redis instanceserror- an error occurs
Queue API
queue#attach([options])
Attach to the queue. If the queue does not already exist it is created.
Once attached, the attached event is emitted.
Options:
ttl- duration in seconds for the queue to live without any attachments. default is 30 seconds.
queue#detach()
Detach from the queue. The queue will continue to live for as long as it has at least one attachment.
Once a queue has no more attachments, it will continue to exist for the predefined ttl, or until it
is attached to again.
queue#push(message)
Push a message to the queue. The message can be a JSON object or a string. The message will remain in the queue until it is consumed by a consumer.
queue#consume([options])
Start consuming messages from the queue.
The message event is emitted whenever a message is consumed from the queue.
Options:
maxif specified, onlymaxmessages will be consumed from the queue. If not specified, messages will be continuously consumed as they are pushed into the queue.remove-trueindicates to remove a read message from the queue, andfalseleaves it in the queue so that it may be read once more. default istrue. Note: The behavior of mixing consumers that remove messages with consumers that do not remove messages from the same queue is undefined.reliable- applicable only ifremoveistrue. indicates that every consumed message needs to be ACKed in order not to receive it again in case of callingconsumeagain. seequeue#ackfor ack details. default isfalse.last- applicable only ifreliableistrue. indicates the last message id that was ACKed so that only messages with higher id's should be received. if any messages still exist in the queue with id's lower thanlastthey will be discarded. this behaves exactly like callingqueue#ackwith the last id before starting to consume. default is 0.
queue#ack(id)
Specifies that the message with the specified id, and all messages with lower id's, can safely be discarded so that they should never be consumed again. Ignored if not consuming in reliable mode.
id- the message id to ack
queue#isConsuming([callback])
Returns true if this client is consuming messages, false otherwise.
queue#stop()
Stop consuming messages from the queue.
queue#close()
Closes the queue and destroys all messages. Emits the closed event once it is closed.
queue#flush()
Empty the queue, removing all messages.
queue#exists([callback])
Checks if the queue already exists or not.
callback- receivestrueif the queue exists,falseotherwise
queue#count([callback])
Returns the number if messages in the queue.
callback- receives the number of messages in the queue
queue#ttl([callback])
Returns the time in seconds for the queue to live without any attachments.
callback- receives the ttl in seconds
queue#metadata(key [, value][, callback])
Get or set arbitrary metadata on the queue.
Will set the metadata key to the provided value, or get the current value of the key if the value parameter is not provided.
key- the metadata key to set or getvalue- [optional] the value to set on the key.callback- if setting a metadata value, it is called with no arguments upon success. if retrieving the value, it be called with the retrieved value.
queue#pushed([callback])
Returns the number of messages pushed by this client to the queue
queue#consumed([callback])
Returns the number of messages consumed by this client from the queue
Queue Events
attaching- emitted when starting to attachattached- emitted when attached to the queue. The listener callback receivestrueif the queue already exists andfalseif it was just created.detaching- emitted when starting to detachdetached- emitted when detached from the queue. If no other clients are attached to the queue, the queue will remain alive for thettldurationconsuming- emitted when starting or stopping to consume messages from the queue. The listener callback will receivetrueif starting to consume andfalseif stopping to consume.message- emitted when a message is consumed from the queue. The listener callback receives the message as a string and the id of the message as an integer.error- emitted when some error occurs. The listener callback receives the error.
Channel API
channel#connect()
Connects to the channel. The connect event is emitted once connected to the channel.
channel#attach()
Alias to channel#connect()
channel#listen()
Connects to the channel with reverse semantics of the roles.
The connect event is emitted once connected to the channel.
channel#send(message)
Send a message to the peer. The peer does need to be connected for a message to be sent.
channel#sendTo(endpoint, message)
Send a message to the the specified endpoint. There is no need to connect to the channel with channel#connect or channel#listen.
channel#disconnect()
Disconnect from the channel. The channel remains open and a different peer can connect to it.
channel#detach()
Alias to channel#disconnect()
channel#end()
End the channel. No more messages can be pushed or consumed. This also caused the peer to disconnect from the channel and close the message queues.
channel#ack(id)
See queue#ack for details
channel#isAttached([callback])
Returns true if connected to the channel, false if not connected.
Channel Events
connect- emitted when connected to the channelremote:connect- emitted when a remote peer connects to the channeldisconnect- emitted when disconnected from the channelremote:disconnect- emitted when the remote peer disconnects from the channelmessage- emitted when a message is received from the channel. The listener callback receives the message as a string.end- emitted when the remote peer ends the channelerror- emitted when an error occurs. The listener callback receives the error.
Persistable API
persistable#save(callback)
Save all the dirty properties. The dirty properties are marked as not dirty after the save completes.
callback- called when the save has finished. has the formfunction(err).
persistable#load(callback)
Load all the tracked properties. All properties are marked as not dirty after the load completes.
callback- called when the load has finished. has the formfunction(err, exists)whereexistsis true if the persisted object was found in the bus.
persistable#persist(ttl)
Start a periodic timer to continuously mark the persisted object as being used.
ttlspecifies the number of seconds to keep the object alive in the bus.
persistable#unpersist()
Stop the periodic timer. This will cause object to expire after the defined ttl provided in the persist method.
Federate API
federate#close(disconnect)
Close the federation object.
disconnect- true to disconnect the underlying websocket
Federate Events
ready- emitted when the federation setup is ready. The callback receives the bus object to use.unauthorized- incorrect secret key was used to authenticate with the federation serverclose- the federation connection closederror- some error occurred. the callback receives theerrormessage
Tests
Redis server must be installed to run the tests, but does not need to be running. Download redis from http://redis.io.
To run the tests: ./node_modules/mocha/bin/mocha test
License
The MIT License (MIT)
Copyright (c) 2014 Capriza Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.