Skip to content

Commit

Permalink
spanner: add object caching (#2362)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and stephenplusplus committed Jul 17, 2017
1 parent ec411f4 commit 6b7c948
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 46 deletions.
9 changes: 7 additions & 2 deletions packages/spanner/src/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ Database.formatName_ = function(instanceName, name) {
* });
*/
Database.prototype.close = function(callback) {
var self = this;

this.pool_.clear().then(function() {
self.parent.databases_.delete(self.id);
callback(null);
}, function(err) {
callback(err || new Error('Unable to close database connection.'));
Expand Down Expand Up @@ -327,9 +330,11 @@ Database.prototype.createTable = function(schema, callback) {
* });
*/
Database.prototype.delete = function(callback) {
return this.api.Database.dropDatabase({
var reqOpts = {
database: this.formattedName_
}, callback);
};

return this.api.Database.dropDatabase(reqOpts, callback);
};

/**
Expand Down
9 changes: 8 additions & 1 deletion packages/spanner/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var Instance = require('./instance.js');

var v1 = require('./v1');


/**
* [Cloud Spanner](https://cloud.google.com/spanner) is a highly scalable,
* transactional, managed, NewSQL database service. Cloud Spanner solves the
Expand Down Expand Up @@ -91,6 +92,8 @@ function Spanner(options) {
};

commonGrpc.Service.call(this, config, options);

this.instances_ = new Map();
}

util.inherits(Spanner, commonGrpc.Service);
Expand Down Expand Up @@ -464,7 +467,11 @@ Spanner.prototype.instance = function(name) {
throw new Error('A name is required to access an Instance object.');
}

return new Instance(this, name);
if (!this.instances_.has(name)) {
this.instances_.set(name, new Instance(this, name));
}

return this.instances_.get(name);
};

/**
Expand Down
22 changes: 19 additions & 3 deletions packages/spanner/src/instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ function Instance(spanner, name) {
spanner.createInstance(self.formattedName_, options, callback);
}
});

this.databases_ = new Map();
}

util.inherits(Instance, commonGrpc.ServiceObject);
Expand Down Expand Up @@ -283,7 +285,11 @@ Instance.prototype.database = function(name, poolOptions) {
throw new Error('A name is required to access a Database object.');
}

return new Database(this, name, poolOptions);
if (!this.databases_.has(name)) {
this.databases_.set(name, new Database(this, name, poolOptions));
}

return this.databases_.get(name);
};

/**
Expand Down Expand Up @@ -312,9 +318,19 @@ Instance.prototype.database = function(name, poolOptions) {
* });
*/
Instance.prototype.delete = function(callback) {
return this.api.Instance.deleteInstance({
var self = this;

var reqOpts = {
name: this.formattedName_
}, callback);
};

return this.api.Instance.deleteInstance(reqOpts, function(err, resp) {
if (!err) {
self.parent.instances_.delete(self.id);
}

callback(err, resp);
});
};

/**
Expand Down
8 changes: 5 additions & 3 deletions packages/spanner/src/session-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ SessionPool.prototype.getNextAvailableSession_ = function(options, callback) {
return;
}

this.pollForSession_(callback);
this.pollForSession_(options, callback);
};

/**
Expand All @@ -511,8 +511,9 @@ SessionPool.prototype.getNextAvailableSession_ = function(options, callback) {
* @param {function} callback - The callback function to be executed when a
* session is available.
*/
SessionPool.prototype.pollForSession_ = function(callback) {
SessionPool.prototype.pollForSession_ = function(options, callback) {
this.pendingAcquires.push({
options: options,
callback: callback,
timeout: this.acquireTimeout
});
Expand All @@ -531,7 +532,8 @@ SessionPool.prototype.pollForSession_ = function(callback) {
self.writePool && self.writePool.free;

if (hasFreeSession) {
self.getNextAvailableSession_(self.pendingAcquires.shift().callback);
var acquireReq = self.pendingAcquires.shift();
self.getNextAvailableSession_(acquireReq.options, acquireReq.callback);
}

if (!self.pendingAcquires.length) {
Expand Down
1 change: 0 additions & 1 deletion packages/spanner/src/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ Transaction.prototype.requestStream = function(config) {
return;
}

requestStream.end();
userStream.destroy();

if (self.shouldRetry_(err)) {
Expand Down
70 changes: 69 additions & 1 deletion packages/spanner/system-test/spanner.js
Original file line number Diff line number Diff line change
Expand Up @@ -3070,7 +3070,75 @@ var spanner = new Spanner(env);
});
});

it('should retry an aborted transaction', function(done) {
it('should retry an aborted txn when reading fails', function(done) {
var query = `SELECT * FROM ${table.name}`;
var attempts = 0;

var expectedRow = {
Key: 'k888',
NumberValue: null,
StringValue: 'abc'
};

database.runTransaction(function(err, transaction) {
assert.ifError(err);

transaction.run(query, function(err) {
assert.ifError(err);

var action = attempts++ === 0 ? runOtherTransaction : wrap;

action(function(err) {
assert.ifError(err);

transaction.run(query, function(err, rows) {
assert.ifError(err);

transaction.insert(table.name, {
Key: generateName('key'),
StringValue: generateName('val')
});

transaction.commit(function(err) {
assert.ifError(err);

var lastRow = rows.pop().toJSON();

assert.deepEqual(lastRow, expectedRow);
assert.strictEqual(attempts, 2);

done();
});
});
});
});
});

function runOtherTransaction(callback) {
database.runTransaction(function(err, transaction) {
if (err) {
callback(err);
return;
}

transaction.run(query, function(err) {
if (err) {
callback(err);
return;
}

transaction.insert(table.name, expectedRow);
transaction.commit(callback);
});
});
}

function wrap(callback) {
setImmediate(callback);
}
});

it('should retry an aborted txn when commit fails', function(done) {
var query = `SELECT * FROM ${table.name}`;
var attempts = 0;

Expand Down
21 changes: 19 additions & 2 deletions packages/spanner/test/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ describe('Database', function() {

var INSTANCE = {
api: {},
formattedName_: 'instance-name'
formattedName_: 'instance-name',
databases_: new Map()
};

var NAME = 'table-name';
Expand Down Expand Up @@ -199,15 +200,31 @@ describe('Database', function() {

describe('close', function() {
describe('success', function() {
it('should close the database', function(done) {
beforeEach(function() {
database.parent = INSTANCE;
database.pool_ = {
clear: function() {
return Promise.resolve();
}
};
});

it('should close the database', function(done) {
database.close(done);
});

it('should remove the database cache', function(done) {
var cache = INSTANCE.databases_;

cache.set(database.id, database);
assert(cache.has(database.id));

database.close(function(err) {
assert.ifError(err);
assert.strictEqual(cache.has(database.id), false);
done();
});
});
});

describe('error', function() {
Expand Down
23 changes: 22 additions & 1 deletion packages/spanner/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ describe('Spanner', function() {
});

describe('instantiation', function() {
it('should localize an instance map', function() {
assert(spanner.instances_ instanceof Map);
});

it('should promisify all the things', function() {
assert(promisified);
});
Expand Down Expand Up @@ -639,11 +643,28 @@ describe('Spanner', function() {
}, /A name is required to access an Instance object\./);
});

it('should return an Instance object', function() {
it('should create and cache an Instance', function() {
var cache = spanner.instances_;

assert.strictEqual(cache.has(NAME), false);

var instance = spanner.instance(NAME);

assert(instance instanceof FakeInstance);
assert.strictEqual(instance.calledWith_[0], spanner);
assert.strictEqual(instance.calledWith_[1], NAME);
assert.strictEqual(instance, cache.get(NAME));
});

it('should re-use cached objects', function() {
var cache = spanner.instances_;
var fakeInstance = {};

cache.set(NAME, fakeInstance);

var instance = spanner.instance(NAME);

assert.strictEqual(instance, fakeInstance);
});
});

Expand Down
Loading

0 comments on commit 6b7c948

Please sign in to comment.