-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Support more than 100 long-lived streams #623
Changes from 2 commits
bdd3ce4
4d5250a
0ffaee6
52ff6d3
18c1a0e
1c4d016
cc8d75d
75fc446
3a529b1
29caec6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ import { | |
import {WriteBatch} from './write-batch'; | ||
|
||
import api = google.firestore.v1; | ||
import {Deferred} from '../test/util/helpers'; | ||
|
||
export { | ||
CollectionReference, | ||
|
@@ -1144,18 +1145,15 @@ export class Firestore { | |
* @returns The given Stream once it is considered healthy. | ||
*/ | ||
private _initializeStream( | ||
releaser: () => void, | ||
resultStream: NodeJS.ReadableStream, | ||
requestTag: string | ||
): Promise<NodeJS.ReadableStream>; | ||
private _initializeStream( | ||
releaser: () => void, | ||
resultStream: NodeJS.ReadWriteStream, | ||
requestTag: string, | ||
request: {} | ||
): Promise<NodeJS.ReadWriteStream>; | ||
private _initializeStream( | ||
releaser: () => void, | ||
resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream, | ||
requestTag: string, | ||
request?: {} | ||
|
@@ -1167,7 +1165,7 @@ export class Firestore { | |
* Whether we have resolved the Promise and returned the stream to the | ||
* caller. | ||
*/ | ||
let streamInitialized = false; | ||
let streamReleased = false; | ||
|
||
/** | ||
* Whether the stream end has been reached. This has to be forwarded to the | ||
|
@@ -1176,7 +1174,7 @@ export class Firestore { | |
let endCalled = false; | ||
|
||
return new Promise((resolve, reject) => { | ||
const streamReady = () => { | ||
const releaseStream = () => { | ||
if (errorReceived) { | ||
logger( | ||
'Firestore._initializeStream', | ||
|
@@ -1185,11 +1183,10 @@ export class Firestore { | |
errorReceived | ||
); | ||
resultStream.emit('error', errorReceived); | ||
releaser(); | ||
errorReceived = null; | ||
} else if (!streamInitialized) { | ||
} else if (!streamReleased) { | ||
logger('Firestore._initializeStream', requestTag, 'Releasing stream'); | ||
streamInitialized = true; | ||
streamReleased = true; | ||
resultStream.pause(); | ||
|
||
// Calling 'stream.pause()' only holds up 'data' events and not the | ||
|
@@ -1209,7 +1206,6 @@ export class Firestore { | |
'Forwarding stream close' | ||
); | ||
resultStream.emit('end'); | ||
releaser(); | ||
} | ||
}, 0); | ||
} | ||
|
@@ -1220,7 +1216,7 @@ export class Firestore { | |
// possible to avoid the default stream behavior (which is just to log and | ||
// continue). | ||
resultStream.on('readable', () => { | ||
streamReady(); | ||
releaseStream(); | ||
}); | ||
|
||
resultStream.on('end', () => { | ||
|
@@ -1230,7 +1226,7 @@ export class Firestore { | |
'Received stream end' | ||
); | ||
endCalled = true; | ||
streamReady(); | ||
releaseStream(); | ||
}); | ||
|
||
resultStream.on('error', err => { | ||
|
@@ -1242,16 +1238,15 @@ export class Firestore { | |
); | ||
// If we receive an error before we were able to receive any data, | ||
// reject this stream. | ||
if (!streamInitialized) { | ||
if (!streamReleased) { | ||
logger( | ||
'Firestore._initializeStream', | ||
requestTag, | ||
'Received initial error:', | ||
err | ||
); | ||
streamInitialized = true; | ||
streamReleased = true; | ||
reject(err); | ||
releaser(); | ||
} else { | ||
errorReceived = err; | ||
} | ||
|
@@ -1274,7 +1269,7 @@ export class Firestore { | |
requestTag, | ||
'Marking stream as healthy' | ||
); | ||
streamReady(); | ||
releaseStream(); | ||
}); | ||
} | ||
}); | ||
|
@@ -1359,36 +1354,48 @@ export class Firestore { | |
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; | ||
const callOptions = this.createCallOptions(); | ||
|
||
const gapicClient = this._clientPool.acquire(); | ||
const releaser = this._clientPool.createReleaser(gapicClient); | ||
const result = new Deferred<NodeJS.ReadableStream>(); | ||
|
||
return this._retry(attempts, requestTag, () => { | ||
return new Promise<NodeJS.ReadableStream>((resolve, reject) => { | ||
try { | ||
this._clientPool.run(gapicClient => { | ||
// While we return the stream to the callee early, we don't want to | ||
// release the GAPIC client until the callee has finished processing the | ||
// stream. | ||
const lifetime = new Deferred<void>(); | ||
|
||
this._retry(attempts, requestTag, async () => { | ||
logger( | ||
'Firestore.readStream', | ||
requestTag, | ||
'Sending request: %j', | ||
request | ||
); | ||
const stream = gapicClient[methodName](request, callOptions); | ||
const logStream = through2.obj(function(this, chunk, enc, callback) { | ||
logger( | ||
'Firestore.readStream', | ||
requestTag, | ||
'Sending request: %j', | ||
request | ||
'Received response: %j', | ||
chunk | ||
); | ||
const stream = gapicClient[methodName](request, callOptions); | ||
const logStream = through2.obj(function(this, chunk, enc, callback) { | ||
logger( | ||
'Firestore.readStream', | ||
requestTag, | ||
'Received response: %j', | ||
chunk | ||
); | ||
this.push(chunk); | ||
callback(); | ||
}); | ||
resolve(bun([stream, logStream])); | ||
} catch (err) { | ||
logger('Firestore.readStream', requestTag, 'Received error:', err); | ||
reject(err); | ||
} | ||
}).then(stream => this._initializeStream(releaser, stream, requestTag)); | ||
this.push(chunk); | ||
callback(); | ||
}); | ||
const resultStream = bun([stream, logStream]); | ||
return this._initializeStream(resultStream, requestTag); | ||
}) | ||
.then(stream => { | ||
stream.on('close', lifetime.resolve); | ||
result.resolve(stream); | ||
}) | ||
.catch(err => { | ||
lifetime.resolve(); | ||
result.reject(err); | ||
}); | ||
|
||
return lifetime.promise; | ||
}); | ||
|
||
return result.promise; | ||
} | ||
|
||
/** | ||
|
@@ -1416,11 +1423,15 @@ export class Firestore { | |
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; | ||
const callOptions = this.createCallOptions(); | ||
|
||
const gapicClient = this._clientPool.acquire(); | ||
const releaser = this._clientPool.createReleaser(gapicClient); | ||
const result = new Deferred<NodeJS.ReadWriteStream>(); | ||
|
||
this._clientPool.run(gapicClient => { | ||
// While we return the stream to the callee early, we don't want to | ||
// release the GAPIC client until the callee has finished processing the | ||
// stream. | ||
const lifetime = new Deferred<void>(); | ||
|
||
return this._retry(attempts, requestTag, () => { | ||
return Promise.resolve().then(() => { | ||
this._retry(attempts, requestTag, async () => { | ||
wilhuff marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger('Firestore.readWriteStream', requestTag, 'Opening stream'); | ||
const requestStream = gapicClient[methodName](callOptions); | ||
|
||
|
@@ -1436,14 +1447,21 @@ export class Firestore { | |
}); | ||
|
||
const resultStream = bun([requestStream, logStream]); | ||
return this._initializeStream( | ||
releaser, | ||
resultStream, | ||
requestTag, | ||
request | ||
); | ||
}); | ||
return this._initializeStream(resultStream, requestTag, request); | ||
}) | ||
.then(stream => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. async/await? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cleaned this up. I also removed the return value from _initializeStream as part of this since we always return the unmodified resultStream. |
||
stream.on('close', lifetime.resolve); | ||
result.resolve(stream); | ||
}) | ||
.catch(err => { | ||
lifetime.resolve(); | ||
result.reject(err); | ||
}); | ||
|
||
return lifetime.promise; | ||
}); | ||
|
||
return result.promise; | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ import { | |
Timestamp, | ||
} from '../src'; | ||
import {autoId} from '../src/util'; | ||
import {Deferred} from '../test/util/helpers'; | ||
|
||
const version = require('../../package.json').version; | ||
|
||
|
@@ -913,6 +914,37 @@ describe('DocumentReference class', () => { | |
maybeRun(); | ||
}); | ||
}); | ||
|
||
it('handles more than 100 concurrent listeners', async () => { | ||
const ref = randomCol.doc('doc'); | ||
|
||
const emptyResults: Array<Deferred<void>> = []; | ||
const documentResults: Array<Deferred<void>> = []; | ||
const unsubscribeCallbacks: Array<() => void> = []; | ||
|
||
// A single GAPIC client can only handle 100 concurrent streams. We set | ||
// up 100+ long-lived listeners to verify that Firestore pools requests | ||
// across multiple clients. | ||
for (let i = 0; i < 150; ++i) { | ||
emptyResults[i] = new Deferred<void>(); | ||
documentResults[i] = new Deferred<void>(); | ||
|
||
unsubscribeCallbacks[i] = randomCol | ||
.where('i', '>', i) | ||
.onSnapshot(snapshot => { | ||
if (snapshot.size === 0) { | ||
emptyResults[i].resolve(); | ||
} else if (snapshot.size === 1) { | ||
documentResults[i].resolve(); | ||
} | ||
}); | ||
} | ||
|
||
await Promise.all(emptyResults.map(d => d.promise)); | ||
ref.set({i: 1337}); | ||
await Promise.all(documentResults.map(d => d.promise)); | ||
unsubscribeCallbacks.forEach(c => c()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test verifies that all 150 listeners succeed but doesn't verify that everything has been properly released to the pool. Is it possible to check that pool.size is 150 once the listeners are started and then get back to zero after? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I worry that this test can succeed even if you remove the line that resolves the lifetime promise. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a "shutdown" block to each tests that verifies that the operation count goes back to zero. I had to change some of the unit tests to make this work. |
||
}); | ||
}); | ||
}); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this name. "released" can be confused with releasing back into the pool. I certainly was confused that way when reading this.
This promise represents that the stream is ready for use or initialized or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the PR to keep the renames of your original commit.