Skip to content

Commit

Permalink
Add Pub/Sub ack deadline example (#315)
Browse files Browse the repository at this point in the history
Add Pub/Sub ack deadline example with 1 worker.
  • Loading branch information
anguillanneuf authored and fhinkel committed Nov 5, 2018
1 parent f8bd0ae commit 6c74961
Showing 1 changed file with 56 additions and 65 deletions.
121 changes: 56 additions & 65 deletions samples/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -340,94 +340,85 @@ function synchronousPull(projectName, subscriptionName) {
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 3;
const ackDeadlineSeconds = 30;
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
subscription: formattedSubscription,
maxMessages: maxMessages,
};
// `messages` is a dict that stores message ack ids as keys, and message
// data and the processing states (true if done, false if not) as values.
const messages = {};

// The worker function takes a message and starts a long-running process.
let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
const target = Math.floor(Math.random() * 1e5);
console.log(`Processing "${message.message.data}" for ${target / 1e3}s...`);
console.log(`Processing "${message.message.data}"...`);

setTimeout(() => {
console.log(`Finished procesing "${message.message.data}".`);
// After the message has been processed, set its processing state to true.
messages[message.ackId][1] = true;
}, target);
isProcessed = true;
}, 30000);
}

// The subscriber pulls a specific number of messages.
// The subscriber pulls a specified number of messages.
client
.pull(request)
.then(responses => {
// The first element of `responses` is a PullResponse object.
const response = responses[0];
// Obtain the first message.
const message = response.receivedMessages[0];

// Initialize `messages` with message ackId, message data and `false` as
// processing state. Then, start each message in a worker function.
response.receivedMessages.forEach(message => {
messages[message.ackId] = [message.message.data, false];
worker(message);
});

let numProcessed = 0;
// Send the message to the worker function.
worker(message);

// setInterval() gets run every 10s.
// setInterval() checks the worker process every 5 sec.
// If the pre-set ack deadline is n sec, it is best to
// set the interval to be every (n/2) sec.
const interval = setInterval(function() {
// Every 10s, we do a check on the processing states of the messages.
Object.keys(messages).forEach(ackId => {
if (messages[ackId][1]) {
// If the processing state for a particular message is true,
// We will ack the message.
const ackRequest = {
subscription: formattedSubscription,
ackIds: [ackId],
};

client.acknowledge(ackRequest).catch(err => {
// If the message has been processed..
if (isProcessed) {
const ackRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
};

//..acknowledges the message.
client
.acknowledge(ackRequest)
.then(() => {
console.log(`Acknowledged: "${message.message.data}".`);
// Exit after the message is acknowledged.
clearInterval(interval);
console.log(`Done.`);
})
.catch(err => {
console.error(err);
});

console.log(`Acknowledged: "${messages[ackId][0]}".`);

// Increment numProcessed by 1.
numProcessed += 1;

// Remove this message from `messages`.
delete messages[ackId];
} else {
// If the processing state of a particular message remains false,
// we will modify its ack deadline.
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [ackId],
ackDeadlineSeconds: ackDeadlineSeconds,
};

client.modifyAckDeadline(modifyAckRequest).catch(err => {
} else {
// If the message is not yet processed..
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
ackDeadlineSeconds: newAckDeadlineSeconds,
};

//..reset its ack deadline.
client
.modifyAckDeadline(modifyAckRequest)
.then(() => {
console.log(
`Reset ack deadline for "${
message.message.data
}" for ${newAckDeadlineSeconds}s.`
);
})
.catch(err => {
console.error(err);
});

console.log(
`Reset ack deadline for "${
messages[ackId][0]
}" for ${ackDeadlineSeconds}s.`
);
}

// If all messages have been processed, we clear out of the interval.
if (numProcessed === response.receivedMessages.length) {
clearInterval(interval);
console.log(`Done.`);
}
});
}, 10000);
}
}, 5000);
})
.catch(err => {
console.error(err);
Expand Down

0 comments on commit 6c74961

Please sign in to comment.