Skip to content

Commit

Permalink
Added functionality canceling an operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
landrito committed Nov 18, 2016
1 parent 975223a commit 9f13575
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 22 deletions.
74 changes: 52 additions & 22 deletions lib/lro.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,22 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

var _ = require('lodash');
var util = require('util');
var NormalApiCaller = require('./api_callable').NormalApiCaller;
var createBackoffSettings = require('.gax').createBackoffSettings;

function LongrunningApicaller(longrunningDescriptor) {
function LongrunningApiCaller(longrunningDescriptor) {
NormalApiCaller.call(this);
this.operationsApi = longrunningDescriptor.operationsApi;
this.decoder = longrunningDescriptor.decoder;
this.protoDescriptorPool = longrunningDescriptor.protoDescriptorPool;
}

util.inherits(LongrunningApicaller, NormalApiCaller);
util.inherits(LongrunningApiCaller, NormalApiCaller);

LongrunningApicaller.prototype.call = function(
apiCall, argument, settings, canceller) {
if (!settings.isLongrunning) {
NormalApiCaller.prototype.call.call(
this, apiCall, argument, settings, canceller);
return;
}
LongrunningApiCaller.prototype.poll = function(
apiCall, settings, argument, callback) {
var cancelFunc;

var backoffSettings = settings.longrunning.backoffSettings;
if (!backoffSettings) {
Expand All @@ -60,26 +57,44 @@ LongrunningApicaller.prototype.call = function(
var delay = backoffSettings.initialRetryDelayMillis;
var now = new Date();
var deadline = now.getTime() + backoffSettings.totalTimeoutMillis;
var currentOperationPromise;

function retryOperation(op) {
function retryOperation(responses) {
op = responses[0];
cancelFunc = new function() {
this.operationsApi.cancelOperation({name: op.name});
if (currentOperationPromise) {
currentOperationPromise.cancel();
}
}
if (op.done) {
if (op.result.hasError) {
var error;
error = new Error(op.result.error.message);
error.code = op.result.error.code;
canceller.callback(error);
callback(error);
} else {
// TODO: add metadata decoding.
var response = op.result.response;
if (this.decoder) {
response = this.decoder(response);
var decoder;
if (settings.longrunning.decoder) {
decoder = settings.longrunning.decoder;
} else {
var protoDescriptor =
_.get(this.protoDescriptorPool, response.type_url, null);
decoder = protoDescriptor ? protoDescriptor.decode : null;
}

if (decoder) {
response = decoder(response.value);
}
canceller.callback(null, response);
callback(null, [response, metadata, op]);
}
return;
}

if (now.getTime() >= deadline) {
canceller.callback(new Error('Total timeout exceeded before any' +
callback(new Error('Total timeout exceeded before any' +
'response was received'));
return;
}
Expand All @@ -88,29 +103,44 @@ LongrunningApicaller.prototype.call = function(
setTimeout(function() {
now = new Date();
delay = Math.min(delay * delayMult, maxDelay);
this.operationsApi.getOperation({name: op.name})
currentOperationPromise = this.operationsApi.getOperation({name: op.name})
.then(function(operation) {
retryOperation(operation);
});
}, toSleep);
}

apiCall(argument, function(err, response) {
cancelFunc = apiCall(argument, function(err, response) {
if (err) {
canceller.callback(err);
callback(err);
return;
}
retryOperation(response);
});

return {
cancel: cancelFunc
}
}

LongrunningApiCaller.prototype.call = function(
apiCall, argument, settings, canceller) {
if (settings.isLongrunning) {
canceller.call(this.poll.bind(this, apiCall, settings), argument);
} else {
NormalApiCaller.prototype.call.call(
this, apiCall, argument, settings, canceller);
return;
}
};

function LongrunningDescriptor(operationsApi, decoder) {
function LongrunningDescriptor(operationsApi, protoDescriptorPool) {
this.operationsApi = operationsApi;
this.decoder = decoder;
this.protoDescriptorPool = protoDescriptorPool;
}

LongrunningDescriptor.prototype.apiCaller = function() {
return new LongrunningApicaller(this);
return new LongrunningApiCaller(this);
};

exports.LongrunningDescriptor = LongrunningDescriptor;
11 changes: 11 additions & 0 deletions test/api_callable.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var apiCallable = require('../lib/api_callable');
var gax = require('../lib/gax');
var PageDescriptor = require('../lib/page_streaming').PageDescriptor;
var BundleDescriptor = require('../lib/bundling').BundleDescriptor;
var longrunningDescriptor = require('../lib/lro').longrunningDescriptor;
var streaming = require('../lib/streaming');
var expect = require('chai').expect;
var sinon = require('sinon');
Expand Down Expand Up @@ -799,3 +800,13 @@ describe('streaming', function() {
});
});
});

describe('longrunning', function() {
function createLongrunningCall(func, operationsApi, protoDescriptorPool) {
var settings = new gax.CallSettings();
var longrunningDescriptor =
new LongrunningDescriptor(operationsApi, protoDescriptorPool);
return apiCallable.createApiCall(
Promise.resolve(func), settings, new streaming.StreamDescriptor(type));
}
});

0 comments on commit 9f13575

Please sign in to comment.