JSPM

  • Created
  • Published
  • Downloads 1037275
  • Score
    100M100P100Q178797F
  • License BSD

Job manager

Package Exports

  • bull
  • bull/lib/job
  • bull/lib/priority-queue

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (bull) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

Bull Job Manager

bull

A lightweight, robust and fast job processing queue. Designed with stability and atomicity in mind. The API is inspired by Kue.

It uses redis for persistence, so the queue is not lost if the server goes down for any reason.

If you need more features than the ones provided by Bull check Kue but keep in mind this open issue.

BuildStatus

Install:

npm install bull

Quick Guide

var Queue = require('bull');

var queue = new Queue('media transcoding', 6379, '127.0.0.1'));

queue.process('video transcode', function(job, done){
  
  // job.data contains the custom data passed when the job was created
  
  // transcode video asynchronously and report progress
  job.progress(42);
  
  // call done when finished
  done();
  
  // or give a error if error
  done(Error('error transcoding'));
  
  // If the job throws an unhandled exception it is also handled correctly
  throw (Error('some unexpected error'));
});

queue.process('audio transcode', function(job, done){
  // transcode audio asynchronously and report progress
  job.progress(42);
  
  // call done when finished
  done();
  
  // or give a error if error
  done(Error('error transcoding'));
  
  // If the job throws an unhandled exception it is also handled correctly
  throw (Error('some unexpected error'));
});

queue.process('image transcode', function(job, done){
  // transcode image asynchronously and report progress
  job.progress(42);
  
  // call done when finished
  done();
  
  // or give a error if error
  done(Error('error transcoding'));
  
  // If the job throws an unhandled exception it is also handled correctly
  throw (Error('some unexpected error'));
});

queue.createJob('video transcode', {video: 'http://example.com/video1.mov'});
queue.createJob('audio transcode', {audio: 'http://example.com/audio1.mp3'});
queue.createJob('image transcode', {image: 'http://example.com/image1.tiff'});

A queue can be paused and resumed:

queue.pause().then(function(){
  // queue is paused now
});

queue.resume().then(function(){
  // queue is resumed now
})

A queue emits also some useful events:

queue.on('completed', function(job){
  // Job completed!
})
.on('failed', function(job, err){
  // Job failed with reason err!
})
.on('progress', function(job, progress){
  // Job progress updated!
})
.on('paused', function(){
  // The queue has been paused
})
.on('progress', function(job, progress){
  // The queue has been resumed
})

Queues are cheap, so if you need many of them just create new ones with different names:

var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.
.
.

Queues are robust and can be run in parallel in several threads or processes without any risk of hazzards or queue corruption. Check this simple example using cluster to parallelize jobs accross processes:

var 
  Queue = require('bull'),
  cluster = require('cluster');

var queue = new Queue("test concurrent queue", 6379, '127.0.0.1');

queue.process('test concurrent job', function(job, jobDone){
  if(cluster.isMaster){
    console.log("Job done in master", job.jobId);
  }else{
    console.log("Job done by worker", cluster.worker.id, job.jobId);
  }
  jobDone();
});

if(cluster.isMaster){
  var numWorkers = 8;
  for (var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }
}

##Documentation

Reference

###Queue(queueName, redisPort, redisHost, [redisOpts])

This is the Queue constructor. It creates a new Queue that is persisted in Redis. Everytime the same queue is instantiated it tries to process all the old jobs that may exist from a previous unfinished session.

Arguments

    queueName {String} A unique name for this Queue.
    redisPort {Number} A port where redis server is running.
    redisHost {String} A host specified as IP or domain where redis is running.
    redisOptions {Object} Optional options to pass to the redis client.

#### Queue##process(jobName, function(job, done))

Defines a processing function for the jobs with the given name.

The callback is called everytime a job in the queue matches the name and provides an instance of the job and a done callback to be called after the job has been completed. If done can be called providing an Error instsance to signal that the job did not complete successfully.

Arguments

    jobName {String} A job type name.
    cb {Function} A callback called for every job of the given name.

#### Queue##createJob(jobName, args)

Creates a new job and adds it to the queue. If the queue is empty the job will be executed directly, otherwise it will be placed in the queue and executed as soon as possible.

Arguments

  jobName {String} A job type name.
  args {PlainObject} A plain object with arguments that will be passed
    to the job processing function in job.data.
  returns {Promise} A promise that resolves when the job has been succesfully
    added to the queue (or rejects if some error occured).

### Job

A job includes all data needed to perform its execution, as well as the progress method needed to update its progress.

The most important property for the user is Job##data that includes the object that was passed to Queue##createJob, and that is normally used to perform the job.


##License

(The MIT License)

Copyright (c) 2013 Manuel Astudillo manuel@optimalbits.com

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.