diff --git a/packages/dbos-sqs/index.ts b/packages/dbos-sqs/index.ts index 650d6c26d..f4a43defa 100644 --- a/packages/dbos-sqs/index.ts +++ b/packages/dbos-sqs/index.ts @@ -195,7 +195,7 @@ class SQSReceiver implements DBOSEventReceiver wfid = cro.config?.getWFKey ? cro.config.getWFKey(message) : undefined; } if (!wfid) wfid = message.MessageId; - await executor.workflow(method.registeredFunction as unknown as WorkflowFunction, {workflowUUID: wfid}, [message]); + await executor.workflow(method.registeredFunction as unknown as WorkflowFunction, {workflowUUID: wfid}, message); // Delete the message from the queue after starting workflow (which will take over retries) await client.send(new DeleteMessageCommand({ diff --git a/packages/dbos-sqs/sqs.test.ts b/packages/dbos-sqs/sqs.test.ts index 50598d2c3..7300b9840 100644 --- a/packages/dbos-sqs/sqs.test.ts +++ b/packages/dbos-sqs/sqs.test.ts @@ -5,13 +5,21 @@ import { TestingRuntime, createTestingRuntime, configureInstance, WorkflowContex const sleepms = (ms: number) => new Promise((r) => setTimeout(r, ms)); +interface ValueObj { + val: number, +} + class SQSReceiver { static msgRcvCount: number = 0; + static msgValueSum: number = 0; @SQSMessageConsumer({queueURL: process.env['SQS_QUEUE_URL']}) @Workflow() - static async recvMessage(_ctx: WorkflowContext, _msg: Message) { - ++SQSReceiver.msgRcvCount; + static async recvMessage(_ctx: WorkflowContext, msg: Message) { + const ms = msg.Body!; + const res = JSON.parse(ms) as ValueObj; + SQSReceiver.msgRcvCount++; + SQSReceiver.msgValueSum += res.val; return Promise.resolve(); } } @@ -53,9 +61,12 @@ describe("sqs-tests", () => { console.log("SQS unavailable, skipping SQS tests"); return; } + const sv: ValueObj = { + val: 10, + } const ser = await testRuntime.invoke(sqsCfg).sendMessage( { - MessageBody: "{}", + MessageBody: JSON.stringify(sv), }, ); expect(ser.MessageId).toBeDefined(); @@ -66,5 +77,6 @@ describe("sqs-tests", () => { await sleepms(100); } expect(SQSReceiver.msgRcvCount).toBe(1); + expect(SQSReceiver.msgValueSum).toBe(10); }); });