From 9ffbf8ca529435eb74a49bbe9456aa47ee745d0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Tue, 26 Nov 2024 15:28:31 +0100 Subject: [PATCH] Working test for saving outbox entries. --- packages/outbox-prisma-adapter/README.md | 44 ++++-------------- .../lib/outbox-prisma-adapter.ts | 17 ++++++- .../test/outbox-prisma-adapter.spec.ts | 46 +++++++++++-------- .../outbox-prisma-adapter/test/schema.prisma | 8 +++- 4 files changed, 58 insertions(+), 57 deletions(-) diff --git a/packages/outbox-prisma-adapter/README.md b/packages/outbox-prisma-adapter/README.md index 91e9c541..e480b2a2 100644 --- a/packages/outbox-prisma-adapter/README.md +++ b/packages/outbox-prisma-adapter/README.md @@ -1,42 +1,18 @@ -# outbox-core +# outbox-prisma-adapter -Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages. +This package provides a Prisma adapter for the Outbox pattern. -## Installation +### Development -```bash -npm i -S @message-queue-toolkit/outbox-core -``` - -## Usage - -To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class: +#### Tests -```typescript -import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core'; +To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker: -const job = new OutboxPeriodicJob( - //Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit - outboxStorage, - //Default available accumulator for gathering outbox entries as the process job is progressing. - new InMemoryOutboxAccumulator(), - //DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core - eventEmitter, - //See PeriodicJobDependencies from @lokalise/background-jobs-common - dependencies, - //Retry count, how many times outbox entries should be retried to be processed - 3, - //emitBatchSize - how many outbox entries should be emitted at once - 10, - //internalInMs - how often the job should be executed, e.g. below it runs every 1sec - 1000 -) +```sh +docker-compose up -d ``` -Job will take care of processing outbox entries emitted by: -```typescript -const emitter = new OutboxEventEmitter( - //Same instance of outbox storage that is used by OutboxPeriodicJob - outboxStorage -) +Then update Prisma client: +```sh +npx prisma generate --schema=./test/schema.prisma ``` diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index c0f272fa..6b27b448 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -16,8 +16,15 @@ export class OutboxPrismaAdapter> { const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + const messageType = getMessageType(outboxEntry.event) return prismaModel.create({ - data: getMessageType(outboxEntry.event), + data: { + id: outboxEntry.id, + type: messageType, + created: outboxEntry.created, + data: outboxEntry.data, + status: outboxEntry.status, + }, }) } @@ -26,6 +33,12 @@ export class OutboxPrismaAdapter[]> { - return Promise.resolve([]) + return this.prisma[this.modelName].findMany({ + where: { + retryCount: { + lte: maxRetryCount, + }, + }, + }) } } diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index ff689091..6b74b13f 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -35,7 +35,11 @@ describe('outbox-prisma-adapter', () => { await prisma.$queryRaw` CREATE TABLE prisma.outbox_entry ( id UUID PRIMARY KEY, - created TIMESTAMP NOT NULL + type TEXT NOT NULL, + created TIMESTAMP NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + data JSONB NOT NULL, + status TEXT NOT NULL ) ` }) @@ -46,24 +50,6 @@ describe('outbox-prisma-adapter', () => { await prisma.$disconnect() }) - it('test db connection', async () => { - const creationDate = new Date() - await prisma.outboxEntry.create({ - data: { - id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', - created: creationDate, - }, - }) - - const result = await prisma.outboxEntry.findMany() - expect(result).toEqual([ - { - id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', - created: creationDate, - }, - ]) - }) - it('creates entry in DB via outbox storage implementation', async () => { await outboxPrismaAdapter.createEntry({ id: uuidv7(), @@ -77,6 +63,28 @@ describe('outbox-prisma-adapter', () => { metadata: {}, timestamp: new Date().toISOString(), }, + retryCount: 0, + created: new Date(), } satisfies OutboxEntry) + + const entries = await outboxPrismaAdapter.getEntries(10) + + expect(entries).toEqual([ + { + id: expect.any(String), + type: 'entity.created', + created: expect.any(Date), + retryCount: 0, + data: { + id: expect.any(String), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: expect.any(String), + }, + status: 'CREATED', + }, + ]) }) }) diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma index 812d7f9c..a7196c29 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -4,8 +4,12 @@ datasource db { } model OutboxEntry { - id String @id @default(uuid()) @db.Uuid - created DateTime @default(now()) + id String @id @default(uuid()) @db.Uuid + created DateTime @default(now()) + type String + retryCount Int @default(0) @map("retry_count") + data Json + status String @@map("outbox_entry") }