Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(New) Synchronous Pull with Lease Management #272

Merged
merged 7 commits into from
Sep 28, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 60 additions & 24 deletions samples/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,26 @@ function synchronousPull(projectName, subscriptionName) {
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 3;
const ackDeadlineSeconds = 30;
const request = {
subscription: formattedSubscription,
maxMessages: maxMessages,
};
// `messages` is a dict that stores message ack ids as keys, and message
// data and the processing states (true if done, false if not) as values.
const messages = {};

// The worker function takes a message and starts a long-running process.
function worker(message) {
const target = Math.floor(Math.random() * 1e5);
console.log(`Processing "${message.message.data}" for ${target / 1e3}s...`);

setTimeout(() => {
console.log(`Finished procesing "${message.message.data}".`);
// After the message has been processed, set its processing state to true.
messages[message.ackId][1] = true;
}, target);
}

// The subscriber pulls a specific number of messages.
client
Expand All @@ -352,45 +368,65 @@ function synchronousPull(projectName, subscriptionName) {
// The first element of `responses` is a PullResponse object.
const response = responses[0];

// Process each received message in `response`.
// Initialize `messages` with message ackId, message data and `false` as
// processing state. Then, start each message in a worker function.
response.receivedMessages.forEach(message => {
// Create an arbitrarily large integer `target` to help
// simulate a long-running process.
const target = Math.floor(Math.random() * 1e9);
const ackDeadlineSeconds = 30;
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
ackDeadlineSeconds: ackDeadlineSeconds,
};

// Start a long-running process.
for (let i = 1; i <= target; i++) {
// If `i` reaches `target`, we assume the message has been
// processed, so we ack it. Else, we assume the message is
// still being processed, so we modify its ack deadline.
if (i === target) {
messages[message.ackId] = [message.message.data, false];
worker(message);
});

let numProcessed = 0;

// setInterval() gets run every 10s.
const interval = setInterval(function() {
// Every 10s, we do a check on the processing states of the messages.
Object.keys(messages).forEach(ackId => {
if (messages[ackId][1]) {
// If the processing state for a particular message is true,
// We will ack the message.
const ackRequest = {
subscription: formattedSubscription,
ackIds: [message.ackId],
ackIds: [ackId],
};

client.acknowledge(ackRequest).catch(err => {
console.error(err);
});
console.log(`Acknowledged "${message.message.data}".`);
} else if (i % 1e8 === 0) {

console.log(`Acknowledged: "${messages[ackId][0]}".`);

// Increment numProcessed by 1.
numProcessed += 1;

// Remove this message from `messages`.
delete messages[ackId];
} else {
// If the processing state of a particular message remains false,
// we will modify its ack deadline.
const modifyAckRequest = {
subscription: formattedSubscription,
ackIds: [ackId],
ackDeadlineSeconds: ackDeadlineSeconds,
};

client.modifyAckDeadline(modifyAckRequest).catch(err => {
console.error(err);
});

console.log(
`Reset ack deadline for "${
message.message.data
messages[ackId][0]
}" for ${ackDeadlineSeconds}s.`
);
}
}
});
console.log(`Done.`);

// If all messages have been processed, we clear out of the interval.
if (numProcessed === response.receivedMessages.length) {
clearInterval(interval);
console.log(`Done.`);
}
});
}, 10000);
})
.catch(err => {
console.error(err);
Expand Down