diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index 6b27b448..412620c8 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -22,14 +22,37 @@ export class OutboxPrismaAdapter): Promise { - return Promise.resolve(undefined) + async flush(outboxAccumulator: OutboxAccumulator): Promise { + const entries = await outboxAccumulator.getEntries() + + const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + + for (const entry of entries) { + await prismaModel.upsert({ + where: { + id: entry.id, + }, + update: { + status: 'SUCCESS', + updated: new Date(), + }, + create: { + id: entry.id, + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'SUCCESS', + }, + }) + } } getEntries(maxRetryCount: number): Promise[]> { diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 6b74b13f..c58cd8ed 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -1,4 +1,4 @@ -import type { OutboxEntry } from '@message-queue-toolkit/outbox-core' +import { InMemoryOutboxAccumulator, type OutboxEntry } from '@message-queue-toolkit/outbox-core' import { type CommonEventDefinition, enrichMessageSchemaWithBase, @@ -37,6 +37,7 @@ describe('outbox-prisma-adapter', () => { id UUID PRIMARY KEY, type TEXT NOT NULL, created TIMESTAMP NOT NULL, + updated TIMESTAMP, retry_count INT NOT NULL DEFAULT 0, data JSONB NOT NULL, status TEXT NOT NULL @@ -74,6 +75,7 @@ describe('outbox-prisma-adapter', () => { id: expect.any(String), type: 'entity.created', created: expect.any(Date), + updated: expect.any(Date), retryCount: 0, data: { id: expect.any(String), @@ -87,4 +89,124 @@ describe('outbox-prisma-adapter', () => { }, ]) }) + + it('should insert successful entries from accumulator', async () => { + const accumulator = new InMemoryOutboxAccumulator() + + const entry1 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry1) + + const entry2 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT 2', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry2) + + await outboxPrismaAdapter.flush(accumulator) + + const entriesAfterFlush = await outboxPrismaAdapter.getEntries(10) + + expect(entriesAfterFlush).toMatchObject([ + { + id: entry1.id, + status: 'SUCCESS', + }, + { + id: entry2.id, + status: 'SUCCESS', + }, + ]) + }) + + it("should update successful entries' status to 'SUCCESS'", async () => { + const accumulator = new InMemoryOutboxAccumulator() + + const entry1 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry1) + + const entry2 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT 2', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry2) + + await outboxPrismaAdapter.createEntry(entry1) + await outboxPrismaAdapter.createEntry(entry2) + + const beforeFlush = await outboxPrismaAdapter.getEntries(10) + expect(beforeFlush).toMatchObject([ + { + id: entry1.id, + status: 'CREATED', + }, + { + id: entry2.id, + status: 'CREATED', + }, + ]) + + outboxPrismaAdapter.flush(accumulator) + + const afterFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFlush).toMatchObject([ + { + id: entry1.id, + status: 'SUCCESS', + }, + { + id: entry2.id, + status: 'SUCCESS', + }, + ]) + }) }) diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma index a7196c29..1b3c0ee8 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -6,6 +6,7 @@ datasource db { model OutboxEntry { id String @id @default(uuid()) @db.Uuid created DateTime @default(now()) + updated DateTime @default(now()) @updatedAt type String retryCount Int @default(0) @map("retry_count") data Json