diff --git a/packages/pubsub/src/subscription.js b/packages/pubsub/src/subscription.js index 0193f81ac71..685b46ff1c5 100644 --- a/packages/pubsub/src/subscription.js +++ b/packages/pubsub/src/subscription.js @@ -320,13 +320,17 @@ Subscription.prototype.ack_ = function(message) { Subscription.prototype.breakLease_ = function(message) { var messageIndex = this.inventory_.lease.indexOf(message.ackId); + if (messageIndex === -1) { + return; + } + this.inventory_.lease.splice(messageIndex, 1); this.inventory_.bytes -= message.length; - if (this.connectionPool) { - if (this.connectionPool.isPaused && !this.hasMaxMessages_()) { - this.connectionPool.resume(); - } + var pool = this.connectionPool; + + if (pool && pool.isPaused && !this.hasMaxMessages_()) { + pool.resume(); } if (!this.inventory_.lease.length) { @@ -730,7 +734,7 @@ Subscription.prototype.getMetadata = function(gaxOpts, callback) { * @return {boolean} */ Subscription.prototype.hasMaxMessages_ = function() { - return this.inventory_.lease.length >= this.flowControl.maxMessages || + return this.inventory_.lease.length > this.flowControl.maxMessages || this.inventory_.bytes >= this.flowControl.maxBytes; }; @@ -888,11 +892,16 @@ Subscription.prototype.openConnection_ = function() { }); pool.on('message', function(message) { - self.emit('message', self.leaseMessage_(message)); + if (!self.hasMaxMessages_()) { + self.emit('message', self.leaseMessage_(message)); + return; + } - if (self.hasMaxMessages_() && !pool.isPaused) { + if (!pool.isPaused) { pool.pause(); } + + message.nack(); }); pool.once('connected', function() { diff --git a/packages/pubsub/system-test/pubsub.js b/packages/pubsub/system-test/pubsub.js index ff49e59388c..58e59c2ca5f 100644 --- a/packages/pubsub/system-test/pubsub.js +++ b/packages/pubsub/system-test/pubsub.js @@ -479,6 +479,30 @@ describe('pubsub', function() { setTimeout(() => subscription.close(done), 2500); } }); + + it('should respect flow control limits', function(done) { + var maxMessages = 3; + var messageCount = 0; + + var subscription = topic.subscription(SUB_NAMES[0], { + flowControl: { + maxMessages: maxMessages + } + }); + + subscription.on('error', done); + subscription.on('message', onMessage); + + function onMessage() { + if (++messageCount < (maxMessages + 1)) { + return; + } + + setImmediate(function() { + subscription.close(done); + }); + } + }); }); describe('IAM', function() { diff --git a/packages/pubsub/test/connection-pool.js b/packages/pubsub/test/connection-pool.js index 2fb5097634f..b4fe57da855 100644 --- a/packages/pubsub/test/connection-pool.js +++ b/packages/pubsub/test/connection-pool.js @@ -1170,13 +1170,22 @@ describe('ConnectionPool', function() { }); it('should reset internal used props', function() { + var fakeDate = Date.now(); + var dateNow = Date.now; + + global.Date.now = function() { + return fakeDate; + }; + pool.failedConnectionAttempts = 100; pool.noConnectionsTime = 0; pool.open(); assert.strictEqual(pool.failedConnectionAttempts, 0); - assert.strictEqual(pool.noConnectionsTime, Date.now()); + assert.strictEqual(pool.noConnectionsTime, fakeDate); + + global.Date.now = dateNow; }); it('should listen for newListener events', function() { diff --git a/packages/pubsub/test/subscription.js b/packages/pubsub/test/subscription.js index 192421fd7d4..b2d206c501d 100644 --- a/packages/pubsub/test/subscription.js +++ b/packages/pubsub/test/subscription.js @@ -407,6 +407,19 @@ describe('Subscription', function() { assert.strictEqual(subscription.inventory_.bytes, 0); }); + it('should noop for unknown messages', function() { + var message = { + ackId: 'def', + data: new Buffer('world'), + length: 5 + }; + + subscription.breakLease_(message); + + assert.strictEqual(subscription.inventory_.lease.length, 1); + assert.strictEqual(subscription.inventory_.bytes, 5); + }); + describe('with connection pool', function() { it('should resume receiving messages if paused', function(done) { subscription.connectionPool = { @@ -1174,9 +1187,9 @@ describe('Subscription', function() { }); describe('hasMaxMessages_', function() { - it('should return true if the number of leases == maxMessages', function() { + it('should return true if the number of leases > maxMessages', function() { subscription.inventory_.lease = ['a', 'b', 'c']; - subscription.flowControl.maxMessages = 3; + subscription.flowControl.maxMessages = 2; assert(subscription.hasMaxMessages_()); }); @@ -1473,7 +1486,7 @@ describe('Subscription', function() { }); it('should pause the pool if sub is at max messages', function(done) { - var message = {}; + var message = { nack: fakeUtil.noop }; var leasedMessage = {}; subscription.leaseMessage_ = function() { @@ -1491,7 +1504,7 @@ describe('Subscription', function() { }); it('should not re-pause the pool', function(done) { - var message = {}; + var message = { nack: fakeUtil.noop }; var leasedMessage = {}; subscription.leaseMessage_ = function() { @@ -1513,6 +1526,23 @@ describe('Subscription', function() { done(); }); + it('should nack messages if over limit', function(done) { + var message = { nack: done }; + var leasedMessage = {}; + + subscription.leaseMessage_ = function() { + return leasedMessage; + }; + + subscription.hasMaxMessages_ = function() { + return true; + }; + + subscription.openConnection_(); + subscription.connectionPool.isPaused = true; + subscription.connectionPool.emit('message', message); + }); + it('should flush the queue when connected', function(done) { subscription.flushQueues_ = done;