Skip to content

Commit

Permalink
Fix and test for #605
Browse files Browse the repository at this point in the history
  • Loading branch information
chuck-dbos committed Aug 19, 2024
1 parent 5eca729 commit 8c4ace5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
2 changes: 1 addition & 1 deletion packages/dbos-sqs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class SQSReceiver implements DBOSEventReceiver
wfid = cro.config?.getWFKey ? cro.config.getWFKey(message) : undefined;
}
if (!wfid) wfid = message.MessageId;
await executor.workflow(method.registeredFunction as unknown as WorkflowFunction<unknown[], unknown>, {workflowUUID: wfid}, [message]);
await executor.workflow(method.registeredFunction as unknown as WorkflowFunction<unknown[], unknown>, {workflowUUID: wfid}, message);

// Delete the message from the queue after starting workflow (which will take over retries)
await client.send(new DeleteMessageCommand({
Expand Down
18 changes: 15 additions & 3 deletions packages/dbos-sqs/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ import { TestingRuntime, createTestingRuntime, configureInstance, WorkflowContex

const sleepms = (ms: number) => new Promise((r) => setTimeout(r, ms));

interface ValueObj {
val: number,
}

class SQSReceiver
{
static msgRcvCount: number = 0;
static msgValueSum: number = 0;
@SQSMessageConsumer({queueURL: process.env['SQS_QUEUE_URL']})
@Workflow()
static async recvMessage(_ctx: WorkflowContext, _msg: Message) {
++SQSReceiver.msgRcvCount;
static async recvMessage(_ctx: WorkflowContext, msg: Message) {
const ms = msg.Body!;
const res = JSON.parse(ms) as ValueObj;
SQSReceiver.msgRcvCount++;
SQSReceiver.msgValueSum += res.val;
return Promise.resolve();
}
}
Expand Down Expand Up @@ -53,9 +61,12 @@ describe("sqs-tests", () => {
console.log("SQS unavailable, skipping SQS tests");
return;
}
const sv: ValueObj = {
val: 10,
}
const ser = await testRuntime.invoke(sqsCfg).sendMessage(
{
MessageBody: "{}",
MessageBody: JSON.stringify(sv),
},
);
expect(ser.MessageId).toBeDefined();
Expand All @@ -66,5 +77,6 @@ describe("sqs-tests", () => {
await sleepms(100);
}
expect(SQSReceiver.msgRcvCount).toBe(1);
expect(SQSReceiver.msgValueSum).toBe(10);
});
});

0 comments on commit 8c4ace5

Please sign in to comment.