JSPM

  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 27324
  • Score
    100M100P100Q154311F
  • License MIT

Easy AWS step function activity worker in node.js

Package Exports

  • step-function-worker

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (step-function-worker) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

Build Status codecov

step-function-worker

Create a nodejs aws step-function worker/pooler easily :-)

install

npm install step-function-worker

Example usage

Basic example

const fn = function(input, cb, heartbeat){
  // do something
  doSomething(input)

  // call heartbeat to avoid timeout
  heartbeat()

  // call callback in the end
  cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify
};

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  taskConcurrency : 22, // default is null = Infinity
  poolConcurrency : 2 // default is 1
});

Concurrency management

Since version 3.0, concurrency has been replaced by poolConcurrency and taskConcurrency.

see more information in https://github.com/piercus/step-function-worker/issues/16#issuecomment-486971866

  • poolConcurrency is the maximum number of parallel getActivity, http request (see sdk.getActivity) (default: 1) Increase this to have a more responsive worker, decrease this to consume less http connections.

  • taskConcurrency (null means Infinite) represents the maximum number of parallel tasks done by the worker (default: equals to poolConcurrency).

Anyway, you should always have poolConcurrency <= taskConcurrency.

Set the Region

By default, this package is built on top of aws-sdk so you should set your AWS Region by changing AWS_REGION environment variable.

If you want to set it in JS code directly you can do it using awsConfig (see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html to see all available options) like

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  awsConfig: {
    region: '<your-region>'
  }
});

Close the worker

// when finish close the worker with a callback
// this closing process may take up to 60 seconds per concurent worker, to close all connections smoothly without loosing any task
worker.close(function(){
  process.exit();
})

Get info on current worker

// A worker as multiple poolers and multiple running tasks
// You can have infos about it by doing
const {poolers, tasks} = worker.report();

// poolers is an array of {
//   startTime: <Date>,
//   workerName: <String>,
//   status: <String>
// }
//
// tasks is an array of {
//  taskToken: <String>,
//  input: <Object>,
//  startTime: <Date>
// }
//

Custom logging with winston

You can customize logging by using a winston logger (or winston-like logger) as input

const winston = require('winston');

const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.json(),
  defaultMeta: { service: 'user-service' },
  transports: [
    //
    // - Write to all logs with level `info` and below to `combined.log` 
    // - Write all logs error (and below) to `error.log`.
    //
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  logger
});

Alternatively, you can just use a winston-like logger

const logger = console;

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  logger
});

Events

// when a task starts
worker.on('task', function(task){
  // task.taskToken
  // task.input
  console.log("task ", task.input)
});

// when a task fails
worker.on('failure', function(failure){
  // out.error
  // out.taskToken
  console.log("Failure :",failure.error)
});

// when a heartbeat signal is sent
worker.on('heartbeat', function(beat){
  // out.taskToken
  console.log("Heartbeat");
});

// when a task succeed
worker.on('success', function(out){
  // out.output
  // out.taskToken
  console.log("Success :",out.output)
});

// when an error happens
worker.on('error', function(err){
  console.log("error ", err)
});

// when the worker has no more task to process
worker.on('empty', function(){
  console.log("error ", err)
});

// when the worker reaches taskConcurrency tasks
worker.on('full', function(err){
  console.log("error ", err)
});

Documentation

See JSDoc in the code.