Skip to content

Commit

Permalink
perf: workaround creating many AbortControllers
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Dec 14, 2023
1 parent c36c103 commit bb83edb
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 28 deletions.
30 changes: 4 additions & 26 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { on } from 'stream';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

Expand Down Expand Up @@ -61,6 +60,7 @@ import type { ClientMetadata } from './handshake/client_metadata';
import { MessageStream, type OperationDescription } from './message_stream';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { decompressResponse } from './wire_protocol/compression';
import { onData } from './wire_protocol/on_data';
import { getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -1052,9 +1052,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
signal: this.controller.signal
});

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

if (options.noResponse) {
yield { ok: 1 };
return;
Expand All @@ -1080,9 +1077,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

Expand Down Expand Up @@ -1205,11 +1199,11 @@ const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
*/
export async function* readWireProtocolMessages(
connection: ModernConnection,
{ signal }: { signal?: AbortSignal } = {}
{ signal }: { signal: AbortSignal }
): AsyncGenerator<Buffer> {
const bufferPool = new BufferPool();
const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize;
for await (const [chunk] of on(connection.socket, 'data', { signal })) {
for await (const chunk of onData(connection.socket, { signal })) {
if (connection.delayedTimeoutId) {
clearTimeout(connection.delayedTimeoutId);
connection.delayedTimeoutId = null;
Expand Down Expand Up @@ -1277,7 +1271,7 @@ export async function writeCommand(
*/
export async function* readMany(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
options: { signal: AbortSignal }
): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of readWireProtocolMessages(connection, options)) {
const response = await decompressResponse(message);
Expand All @@ -1288,19 +1282,3 @@ export async function* readMany(
}
}
}

/**
* @internal
*
* Reads a single wire protocol message out of a connection.
*/
export async function read(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): Promise<OpMsgResponse | OpQueryResponse> {
for await (const value of readMany(connection, options)) {
return value;
}

throw new MongoRuntimeError('unable to read message off of connection');
}
96 changes: 96 additions & 0 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { type EventEmitter } from 'events';

import { List, promiseWithResolvers } from '../../utils';

export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) {
const signal = options.signal;
signal.throwIfAborted();

// Preparing controlling queues and variables
const unconsumedEvents = new List<Buffer>();
const unconsumedPromises = new List<
Omit<ReturnType<typeof promiseWithResolvers<IteratorResult<Buffer>>>, 'promise'>
>();
let error: Error | null = null;
let finished = false;
const iterator: AsyncGenerator<Buffer> = {
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value != null) {
return Promise.resolve({ value, done: false });
}
// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error != null) {
const p = Promise.reject(error);
// Only the first element errors
error = null;
return p;
}
// If the iterator is finished, resolve to done
if (finished) return closeHandler();
// Wait until an event happens
const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Buffer>>();
unconsumedPromises.push({ resolve, reject });
return promise;
},
return() {
return closeHandler();
},
throw(err: Error) {
errorHandler(err);
return Promise.resolve({ value: undefined, done: true });
},
[Symbol.asyncIterator]() {
return this;
}
};
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);
signal.addEventListener('abort', abortListener, { once: true });

return iterator;

function abortListener() {
errorHandler(signal.reason);
}

function eventHandler(value: Buffer) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.resolve({ value, done: false });
else unconsumedEvents.push(value);
}

function errorHandler(err: Error) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.reject(err);
else error = err;
void closeHandler();
}

function closeHandler() {
// Adding event handlers
emitter.off('data', eventHandler);
emitter.off('error', errorHandler);
signal.removeEventListener('abort', abortListener);
finished = true;
const doneResult = { value: undefined, done: finished } as const;

for (const promise of unconsumedPromises) {
promise.resolve(doneResult);
}

return Promise.resolve(doneResult);
}
}
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ export type {
DestroyOptions,
ModernConnection,
ProxyOptions,
read,
readMany,
writeCommand
} from './cmap/connection';
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ export async function abortable<T>(
}
}

function promiseWithResolvers<T>() {
export function promiseWithResolvers<T>() {
let resolve: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[0];
let reject: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[1];
const promise = new Promise<T>(function withResolversExecutor(promiseResolve, promiseReject) {
Expand Down

0 comments on commit bb83edb

Please sign in to comment.