Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
landrito committed Nov 21, 2016
1 parent 9f13575 commit 308375b
Show file tree
Hide file tree
Showing 3 changed files with 376 additions and 59 deletions.
50 changes: 49 additions & 1 deletion lib/gax.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@
* pageToken.
* @property {boolean=} isBundling - If set to false and the call is configured
* for bundling, bundling is not performed.
* @property {boolean=} isLongrunning - If set to false and the call returns a
* longrunning operation, the initial operation will be returned and polling
* will not be performed.
* @property {Object=} longrunning
* @property {anyDecoder=} longrunning.responseDecoder - The decoder to unpack
* the response message. If not provided, the response will be unpacked if
* the proto descriptor of the any value is found in the longrunningDescriptor
* protoDescriptorPool.
* @property {anyDecoder=} longrunning.metadataDecoder - The decoder to unpack
* the metadata message. If not provided, the metadata will be unpacked if
* the proto descriptor of the any value is found in the longrunningDescriptor
* protoDescriptorPool.
* @property {backoffSettings=} longrunning.backoffSettings - The backoff
* settings used to poll operation.
* @example
* // suppress bundling for bundled method.
* api.bundlingMethod(
Expand All @@ -72,6 +86,13 @@
* });
*/

/**
* A callback to upack a google.protobuf.Any message.
* @callback anyDecoder
* @param {google.protobuf.Any} message - The message to unpacked.
* @return {Object} - The unpacked message.
*/

/**
* Per-call configurable settings for retrying upon transient failure.
* @typedef {Object} RetryOptions
Expand Down Expand Up @@ -160,6 +181,10 @@ function CallSettings(settings) {
this.otherArgs = settings.otherArgs || {};
this.bundleOptions = settings.bundleOptions;
this.isBundling = ('isBundling' in settings) ? settings.isBundling : true;
this.isLongrunning =
('isLongrunning' in settings) ? settings.isLongrunning : true;
this.longrunningOptions =
('longrunningOptions' in settings) ? settings.longrunningOptions : {};
}
exports.CallSettings = CallSettings;

Expand All @@ -181,6 +206,8 @@ CallSettings.prototype.merge = function merge(options) {
var pageToken = this.pageToken;
var otherArgs = this.otherArgs;
var isBundling = this.isBundling;
var isLongrunning = this.isLongrunning;
var longrunningOptions = this.longrunningOptions;
if ('timeout' in options) {
timeout = options.timeout;
}
Expand Down Expand Up @@ -216,14 +243,35 @@ CallSettings.prototype.merge = function merge(options) {
delete retry.backoffSettings.totalTimeoutMillis;
}

if ('isLongrunning' in options) {
isLongrunning = options.isLongrunning;
}

if ('longrunningOptions' in options) {
if ('responseDecoder' in options.longrunningOptions) {
longrunningOptions.responseDecoder =
options.longrunningOptions.responseDecoder;
}
if ('metadataDecoder' in options.longrunningOptions) {
longrunningOptions.metadataDecoder =
options.longrunningOptions.metadataDecoder;
}
if ('backoffSettings' in options.longrunningOptions) {
longrunningOptions.backoffSettings =
options.longrunningOptions.backoffSettings;
}
}

return new CallSettings({
timeout: timeout,
retry: retry,
bundleOptions: this.bundleOptions,
longrunningOptions: longrunningOptions,
autoPaginate: autoPaginate,
pageToken: pageToken,
otherArgs: otherArgs,
isBundling: isBundling});
isBundling: isBundling,
isLongrunning: isLongrunning});
};

/**
Expand Down
126 changes: 74 additions & 52 deletions lib/lro.js → lib/longrunning.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,65 +30,77 @@
*/

var _ = require('lodash');
var util = require('util');
var createBackoffSettings = require('./gax').createBackoffSettings;
var NormalApiCaller = require('./api_callable').NormalApiCaller;
var createBackoffSettings = require('.gax').createBackoffSettings;
var util = require('util');

function LongrunningApiCaller(longrunningDescriptor) {
NormalApiCaller.call(this);
this.operationsApi = longrunningDescriptor.operationsApi;
this.protoDescriptorPool = longrunningDescriptor.protoDescriptorPool;
this.longrunningDescriptor = longrunningDescriptor;
}

util.inherits(LongrunningApiCaller, NormalApiCaller);

