Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support more than 100 long-lived streams #623

Merged
merged 10 commits into from
May 16, 2019
109 changes: 64 additions & 45 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
ReadOptions,
Settings,
} from './types';
import {requestTag} from './util';
import {Deferred, requestTag} from './util';
import {
validateBoolean,
validateFunction,
Expand Down Expand Up @@ -1131,22 +1131,19 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream,
requestTag: string
): Promise<NodeJS.ReadableStream>;
): Promise<void>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadWriteStream,
requestTag: string,
request: {}
): Promise<NodeJS.ReadWriteStream>;
): Promise<void>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream,
requestTag: string,
request?: {}
): Promise<NodeJS.ReadableStream | NodeJS.ReadWriteStream> {
): Promise<void> {
/** The last error we received and have not forwarded yet. */
let errorReceived: Error | null = null;

Expand All @@ -1172,7 +1169,6 @@ export class Firestore {
errorReceived
);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
Expand All @@ -1183,7 +1179,7 @@ export class Firestore {
// 'end' event we intend to forward here. We therefore need to wait
// until the API consumer registers their listeners (in the .then()
// call) before emitting any further events.
resolve(resultStream);
resolve();

// We execute the forwarding of the 'end' event via setTimeout() as
// V8 guarantees that the above the Promise chain is resolved before
Expand All @@ -1196,7 +1192,6 @@ export class Firestore {
'Forwarding stream close'
);
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand Down Expand Up @@ -1238,7 +1233,6 @@ export class Firestore {
);
streamInitialized = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand Down Expand Up @@ -1346,36 +1340,49 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadableStream>();

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
'Received response: %j',
chunk
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger('Firestore.readStream', requestTag, 'Received error:', err);
reject(err);
}
}).then(stream => this._initializeStream(releaser, stream, requestTag));
this.push(chunk);
callback();
});

const resultStream = bun([stream, logStream]);
resultStream.on('close', lifetime.resolve);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are streams guaranteed to emit the close event? What happens in the case of an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to https://nodejs.org/api/stream.html, yes:

"A Writable stream will always emit the 'close' event if it is created with the emitClose option."
"A Readable stream will always emit the 'close' event if it is created with the emitClose option."

emitClose defaults to true.

I originally trusted this, but I spent more time and added test asserts. It turns out that the close event is not always emitted. To make the unit and system tests pass, I also have to wait for error/end and finish on writeable streams.

resultStream.on('end', lifetime.resolve);
resultStream.on('error', lifetime.resolve);

await this._initializeStream(resultStream, requestTag);
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}

/**
Expand Down Expand Up @@ -1403,11 +1410,15 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadWriteStream>();

this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
this._retry(attempts, requestTag, async () => {
wilhuff marked this conversation as resolved.
Show resolved Hide resolved
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

Expand All @@ -1423,14 +1434,22 @@ export class Firestore {
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser,
resultStream,
requestTag,
request
);
resultStream.on('close', lifetime.resolve);
resultStream.on('finish', lifetime.resolve);
resultStream.on('end', lifetime.resolve);
resultStream.on('error', lifetime.resolve);

await this._initializeStream(resultStream, requestTag, request);
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}
}

Expand Down
33 changes: 13 additions & 20 deletions dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class ClientPool<T> {
*
* @private
*/
acquire(): T {
private acquire(): T {
let selectedClient: T | null = null;
let selectedRequestCount = 0;

Expand Down Expand Up @@ -79,7 +79,7 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
release(client: T): void {
private release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

Expand All @@ -92,34 +92,27 @@ export class ClientPool<T> {
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
* The number of currently registered clients.
*
* @return Number of currently registered clients.
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
// Visible for testing.
get size(): number {
return this.activeClients.size;
}

/**
* The number of currently registered clients.
* The number of currently active operations.
*
* @return Number of currently registered clients.
* @return Number of currently active operations.
* @private
*/
// Visible for testing.
get size(): number {
return this.activeClients.size;
get opCount(): number {
let activeOperationCount = 0;
this.activeClients.forEach(count => (activeOperationCount += count));
return activeOperationCount;
}

/**
Expand Down
19 changes: 19 additions & 0 deletions dev/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@
* limitations under the License.
*/

/** A Promise implementation that supports deferred resolution. */
export class Deferred<R> {
promise: Promise<R>;
resolve: (value?: R | Promise<R>) => void = () => {};
reject: (reason?: Error) => void = () => {};

constructor() {
this.promise = new Promise(
(
resolve: (value?: R | Promise<R>) => void,
reject: (reason?: Error) => void
) => {
this.resolve = resolve;
this.reject = reject;
}
);
}
}

/**
* Generate a unique client-side identifier.
*
Expand Down
Loading