Skip to content

Commit

Permalink
Allowing 101 listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jul 12, 2018
1 parent f4cb701 commit f1dce39
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 74 deletions.
124 changes: 83 additions & 41 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

'use strict';

const assert = require('assert');
const bun = require('bun');
const extend = require('extend');
const is = require('is');
Expand Down Expand Up @@ -70,6 +71,11 @@ const FieldValue = require('./field-value').FieldValue;
*/
const Timestamp = require('./timestamp');

/*!
* @see ClientPool
*/
const ClientPool = require('./pool').ClientPool;

/*!
* @see CollectionReference
*/
Expand Down Expand Up @@ -106,12 +112,12 @@ let Transaction;
/*!
* @see v1beta1
*/
let v1beta1; // Lazy-loaded in `_ensureClient()`
let v1beta1; // Lazy-loaded in `_runRequest()`

/*!
* @see @google-cloud/common
*/
let common; // Lazy-loaded in `_ensureClient()`
let common; // Lazy-loaded in `_runRequest()`

/*!
* HTTP header for the resource prefix to improve routing and project isolation
Expand All @@ -126,6 +132,12 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
*/
const MAX_REQUEST_RETRIES = 5;

/*!
* The maximum number of concurrent requests supported by a GAPIC client.
* @type {number}
*/
const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100;

/*!
* GRPC Error code for 'UNAVAILABLE'.
* @type {number}
Expand Down Expand Up @@ -239,11 +251,11 @@ class Firestore {
});

/**
* A client pool to distribute requests over multiple GAPIC clients.
* @private
* @type {object|null}
* @property {FirestoreClient} Firestore The Firestore GAPIC client.
* @type {ClientPool|null}
*/
this._firestoreClient = null;
this._clientPool = null;

