Skip to content

Commit

Permalink
Working test for saving outbox entries.
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilwylegala committed Nov 26, 2024
1 parent e93bdad commit 963014a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 57 deletions.
44 changes: 10 additions & 34 deletions packages/outbox-prisma-adapter/README.md
Original file line number Diff line number Diff line change
@@ -1,42 +1,18 @@
# outbox-core
# outbox-prisma-adapter

Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages.
This package provides a Prisma adapter for the Outbox pattern.

## Installation
### Development

```bash
npm i -S @message-queue-toolkit/outbox-core
```

## Usage

To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class:
#### Tests

```typescript
import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core';
To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker:

const job = new OutboxPeriodicJob(
//Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit
outboxStorage,
//Default available accumulator for gathering outbox entries as the process job is progressing.
new InMemoryOutboxAccumulator(),
//DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core
eventEmitter,
//See PeriodicJobDependencies from @lokalise/background-jobs-common
dependencies,
//Retry count, how many times outbox entries should be retried to be processed
3,
//emitBatchSize - how many outbox entries should be emitted at once
10,
//internalInMs - how often the job should be executed, e.g. below it runs every 1sec
1000
)
```sh
docker-compose up -d
```

Job will take care of processing outbox entries emitted by:
```typescript
const emitter = new OutboxEventEmitter(
//Same instance of outbox storage that is used by OutboxPeriodicJob
outboxStorage
)
Then update Prisma client:
```sh
npx prisma generate --schema=./test/schema.prisma
```
17 changes: 15 additions & 2 deletions packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
): Promise<OutboxEntry<SupportedEvents[number]>> {
const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]

const messageType = getMessageType(outboxEntry.event)
return prismaModel.create({
data: getMessageType(outboxEntry.event),
data: {
id: outboxEntry.id,
type: messageType,
created: outboxEntry.created,
data: outboxEntry.data,
status: outboxEntry.status,
},
})
}

Expand All @@ -26,6 +33,12 @@ export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]
}

getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve([])
return this.prisma[this.modelName].findMany({
where: {
retryCount: {
lte: maxRetryCount,
},
},
})
}
}
46 changes: 27 additions & 19 deletions packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ describe('outbox-prisma-adapter', () => {
await prisma.$queryRaw`
CREATE TABLE prisma.outbox_entry (
id UUID PRIMARY KEY,
created TIMESTAMP NOT NULL
type TEXT NOT NULL,
created TIMESTAMP NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
data JSONB NOT NULL,
status TEXT NOT NULL
)
`
})
Expand All @@ -46,24 +50,6 @@ describe('outbox-prisma-adapter', () => {
await prisma.$disconnect()
})

it('test db connection', async () => {
const creationDate = new Date()
await prisma.outboxEntry.create({
data: {
id: 'ce08b43b-6162-4913-86ea-fa9367875e3b',
created: creationDate,
},
})

const result = await prisma.outboxEntry.findMany()
expect(result).toEqual([
{
id: 'ce08b43b-6162-4913-86ea-fa9367875e3b',
created: creationDate,
},
])
})

it('creates entry in DB via outbox storage implementation', async () => {
await outboxPrismaAdapter.createEntry({
id: uuidv7(),
Expand All @@ -77,6 +63,28 @@ describe('outbox-prisma-adapter', () => {
metadata: {},
timestamp: new Date().toISOString(),
},
retryCount: 0,
created: new Date(),
} satisfies OutboxEntry<SupportedEvents[number]>)

const entries = await outboxPrismaAdapter.getEntries(10)

expect(entries).toEqual([
{
id: expect.any(String),
type: 'entity.created',
created: expect.any(Date),
retryCount: 0,
data: {
id: expect.any(String),
payload: {
message: 'TEST EVENT',
},
metadata: {},
timestamp: expect.any(String),
},
status: 'CREATED',
},
])
})
})
8 changes: 6 additions & 2 deletions packages/outbox-prisma-adapter/test/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ datasource db {
}

model OutboxEntry {
id String @id @default(uuid()) @db.Uuid
created DateTime @default(now())
id String @id @default(uuid()) @db.Uuid
created DateTime @default(now())
type String
retryCount Int @default(0) @map("retry_count")
data Json
status String
@@map("outbox_entry")
}
Expand Down

0 comments on commit 963014a

Please sign in to comment.