Skip to content

Commit

Permalink
Memory Leak when using MessageService#showProgress on Backend #13253
Browse files Browse the repository at this point in the history
* dispose cancellation event listeners in RpcProtocol
  • Loading branch information
jfaltermeier committed Jan 10, 2024
1 parent 6a7bd87 commit 5fd6b5e
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ export interface RpcProtocolOptions {
mode?: 'default' | 'clientOnly' | 'serverOnly'
}

/**
* Wrapper for a {@link Disposable} that is not available immediately.
*/
export class MaybeDisposable {

private disposed = false;
private disposable: Disposable | undefined = undefined;

setDisposable(disposable: Disposable): void {
if (this.disposed) {
disposable.dispose();
} else {
this.disposable = disposable;
}
}

dispose(): void {
this.disposed = true;
if (this.disposable) {
this.disposable.dispose();
this.disposable = undefined;
}
}
}

/**
* Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send
* requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server).
Expand All @@ -57,6 +82,7 @@ export class RpcProtocol {
static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token';

protected readonly pendingRequests: Map<number, Deferred<any>> = new Map();
protected readonly pendingRequestCancellationEventListeners: Map<number, MaybeDisposable> = new Map();

protected nextMessageId: number = 0;

Expand All @@ -80,6 +106,8 @@ export class RpcProtocol {
channel.onClose(event => {
this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason)));
this.pendingRequests.clear();
this.pendingRequestCancellationEventListeners.forEach(disposable => disposable.dispose());
this.pendingRequestCancellationEventListeners.clear();
this.toDispose.dispose();
});
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
Expand Down Expand Up @@ -131,6 +159,7 @@ export class RpcProtocol {
} else {
throw new Error(`No reply handler for reply with id: ${id}`);
}
this.disposeCancellationEventListener(id);
}

protected handleReplyErr(id: number, error: any): void {
Expand All @@ -141,6 +170,15 @@ export class RpcProtocol {
} else {
throw new Error(`No reply handler for error reply with id: ${id}`);
}
this.disposeCancellationEventListener(id);
}

protected disposeCancellationEventListener(id: number): void {
const toDispose = this.pendingRequestCancellationEventListeners.get(id);
if (toDispose) {
this.pendingRequestCancellationEventListeners.delete(id);
toDispose.dispose();
}
}

sendRequest<T>(method: string, args: any[]): Promise<T> {
Expand All @@ -157,14 +195,21 @@ export class RpcProtocol {

this.pendingRequests.set(id, reply);

// register disposable before output.commit()
const maybeDisposable = new MaybeDisposable();
this.pendingRequestCancellationEventListeners.set(id, maybeDisposable);

const output = this.channel.getWriteBuffer();
this.encoder.request(output, id, method, args);
output.commit();

if (cancellationToken?.isCancellationRequested) {
this.sendCancel(id);
} else {
cancellationToken?.onCancellationRequested(() => this.sendCancel(id));
const disposable = cancellationToken?.onCancellationRequested(() => this.sendCancel(id));
if (disposable) {
maybeDisposable.setDisposable(disposable);
}
}

return reply.promise;
Expand Down Expand Up @@ -233,3 +278,4 @@ export class RpcProtocol {
this.onNotificationEmitter.fire({ method, args });
}
}

0 comments on commit 5fd6b5e

Please sign in to comment.