diff --git a/src/index.js b/src/index.js index 14227dc67..d10ad315a 100644 --- a/src/index.js +++ b/src/index.js @@ -16,6 +16,7 @@ 'use strict'; +const assert = require('assert'); const bun = require('bun'); const extend = require('extend'); const is = require('is'); @@ -64,6 +65,11 @@ const FieldValue = require('./field-value').FieldValue; */ const Timestamp = require('./timestamp'); +/*! + * @see ClientPool + */ +const ClientPool = require('./pool').ClientPool; + /*! * @see CollectionReference */ @@ -100,12 +106,12 @@ let Transaction; /*! * @see v1beta1 */ -let v1beta1; // Lazy-loaded in `_ensureClient()` +let v1beta1; // Lazy-loaded in `_runRequest()` /*! * @see @google-cloud/common */ -let common; // Lazy-loaded in `_ensureClient()` +let common; // Lazy-loaded in `_runRequest()` /*! * HTTP header for the resource prefix to improve routing and project isolation @@ -120,6 +126,16 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix'; */ const MAX_REQUEST_RETRIES = 5; +/*! + * The maximum number of concurrent requests supported by a single GRPC channel, + * as enforced by Google's Frontend. If the SDK issues more than 100 concurrent + * operations, we need to use more than one GAPIC client since these clients + * multiplex all requests over a single channel. + * + * @type {number} + */ +const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100; + /*! * GRPC Error code for 'UNAVAILABLE'. * @type {number} @@ -233,11 +249,12 @@ class Firestore { }); /** + * A client pool to distribute requests over multiple GAPIC clients in order + * to work around a connection limit of 100 concurrent requests per client. * @private - * @type {object|null} - * @property {FirestoreClient} Firestore The Firestore GAPIC client. + * @type {ClientPool|null} */ - this._firestoreClient = null; + this._clientPool = null; /** * The configuration options for the GAPIC client. @@ -725,47 +742,78 @@ follow these steps, YOUR APP MAY BREAK.`); } /** - * Initializes the client and detects the Firestore Project ID. Returns a - * Promise on completion. If the client is already initialized, the returned - * Promise resolves immediately. + * Executes a new request using the first available GAPIC client. * * @private */ - _ensureClient() { + _runRequest(op) { + // Initialize the client pool if this is the first request. if (!this._clientInitialized) { common = require('@google-cloud/common'); + this._clientInitialized = this._initClientPool().then(clientPool => { + this._clientPool = clientPool; + }) + } - this._clientInitialized = new Promise((resolve, reject) => { - this._firestoreClient = - module.exports.v1beta1(this._initalizationOptions); + return this._clientInitialized.then(() => this._clientPool.run(op)); + } - Firestore.log('Firestore', null, 'Initialized Firestore GAPIC Client'); + /** + * Initializes the client pool and invokes Project ID detection. Returns a + * Promise on completion. + * + * @private + * @return {Promise} + */ + _initClientPool() { + assert(!this._clientInitialized, 'Client pool already initialized'); - // We schedule Project ID detection using `setImmediate` to allow the - // testing framework to provide its own implementation of - // `getProjectId`. - setImmediate(() => { - this._firestoreClient.getProjectId((err, projectId) => { - if (err) { - Firestore.log( - 'Firestore._ensureClient', null, - 'Failed to detect project ID: %s', err); - reject(err); - } else { - Firestore.log( - 'Firestore._ensureClient', null, 'Detected project ID: %s', - projectId); - this._referencePath = - new ResourcePath(projectId, this._referencePath.databaseId); - resolve(); - } - }); + const clientPool = + new ClientPool(MAX_CONCURRENT_REQUESTS_PER_CLIENT, () => { + const gapicClient = + module.exports.v1beta1(this._initalizationOptions); + Firestore.log( + 'Firestore', null, 'Initialized Firestore GAPIC Client'); + return gapicClient; }); - }); + + const projectIdProvided = this._referencePath.projectId !== '{{projectId}}'; + + if (projectIdProvided) { + return Promise.resolve(clientPool); + } else { + return clientPool.run(client => this._detectProjectId(client)) + .then(projectId => { + this._referencePath = + new ResourcePath(projectId, this._referencePath.databaseId); + return clientPool; + }); } - return this._clientInitialized; } + /** + * Auto-detects the Firestore Project ID. + * + * @private + * @param {object} gapicClient - The Firestore GAPIC client. + * @return {Promise} A Promise that resolves with the Project ID. + */ + _detectProjectId(gapicClient) { + return new Promise( + (resolve, reject) => {gapicClient.getProjectId((err, projectId) => { + if (err) { + Firestore.log( + 'Firestore._detectProjectId', null, + 'Failed to detect project ID: %s', err); + reject(err); + } else { + Firestore.log( + 'Firestore._detectProjectId', null, 'Detected project ID: %s', + projectId); + resolve(projectId); + } + })}); + } /** * Decorate the request options of an API request. This is used to replace * any `{{projectId}}` placeholders with the value detected from the user's @@ -801,7 +849,7 @@ follow these steps, YOUR APP MAY BREAK.`); * * @private * @param {number} attemptsRemaining - The number of available attempts. - * @param {string} requestTag A unique client-assigned identifier for this + * @param {string} requestTag - A unique client-assigned identifier for this * request. * @param {retryFunction} func - Method returning a Promise than can be * retried. @@ -972,14 +1020,14 @@ follow these steps, YOUR APP MAY BREAK.`); request(methodName, request, requestTag, allowRetries) { let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; - return this._ensureClient().then(() => { + return this._runRequest(gapicClient => { const decorated = this._decorateRequest(request); return this._retry(attempts, requestTag, () => { return new Promise((resolve, reject) => { Firestore.log( 'Firestore.request', requestTag, 'Sending request: %j', decorated.request); - this._firestoreClient[methodName]( + gapicClient[methodName]( decorated.request, decorated.gax, (err, result) => { if (err) { Firestore.log( @@ -1017,7 +1065,7 @@ follow these steps, YOUR APP MAY BREAK.`); readStream(methodName, request, requestTag, allowRetries) { let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; - return this._ensureClient().then(() => { + return this._runRequest(gapicClient => { const decorated = this._decorateRequest(request); return this._retry(attempts, requestTag, () => { return new Promise((resolve, reject) => { @@ -1025,7 +1073,7 @@ follow these steps, YOUR APP MAY BREAK.`); Firestore.log( 'Firestore.readStream', requestTag, 'Sending request: %j', decorated.request); - let stream = this._firestoreClient[methodName]( + let stream = gapicClient[methodName]( decorated.request, decorated.gax); let logger = through.obj(function(chunk, enc, callback) { Firestore.log( @@ -1069,7 +1117,7 @@ follow these steps, YOUR APP MAY BREAK.`); let self = this; let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; - return this._ensureClient().then(() => { + return this._runRequest(gapicClient => { const decorated = this._decorateRequest(request); return this._retry(attempts, requestTag, () => { return Promise.resolve().then(() => { @@ -1077,8 +1125,7 @@ follow these steps, YOUR APP MAY BREAK.`); 'Firestore.readWriteStream', requestTag, 'Opening stream'); // The generated bi-directional streaming API takes the list of GAX // headers as its second argument. - let requestStream = - this._firestoreClient[methodName]({}, decorated.gax); + let requestStream = gapicClient[methodName]({}, decorated.gax); // The transform stream to assign the project ID. let transform = through.obj(function(chunk, encoding, callback) { diff --git a/src/pool.ts b/src/pool.ts new file mode 100644 index 000000000..dae6cce0f --- /dev/null +++ b/src/pool.ts @@ -0,0 +1,135 @@ +/*! + * Copyright 2018 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +import assert from 'assert'; + +/** + * An auto-resizing pool that distributes concurrent operations over multiple + * clients of type `T`. + * + * ClientPool is used within Firestore to manage a pool of GAPIC clients and + * automatically initializes multiple clients if we issue more than 100 + * concurrent operations. + */ +export class ClientPool { + /** Stores each active clients and how many operations it has outstanding. */ + private activeClients: Map = new Map(); + + /** + * @param {number} concurrentOperationLimit - The number of operations that + * each client can handle. + * @param {() => T} clientFactory - A factory function called as needed when + * new clients are required. + */ + constructor( + private readonly concurrentOperationLimit: number, + private readonly clientFactory: () => T) {} + + /** + * Returns an already existing client if it has less than the maximum number + * of concurrent operations or initializes and returns a new client. + */ + private acquire(): T { + let selectedClient: T|null = null; + let selectedRequestCount = 0; + + this.activeClients.forEach((requestCount, client) => { + if (!selectedClient && requestCount < this.concurrentOperationLimit) { + selectedClient = client; + selectedRequestCount = requestCount; + } + }); + + if (!selectedClient) { + selectedClient = this.clientFactory(); + assert( + !this.activeClients.has(selectedClient), + 'The provided client factory returned an existing instance'); + } + + this.activeClients.set(selectedClient, selectedRequestCount + 1); + + return selectedClient!; + } + + /** + * Reduces the number of operations for the provided client, potentially + * removing it from the pool of active clients. + */ + private release(client: T): void { + let requestCount = this.activeClients.get(client) || 0; + assert(requestCount > 0, 'No active request'); + + requestCount = requestCount! - 1; + this.activeClients.set(client, requestCount); + + if (requestCount === 0) { + this.garbageCollect(); + } + } + + /** + * The number of currently registered clients. + * + * @return {number} Number of currently registered clients. + */ + // Visible for testing. + get size(): number { + return this.activeClients.size; + } + + /** + * Runs the provided operation in this pool. This function may create an + * additional client if all existing clients already operate at the concurrent + * operation limit. + * + * @param {(client: T) => Promise} op - A callback function that returns a + * Promise. The client T will be returned to the pool when callback finishes. + * @return {Promise} A Promise that resolves with the result of `op`. + */ + run(op: (client: T) => Promise): Promise { + const client = this.acquire(); + + return op(client) + .catch(err => { + this.release(client); + return Promise.reject(err); + }) + .then(res => { + this.release(client); + return res; + }); + } + + /** + * Deletes clients that are no longer executing operations. Keeps up to one + * idle client to reduce future initialization costs. + */ + private garbageCollect(): void { + let idleClients = 0; + this.activeClients.forEach((requestCount, client) => { + if (requestCount === 0) { + ++idleClients; + + if (idleClients > 1) { + this.activeClients.delete(client); + } + } + }); + } +} diff --git a/test/index.js b/test/index.js index f39b1d279..592031144 100644 --- a/test/index.js +++ b/test/index.js @@ -314,6 +314,22 @@ describe('instantiation', function() { assert(firestore instanceof Firestore); }); + it('uses project id from constructor', () => { + let firestore = new Firestore({ + projectId: PROJECT_ID, + sslCreds: grpc.credentials.createInsecure(), + timestampsInSnapshots: true, + keyFilename: './test/fake-certificate.json', + }); + + return firestore._runRequest(() => { + assert.equal( + firestore.formattedName, + `projects/${PROJECT_ID}/databases/(default)`); + return Promise.resolve(); + }); + }); + it('detects project id', function() { let firestore = new Firestore({ sslCreds: grpc.credentials.createInsecure(), @@ -324,48 +340,47 @@ describe('instantiation', function() { assert.equal( firestore.formattedName, 'projects/{{projectId}}/databases/(default)'); - let initialized = firestore._ensureClient(); + firestore._detectProjectId = () => Promise.resolve(PROJECT_ID); - let projectIdDetected = false; + return firestore._runRequest(() => { + assert.equal( + firestore.formattedName, + `projects/${PROJECT_ID}/databases/(default)`); + return Promise.resolve(); + }); + }); - firestore._firestoreClient.getProjectId = function(callback) { - projectIdDetected = true; - callback(null, PROJECT_ID); - }; + it('uses project id from gapic client', function() { + let firestore = new Firestore({ + sslCreds: grpc.credentials.createInsecure(), + timestampsInSnapshots: true, + keyFilename: './test/fake-certificate.json', + }); - firestore._firestoreClient._innerApiCalls.batchGetDocuments = function( - request) { - let expectedRequest = { - database: DATABASE_ROOT, - documents: [`${DATABASE_ROOT}/documents/collectionId/documentId`], - }; - assert.deepEqual(request, expectedRequest); - return stream(found('documentId')); - }; + assert.equal( + firestore.formattedName, 'projects/{{projectId}}/databases/(default)'); - return initialized.then(() => { - assert.equal(projectIdDetected, true); - return firestore.doc('collectionId/documentId').get(); + let gapicClient = {getProjectId: callback => callback(null, PROJECT_ID)}; + + return firestore._detectProjectId(gapicClient).then(projectId => { + assert.equal(projectId, PROJECT_ID); }); }); - it('handles error from project ID detection', function() { + it('handles error from project ID detection', () => { let firestore = new Firestore({ sslCreds: grpc.credentials.createInsecure(), timestampsInSnapshots: true, keyFilename: './test/fake-certificate.json', }); - let initialized = firestore._ensureClient(); - - firestore._firestoreClient.getProjectId = function(callback) { - callback(new Error('Project ID error')); + let gapicClient = { + getProjectId: callback => callback(new Error('Injected Error')) }; - return initialized.then( - () => assert.fail('Expected error missing'), err => { - assert.equal(err.message, 'Project ID error'); - }); + return firestore._detectProjectId(gapicClient) + .then(() => assert.fail('Error ignored')) + .catch(err => assert.equal('Injected Error', err.message)) }); it('exports all types', function() { diff --git a/test/pool.ts b/test/pool.ts new file mode 100644 index 000000000..55e6e8cfa --- /dev/null +++ b/test/pool.ts @@ -0,0 +1,153 @@ +/** + * Copyright 2018 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +import {use, expect} from 'chai'; +import * as chaiAsPromised from 'chai-as-promised'; + +import {ClientPool} from '../src/pool'; +import {Deferred} from './util/helpers'; + +use(chaiAsPromised.default); + +function deferredPromises(count: number): Array> { + const deferred: Array> = []; + for (let i = 0; i < count; ++i) { + deferred.push(new Deferred()); + } + return deferred; +} + +describe('Client pool', () => { + it('creates new instances as needed', () => { + const clientPool = new ClientPool<{}>(3, () => { + return {}; + }); + + expect(clientPool.size).to.eq(0); + + const operationPromises = deferredPromises(4); + + clientPool.run(() => operationPromises[0].promise); + expect(clientPool.size).to.eq(1); + clientPool.run(() => operationPromises[1].promise); + expect(clientPool.size).to.eq(1); + clientPool.run(() => operationPromises[2].promise); + expect(clientPool.size).to.eq(1); + + clientPool.run(() => operationPromises[3].promise); + expect(clientPool.size).to.eq(2); + }); + + it('re-uses idle instances', () => { + const clientPool = new ClientPool<{}>(2, () => { + return {}; + }); + + expect(clientPool.size).to.eq(0); + + const operationPromises = deferredPromises(5); + + const completionPromise = + clientPool.run(() => operationPromises[0].promise); + expect(clientPool.size).to.eq(1); + clientPool.run(() => operationPromises[1].promise); + expect(clientPool.size).to.eq(1); + clientPool.run(() => operationPromises[2].promise); + expect(clientPool.size).to.eq(2); + clientPool.run(() => operationPromises[3].promise); + expect(clientPool.size).to.eq(2); + + operationPromises[0].resolve(); + + return completionPromise.then(() => { + clientPool.run(() => operationPromises[4].promise); + expect(clientPool.size).to.eq(2); + }); + }); + + it('garbage collects after success', () => { + const clientPool = new ClientPool<{}>(2, () => { + return {}; + }); + + expect(clientPool.size).to.eq(0); + + const operationPromises = deferredPromises(4); + const completionPromises: Array> = []; + + completionPromises.push(clientPool.run(() => operationPromises[0].promise)); + expect(clientPool.size).to.eq(1); + completionPromises.push(clientPool.run(() => operationPromises[1].promise)); + expect(clientPool.size).to.eq(1); + completionPromises.push(clientPool.run(() => operationPromises[2].promise)); + expect(clientPool.size).to.eq(2); + completionPromises.push(clientPool.run(() => operationPromises[3].promise)); + expect(clientPool.size).to.eq(2); + + operationPromises.forEach(deferred => deferred.resolve()); + + return Promise.all(completionPromises).then(() => { + expect(clientPool.size).to.eq(1); + }); + }); + + it('garbage collects after error', () => { + const clientPool = new ClientPool<{}>(2, () => { + return {}; + }); + + expect(clientPool.size).to.eq(0); + + const operationPromises = deferredPromises(4); + const completionPromises: Array> = []; + + completionPromises.push(clientPool.run(() => operationPromises[0].promise)); + expect(clientPool.size).to.eq(1); + completionPromises.push(clientPool.run(() => operationPromises[1].promise)); + expect(clientPool.size).to.eq(1); + completionPromises.push(clientPool.run(() => operationPromises[2].promise)); + expect(clientPool.size).to.eq(2); + completionPromises.push(clientPool.run(() => operationPromises[3].promise)); + expect(clientPool.size).to.eq(2); + + operationPromises.forEach(deferred => deferred.reject()); + + return Promise.all(completionPromises.map(p => p.catch(() => {}))) + .then(() => { + expect(clientPool.size).to.eq(1); + }); + }); + + it('forwards success', () => { + const clientPool = new ClientPool<{}>(1, () => { + return {}; + }); + + const op = clientPool.run(() => Promise.resolve('Success')); + return expect(op).to.become('Success'); + }); + + it('forwards failure', () => { + const clientPool = new ClientPool<{}>(1, () => { + return {}; + }); + + const op = clientPool.run(() => Promise.reject('Generated error')); + return expect(op).to.eventually.be.rejectedWith('Generated error'); + }); +}); diff --git a/test/util/helpers.ts b/test/util/helpers.ts index 5e8c55db5..0a5f74788 100644 --- a/test/util/helpers.ts +++ b/test/util/helpers.ts @@ -19,14 +19,33 @@ import {GrpcClient} from 'google-gax'; import Firestore from '../../src'; +import {ClientPool} from '../../src/pool'; +import v1beta1 from '../../src/v1beta1'; /* tslint:disable:no-any */ +type GapicClient = any; const grpc = new GrpcClient({} as any).grpc; const SSL_CREDENTIALS = (grpc.credentials as any).createInsecure(); /* tslint:enable:no-any */ const PROJECT_ID = 'test-project'; +/** A Promise implementation that supports deferred resolution. */ +export class Deferred { + promise: Promise; + resolve: (value?: R|Promise) => void = () => {}; + reject: (reason?: Error) => void = () => {}; + + constructor() { + this.promise = new Promise( + (resolve: (value?: R|Promise) => void, + reject: (reason?: Error) => void) => { + this.resolve = resolve; + this.reject = reject; + }); + } +} + /** * Interface that defines the request handlers used by Firestore. */ @@ -47,7 +66,8 @@ export interface ApiOverride { * @param {ApiOverride} apiOverrides An object with the request handlers to * override. * @param {Object} firestoreSettings Firestore Settings to configure the client. - * @return {Firestore} A new Firestore client. + * @return {Promise} A Promise that resolves with the new Firestore + * client. */ export function createInstance( apiOverrides?: ApiOverride, firestoreSettings?: {}): Promise { @@ -62,13 +82,17 @@ export function createInstance( const firestore = new Firestore(initializationOptions); - return firestore._ensureClient().then(() => { + const clientPool = new ClientPool(/* concurrentRequestLimit= */ 1, () => { + const gapicClient: GapicClient = v1beta1(initializationOptions); if (apiOverrides) { Object.keys(apiOverrides).forEach(override => { - firestore._firestoreClient._innerApiCalls[override] = - apiOverrides[override]; + gapicClient._innerApiCalls[override] = apiOverrides[override]; }); } - return firestore; + return gapicClient; }); + + firestore._initClientPool = () => Promise.resolve(clientPool); + + return Promise.resolve(firestore); }