Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Out of order read rows fix #1231

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/chunktransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import {Transform, TransformOptions} from 'stream';
import {Bytes, Mutation} from './mutation';
import {TableUtils} from './utils/table';

export type Value = string | number | boolean | Uint8Array;

Expand Down Expand Up @@ -259,6 +260,11 @@ export class ChunkTransformer extends Transform {
errorMessage = 'A new row cannot be reset';
} else if (lastRowKey === newRowKey) {
errorMessage = 'A commit happened but the same key followed';
} else if (
typeof lastRowKey !== 'undefined' &&
TableUtils.lessThanOrEqualTo(newRowKey as string, lastRowKey as string)
) {
errorMessage = 'A row key must be strictly increasing';
} else if (!chunk.familyName) {
errorMessage = 'A family must be set';
} else if (chunk.qualifier === null || chunk.qualifier === undefined) {
Expand Down
21 changes: 6 additions & 15 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -786,18 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
};

if (lastRowKey) {
// TODO: lhs and rhs type shouldn't be string, it could be
// string, number, Uint8Array, boolean. Fix the type
// and clean up the casting.
const lessThan = (lhs: string, rhs: string) => {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
};
const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs);
const lessThanOrEqualTo = (lhs: string, rhs: string) =>
!greaterThan(lhs, rhs);

// Readjust and/or remove ranges based on previous valid row reads.
// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
Expand All @@ -810,11 +798,14 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
: range.end;
const startKeyIsRead =
!startValue ||
lessThanOrEqualTo(startValue as string, lastRowKey as string);
TableUtils.lessThanOrEqualTo(
startValue as string,
lastRowKey as string
);
const endKeyIsNotRead =
!endValue ||
(endValue as Buffer).length === 0 ||
lessThan(lastRowKey as string, endValue as string);
TableUtils.lessThan(lastRowKey as string, endValue as string);
if (startKeyIsRead) {
if (endKeyIsNotRead) {
// EndKey is not read, reset the range to start from lastRowKey open
Expand All @@ -831,7 +822,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

// Remove rowKeys already read.
rowKeys = rowKeys.filter(rowKey =>
greaterThan(rowKey, lastRowKey as string)
TableUtils.greaterThan(rowKey, lastRowKey as string)
);

// If there was a row limit in the original request and
Expand Down
18 changes: 18 additions & 0 deletions src/utils/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

import {GetRowsOptions, PrefixRange} from '../table';
import {Mutation} from '../mutation';

export class TableUtils {
static getRanges(options: GetRowsOptions) {
Expand Down Expand Up @@ -49,6 +50,23 @@ export class TableUtils {
return ranges;
}

// TODO: lhs and rhs type shouldn't be string, it could be
// string, number, Uint8Array, boolean. Fix the type
// and clean up the casting.
static lessThan(lhs: string, rhs: string) {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
}

static greaterThan(lhs: string, rhs: string) {
return this.lessThan(rhs, lhs);
}

static lessThanOrEqualTo(lhs: string, rhs: string) {
return !this.greaterThan(lhs, rhs);
}

static createPrefixRange(start: string): PrefixRange {
const prefix = start.replace(new RegExp('[\xff]+$'), '');
let endKey = '';
Expand Down
15 changes: 9 additions & 6 deletions testproxy/services/read-rows.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ const readRows = ({clientMap}) =>
const bigtable = clientMap.get(clientId);
const table = getTableInfo(bigtable, tableName);
const rowsOptions = getRowsOptions(readRowsRequest);
const [rows] = await table.getRows(rowsOptions);

return {
status: {code: grpc.status.OK, details: []},
row: rows.map(getRowResponse),
};
try {
const [rows] = await table.getRows(rowsOptions);
return {
status: {code: grpc.status.OK, details: []},
row: rows.map(getRowResponse),
};
} catch (e) {
return {status: e};
}
});

module.exports = readRows;