diff --git a/src/table.ts b/src/table.ts index e558c3cbf..5da77593f 100644 --- a/src/table.ts +++ b/src/table.ts @@ -729,13 +729,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const options = opts || {}; const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3; let activeRequestStream: AbortableDuplex; - let rowKeys: string[] | null; + let rowKeys: string[]; const ranges = options.ranges || []; let filter: {} | null; - let rowsLimit: number; + const rowsLimit = options.limit || 0; + const hasLimit = rowsLimit !== 0; let rowsRead = 0; let numRequestsMade = 0; + rowKeys = options.keys || []; + if (options.start || options.end) { if (options.ranges || options.prefix || options.prefixes) { throw new Error( @@ -748,10 +751,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }); } - if (options.keys) { - rowKeys = options.keys; - } - if (options.prefix) { if (options.ranges || options.start || options.end || options.prefixes) { throw new Error( @@ -772,12 +771,14 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }); } - if (options.filter) { - filter = Filter.parse(options.filter); + // If rowKeys and ranges are both empty, the request is a full table scan. + // Add an empty range to simplify the resumption logic. + if (rowKeys.length === 0 && ranges.length === 0) { + ranges.push({}); } - if (options.limit) { - rowsLimit = options.limit; + if (options.filter) { + filter = Filter.parse(options.filter); } const userStream = new PassThrough({objectMode: true}); @@ -785,6 +786,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); userStream.end = () => { rowStream?.unpipe(userStream); if (activeRequestStream) { + // TODO: properly end the stream instead of abort activeRequestStream.abort(); } return end(); @@ -808,90 +810,90 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }; if (lastRowKey) { + // TODO: lhs and rhs type shouldn't be string, it could be + // string, number, Uint8Array, boolean. Fix the type + // and clean up the casting. const lessThan = (lhs: string, rhs: string) => { const lhsBytes = Mutation.convertToBytes(lhs); const rhsBytes = Mutation.convertToBytes(rhs); return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1; }; const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs); - const greaterThanOrEqualTo = (lhs: string, rhs: string) => - !lessThan(rhs, lhs); - - if (ranges.length === 0) { - ranges.push({ - start: { - value: lastRowKey, - inclusive: false, - }, - }); - } else { - // Readjust and/or remove ranges based on previous valid row reads. - - // Iterate backward since items may need to be removed. - for (let index = ranges.length - 1; index >= 0; index--) { - const range = ranges[index]; - const startValue = is.object(range.start) - ? (range.start as BoundData).value - : range.start; - const endValue = is.object(range.end) - ? (range.end as BoundData).value - : range.end; - const isWithinStart = - !startValue || - greaterThanOrEqualTo(startValue as string, lastRowKey as string); - const isWithinEnd = - !endValue || lessThan(lastRowKey as string, endValue as string); - if (isWithinStart) { - if (isWithinEnd) { - // The lastRowKey is within this range, adjust the start - // value. - range.start = { - value: lastRowKey, - inclusive: false, - }; - } else { - // The lastRowKey is past this range, remove this range. - ranges.splice(index, 1); - } + const lessThanOrEqualTo = (lhs: string, rhs: string) => + !greaterThan(lhs, rhs); + + // Readjust and/or remove ranges based on previous valid row reads. + // Iterate backward since items may need to be removed. + for (let index = ranges.length - 1; index >= 0; index--) { + const range = ranges[index]; + const startValue = is.object(range.start) + ? (range.start as BoundData).value + : range.start; + const endValue = is.object(range.end) + ? (range.end as BoundData).value + : range.end; + const startKeyIsRead = + !startValue || + lessThanOrEqualTo(startValue as string, lastRowKey as string); + const endKeyIsNotRead = + !endValue || + (endValue as Buffer).length === 0 || + lessThan(lastRowKey as string, endValue as string); + if (startKeyIsRead) { + if (endKeyIsNotRead) { + // EndKey is not read, reset the range to start from lastRowKey open + range.start = { + value: lastRowKey, + inclusive: false, + }; + } else { + // EndKey is read, remove this range + ranges.splice(index, 1); } } } // Remove rowKeys already read. - if (rowKeys) { - rowKeys = rowKeys.filter(rowKey => - greaterThan(rowKey, lastRowKey as string) - ); - if (rowKeys.length === 0) { - rowKeys = null; - } - } - } - if (rowKeys || ranges.length) { - reqOpts.rows = {}; + rowKeys = rowKeys.filter(rowKey => + greaterThan(rowKey, lastRowKey as string) + ); - if (rowKeys) { - reqOpts.rows.rowKeys = rowKeys.map( - Mutation.convertToBytes - ) as {} as Uint8Array[]; + // If there was a row limit in the original request and + // we've already read all the rows, end the stream and + // do not retry. + if (hasLimit && rowsLimit === rowsRead) { + userStream.end(); + return; } - - if (ranges.length) { - reqOpts.rows.rowRanges = ranges.map(range => - Filter.createRange( - range.start as BoundData, - range.end as BoundData, - 'Key' - ) - ); + // If all the row keys and ranges are read, end the stream + // and do not retry. + if (rowKeys.length === 0 && ranges.length === 0) { + userStream.end(); + return; } } + // Create the new reqOpts + reqOpts.rows = {}; + + // TODO: preprocess all the keys and ranges to Bytes + reqOpts.rows.rowKeys = rowKeys.map( + Mutation.convertToBytes + ) as {} as Uint8Array[]; + + reqOpts.rows.rowRanges = ranges.map(range => + Filter.createRange( + range.start as BoundData, + range.end as BoundData, + 'Key' + ) + ); + if (filter) { reqOpts.filter = filter; } - if (rowsLimit) { + if (hasLimit) { reqOpts.rowsLimit = rowsLimit - rowsRead; } diff --git a/system-test/data/read-rows-retry-test.json b/system-test/data/read-rows-retry-test.json index 5136b9973..037c0e1f6 100644 --- a/system-test/data/read-rows-retry-test.json +++ b/system-test/data/read-rows-retry-test.json @@ -6,7 +6,10 @@ "name": "simple read", "max_retries": 3, "request_options": [ - {} + { + "rowKeys": [], + "rowRanges": [{}] + } ], "responses": [ { "row_keys": [ "a", "b", "c" ] } @@ -22,8 +25,10 @@ "name": "retries a failed read", "max_retries": 3, "request_options": [ - {}, - { "rowRanges": [ { "startKeyOpen": "b" } ] } + { "rowKeys": [], + "rowRanges": [{}] + }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] } ], "responses": [ { "row_keys": [ "a", "b" ], "end_with_error": 4 }, @@ -41,7 +46,18 @@ "name": "fails after all available retries", "max_retries": 3, "request_options": [ - {}, {}, {}, {} + { "rowKeys": [], + "rowRanges": [{}] + }, + { "rowKeys": [], + "rowRanges": [{}] + }, + { "rowKeys": [], + "rowRanges": [{}] + }, + { "rowKeys": [], + "rowRanges": [{}] + } ], "responses": [ { "end_with_error": 4 }, @@ -62,14 +78,16 @@ "name": "resets the retry counter after a successful read", "max_retries": 3, "request_options": [ - {}, - { "rowRanges": [ { "startKeyOpen": "a" } ] }, - { "rowRanges": [ { "startKeyOpen": "a" } ] }, - { "rowRanges": [ { "startKeyOpen": "a" } ] }, - { "rowRanges": [ { "startKeyOpen": "a" } ] }, - { "rowRanges": [ { "startKeyOpen": "b" } ] }, - { "rowRanges": [ { "startKeyOpen": "b" } ] }, - { "rowRanges": [ { "startKeyOpen": "b" } ] } + { "rowKeys": [], + "rowRanges": [{}] + }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] } ], "responses": [ { "row_keys": [ "a" ], "end_with_error": 4 }, @@ -98,8 +116,8 @@ }] }, "request_options": [ - { "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] }, - { "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] } + { "rowKeys": [], "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] }, + { "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] } ], "responses": [ { "row_keys": [ "a", "b" ], "end_with_error": 4 }, @@ -126,11 +144,13 @@ }] }, "request_options": [ - { "rowRanges": [ + { "rowKeys": [], + "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "c" }, { "startKeyClosed": "x", "endKeyClosed": "z" } ] }, - { "rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] } + { "rowKeys": [], + "rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] } ], "responses": [ { "row_keys": [ "a", "b", "c" ], "end_with_error": 4 }, @@ -151,8 +171,8 @@ "keys": ["a", "b", "x"] }, "request_options": [ - { "rowKeys": [ "a", "b", "x" ] }, - { "rowKeys": [ "x" ], "rowRanges": [ { "startKeyOpen": "c" } ] } + { "rowKeys": [ "a", "b", "x" ], "rowRanges": [] }, + { "rowKeys": [ "x" ], "rowRanges": [] } ], "responses": [ { "row_keys": [ "a", "b", "c" ], "end_with_error": 4 }, @@ -172,8 +192,8 @@ "limit": 10 }, "request_options": [ - { "rowsLimit": 10 }, - { "rowsLimit": 8, "rowRanges": [ { "startKeyOpen": "b" } ] } + { "rowKeys": [], "rowRanges": [{}], "rowsLimit": 10 }, + { "rowsLimit": 8, "rowKeys":[], "rowRanges": [ { "startKeyOpen": "b" } ] } ], "responses": [ { "row_keys": [ "a", "b" ], "end_with_error": 4 }, diff --git a/test/table.ts b/test/table.ts index 9ba6acd78..50b8064d4 100644 --- a/test/table.ts +++ b/test/table.ts @@ -1268,9 +1268,12 @@ describe('Bigtable/Table', () => { }) as {} as EventEmitter, ]; + const fullScan = {rowKeys: [], rowRanges: [{}]}; + callCreateReadStream(null, () => { - assert.strictEqual(reqOptsCalls[0].rows, undefined); + assert.deepStrictEqual(reqOptsCalls[0].rows, fullScan); assert.deepStrictEqual(reqOptsCalls[1].rows, { + rowKeys: [], rowRanges: [{start: 'a', startInclusive: false}], }); done(); @@ -1290,9 +1293,11 @@ describe('Bigtable/Table', () => { callCreateReadStream({ranges: [{start: 'a'}]}, () => { assert.deepStrictEqual(reqOptsCalls[0].rows, { + rowKeys: [], rowRanges: [{start: 'a', startInclusive: true}], }); assert.deepStrictEqual(reqOptsCalls[1].rows, { + rowKeys: [], rowRanges: [{start: 'a', startInclusive: false}], }); done(); @@ -1322,9 +1327,11 @@ describe('Bigtable/Table', () => { {start: 'c', startInclusive: true}, ]; assert.deepStrictEqual(reqOptsCalls[0].rows, { + rowKeys: [], rowRanges: allRanges, }); assert.deepStrictEqual(reqOptsCalls[1].rows, { + rowKeys: [], rowRanges: allRanges.slice(1), }); done(); @@ -1349,21 +1356,78 @@ describe('Bigtable/Table', () => { }); }); - it('should remove `keys` if they were all read', done => { + it('should not retry if limit is reached', done => { emitters = [ ((stream: Duplex) => { stream.push([{key: 'a'}]); + stream.push([{key: 'b'}]); stream.emit('error', makeRetryableError()); }) as {} as EventEmitter, + ]; + + const options = { + ranges: [{start: 'a', end: 'c'}], + limit: 2, + }; + + callCreateReadStream(options, () => { + assert.strictEqual(reqOptsCalls.length, 1); + done(); + }); + }); + + it('should not retry if all the keys are read', done => { + emitters = [ ((stream: Duplex) => { - stream.push([{key: 'c'}]); - stream.end(); + stream.push([{key: 'a'}]); + stream.emit('error', makeRetryableError()); }) as {} as EventEmitter, ]; callCreateReadStream({keys: ['a']}, () => { - assert.strictEqual(reqOptsCalls[0].rows.rowKeys.length, 1); - assert.strictEqual(reqOptsCalls[1].rows.rowKeys, undefined); + assert.strictEqual(reqOptsCalls.length, 1); + done(); + }); + }); + + it('shouldn not retry if all the ranges are read', done => { + emitters = [ + ((stream: Duplex) => { + stream.push([{key: 'c'}]); + stream.emit('error', makeRetryableError()); + }) as {} as EventEmitter, + ]; + + const options = { + ranges: [{start: 'a', end: 'c', endInclusive: true}], + }; + + callCreateReadStream(options, () => { + assert.strictEqual(reqOptsCalls.length, 1); + assert.deepStrictEqual(reqOptsCalls[0].rows, { + rowKeys: [], + rowRanges: [{start: 'a', end: 'c', startInclusive: true}], + }); + done(); + }); + }); + + it('shouldn not retry with keys and ranges that are read', done => { + emitters = [ + ((stream: Duplex) => { + stream.push([{key: 'a1'}]); + stream.push([{key: 'd'}]); + stream.emit('error', makeRetryableError()); + }) as {} as EventEmitter, + ]; + + const options = { + ranges: [{start: 'a', end: 'b'}], + keys: ['c', 'd'], + }; + + callCreateReadStream(options, () => { + assert.strictEqual(reqOptsCalls.length, 1); done(); }); });