Skip to content

Commit

Permalink
fix: Fix client pooling for long-lived listens (#614)
Browse files Browse the repository at this point in the history
Fixes firebase/firebase-admin-node#499

Previously _initializeStream returned a Promise that indicated that the
stream was "released", i.e. that it was was ready for attaching
listeners.

#256 Added pooled clients and changed the
callers of _initializeStream to reuse this promise such that when it was
resolved, the stream could be returned to the pool. This works when
listeners are short-lived, but fails when listeners run indefinitely.

This change arranges to release the clients back to the pool only after
the stream has completed, which allows an arbitrary number of indefinite
listens to run without problems.

This turns out to be fiendishly difficult to test given the current structure
of the code. A second pass at this that reformulates this as just another
stream that composes with the others would make this easier to understand and
test. For now, this fix unblocks the customers waiting on the referenced issue.
  • Loading branch information
wilhuff authored May 10, 2019
1 parent c119250 commit 479bc9c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 54 deletions.
111 changes: 59 additions & 52 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1011,12 +1011,13 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
resultStream: NodeJS.ReadableStream,
releaser: () => void, resultStream: NodeJS.ReadableStream,
requestTag: string): Promise<NodeJS.ReadableStream>;
private _initializeStream(
resultStream: NodeJS.ReadWriteStream, requestTag: string,
request: {}): Promise<NodeJS.ReadWriteStream>;
releaser: () => void, resultStream: NodeJS.ReadWriteStream,
requestTag: string, request: {}): Promise<NodeJS.ReadWriteStream>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream|NodeJS.ReadWriteStream,
requestTag: string,
request?: {}): Promise<NodeJS.ReadableStream|NodeJS.ReadWriteStream> {
Expand All @@ -1027,7 +1028,7 @@ export class Firestore {
* Whether we have resolved the Promise and returned the stream to the
* caller.
*/
let streamReleased = false;
let streamInitialized = false;

/**
* Whether the stream end has been reached. This has to be forwarded to the
Expand All @@ -1036,16 +1037,17 @@ export class Firestore {
let endCalled = false;

return new Promise((resolve, reject) => {
const releaseStream = () => {
const streamReady = () => {
if (errorReceived) {
logger(
'Firestore._initializeStream', requestTag,
'Emit error:', errorReceived);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamReleased) {
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
streamReleased = true;
streamInitialized = true;
resultStream.pause();

// Calling 'stream.pause()' only holds up 'data' events and not the
Expand All @@ -1063,6 +1065,7 @@ export class Firestore {
'Firestore._initializeStream', requestTag,
'Forwarding stream close');
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand All @@ -1073,14 +1076,14 @@ export class Firestore {
// possible to avoid the default stream behavior (which is just to log and
// continue).
resultStream.on('readable', () => {
releaseStream();
streamReady();
});

resultStream.on('end', () => {
logger(
'Firestore._initializeStream', requestTag, 'Received stream end');
endCalled = true;
releaseStream();
streamReady();
});

resultStream.on('error', err => {
Expand All @@ -1089,12 +1092,13 @@ export class Firestore {
'Received stream error:', err);
// If we receive an error before we were able to receive any data,
// reject this stream.
if (!streamReleased) {
if (!streamInitialized) {
logger(
'Firestore._initializeStream', requestTag,
'Received initial error:', err);
streamReleased = true;
streamInitialized = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand All @@ -1112,7 +1116,7 @@ export class Firestore {
logger(
'Firestore._initializeStream', requestTag,
'Marking stream as healthy');
releaseStream();
streamReady();
});
}
});
Expand Down Expand Up @@ -1182,32 +1186,33 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream', requestTag,
'Sending request: %j', request);
const stream = gapicClient[methodName](request, callOptions);
const logStream =
through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger(
'Firestore.readStream', requestTag,
'Received error:', err);
reject(err);
}
})
.then(stream => this._initializeStream(stream, requestTag));
});
const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
logger(
'Firestore.readStream', requestTag, 'Sending request: %j',
request);
const stream = gapicClient[methodName](request, callOptions);
const logStream =
through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger(
'Firestore.readStream', requestTag,
'Received error:', err);
reject(err);
}
})
.then(stream => this._initializeStream(releaser, stream, requestTag));
});
}

Expand All @@ -1233,23 +1238,25 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

return this._clientPool.run(gapicClient => {
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);
const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);

const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream', requestTag,
'Received response: %j', chunk);
this.push(chunk);
callback();
});
return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(resultStream, requestTag, request);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readWriteStream', requestTag, 'Received response: %j',
chunk);
this.push(chunk);
callback();
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser, resultStream, requestTag, request);
});
});
}
Expand Down
24 changes: 22 additions & 2 deletions dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class ClientPool<T> {
*
* @private
*/
private acquire(): T {
acquire(): T {
let selectedClient: T|null = null;
let selectedRequestCount = 0;

Expand Down Expand Up @@ -77,7 +77,7 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
private release(client: T): void {
release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

Expand All @@ -89,6 +89,26 @@ export class ClientPool<T> {
}
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
*
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
}

/**
* The number of currently registered clients.
*
Expand Down

0 comments on commit 479bc9c

Please sign in to comment.