From d8ae845f3cc06b256e01b959ada047b828c3a93d Mon Sep 17 00:00:00 2001
From: Sebastian Schmidt
Date: Thu, 26 Dec 2019 19:47:20 +0800
Subject: [PATCH] feat: allow specifying how many idle GRPC channels to keep

---
 dev/src/index.ts         | 20 +++++++++
 dev/src/pool.ts          | 82 ++++++++++++++++++------------------
 dev/src/types.ts         |  7 ++++
 dev/test/index.ts        | 13 ++++++
 dev/test/pool.ts         | 91 ++++++++++++++++++++++++++++++++++------
 dev/test/util/helpers.ts | 32 ++++++++------
 types/firestore.d.ts     |  7 ++++
 7 files changed, 187 insertions(+), 65 deletions(-)

diff --git a/dev/src/index.ts b/dev/src/index.ts
index f5c064137..eb23c94bd 100644
--- a/dev/src/index.ts
+++ b/dev/src/index.ts
@@ -130,6 +130,11 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
  */
 const MAX_REQUEST_RETRIES = 5;
 
+/*!
+ * The default number of idle GRPC channels to keep.
+ */
+const DEFAULT_MAX_IDLE_CHANNELS = 1;
+
 /*!
  * The maximum number of concurrent requests supported by a single GRPC channel,
  * as enforced by Google's Frontend. If the SDK issues more than 100 concurrent
@@ -317,6 +322,10 @@ export class Firestore {
    * can specify a `keyFilename` instead.
    * @param {string=} settings.host The host to connect to.
    * @param {boolean=} settings.ssl Whether to use SSL when connecting.
+   * @param {number=} settings.maxIdleChannels The maximum number of idle GRPC
+   * channels to keep. A smaller number of idle channels reduces memory usage
+   * but increases request latency for clients with fluctuating request rates.
+   * Defaults to 1.
    */
   constructor(settings?: Settings) {
     const libraryHeader = {
@@ -372,8 +381,13 @@ export class Firestore {
       logger('Firestore', null, 'Detected GCF environment');
     }
 
+    const maxIdleChannels =
+      this._settings.maxIdleChannels === undefined
+        ? DEFAULT_MAX_IDLE_CHANNELS
+        : this._settings.maxIdleChannels;
     this._clientPool = new ClientPool<GapicClient>(
       MAX_CONCURRENT_REQUESTS_PER_CLIENT,
+      maxIdleChannels,
       /* clientFactory= */ () => {
         let client: GapicClient;
 
@@ -455,6 +469,12 @@ export class Firestore {
       validateBoolean('settings.ssl', settings.ssl);
     }
 
+    if (settings.maxIdleChannels !== undefined) {
+      validateInteger('settings.maxIdleChannels', settings.maxIdleChannels, {
+        minValue: 0,
+      });
+    }
+
     this._settings = settings;
     this._serializer = new Serializer(this);
   }
diff --git a/dev/src/pool.ts b/dev/src/pool.ts
index 4010817fa..031abdea7 100644
--- a/dev/src/pool.ts
+++ b/dev/src/pool.ts
@@ -44,6 +44,8 @@ export class ClientPool<T> {
   /**
    * @param concurrentOperationLimit The number of operations that each client
    * can handle.
+   * @param maxIdleClients The maximum number of idle clients to keep before
+   * garbage collecting.
    * @param clientFactory A factory function called as needed when new clients
    * are required.
   * @param clientDestructor A cleanup function that is called when a client is
@@ -51,6 +53,7 @@
    */
   constructor(
     private readonly concurrentOperationLimit: number,
+    private readonly maxIdleClients: number,
     private readonly clientFactory: () => T,
     private readonly clientDestructor: (client: T) => Promise<void> = () =>
       Promise.resolve()
@@ -66,8 +69,11 @@
     let selectedClient: T | null = null;
     let selectedRequestCount = 0;
 
-    this.activeClients.forEach((requestCount, client) => {
-      if (!selectedClient && requestCount < this.concurrentOperationLimit) {
+    for (const [client, requestCount] of this.activeClients) {
+      if (
+        requestCount > selectedRequestCount &&
+        requestCount < this.concurrentOperationLimit
+      ) {
         logger(
           'ClientPool.acquire',
           requestTag,
@@ -77,7 +83,7 @@
         selectedClient = client;
         selectedRequestCount = requestCount;
       }
-    });
+    }
 
     if (!selectedClient) {
       logger('ClientPool.acquire', requestTag, 'Creating a new client');
@@ -99,23 +105,43 @@
    * @private
    */
   private async release(requestTag: string, client: T): Promise<void> {
-    let requestCount = this.activeClients.get(client) || 0;
+    const requestCount = this.activeClients.get(client) || 0;
     assert(requestCount > 0, 'No active request');
+    this.activeClients.set(client, requestCount! - 1);
 
-    requestCount = requestCount! - 1;
-    this.activeClients.set(client, requestCount);
+    if (this.shouldGarbageCollectClient(client)) {
+      this.activeClients.delete(client);
+      await this.clientDestructor(client);
+      logger('ClientPool.release', requestTag, 'Garbage collected 1 client');
+    }
+  }
 
-    if (requestCount === 0) {
-      const deletedCount = await this.garbageCollect();
-      if (deletedCount) {
-        logger(
-          'ClientPool.release',
-          requestTag,
-          'Garbage collected %s clients',
-          deletedCount
-        );
-      }
+  /**
+   * Given the current operation counts, determines if the given client should
+   * be garbage collected.
+   * @private
+   */
+  private shouldGarbageCollectClient(client: T): boolean {
+    if (this.activeClients.get(client) !== 0) {
+      return false;
     }
+
+    // Compute the remaining capacity of the ClientPool. If this capacity
+    // exceeds the total capacity that `maxIdleClients` idle clients would
+    // provide, the client is garbage collected. We look at the capacity
+    // rather than just at the current request count, which allows us to:
+    // - Use `maxIdleClients: 1` to preserve the legacy behavior (there is
+    //   always at least one active client, as a single client can never
+    //   exceed its own concurrent operation limit).
+    // - Use `maxIdleClients: 0` to shut down the client pool completely when
+    //   all clients are idle.
+    let idleCapacityCount = 0;
+    for (const [_, count] of this.activeClients) {
+      idleCapacityCount += this.concurrentOperationLimit - count;
+    }
+    return (
+      idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
+    );
   }
 
   /**
@@ -177,28 +203,4 @@
       await this.clientDestructor(client);
     }
   }
-
-  /**
-   * Deletes clients that are no longer executing operations. Keeps up to one
-   * idle client to reduce future initialization costs.
-   *
-   * @return Number of clients deleted.
-   * @private
-   */
-  private async garbageCollect(): Promise<number> {
-    let idleClients = 0;
-    const cleanUpTasks: Array<Promise<void>> = [];
-    for (const [client, requestCount] of this.activeClients) {
-      if (requestCount === 0) {
-        ++idleClients;
-
-        if (idleClients > 1) {
-          this.activeClients.delete(client);
-          cleanUpTasks.push(this.clientDestructor(client));
-        }
-      }
-    }
-    await Promise.all(cleanUpTasks);
-    return idleClients - 1;
-  }
 }
diff --git a/dev/src/types.ts b/dev/src/types.ts
index 7085284c4..363f9125f 100644
--- a/dev/src/types.ts
+++ b/dev/src/types.ts
@@ -72,6 +72,13 @@ export interface Settings {
   /** Whether to use SSL when connecting. */
   ssl?: boolean;
 
+  /**
+   * The maximum number of idle GRPC channels to keep. A smaller number of idle
+   * channels reduces memory usage but increases request latency for clients
+   * with fluctuating request rates. Defaults to 1.
+   */
+  maxIdleChannels?: number;
+
   // tslint:disable-next-line:no-any
   [key: string]: any; // Accept other properties, such as GRPC settings.
 }
diff --git a/dev/test/index.ts b/dev/test/index.ts
index 84201cd28..241385f45 100644
--- a/dev/test/index.ts
+++ b/dev/test/index.ts
@@ -491,6 +491,19 @@ describe('instantiation', () => {
     }
   });
 
+  it('validates maxIdleChannels', () => {
+    const invalidValues = [-1, 'foo', 1.3];
+
+    for (const value of invalidValues) {
+      expect(() => {
+        const settings = {...DEFAULT_SETTINGS, maxIdleChannels: value};
+        new Firestore.Firestore(settings as InvalidApiUsage);
+      }).to.throw();
+    }
+
+    new Firestore.Firestore({maxIdleChannels: 1});
+  });
+
   it('uses project id from constructor', () => {
     const firestore = new Firestore.Firestore({projectId: 'foo'});
 
diff --git a/dev/test/pool.ts b/dev/test/pool.ts
index cd50ff73b..de26786f9 100644
--- a/dev/test/pool.ts
+++ b/dev/test/pool.ts
@@ -32,7 +32,7 @@ function deferredPromises(count: number): Array<Deferred<void>> {
 
 describe('Client pool', () => {
   it('creates new instances as needed', () => {
-    const clientPool = new ClientPool<{}>(3, () => {
+    const clientPool = new ClientPool<{}>(3, 0, () => {
       return {};
     });
 
@@ -52,7 +52,7 @@
   });
 
   it('re-uses idle instances', () => {
-    const clientPool = new ClientPool<{}>(2, () => {
+    const clientPool = new ClientPool<{}>(2, 0, () => {
       return {};
     });
 
@@ -80,8 +80,50 @@
     });
   });
 
+  it('bin packs operations', async () => {
+    let clientCount = 0;
+    const clientPool = new ClientPool<number>(2, 0, () => {
+      return ++clientCount;
+    });
+
+    expect(clientPool.size).to.equal(0);
+
+    // Create 5 operations, which should schedule 2 operations on the first
+    // client, 2 on the second and 1 on the third.
+    const operationPromises = deferredPromises(7);
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(1);
+      return operationPromises[0].promise;
+    });
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(1);
+      return operationPromises[1].promise;
+    });
+    const thirdOperation = clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(2);
+      return operationPromises[2].promise;
+    });
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(2);
+      return operationPromises[3].promise;
+    });
+    clientPool.run(REQUEST_TAG, client => {
+      expect(client).to.be.equal(3);
+      return operationPromises[4].promise;
+    });
+
+    operationPromises[2].resolve();
+    await thirdOperation;
+
+    // A newly scheduled operation should use the first client that has a free
+    // slot (the second client).
+    clientPool.run(REQUEST_TAG, async client => {
+      expect(client).to.be.equal(2);
+    });
+  });
+
   it('garbage collects after success', () => {
-    const clientPool = new ClientPool<{}>(2, () => {
+    const clientPool = new ClientPool<{}>(2, 0, () => {
       return {};
     });
 
@@ -110,12 +152,12 @@
     operationPromises.forEach(deferred => deferred.resolve());
 
     return Promise.all(completionPromises).then(() => {
-      expect(clientPool.size).to.equal(1);
+      expect(clientPool.size).to.equal(0);
     });
   });
 
   it('garbage collects after error', () => {
-    const clientPool = new ClientPool<{}>(2, () => {
+    const clientPool = new ClientPool<{}>(2, 0, () => {
       return {};
     });
 
@@ -145,7 +187,7 @@
 
     return Promise.all(completionPromises.map(p => p.catch(() => {}))).then(
       () => {
-        expect(clientPool.size).to.equal(1);
+        expect(clientPool.size).to.equal(0);
       }
     );
   });
@@ -155,9 +197,8 @@
 
     const clientPool = new ClientPool<{}>(
       1,
-      () => {
-        return {};
-      },
+      0,
+      () => ({}),
       () => Promise.resolve(garbageCollect.resolve())
     );
 
@@ -173,7 +214,7 @@
   });
 
   it('forwards success', () => {
-    const clientPool = new ClientPool<{}>(1, () => {
+    const clientPool = new ClientPool<{}>(1, 0, () => {
       return {};
     });
 
@@ -182,7 +223,7 @@
   });
 
   it('forwards failure', () => {
-    const clientPool = new ClientPool<{}>(1, () => {
+    const clientPool = new ClientPool<{}>(1, 0, () => {
       return {};
     });
 
@@ -192,8 +233,34 @@
     return expect(op).to.eventually.be.rejectedWith('Generated error');
   });
 
+  it('keeps pool of idle clients', async () => {
+    const clientPool = new ClientPool<{}>(
+      /* concurrentOperationLimit= */ 1,
+      /* maxIdleClients= */ 3,
+      () => {
+        return {};
+      }
+    );
+
+    const operationPromises = deferredPromises(4);
+    clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
+    clientPool.run(REQUEST_TAG, () => operationPromises[1].promise);
+    clientPool.run(REQUEST_TAG, () => operationPromises[2].promise);
+    const lastOp = clientPool.run(
+      REQUEST_TAG,
+      () => operationPromises[3].promise
+    );
+    expect(clientPool.size).to.equal(4);
+
+    // Resolve all pending operations. Note that one client is removed, while
+    // 3 are kept for further usage.
+    operationPromises.forEach(deferred => deferred.resolve());
+    await lastOp;
+    expect(clientPool.size).to.equal(3);
+  });
+
   it('rejects subsequent operations after being terminated', () => {
-    const clientPool = new ClientPool<{}>(1, () => {
+    const clientPool = new ClientPool<{}>(1, 0, () => {
       return {};
     });
 
diff --git a/dev/test/util/helpers.ts b/dev/test/util/helpers.ts
index d47a8ff67..9877f6faa 100644
--- a/dev/test/util/helpers.ts
+++ b/dev/test/util/helpers.ts
@@ -105,20 +105,26 @@ export function createInstance(
   const firestore = new Firestore();
   firestore.settings(initializationOptions);
 
-  const clientPool = new ClientPool(/* concurrentRequestLimit= */ 1, () => {
-    const gapicClient: GapicClient = new v1(initializationOptions);
-    if (apiOverrides) {
-      Object.keys(apiOverrides).forEach(override => {
-        const apiOverride = (apiOverrides as {[k: string]: unknown})[override];
-        if (override !== 'getProjectId') {
-          gapicClient._innerApiCalls[override] = apiOverride;
-        } else {
-          gapicClient[override] = apiOverride;
-        }
-      });
+  const clientPool = new ClientPool(
+    /* concurrentRequestLimit= */ 1,
+    /* maxIdleClients= */ 0,
+    () => {
+      const gapicClient: GapicClient = new v1(initializationOptions);
+      if (apiOverrides) {
+        Object.keys(apiOverrides).forEach(override => {
+          const apiOverride = (apiOverrides as {[k: string]: unknown})[
+            override
+          ];
+          if (override !== 'getProjectId') {
+            gapicClient._innerApiCalls[override] = apiOverride;
+          } else {
+            gapicClient[override] = apiOverride;
+          }
+        });
+      }
+      return gapicClient;
     }
-    return gapicClient;
-  });
+  );
 
   firestore['_clientPool'] = clientPool;
 
diff --git a/types/firestore.d.ts b/types/firestore.d.ts
index 3a7122922..5d2513bf1 100644
--- a/types/firestore.d.ts
+++ b/types/firestore.d.ts
@@ -80,6 +80,13 @@ declare namespace FirebaseFirestore {
     /** Whether to use SSL when connecting. */
     ssl?: boolean;
 
+    /**
+     * The maximum number of idle GRPC channels to keep. A smaller number of
+     * idle channels reduces memory usage but increases request latency for
+     * clients with fluctuating request rates. Defaults to 1.
+     */
+    maxIdleChannels?: number;
+
     [key: string]: any; // Accept other properties, such as GRPC settings.
   }
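
Usage sketch (illustrative, not part of the patch): the snippet below shows how a caller might opt into the new setting once it ships. It assumes the public Firestore constructor exported by @google-cloud/firestore; the project id and the value 2 are assumptions, and maxIdleChannels: 1 remains the documented default.

  import {Firestore} from '@google-cloud/firestore';

  // Keep up to two idle GRPC channels around between traffic bursts.
  // A value of 0 tears channels down as soon as they become idle; the
  // default of 1 preserves the previous behavior.
  const firestore = new Firestore({
    projectId: 'my-project', // illustrative project id
    maxIdleChannels: 2,
  });

  async function main(): Promise<void> {
    const snapshot = await firestore.collection('users').limit(1).get();
    console.log(`Read ${snapshot.size} document(s)`);
  }

  main().catch(console.error);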