diff --git a/dev/src/index.ts b/dev/src/index.ts index 19f557027..805540c02 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -58,6 +58,7 @@ import { import {WriteBatch} from './write-batch'; import api = google.firestore.v1; +import {Deferred} from '../test/util/helpers'; export { CollectionReference, @@ -1353,40 +1354,48 @@ export class Firestore { const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; const callOptions = this.createCallOptions(); - return this._clientPool.run(gapicClient => { - return this._retry(attempts, requestTag, () => { - return new Promise((resolve, reject) => { - try { - logger( - 'Firestore.readStream', - requestTag, - 'Sending request: %j', - request - ); - const stream = gapicClient[methodName](request, callOptions); - const logStream = through2.obj(function( - this, - chunk, - enc, - callback - ) { - logger( - 'Firestore.readStream', - requestTag, - 'Received response: %j', - chunk - ); - this.push(chunk); - callback(); - }); - resolve(bun([stream, logStream])); - } catch (err) { - logger('Firestore.readStream', requestTag, 'Received error:', err); - reject(err); - } - }).then(stream => this._initializeStream(stream, requestTag)); - }); + const result = new Deferred(); + + this._clientPool.run(gapicClient => { + // While we return the stream to the callee early, we don't want to + // release the GAPIC client until the callee has finished processing the + // stream. + const lifetime = new Deferred(); + + this._retry(attempts, requestTag, async () => { + logger( + 'Firestore.readStream', + requestTag, + 'Sending request: %j', + request + ); + const stream = gapicClient[methodName](request, callOptions); + const logStream = through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.readStream', + requestTag, + 'Received response: %j', + chunk + ); + this.push(chunk); + callback(); + }); + const resultStream = bun([stream, logStream]); + return this._initializeStream(resultStream, requestTag); + }) + .then(stream => { + stream.on('close', lifetime.resolve); + result.resolve(stream); + }) + .catch(err => { + result.reject(err); + lifetime.resolve(); + }); + + return lifetime.promise; }); + + return result.promise; } /** @@ -1414,28 +1423,45 @@ export class Firestore { const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1; const callOptions = this.createCallOptions(); - return this._clientPool.run(gapicClient => { - return this._retry(attempts, requestTag, () => { - return Promise.resolve().then(() => { - logger('Firestore.readWriteStream', requestTag, 'Opening stream'); - const requestStream = gapicClient[methodName](callOptions); + const result = new Deferred(); - const logStream = through2.obj(function(this, chunk, enc, callback) { - logger( - 'Firestore.readWriteStream', - requestTag, - 'Received response: %j', - chunk - ); - this.push(chunk); - callback(); - }); + this._clientPool.run(gapicClient => { + // While we return the stream to the callee early, we don't want to + // release the GAPIC client until the callee has finished processing the + // stream. + const lifetime = new Deferred(); - const resultStream = bun([requestStream, logStream]); - return this._initializeStream(resultStream, requestTag, request); + this._retry(attempts, requestTag, async () => { + logger('Firestore.readWriteStream', requestTag, 'Opening stream'); + const requestStream = gapicClient[methodName](callOptions); + + const logStream = through2.obj(function(this, chunk, enc, callback) { + logger( + 'Firestore.readWriteStream', + requestTag, + 'Received response: %j', + chunk + ); + this.push(chunk); + callback(); }); - }); + + const resultStream = bun([requestStream, logStream]); + return this._initializeStream(resultStream, requestTag, request); + }) + .then(stream => { + stream.on('close', lifetime.resolve); + result.resolve(stream); + }) + .catch(err => { + result.reject(err); + lifetime.resolve(); + }); + + return lifetime.promise; }); + + return result.promise; } } diff --git a/dev/system-test/firestore.ts b/dev/system-test/firestore.ts index 496acb190..3e90d42a6 100644 --- a/dev/system-test/firestore.ts +++ b/dev/system-test/firestore.ts @@ -31,6 +31,7 @@ import { Timestamp, } from '../src'; import {autoId} from '../src/util'; +import {Deferred} from '../test/util/helpers'; const version = require('../../package.json').version; @@ -913,6 +914,36 @@ describe('DocumentReference class', () => { maybeRun(); }); }); + + it('handles more than 100 streams', async () => { + const ref = randomCol.doc('doc'); + + const emptyResults: Array> = []; + const documentResults: Array> = []; + const unsubscribeCallbacks: Array<() => void> = []; + + // A single GAPIC client can only handle 100 concurrent streams. We set + // up 100+ long-lived listeners to verify that Firestore pools requests + // across multiple clients. + for (let i = 0; i < 150; ++i) { + emptyResults[i] = new Deferred(); + documentResults[i] = new Deferred(); + unsubscribeCallbacks[i] = randomCol + .where('i', '>', i) + .onSnapshot(snapshot => { + if (snapshot.size === 0) { + emptyResults[i].resolve(); + } else if (snapshot.size === 1) { + documentResults[i].resolve(); + } + }); + } + + await Promise.all(emptyResults.map(d => d.promise)); + ref.set({i: 1337}); + await Promise.all(documentResults.map(d => d.promise)); + unsubscribeCallbacks.forEach(c => c()); + }); }); });