Skip to content

Commit

Permalink
Improve transform perf (#46)
Browse files Browse the repository at this point in the history
* Avoid creating Buffers in convertFromBytes when possible.

This gave a 50% increase in performance for baseline chunktransformer transforms.

* Avoid convertFromBytes when validating chunks in chunktransformer.

* Only attempt expensive number decoding when necessary.

* Wrapped options as userOptions; pass isPossibleNumber as option
  • Loading branch information
kolodny authored and stephenplusplus committed Mar 15, 2018
1 parent 7abb15d commit e0237cc
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 40 deletions.
57 changes: 34 additions & 23 deletions src/chunktransformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ ChunkTransformer.prototype.validateResetRow = function(chunk) {
* Validates state for new row.
* @private
* @param {chunk} chunk chunk to validate
* @param {newRowKey} newRowKey newRowKey of the new row
*/
ChunkTransformer.prototype.validateNewRow = function(chunk) {
ChunkTransformer.prototype.validateNewRow = function(chunk, newRowKey) {
const row = this.row;
const prevRowKey = this.prevRowKey;
const newRowKey = Mutation.convertFromBytes(chunk.rowKey);
let errorMessage;

if (typeof row.key !== 'undefined') {
Expand Down Expand Up @@ -224,15 +224,30 @@ ChunkTransformer.prototype.validateNewRow = function(chunk) {
*/
ChunkTransformer.prototype.validateRowInProgress = function(chunk) {
const row = this.row;
const newRowKey = Mutation.convertFromBytes(chunk.rowKey);
let errorMessage;
if (chunk.rowKey && newRowKey !== '' && newRowKey !== row.key) {
errorMessage = 'A commit is required between row keys';
} else if (chunk.familyName && !chunk.qualifier) {
errorMessage = 'A qualifier must be specified';
if (chunk.rowKey && chunk.rowKey.length) {
const newRowKey = Mutation.convertFromBytes(chunk.rowKey);
if (
newRowKey &&
chunk.rowKey &&
newRowKey !== '' &&
newRowKey !== row.key
) {
this.destroy(
new TransformError({
message: 'A commit is required between row keys',
chunk,
})
);
return;
}
}
if (errorMessage) {
this.destroy(new TransformError({message: errorMessage, chunk}));
if (chunk.familyName && !chunk.qualifier) {
this.destroy(
new TransformError({
message: 'A qualifier must be specified',
chunk,
})
);
return;
}
this.validateResetRow(chunk);
Expand Down Expand Up @@ -270,13 +285,10 @@ ChunkTransformer.prototype.moveToNextState = function(chunk) {
* Process chunk when in NEW_ROW state.
* @private
* @param {chunks} chunk chunk to process
* @param {options} options options
* @param {callback} callback callback to call with row as and when generates
* @returns {boolean} return false to stop further processing.
*/
ChunkTransformer.prototype.processNewRow = function(chunk) {
const newRowKey = Mutation.convertFromBytes(chunk.rowKey);
this.validateNewRow(chunk);
this.validateNewRow(chunk, newRowKey);
if (chunk.familyName && chunk.qualifier) {
const row = this.row;
row.key = newRowKey;
Expand All @@ -285,7 +297,10 @@ ChunkTransformer.prototype.processNewRow = function(chunk) {
const qualifierName = Mutation.convertFromBytes(chunk.qualifier.value);
this.qualifiers = this.family[qualifierName] = [];
this.qualifier = {
value: Mutation.convertFromBytes(chunk.value, this.options),
value: Mutation.convertFromBytes(chunk.value, {
userOptions: this.options,
isPossibleNumber: true,
}),
labels: chunk.labels,
timestamp: chunk.timestampMicros,
};
Expand All @@ -297,9 +312,6 @@ ChunkTransformer.prototype.processNewRow = function(chunk) {
* Process chunk when in ROW_IN_PROGRESS state.
* @private
* @param {chunk} chunk chunk to process
* @param {*} options option
* @param {*} callback callback to call with row as and when generates
* @returns {boolean} return false to stop further processing.
*/
ChunkTransformer.prototype.processRowInProgress = function(chunk) {
this.validateRowInProgress(chunk);
Expand All @@ -317,7 +329,7 @@ ChunkTransformer.prototype.processRowInProgress = function(chunk) {
this.family[qualifierName] || [];
}
this.qualifier = {
value: Mutation.convertFromBytes(chunk.value, this.options),
value: Mutation.convertFromBytes(chunk.value, {userOptions: this.options}),
labels: chunk.labels,
timestamp: chunk.timestampMicros,
};
Expand All @@ -328,16 +340,15 @@ ChunkTransformer.prototype.processRowInProgress = function(chunk) {
* Process chunk when in CELl_IN_PROGRESS state.
* @private
* @param {chunk} chunk chunk to process
* @param {options} options options
* @param {callback} callback callback to call with row as and when generates
* @returns {boolean} return false to stop further processing.
*/
ChunkTransformer.prototype.processCellInProgress = function(chunk) {
this.validateCellInProgress(chunk);
if (chunk.resetRow) {
return this.reset();
}
this.qualifier.value += Mutation.convertFromBytes(chunk.value, this.options);
this.qualifier.value += Mutation.convertFromBytes(chunk.value, {
userOptions: this.options,
});
this.moveToNextState(chunk);
};

Expand Down
12 changes: 7 additions & 5 deletions src/mutation.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ var methods = (Mutation.methods = {
* @returns {string|number|buffer}
*/
Mutation.convertFromBytes = function(bytes, options) {
var buf = Buffer.from(bytes, 'base64');
var num = new Int64(buf).toNumber();
var buf = bytes instanceof Buffer ? bytes : Buffer.from(bytes, 'base64');
if (options && options.isPossibleNumber) {
var num = new Int64(buf).toNumber();

if (!isNaN(num) && isFinite(num)) {
return num;
if (!isNaN(num) && isFinite(num)) {
return num;
}
}

if (options && options.decode === false) {
if (options && options.userOptions && options.userOptions.decode === false) {
return buf;
}

Expand Down
10 changes: 6 additions & 4 deletions src/row.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Row.formatChunks_ = function(chunks, options) {
row.data = row.data || {};

if (chunk.rowKey) {
row.key = Mutation.convertFromBytes(chunk.rowKey, options);
row.key = Mutation.convertFromBytes(chunk.rowKey, {userOptions: options});
}

if (chunk.familyName) {
Expand All @@ -112,7 +112,9 @@ Row.formatChunks_ = function(chunks, options) {
}

if (chunk.qualifier) {
qualifierName = Mutation.convertFromBytes(chunk.qualifier.value, options);
qualifierName = Mutation.convertFromBytes(chunk.qualifier.value, {
userOptions: options,
});
}

if (family && qualifierName) {
Expand All @@ -121,7 +123,7 @@ Row.formatChunks_ = function(chunks, options) {

if (qualifier && chunk.value) {
qualifier.push({
value: Mutation.convertFromBytes(chunk.value, options),
value: Mutation.convertFromBytes(chunk.value, {userOptions: options}),
labels: chunk.labels,
timestamp: chunk.timestampMicros,
size: chunk.valueSize,
Expand Down Expand Up @@ -194,7 +196,7 @@ Row.formatFamilies_ = function(families, options) {
var value = cell.value;

if (options.decode !== false) {
value = Mutation.convertFromBytes(value);
value = Mutation.convertFromBytes(value, {isPossibleNumber: true});
}

return {
Expand Down
33 changes: 27 additions & 6 deletions test/mutation.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,23 @@ describe('Bigtable/Mutation', function() {
});

describe('convertFromBytes', function() {
it('should convert a base64 encoded number', function() {
var num = 10;
var encoded = new Int64(num).toBuffer().toString('base64');
var decoded = Mutation.convertFromBytes(encoded);
describe('isPossibleNumber', function() {
it('should convert a base64 encoded number when true', function() {
var num = 10;
var encoded = new Int64(num).toBuffer().toString('base64');
var decoded = Mutation.convertFromBytes(encoded, {
isPossibleNumber: true,
});

assert.strictEqual(num, decoded);
assert.strictEqual(num, decoded);
});
it('should not convert a base64 encoded number when false', function() {
var num = 10;
var encoded = new Int64(num).toBuffer().toString('base64');
var decoded = Mutation.convertFromBytes(encoded);

assert.notEqual(num, decoded);
});
});

it('should convert a base64 encoded string', function() {
Expand All @@ -63,13 +74,23 @@ describe('Bigtable/Mutation', function() {
it('should return a buffer if decode is set to false', function() {
var message = 'Hello!';
var encoded = Buffer.from(message).toString('base64');
const userOptions = {decode: false};
var decoded = Mutation.convertFromBytes(encoded, {
decode: false,
userOptions: userOptions,
});

assert(decoded instanceof Buffer);
assert.strictEqual(decoded.toString(), message);
});

it('should not create a new Buffer needlessly', function() {
var message = 'Hello!';
var encoded = Buffer.from(message);
const stub = sinon.stub(Buffer, 'from');
const decoded = Mutation.convertFromBytes(encoded);
assert.strictEqual(stub.called, false);
assert.strictEqual(decoded.toString(), message);
});
});

describe('convertToBytes', function() {
Expand Down
4 changes: 2 additions & 2 deletions test/row.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ describe('Bigtable/Row', function() {
};

FakeMutation.convertFromBytes = sinon.spy(function(val, options) {
assert.strictEqual(options, formatOptions);
assert.deepStrictEqual(options, {userOptions: formatOptions});
return val.replace('unconverted', 'converted');
});

Expand Down Expand Up @@ -381,7 +381,7 @@ describe('Bigtable/Row', function() {
// 1 === qualifier
// 2 === value
var args = FakeMutation.convertFromBytes.getCall(2).args;
assert.strictEqual(args[1], formatOptions);
assert.deepStrictEqual(args[1], {userOptions: formatOptions});
});

it('should discard old data when reset row is found', function() {
Expand Down

0 comments on commit e0237cc

Please sign in to comment.