In Memory Message Queue Adapter
DRY Castore MessageQueue definition using FastQ.
📥 Installation
# npm
npm install @castore/in-memory-message-queue-adapter
# yarn
yarn add @castore/in-memory-message-queue-adapter
This package has @castore/core as a peer dependency, so you will have to install it as well:
# npm
npm install @castore/core
# yarn
yarn add @castore/core
👩‍💻 Usage
The simplest way to use this adapter is through the attachTo static method:
import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';
const messageQueueAdapter =
  InMemoryMessageQueueAdapter.attachTo(appMessageQueue);
This makes your messageQueueAdapter inherit the types of your appMessageQueue while plugging the two together 🙌
You can also instantiate one on its own, but notice the code duplication:
import type { MessageQueueMessage } from '@castore/core';
import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>();
appMessageQueue.messageQueueAdapter = messageQueueAdapter;
🤖 Set worker
You can provide an async worker for the queue at construction time, or set it on the adapter later:
const messageQueueAdapter = InMemoryMessageQueueAdapter.attachTo(
  appMessageQueue,
  {
    worker: async message => {
      // 🙌 Correctly typed!
      const { eventStoreId, event } = message;
    },
  },
);
// 👇 Alternatively
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>({
  worker: async message => {
    // 🙌 Correctly typed!
    const { eventStoreId, event } = message;
  },
});
// 👇 Also alternatively
messageQueueAdapter.worker = async message => {
  // 🙌 Correctly typed!
  const { eventStoreId, event } = message;
};
Only one worker can be set at a time.
For more control, the worker has access to more context through its second argument:
messageQueueAdapter.worker = async (message, context) => {
  const { eventStoreId, event } = message;
  const {
    // 👇 See "Retry policy" section below
    attempt,
    retryAttemptsLeft,
    // 👇 If event is replayed
    replay,
  } = context;
  ...
};
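As a rough illustration of how a worker might use this context, here is a minimal sketch. The WorkerContext interface is a hypothetical local mirror of the three fields named above (the adapter's actual types may differ, and replay is assumed to be an optional flag); isLastAttempt and describeExecution are illustrative helpers, not part of the package.

```typescript
// Hypothetical local mirror of the context fields named above; the real
// types come from the adapter and may differ.
interface WorkerContext {
  attempt: number;
  retryAttemptsLeft: number;
  replay?: boolean; // assumption: an optional flag set when the event is replayed
}

// True when a failure of this execution would not be retried.
const isLastAttempt = ({ retryAttemptsLeft }: WorkerContext): boolean =>
  retryAttemptsLeft === 0;

// Builds a log line describing the current execution.
const describeExecution = (context: WorkerContext): string => {
  const kind = context.replay === true ? 'replay' : 'live';
  const suffix = isLastAttempt(context) ? ' (last attempt)' : '';

  return `${kind} message, attempt ${context.attempt}${suffix}`;
};

console.log(describeExecution({ attempt: 3, retryAttemptsLeft: 0 }));
// → live message, attempt 3 (last attempt)
```

Inside a worker, such a helper could be called with the context argument before handling the message, for instance to raise an alert when no retry is left.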
♻️ Retry policy
This adapter retries the handling of failed messages. You can override the default retry policy via its constructor arguments:
- retryAttempts (?number = 2): The maximum number of retry attempts for a message in case of worker execution failure. If all the retries fail, the error is logged with console.error, and the message is ignored.
- retryDelayInMs (?number = 30000): The minimum delay in milliseconds between the worker execution failure and its first retry.
- retryBackoffRate (?number = 2): A factor applied to the retryDelayInMs at each subsequent retry.
const messageQueueAdapter = InMemoryMessageQueueAdapter.attachTo(appMessageQueue, {
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});
// 👇 Alternatively
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>({
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});
For instance, if the worker is continuously failing for a specific message, the sequence of code execution (with the default retry policy) will look like this:
- Worker execution: ❌ Failure
- 30 seconds of delay
- Worker execution: ❌ Failure
- 60 seconds of delay (30x2)
- Worker execution: ❌ Failure
- No more retry attempts, the error is logged
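The delays in this sequence can be reproduced with a small standalone calculation. The sketch below is not the adapter's code: the RetryPolicy interface and getRetryDelaysInMs helper are hypothetical names, and the formula (retryDelayInMs multiplied by retryBackoffRate once per previous retry) is an assumption inferred from the option descriptions and the sequence above.

```typescript
// Hypothetical shape mirroring the three retry options described above.
interface RetryPolicy {
  retryAttempts: number;
  retryDelayInMs: number;
  retryBackoffRate: number;
}

// Default values as documented above.
const defaultRetryPolicy: RetryPolicy = {
  retryAttempts: 2,
  retryDelayInMs: 30000,
  retryBackoffRate: 2,
};

// Returns the delay before each retry, in order. Assumption: the backoff
// rate is applied once per retry that has already happened.
const getRetryDelaysInMs = ({
  retryAttempts,
  retryDelayInMs,
  retryBackoffRate,
}: RetryPolicy): number[] =>
  Array.from(
    { length: retryAttempts },
    (_, retryIndex) => retryDelayInMs * Math.pow(retryBackoffRate, retryIndex),
  );

console.log(getRetryDelaysInMs(defaultRetryPolicy));
// → [30000, 60000]
```

Under the same assumption, the custom policy shown earlier (retryAttempts: 3, retryDelayInMs: 10000, retryBackoffRate: 1.5) would give delays of 10, 15 and 22.5 seconds.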