Package Exports
- wtsqs
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (wtsqs) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
WTSQS
Simplified SQS Wrapper and Async Worker manager.
Features:
- Simple interface. ✅
- Promise based. ✅
- ES6. ✅
- Optimized async worker. ✅
Install
# Using npm
$ npm install wtsqs --save
# Or using yarn
$ yarn add wtsqsClasses
- WTSQS
A simplified sqs wrapper with interface similar to a normal queue data structure.
- WTSQSWorker
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from sqs while processing other jobs concurrently. It also takes care of deleting a job from the queue after successfully processing the message.
Typedefs
- Message :
Object Received SQS Message
- Job :
Object Worker Job
WTSQS
A simplified sqs wrapper with interface similar to a normal queue data structure.
Kind: global class
- WTSQS
- new WTSQS(options)
- .size() ⇒
Promise.<integer> - .enqueueOne(payload, [options], [sqsOptions]) ⇒
Promise - .enqueueMany(payloads, [options], [sqsOptions]) ⇒
Promise - .peekOne([options], [sqsOptions]) ⇒
Promise.<(Message|null)> - .peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒
Promise.<Array.<Message>> - .deleteOne(message) ⇒
Promise - .deleteMany(messages) ⇒
Promise - .deleteAll() ⇒
Promise - .popOne([options], [sqsOptions]) ⇒
Promise.<(Message|null)> - .popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒
Promise.<Array.<Message>>
new WTSQS(options)
Constructs WTSQS object.
| Param | Type | Default | Description |
|---|---|---|---|
| options | Object |
Options object. | |
| options.url | String |
SQS queue url. | |
| options.accessKeyId | String |
AWS access key id. | |
| options.secretAccessKey | String |
AWS secret access key. | |
| [options.region] | String |
us-east-1 |
AWS regions where queue exists. |
| [options.defaultMessageGroupId] | String |
FIFO queues only. Default tag assigned to a message that specifies it belongs to a specific message group. If not provided random uuid is assigned to each message which doesn't guarantee order but allows parallelism. | |
| [options.defaultVisibilityTimeout] | Integer |
60 |
Default duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [options.defaultPollWaitTime] | Integer |
10 |
Default duration (in seconds) for which read calls wait for a message to arrive in the queue before returning. |
| [options.sqsOptions] | Object |
Additional options to extend/override the underlying SQS object creation. |
Example
const { WTSQS } = require('wtsqs')
// The most simple way to construct a WTSQS object
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})wtsqs.size() ⇒ Promise.<integer>
Get approximate total number of messages in the queue.
Kind: instance method of WTSQS
Example
const size = await wtsqs.size()
console.log(size) // output: 2wtsqs.enqueueOne(payload, [options], [sqsOptions]) ⇒ Promise
Enqueue single payload in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessage
| Param | Type | Default | Description |
|---|---|---|---|
| payload | Object |
JSON serializable object. | |
| [options] | Object |
Options. | |
| [options.messageGroupId] | String |
Message group id to override default id. | |
| [sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS sendMessage request. |
Example
const myObj = { a: 1 }
await wtsqs.enqueueOne(myObj)wtsqs.enqueueMany(payloads, [options], [sqsOptions]) ⇒ Promise
Enqueue batch of payloads in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessageBatch
| Param | Type | Default | Description |
|---|---|---|---|
| payloads | Array.<Object> |
Array of JSON serializable objects. | |
| [options] | Object |
Options object. | |
| [options.messageGroupId] | String |
Message group id to override default id. | |
| [sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS sendMessageBatch request. |
Example
const myObjList = [{ a: 1 }, { b: 3 }]
await wtsqs.enqueueMany(myObjList)wtsqs.peekOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
Retrieve single message without deleting it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.
| Param | Type | Default | Description |
|---|---|---|---|
| [options] | Object |
Options object. | |
| [options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
| [options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
| [sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.peekOne()
console.log(myMessage)
// output:
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
}wtsqs.peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
Retrieve batch of messages without deleting them.
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
See: SQS#receiveMessage
| Param | Type | Default | Description |
|---|---|---|---|
| [maxNumberOfMessages] | Number |
10 |
Maximum number of messages to retrieve. Must be between 1 and 10. |
| [options] | Object |
Options object. | |
| [options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
| [options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
| [sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.peekMany(2)
console.log(myMessageList)
// output:
[
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
},
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { b: 3 }
}
]wtsqs.deleteOne(message) ⇒ Promise
Delete single message from queue.
Kind: instance method of WTSQS
See: SQS#deleteMessage
| Param | Type | Description |
|---|---|---|
| message | Message |
Message to be deleted |
Example
const myMessage = await wtsqs.peekOne()
await wtsqs.deleteOne(myMessage)wtsqs.deleteMany(messages) ⇒ Promise
Delete batch of messages from queue.
Kind: instance method of WTSQS
See: SQS#deleteMessageBatch
| Param | Type | Description |
|---|---|---|
| messages | Array.<Message> |
Messages to be deleted |
Example
const myMessageList = await wtsqs.peekMany(2)
await wtsqs.deleteMany(myMessageList)wtsqs.deleteAll() ⇒ Promise
Delete ALL messages in the queue.
NOTE: Can only be called once every 60 seconds.
Kind: instance method of WTSQS
See: SQS#purgeQueue
Example
await wtsqs.deleteAll()wtsqs.popOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
Retrieve single message and immediately delete it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.
| Param | Type | Default | Description |
|---|---|---|---|
| [options] | Object |
Options object. | |
| [options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
| [options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
| [sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.popOne()
// The message no longer exists in queue
console.log(myMessage)
// output:
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
}wtsqs.popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
Retrieve batch of messages and immediately delete them.
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
| Param | Type | Default | Description |
|---|---|---|---|
| [maxNumberOfMessages] | Number |
10 |
Maximum number of messages to retrieve. Must be between 1 and 10. |
| [options] | Object |
Options object. | |
| [options.pollWaitTime] | Integer |
Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. | |
| [options.visibilityTimeout] | Integer |
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. | |
| [sqsOptions] | Object |
{} |
Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.popMany(2)
// Messages no longer exist in queue
console.log(myMessageList)
// output:
[
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { a: 1 }
},
{
id: 'messageId',
receiptHandle: 'messageReceiptHandle'
md5: 'messageMD5',
body: { b: 3 }
}
]WTSQSWorker
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from sqs while processing other jobs concurrently. It also takes care of deleting a job from the queue after successfully processing the message.
Kind: global class
- WTSQSWorker
- new WTSQSWorker(options)
- instance
- inner
- ~runHandler ⇒
Promise
- ~runHandler ⇒
new WTSQSWorker(options)
Constructs WTSQSWorker object.
| Param | Type | Default | Description |
|---|---|---|---|
| options | Object |
Options object. | |
| options.wtsqs | WTSQS |
WTSQS instance to use for connecting to sqs. | |
| [options.maxConcurrency] | Integer |
20 |
Maximum number of concurrent jobs. |
| [options.pollWaitTime] | Integer |
5 |
Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning. |
| [options.visibilityTimeout] | Integer |
30 |
Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests. |
| [options.logger] | Object | String |
|
Object with trace, debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger. |
Example
const { WTSQS, WTSQSWorker } = require('wtsqs')
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
const worker = new WTSQSWorker({ wtsqs })
worker.run(async (job) => {
await someAsyncFunction(job.body)
console.log(job)
})worker.run(handler)
Start fetching and processing jobs.
Kind: instance method of WTSQSWorker
| Param | Type | Description |
|---|---|---|
| handler | runHandler |
Async function to process a single job. |
WTSQSWorker~runHandler ⇒ Promise
Async callback function to process single job.
Kind: inner typedef of WTSQSWorker
| Param | Type | Description |
|---|---|---|
| job | Job |
A single job to process |
Message : Object
Received SQS Message
Kind: global typedef
Properties
| Name | Type | Description |
|---|---|---|
| id | String |
Message id. |
| receiptHandle | String |
Message receipt handle. |
| md5 | String |
Message body md5 hash sum. |
| body | Object |
Message body containing original payload. |
Job : Object
Worker Job
Kind: global typedef
Properties
| Name | Type | Description |
|---|---|---|
| id | String |
Job id. |
| receiptHandle | String |
Job receipt handle. |
| md5 | String |
Job body md5 hash sum. |
| body | Object |
Job body containing original payload. |