Skip to content

Commit

Permalink
fix: Support more than 100 long-lived streams (#623)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian authored May 16, 2019
1 parent 43ac9c6 commit 9474e3f
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 112 deletions.
109 changes: 64 additions & 45 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
ReadOptions,
Settings,
} from './types';
import {requestTag} from './util';
import {Deferred, requestTag} from './util';
import {
validateBoolean,
validateFunction,
Expand Down Expand Up @@ -1131,22 +1131,19 @@ export class Firestore {
* @returns The given Stream once it is considered healthy.
*/
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream,
requestTag: string
): Promise<NodeJS.ReadableStream>;
): Promise<void>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadWriteStream,
requestTag: string,
request: {}
): Promise<NodeJS.ReadWriteStream>;
): Promise<void>;
private _initializeStream(
releaser: () => void,
resultStream: NodeJS.ReadableStream | NodeJS.ReadWriteStream,
requestTag: string,
request?: {}
): Promise<NodeJS.ReadableStream | NodeJS.ReadWriteStream> {
): Promise<void> {
/** The last error we received and have not forwarded yet. */
let errorReceived: Error | null = null;

Expand All @@ -1172,7 +1169,6 @@ export class Firestore {
errorReceived
);
resultStream.emit('error', errorReceived);
releaser();
errorReceived = null;
} else if (!streamInitialized) {
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
Expand All @@ -1183,7 +1179,7 @@ export class Firestore {
// 'end' event we intend to forward here. We therefore need to wait
// until the API consumer registers their listeners (in the .then()
// call) before emitting any further events.
resolve(resultStream);
resolve();

// We execute the forwarding of the 'end' event via setTimeout() as
// V8 guarantees that the above the Promise chain is resolved before
Expand All @@ -1196,7 +1192,6 @@ export class Firestore {
'Forwarding stream close'
);
resultStream.emit('end');
releaser();
}
}, 0);
}
Expand Down Expand Up @@ -1238,7 +1233,6 @@ export class Firestore {
);
streamInitialized = true;
reject(err);
releaser();
} else {
errorReceived = err;
}
Expand Down Expand Up @@ -1346,36 +1340,49 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadableStream>();

return this._retry(attempts, requestTag, () => {
return new Promise<NodeJS.ReadableStream>((resolve, reject) => {
try {
this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

this._retry(attempts, requestTag, async () => {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Sending request: %j',
request
'Received response: %j',
chunk
);
const stream = gapicClient[methodName](request, callOptions);
const logStream = through2.obj(function(this, chunk, enc, callback) {
logger(
'Firestore.readStream',
requestTag,
'Received response: %j',
chunk
);
this.push(chunk);
callback();
});
resolve(bun([stream, logStream]));
} catch (err) {
logger('Firestore.readStream', requestTag, 'Received error:', err);
reject(err);
}
}).then(stream => this._initializeStream(releaser, stream, requestTag));
this.push(chunk);
callback();
});

const resultStream = bun([stream, logStream]);
resultStream.on('close', lifetime.resolve);
resultStream.on('end', lifetime.resolve);
resultStream.on('error', lifetime.resolve);

await this._initializeStream(resultStream, requestTag);
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}

/**
Expand Down Expand Up @@ -1403,11 +1410,15 @@ export class Firestore {
const attempts = allowRetries ? MAX_REQUEST_RETRIES : 1;
const callOptions = this.createCallOptions();

const gapicClient = this._clientPool.acquire();
const releaser = this._clientPool.createReleaser(gapicClient);
const result = new Deferred<NodeJS.ReadWriteStream>();

this._clientPool.run(gapicClient => {
// While we return the stream to the callee early, we don't want to
// release the GAPIC client until the callee has finished processing the
// stream.
const lifetime = new Deferred<void>();

return this._retry(attempts, requestTag, () => {
return Promise.resolve().then(() => {
this._retry(attempts, requestTag, async () => {
logger('Firestore.readWriteStream', requestTag, 'Opening stream');
const requestStream = gapicClient[methodName](callOptions);

Expand All @@ -1423,14 +1434,22 @@ export class Firestore {
});

const resultStream = bun([requestStream, logStream]);
return this._initializeStream(
releaser,
resultStream,
requestTag,
request
);
resultStream.on('close', lifetime.resolve);
resultStream.on('finish', lifetime.resolve);
resultStream.on('end', lifetime.resolve);
resultStream.on('error', lifetime.resolve);

await this._initializeStream(resultStream, requestTag, request);
result.resolve(resultStream);
}).catch(err => {
lifetime.resolve();
result.reject(err);
});

return lifetime.promise;
});

return result.promise;
}
}

Expand Down
33 changes: 13 additions & 20 deletions dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class ClientPool<T> {
*
* @private
*/
acquire(): T {
private acquire(): T {
let selectedClient: T | null = null;
let selectedRequestCount = 0;

Expand Down Expand Up @@ -79,7 +79,7 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
release(client: T): void {
private release(client: T): void {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

Expand All @@ -92,34 +92,27 @@ export class ClientPool<T> {
}

/**
* Creates a new function that will release the given client, when called.
*
* This guarantees that the given client can only be released once.
* The number of currently registered clients.
*
* @return Number of currently registered clients.
* @private
*/
createReleaser(client: T): () => void {
// Unfortunately, once the release() call is disconnected from the Promise
// returned from _initializeStream, there's no single callback in which the
// releaser can be guaranteed to be called once.
let released = false;
return () => {
if (!released) {
released = true;
this.release(client);
}
};
// Visible for testing.
get size(): number {
return this.activeClients.size;
}

/**
* The number of currently registered clients.
* The number of currently active operations.
*
* @return Number of currently registered clients.
* @return Number of currently active operations.
* @private
*/
// Visible for testing.
get size(): number {
return this.activeClients.size;
get opCount(): number {
let activeOperationCount = 0;
this.activeClients.forEach(count => (activeOperationCount += count));
return activeOperationCount;
}

/**
Expand Down
19 changes: 19 additions & 0 deletions dev/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@
* limitations under the License.
*/

/** A Promise implementation that supports deferred resolution. */
export class Deferred<R> {
promise: Promise<R>;
resolve: (value?: R | Promise<R>) => void = () => {};
reject: (reason?: Error) => void = () => {};

constructor() {
this.promise = new Promise(
(
resolve: (value?: R | Promise<R>) => void,
reject: (reason?: Error) => void
) => {
this.resolve = resolve;
this.reject = reject;
}
);
}
}

/**
* Generate a unique client-side identifier.
*
Expand Down
Loading

0 comments on commit 9474e3f

Please sign in to comment.