Package Exports
- step-function-worker
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (step-function-worker) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
step-function-worker
Create a nodejs aws step-function worker/pooler easily :-)
install
npm install step-function-worker
Example usage
Basic example
const fn = function(input, cb, heartbeat){
// do something
doSomething(input)
// call heartbeat to avoid timeout
heartbeat()
// call callback in the end
cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify
};
const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
taskConcurrency : 22, // default is null = Infinity
poolConcurrency : 2 // default is 1
});
Concurrency management
Since version 3.0, concurrency
has been replaced by poolConcurrency
and taskConcurrency
.
see more information in https://github.com/piercus/step-function-worker/issues/16#issuecomment-486971866
poolConcurrency
is the maximum number of parallel getActivity, http request (seesdk.getActivity
) (default:1
) Increase this to have a more responsive worker, decrease this to consume less http connections.taskConcurrency
(null
means Infinite) represents the maximum number of parallel tasks done by the worker (default: equals topoolConcurrency
).
Anyway, you should always have poolConcurrency
<= taskConcurrency
.
Set the Region
By default, this package is built on top of aws-sdk
so you should set your AWS Region by changing AWS_REGION
environment variable.
If you want to set it in JS code directly you can do it using awsConfig
(see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html to see all available options) like
const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
awsConfig: {
region: '<your-region>'
}
});
Close the worker
// when finish close the worker with a callback
// this closing process may take up to 60 seconds per concurent worker, to close all connections smoothly without loosing any task
worker.close(function(){
process.exit();
})
Get info on current worker
// A worker as multiple poolers and multiple running tasks
// You can have infos about it by doing
const {poolers, tasks} = worker.report();
// poolers is an array of {
// startTime: <Date>,
// workerName: <String>,
// status: <String>
// }
//
// tasks is an array of {
// taskToken: <String>,
// input: <Object>,
// startTime: <Date>
// }
//
Custom logging with winston
You can customize logging by using a winston logger (or winston-like logger) as input
const winston = require('winston');
const logger = winston.createLogger({
level: 'debug',
format: winston.format.json(),
defaultMeta: { service: 'user-service' },
transports: [
//
// - Write to all logs with level `info` and below to `combined.log`
// - Write all logs error (and below) to `error.log`.
//
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});
const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
logger
});
Alternatively, you can just use a winston-like logger
const logger = console;
const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
logger
});
Events
// when a task starts
worker.on('task', function(task){
// task.taskToken
// task.input
console.log("task ", task.input)
});
// when a task fails
worker.on('failure', function(failure){
// out.error
// out.taskToken
console.log("Failure :",failure.error)
});
// when a heartbeat signal is sent
worker.on('heartbeat', function(beat){
// out.taskToken
console.log("Heartbeat");
});
// when a task succeed
worker.on('success', function(out){
// out.output
// out.taskToken
console.log("Success :",out.output)
});
// when an error happens
worker.on('error', function(err){
console.log("error ", err)
});
// when the worker has no more task to process
worker.on('empty', function(){
console.log("error ", err)
});
// when the worker reaches taskConcurrency tasks
worker.on('full', function(err){
console.log("error ", err)
});
Documentation
See JSDoc in the code.