Skip to content

Commit

Permalink
pubsub: nack overflow messages (#2650)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored Oct 5, 2017
1 parent 95eeb02 commit b3e3fc3
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 12 deletions.
23 changes: 16 additions & 7 deletions packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,17 @@ Subscription.prototype.ack_ = function(message) {
Subscription.prototype.breakLease_ = function(message) {
var messageIndex = this.inventory_.lease.indexOf(message.ackId);

if (messageIndex === -1) {
return;
}

this.inventory_.lease.splice(messageIndex, 1);
this.inventory_.bytes -= message.length;

if (this.connectionPool) {
if (this.connectionPool.isPaused && !this.hasMaxMessages_()) {
this.connectionPool.resume();
}
var pool = this.connectionPool;

if (pool && pool.isPaused && !this.hasMaxMessages_()) {
pool.resume();
}

if (!this.inventory_.lease.length) {
Expand Down Expand Up @@ -730,7 +734,7 @@ Subscription.prototype.getMetadata = function(gaxOpts, callback) {
* @return {boolean}
*/
Subscription.prototype.hasMaxMessages_ = function() {
return this.inventory_.lease.length >= this.flowControl.maxMessages ||
return this.inventory_.lease.length > this.flowControl.maxMessages ||
this.inventory_.bytes >= this.flowControl.maxBytes;
};

Expand Down Expand Up @@ -888,11 +892,16 @@ Subscription.prototype.openConnection_ = function() {
});

pool.on('message', function(message) {
self.emit('message', self.leaseMessage_(message));
if (!self.hasMaxMessages_()) {
self.emit('message', self.leaseMessage_(message));
return;
}

if (self.hasMaxMessages_() && !pool.isPaused) {
if (!pool.isPaused) {
pool.pause();
}

message.nack();
});

pool.once('connected', function() {
Expand Down
24 changes: 24 additions & 0 deletions packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,30 @@ describe('pubsub', function() {
setTimeout(() => subscription.close(done), 2500);
}
});

it('should respect flow control limits', function(done) {
var maxMessages = 3;
var messageCount = 0;

var subscription = topic.subscription(SUB_NAMES[0], {
flowControl: {
maxMessages: maxMessages
}
});

subscription.on('error', done);
subscription.on('message', onMessage);

function onMessage() {
if (++messageCount < (maxMessages + 1)) {
return;
}

setImmediate(function() {
subscription.close(done);
});
}
});
});

describe('IAM', function() {
Expand Down
11 changes: 10 additions & 1 deletion packages/pubsub/test/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -1170,13 +1170,22 @@ describe('ConnectionPool', function() {
});

it('should reset internal used props', function() {
var fakeDate = Date.now();
var dateNow = Date.now;

global.Date.now = function() {
return fakeDate;
};

pool.failedConnectionAttempts = 100;
pool.noConnectionsTime = 0;

pool.open();

assert.strictEqual(pool.failedConnectionAttempts, 0);
assert.strictEqual(pool.noConnectionsTime, Date.now());
assert.strictEqual(pool.noConnectionsTime, fakeDate);

global.Date.now = dateNow;
});

it('should listen for newListener events', function() {
Expand Down
38 changes: 34 additions & 4 deletions packages/pubsub/test/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,19 @@ describe('Subscription', function() {
assert.strictEqual(subscription.inventory_.bytes, 0);
});

it('should noop for unknown messages', function() {
var message = {
ackId: 'def',
data: new Buffer('world'),
length: 5
};

subscription.breakLease_(message);

assert.strictEqual(subscription.inventory_.lease.length, 1);
assert.strictEqual(subscription.inventory_.bytes, 5);
});

describe('with connection pool', function() {
it('should resume receiving messages if paused', function(done) {
subscription.connectionPool = {
Expand Down Expand Up @@ -1174,9 +1187,9 @@ describe('Subscription', function() {
});

describe('hasMaxMessages_', function() {
it('should return true if the number of leases == maxMessages', function() {
it('should return true if the number of leases > maxMessages', function() {
subscription.inventory_.lease = ['a', 'b', 'c'];
subscription.flowControl.maxMessages = 3;
subscription.flowControl.maxMessages = 2;

assert(subscription.hasMaxMessages_());
});
Expand Down Expand Up @@ -1473,7 +1486,7 @@ describe('Subscription', function() {
});

it('should pause the pool if sub is at max messages', function(done) {
var message = {};
var message = { nack: fakeUtil.noop };
var leasedMessage = {};

subscription.leaseMessage_ = function() {
Expand All @@ -1491,7 +1504,7 @@ describe('Subscription', function() {
});

it('should not re-pause the pool', function(done) {
var message = {};
var message = { nack: fakeUtil.noop };
var leasedMessage = {};

subscription.leaseMessage_ = function() {
Expand All @@ -1513,6 +1526,23 @@ describe('Subscription', function() {
done();
});

it('should nack messages if over limit', function(done) {
var message = { nack: done };
var leasedMessage = {};

subscription.leaseMessage_ = function() {
return leasedMessage;
};

subscription.hasMaxMessages_ = function() {
return true;
};

subscription.openConnection_();
subscription.connectionPool.isPaused = true;
subscription.connectionPool.emit('message', message);
});

it('should flush the queue when connected', function(done) {
subscription.flushQueues_ = done;

Expand Down

0 comments on commit b3e3fc3

Please sign in to comment.