diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index 2b4ea8b9..05715da1 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -30,5 +30,9 @@ jobs: run: | npm install --ignore-scripts + - name: Build + run: | + npm run build + - name: Run lint run: npm run lint diff --git a/.gitignore b/.gitignore index 3e5e6564..53a8393c 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ dist .pnp.* /.idea /package-lock.json + +# prisma +db-client diff --git a/biome.json b/biome.json index fa4d8f68..55921a49 100644 --- a/biome.json +++ b/biome.json @@ -1,12 +1,15 @@ { - "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", - "extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"], - "linter": { - "rules": { - "performance": { - "noBarrelFile": "off", - "noReExportAll": "off" - } - } - } + "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", + "extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"], + "linter": { + "rules": { + "performance": { + "noBarrelFile": "off", + "noReExportAll": "off" + } + } + }, + "files": { + "ignore": ["db-client"] + } } diff --git a/packages/outbox-prisma-adapter/README.md b/packages/outbox-prisma-adapter/README.md new file mode 100644 index 00000000..e480b2a2 --- /dev/null +++ b/packages/outbox-prisma-adapter/README.md @@ -0,0 +1,18 @@ +# outbox-prisma-adapter + +This package provides a Prisma adapter for the Outbox pattern. + +### Development + +#### Tests + +To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker: + +```sh +docker-compose up -d +``` + +Then update Prisma client: +```sh +npx prisma generate --schema=./test/schema.prisma +``` diff --git a/packages/outbox-prisma-adapter/docker-compose.yml b/packages/outbox-prisma-adapter/docker-compose.yml new file mode 100644 index 00000000..068b7a74 --- /dev/null +++ b/packages/outbox-prisma-adapter/docker-compose.yml @@ -0,0 +1,10 @@ +services: + + postgres: + image: postgres:16.2 + environment: + POSTGRES_USER: prisma + POSTGRES_PASSWORD: prisma + POSTGRES_DB: prisma + ports: + - 5432:5432 diff --git a/packages/outbox-prisma-adapter/index.ts b/packages/outbox-prisma-adapter/index.ts new file mode 100644 index 00000000..aeacc8f5 --- /dev/null +++ b/packages/outbox-prisma-adapter/index.ts @@ -0,0 +1 @@ +export * from './lib/outbox-prisma-adapter' diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts new file mode 100644 index 00000000..d8d7d516 --- /dev/null +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -0,0 +1,167 @@ +import type { + OutboxAccumulator, + OutboxEntry, + OutboxStorage, +} from '@message-queue-toolkit/outbox-core' +import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas' +import type { PrismaClient } from '@prisma/client' + +type ModelDelegate = { + // biome-ignore lint/suspicious/noExplicitAny: + create: (args: any) => Promise + // biome-ignore lint/suspicious/noExplicitAny: + findMany: (args: any) => Promise + // biome-ignore lint/suspicious/noExplicitAny: + createMany: (args: any) => Promise + // biome-ignore lint/suspicious/noExplicitAny: + updateMany: (args: any) => Promise +} + +export class OutboxPrismaAdapter< + SupportedEvents extends CommonEventDefinition[], + ModelName extends keyof PrismaClient & string, +> implements OutboxStorage +{ + constructor( + private readonly prisma: PrismaClient, + private readonly modelName: ModelName, + ) {} + + createEntry( + outboxEntry: OutboxEntry, + ): Promise> { + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + + // @ts-ignore + const messageType = getMessageType(outboxEntry.event) + return prismaModel.create({ + data: { + id: outboxEntry.id, + type: messageType, + created: outboxEntry.created, + updated: outboxEntry.updated, + data: outboxEntry.data, + status: outboxEntry.status, + retryCount: outboxEntry.retryCount, + }, + }) + } + + async flush(outboxAccumulator: OutboxAccumulator): Promise { + const entries = await outboxAccumulator.getEntries() + const failedEntries = await outboxAccumulator.getFailedEntries() + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + + const existingEntries = await prismaModel.findMany({ + where: { + id: { + in: [...entries.map((entry) => entry.id), ...failedEntries.map((entry) => entry.id)], + }, + }, + }) + + await this.prisma.$transaction(async (prisma) => { + const prismaModel = prisma[this.modelName] as ModelDelegate + await this.handleSuccesses(prismaModel, entries, existingEntries) + await this.handleFailures(prismaModel, failedEntries, existingEntries) + }) + } + + private async handleSuccesses( + prismaModel: ModelDelegate, + entries: OutboxEntry[], + existingEntries: OutboxEntry[], + ) { + const toCreate = entries.filter( + (entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + const toUpdate = entries.filter((entry) => + existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + + if (toCreate.length > 0) { + await prismaModel.createMany({ + data: toCreate.map((entry) => ({ + id: entry.id, + // @ts-ignore + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'SUCCESS', + })), + }) + } + + if (toUpdate.length > 0) { + await prismaModel.updateMany({ + where: { + id: { + in: toUpdate.map((entry) => entry.id), + }, + }, + data: { + status: 'SUCCESS', + updated: new Date(), + }, + }) + } + } + + private async handleFailures( + prismaModel: ModelDelegate, + entries: OutboxEntry[], + existingEntries: OutboxEntry[], + ) { + const toCreate = entries.filter( + (entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + const toUpdate = entries.filter((entry) => + existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + + if (toCreate.length > 0) { + await prismaModel.createMany({ + data: toCreate.map((entry) => ({ + id: entry.id, + // @ts-ignore + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'FAILED', + retryCount: 1, + })), + }) + } + + if (toUpdate.length > 0) { + await prismaModel.updateMany({ + where: { + id: { + in: toUpdate.map((entry) => entry.id), + }, + }, + data: { + status: 'FAILED', + updated: new Date(), + retryCount: { + increment: 1, + }, + }, + }) + } + } + + getEntries(maxRetryCount: number): Promise[]> { + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + + return prismaModel.findMany({ + where: { + retryCount: { + lte: maxRetryCount, + }, + }, + }) + } +} diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json new file mode 100644 index 00000000..f4878ee9 --- /dev/null +++ b/packages/outbox-prisma-adapter/package.json @@ -0,0 +1,66 @@ +{ + "name": "@message-queue-toolkit/outbox-prisma-adapter", + "version": "0.1.0", + "private": false, + "license": "MIT", + "description": "OutboxStorage implementation for @message-queue-toolkit/outbox-core package.", + "maintainers": [ + { + "name": "Igor Savin", + "email": "kibertoad@gmail.com" + } + ], + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "build": "del-cli dist && npm run db:update-client && tsc", + "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", + "test": "vitest", + "test:coverage": "npm run docker:start:dev && npm run db:wait && npm test -- --coverage && npm run docker:stop:dev", + "test:ci": "npm run test:coverage", + "lint": "biome check . && tsc --project tsconfig.json --noEmit", + "lint:fix": "biome check --write .", + "docker:start:dev": "docker compose up -d", + "docker:stop:dev": "docker compose down", + "db:wait": "while ! echo \"SELECT 1;\" | prisma db execute --stdin; do sleep 1; done", + "db:update-client": "prisma generate", + "prepublishOnly": "npm run build:release" + }, + "peerDependencies": { + "@message-queue-toolkit/core": ">=14.0.0", + "@message-queue-toolkit/outbox-core": ">=0.1.0", + "@message-queue-toolkit/schemas": ">=4.0.0", + "prisma": "^5.22.0", + "@prisma/client": "^5.19.1" + }, + "devDependencies": { + "@biomejs/biome": "1.8.3", + "@kibertoad/biome-config": "^1.2.1", + "@prisma/client": "^5.22.0", + "@types/node": "^22.0.0", + "@vitest/coverage-v8": "^2.0.4", + "del-cli": "^5.1.0", + "prisma": "^5.22.0", + "typescript": "^5.5.3", + "uuidv7": "^1.0.2", + "vitest": "^2.0.4", + "zod": "^3.23.8" + }, + "homepage": "https://github.com/kibertoad/message-queue-toolkit", + "repository": { + "type": "git", + "url": "git://github.com/kibertoad/message-queue-toolkit.git" + }, + "keywords": [ + "message", + "queue", + "queues", + "abstract", + "common", + "utils", + "notification", + "outbox", + "pattern" + ], + "files": ["README.md", "LICENSE", "dist/*"] +} diff --git a/packages/outbox-prisma-adapter/prisma/schema.prisma b/packages/outbox-prisma-adapter/prisma/schema.prisma new file mode 100644 index 00000000..1b3c0ee8 --- /dev/null +++ b/packages/outbox-prisma-adapter/prisma/schema.prisma @@ -0,0 +1,20 @@ +datasource db { + provider = "postgresql" + url = "postgresql://prisma:prisma@localhost:5432/prisma?schema=prisma" +} + +model OutboxEntry { + id String @id @default(uuid()) @db.Uuid + created DateTime @default(now()) + updated DateTime @default(now()) @updatedAt + type String + retryCount Int @default(0) @map("retry_count") + data Json + status String + + @@map("outbox_entry") +} + +generator client { + provider = "prisma-client-js" +} diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts new file mode 100644 index 00000000..0e3499bb --- /dev/null +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -0,0 +1,259 @@ +import { InMemoryOutboxAccumulator, type OutboxEntry } from '@message-queue-toolkit/outbox-core' +import { + type CommonEventDefinition, + enrichMessageSchemaWithBase, +} from '@message-queue-toolkit/schemas' +import { PrismaClient } from '@prisma/client' +import { uuidv7 } from 'uuidv7' +import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { z } from 'zod' +import { OutboxPrismaAdapter } from '../lib/outbox-prisma-adapter' + +const events = { + created: { + ...enrichMessageSchemaWithBase( + 'entity.created', + z.object({ + message: z.string(), + }), + ), + }, +} satisfies Record + +type SupportedEvents = (typeof events)[keyof typeof events][] + +describe('outbox-prisma-adapter', () => { + let prisma: PrismaClient + let outboxPrismaAdapter: OutboxPrismaAdapter + + const ENTRY_1 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + + const ENTRY_2 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT 2', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + + beforeAll(async () => { + prisma = new PrismaClient({ + log: ['query'], + }) + + outboxPrismaAdapter = new OutboxPrismaAdapter( + prisma, + 'outboxEntry', + ) + + await prisma.$queryRaw`create schema if not exists prisma;` + await prisma.$queryRaw` + CREATE TABLE prisma.outbox_entry ( + id UUID PRIMARY KEY, + type TEXT NOT NULL, + created TIMESTAMP NOT NULL, + updated TIMESTAMP, + retry_count INT NOT NULL DEFAULT 0, + data JSONB NOT NULL, + status TEXT NOT NULL + ) + ` + }) + + beforeEach(async () => { + await prisma.$queryRaw`DELETE FROM prisma.outbox_entry;` + }) + + afterAll(async () => { + await prisma.$queryRaw`DROP TABLE prisma.outbox_entry;` + await prisma.$queryRaw`DROP SCHEMA prisma;` + await prisma.$disconnect() + }) + + it('creates entry in DB via outbox storage implementation', async () => { + await outboxPrismaAdapter.createEntry({ + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry) + + const entries = await outboxPrismaAdapter.getEntries(10) + + expect(entries).toEqual([ + { + id: expect.any(String), + type: 'entity.created', + created: expect.any(Date), + updated: expect.any(Date), + retryCount: 0, + data: { + id: expect.any(String), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: expect.any(String), + }, + status: 'CREATED', + }, + ]) + }) + + it('should insert successful entries from accumulator', async () => { + const accumulator = new InMemoryOutboxAccumulator() + accumulator.add(ENTRY_1) + accumulator.add(ENTRY_2) + + await outboxPrismaAdapter.flush(accumulator) + + const entriesAfterFlush = await outboxPrismaAdapter.getEntries(10) + + expect(entriesAfterFlush).toMatchObject([ + { + id: ENTRY_1.id, + status: 'SUCCESS', + }, + { + id: ENTRY_2.id, + status: 'SUCCESS', + }, + ]) + }) + + it("should update existing entries' status to 'SUCCESS'", async () => { + const accumulator = new InMemoryOutboxAccumulator() + accumulator.add(ENTRY_1) + accumulator.add(ENTRY_2) + + await outboxPrismaAdapter.createEntry(ENTRY_1) + await outboxPrismaAdapter.createEntry(ENTRY_2) + + const beforeFlush = await outboxPrismaAdapter.getEntries(10) + expect(beforeFlush).toMatchObject([ + { + id: ENTRY_1.id, + status: 'CREATED', + }, + { + id: ENTRY_2.id, + status: 'CREATED', + }, + ]) + + await outboxPrismaAdapter.flush(accumulator) + + const afterFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFlush).toMatchObject([ + { + id: ENTRY_1.id, + status: 'SUCCESS', + }, + { + id: ENTRY_2.id, + status: 'SUCCESS', + }, + ]) + }) + + it('should handle mix of entries, non existing and existing, and change their status to SUCCESS', async () => { + const accumulator = new InMemoryOutboxAccumulator() + accumulator.add(ENTRY_1) + accumulator.add(ENTRY_2) + + //Only one exists in DB. + await outboxPrismaAdapter.createEntry(ENTRY_2) + + await outboxPrismaAdapter.flush(accumulator) + + const afterFirstFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFirstFlush).toMatchObject([ + { + id: ENTRY_1.id, + status: 'SUCCESS', + }, + { + id: ENTRY_2.id, + status: 'SUCCESS', + }, + ]) + }) + + it("should change failed entries' status to 'FAILED' and increment retry count", async () => { + const accumulator = new InMemoryOutboxAccumulator() + accumulator.addFailure(ENTRY_1) + + await outboxPrismaAdapter.flush(accumulator) + + const afterFirstFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFirstFlush).toMatchObject([ + { + id: ENTRY_1.id, + status: 'FAILED', + retryCount: 1, + }, + ]) + }) + + it('should change failed EXISTING entries status to FAILED and increment retry count', async () => { + const accumulator = new InMemoryOutboxAccumulator() + const failedEntry = { ...ENTRY_1, retryCount: 3, status: 'FAILED' } satisfies OutboxEntry< + SupportedEvents[number] + > + accumulator.addFailure(failedEntry) + + await outboxPrismaAdapter.createEntry(failedEntry) + await outboxPrismaAdapter.flush(accumulator) + + const afterFirstFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFirstFlush).toMatchObject([ + { + id: failedEntry.id, + status: 'FAILED', + retryCount: 4, + }, + ]) + }) + + it('should not fetch entries that exceed retry limit', async () => { + const failedEntry = { ...ENTRY_1, retryCount: 6, status: 'FAILED' } satisfies OutboxEntry< + SupportedEvents[number] + > + await outboxPrismaAdapter.createEntry(failedEntry) + + const entries = await outboxPrismaAdapter.getEntries(5) + + expect(entries).toEqual([]) + }) +}) diff --git a/packages/outbox-prisma-adapter/tsconfig.json b/packages/outbox-prisma-adapter/tsconfig.json new file mode 100644 index 00000000..9cd7c80a --- /dev/null +++ b/packages/outbox-prisma-adapter/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + "outDir": "dist", + "module": "commonjs", + "target": "ES2022", + "lib": ["ES2022", "dom"], + "sourceMap": true, + "declaration": true, + "declarationMap": false, + "types": ["node", "vitest/globals"], + "strict": true, + "moduleResolution": "node", + "noUnusedLocals": false, + "noUnusedParameters": false, + "noFallthroughCasesInSwitch": true, + "strictNullChecks": true, + "importHelpers": true, + "baseUrl": ".", + "skipLibCheck": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "include": ["lib/**/*.ts", "test/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/outbox-prisma-adapter/tsconfig.release.json b/packages/outbox-prisma-adapter/tsconfig.release.json new file mode 100644 index 00000000..93ab99f8 --- /dev/null +++ b/packages/outbox-prisma-adapter/tsconfig.release.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["lib/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist", "lib/**/*.spec.ts"] +} diff --git a/packages/outbox-prisma-adapter/vitest.config.mts b/packages/outbox-prisma-adapter/vitest.config.mts new file mode 100644 index 00000000..2bcce478 --- /dev/null +++ b/packages/outbox-prisma-adapter/vitest.config.mts @@ -0,0 +1,23 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + globals: true, + watch: false, + environment: 'node', + reporters: ['default'], + coverage: { + provider: 'v8', + include: ['lib/**/*.ts'], + exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*'], + reporter: ['text'], + all: true, + thresholds: { + lines: 100, + functions: 100, + branches: 91.66, + statements: 100, + }, + }, + }, +})