Skip to content

Commit

Permalink
Smart read retries (googleapis#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolodny authored and stephenplusplus committed Jan 19, 2018
1 parent 2cf623c commit e4c7cc6
Show file tree
Hide file tree
Showing 6 changed files with 809 additions and 43 deletions.
5 changes: 5 additions & 0 deletions src/chunktransformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function ChunkTransformer(options) {
this.options.objectMode = true; // forcing object mode
Transform.call(this, options);
this._destroyed = false;
this.lastRowKey = undefined;
this.reset();
}
util.inherits(ChunkTransformer, Transform);
Expand Down Expand Up @@ -86,6 +87,9 @@ ChunkTransformer.prototype._transform = function(data, enc, next) {
return;
}
}
if (data.lastScannedRowKey) {
this.lastRowKey = data.lastScannedRowKey;
}
next();
};

Expand Down Expand Up @@ -249,6 +253,7 @@ ChunkTransformer.prototype.moveToNextState = function(chunk) {
if (chunk.commitRow) {
this.push(row);
this.commit();
this.lastRowKey = row.key;
} else {
if (chunk.valueSize > 0) {
this.state = RowStateEnum.CELL_IN_PROGRESS;
Expand Down
184 changes: 142 additions & 42 deletions src/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ const ChunkTransformer = require('./chunktransformer.js');

// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
const RETRY_STATUS_CODES = new Set([4, 10, 14]);
const GRPC_RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);

// (409=ABORTED, 503=UNAVAILABLE, 14=DEADLINE_EXCEEDED)
const HTTP_RETRYABLE_STATUS_CODES = new Set([409, 503, 504]);

/**
* Create a Table object to interact with a Cloud Bigtable table.
Expand Down Expand Up @@ -445,64 +448,161 @@ Table.prototype.createReadStream = function(options) {
var self = this;

options = options || {};
options.ranges = options.ranges || [];

var grpcOpts = {
service: 'Bigtable',
method: 'readRows',
};
let maxRetries = is.number(this.maxRetries) ? this.maxRetries : 3;

var reqOpts = {
tableName: this.id,
objectMode: true,
};
let rowKeys;
let ranges = options.ranges || [];
let filter;
let rowsLimit;
let rowsRead = 0;
let numRequestsMade = 0;

if (options.start || options.end) {
options.ranges.push({
ranges.push({
start: options.start,
end: options.end,
});
}

if (options.prefix) {
options.ranges.push(Table.createPrefixRange_(options.prefix));
if (options.keys) {
rowKeys = options.keys;
}

if (options.keys || options.ranges.length) {
reqOpts.rows = {};

if (options.keys) {
reqOpts.rows.rowKeys = options.keys.map(Mutation.convertToBytes);
}

if (options.ranges.length) {
reqOpts.rows.rowRanges = options.ranges.map(function(range) {
return Filter.createRange(range.start, range.end, 'Key');
});
}
if (options.prefix) {
ranges.push(Table.createPrefixRange_(options.prefix));
}

if (options.filter) {
reqOpts.filter = Filter.parse(options.filter);
filter = Filter.parse(options.filter);
}

if (options.limit) {
reqOpts.rowsLimit = options.limit;
rowsLimit = options.limit;
}
const chunkTransformer = new ChunkTransformer({decode: options.decode});
const stream = pumpify.obj([
this.requestStream(grpcOpts, reqOpts),
chunkTransformer,
through.obj(function(rowData, enc, next) {
if (stream._ended) {
return next();

const userStream = through.obj();
let chunkTransformer;

const makeNewRequest = () => {
let lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
chunkTransformer = new ChunkTransformer({decode: options.decode});
var grpcOpts = {
service: 'Bigtable',
method: 'readRows',
retryOpts: {
currentRetryAttempt: numRequestsMade,
},
};

var reqOpts = {
tableName: this.id,
objectMode: true,
};
if (lastRowKey) {
const lessThan = (lhs, rhs) => {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return lhsBytes.compare(rhsBytes) === -1;
};
const greaterThan = (lhs, rhs) => lessThan(rhs, lhs);
const greaterThanOrEqualTo = (lhs, rhs) => !lessThan(rhs, lhs);

if (ranges.length === 0) {
ranges.push({
start: {
value: lastRowKey,
inclusive: false,
},
});
} else {
// Readjust and/or remove ranges based on previous valid row reads.

// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? range.start.value
: range.start;
const endValue = is.object(range.end) ? range.end.value : range.end;
const isWithinStart =
!startValue || greaterThanOrEqualTo(startValue, lastRowKey);
const isWithinEnd = !endValue || lessThan(lastRowKey, endValue);
if (isWithinStart) {
if (isWithinEnd) {
// The lastRowKey is within this range, adjust the start value.
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// The lastRowKey is past this range, remove this range.
ranges.splice(index, 1);
}
}
}
}
const row = self.row(rowData.key);
row.data = rowData.data;
next(null, row);
}),
]);
return stream;

// Remove rowKeys already read.
if (rowKeys) {
rowKeys = rowKeys.filter(rowKey => greaterThan(rowKey, lastRowKey));
if (rowKeys.length === 0) {
rowKeys = null;
}
}
}
if (rowKeys || ranges.length) {
reqOpts.rows = {};
if (rowKeys) {
reqOpts.rows.rowKeys = rowKeys.map(Mutation.convertToBytes);
}
if (ranges.length) {
reqOpts.rows.rowRanges = ranges.map(function(range) {
return Filter.createRange(range.start, range.end, 'Key');
});
}
}
if (filter) {
reqOpts.filter = filter;
}

if (rowsLimit) {
reqOpts.rowsLimit = rowsLimit - rowsRead;
}

const requestStream = this.requestStream(grpcOpts, reqOpts);
requestStream.on('request', () => numRequestsMade++);

const rowStream = pumpify.obj([
requestStream,
chunkTransformer,
through.obj(function(rowData, enc, next) {
if (chunkTransformer._destroyed || userStream._writableState.ended) {
return next();
}
numRequestsMade = 0;
rowsRead++;
const row = self.row(rowData.key);
row.data = rowData.data;
next(null, row);
}),
]);

rowStream.on('error', error => {
rowStream.unpipe(userStream);
if (
numRequestsMade <= maxRetries &&
HTTP_RETRYABLE_STATUS_CODES.has(error.code)
) {
makeNewRequest();
} else {
userStream.emit('error', error);
}
});
rowStream.pipe(userStream);
};

makeNewRequest();
return userStream;
};

/**
Expand Down Expand Up @@ -987,7 +1087,7 @@ Table.prototype.mutate = function(entries, callback) {
return;
}

if (!RETRY_STATUS_CODES.has(entry.status.code)) {
if (!GRPC_RETRYABLE_STATUS_CODES.has(entry.status.code)) {
pendingEntryIndices.delete(originalEntriesIndex);
}

Expand Down
Loading

0 comments on commit e4c7cc6

Please sign in to comment.