upring


application-level sharding on node.js streams

210 Stars 21 Forks Last release: almost 3 years ago (v0.22.0) MIT License 176 Commits 25 Releases



UpRing provides application-level sharding, based on node.js streams. UpRing allocates some resources to a node, based on the hash of a `key`, and allows you to query the node using a request-response pattern (based on JS objects) which can embed streams.

UpRing simplifies the implementation and deployment of a cluster of nodes using a gossip membership protocol and a consistent hashing scheme (see swim-hashring). It uses tentacoli as a transport layer.
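
To make the allocation idea concrete, here is a minimal consistent-hashing sketch that uses only Node.js core modules. It is not swim-hashring's implementation: the `hash32`, `buildRing`, and `lookup` helpers, the choice of SHA-1, and the number of virtual points per peer are all assumptions made for illustration.

```javascript
const crypto = require('node:crypto')

// Hypothetical helper: map a string to a 32-bit unsigned integer.
function hash32 (str) {
  return crypto.createHash('sha1').update(str).digest().readUInt32BE(0)
}

// Build a ring with `replicas` virtual points per peer, sorted by hash.
function buildRing (peerIds, replicas = 50) {
  const points = []
  for (const id of peerIds) {
    for (let i = 0; i < replicas; i++) {
      points.push({ point: hash32(`${id}#${i}`), id })
    }
  }
  return points.sort((a, b) => a.point - b.point)
}

// Pick the first point clockwise from the key's hash, wrapping around.
function lookup (ring, key) {
  const h = hash32(key)
  const found = ring.find((entry) => entry.point >= h)
  return (found || ring[0]).id
}

const ring = buildRing(['127.0.0.1:7979', '127.0.0.1:7980', '127.0.0.1:7981'])
console.log(lookup(ring, 'some data')) // the id of the peer that owns the key
```

In this scheme, removing a peer only reassigns the keys that peer owned; keys owned by the remaining peers stay where they are.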

Install

npm i upring

Plugins

UpRing is extensible thanks to a powerful plugin system.

  • upring-kv: a scalable key/value store on top of UpRing.
  • upring-pubsub: a scalable publish-subscribe system without a central broker.
  • upring-set: Redis set API on top of UpRing.
  • upring-control: a monitoring dashboard for your UpRing cluster. See the demo at https://youtu.be/fLDOCwiKbbo.

API

upring(opts)

Create a new upring.

Options:

  • `hashring`: options for swim-hashring.
  • `client`: whether the current node can answer requests from other peers or not. Defaults to `false`. Alias for `hashring.client`.
  • `base`: alias for `hashring.base`.
  • `name`: alias for `hashring.name`.
  • `port`: the TCP port to listen on for RPC communications. By default it is allocated dynamically and discovered via gossip.
  • `logLevel`: the level for the embedded logger. Defaults to `'info'`.
  • `logger`: a pino instance to log to.

Events:

  • `up`: when this instance is up and running and properly configured.
  • `move`: see the swim-hashring `'move'` event.
  • `steal`: see the swim-hashring `'steal'` event.
  • `peerUp`: when a peer that is part of the hashring comes online.
  • `peerDown`: when a peer that is part of the hashring goes offline.
  • `request`: when a request comes in to be handled by the current node, if the router is not configured. It has the request object as first argument and a function to call when finished as second argument:

instance.on('request', (req, reply) => {
  reply(null, {
    a: 'response',
    streams: {
      any: stream
    }
  })
})

See tentacoli for the full details on the request/response format.

instance.request(obj, cb)

Forward the given request to the ring. The node that will reply to the current enquiry will be picked by the `key` property in `obj`. The callback will be called when a response is received, or when an error occurs.

Example:

instance.request({
  key: 'some data',
  streams: {
    in: fs.createWriteStream('out')
  }
}, (err) => {
  if (err) throw err
})

See tentacoli for the full details on the request/response format.

Retry logic

If the target instance fails while waiting for a response, the message will be sent to the next peer in the ring. This does not apply to streams, which will be closed or errored.
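
The retry behaviour can be pictured with the sketch below. It is an illustration under stated assumptions, not UpRing's internal code: `requestWithRetry`, the `candidates` list, and the `send(peer, msg, cb)` transport function are hypothetical names, and the rule that a message carrying `streams` is not retried is modelled with a simple property check.

```javascript
// Hypothetical sketch: `candidates` is the ordered list of peers to try,
// and `send(peer, msg, cb)` stands in for the real transport call.
function requestWithRetry (candidates, msg, send, cb) {
  let index = 0

  function attempt (lastErr) {
    if (index >= candidates.length) {
      return cb(lastErr || new Error('no peer available'))
    }
    const peer = candidates[index++]
    send(peer, msg, (err, res) => {
      if (!err) return cb(null, res)
      // Requests carrying streams are not retried: the streams are already
      // closed or errored, so the failure is reported to the caller.
      if (msg.streams) return cb(err)
      attempt(err)
    })
  }

  attempt()
}

// Usage with a fake transport in which the first peer is down.
const fakeSend = (peer, msg, cb) => {
  if (peer === 'peer-a') return cb(new Error('peer-a is down'))
  cb(null, { from: peer, echo: msg.hello })
}

requestWithRetry(['peer-a', 'peer-b'], { key: 'some data', hello: 42 }, fakeSend, (err, res) => {
  if (err) throw err
  console.log(res) // { from: 'peer-b', echo: 42 }
})
```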

instance.requestp(obj)

Same as `instance.request()`, but with promises.

```js
instance
  .requestp({ key: 'some data', hello: 42 })
  .then(response => {
    // handle response
  })
  .catch(err => {
    // handle error
  })

// were you saying async/await?
try {
  const response = await instance.requestp({ key: 'some data', hello: 42 })
  // handle response
} catch (err) {
  // handle error
}
```

instance.fire(obj, [callback])

Fire and forget the given request to the ring. The node that will handle the request will be picked by the `key` property in `obj`.
You can pass an optional callback that will be called if there is an error while sending the message, or after the message has been sent successfully.
If the given key does not exist in the ring, a debug log will be emitted.

Example:

instance.fire({
  key: 'some data',
  hello: 42
})

instance.peers([myself])

All the other peers, as computed by swim-hashring. If `myself` is set to `true`, the data of the current peer is included as well.

Example:

console.log(instance.peers().map((peer) => peer.id))

instance.mymeta()

Returns the information regarding this peer.

instance.peerConn(peer)

Return the connection for the peer. See tentacoli for the full details on the API.

Example:

instance.peerConn(instance.peers()[0]).request({
  hello: 'world'
}, console.log)

instance.add(pattern, [schema,] func)

Execute the given function when a received request matches the given pattern. Requests are matched using bloomrun, in insertion order.

After a call to `add`, any non-matching messages will return an error to the caller.

Setting up any pattern-matching routes disables the `'request'` event.

Example:

instance.add({ cmd: 'parse' }, (req, reply) => {
  reply(null, {
    a: 'response',
    streams: {
      any: stream
    }
  })
})

For convenience, a command can also be defined by a `string`.

Example:

instance.add('parse', (req, reply) => {
  reply(null, {
    a: 'response',
    streams: {
      any: stream
    }
  })
})

// async await is supported as well
instance.add('parse', async (req, reply) => {
  const data = await something()
  return { data }
})
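
Insertion-order matching can be pictured with a small stand-in router. This is not bloomrun and skips its indexing and other features: `TinyRouter` and its methods are hypothetical names, and a pattern is assumed to match when every one of its keys has the same value in the request.

```javascript
// Hypothetical stand-in for a pattern router, for illustration only.
class TinyRouter {
  constructor () {
    this.routes = []
  }

  // Remember patterns in the order they were added.
  add (pattern, handler) {
    this.routes.push({ pattern, handler })
  }

  // Return the handler of the first pattern that matches, or null.
  lookup (req) {
    const route = this.routes.find(({ pattern }) =>
      Object.keys(pattern).every((k) => req[k] === pattern[k])
    )
    return route ? route.handler : null
  }
}

const router = new TinyRouter()
router.add({ cmd: 'parse' }, () => 'generic parser')
router.add({ cmd: 'parse', format: 'csv' }, () => 'csv parser')

// Both patterns match; the one added first is returned.
console.log(router.lookup({ cmd: 'parse', format: 'csv' })())
// No pattern matches, so there is no handler to call.
console.log(router.lookup({ cmd: 'unknown' }))
```

A `null` lookup corresponds to the case where a non-matching message results in an error for the caller.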

Validation

UpRing offers a standard way to validate your requests out of the box: JSON Schema.
Internally, it uses ajv for speed and correctness.

instance.add({ cmd: 'parse' }, {
  type: 'object',
  properties: {
    cmd: { type: 'string' },
    key: { type: 'string' },
    value: { type: 'number' }
  },
  required: ['cmd']
}, (req, reply) => {
  reply(null, req)
})

instance.whoami()

The id of the current peer. It will throw if the node has not emitted `'up'` yet.

instance.join(peers, cb)

Make the instance join the given set of peer ids (each id is the result of `whoami()`). The `cb` callback is called after the join is completed.

instance.allocatedToMe(key)

Returns `true` or `false` depending on whether the given key has been allocated to this node.
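
As a small illustration, a node could use this method to filter a batch of keys down to the ones it is responsible for. The `ownedKeys` helper and the stub object below are hypothetical; only `allocatedToMe(key)` comes from the API described here.

```javascript
// Hypothetical helper: keep only the keys that `instance` is responsible for.
// `instance` is assumed to expose allocatedToMe(key) as described above.
function ownedKeys (instance, keys) {
  return keys.filter((key) => instance.allocatedToMe(key))
}

// A stub standing in for a real instance, for demonstration only.
const stub = { allocatedToMe: (key) => key.startsWith('a') }
console.log(ownedKeys(stub, ['apple', 'banana', 'avocado']))
```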

instance.track(key[, opts])

Create a new tracker for the given `key`.

Options:

  • `replica`: turns on tracking of a replica of the given data. Default: `false`.

Events:

  • `'move'`: when the `key` leaves this peer's responsibility. The `'move'` event will be called with a `newPeer` if the peer knows the target, with `null` otherwise, e.g. when `close` is called.
  • `'replica'`: adds or replaces the replica of the given key. The first argument is the destination peer, while the second is the old replica peer (if any).

Methods:

  • `end()`: quit tracking.
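
A tracker could be wired up along the lines of the sketch below. The `watchKey` helper is hypothetical, and the stub `track()` returns a bare `EventEmitter` that only approximates a real tracker; the `'move'` event and its `newPeer` argument are the ones documented above.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical helper: track `key` and call `onMoved(newPeer)` the first time
// the key leaves this peer. `newPeer` is null when the target is unknown.
function watchKey (instance, key, onMoved) {
  const tracker = instance.track(key)
  tracker.once('move', (newPeer) => onMoved(newPeer))
  return tracker
}

// Stub instance for demonstration only: `track` returns an EventEmitter
// with an `end()` method instead of a real upring tracker.
const stub = {
  track () {
    const tracker = new EventEmitter()
    tracker.end = () => tracker.removeAllListeners()
    return tracker
  }
}

const tracker = watchKey(stub, 'some data', (newPeer) => {
  console.log('key moved to', newPeer ? newPeer.id : 'an unknown peer')
})

// Simulate the ring handing the key over to another peer.
tracker.emit('move', { id: '127.0.0.1:7980' })
```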

instance.replica(key, cb)

Flag this upring instance as replicating the given key. `cb` is fired once, after the instance becomes responsible for the key.

instance.close(cb)

Close the current instance

instance.log

A pino instance to log stuff to.

instance.info

An Object that can be used for publishing custom information through the stock monitoring commands.

Monitoring

If `upring.add()` is used, some standard patterns are also added to UpRing to ease monitoring the instance.

Given an `upring` instance, those commands are easily accessible by sending a direct message through the tentacoli connection.

const conn = upring.peerConn({ id: '127.0.0.1:7979' })

conn.request({ ns: 'monitoring', cmd: 'memoryUsage' }, console.log)

ns:monitoring,cmd:memoryUsage

Returns the amount of memory currently used by the peer.

const conn = upring.peerConn({ id: '127.0.0.1:7979' })

conn.request({ ns: 'monitoring', cmd: 'memoryUsage' }, console.log)

// the response will be in the format
// { rss: 42639360, heapTotal: 23105536, heapUsed: 16028496 }

ns:monitoring,cmd:info

Returns some information about the peer.

const conn = upring.peerConn({ id: '127.0.0.1:7979' })

conn.request({ ns: 'monitoring', cmd: 'info' }, console.log)

// the response will be in the format
// { id: '192.168.1.185:55673',
//   upring: { address: '192.168.1.185', port: 50758 } }

Custom information can be added in `upring.info`, and it will be added to this response.
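
Combining `peers()`, `peerConn()`, and the `info` command, a cluster-wide overview could be gathered roughly as follows. `collectInfo` is a hypothetical helper, not part of UpRing, and it assumes that every peer returned by `peers()` can be reached through `peerConn()`.

```javascript
// Hypothetical helper: ask every other peer for its monitoring info.
// Calls `cb(err, infos)` with one response per peer, in `peers()` order.
function collectInfo (instance, cb) {
  const peers = instance.peers()
  const infos = new Array(peers.length)
  let pending = peers.length
  let failed = false

  // Nothing to ask when this node has no other peers.
  if (pending === 0) return cb(null, infos)

  peers.forEach((peer, i) => {
    instance.peerConn(peer).request({ ns: 'monitoring', cmd: 'info' }, (err, res) => {
      if (failed) return
      if (err) {
        // Report only the first failure.
        failed = true
        return cb(err)
      }
      infos[i] = res
      if (--pending === 0) cb(null, infos)
    })
  })
}
```

It would be called as `collectInfo(upring, (err, infos) => { ... })` on an instance that is already up.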

ns:monitoring,cmd:trace

Returns a stream of sampled key/hash pairs.

const conn = upring.peerConn({ id: '127.0.0.1:7979' })

conn.request({ ns: 'monitoring', cmd: 'trace' }, function (err, res) {
  if (err) {
    // do something!
  }

  res.streams.trace.on('data', console.log)
  // this will be in the format
  // { id: '192.168.1.185:55673',
  //   keys:
  //    [ { key: 'world', hash: 831779723 },
  //      { key: 'hello', hash: 2535641019 } ] }
})

use, after and ready

UpRing exposes three APIs to extend the current instance, in a safe asynchronous bootstrap procedure. With `use` you can add new methods or properties to the current instance and be sure that everything will be loaded before the `up` event.

Example:

```js
// main.js
upring.use(require('./feature'), { some: 'option' }, err => {
  if (err) throw err
})

// feature.js
module.exports = function (upring, opts, next) {
  upring.sum = (a, b) => a + b
  next()
}
```

You can use `after` if you need to know when a plugin has been loaded:

```js
// main.js
upring
  .use(require('./feature'), { some: 'option' }, err => {
    if (err) throw err
  })
  .after(() => {
    console.log('loaded!')
  })
```

You can also use `ready` if you need to know when everything is ready but the `up` event has not been fired yet.
If you need more info about how this lifecycle works, take a look at the avvio documentation.
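
A plugin can also combine `add` and `request` to expose a small feature of its own. The sketch below is a hypothetical example, not an existing plugin: `echoPlugin`, the `cmd` option, the `payload` field, and the `echo` method are made-up names, and it assumes that `add` may be called from inside a plugin function.

```javascript
// Hypothetical plugin: registers an 'echo' command and a helper to call it.
// It uses the (upring, opts, next) plugin signature.
function echoPlugin (upring, opts, next) {
  const cmd = opts.cmd || 'echo'

  // Handle incoming requests that match { cmd }.
  upring.add({ cmd }, (req, reply) => {
    reply(null, { echoed: req.payload })
  })

  // Decorate the instance with a convenience method that routes by key.
  upring.echo = (key, payload, cb) => {
    upring.request({ key, cmd, payload }, cb)
  }

  // Signal that the plugin has finished loading.
  next()
}
```

It would be loaded with `upring.use(echoPlugin, { cmd: 'echo' })`, after which `upring.echo('some data', 'hi', console.log)` sends the request to whichever peer owns the key.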

Demo

You can try a demo that uses `upring-kv` here.
We recommend using baseswim to run a base node. It is also available as a tiny docker image.

Acknowledgements

This project is kindly sponsored by nearForm.

License

MIT
