License: MIT

DRY Castore MessageQueue definition using FastQ

Package Exports

  • @castore/in-memory-message-queue-adapter
  • @castore/in-memory-message-queue-adapter/dist/cjs/index.js
  • @castore/in-memory-message-queue-adapter/dist/esm/index.js

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@castore/in-memory-message-queue-adapter) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

In Memory Message Queue Adapter

DRY Castore MessageQueue definition using FastQ.

📥 Installation

# npm
npm install @castore/in-memory-message-queue-adapter

# yarn
yarn add @castore/in-memory-message-queue-adapter

This package has @castore/core as a peer dependency, so you will have to install it as well:

# npm
npm install @castore/core

# yarn
yarn add @castore/core

👩‍💻 Usage

The simplest way to use this adapter is to use the attachTo static method:

import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';

const messageQueueAdapter =
  InMemoryMessageQueueAdapter.attachTo(appMessageQueue);

This will make your messageQueueAdapter inherit from your appMessageQueue types while plugging them together 🙌

You can also instantiate one on its own, but notice the code duplication:

import type { MessageQueueMessage } from '@castore/core';
import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';

const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>();

appMessageQueue.messageQueueAdapter = messageQueueAdapter;
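
To illustrate why attachTo avoids the duplication, here is a minimal sketch of the general pattern: a static factory whose generic parameter is inferred from the queue it receives. SketchQueue, SketchAdapter and AppMessage are hypothetical stand-ins written for this example; they are not the classes exported by @castore/core or by this package, and the real implementation may differ.

```typescript
// Hypothetical stand-ins for illustration only, NOT the real Castore classes.
class SketchQueue<M> {
  // The adapter plugged into this queue, if any
  adapter?: SketchAdapter<M>;
}

class SketchAdapter<M> {
  received: M[] = [];

  // M is inferred from the queue argument, so callers never repeat the message type
  static attachTo<M>(queue: SketchQueue<M>): SketchAdapter<M> {
    const adapter = new SketchAdapter<M>();
    queue.adapter = adapter;
    return adapter;
  }

  push(message: M): void {
    this.received.push(message);
  }
}

// Hypothetical message shape
type AppMessage = { eventStoreId: string; event: { type: string } };

const appQueue = new SketchQueue<AppMessage>();

// One call: the adapter is typed as SketchAdapter<AppMessage> and plugged into the queue
const attachedAdapter = SketchAdapter.attachTo(appQueue);

// Manual equivalent: the message type is spelled out again, then assigned by hand
const manualAdapter = new SketchAdapter<AppMessage>();
appQueue.adapter = manualAdapter;
```

In the manual variant, the type argument and the assignment are two separate places that must stay in sync, which is the duplication the attachTo form removes.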

🤖 Set worker

You can provide an async worker for the queue at construction time, or set it later:

const messageQueueAdapter = InMemoryMessageQueueAdapter.attachTo(
  appMessageQueue,
  {
    worker: async message => {
      // 🙌 Correctly typed!
      const { eventStoreId, event } = message;
    },
  },
);

// 👇 Alternatively
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>({
  worker: async message => {
    // 🙌 Correctly typed!
    const { eventStoreId, event } = message;
  },
});

// 👇 Also alternatively
messageQueueAdapter.worker = async message => {
  // 🙌 Correctly typed!
  const { eventStoreId, event } = message;
};

Only one worker can be set at a time.

For more control, the worker has access to more context through its second argument:

messageQueueAdapter.worker = async (message, context) => {
  const { eventStoreId, event } = message;
  const {
    // 👇 See "Retry policy" section below
    attempt,
    retryAttemptsLeft,
    // 👇 If event is replayed
    replay,
  } = context;

  ...
};
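
As a sketch of how this context might be used, the worker below skips side effects for replayed events and raises a warning on the last attempt. The SketchMessage and SketchContext types are local stand-ins written for this example (in particular, replay being an optional boolean is an assumption), and decideHandling is a hypothetical helper, not part of the adapter.

```typescript
// Local stand-in types for illustration only (assumptions, not the adapter's exported types).
type SketchMessage = {
  eventStoreId: string;
  event: { aggregateId: string; version: number; type: string };
};

type SketchContext = {
  attempt: number;
  retryAttemptsLeft: number;
  replay?: boolean; // assumed to be an optional boolean flag
};

type HandlingDecision = 'SKIP_SIDE_EFFECTS' | 'PROCESS' | 'PROCESS_AND_ALERT';

// Pure decision logic, kept separate from the worker so it is easy to test.
const decideHandling = ({
  retryAttemptsLeft,
  replay = false,
}: SketchContext): HandlingDecision => {
  if (replay) return 'SKIP_SIDE_EFFECTS';

  return retryAttemptsLeft === 0 ? 'PROCESS_AND_ALERT' : 'PROCESS';
};

const sketchWorker = async (
  message: SketchMessage,
  context: SketchContext,
): Promise<void> => {
  const { eventStoreId, event } = message;
  const decision = decideHandling(context);

  // Replayed events: do not trigger side effects (emails, webhooks...) again
  if (decision === 'SKIP_SIDE_EFFECTS') return;

  // No retry left: a failure now means the message will be dropped
  if (decision === 'PROCESS_AND_ALERT') {
    console.warn(
      `Last attempt (#${context.attempt}) for ${eventStoreId}/${event.aggregateId} v${event.version}`,
    );
  }

  // Handle the event here
};
```

Such a function could then be assigned to messageQueueAdapter.worker, provided its parameter types are compatible with the ones inferred from your message queue.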

♻️ Retry policy

This adapter retries failed message handling. You can override the default retry policy via its constructor arguments:

  • retryAttempts (?number = 2): The maximum number of retry attempts for a message in case of worker execution failure. If all the retries fail, the error is logged with console.error, and the message is ignored.
  • retryDelayInMs (?number = 30000): The minimum delay in milliseconds between the worker execution failure and its first retry.
  • retryBackoffRate (?number = 2): A factor applied to the retryDelayInMs at each subsequent retry.

const messageQueueAdapter = InMemoryMessageQueueAdapter.attachTo(appMessageQueue, {
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});

// 👇 Alternatively
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>({
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});

For instance, if the worker is continuously failing for a specific message, the sequence of executions (with the default retry policy) will look like this:

  • Worker execution: ❌ Failure
  • 30 seconds of delay
  • Worker execution: ❌ Failure
  • 60 seconds of delay (30x2)
  • Worker execution: ❌ Failure
  • No more retry attempt, error is logged
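
The delays in this sequence follow from the three options: the delay before the n-th retry is retryDelayInMs × retryBackoffRate^(n-1). The sketch below computes that schedule. RetryPolicy and getRetryDelaysInMs are hypothetical names introduced for this example and are not part of the adapter; since retryDelayInMs is described as a minimum delay, the values should be read as lower bounds.

```typescript
// Hypothetical helper for illustration, not part of the adapter's API.
type RetryPolicy = {
  retryAttempts: number;
  retryDelayInMs: number;
  retryBackoffRate: number;
};

// Default values as documented above
const defaultRetryPolicy: RetryPolicy = {
  retryAttempts: 2,
  retryDelayInMs: 30000,
  retryBackoffRate: 2,
};

// Minimum delay (in ms) waited before each retry, in order
const getRetryDelaysInMs = ({
  retryAttempts,
  retryDelayInMs,
  retryBackoffRate,
}: RetryPolicy): number[] =>
  Array.from(
    { length: retryAttempts },
    (_, retryIndex) => retryDelayInMs * Math.pow(retryBackoffRate, retryIndex),
  );

// Default policy: 30 seconds, then 60 seconds
console.log(getRetryDelaysInMs(defaultRetryPolicy)); // [ 30000, 60000 ]

// Custom policy from the example above: 10, 15, then 22.5 seconds
console.log(
  getRetryDelaysInMs({
    retryAttempts: 3,
    retryDelayInMs: 10000,
    retryBackoffRate: 1.5,
  }),
); // [ 10000, 15000, 22500 ]
```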