From 6c749611fa55240915f17ef0f5839fc051d38360 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 5 Nov 2018 07:27:11 -0800 Subject: [PATCH] Add Pub/Sub ack deadline example (#315) Add Pub/Sub ack deadline example with 1 worker. --- samples/subscriptions.js | 121 ++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 65 deletions(-) diff --git a/samples/subscriptions.js b/samples/subscriptions.js index 58bd5b546..b9ba89312 100755 --- a/samples/subscriptions.js +++ b/samples/subscriptions.js @@ -340,94 +340,85 @@ function synchronousPull(projectName, subscriptionName) { ); // The maximum number of messages returned for this request. // Pub/Sub may return fewer than the number specified. - const maxMessages = 3; - const ackDeadlineSeconds = 30; + const maxMessages = 1; + const newAckDeadlineSeconds = 30; const request = { subscription: formattedSubscription, maxMessages: maxMessages, }; - // `messages` is a dict that stores message ack ids as keys, and message - // data and the processing states (true if done, false if not) as values. - const messages = {}; - // The worker function takes a message and starts a long-running process. + let isProcessed = false; + + // The worker function is meant to be non-blocking. It starts a long- + // running process, such as writing the message to a table, which may + // take longer than the default 10-sec acknowledge deadline. function worker(message) { - const target = Math.floor(Math.random() * 1e5); - console.log(`Processing "${message.message.data}" for ${target / 1e3}s...`); + console.log(`Processing "${message.message.data}"...`); setTimeout(() => { console.log(`Finished procesing "${message.message.data}".`); - // After the message has been processed, set its processing state to true. - messages[message.ackId][1] = true; - }, target); + isProcessed = true; + }, 30000); } - // The subscriber pulls a specific number of messages. + // The subscriber pulls a specified number of messages. client .pull(request) .then(responses => { // The first element of `responses` is a PullResponse object. const response = responses[0]; + // Obtain the first message. + const message = response.receivedMessages[0]; - // Initialize `messages` with message ackId, message data and `false` as - // processing state. Then, start each message in a worker function. - response.receivedMessages.forEach(message => { - messages[message.ackId] = [message.message.data, false]; - worker(message); - }); - - let numProcessed = 0; + // Send the message to the worker function. + worker(message); - // setInterval() gets run every 10s. + // setInterval() checks the worker process every 5 sec. + // If the pre-set ack deadline is n sec, it is best to + // set the interval to be every (n/2) sec. const interval = setInterval(function() { - // Every 10s, we do a check on the processing states of the messages. - Object.keys(messages).forEach(ackId => { - if (messages[ackId][1]) { - // If the processing state for a particular message is true, - // We will ack the message. - const ackRequest = { - subscription: formattedSubscription, - ackIds: [ackId], - }; - - client.acknowledge(ackRequest).catch(err => { + // If the message has been processed.. + if (isProcessed) { + const ackRequest = { + subscription: formattedSubscription, + ackIds: [message.ackId], + }; + + //..acknowledges the message. + client + .acknowledge(ackRequest) + .then(() => { + console.log(`Acknowledged: "${message.message.data}".`); + // Exit after the message is acknowledged. + clearInterval(interval); + console.log(`Done.`); + }) + .catch(err => { console.error(err); }); - - console.log(`Acknowledged: "${messages[ackId][0]}".`); - - // Increment numProcessed by 1. - numProcessed += 1; - - // Remove this message from `messages`. - delete messages[ackId]; - } else { - // If the processing state of a particular message remains false, - // we will modify its ack deadline. - const modifyAckRequest = { - subscription: formattedSubscription, - ackIds: [ackId], - ackDeadlineSeconds: ackDeadlineSeconds, - }; - - client.modifyAckDeadline(modifyAckRequest).catch(err => { + } else { + // If the message is not yet processed.. + const modifyAckRequest = { + subscription: formattedSubscription, + ackIds: [message.ackId], + ackDeadlineSeconds: newAckDeadlineSeconds, + }; + + //..reset its ack deadline. + client + .modifyAckDeadline(modifyAckRequest) + .then(() => { + console.log( + `Reset ack deadline for "${ + message.message.data + }" for ${newAckDeadlineSeconds}s.` + ); + }) + .catch(err => { console.error(err); }); - - console.log( - `Reset ack deadline for "${ - messages[ackId][0] - }" for ${ackDeadlineSeconds}s.` - ); - } - - // If all messages have been processed, we clear out of the interval. - if (numProcessed === response.receivedMessages.length) { - clearInterval(interval); - console.log(`Done.`); - } - }); - }, 10000); + } + }, 5000); }) .catch(err => { console.error(err);