Skip to content

Commit

Permalink
Don't release clients before stream close
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed May 15, 2019
1 parent bdd3ce4 commit f8404db
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 51 deletions.
128 changes: 77 additions & 51 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
import {WriteBatch} from './write-batch';

import api = google.firestore.v1;
import {Deferred} from '../test/util/helpers';

export {
CollectionReference,
Expand Down Expand Up @@ -1353,40 +1354,48 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(
this,
chunk,
enc,
callback
) {
logger(
'Firestore.readStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger('Firestore.readStream', requestTag, 'Received error:', err);
reject(err);
}
}).then(stream => this._initializeStream(stream, requestTag));
});
const result = new Deferred<NodeJS.ReadableStream>();

this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
const resultStream = bun([stream, logStream]);
return this._initializeStream(resultStream, requestTag);
})
.then(stream => {
stream.on('close', lifetime.resolve);
result.resolve(stream);
})
.catch(err => {
result.reject(err);
lifetime.resolve();
});

return lifetime.promise;
});

return result.promise;
}

/**
Expand Down Expand Up @@ -1414,28 +1423,45 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);
const result = new Deferred<NodeJS.ReadWriteStream>();

const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(resultStream, requestTag, request);
this._retry(attempts, requestTag, async () => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(resultStream, requestTag, request);
})
.then(stream => {
stream.on('close', lifetime.resolve);
result.resolve(stream);
})
.catch(err => {
result.reject(err);
lifetime.resolve();
});

return lifetime.promise;
});

return result.promise;
}
}

Expand Down
31 changes: 31 additions & 0 deletions dev/system-test/firestore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
Timestamp,
} from '../src';
import {autoId} from '../src/util';
import {Deferred} from '../test/util/helpers';

const version = require('../../package.json').version;

Expand Down Expand Up @@ -913,6 +914,36 @@ describe('DocumentReference class', () => {
maybeRun();
});
});

it('handles more than 100 streams', async () => {
const ref = randomCol.doc('doc');

const emptyResults: Array<Deferred<void>> = [];
const documentResults: Array<Deferred<void>> = [];
const unsubscribeCallbacks: Array<() => void> = [];

// A single GAPIC client can only handle 100 concurrent streams. We set
// up 100+ long-lived listeners to verify that Firestore pools requests
// across multiple clients.
for (let i = 0; i < 150; ++i) {
emptyResults[i] = new Deferred<void>();
documentResults[i] = new Deferred<void>();
unsubscribeCallbacks[i] = randomCol
.where('i', '>', i)
.onSnapshot(snapshot => {
if (snapshot.size === 0) {
emptyResults[i].resolve();
} else if (snapshot.size === 1) {
documentResults[i].resolve();
}
});
}

await Promise.all(emptyResults.map(d => d.promise));
ref.set({i: 1337});
await Promise.all(documentResults.map(d => d.promise));
unsubscribeCallbacks.forEach(c => c());
});
});
});

Expand Down

0 comments on commit f8404db

Please sign in to comment.