Skip to content

Commit

Permalink
feat: allow specifying how many idle GRPC channels to keep
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Dec 26, 2019
1 parent 3cd93c8 commit d8ae845
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 65 deletions.
20 changes: 20 additions & 0 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
*/
const MAX_REQUEST_RETRIES = 5;

/*!
* The default number of idle GRPC channel to keep.
*/
const DEFAULT_MAX_IDLE_CHANNELS = 1;

/*!
* The maximum number of concurrent requests supported by a single GRPC channel,
* as enforced by Google's Frontend. If the SDK issues more than 100 concurrent
Expand Down Expand Up @@ -317,6 +322,10 @@ export class Firestore {
* can specify a `keyFilename` instead.
* @param {string=} settings.host The host to connect to.
* @param {boolean=} settings.ssl Whether to use SSL when connecting.
* @param {number=} settings.maxIdleChannels The maximum number of idle GRPC
* channels to keep. A smaller number of idle channels reduces memory usage
* but increases request latency for clients with fluctuating request rates.
* Defaults to 1.
*/
constructor(settings?: Settings) {
const libraryHeader = {
Expand Down Expand Up @@ -372,8 +381,13 @@ export class Firestore {
logger('Firestore', null, 'Detected GCF environment');
}

const maxIdleChannels =
this._settings.maxIdleChannels === undefined
? DEFAULT_MAX_IDLE_CHANNELS
: this._settings.maxIdleChannels;
this._clientPool = new ClientPool(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
maxIdleChannels,
/* clientFactory= */ () => {
let client: GapicClient;

Expand Down Expand Up @@ -455,6 +469,12 @@ export class Firestore {
validateBoolean('settings.ssl', settings.ssl);
}

if (settings.maxIdleChannels !== undefined) {
validateInteger('settings.maxIdleChannels', settings.maxIdleChannels, {
minValue: 0,
});
}

this._settings = settings;
this._serializer = new Serializer(this);
}
Expand Down
82 changes: 42 additions & 40 deletions dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ export class ClientPool<T> {
/**
* @param concurrentOperationLimit The number of operations that each client
* can handle.
* @param maxIdleClients The maximum number of idle clients to keep before
* garbage collecting.
* @param clientFactory A factory function called as needed when new clients
* are required.
* @param clientDestructor A cleanup function that is called when a client is
* disposed of.
*/
constructor(
private readonly concurrentOperationLimit: number,
private readonly maxIdleClients: number,
private readonly clientFactory: () => T,
private readonly clientDestructor: (client: T) => Promise<void> = () =>
Promise.resolve()
Expand All @@ -66,8 +69,11 @@ export class ClientPool<T> {
let selectedClient: T | null = null;
let selectedRequestCount = 0;

this.activeClients.forEach((requestCount, client) => {
if (!selectedClient && requestCount < this.concurrentOperationLimit) {
for (const [client, requestCount] of this.activeClients) {
if (
requestCount > selectedRequestCount &&
requestCount < this.concurrentOperationLimit
) {
logger(
'ClientPool.acquire',
requestTag,
Expand All @@ -77,7 +83,7 @@ export class ClientPool<T> {
selectedClient = client;
selectedRequestCount = requestCount;
}
});
}

if (!selectedClient) {
logger('ClientPool.acquire', requestTag, 'Creating a new client');
Expand All @@ -99,23 +105,43 @@ export class ClientPool<T> {
* @private
*/
private async release(requestTag: string, client: T): Promise<void> {
let requestCount = this.activeClients.get(client) || 0;
const requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');
this.activeClients.set(client, requestCount! - 1);

requestCount = requestCount! - 1;
this.activeClients.set(client, requestCount);
if (this.shouldGarbageCollectClient(client)) {
this.activeClients.delete(client);
await this.clientDestructor(client);
logger('ClientPool.release', requestTag, 'Garbage collected 1 client');
}
}

if (requestCount === 0) {
const deletedCount = await this.garbageCollect();
if (deletedCount) {
logger(
'ClientPool.release',
requestTag,
'Garbage collected %s clients',
deletedCount
);
}
/**
* Given the current operation counts, determines if the given client should
* be garbage collected.
* @private
*/
private shouldGarbageCollectClient(client: T): boolean {
if (this.activeClients.get(client) !== 0) {
return false;
}

// Compute the remaining capacity of the ClientPool. If the capacity exceeds
// the total capacity that maxIdleClients could hold, garbage collect. We
// look at the capacity rather than just at the current request count to
// allows us to:
// - Use `maxIdleClients:1` to preserve legacy behavior (there is always at
// least one active client as a single client can never exceed the
// concurrent operation limit of itself).
// - Use `maxIdleClients:0` to shut down the client pool completely when all
// clients are idle.
let idleCapacityCount = 0;
for (const [_, count] of this.activeClients) {
idleCapacityCount += this.concurrentOperationLimit - count;
}
return (
idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
);
}

/**
Expand Down Expand Up @@ -177,28 +203,4 @@ export class ClientPool<T> {
await this.clientDestructor(client);
}
}

/**
* Deletes clients that are no longer executing operations. Keeps up to one
* idle client to reduce future initialization costs.
*
* @return Number of clients deleted.
* @private
*/
private async garbageCollect(): Promise<number> {
let idleClients = 0;
const cleanUpTasks: Array<Promise<void>> = [];
for (const [client, requestCount] of this.activeClients) {
if (requestCount === 0) {
++idleClients;

if (idleClients > 1) {
this.activeClients.delete(client);
cleanUpTasks.push(this.clientDestructor(client));
}
}
}
await Promise.all(cleanUpTasks);
return idleClients - 1;
}
}
7 changes: 7 additions & 0 deletions dev/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ export interface Settings {
/** Whether to use SSL when connecting. */
ssl?: boolean;

/**
* The maximum number of idle GRPC channels to keep. A smaller number of idle
* channels reduces memory usage but increases request latency for clients
* with fluctuating request rates. Defaults to 1.
*/
maxIdleChannels?: number;

// tslint:disable-next-line:no-any
[key: string]: any; // Accept other properties, such as GRPC settings.
}
Expand Down
13 changes: 13 additions & 0 deletions dev/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,19 @@ describe('instantiation', () => {
}
});

it('validates maxIdleChannels', () => {
const invalidValues = [-1, 'foo', 1.3];

for (const value of invalidValues) {
expect(() => {
const settings = {...DEFAULT_SETTINGS, maxIdleChannels: value};
new Firestore.Firestore(settings as InvalidApiUsage);
}).to.throw();
}

new Firestore.Firestore({maxIdleChannels: 1});
});

it('uses project id from constructor', () => {
const firestore = new Firestore.Firestore({projectId: 'foo'});

Expand Down
91 changes: 79 additions & 12 deletions dev/test/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function deferredPromises(count: number): Array<Deferred<void>> {

describe('Client pool', () => {
it('creates new instances as needed', () => {
const clientPool = new ClientPool<{}>(3, () => {
const clientPool = new ClientPool<{}>(3, 0, () => {
return {};
});

Expand All @@ -52,7 +52,7 @@ describe('Client pool', () => {
});

it('re-uses idle instances', () => {
const clientPool = new ClientPool<{}>(2, () => {
const clientPool = new ClientPool<{}>(2, 0, () => {
return {};
});

Expand Down Expand Up @@ -80,8 +80,50 @@ describe('Client pool', () => {
});
});

it('bin packs operations', async () => {
let clientCount = 0;
const clientPool = new ClientPool<number>(2, 0, () => {
return ++clientCount;
});

expect(clientPool.size).to.equal(0);

// Create 2 operations, which should schedule 2 operations on the first
// client, 2 on the second and 1 on the third.
const operationPromises = deferredPromises(7);
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(1);
return operationPromises[0].promise;
});
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(1);
return operationPromises[1].promise;
});
const thirdOperation = clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(2);
return operationPromises[2].promise;
});
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(2);
return operationPromises[3].promise;
});
clientPool.run(REQUEST_TAG, client => {
expect(client).to.be.equal(3);
return operationPromises[4].promise;
});

operationPromises[2].resolve();
await thirdOperation;

// A newly scheduled operation should use the first client that has a free
// slot (the second client).
clientPool.run(REQUEST_TAG, async client => {
expect(client).to.be.equal(2);
});
});

it('garbage collects after success', () => {
const clientPool = new ClientPool<{}>(2, () => {
const clientPool = new ClientPool<{}>(2, 0, () => {
return {};
});

Expand Down Expand Up @@ -110,12 +152,12 @@ describe('Client pool', () => {
operationPromises.forEach(deferred => deferred.resolve());

return Promise.all(completionPromises).then(() => {
expect(clientPool.size).to.equal(1);
expect(clientPool.size).to.equal(0);
});
});

it('garbage collects after error', () => {
const clientPool = new ClientPool<{}>(2, () => {
const clientPool = new ClientPool<{}>(2, 0, () => {
return {};
});

Expand Down Expand Up @@ -145,7 +187,7 @@ describe('Client pool', () => {

return Promise.all(completionPromises.map(p => p.catch(() => {}))).then(
() => {
expect(clientPool.size).to.equal(1);
expect(clientPool.size).to.equal(0);
}
);
});
Expand All @@ -155,9 +197,8 @@ describe('Client pool', () => {

const clientPool = new ClientPool<{}>(
1,
() => {
return {};
},
0,
() => ({}),
() => Promise.resolve(garbageCollect.resolve())
);

Expand All @@ -173,7 +214,7 @@ describe('Client pool', () => {
});

it('forwards success', () => {
const clientPool = new ClientPool<{}>(1, () => {
const clientPool = new ClientPool<{}>(1, 0, () => {
return {};
});

Expand All @@ -182,7 +223,7 @@ describe('Client pool', () => {
});

it('forwards failure', () => {
const clientPool = new ClientPool<{}>(1, () => {
const clientPool = new ClientPool<{}>(1, 0, () => {
return {};
});

Expand All @@ -192,8 +233,34 @@ describe('Client pool', () => {
return expect(op).to.eventually.be.rejectedWith('Generated error');
});

it('keeps pool of idle clients', async () => {
const clientPool = new ClientPool<{}>(
/* concurrentOperationLimit= */ 1,
/* maxIdleClients= */ 3,
() => {
return {};
}
);

const operationPromises = deferredPromises(4);
clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
clientPool.run(REQUEST_TAG, () => operationPromises[1].promise);
clientPool.run(REQUEST_TAG, () => operationPromises[2].promise);
const lastOp = clientPool.run(
REQUEST_TAG,
() => operationPromises[3].promise
);
expect(clientPool.size).to.equal(4);

// Resolve all pending operations. Note that one client is removed, while
// 3 are kept for further usage.
operationPromises.forEach(deferred => deferred.resolve());
await lastOp;
expect(clientPool.size).to.equal(3);
});

it('rejects subsequent operations after being terminated', () => {
const clientPool = new ClientPool<{}>(1, () => {
const clientPool = new ClientPool<{}>(1, 0, () => {
return {};
});

Expand Down
Loading

0 comments on commit d8ae845

Please sign in to comment.