-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allowing 101 listeners #256
Conversation
0f4e688
to
331d3ec
Compare
Codecov Report
@@ Coverage Diff @@
## master #256 +/- ##
=====================================
Coverage 100% 100%
=====================================
Files 12 12
Lines 2104 2119 +15
Branches 457 458 +1
=====================================
+ Hits 2104 2119 +15
Continue to review full report at Codecov.
|
331d3ec
to
f1dce39
Compare
this.promise = new Promise( | ||
(resolve: (value?: R|Promise<R>) => void, | ||
reject: (reason?: Error) => void) => { | ||
this.resolve = resolve; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
src/pool.ts
Outdated
} | ||
|
||
/** | ||
* Reduces the number of operation for the provided client, potentially |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
test/pool.ts
Outdated
@@ -0,0 +1,158 @@ | |||
/** | |||
* Copyright 2017 Google Inc. All Rights Reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
test/pool.ts
Outdated
|
||
const deferred = deferredPromises(4); | ||
|
||
clientPool.run(() => deferred[0].promise); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
test/pool.ts
Outdated
completionPromises.push(clientPool.run(() => operationPromises[3].promise)); | ||
expect(clientPool.size).to.eq(2); | ||
|
||
operationPromises[0].resolve(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Mike, since you have more context on this client, do you mind taking a look at this as well? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got 99 problems, but listening to 1 more problem isn't one of them anymore.
This looks good to me. Nice work with the auto-resizing pool stuff. I left a few nit / comment suggestions, but nothing major.
src/index.js
Outdated
@@ -126,6 +132,12 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix'; | |||
*/ | |||
const MAX_REQUEST_RETRIES = 5; | |||
|
|||
/*! | |||
* The maximum number of concurrent requests supported by a GAPIC client. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
src/index.js
Outdated
@@ -239,11 +251,11 @@ class Firestore { | |||
}); | |||
|
|||
/** | |||
* A client pool to distribute requests over multiple GAPIC clients. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
src/pool.ts
Outdated
* concurrent operations. | ||
*/ | ||
export class ClientPool<T> { | ||
private activeClients: Map<T, number> = new Map(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
src/pool.ts
Outdated
this.activeClients.forEach((requestCount, client) => { | ||
if (!selectedClient && requestCount < this.concurrentOperationLimit) { | ||
selectedClient = client; | ||
currentRequestCount = requestCount; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
src/pool.ts
Outdated
*/ | ||
private release(client: T): void { | ||
let currentRequestCount = this.activeClients.get(client); | ||
assert(currentRequestCount! > 0, 'Active client not found'); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
src/pool.ts
Outdated
* removing it from the pool of active clients. | ||
*/ | ||
private release(client: T): void { | ||
let currentRequestCount = this.activeClients.get(client); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
run<V>(op: (client: T) => Promise<V>): Promise<V> { | ||
const client = this.acquire(); | ||
|
||
return op(client) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
/** | ||
* Deletes clients that are no longer executing operations. Keeps up to one | ||
* idle client to reduce future initialization costs. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
system-test/firestore.js
Outdated
version = require('../../package.json').version; | ||
} catch (e) { | ||
version = require('../package.json').version; | ||
} |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
3ecd80b
to
e13acbe
Compare
e13acbe
to
0aeee35
Compare
Previously _initializeStream returned a Promise that indicated that the stream was "released", i.e. that it was was ready for attaching listeners. #256 Added pooled clients and changed the callers of _initializeStream to reuse this promise such that when it was resolved, the stream could be returned to the pool. This works when listeners are short-lived, but fails when listeners run indefinitely. This change arranges to release the clients back to the pool only after the stream has completed, which allows an arbitrary number of indefinite listens to run without problems.
Previously _initializeStream returned a Promise that indicated that the stream was "released", i.e. that it was was ready for attaching listeners. #256 Added pooled clients and changed the callers of _initializeStream to reuse this promise such that when it was resolved, the stream could be returned to the pool. This works when listeners are short-lived, but fails when listeners run indefinitely. This change arranges to release the clients back to the pool only after the stream has completed, which allows an arbitrary number of indefinite listens to run without problems.
Fixes firebase/firebase-admin-node#499 Previously _initializeStream returned a Promise that indicated that the stream was "released", i.e. that it was was ready for attaching listeners. #256 Added pooled clients and changed the callers of _initializeStream to reuse this promise such that when it was resolved, the stream could be returned to the pool. This works when listeners are short-lived, but fails when listeners run indefinitely. This change arranges to release the clients back to the pool only after the stream has completed, which allows an arbitrary number of indefinite listens to run without problems. This turns out to be fiendishly difficult to test given the current structure of the code. A second pass at this that reformulates this as just another stream that composes with the others would make this easier to understand and test. For now, this fix unblocks the customers waiting on the referenced issue.
The GAPIC client that handles all of Firestore's networking is limited to 100 connections. This PR lift this limit by replacing the static GAPIC client with a pool of clients. If more than 100 operations run concurrently, we create additional clients.