LongrunningApiCaller.prototype.poll = function(
apiCall, settings, argument, callback) {
var cancelFunc;
LongrunningApiCaller.prototype.call = function(
apiCall, argument, settings, canceller) {
if (settings.isLongrunning) {
canceller.call(this._poll.bind(this, apiCall, settings), argument);
} else {
NormalApiCaller.prototype.call.call(
this, apiCall, argument, settings, canceller);
}
};

var backoffSettings = settings.longrunning.backoffSettings;
LongrunningApiCaller.prototype._poll = function(
apiCall, settings, argument, callback) {
var backoffSettings = settings.longrunningOptions.backoffSettings;
if (!backoffSettings) {
backoffSettings =
createBackoffSettings(100, 1.3, 60000, null, null, null, 600000);
}

var now = new Date();
var delayMult = backoffSettings.retryDelayMultiplier;
var maxDelay = backoffSettings.maxRetryDelayMillis;
var delay = backoffSettings.initialRetryDelayMillis;
var now = new Date();
var deadline = now.getTime() + backoffSettings.totalTimeoutMillis;
var currentOperationPromise;
var operationsApi = this.longrunningDescriptor.operationsApi;

var op;
var grpcCanceller;
var promiseCanceller;

function retryOperation(responses) {
var retryOperation = function(responses) {
grpcCanceller = null;
op = responses[0];
cancelFunc = new function() {
this.operationsApi.cancelOperation({name: op.name});
if (currentOperationPromise) {
currentOperationPromise.cancel();
}
}

if (op.done) {
if (op.result.hasError) {
if (op.result === 'error') {
var error;
error = new Error(op.result.error.message);
error.code = op.result.error.code;
error = new Error(op.error.message);
error.code = op.error.code;
callback(error);
} else {
// TODO: add metadata decoding.
var response = op.result.response;
var decoder;
if (settings.longrunning.decoder) {
decoder = settings.longrunning.decoder;
var response;
if (settings.longrunningOptions.responseDecoder) {
if (op.response) {
response =
settings.longrunningOptions.responseDecoder(op.response.value);
}
} else {
var protoDescriptor =
_.get(this.protoDescriptorPool, response.type_url, null);
decoder = protoDescriptor ? protoDescriptor.decode : null;
response = this._unpack(op.response);
}

if (decoder) {
response = decoder(response.value);
var metadata;
if (settings.longrunningOptions.metadataDecoder) {
if (op.metadata) {
metadata =
settings.longrunningOptions.metadataDecoder(op.metadata.value);
}
} else {
metadata = this._unpack(op.metadata);
}
callback(null, [response, metadata, op]);
callback(null, response, metadata, op);
}
return;
}
Expand All @@ -99,39 +111,49 @@ LongrunningApiCaller.prototype.poll = function(
return;
}

var toSleep = Math.random() * delay;
setTimeout(function() {
now = new Date();
delay = Math.min(delay * delayMult, maxDelay);
currentOperationPromise = this.operationsApi.getOperation({name: op.name})
.then(function(operation) {
retryOperation(operation);
});
}, toSleep);
}
promiseCanceller = operationsApi.getOperation({name: op.name});
promiseCanceller.then(retryOperation);
}, delay);
};
retryOperation = retryOperation.bind(this);

cancelFunc = apiCall(argument, function(err, response) {
grpcCanceller = apiCall(argument, function(err) {
if (err) {
callback(err);
return;
}
retryOperation(response);
retryOperation(Array.prototype.slice.call(arguments, 1));
});

return {
cancel: cancelFunc
}
}
cancel: function() {
if (op) {
if (promiseCanceller) {
promiseCanceller.cancel();
}
operationsApi.cancelOperation({name: op.name}).then(function() {
callback(new Error('Operation ' + op.name + ' was canceled.'));
});
return;
}

LongrunningApiCaller.prototype.call = function(
apiCall, argument, settings, canceller) {
if (settings.isLongrunning) {
canceller.call(this.poll.bind(this, apiCall, settings), argument);
} else {
NormalApiCaller.prototype.call.call(
this, apiCall, argument, settings, canceller);
return;
}
if (grpcCanceller) {
grpcCanceller();
callback(new Error('cancelled'));
return;
}
}
};
};

LongrunningApiCaller.prototype._unpack = function(any) {
var protoDescriptor =
_.get(this.longrunningDescriptor.protoDescriptorPool, any.typeUrl, null);
var decoder = protoDescriptor ? protoDescriptor.decode : null;
return decoder ? decoder(any.value) : null;
};

function LongrunningDescriptor(operationsApi, protoDescriptorPool) {
Expand Down
Loading

0 comments on commit 308375b

Please sign in to comment.