From 9e415d0e8cb621715d87add9aad77b89667ff60c Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Fri, 26 May 2023 10:34:34 +0200 Subject: [PATCH] Allow to emit tasks to different queues --- CHANGELOG.md | 6 ++++++ package.json | 2 +- src/definitions.ts | 5 ++++- src/messages.ts | 2 +- tests/bus.test.ts | 31 ++++++++++++++++++++++++++++++- 5 files changed, 42 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c66b75..eec8ca6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # pg-tbus +## 0.1.7 + +### Patch Changes + +- Allow to emit tasks to different queues + ## 0.1.6 ### Patch Changes diff --git a/package.json b/package.json index 4732a53..25b3420 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "pg-tbus", "author": "IlijaNL", - "version": "0.1.6", + "version": "0.1.7", "types": "dist/index.d.ts", "module": "dist/index.mjs", "main": "dist/index.js", diff --git a/src/definitions.ts b/src/definitions.ts index 7e60949..4b40b5f 100644 --- a/src/definitions.ts +++ b/src/definitions.ts @@ -121,6 +121,7 @@ export interface TaskDefinition extends TaskDeclaration { export interface Task { task_name: string; + queue?: string; data: Data; config: Partial; } @@ -179,9 +180,11 @@ export const declareTask = (props: DeclareTaskProps): Task `invalid input for task ${props.task_name}: ${validateFn.errors?.map((e) => e.message).join(' \n')}` ); } + return { - data: input, + queue: props.queue, task_name: props.task_name, + data: input, config: { ...props.config, ...config }, }; }; diff --git a/src/messages.ts b/src/messages.ts index c28cab8..3ac1d5a 100644 --- a/src/messages.ts +++ b/src/messages.ts @@ -127,7 +127,7 @@ export const createTaskFactory = tn: task.task_name, trace: trigger, }, - queue: props.queue, + queue: task.queue ?? props.queue, ...config, }; }; diff --git a/tests/bus.test.ts b/tests/bus.test.ts index d115cab..ff56e44 100644 --- a/tests/bus.test.ts +++ b/tests/bus.test.ts @@ -2,7 +2,7 @@ import { Type } from '@sinclair/typebox'; import EventEmitter, { once } from 'events'; import { Pool } from 'pg'; import tap from 'tap'; -import { createEventHandler, createTBus, defineEvent, defineTask } from '../src'; +import { createEventHandler, createTBus, declareTask, defineEvent, defineTask } from '../src'; import { resolveWithinSeconds } from '../src/utils'; import { cleanupSchema, createRandomSchema } from './helpers'; import stringify from 'safe-stable-stringify'; @@ -62,6 +62,35 @@ tap.test('bus', async (tap) => { await waitProm; }); + tap.test('emit task to different queue', async ({ teardown, equal }) => { + const ee = new EventEmitter(); + const bus = createTBus('emit_queue', { db: sqlPool, schema: schema }); + const task_name = 'emit_task'; + + const taskDef = defineTask({ + task_name: task_name, + queue: 'abc', + schema: Type.Object({ works: Type.String() }), + handler: async (props) => { + equal(props.input.works, 'abcd'); + equal(props.trigger.type, 'direct'); + ee.emit('handled'); + }, + }); + + const waitProm = once(ee, 'handled'); + + const bus2 = createTBus('abc', { db: sqlPool, schema: schema }); + + bus2.registerTask(taskDef); + await bus2.start(); + + teardown(() => bus2.stop()); + + await bus.send(taskDef.from({ works: 'abcd' })); + await waitProm; + }); + tap.test('emit event', async (t) => { const ee = new EventEmitter(); const bus = createTBus('emit_event_queue', { db: sqlPool, schema: schema });