upring
UpRing provides application-level sharding, based on node.js streams. UpRing allocates some resources to a node, based on the hash of a key, and allows you to query the node using a request-response pattern (based on JS objects) which can embed streams.
UpRing simplifies the implementation and deployment of a cluster of nodes using a gossip membership protocol and consistent hashing (see swim-hashring). It uses tentacoli as a transport layer.
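To make the idea of allocating a key to a node by its hash more concrete, here is a minimal sketch of a consistent hash ring in plain Node.js. It is not the algorithm used by swim-hashring: the hash function (SHA-256), the number of points per node, and the helper names `hash`, `buildRing` and `lookup` are all assumptions made for illustration.

```javascript
'use strict'
const crypto = require('crypto')

// Hash a string to an unsigned 32-bit integer (illustrative choice of hash).
function hash (str) {
  return crypto.createHash('sha256').update(str).digest().readUInt32BE(0)
}

// Place several points on the ring for each node id, sorted by position.
function buildRing (nodes, replicas = 50) {
  const points = []
  for (const node of nodes) {
    for (let i = 0; i < replicas; i++) {
      points.push({ point: hash(node + '#' + i), node })
    }
  }
  return points.sort((a, b) => a.point - b.point)
}

// The owner of a key is the first point at or after the key's hash,
// wrapping around to the start of the ring.
function lookup (ring, key) {
  const h = hash(key)
  const entry = ring.find((e) => e.point >= h) || ring[0]
  return entry.node
}

const ring = buildRing(['127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'])
console.log(lookup(ring, 'a key') === lookup(ring, 'a key')) // true: lookups are deterministic
```

With this construction, adding a node only moves the keys that land on the new node's points; every other key keeps its previous owner.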
Install
npm i upring
Example
Here is an example client:
'use strict'
const upring = require('upring')
const client = upring({
  client: true, // this node does not provide services to the ring
  // fill in with your base node, it matches the one for local usage
  base: [process.argv[2]]
})
client.on('up', () => {
  client.request({
    // the same key will always go to the same host
    // if it is online or until new servers come online
    key: 'a key',
    cmd: 'read'
  }, (err, response) => {
    if (err) {
      console.log(err.message)
      return
    }
    response.streams.out.pipe(process.stdout)
    response.streams.out.on('end', () => {
      process.exit(0)
    })
  })
})
And here is an example server, acting as a base node:
'use strict'
const fs = require('fs')
const upring = require('upring')
const server = upring({
  hashring: {
    port: 7799
  }
})
server.on('up', () => {
  console.log('server up at', server.whoami())
})
server.add({ cmd: 'read' }, (req, reply) => {
  reply(null, {
    streams: {
      out: fs.createReadStream(__filename)
    }
  })
})
// or using the shorthand string syntax
server.add('write', (req, reply) => {
  reply(null, {
    answering: 'foo'
  })
})
We recommend using baseswim to run a base node. It is also available as a tiny docker image.
upring()
instance.request()
instance.peerConn()
instance.peers()
instance.add()
instance.whoami()
instance.allocatedToMe()
instance.track()
instance.logger
instance.close()
upring(opts)
Create a new upring.
Options:
- hashring: options for swim-hashring.
- client: set to true if the current node should not answer requests from other peers. Defaults to false. Alias for hashring.client.
- base: alias for hashring.base.
- name: alias for hashring.name.
- port: the TCP port to listen on for RPC communications; by default it is allocated dynamically and discovered via gossip.
- logLevel: the level for the embedded logger; defaults to 'info'.
- logger: a pino instance to log to.
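As a rough illustration of how these options fit together, the sketch below builds an options object for a node that joins an existing ring. Only the option names come from the list above; the addresses, port numbers and the 'host:port' format of the base entry are assumptions.

```javascript
'use strict'

// Hypothetical values: only the option names are taken from the list above.
const opts = {
  // address of an already running base node (assumed 'host:port' format)
  base: ['127.0.0.1:7799'],
  // false (the default): this node answers requests from other peers
  client: false,
  // passed through to swim-hashring, as in the server example
  hashring: {
    port: 7800
  },
  // more verbose than the default 'info'
  logLevel: 'debug'
}

// With upring installed, the instance would then be created with:
// const instance = require('upring')(opts)
console.log(Object.keys(opts).join(', '))
```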
Events:
- up: emitted when this instance is up and running and properly configured.
- move: see the swim-hashring 'move' event.
- steal: see the swim-hashring 'steal' event.
- request: emitted when a request comes in to be handled by the current node, if the router is not configured. It receives the request object as first argument and a function to call when finished as second argument:
instance.on('request', (req, reply) => {
  reply(null, {
    a: 'response',
    streams: {
      any: stream
    }
  })
})
See tentacoli for the full details on the request/response format.
instance.request(obj, cb)
Forward the given request to the ring. The node that will reply to the current enquiry will be picked by the key property in obj.
Callback will be called when a response is received, or an error occurred.
Example:
instance.request({
  key: 'some data',
  streams: {
    in: fs.createWriteStream('out')
  }
}, (err) => {
  if (err) throw err
})
See tentacoli for the full details on the request/response format.
Retry logic
If the target instance fails while waiting for a response, the message will be sent to the next peer in the ring. This does not apply to streams, which will be closed or errored.
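The retry behaviour can be pictured with the following sketch. It is not upring's internal code: `requestWithRetry` and the `send(peer, msg, cb)` transport function are hypothetical, and for simplicity any error is treated as a failed peer.

```javascript
'use strict'

// Try each peer in ring order until one of them answers.
// `send(peer, msg, cb)` is a hypothetical transport function.
function requestWithRetry (peers, msg, send, cb) {
  let i = 0
  function attempt () {
    if (i >= peers.length) {
      return cb(new Error('no peer could handle the request'))
    }
    const peer = peers[i++]
    send(peer, msg, (err, res) => {
      if (err) return attempt() // this peer failed: move on to the next one
      cb(null, res)
    })
  }
  attempt()
}

// Fake transport where peer 'a' is down and every other peer echoes the message.
const send = (peer, msg, cb) => {
  if (peer === 'a') return cb(new Error('peer down'))
  cb(null, { from: peer, echo: msg })
}

requestWithRetry(['a', 'b', 'c'], { key: 'some data' }, send, (err, res) => {
  console.log(err, res.from) // prints: null b
})
```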
instance.peers()
All the other peers, as computed by swim-hashring.
Example:
console.log(instance.peers().map((peer) => peer.id))
instance.peerConn(peer)
Return the connection for the peer. See tentacoli for the full details on the API.
Example:
instance.peerConn(instance.peers()[0]).request({
  hello: 'world'
}, console.log)
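Combining peers() and peerConn(), a message can be sent to every other peer. The helper below is a sketch that uses only the calls described above; the `broadcast` name and the shape of the collected results are assumptions, and `instance` is expected to be an upring instance that has already emitted 'up'.

```javascript
'use strict'

// Send a copy of `msg` to every other peer and collect the outcomes.
// `instance` is assumed to be an upring instance that has emitted 'up'.
function broadcast (instance, msg, done) {
  const peers = instance.peers()
  const results = []
  let pending = peers.length

  // nothing to do when this node is alone in the ring
  if (pending === 0) return done(results)

  peers.forEach((peer, i) => {
    instance.peerConn(peer).request(Object.assign({}, msg), (err, res) => {
      results[i] = { id: peer.id, err: err || null, res: res || null }
      if (--pending === 0) done(results)
    })
  })
}
```

A call such as broadcast(instance, { hello: 'world' }, console.log) would print one entry per peer once all of them have replied or failed.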
instance.add(pattern, func)
Execute the given function when a received request matches the given pattern. Requests are matched using bloomrun, in insertion order.
After a call to add, any non-matching messages will return an error to the caller.
Setting up any pattern-matching routes disables the 'request' event.
Example:
instance.add({ cmd: 'parse' }, (req, reply) => {
  reply(null, {
    a: 'response',
    streams: {
      any: stream
    }
  })
})
For convenience a command can also be defined by a string.
Example:
instance.add('parse', (req, reply) => {
  reply(null, {
    a: 'response',
    streams: {
      any: stream
    }
  })
})
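To show what matching in insertion order means in practice, here is a small stand-in router written in plain JavaScript. It is not bloomrun and not upring's router: `createRouter` and `lookup` are hypothetical names, and treating a string as shorthand for `{ cmd: string }` is an assumption inferred from the examples above.

```javascript
'use strict'

function createRouter () {
  const routes = []
  return {
    add (pattern, func) {
      // assumption: a string is shorthand for { cmd: string }
      const p = typeof pattern === 'string' ? { cmd: pattern } : pattern
      routes.push({ pattern: p, func })
    },
    lookup (req) {
      // a pattern matches when all of its properties equal those of the request;
      // routes are checked in the order they were added
      const route = routes.find(({ pattern }) =>
        Object.keys(pattern).every((k) => req[k] === pattern[k]))
      return route ? route.func : null
    }
  }
}

const router = createRouter()
router.add({ cmd: 'parse' }, () => 'generic parse')
router.add({ cmd: 'parse', format: 'json' }, () => 'json parse')
router.add('write', () => 'write')

// prints "generic parse": the first inserted match wins
console.log(router.lookup({ cmd: 'parse', format: 'json' })())
```

With insertion-order matching, more specific patterns need to be added before more general ones if they are meant to take precedence.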
instance.whoami()
The id of the current peer. It will throw if the node has not emitted 'up' yet.
instance.allocatedToMe(key)
Returns true or false depending on whether the given key has been allocated to this node.
instance.track(key, callback(err, newPeer))
Track the given key: the given callback will be fired when the key leaves this peer's responsibility.
The callback will be called with a newPeer if the peer knows the target, with null otherwise, e.g. when close is called.
Returns a function that can be used to remove the tracker.
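One way allocatedToMe() and track() might be combined is sketched below: local state for a key is kept only while this node owns it. The `holdWhileOwned` helper and its `onMoved` callback are hypothetical; only the two instance methods and the `(err, newPeer)` callback signature come from the descriptions above.

```javascript
'use strict'

// Start tracking `key` only if this node currently owns it.
// `instance` is assumed to be an upring instance that has emitted 'up'.
// Returns the function that removes the tracker, or null if the key is not ours.
function holdWhileOwned (instance, key, onMoved) {
  if (!instance.allocatedToMe(key)) {
    return null
  }
  return instance.track(key, (err, newPeer) => {
    // newPeer is null when the new owner is not known, e.g. on close
    onMoved(err || null, newPeer || null)
  })
}
```

The returned function can be called once the local state for the key has been released and tracking is no longer needed.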
instance.close(cb)
Close the current instance.
instance.logger
A pino instance to log stuff to.
License
MIT