Package Exports
- @castore/sqs-message-queue-adapter
- @castore/sqs-message-queue-adapter/dist/cjs/index.js
- @castore/sqs-message-queue-adapter/dist/esm/index.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@castore/sqs-message-queue-adapter) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
SQS Message Queue Adapter
DRY Castore MessageQueue
definition using AWS SQS.
📥 Installation
# npm
npm install @castore/sqs-message-queue-adapter
# yarn
yarn add @castore/sqs-message-queue-adapter
This package has @castore/core
and @aws-sdk/client-sqs
(above v3) as peer dependencies, so you will have to install them as well:
# npm
npm install @castore/core @aws-sdk/client-sqs
# yarn
yarn add @castore/core @aws-sdk/client-sqs
👩💻 Usage
import { SQSClient } from '@aws-sdk/client-sqs';
import { SQSMessageQueueAdapter } from '@castore/sqs-message-queue-adapter';
const sqsClient = new SQSClient({});
const messageQueueAdapter = new SQSMessageQueueAdapter({
queueUrl: 'https://sqs.us-east-1.amazonaws.com/111122223333/my-super-queue',
sqsClient,
});
// 👇 Alternatively, provide a getter
const messageQueueAdapter = new SQSMessageQueueAdapter({
queueUrl: () => process.env.MY_SQS_QUEUE_URL,
sqsClient,
});
const appMessageQueue = new NotificationMessageQueue({
...
messageQueueAdapter
})
This will directly plug your MessageQueue to SQS 🙌
If your queue is of type FIFO, don't forget to specify it in the constructor:
const messageQueueAdapter = new SQSMessageQueueAdapter({
queueUrl: 'https://sqs.us-east-1.amazonaws.com/111122223333/my-super-queue',
sqsClient,
fifo: true,
});
🤔 How it works
When publishing a message, it is JSON stringified and passed as the record body.
// 👇 Aggregate exists
const message = {
body: '{
\"eventStoreId\": \"POKEMONS\",
\"aggregateId\": \"123\",
}',
... // <= Other technical SQS properties
}
// 👇 Notification
const message = {
body: '{
\"eventStoreId\": \"POKEMONS\",
\"event\": {
\"aggregateId\": \"123\",
\"version\": 1,
\"type\": \"POKEMON_APPEARED\",
\"timestamp\": ...
...
},
}',
...
}
// 👇 State-carrying
const message = {
body: '{
\"eventStoreId\": \"POKEMONS\",
\"event\": {
\"aggregateId\": \"123\",
...
},
\"aggregate\": ...,
}',
...
};
If your queue is of type FIFO, the messageGroupId
and messageDeduplicationId
will be derived from a combination of the eventStoreId
, aggregateId
and version
:
// 👇 Fifo message
const message = {
messageBody: ...,
messageGroupId: "POKEMONS#123",
messageDeduplicationId: "POKEMONS#123#1", // <= Or "POKEMONS#123" for AggregateExistsMessageQueues
... // <= Other technical SQS properties
};
If the replay
option is set to true
, a replay
metadata attribute is included in the message:
// 👇 Replayed notification message
const message = {
body: '{
\"eventStoreId\": \"POKEMONS\",
\"event\": {
\"aggregateId\": \"123\",
...
},
}',
messageAttributes: {
replay: {
// 👇 boolean type is not available in SQS 🤷♂️
dataType: 'Number',
// 👇 numberValue is not available in SQS 🤷♂️
stringValue: '1',
},
},
...
};
On the worker side, you can use the SQSMessageQueueMessage
and SQSMessageQueueMessageBody
TS types to type your argument:
import type {
SQSMessageQueueMessage,
SQSMessageQueueMessageBody,
} from '@castore/sqs-message-queue-adapter';
const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
Records.forEach(({ body }) => {
// 👇 Correctly typed!
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
JSON.parse(body);
});
};
🔑 IAM
The publishMessage
method requires the sqs:SendMessage
IAM permission on the provided SQS queue.