Skip to content

Commit

Permalink
fix: fix createReadStream retry from sending a full table scan (#1026)
Browse files Browse the repository at this point in the history
* fix: fix createReadStream retry from sending a full table scan

* update comments

* add a comment and remove unnecessary if condition

* More clean up

* lint
  • Loading branch information
mutianf authored Mar 16, 2022
1 parent ac6d1c2 commit f926992
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 100 deletions.
150 changes: 76 additions & 74 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const options = opts || {};
const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3;
let activeRequestStream: AbortableDuplex;
let rowKeys: string[] | null;
let rowKeys: string[];
const ranges = options.ranges || [];
let filter: {} | null;
let rowsLimit: number;
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;
let numRequestsMade = 0;

rowKeys = options.keys || [];

if (options.start || options.end) {
if (options.ranges || options.prefix || options.prefixes) {
throw new Error(
Expand All @@ -748,10 +751,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
});
}

if (options.keys) {
rowKeys = options.keys;
}

if (options.prefix) {
if (options.ranges || options.start || options.end || options.prefixes) {
throw new Error(
Expand All @@ -772,19 +771,22 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
});
}

if (options.filter) {
filter = Filter.parse(options.filter);
// If rowKeys and ranges are both empty, the request is a full table scan.
// Add an empty range to simplify the resumption logic.
if (rowKeys.length === 0 && ranges.length === 0) {
ranges.push({});
}

if (options.limit) {
rowsLimit = options.limit;
if (options.filter) {
filter = Filter.parse(options.filter);
}

const userStream = new PassThrough({objectMode: true});
const end = userStream.end.bind(userStream);
userStream.end = () => {
rowStream?.unpipe(userStream);
if (activeRequestStream) {
// TODO: properly end the stream instead of abort
activeRequestStream.abort();
}
return end();
Expand All @@ -808,90 +810,90 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
};

if (lastRowKey) {
// TODO: lhs and rhs type shouldn't be string, it could be
// string, number, Uint8Array, boolean. Fix the type
// and clean up the casting.
const lessThan = (lhs: string, rhs: string) => {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
};
const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs);
const greaterThanOrEqualTo = (lhs: string, rhs: string) =>
!lessThan(rhs, lhs);

if (ranges.length === 0) {
ranges.push({
start: {
value: lastRowKey,
inclusive: false,
},
});
} else {
// Readjust and/or remove ranges based on previous valid row reads.

// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? (range.start as BoundData).value
: range.start;
const endValue = is.object(range.end)
? (range.end as BoundData).value
: range.end;
const isWithinStart =
!startValue ||
greaterThanOrEqualTo(startValue as string, lastRowKey as string);
const isWithinEnd =
!endValue || lessThan(lastRowKey as string, endValue as string);
if (isWithinStart) {
if (isWithinEnd) {
// The lastRowKey is within this range, adjust the start
// value.
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// The lastRowKey is past this range, remove this range.
ranges.splice(index, 1);
}
const lessThanOrEqualTo = (lhs: string, rhs: string) =>
!greaterThan(lhs, rhs);

// Readjust and/or remove ranges based on previous valid row reads.
// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? (range.start as BoundData).value
: range.start;
const endValue = is.object(range.end)
? (range.end as BoundData).value
: range.end;
const startKeyIsRead =
!startValue ||
lessThanOrEqualTo(startValue as string, lastRowKey as string);
const endKeyIsNotRead =
!endValue ||
(endValue as Buffer).length === 0 ||
lessThan(lastRowKey as string, endValue as string);
if (startKeyIsRead) {
if (endKeyIsNotRead) {
// EndKey is not read, reset the range to start from lastRowKey open
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// EndKey is read, remove this range
ranges.splice(index, 1);
}
}
}

// Remove rowKeys already read.
if (rowKeys) {
rowKeys = rowKeys.filter(rowKey =>
greaterThan(rowKey, lastRowKey as string)
);
if (rowKeys.length === 0) {
rowKeys = null;
}
}
}
if (rowKeys || ranges.length) {
reqOpts.rows = {};
rowKeys = rowKeys.filter(rowKey =>
greaterThan(rowKey, lastRowKey as string)
);

if (rowKeys) {
reqOpts.rows.rowKeys = rowKeys.map(
Mutation.convertToBytes
) as {} as Uint8Array[];
// If there was a row limit in the original request and
// we've already read all the rows, end the stream and
// do not retry.
if (hasLimit && rowsLimit === rowsRead) {
userStream.end();
return;
}

if (ranges.length) {
reqOpts.rows.rowRanges = ranges.map(range =>
Filter.createRange(
range.start as BoundData,
range.end as BoundData,
'Key'
)
);
// If all the row keys and ranges are read, end the stream
// and do not retry.
if (rowKeys.length === 0 && ranges.length === 0) {
userStream.end();
return;
}
}

// Create the new reqOpts
reqOpts.rows = {};

// TODO: preprocess all the keys and ranges to Bytes
reqOpts.rows.rowKeys = rowKeys.map(
Mutation.convertToBytes
) as {} as Uint8Array[];

reqOpts.rows.rowRanges = ranges.map(range =>
Filter.createRange(
range.start as BoundData,
range.end as BoundData,
'Key'
)
);

if (filter) {
reqOpts.filter = filter;
}

if (rowsLimit) {
if (hasLimit) {
reqOpts.rowsLimit = rowsLimit - rowsRead;
}

Expand Down
60 changes: 40 additions & 20 deletions system-test/data/read-rows-retry-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"name": "simple read",
"max_retries": 3,
"request_options": [
{}
{
"rowKeys": [],
"rowRanges": [{}]
}
],
"responses": [
{ "row_keys": [ "a", "b", "c" ] }
Expand All @@ -22,8 +25,10 @@
"name": "retries a failed read",
"max_retries": 3,
"request_options": [
{},
{ "rowRanges": [ { "startKeyOpen": "b" } ] }
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] }
],
"responses": [
{ "row_keys": [ "a", "b" ], "end_with_error": 4 },
Expand All @@ -41,7 +46,18 @@
"name": "fails after all available retries",
"max_retries": 3,
"request_options": [
{}, {}, {}, {}
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [],
"rowRanges": [{}]
}
],
"responses": [
{ "end_with_error": 4 },
Expand All @@ -62,14 +78,16 @@
"name": "resets the retry counter after a successful read",
"max_retries": 3,
"request_options": [
{},
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowRanges": [ { "startKeyOpen": "b" } ] }
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] }
],
"responses": [
{ "row_keys": [ "a" ], "end_with_error": 4 },
Expand Down Expand Up @@ -98,8 +116,8 @@
}]
},
"request_options": [
{ "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] },
{ "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] }
{ "rowKeys": [], "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] }
],
"responses": [
{ "row_keys": [ "a", "b" ], "end_with_error": 4 },
Expand All @@ -126,11 +144,13 @@
}]
},
"request_options": [
{ "rowRanges": [
{ "rowKeys": [],
"rowRanges": [
{ "startKeyClosed": "a", "endKeyClosed": "c" },
{ "startKeyClosed": "x", "endKeyClosed": "z" }
] },
{ "rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] }
{ "rowKeys": [],
"rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] }
],
"responses": [
{ "row_keys": [ "a", "b", "c" ], "end_with_error": 4 },
Expand All @@ -151,8 +171,8 @@
"keys": ["a", "b", "x"]
},
"request_options": [
{ "rowKeys": [ "a", "b", "x" ] },
{ "rowKeys": [ "x" ], "rowRanges": [ { "startKeyOpen": "c" } ] }
{ "rowKeys": [ "a", "b", "x" ], "rowRanges": [] },
{ "rowKeys": [ "x" ], "rowRanges": [] }
],
"responses": [
{ "row_keys": [ "a", "b", "c" ], "end_with_error": 4 },
Expand All @@ -172,8 +192,8 @@
"limit": 10
},
"request_options": [
{ "rowsLimit": 10 },
{ "rowsLimit": 8, "rowRanges": [ { "startKeyOpen": "b" } ] }
{ "rowKeys": [], "rowRanges": [{}], "rowsLimit": 10 },
{ "rowsLimit": 8, "rowKeys":[], "rowRanges": [ { "startKeyOpen": "b" } ] }
],
"responses": [
{ "row_keys": [ "a", "b" ], "end_with_error": 4 },
Expand Down
Loading

0 comments on commit f926992

Please sign in to comment.