/**
* The configuration options for the GAPIC client.
Expand Down Expand Up @@ -731,47 +743,78 @@ follow these steps, YOUR APP MAY BREAK.`);
}

/**
* Initializes the client and detects the Firestore Project ID. Returns a
* Promise on completion. If the client is already initialized, the returned
* Promise resolves immediately.
* Executes a new request using the first available GAPIC client.
*
* @private
*/
_ensureClient() {
_runRequest(op) {
// Initialize the client pool if this is the first request.
if (!this._clientInitialized) {
common = require('@google-cloud/common');
this._clientInitialized = this._initClientPool().then(clientPool => {
this._clientPool = clientPool;
})
}

this._clientInitialized = new Promise((resolve, reject) => {
this._firestoreClient =
module.exports.v1beta1(this._initalizationOptions);
return this._clientInitialized.then(() => this._clientPool.run(op));
}

Firestore.log('Firestore', null, 'Initialized Firestore GAPIC Client');
/**
* Initializes the client pool and invokes Project ID detection. Returns a
* Promise on completion.
*
* @private
* @return {Promise<ClientPool>}
*/
_initClientPool() {
assert(!this._clientInitialized, 'Client pool already initialized');

// We schedule Project ID detection using `setImmediate` to allow the
// testing framework to provide its own implementation of
// `getProjectId`.
setImmediate(() => {
this._firestoreClient.getProjectId((err, projectId) => {
if (err) {
Firestore.log(
'Firestore._ensureClient', null,
'Failed to detect project ID: %s', err);
reject(err);
} else {
Firestore.log(
'Firestore._ensureClient', null, 'Detected project ID: %s',
projectId);
this._referencePath =
new ResourcePath(projectId, this._referencePath.databaseId);
resolve();
}
});
const clientPool =
new ClientPool(MAX_CONCURRENT_REQUESTS_PER_CLIENT, () => {
const gapicClient =
module.exports.v1beta1(this._initalizationOptions);
Firestore.log(
'Firestore', null, 'Initialized Firestore GAPIC Client');
return gapicClient;
});
});

const projectIdProvided = this._referencePath.projectId !== '{{projectId}}';

if (projectIdProvided) {
return Promise.resolve(clientPool);
} else {
return clientPool.run(client => this._detectProjectId(client))
.then(projectId => {
this._referencePath =
new ResourcePath(projectId, this._referencePath.databaseId);
return clientPool;
});
}
return this._clientInitialized;
}

/**
* Auto-detects the Firestore Project ID.
*
* @private
* @param {object} gapicClient - The Firestore GAPIC client.
* @return {Promise<string>} A Promise that resolves with the Project ID.
*/
_detectProjectId(gapicClient) {
return new Promise(
(resolve, reject) => {gapicClient.getProjectId((err, projectId) => {
if (err) {
Firestore.log(
'Firestore._detectProjectId', null,
'Failed to detect project ID: %s', err);
reject(err);
} else {
Firestore.log(
'Firestore._detectProjectId', null, 'Detected project ID: %s',
projectId);
resolve(projectId);
}
})});
}
/**
* Decorate the request options of an API request. This is used to replace
* any `{{projectId}}` placeholders with the value detected from the user's
Expand Down Expand Up @@ -978,14 +1021,14 @@ follow these steps, YOUR APP MAY BREAK.`);
request(methodName, request, requestTag, allowRetries) {
let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;

return this._ensureClient().then(() => {
return this._runRequest(gapicClient => {
const decorated = this._decorateRequest(request);
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
Firestore.log(
'Firestore.request', requestTag, 'Sending request: %j',
decorated.request);
this._firestoreClient[methodName](
gapicClient[methodName](
decorated.request, decorated.gax, (err, result) => {
if (err) {
Firestore.log(
Expand Down Expand Up @@ -1023,15 +1066,15 @@ follow these steps, YOUR APP MAY BREAK.`);
readStream(methodName, request, requestTag, allowRetries) {
let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;

return this._ensureClient().then(() => {
return this._runRequest(gapicClient => {
const decorated = this._decorateRequest(request);
return this._retry(attempts, requestTag, () => {
return new Promise((resolve, reject) => {
try {
Firestore.log(
'Firestore.readStream', requestTag,
'Sending request: %j', decorated.request);
let stream = this._firestoreClient[methodName](
let stream = gapicClient[methodName](
decorated.request, decorated.gax);
let logger = through.obj(function(chunk, enc, callback) {
Firestore.log(
Expand Down Expand Up @@ -1075,16 +1118,15 @@ follow these steps, YOUR APP MAY BREAK.`);
let self = this;
let attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;

return this._ensureClient().then(() => {
return this._runRequest(gapicClient => {
const decorated = this._decorateRequest(request);
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
Firestore.log(
'Firestore.readWriteStream', requestTag, 'Opening stream');
// The generated bi-directional streaming API takes the list of GAX
// headers as its second argument.
let requestStream =
this._firestoreClient[methodName]({}, decorated.gax);
let requestStream = gapicClient[methodName]({}, decorated.gax);

// The transform stream to assign the project ID.
let transform = through.obj(function(chunk, encoding, callback) {
Expand Down
134 changes: 134 additions & 0 deletions src/pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*!
* Copyright 2018 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

import assert from 'assert';

/**
* An auto-resizing pool that distributes concurrent operations over multiple
* clients of type `T`.
*
* ClientPool is used within Firestore to manage a pool of GAPIC clients and
* automatically initializes multiple clients if we issue more than 100
* concurrent operations.
*/
export class ClientPool<T> {
private activeClients: Map<T, number> = new Map();

/**
* @param {number} concurrentOperationLimit - The number of operations that
* each client can handle.
* @param {() => T} clientFactory - A factory function called as needed when
* new clients are required.
*/
constructor(
private readonly concurrentOperationLimit: number,
private readonly clientFactory: () => T) {}

/**
* Returns an already existing client if it has less than the maximum number
* of concurrent operations or initializes and returns a new client.
*/
private acquire(): T {
let selectedClient: T|null = null;
let currentRequestCount = 0;

this.activeClients.forEach((requestCount, client) => {
if (!selectedClient && requestCount < this.concurrentOperationLimit) {
selectedClient = client;
currentRequestCount = requestCount;
}
});

if (!selectedClient) {
selectedClient = this.clientFactory();
assert(
!this.activeClients.has(selectedClient),
'The provided client factory returned an existing instance');
}

this.activeClients.set(selectedClient, currentRequestCount + 1);

return selectedClient!;
}

/**
* Reduces the number of operation for the provided client, potentially
* removing it from the pool of active clients.
*/
private release(client: T): void {
let currentRequestCount = this.activeClients.get(client);
assert(currentRequestCount! > 0, 'Active client not found');

currentRequestCount = currentRequestCount! - 1;
this.activeClients.set(client, currentRequestCount);

if (currentRequestCount === 0) {
this.garbageCollect();
}
}

/**
* The number of currently registered clients.
*
* @return {number} Number of currently registered clients.
*/
// Visible for testing.
get size(): number {
return this.activeClients.size;
}

/**
* Runs the provided operation in this pool. This function may create an
* additional client if all existing clients already operate at the concurrent
* operation limit.
*
* @param {(client: T) => Promise<V>} op - A callback function that returns a
* Promise. The client T will be returned to the pool when callback finishes.
* @return {Promise<V>} A Promise that resolves with the result of `op`.
*/
run<V>(op: (client: T) => Promise<V>): Promise<V> {
const client = this.acquire();

return op(client)
.catch(err => {
this.release(client);
return Promise.reject(err);
})
.then(res => {
this.release(client);
return res;
});
}

/**
* Deletes clients that are no longer executing operations. Keeps up to one
* idle client to reduce future initialization costs.
*/
private garbageCollect(): void {
let idleClients = 0;
this.activeClients.forEach((requestCount, client) => {
if (requestCount === 0) {
++idleClients;

if (idleClients > 1) {
this.activeClients.delete(client);
}
}
});
}
}
8 changes: 7 additions & 1 deletion system-test/firestore.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ let DocumentReference = require('../src/reference').DocumentReference;
let DocumentSnapshot =
require('../src/document')(DocumentReference).DocumentSnapshot;

let version = require('../../package.json').version;
let version;
try {
version = require('../../package.json').version;
} catch (e) {
version = require('../package.json').version;
}

let Firestore = require('../src');

if (process.env.NODE_ENV === 'DEBUG') {
Expand Down
Loading

0 comments on commit f1dce39

Please sign in to comment.