From e4c7cc62fce841c01f5d8f0d6f6ac0a7775f8b9a Mon Sep 17 00:00:00 2001 From: Moshe Kolodny Date: Fri, 19 Jan 2018 14:34:19 -0500 Subject: [PATCH] Smart read retries (#27) --- src/chunktransformer.js | 5 + src/table.js | 184 +++++++++--- system-test/data/read-rows-retry-test.json | 313 +++++++++++++++++++++ system-test/read-rows.js | 151 ++++++++++ test/chunktransformer.js | 8 + test/table.js | 191 ++++++++++++- 6 files changed, 809 insertions(+), 43 deletions(-) create mode 100644 system-test/data/read-rows-retry-test.json create mode 100644 system-test/read-rows.js diff --git a/src/chunktransformer.js b/src/chunktransformer.js index ecfdef811..fb6ebe803 100644 --- a/src/chunktransformer.js +++ b/src/chunktransformer.js @@ -48,6 +48,7 @@ function ChunkTransformer(options) { this.options.objectMode = true; // forcing object mode Transform.call(this, options); this._destroyed = false; + this.lastRowKey = undefined; this.reset(); } util.inherits(ChunkTransformer, Transform); @@ -86,6 +87,9 @@ ChunkTransformer.prototype._transform = function(data, enc, next) { return; } } + if (data.lastScannedRowKey) { + this.lastRowKey = data.lastScannedRowKey; + } next(); }; @@ -249,6 +253,7 @@ ChunkTransformer.prototype.moveToNextState = function(chunk) { if (chunk.commitRow) { this.push(row); this.commit(); + this.lastRowKey = row.key; } else { if (chunk.valueSize > 0) { this.state = RowStateEnum.CELL_IN_PROGRESS; diff --git a/src/table.js b/src/table.js index 92dddced7..34fd3e4ae 100644 --- a/src/table.js +++ b/src/table.js @@ -35,7 +35,10 @@ const ChunkTransformer = require('./chunktransformer.js'); // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE) -const RETRY_STATUS_CODES = new Set([4, 10, 14]); +const GRPC_RETRYABLE_STATUS_CODES = new Set([4, 10, 14]); + +// (409=ABORTED, 503=UNAVAILABLE, 14=DEADLINE_EXCEEDED) +const HTTP_RETRYABLE_STATUS_CODES = new Set([409, 503, 504]); /** * Create a Table object to interact with a Cloud Bigtable table. @@ -445,64 +448,161 @@ Table.prototype.createReadStream = function(options) { var self = this; options = options || {}; - options.ranges = options.ranges || []; - - var grpcOpts = { - service: 'Bigtable', - method: 'readRows', - }; + let maxRetries = is.number(this.maxRetries) ? this.maxRetries : 3; - var reqOpts = { - tableName: this.id, - objectMode: true, - }; + let rowKeys; + let ranges = options.ranges || []; + let filter; + let rowsLimit; + let rowsRead = 0; + let numRequestsMade = 0; if (options.start || options.end) { - options.ranges.push({ + ranges.push({ start: options.start, end: options.end, }); } - if (options.prefix) { - options.ranges.push(Table.createPrefixRange_(options.prefix)); + if (options.keys) { + rowKeys = options.keys; } - if (options.keys || options.ranges.length) { - reqOpts.rows = {}; - - if (options.keys) { - reqOpts.rows.rowKeys = options.keys.map(Mutation.convertToBytes); - } - - if (options.ranges.length) { - reqOpts.rows.rowRanges = options.ranges.map(function(range) { - return Filter.createRange(range.start, range.end, 'Key'); - }); - } + if (options.prefix) { + ranges.push(Table.createPrefixRange_(options.prefix)); } if (options.filter) { - reqOpts.filter = Filter.parse(options.filter); + filter = Filter.parse(options.filter); } if (options.limit) { - reqOpts.rowsLimit = options.limit; + rowsLimit = options.limit; } - const chunkTransformer = new ChunkTransformer({decode: options.decode}); - const stream = pumpify.obj([ - this.requestStream(grpcOpts, reqOpts), - chunkTransformer, - through.obj(function(rowData, enc, next) { - if (stream._ended) { - return next(); + + const userStream = through.obj(); + let chunkTransformer; + + const makeNewRequest = () => { + let lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : ''; + chunkTransformer = new ChunkTransformer({decode: options.decode}); + var grpcOpts = { + service: 'Bigtable', + method: 'readRows', + retryOpts: { + currentRetryAttempt: numRequestsMade, + }, + }; + + var reqOpts = { + tableName: this.id, + objectMode: true, + }; + if (lastRowKey) { + const lessThan = (lhs, rhs) => { + const lhsBytes = Mutation.convertToBytes(lhs); + const rhsBytes = Mutation.convertToBytes(rhs); + return lhsBytes.compare(rhsBytes) === -1; + }; + const greaterThan = (lhs, rhs) => lessThan(rhs, lhs); + const greaterThanOrEqualTo = (lhs, rhs) => !lessThan(rhs, lhs); + + if (ranges.length === 0) { + ranges.push({ + start: { + value: lastRowKey, + inclusive: false, + }, + }); + } else { + // Readjust and/or remove ranges based on previous valid row reads. + + // Iterate backward since items may need to be removed. + for (let index = ranges.length - 1; index >= 0; index--) { + const range = ranges[index]; + const startValue = is.object(range.start) + ? range.start.value + : range.start; + const endValue = is.object(range.end) ? range.end.value : range.end; + const isWithinStart = + !startValue || greaterThanOrEqualTo(startValue, lastRowKey); + const isWithinEnd = !endValue || lessThan(lastRowKey, endValue); + if (isWithinStart) { + if (isWithinEnd) { + // The lastRowKey is within this range, adjust the start value. + range.start = { + value: lastRowKey, + inclusive: false, + }; + } else { + // The lastRowKey is past this range, remove this range. + ranges.splice(index, 1); + } + } + } } - const row = self.row(rowData.key); - row.data = rowData.data; - next(null, row); - }), - ]); - return stream; + + // Remove rowKeys already read. + if (rowKeys) { + rowKeys = rowKeys.filter(rowKey => greaterThan(rowKey, lastRowKey)); + if (rowKeys.length === 0) { + rowKeys = null; + } + } + } + if (rowKeys || ranges.length) { + reqOpts.rows = {}; + if (rowKeys) { + reqOpts.rows.rowKeys = rowKeys.map(Mutation.convertToBytes); + } + if (ranges.length) { + reqOpts.rows.rowRanges = ranges.map(function(range) { + return Filter.createRange(range.start, range.end, 'Key'); + }); + } + } + if (filter) { + reqOpts.filter = filter; + } + + if (rowsLimit) { + reqOpts.rowsLimit = rowsLimit - rowsRead; + } + + const requestStream = this.requestStream(grpcOpts, reqOpts); + requestStream.on('request', () => numRequestsMade++); + + const rowStream = pumpify.obj([ + requestStream, + chunkTransformer, + through.obj(function(rowData, enc, next) { + if (chunkTransformer._destroyed || userStream._writableState.ended) { + return next(); + } + numRequestsMade = 0; + rowsRead++; + const row = self.row(rowData.key); + row.data = rowData.data; + next(null, row); + }), + ]); + + rowStream.on('error', error => { + rowStream.unpipe(userStream); + if ( + numRequestsMade <= maxRetries && + HTTP_RETRYABLE_STATUS_CODES.has(error.code) + ) { + makeNewRequest(); + } else { + userStream.emit('error', error); + } + }); + rowStream.pipe(userStream); + }; + + makeNewRequest(); + return userStream; }; /** @@ -987,7 +1087,7 @@ Table.prototype.mutate = function(entries, callback) { return; } - if (!RETRY_STATUS_CODES.has(entry.status.code)) { + if (!GRPC_RETRYABLE_STATUS_CODES.has(entry.status.code)) { pendingEntryIndices.delete(originalEntriesIndex); } diff --git a/system-test/data/read-rows-retry-test.json b/system-test/data/read-rows-retry-test.json new file mode 100644 index 000000000..9b1c4b747 --- /dev/null +++ b/system-test/data/read-rows-retry-test.json @@ -0,0 +1,313 @@ +{ + "tests": [ + + + { + "name": "simple read", + "max_retries": 3, + "request_options": [ + {} + ], + "responses": [ + { "row_keys": [ "a", "b", "c" ] } + ], + "row_keys_read": [ + [ "a", "b", "c" ] + ] + }, + + + + { + "name": "retries a failed read", + "max_retries": 3, + "request_options": [ + {}, + { "rowRanges": [ { "startKeyOpen": "b" } ] } + ], + "responses": [ + { "row_keys": [ "a", "b" ], "end_with_error": 503 }, + { "row_keys": [ "c" ] } + ], + "row_keys_read": [ + [ "a", "b" ], + [ "c" ] + ] + }, + + + + { + "name": "fails after all available retries", + "max_retries": 3, + "request_options": [ + {}, {}, {}, {} + ], + "responses": [ + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 } + ], + "row_keys_read": [ + [], [], [], [] + ], + "error": 503 + }, + + + + + { + "name": "resets the retry counter after a successful read", + "max_retries": 3, + "request_options": [ + {}, + { "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowRanges": [ { "startKeyOpen": "b" } ] }, + { "rowRanges": [ { "startKeyOpen": "b" } ] }, + { "rowRanges": [ { "startKeyOpen": "b" } ] } + ], + "responses": [ + { "row_keys": [ "a" ], "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "row_keys": [ "b" ], "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "row_keys": [ "c" ] } + ], + "row_keys_read": [ + [ "a" ], [], [], [], [ "b" ], [], [], [ "c" ] + ] + }, + + + + { + "name": "moves the start point of a range being consumed", + "max_retries": 3, + "createReadStream_options": { + "ranges": [{ + "start": "a", + "end": "z" + }] + }, + "request_options": [ + { "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] }, + { "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] } + ], + "responses": [ + { "row_keys": [ "a", "b" ], "end_with_error": 503 }, + { "row_keys": [ "c" ] } + ], + "row_keys_read": [ + [ "a", "b" ], + [ "c" ] + ] + }, + + + + { + "name": "removes ranges already consumed", + "max_retries": 3, + "createReadStream_options": { + "ranges": [{ + "start": "a", + "end": "c" + }, { + "start": "x", + "end": "z" + }] + }, + "request_options": [ + { "rowRanges": [ + { "startKeyClosed": "a", "endKeyClosed": "c" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] }, + { "rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] } + ], + "responses": [ + { "row_keys": [ "a", "b", "c" ], "end_with_error": 503 }, + { "row_keys": [ "x" ] } + ], + "row_keys_read": [ + [ "a", "b", "c" ], + [ "x" ] + ] + }, + + + + { + "name": "removes keys already read", + "max_retries": 3, + "createReadStream_options": { + "keys": ["a", "b", "x"] + }, + "request_options": [ + { "rowKeys": [ "a", "b", "x" ] }, + { "rowKeys": [ "x" ], "rowRanges": [ { "startKeyOpen": "c" } ] } + ], + "responses": [ + { "row_keys": [ "a", "b", "c" ], "end_with_error": 503 }, + { "row_keys": [ "x" ] } + ], + "row_keys_read": [ + [ "a", "b", "c" ], + [ "x" ] + ] + }, + + + { + "name": "adjust the limit based on the number of rows read", + "max_retries": 3, + "createReadStream_options": { + "limit": 10 + }, + "request_options": [ + { "rowsLimit": 10 }, + { "rowsLimit": 8, "rowRanges": [ { "startKeyOpen": "b" } ] } + ], + "responses": [ + { "row_keys": [ "a", "b" ], "end_with_error": 503 }, + { "row_keys": [ "x" ] } + ], + "row_keys_read": [ + [ "a", "b" ], + [ "x" ] + ] + }, + + + + + + { + "name": "does the previous 5 things in one giant test case", + "max_retries": 3, + "createReadStream_options": { + "limit": 10, + "ranges": [{ + "start": "a", + "end": "c" + }, { + "start": "p", + "end": "s" + }, { + "start": "x", + "end": "z" + }], + "keys": [ "a", "b", "c", "p", "q", "r", "s", "x", "y", "z" ] + }, + "request_options": [ + { + "rowKeys": [ "a", "b", "c", "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 10, + "rowRanges": [ + { "startKeyClosed": "a", "endKeyClosed": "c" }, + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "b", "c", "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 9, + "rowRanges": [ + { "startKeyOpen": "a", "endKeyClosed": "c" }, + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "c", "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 8, + "rowRanges": [ + { "startKeyOpen": "b", "endKeyClosed": "c" }, + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "c", "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 8, + "rowRanges": [ + { "startKeyOpen": "b", "endKeyClosed": "c" }, + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "c", "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 8, + "rowRanges": [ + { "startKeyOpen": "b", "endKeyClosed": "c" }, + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "c", "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 8, + "rowRanges": [ + { "startKeyOpen": "b", "endKeyClosed": "c" }, + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "p", "q", "r", "s", "x", "y", "z" ], + "rowsLimit": 7, + "rowRanges": [ + { "startKeyClosed": "p", "endKeyClosed": "s" }, + { "startKeyClosed": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "y", "z" ], + "rowsLimit": 2, + "rowRanges": [ + { "startKeyOpen": "x", "endKeyClosed": "z" } + ] + }, + { + "rowKeys": [ "z" ], + "rowsLimit": 1, + "rowRanges": [ + { "startKeyOpen": "y", "endKeyClosed": "z" } + ] + } + ], + "responses": [ + { "row_keys": [ "a" ], "end_with_error": 503 }, + { "row_keys": [ "b" ], "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "end_with_error": 503 }, + { "row_keys": [ "c" ], "end_with_error": 503 }, + { "row_keys": [ "p", "q", "r", "s", "x" ], "end_with_error": 503 }, + { "row_keys": [ "y" ], "end_with_error": 503 }, + { "row_keys": [ "z" ] } + ], + "row_keys_read": [ + [ "a" ], + [ "b" ], + [], + [], + [], + [ "c" ], + [ "p", "q", "r", "s", "x" ], + [ "y" ], + [ "z" ] + ] + } + + + ] +} diff --git a/system-test/read-rows.js b/system-test/read-rows.js new file mode 100644 index 000000000..0ec1717d3 --- /dev/null +++ b/system-test/read-rows.js @@ -0,0 +1,151 @@ +'use strict'; + +const Bigtable = require('../'); +const mutation = require('../src/mutation.js'); + +const tests = require('./data/read-rows-retry-test.json').tests; + +const assert = require('assert'); +const grpc = require('@google-cloud/common-grpc').grpc; +const sinon = require('sinon'); +const through = require('through2'); + +function dispatch(emitter, response) { + let emits = [{name: 'request'}]; + if (response.row_keys) { + emits.push.apply(emits, [ + {name: 'response', arg: 200}, + { + name: 'data', + arg: {chunks: response.row_keys.map(rowResponse)}, + }, + ]); + } + if (response.end_with_error) { + const error = new Error(); + error.code = response.end_with_error; + emits.push({name: 'error', arg: error}); + } else { + emits.push({name: 'end'}); + } + let index = 0; + setImmediate(next); + + function next() { + if (index < emits.length) { + const emit = emits[index]; + index++; + emitter.emit(emit.name, emit.arg); + setImmediate(next); + } + } +} + +function rowResponse(rowKey) { + return { + rowKey: mutation.convertToBytes(rowKey), + familyName: {value: 'family'}, + qualifier: {value: 'qualifier'}, + valueSize: 0, + timestampMicros: 0, + labels: [], + commitRow: true, + value: 'value', + }; +} + +describe('Bigtable/Table', () => { + const bigtable = new Bigtable(); + bigtable.grpcCredentials = grpc.credentials.createInsecure(); + + const INSTANCE = bigtable.instance('instance'); + const TABLE = INSTANCE.table('table'); + + describe('createReadStream', () => { + let clock; + let endCalled; + let error; + let requestedOptions; + let responses; + let rowKeysRead; + let stub; + + beforeEach(() => { + clock = sinon.useFakeTimers({ + toFake: [ + 'setTimeout', + 'clearTimeout', + 'setImmediate', + 'clearImmediate', + 'setInterval', + 'clearInterval', + 'Date', + 'nextTick', + ], + }); + endCalled = false; + error = null; + responses = null; + rowKeysRead = []; + requestedOptions = []; + stub = sinon + .stub(TABLE, 'requestStream') + .callsFake((grpcOpts, reqOpts) => { + let requestOptions = {}; + if (reqOpts.rows && reqOpts.rows.rowRanges) { + requestOptions.rowRanges = reqOpts.rows.rowRanges.map(range => { + const convertedRowRange = {}; + Object.keys(range).forEach( + key => (convertedRowRange[key] = range[key].asciiSlice()) + ); + return convertedRowRange; + }); + } + if (reqOpts.rows && reqOpts.rows.rowKeys) { + requestOptions.rowKeys = reqOpts.rows.rowKeys.map(rowKeys => + rowKeys.asciiSlice() + ); + } + if (reqOpts.rowsLimit) { + requestOptions.rowsLimit = reqOpts.rowsLimit; + } + requestedOptions.push(requestOptions); + rowKeysRead.push([]); + const emitter = through.obj(); + dispatch(emitter, responses.shift()); + return emitter; + }); + }); + + afterEach(() => { + clock.uninstall(); + stub.restore(); + }); + + tests.forEach(test => { + it(test.name, () => { + responses = test.responses; + TABLE.maxRetries = test.max_retries; + TABLE.createReadStream(test.createReadStream_options) + .on('data', row => rowKeysRead[rowKeysRead.length - 1].push(row.id)) + .on('end', () => (endCalled = true)) + .on('error', err => (error = err)); + clock.runAll(); + if (test.error) { + assert(!endCalled, `.on('end') should not have been invoked`); + assert.strictEqual(error.code, test.error); + } else { + assert(endCalled, `.on('end') shoud have been invoked`); + assert.ifError(error); + } + assert.deepStrictEqual(rowKeysRead, test.row_keys_read); + assert.strictEqual( + responses.length, + 0, + 'not all the responses were used' + ); + assert.deepStrictEqual(requestedOptions, test.request_options); + }); + }); + }); +}); diff --git a/test/chunktransformer.js b/test/chunktransformer.js index d53078687..b020522ac 100644 --- a/test/chunktransformer.js +++ b/test/chunktransformer.js @@ -828,6 +828,14 @@ describe('Bigtable/ChunkTransformer', function() { chunkTransformer._transform({chunks: chunks}, {}, callback); assert(!callback.called, 'unexpected call to next'); }); + it('should change the `lastRowKey` value for `data.lastScannedRowKey`', function() { + chunkTransformer._transform( + {chunks: [], lastScannedRowKey: 'foo'}, + {}, + callback + ); + assert.deepEqual(chunkTransformer.lastRowKey, 'foo'); + }); }); describe('reset', function() { it('should reset initial state', function() { diff --git a/test/table.js b/test/table.js index 3ae7062eb..27e916a91 100644 --- a/test/table.js +++ b/test/table.js @@ -17,6 +17,7 @@ 'use strict'; var assert = require('assert'); +var Buffer = require('buffer').Buffer; var extend = require('extend'); var nodeutil = require('util'); var proxyquire = require('proxyquire'); @@ -68,8 +69,9 @@ FakeRow.formatChunks_ = sinon.spy(function(chunks) { }); var FakeChunkTransformer = createFake(ChunkTransformer); -FakeChunkTransformer.prototype._transform = function(rows) { +FakeChunkTransformer.prototype._transform = function(rows, enc, next) { rows.forEach(row => this.push(row)); + next(); }; var FakeMutation = { @@ -375,6 +377,9 @@ describe('Bigtable/Table', function() { assert.deepEqual(grpcOpts, { service: 'Bigtable', method: 'readRows', + retryOpts: { + currentRetryAttempt: 0, + }, }); assert.strictEqual(reqOpts.tableName, TABLE_NAME); @@ -751,6 +756,190 @@ describe('Bigtable/Table', function() { .on('data', done); }); }); + + describe('retries', function() { + var callCreateReadStream; + var emitters; // = [function(stream) { stream.push([{ key: 'a' }]); stream.end(); }, ...]; + var makeRetryableError; + var reqOptsCalls; + var setTimeoutSpy; + beforeEach(function() { + FakeChunkTransformer.prototype._transform = function(rows, enc, next) { + rows.forEach(row => this.push(row)); + this.lastRowKey = rows[rows.length - 1].key; + next(); + }; + FakeChunkTransformer.prototype._flush = function(cb) { + cb(); + }; + callCreateReadStream = (options, verify) => { + table + .createReadStream(options) + .on('end', verify) + .resume(); // The stream starts paused unless it has a `.data()` callback. + }; + emitters = null; // This needs to be assigned in each test case. + makeRetryableError = () => { + var error = new Error('retry me!'); + error.code = 409; + return error; + }; + FakeFilter.createRange = function(start, end) { + var range = {}; + if (start) { + range.start = start.value || start; + range.startInclusive = + typeof start === 'object' ? start.inclusive : true; + } + if (end) { + range.end = end.value || end; + } + return range; + }; + FakeMutation.convertToBytes = function(value) { + return Buffer.from(value); + }; + reqOptsCalls = []; + setTimeoutSpy = sinon.stub(global, 'setTimeout').callsFake(fn => fn()); + table.requestStream = function(_, reqOpts) { + reqOptsCalls.push(reqOpts); + var stream = new Stream({ + objectMode: true, + }); + + setImmediate(function() { + stream.emit('request'); + emitters.shift()(stream); + }); + + return stream; + }; + }); + afterEach(function() { + setTimeoutSpy.restore(); + }); + + it('should do a retry the stream is interrupted', function(done) { + emitters = [ + function(stream) { + stream.emit('error', makeRetryableError()); + stream.end(); + }, + function(stream) { + stream.end(); + }, + ]; + callCreateReadStream(null, () => { + assert.strictEqual(reqOptsCalls.length, 2); + done(); + }); + }); + + it('should have a range which starts after the last read key', function(done) { + emitters = [ + function(stream) { + stream.push([{key: 'a'}]); + stream.emit('error', makeRetryableError()); + }, + function(stream) { + stream.end(); + }, + ]; + callCreateReadStream(null, () => { + assert.strictEqual(reqOptsCalls[0].rows, undefined); + assert.deepStrictEqual(reqOptsCalls[1].rows, { + rowRanges: [{start: 'a', startInclusive: false}], + }); + done(); + }); + }); + + it('should move the active range start to after the last read key', function(done) { + emitters = [ + function(stream) { + stream.push([{key: 'a'}]); + stream.emit('error', makeRetryableError()); + }, + function(stream) { + stream.end(); + }, + ]; + callCreateReadStream({ranges: [{start: 'a'}]}, () => { + assert.deepStrictEqual(reqOptsCalls[0].rows, { + rowRanges: [{start: 'a', startInclusive: true}], + }); + assert.deepStrictEqual(reqOptsCalls[1].rows, { + rowRanges: [{start: 'a', startInclusive: false}], + }); + done(); + }); + }); + + it('should remove ranges which were already read', function(done) { + emitters = [ + function(stream) { + stream.push([{key: 'a'}]); + stream.push([{key: 'b'}]); + stream.emit('error', makeRetryableError()); + }, + function(stream) { + stream.push([{key: 'c'}]); + stream.end(); + }, + ]; + callCreateReadStream( + {ranges: [{start: 'a', end: 'b'}, {start: 'c'}]}, + () => { + var allRanges = [ + {start: 'a', end: 'b', startInclusive: true}, + {start: 'c', startInclusive: true}, + ]; + assert.deepStrictEqual(reqOptsCalls[0].rows, { + rowRanges: allRanges, + }); + assert.deepStrictEqual(reqOptsCalls[1].rows, { + rowRanges: allRanges.slice(1), + }); + done(); + } + ); + }); + + it('should remove the keys which were already read', function(done) { + emitters = [ + function(stream) { + stream.push([{key: 'a'}]); + stream.emit('error', makeRetryableError()); + }, + function(stream) { + stream.end([{key: 'c'}]); + }, + ]; + callCreateReadStream({keys: ['a', 'b']}, () => { + assert.strictEqual(reqOptsCalls[0].rows.rowKeys.length, 2); + assert.strictEqual(reqOptsCalls[1].rows.rowKeys.length, 1); + done(); + }); + }); + + it('should remove `keys` if they were all read', function(done) { + emitters = [ + function(stream) { + stream.push([{key: 'a'}]); + stream.emit('error', makeRetryableError()); + }, + function(stream) { + stream.push([{key: 'c'}]); + stream.end(); + }, + ]; + callCreateReadStream({keys: ['a']}, () => { + assert.strictEqual(reqOptsCalls[0].rows.rowKeys.length, 1); + assert.strictEqual(reqOptsCalls[1].rows.rowKeys, undefined); + done(); + }); + }); + }); }); describe('deleteRows', function() {