Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub simpler modify ack deadline example #315

Merged
merged 3 commits into from
Nov 5, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 56 additions & 65 deletions samples/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -340,94 +340,85 @@ function synchronousPull(projectName, subscriptionName) {
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 3;
const ackDeadlineSeconds = 30;
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
subscription: formattedSubscription,
maxMessages: maxMessages,
};
// `messages` is a dict that stores message ack ids as keys, and message
// data and the processing states (true if done, false if not) as values.
const messages = {};

// The worker function takes a message and starts a long-running process.
let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
const target = Math.floor(Math.random() * 1e5);
console.log(`Processing "${message.message.data}" for ${target / 1e3}s...`);
console.log(`Processing "${message.message.data}"...`);

setTimeout(() => {
console.log(`Finished procesing "${message.message.data}".`);
// After the message has been processed, set its processing state to true.
messages[message.ackId][1] = true;
}, target);
isProcessed = true;
}, 30000);
}

// The subscriber pulls a specific number of messages.
// The subscriber pulls a specified number of messages.
client
.pull(request)
.then(responses => {
// The first element of `responses` is a PullResponse object.
const response = responses[0];
// Obtain the first message.
const message = response.receivedMessages[0];

// Initialize `messages` with message ackId, message data and `false` as
// processing state. Then, start each message in a worker function.
response.receivedMessages.forEach(message => {
messages[message.ackId] = [message.message.data, false];
worker(message);
});

let numProcessed = 0;
// Send the message to the worker function.
worker(message);

// setInterval() gets run every 10s.
// setInterval() checks the worker process every 5 sec.
// If the pre-set ack deadline is n sec, it is best to
// set the interval to be every (n/2) sec.
const interval = setInterval(function() {
// Every 10s, we do a check on the processing states of the messages.
Object.keys(messages).forEach(ackId => {
if (messages[ackId][1]) {
// If the processing state for a particular message is true,
// We will ack the message.
const ackRequest = {
subscription: formattedSubscription,
ackIds: [ackId],
};

client.acknowledge(ackRequest).catch(err => {
// If the message has been processed..
if (isProcessed) {
const ackRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
};

//..acknowledges the message.
client
.acknowledge(ackRequest)
.then(() => {
console.log(`Acknowledged: "${message.message.data}".`);
// Exit after the message is acknowledged.
clearInterval(interval);
console.log(`Done.`);
})
.catch(err => {
console.error(err);
});

console.log(`Acknowledged: "${messages[ackId][0]}".`);

// Increment numProcessed by 1.
numProcessed += 1;

// Remove this message from `messages`.
delete messages[ackId];
} else {
// If the processing state of a particular message remains false,
// we will modify its ack deadline.
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [ackId],
ackDeadlineSeconds: ackDeadlineSeconds,
};

client.modifyAckDeadline(modifyAckRequest).catch(err => {
} else {
// If the message is not yet processed..
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
ackDeadlineSeconds: newAckDeadlineSeconds,
};

//..reset its ack deadline.
client
.modifyAckDeadline(modifyAckRequest)
.then(() => {
console.log(
`Reset ack deadline for "${
message.message.data
}" for ${newAckDeadlineSeconds}s.`
);
})
.catch(err => {
console.error(err);
});

console.log(
`Reset ack deadline for "${
messages[ackId][0]
}" for ${ackDeadlineSeconds}s.`
);
}

// If all messages have been processed, we clear out of the interval.
if (numProcessed === response.receivedMessages.length) {
clearInterval(interval);
console.log(`Done.`);
}
});
}, 10000);
}
}, 5000);
})
.catch(err => {
console.error(err);
Expand Down