diff --git a/packages/core/lib/queues/HandlerSpy.ts b/packages/core/lib/queues/HandlerSpy.ts new file mode 100644 index 00000000..ecd951c0 --- /dev/null +++ b/packages/core/lib/queues/HandlerSpy.ts @@ -0,0 +1,117 @@ +import { Fifo } from 'toad-cache' + +import type { MessageProcessingResult } from '../types/MessageQueueTypes' + +export type HandlerSpyParams = { + bufferSize?: number + messageIdField?: string +} + +export type SpyResult = { + message: MessagePayloadSchemas + processingResult: MessageProcessingResult +} + +type SpyResultCacheEntry = { + value: SpyResult +} + +type SpyPromiseMetadata = { + fields: Partial + processingResult?: MessageProcessingResult + promise: Promise> + resolve: ( + value: SpyResult | PromiseLike>, + ) => void +} + +export class HandlerSpy { + private readonly messageBuffer: Fifo> + private readonly messageIdField: string + private readonly spyPromises: SpyPromiseMetadata[] + + constructor(params: HandlerSpyParams = {}) { + this.messageBuffer = new Fifo(params.bufferSize ?? 100) + this.messageIdField = params.messageIdField ?? 'id' + this.spyPromises = [] + } + + private messageMatchesFilter( + spyResult: SpyResult, + fields: Partial, + processingResult?: MessageProcessingResult, + ) { + return ( + Object.entries(fields).every(([key, value]) => { + // @ts-ignore + return spyResult.message[key] === value + }) && + (!processingResult || spyResult.processingResult === processingResult) + ) + } + + waitForEvent( + fields: Partial, + processingResult?: MessageProcessingResult, + ): Promise> { + const processedMessageEntry = Object.values(this.messageBuffer.items).find( + // @ts-ignore + (spyResult: SpyResultCacheEntry) => { + return this.messageMatchesFilter(spyResult.value, fields, processingResult) + }, + ) + if (processedMessageEntry) { + return Promise.resolve(processedMessageEntry.value) + } + + let resolve: ( + value: SpyResult | PromiseLike>, + ) => void + const spyPromise = new Promise>((_resolve) => { + resolve = _resolve + }) + + this.spyPromises.push({ + promise: spyPromise, + processingResult, + fields, + // @ts-ignore + resolve, + }) + + return spyPromise + } + + clear() { + this.messageBuffer.clear() + } + + addProcessedMessage(processingResult: SpyResult) { + // @ts-ignore + const cacheId = `${processingResult.message[this.messageIdField]}-${Date.now()}-${( + Math.random() + 1 + ) + .toString(36) + .substring(7)}` + this.messageBuffer.set(cacheId, processingResult) + + const foundPromise = this.spyPromises.find((spyPromise) => { + return this.messageMatchesFilter( + processingResult, + spyPromise.fields, + spyPromise.processingResult, + ) + }) + + if (foundPromise) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + foundPromise.resolve(processingResult) + + const index = this.spyPromises.indexOf(foundPromise) + if (index > -1) { + // only splice array when item is found + this.spyPromises.splice(index, 1) // 2nd parameter means remove one item only + } + } + } +} diff --git a/packages/core/lib/types/MessageQueueTypes.ts b/packages/core/lib/types/MessageQueueTypes.ts index 72876baa..4750f580 100644 --- a/packages/core/lib/types/MessageQueueTypes.ts +++ b/packages/core/lib/types/MessageQueueTypes.ts @@ -5,6 +5,8 @@ export interface QueueConsumer { close(): Promise } +export type MessageProcessingResult = 'retryLater' | 'success' + export interface SyncPublisher { publish(message: MessagePayloadType): void } diff --git a/packages/core/package.json b/packages/core/package.json index b8fd9c80..fd0cac3b 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -17,25 +17,31 @@ "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", "lint": "eslint . --ext .ts", "lint:fix": "eslint . --ext .ts --fix", - "test:coverage": "", + "test": "vitest", + "test:coverage": "npm test -- --coverage", "test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev", "docker:start:dev": "", "docker:stop:dev": "", "format": "prettier --write .", "prepublishOnly": "npm run build:release" }, - "dependencies": {}, + "dependencies": { + "toad-cache": "^3.4.1" + }, "devDependencies": { "@types/node": "^20.10.3", "@typescript-eslint/eslint-plugin": "^6.13.2", "@typescript-eslint/parser": "^6.13.2", + "@vitest/coverage-v8": "^0.34.6", "del-cli": "^5.1.0", "eslint": "^8.55.0", "eslint-config-prettier": "^9.1.0", "eslint-plugin-import": "^2.29.0", "eslint-plugin-prettier": "^5.0.1", "prettier": "^3.1.0", - "typescript": "^5.3.2" + "typescript": "^5.3.2", + "vitest": "^0.34.6", + "vite": "4.5.1" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/core/test/queues/HandlerSpy.spec.ts b/packages/core/test/queues/HandlerSpy.spec.ts new file mode 100644 index 00000000..816ab286 --- /dev/null +++ b/packages/core/test/queues/HandlerSpy.spec.ts @@ -0,0 +1,109 @@ +import { HandlerSpy } from '../../lib/queues/HandlerSpy' + +type Message = { + id: string + status: string +} + +const TEST_MESSAGE: Message = { + id: 'abc', + status: 'done', +} + +const TEST_MESSAGE_2: Message = { + id: 'abc', + status: 'inprogress', +} + +describe('HandlerSpy', () => { + describe('clear', () => { + it('Remove stored events', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage({ + processingResult: 'success', + message: TEST_MESSAGE_2, + }) + + spy.clear() + + // @ts-ignore + expect(spy.spyPromises).toHaveLength(0) + }) + }) + + describe('waitForEvent', () => { + it('Finds previously consumed event', async () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage({ + processingResult: 'success', + message: TEST_MESSAGE_2, + }) + + spy.addProcessedMessage({ + processingResult: 'success', + message: TEST_MESSAGE, + }) + + const message = await spy.waitForEvent({ + status: 'done', + }) + + expect(message.message).toEqual(TEST_MESSAGE) + }) + + it('Finds multiple previously consumed events', async () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage({ + processingResult: 'retryLater', + message: TEST_MESSAGE, + }) + + spy.addProcessedMessage({ + processingResult: 'success', + message: TEST_MESSAGE, + }) + + const message = await spy.waitForEvent( + { + status: 'done', + }, + 'success', + ) + + const message2 = await spy.waitForEvent( + { + status: 'done', + }, + 'retryLater', + ) + + expect(message.message).toEqual(TEST_MESSAGE) + expect(message2.message).toEqual(TEST_MESSAGE) + }) + + it('Waits for an event to be consumed', async () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage({ + processingResult: 'success', + message: TEST_MESSAGE_2, + }) + + const spyPromise = spy.waitForEvent({ + status: 'done', + }) + + spy.addProcessedMessage({ + processingResult: 'success', + message: TEST_MESSAGE, + }) + + const message = await spyPromise + + expect(message.message).toEqual(TEST_MESSAGE) + }) + }) +}) diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json index 1925cf85..9cd7c80a 100644 --- a/packages/core/tsconfig.json +++ b/packages/core/tsconfig.json @@ -7,7 +7,7 @@ "sourceMap": true, "declaration": true, "declarationMap": false, - "types": ["node"], + "types": ["node", "vitest/globals"], "strict": true, "moduleResolution": "node", "noUnusedLocals": false, @@ -16,6 +16,7 @@ "strictNullChecks": true, "importHelpers": true, "baseUrl": ".", + "skipLibCheck": true, "allowSyntheticDefaultImports": true, "esModuleInterop": true, "forceConsistentCasingInFileNames": true, diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts new file mode 100644 index 00000000..1dcb6fe4 --- /dev/null +++ b/packages/core/vitest.config.ts @@ -0,0 +1,21 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + globals: true, + threads: false, + watch: false, + environment: 'node', + reporters: ['default'], + coverage: { + include: ['lib/**/*.ts'], + exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'], + reporter: ['text'], + all: true, + lines: 20, + functions: 42, + branches: 65, + statements: 20, + }, + }, +})