From 9f13575452a9bc8e510f598208e7d68aac647118 Mon Sep 17 00:00:00 2001 From: Ernest Landrito Date: Wed, 16 Nov 2016 15:50:02 -0800 Subject: [PATCH] Added functionality canceling an operation. --- lib/lro.js | 74 +++++++++++++++++++++++++++++++------------- test/api_callable.js | 11 +++++++ 2 files changed, 63 insertions(+), 22 deletions(-) diff --git a/lib/lro.js b/lib/lro.js index f90b09a2c..e47c81f6b 100644 --- a/lib/lro.js +++ b/lib/lro.js @@ -29,25 +29,22 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +var _ = require('lodash'); var util = require('util'); var NormalApiCaller = require('./api_callable').NormalApiCaller; var createBackoffSettings = require('.gax').createBackoffSettings; -function LongrunningApicaller(longrunningDescriptor) { +function LongrunningApiCaller(longrunningDescriptor) { NormalApiCaller.call(this); this.operationsApi = longrunningDescriptor.operationsApi; - this.decoder = longrunningDescriptor.decoder; + this.protoDescriptorPool = longrunningDescriptor.protoDescriptorPool; } -util.inherits(LongrunningApicaller, NormalApiCaller); +util.inherits(LongrunningApiCaller, NormalApiCaller); -LongrunningApicaller.prototype.call = function( - apiCall, argument, settings, canceller) { - if (!settings.isLongrunning) { - NormalApiCaller.prototype.call.call( - this, apiCall, argument, settings, canceller); - return; - } +LongrunningApiCaller.prototype.poll = function( + apiCall, settings, argument, callback) { + var cancelFunc; var backoffSettings = settings.longrunning.backoffSettings; if (!backoffSettings) { @@ -60,26 +57,44 @@ LongrunningApicaller.prototype.call = function( var delay = backoffSettings.initialRetryDelayMillis; var now = new Date(); var deadline = now.getTime() + backoffSettings.totalTimeoutMillis; + var currentOperationPromise; - function retryOperation(op) { + function retryOperation(responses) { + op = responses[0]; + cancelFunc = new function() { + this.operationsApi.cancelOperation({name: op.name}); + if (currentOperationPromise) { + currentOperationPromise.cancel(); + } + } if (op.done) { if (op.result.hasError) { var error; error = new Error(op.result.error.message); error.code = op.result.error.code; - canceller.callback(error); + callback(error); } else { + // TODO: add metadata decoding. var response = op.result.response; - if (this.decoder) { - response = this.decoder(response); + var decoder; + if (settings.longrunning.decoder) { + decoder = settings.longrunning.decoder; + } else { + var protoDescriptor = + _.get(this.protoDescriptorPool, response.type_url, null); + decoder = protoDescriptor ? protoDescriptor.decode : null; + } + + if (decoder) { + response = decoder(response.value); } - canceller.callback(null, response); + callback(null, [response, metadata, op]); } return; } if (now.getTime() >= deadline) { - canceller.callback(new Error('Total timeout exceeded before any' + + callback(new Error('Total timeout exceeded before any' + 'response was received')); return; } @@ -88,29 +103,44 @@ LongrunningApicaller.prototype.call = function( setTimeout(function() { now = new Date(); delay = Math.min(delay * delayMult, maxDelay); - this.operationsApi.getOperation({name: op.name}) + currentOperationPromise = this.operationsApi.getOperation({name: op.name}) .then(function(operation) { retryOperation(operation); }); }, toSleep); } - apiCall(argument, function(err, response) { + cancelFunc = apiCall(argument, function(err, response) { if (err) { - canceller.callback(err); + callback(err); return; } retryOperation(response); }); + + return { + cancel: cancelFunc + } +} + +LongrunningApiCaller.prototype.call = function( + apiCall, argument, settings, canceller) { + if (settings.isLongrunning) { + canceller.call(this.poll.bind(this, apiCall, settings), argument); + } else { + NormalApiCaller.prototype.call.call( + this, apiCall, argument, settings, canceller); + return; + } }; -function LongrunningDescriptor(operationsApi, decoder) { +function LongrunningDescriptor(operationsApi, protoDescriptorPool) { this.operationsApi = operationsApi; - this.decoder = decoder; + this.protoDescriptorPool = protoDescriptorPool; } LongrunningDescriptor.prototype.apiCaller = function() { - return new LongrunningApicaller(this); + return new LongrunningApiCaller(this); }; exports.LongrunningDescriptor = LongrunningDescriptor; diff --git a/test/api_callable.js b/test/api_callable.js index e4601a1eb..8bdc3cf27 100644 --- a/test/api_callable.js +++ b/test/api_callable.js @@ -36,6 +36,7 @@ var apiCallable = require('../lib/api_callable'); var gax = require('../lib/gax'); var PageDescriptor = require('../lib/page_streaming').PageDescriptor; var BundleDescriptor = require('../lib/bundling').BundleDescriptor; +var longrunningDescriptor = require('../lib/lro').longrunningDescriptor; var streaming = require('../lib/streaming'); var expect = require('chai').expect; var sinon = require('sinon'); @@ -799,3 +800,13 @@ describe('streaming', function() { }); }); }); + +describe('longrunning', function() { + function createLongrunningCall(func, operationsApi, protoDescriptorPool) { + var settings = new gax.CallSettings(); + var longrunningDescriptor = + new LongrunningDescriptor(operationsApi, protoDescriptorPool); + return apiCallable.createApiCall( + Promise.resolve(func), settings, new streaming.StreamDescriptor(type)); + } +});