Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capture write times to adjust lease timeout #99

Merged
merged 3 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/histogram.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

'use strict';

var MIN_VALUE = 10000;
var MAX_VALUE = 600000;
var extend = require('extend');

/*!
* The Histogram class is used to capture the lifespan of messages within the
Expand All @@ -27,7 +26,15 @@ var MAX_VALUE = 600000;
* @private
* @class
*/
function Histogram() {
function Histogram(options) {
this.options = extend(
{
min: 10000,
max: 600000,
},
options
);

this.data = new Map();
this.length = 0;
}
Expand All @@ -39,8 +46,8 @@ function Histogram() {
* @param {numnber} value - The value in milliseconds.
*/
Histogram.prototype.add = function(value) {
value = Math.max(value, MIN_VALUE);
value = Math.min(value, MAX_VALUE);
value = Math.max(value, this.options.min);
value = Math.min(value, this.options.max);
value = Math.ceil(value / 1000) * 1000;

if (!this.data.has(value)) {
Expand Down Expand Up @@ -75,7 +82,7 @@ Histogram.prototype.percentile = function(percent) {
}
}

return MIN_VALUE;
return this.options.min;
};

module.exports = Histogram;
107 changes: 59 additions & 48 deletions src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ function Subscription(pubsub, name, options) {
this.projectId = pubsub.projectId;
this.request = pubsub.request.bind(pubsub);
this.histogram = new Histogram();
this.latency_ = new Histogram({min: 0});

this.name = Subscription.formatName_(pubsub.projectId, name);

Expand Down Expand Up @@ -312,31 +313,17 @@ Subscription.prototype.acknowledge_ = function(ackIds, connId) {
var promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(function(
ackIdChunk
) {
if (!self.isConnected_()) {
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'acknowledge',
reqOpts: {
subscription: self.name,
ackIds: ackIdChunk,
},
});
if (self.isConnected_()) {
return self.writeTo_(connId, {ackIds: ackIdChunk});
}

return new Promise(function(resolve, reject) {
self.connectionPool.acquire(connId, function(err, connection) {
if (err) {
reject(err);
return;
}

connection.write(
{
ackIds: ackIdChunk,
},
resolve
);
});
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'acknowledge',
reqOpts: {
subscription: self.name,
ackIds: ackIdChunk,
},
});
});

Expand Down Expand Up @@ -902,33 +889,21 @@ Subscription.prototype.modifyAckDeadline_ = function(ackIds, deadline, connId) {
var promises = chunk(ackIds, MAX_ACK_IDS_PER_REQUEST).map(function(
ackIdChunk
) {
if (!self.isConnected_()) {
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'modifyAckDeadline',
reqOpts: {
subscription: self.name,
ackDeadlineSeconds: deadline,
ackIds: ackIdChunk,
},
if (self.isConnected_()) {
return self.writeTo_(connId, {
modifyDeadlineAckIds: ackIdChunk,
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline),
});
}

return new Promise(function(resolve, reject) {
self.connectionPool.acquire(connId, function(err, connection) {
if (err) {
reject(err);
return;
}

connection.write(
{
modifyDeadlineAckIds: ackIdChunk,
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline),
},
resolve
);
});
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'modifyAckDeadline',
reqOpts: {
subscription: self.name,
ackDeadlineSeconds: deadline,
ackIds: ackIdChunk,
},
});
});

Expand Down Expand Up @@ -1180,7 +1155,9 @@ Subscription.prototype.setLeaseTimeout_ = function() {
return;
}

var timeout = Math.random() * this.ackDeadline * 0.9;
var latency = this.latency_.percentile(99);
var timeout = Math.random() * this.ackDeadline * 0.9 - latency;

this.leaseTimeoutHandle_ = setTimeout(this.renewLeases_.bind(this), timeout);
};

Expand Down Expand Up @@ -1265,6 +1242,40 @@ Subscription.prototype.snapshot = function(name) {
return this.pubsub.snapshot.call(this, name);
};

/**
* Writes to specified duplex stream. This is useful for capturing write
* latencies that can later be used to adjust the auto lease timeout.
*
* @private
*
* @param {string} connId The ID of the connection to write to.
* @param {object} data The data to be written to the stream.
* @returns {Promise}
*/
Subscription.prototype.writeTo_ = function(connId, data) {
var self = this;
var startTime = Date.now();

return new Promise(function(resolve, reject) {
self.connectionPool.acquire(connId, function(err, connection) {
if (err) {
reject(err);
return;
}

connection.write(data, function(err) {
if (err) {
reject(err);
return;
}

self.latency_.add(Date.now() - startTime);
resolve();
});
});
});
};

/*! Developer Documentation
*
* All async methods (except for streams) will return a Promise in the event
Expand Down
12 changes: 12 additions & 0 deletions test/histogram.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ describe('Histogram', function() {
});

describe('initialization', function() {
it('should set default min/max values', function() {
assert.strictEqual(histogram.options.min, 10000);
assert.strictEqual(histogram.options.max, 600000);
});

it('should accept user defined min/max values', function() {
histogram = new Histogram({min: 5, max: 10});

assert.strictEqual(histogram.options.min, 5);
assert.strictEqual(histogram.options.max, 10);
});

it('should create a data map', function() {
assert(histogram.data instanceof Map);
});
Expand Down
Loading