Skip to content

Commit

Permalink
Ensure that multiplex doesn't dispose before terminated is received i…
Browse files Browse the repository at this point in the history
…n test
  • Loading branch information
sarda-devesh committed Nov 14, 2022
1 parent a00bfe9 commit 5141c68
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
14 changes: 6 additions & 8 deletions src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ export abstract class Channel implements IDisposableObservable {
}

/**
* Closes this channel. If an error is specified then the channel
* is completed with the error and sent to the remote side if the
* muliplexting stream version > 1.
* Closes this channel. If the channel with protocol versions > 1
* are disposed with an error, then the error is transmitted to the remote side.
*/
public dispose(error : Error | null = null) {
public dispose(error : Error | null = null): void {
// The interesting stuff is in the derived class.
this._isDisposed = true;
}
Expand Down Expand Up @@ -249,8 +248,7 @@ export class ChannelClass extends Channel {
}
}

public dispose(error: Error | null = null) {
console.log(`Received call to channel disposal with error ${error}`);
public dispose(error : Error | null = null): void {
if (!this.isDisposed) {
super.dispose();

Expand All @@ -264,13 +262,13 @@ export class ChannelClass extends Channel {
this._duplex.end();
this._duplex.push(null);

// Complete based on the error passed to the dispose method
if (error) {
this._completion.reject(error);
} else {
this._completion.resolve();
}

// Send the notification, but we can't await the result of this.
caught(this._multiplexingStream.onChannelDisposed(this, error));
}
}
Expand All @@ -288,4 +286,4 @@ export class ChannelClass extends Channel {
}
}
}
}
}
5 changes: 2 additions & 3 deletions src/nerdbank-streams/src/MultiplexingStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ export abstract class MultiplexingStream implements IDisposableObservable {
public dispose() {
this.disposalTokenSource.cancel();
this._completionSource.resolve();

this.formatter.end();
[this.locallyOfferedOpenChannels, this.remotelyOfferedOpenChannels].forEach(cb => {
for (const channelId in cb) {
Expand All @@ -387,6 +388,7 @@ export abstract class MultiplexingStream implements IDisposableObservable {
// Acceptance gets rejected when a channel is disposed.
// Avoid a node.js crash or test failure for unobserved channels (e.g. offers for channels from the other party that no one cared to receive on this side).
caught(channel.acceptance);

channel.dispose();
}
}
Expand Down Expand Up @@ -599,7 +601,6 @@ export class MultiplexingStreamClass extends MultiplexingStream {
}

// Create the header and send the frame to the remote side
console.log(`Inside onChannelDisposed with error ${error} and payload of length ${payloadToSend.length}`);
const frameHeader = new FrameHeader(ControlCode.ChannelTerminated, channel.qualifiedId);
await this.sendFrameAsync(frameHeader, payloadToSend);
} catch (err) {
Expand All @@ -626,7 +627,6 @@ export class MultiplexingStreamClass extends MultiplexingStream {
}

frame.header.flipChannelPerspective();
console.log(`Read frame with header ${frame.header.code} with payload of length ${frame.payload.length}`);
switch (frame.header.code) {
case ControlCode.Offer:
this.onOffer(frame.header.requiredChannel, frame.payload);
Expand Down Expand Up @@ -760,7 +760,6 @@ export class MultiplexingStreamClass extends MultiplexingStream {
}

// Dispose the channel
console.log(`Inside onChannelTerminated with payload of length ${payload.length} and exception ${remoteException}`);
channel.dispose(remoteException);
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,34 @@ import { nextTick } from "process";
const localChannel = channels[0];
const remoteChannel = channels[1];

const localChannelCompleted = new Deferred<void>();
const remoteChannelCompleted = new Deferred<void>();

// Dispose the local channel
localChannel.dispose(error);

// Ensure that the local channel is always completed with the expected error
localChannel.completion.then(response => {
localChannelCompleted.reject();
throw new Error("Channel disposed with error didn't complete with error");
}).catch(error => {
localChannelCompleted.resolve();
expect(error.message).toContain(errorMsg);
});

// Ensure that the remote channel only throws an error for protocol version > 1
remoteChannel.completion.then( response => {
remoteChannelCompleted.resolve();
expect(protocolMajorVersion).toEqual(1);
}).catch(error => {
remoteChannelCompleted.resolve();
expect(protocolMajorVersion).toBeGreaterThan(1);
expect(error.message).toContain(errorMsg);
});

// Ensure that we don't call multiplexing dispose too soon
await localChannelCompleted.promise;
await remoteChannelCompleted.promise;
})

it("channels complete when mxstream is disposed", async () => {
Expand Down

0 comments on commit 5141c68

Please sign in to comment.