From a2a13d4f14def7142fce7981af2049ff8983b1d3 Mon Sep 17 00:00:00 2001 From: Devesh Sarda Date: Sat, 17 Dec 2022 16:10:38 -0700 Subject: [PATCH 1/2] Include an error message payload alongside ChannelTerminated frames --- .DS_Store | Bin 0 -> 8196 bytes .../MultiplexingStream.Channel.cs | 218 ++++++++++++------ .../MultiplexingStream.Formatters.cs | 60 +++++ src/Nerdbank.Streams/MultiplexingStream.cs | 53 ++++- src/nerdbank-streams/src/Channel.ts | 17 +- .../src/MultiplexingStream.ts | 36 ++- .../src/MultiplexingStreamFormatters.ts | 35 +++ .../tests/MultiplexingStream.Interop.spec.ts | 39 ++++ .../src/tests/MultiplexingStream.spec.ts | 49 +++- .../Nerdbank.Streams.Interop.Tests/Program.cs | 28 +++ .../MultiplexingStreamTests.cs | 108 ++++++++- 11 files changed, 545 insertions(+), 98 deletions(-) create mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..3f7458e99860d9668c451cb3fa5bc8992072f82a GIT binary patch literal 8196 zcmeHM%We}f6g_SN1f{f<3ZfE1niV^O`UfFRgd!eAq+PHhFQSxbCdwp;fPn1z0apA1 zEB=7*V1dL(AR)oI_6XUYgsuumwJZBhJm*|rAKNp!Qvj+q>+J$N07|T4x5{jm6uB=o zSJtwgYqW}dppTmvW60Xip+iSdASe(N2nqxRf&%}80(@snDXn<#o4cV63IqktqyqAM zh*-r|GrKTWKRVdx6o9tKZe4iIJV0}z%vLkIFjiCP(_{}SqpB>$P=?O?k-%xInOzub z=un0Zm5r>j3`ObD$%`}`s%mU#g91T;yaJNDZ=#Ktm|^1N?=kcE5qr{RoOC;J(jig~ z*KO9yPhEHgZ2y^!3g$RQ?0NaFH>qe z!GIA@{h0JRe6ZG`z!@ayx?@6gc2Rx2z|=))oo!T^W5)E>;rKCiIpS=R-0FzFHmjs} z%#g8GsBnHgM8#EjYK~nuDy^4|zkqvO?L9QO;w>~>wCJHWEIlq0)u(zrJ&O^ZF{&10 zll5;ix|F|D&d%F&qL}07JpJtJea4a9F-12AyUr6c%hpRYpk{bVRR$i9$~nA(*80dp zQ7;FzfrpH|%bB$G7&3zuJ65!FxEFb18q}zc`?O0=3m%j4%F?f_m5Q}mDO*vCD!MV^ z6IpYI#g*Y29qJQ$K(9;2Qn$!@Yq5&-E#C~i-K*Xc_r}5%h@0e4KZsgbs7pjmdA22X zS%WcSSh|DPQZip#%p!Sw;`!$TcZvIE;m#44@Gs7t%Fjf8wUFz0PLJ|57!p5P6^Xh* z-WQ9VTj!#YTsvZY!YV~@ u3u85fjZVjDbvjP_`yYn9jtCV_HM0w2*~9YJKLl90OnCq6B?!$KRNxo0GP_m) literal 0 HcmV?d00001 diff --git a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs index f27f4594..421aacff 100644 --- a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs +++ b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs @@ -322,7 +322,7 @@ private long RemoteWindowRemaining private bool BackpressureSupportEnabled => this.MultiplexingStream.protocolMajorVersion > 1; /// - /// Closes this channel and releases all resources associated with it. + /// Immediately terminates the channel and shutdowns any ongoing communication. /// /// /// Because this method may terminate the channel immediately and thus can cause previously queued content to not actually be received by the remote party, @@ -330,87 +330,121 @@ private long RemoteWindowRemaining /// public void Dispose() { - if (!this.IsDisposed) - { - this.acceptanceSource.TrySetCanceled(); - this.optionsAppliedTaskSource?.TrySetCanceled(); + string userDisposeMsg = "User triggered disposal"; + MultiplexingProtocolException userDisposeException = new MultiplexingProtocolException(userDisposeMsg); - PipeWriter? mxStreamIOWriter; - lock (this.SyncObject) + this.DisposeChannel(userDisposeException); + } + + /// + /// Disposes the channel by releasing all resources associated with it. + /// + /// The exception to dispose this channel with, + /// defaults to null. + internal void DisposeChannel(Exception? disposeException = null) + { + // Ensure that we don't call dispose more than once + lock (this.SyncObject) + { + if (this.isDisposed) { - this.isDisposed = true; - mxStreamIOWriter = this.mxStreamIOWriter; + return; } - // Complete writing so that the mxstream cannot write to this channel any more. - // We must also cancel a pending flush since no one is guaranteed to be reading this any more - // and we don't want to deadlock on a full buffer in a disposed channel's pipe. - if (mxStreamIOWriter is not null) - { - mxStreamIOWriter.CancelPendingFlush(); - _ = this.mxStreamIOWriterSemaphore.EnterAsync().ContinueWith( - static (releaser, state) => - { - try - { - Channel self = (Channel)state; + // First call to dispose + this.isDisposed = true; + } - PipeWriter? mxStreamIOWriter; - lock (self.SyncObject) - { - mxStreamIOWriter = self.mxStreamIOWriter; - } + this.acceptanceSource.TrySetCanceled(); + this.optionsAppliedTaskSource?.TrySetCanceled(); - mxStreamIOWriter?.Complete(); - self.mxStreamIOWriterCompleted.Set(); - } - finally + PipeWriter? mxStreamIOWriter; + lock (this.SyncObject) + { + mxStreamIOWriter = this.mxStreamIOWriter; + } + + // Complete writing so that the mxstream cannot write to this channel any more. + // We must also cancel a pending flush since no one is guaranteed to be reading this any more + // and we don't want to deadlock on a full buffer in a disposed channel's pipe. + if (mxStreamIOWriter is not null) + { + mxStreamIOWriter.CancelPendingFlush(); + _ = this.mxStreamIOWriterSemaphore.EnterAsync().ContinueWith( + static (releaser, state) => + { + try + { + Channel self = (Channel)state; + + PipeWriter? mxStreamIOWriter; + lock (self.SyncObject) { - releaser.Result.Dispose(); + mxStreamIOWriter = self.mxStreamIOWriter; } - }, - this, - CancellationToken.None, - TaskContinuationOptions.OnlyOnRanToCompletion, - TaskScheduler.Default); - } - if (this.mxStreamIOReader is not null) - { - // We don't own the user's PipeWriter to complete it (so they can't write anything more to this channel). - // We can't know whether there is or will be more bytes written to the user's PipeWriter, - // but we need to terminate our reader for their writer as part of reclaiming resources. - // Cancel the pending or next read operation so the reader loop will immediately notice and shutdown. - this.mxStreamIOReader.CancelPendingRead(); - - // Only Complete the reader if our async reader doesn't own it to avoid thread-safety bugs. - PipeReader? mxStreamIOReader = null; - lock (this.SyncObject) - { - if (this.mxStreamIOReader is not UnownedPipeReader) + mxStreamIOWriter?.Complete(); + self.mxStreamIOWriterCompleted.Set(); + } + finally { - mxStreamIOReader = this.mxStreamIOReader; - this.mxStreamIOReader = null; + releaser.Result.Dispose(); } - } + }, + this, + CancellationToken.None, + TaskContinuationOptions.OnlyOnRanToCompletion, + TaskScheduler.Default); + } - mxStreamIOReader?.Complete(); + if (this.mxStreamIOReader is not null) + { + // We don't own the user's PipeWriter to complete it (so they can't write anything more to this channel). + // We can't know whether there is or will be more bytes written to the user's PipeWriter, + // but we need to terminate our reader for their writer as part of reclaiming resources. + // Cancel the pending or next read operation so the reader loop will immediately notice and shutdown. + this.mxStreamIOReader.CancelPendingRead(); + + // Only Complete the reader if our async reader doesn't own it to avoid thread-safety bugs. + PipeReader? mxStreamIOReader = null; + lock (this.SyncObject) + { + if (this.mxStreamIOReader is not UnownedPipeReader) + { + mxStreamIOReader = this.mxStreamIOReader; + this.mxStreamIOReader = null; + } } - // Unblock the reader that might be waiting on this. - this.remoteWindowHasCapacity.Set(); + mxStreamIOReader?.Complete(); + } - this.disposalTokenSource.Cancel(); + // Set the completion source based on whether we are disposing due to an error + if (disposeException != null) + { + this.completionSource.TrySetException(disposeException); + } + else + { this.completionSource.TrySetResult(null); - this.MultiplexingStream.OnChannelDisposed(this); } + + // Unblock the reader that might be waiting on this. + this.remoteWindowHasCapacity.Set(); + + this.disposalTokenSource.Cancel(); + this.MultiplexingStream.OnChannelDisposed(this, disposeException); } - internal async Task OnChannelTerminatedAsync() + internal async Task OnChannelTerminatedAsync(Exception? remoteError = null) { - if (this.IsDisposed) + // Don't process the frame if the channel has already been diposed + lock (this.SyncObject) { - return; + if (this.isDisposed) + { + return; + } } try @@ -424,6 +458,19 @@ internal async Task OnChannelTerminatedAsync() { // We fell victim to a race condition. It's OK to just swallow it because the writer was never created, so it needn't be completed. } + + // Terminate the channel + this.DisposeSelfOnFailure(Task.Run(async delegate + { + // Ensure that we processed the channel before terminating it + await this.OptionsApplied.ConfigureAwait(false); + + // Record that we were terminated remotely + this.IsRemotelyTerminated = true; + + // Dispose of the channel + this.DisposeChannel(remoteError); + })); } /// @@ -684,16 +731,15 @@ private void ApplyChannelOptions(ChannelOptions channelOptions) this.mxStreamIOReaderCompleted = this.ProcessOutboundTransmissionsAsync(); this.DisposeSelfOnFailure(this.mxStreamIOReaderCompleted); this.DisposeSelfOnFailure(this.AutoCloseOnPipesClosureAsync()); + + // Record that we have applied the channel options + this.optionsAppliedTaskSource?.TrySetResult(null); } catch (Exception ex) { this.optionsAppliedTaskSource?.TrySetException(ex); throw; } - finally - { - this.optionsAppliedTaskSource?.TrySetResult(null); - } } /// @@ -771,6 +817,7 @@ private async Task ProcessOutboundTransmissionsAsync() // We don't use a CancellationToken on this call because we prefer the exception-free cancellation path used by our Dispose method (CancelPendingRead). ReadResult result = await mxStreamIOReader.ReadAsync().ConfigureAwait(false); + if (result.IsCanceled) { // We've been asked to cancel. Presumably the channel has faulted or been disposed. @@ -832,13 +879,25 @@ private async Task ProcessOutboundTransmissionsAsync() } catch (Exception ex) { - if (ex is OperationCanceledException && this.DisposalToken.IsCancellationRequested) + await mxStreamIOReader!.CompleteAsync(ex).ConfigureAwait(false); + + // Record this as a faulting exception if the channel hasn't been disposed. + lock (this.SyncObject) { - await mxStreamIOReader!.CompleteAsync().ConfigureAwait(false); + if (!this.IsDisposed) + { + this.faultingException ??= ex; + } } - else + + // Add a trace indicating that we caught an exception in ProcessOutbound + if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information)) { - await mxStreamIOReader!.CompleteAsync(ex).ConfigureAwait(false); + this.TraceSource.TraceEvent( + TraceEventType.Error, + 0, + "Rethrowing caught exception in ProcessOutboundAsync: {0}", + ex.Message); } throw; @@ -911,23 +970,30 @@ private async Task AutoCloseOnPipesClosureAsync() this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelAutoClosing, "Channel {0} \"{1}\" self-closing because both reader and writer are complete.", this.QualifiedId, this.Name); } - this.Dispose(); + this.DisposeChannel(null); } private void Fault(Exception exception) { - if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Critical) ?? false) - { - this.TraceSource!.TraceEvent(TraceEventType.Critical, (int)TraceEventId.FatalError, "Channel Closing self due to exception: {0}", exception); - } + this.mxStreamIOReader?.CancelPendingRead(); + // If the channel has already been disposed then only cancel the reader lock (this.SyncObject) { + if (this.isDisposed) + { + return; + } + this.faultingException ??= exception; } - this.mxStreamIOReader?.CancelPendingRead(); - this.Dispose(); + if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Critical) ?? false) + { + this.TraceSource!.TraceEvent(TraceEventType.Warning, 0, "Channel faulting self due to exception: {0}", this.faultingException.Message); + } + + this.DisposeChannel(this.faultingException); } private void DisposeSelfOnFailure(Task task) diff --git a/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs b/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs index c0550733..ca14ca83 100644 --- a/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs +++ b/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs @@ -410,6 +410,66 @@ internal override void WriteFrame(FrameHeader header, ReadOnlySequence pay } } + /// + /// Creates a payload to sent alongside . + /// + /// The exception to send to the remote side if there is one. + /// The payload to send when a channel gets terminated. + internal ReadOnlySequence SerializeException(Exception? exception) + { + if (exception == null) + { + return ReadOnlySequence.Empty; + } + + var sequence = new Sequence(); + var writer = new MessagePackWriter(sequence); + + writer.WriteArrayHeader(1); + + // Get the exception to send to the remote side + string errorMessage = exception.Message; + if (errorMessage == null || errorMessage.Length == 0) + { + errorMessage = string.Format( + "Exception of type '{0}' was thrown", + exception.GetType().ToString()); + } + + // Write the error message to the payload being sent + writer.Write(errorMessage); + writer.Flush(); + + return sequence; + } + + /// + /// Gets the error message in the payload if there is one. + /// + /// The payload that could contain an error message. + /// The error message in this payload if there is one, null otherwise. + internal Exception? DeserializeException(ReadOnlySequence payload) + { + // If it is empty then return null + if (payload.IsEmpty) + { + return null; + } + + var reader = new MessagePackReader(payload); + int numElements = reader.ReadArrayHeader(); + + // We received an empty payload + if (numElements == 0) + { + return null; + } + + // Get the exception message and return it as an exception + string remoteErrorMsg = reader.ReadString(); + return new MultiplexingProtocolException($"Received error from remote side: {remoteErrorMsg}"); + } + internal override ReadOnlySequence SerializeContentProcessed(long bytesProcessed) { var sequence = new Sequence(); diff --git a/src/Nerdbank.Streams/MultiplexingStream.cs b/src/Nerdbank.Streams/MultiplexingStream.cs index ba0fdbcc..18b72e81 100644 --- a/src/Nerdbank.Streams/MultiplexingStream.cs +++ b/src/Nerdbank.Streams/MultiplexingStream.cs @@ -238,6 +238,11 @@ private enum TraceEventId /// internal CancellationToken DisposalToken => this.disposalTokenSource.Token; + /// + /// Gets a value indicating whether channels in this stream supports sending/receiving termination exceptions. + /// + private bool SupportTerminatedExceptions => this.protocolMajorVersion > 1; + /// /// Gets a factory for instances to attach to a newly opened /// when its is . @@ -835,7 +840,7 @@ private async Task ReadStreamAsync() this.OnContentWritingCompleted(header.RequiredChannelId); break; case ControlCode.ChannelTerminated: - await this.OnChannelTerminatedAsync(header.RequiredChannelId).ConfigureAwait(false); + await this.OnChannelTerminatedAsync(header.RequiredChannelId, frame.Value.Payload).ConfigureAwait(false); break; default: break; @@ -892,7 +897,8 @@ private async Task ReadStreamAsync() /// Occurs when the remote party has terminated a channel (including canceling an offer). /// /// The ID of the terminated channel. - private async Task OnChannelTerminatedAsync(QualifiedChannelId channelId) + /// The payload sent from the remote side alongside the channel terminated frame. + private async Task OnChannelTerminatedAsync(QualifiedChannelId channelId, ReadOnlySequence payload) { Channel? channel; lock (this.syncObject) @@ -913,9 +919,25 @@ private async Task OnChannelTerminatedAsync(QualifiedChannelId channelId) if (channel is Channel) { - await channel.OnChannelTerminatedAsync().ConfigureAwait(false); - channel.IsRemotelyTerminated = true; - channel.Dispose(); + // Try to get the exception sent from the remote side if there was one sent + Exception? remoteException = null; + if (this.SupportTerminatedExceptions && !payload.IsEmpty) + { + V2Formatter castedFormatter = (V2Formatter)this.formatter; + remoteException = castedFormatter.DeserializeException(payload); + } + + if (remoteException != null && this.TraceSource.Switch.ShouldTrace(TraceEventType.Information)) + { + this.TraceSource.TraceEvent( + TraceEventType.Warning, + (int)TraceEventId.ChannelDisposed, + "Received Channel Terminated for channel {0} with exception: {1}", + channelId, + remoteException.Message); + } + + await channel.OnChannelTerminatedAsync(remoteException).ConfigureAwait(false); } } @@ -1098,18 +1120,35 @@ private void AcceptChannelOrThrow(Channel channel, ChannelOptions options) /// Raised when is called and any local transmission is completed. /// /// The channel that is closing down. - private void OnChannelDisposed(Channel channel) + /// The exception to sent to the remote side alongside the disposal. + private void OnChannelDisposed(Channel channel, Exception? exception = null) { Requires.NotNull(channel, nameof(channel)); if (!this.Completion.IsCompleted && !this.DisposalToken.IsCancellationRequested) { + // Determine the header to send alongside the error payload + var header = new FrameHeader + { + Code = ControlCode.ChannelTerminated, + ChannelId = channel.QualifiedId, + }; + + // If there is an error and we support sending errors then + // serialize the exception and store it in the payload + ReadOnlySequence payload = default; + if (this.SupportTerminatedExceptions && exception != null) + { + V2Formatter castedFormatter = (V2Formatter)this.formatter; + payload = castedFormatter.SerializeException(exception); + } + if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information)) { this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelDisposed, "Local channel {0} \"{1}\" stream disposed.", channel.QualifiedId, channel.Name); } - this.SendFrame(ControlCode.ChannelTerminated, channel.QualifiedId); + this.SendFrame(header, payload, this.DisposalToken); } } diff --git a/src/nerdbank-streams/src/Channel.ts b/src/nerdbank-streams/src/Channel.ts index 79171273..0d093326 100644 --- a/src/nerdbank-streams/src/Channel.ts +++ b/src/nerdbank-streams/src/Channel.ts @@ -54,9 +54,10 @@ export abstract class Channel implements IDisposableObservable { } /** - * Closes this channel. + * Closes this channel. If the channel with protocol versions > 1 + * are disposed with an error, then the error is transmitted to the remote side. */ - public dispose(): void { + public dispose(error : Error | null = null): void { // The interesting stuff is in the derived class. this._isDisposed = true; } @@ -247,7 +248,7 @@ export class ChannelClass extends Channel { } } - public dispose(): void { + public dispose(error : Error | null = null): void { if (!this.isDisposed) { super.dispose(); @@ -261,10 +262,14 @@ export class ChannelClass extends Channel { this._duplex.end(); this._duplex.push(null); - this._completion.resolve(); + if (error) { + this._completion.reject(error); + } else { + this._completion.resolve(); + } // Send the notification, but we can't await the result of this. - caught(this._multiplexingStream.onChannelDisposed(this)); + caught(this._multiplexingStream.onChannelDisposed(this, error)); } } @@ -281,4 +286,4 @@ export class ChannelClass extends Channel { } } } -} +} \ No newline at end of file diff --git a/src/nerdbank-streams/src/MultiplexingStream.ts b/src/nerdbank-streams/src/MultiplexingStream.ts index 1f6eafcc..0c6931d1 100644 --- a/src/nerdbank-streams/src/MultiplexingStream.ts +++ b/src/nerdbank-streams/src/MultiplexingStream.ts @@ -378,6 +378,7 @@ export abstract class MultiplexingStream implements IDisposableObservable { public dispose() { this.disposalTokenSource.cancel(); this._completionSource.resolve(); + this.formatter.end(); [this.locallyOfferedOpenChannels, this.remotelyOfferedOpenChannels].forEach(cb => { for (const channelId in cb) { @@ -387,6 +388,7 @@ export abstract class MultiplexingStream implements IDisposableObservable { // Acceptance gets rejected when a channel is disposed. // Avoid a node.js crash or test failure for unobserved channels (e.g. offers for channels from the other party that no one cared to receive on this side). caught(channel.acceptance); + channel.dispose(); } } @@ -585,10 +587,22 @@ export class MultiplexingStreamClass extends MultiplexingStream { } } - public async onChannelDisposed(channel: ChannelClass) { + public async onChannelDisposed(channel: ChannelClass, error : Error | null = null) { if (!this._completionSource.isCompleted) { try { - await this.sendFrame(ControlCode.ChannelTerminated, channel.qualifiedId); + // Determine the payload to send to the error + let payloadToSend : Buffer = Buffer.alloc(0); + if(this.protocolMajorVersion > 1 && error != null) { + // Get the formatter use to serialize the error + const castedFormatter = this.formatter as MultiplexingStreamV2Formatter; + + // Get the payload using the formatter + payloadToSend = castedFormatter.serializeException(error); + } + + // Create the header and send the frame to the remote side + const frameHeader = new FrameHeader(ControlCode.ChannelTerminated, channel.qualifiedId); + await this.sendFrameAsync(frameHeader, payloadToSend); } catch (err) { // Swallow exceptions thrown about channel disposal if the whole stream has been taken down. if (this.isDisposed) { @@ -630,7 +644,7 @@ export class MultiplexingStreamClass extends MultiplexingStream { this.onContentWritingCompleted(frame.header.requiredChannel); break; case ControlCode.ChannelTerminated: - this.onChannelTerminated(frame.header.requiredChannel); + this.onChannelTerminated(frame.header.requiredChannel, frame.payload); break; default: break; @@ -729,12 +743,24 @@ export class MultiplexingStreamClass extends MultiplexingStream { * Occurs when the remote party has terminated a channel (including canceling an offer). * @param channelId The ID of the terminated channel. */ - private onChannelTerminated(channelId: QualifiedChannelId) { + private onChannelTerminated(channelId: QualifiedChannelId, payload: Buffer) { const channel = this.getOpenChannel(channelId); if (channel) { this.deleteOpenChannel(channelId); this.removeChannelFromOfferedQueue(channel); - channel.dispose(); + + // Extract the exception that we received from the remote side + let remoteException : Error | null = null; + if(this.protocolMajorVersion > 1 && payload.length > 0) { + // Get the formatter + const castedFormatter = this.formatter as MultiplexingStreamV2Formatter; + + // Extract the error using the casted formatter + remoteException = castedFormatter.deserializeException(payload); + } + + // Dispose the channel + channel.dispose(remoteException); } } } diff --git a/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts b/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts index af625669..c1042fa4 100644 --- a/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts +++ b/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts @@ -293,6 +293,41 @@ export class MultiplexingStreamV2Formatter extends MultiplexingStreamFormatter { return msgpack.decode(payload)[0]; } + serializeException(error: Error | null) : Buffer { + // If the error doesn't exist then return an empty buffer + if (!error) { + return Buffer.alloc(0); + } + + // Determine the error message to add to the payload + let errorMsg : string = error.message; + if (!errorMsg) { + errorMsg = `Error of type ${error.name} was thrown`; + } + + // Return the buffer for this payload + const payload : any[] = [errorMsg]; + return msgpack.encode(payload); + } + + deserializeException(payload: Buffer) : Error | null { + // If the payload is empty then return null + if(payload.length === 0) { + return null; + } + + // Make sure that the message pack object contains a message + const msgpackObject = msgpack.decode(payload); + if (!msgpackObject || msgpackObject.length === 0) { + return null; + } + + // Get error message and return the error to the remote side + let errorMsg : string = msgpack.decode(payload)[0]; + errorMsg = `Received error from remote side: ${errorMsg}`; + return new Error(errorMsg); + } + protected async readMessagePackAsync(cancellationToken: CancellationToken): Promise<{} | [] | null> { const streamEnded = new Deferred(); while (true) { diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts index 15341867..49f05a9e 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts @@ -72,12 +72,51 @@ import { ChannelOptions } from "../ChannelOptions"; expect(recv).toEqual("recv: theclient\n"); }); + it("Can offer error completed channel", async () => { + const errorMsg : string = "Hello world"; + const error : Error = new Error(errorMsg); + + const channelToCompleteWithError = await mx.offerChannelAsync("clientErrorOffer"); + const communicationChannel = await mx.offerChannelAsync("clientErrorOfferComm"); + + channelToCompleteWithError.dispose(error); + channelToCompleteWithError.completion.then(_ => { + throw new Error("Channel disposed with error didn't complete with error"); + }).catch( (channelCompleteErr) => { + expect(channelCompleteErr.message).toContain(errorMsg); + }); + + let expectedErrMessage = `Received error from remote side: ${errorMsg}`; + if(protocolMajorVersion <= 1) { + expectedErrMessage = "Completed with no error"; + } + + const response = await readLineAsync(communicationChannel.stream); + expect(response).toContain(expectedErrMessage); + }) + it("Can accept channel", async () => { const channel = await mx.acceptChannelAsync("serverOffer"); const recv = await readLineAsync(channel.stream); await writeAsync(channel.stream, `recv: ${recv}`); }); + it("Can accept error completed channel", async () => { + const channelCompletedWithError = await mx.acceptChannelAsync("serverErrorOffer"); + const errorExpectedMessage : string = "Received error from remote side: Hello World"; + const channelCompleted = new Deferred(); + + channelCompletedWithError.completion.then(async _ => { + expect(protocolMajorVersion).toEqual(1); + channelCompleted.resolve(); + }).catch(async error => { + expect(error.message).toContain(errorExpectedMessage); + channelCompleted.resolve(); + }) + + await channelCompleted.promise; + }) + it("Exchange lots of data", async () => { const channel = await mx.offerChannelAsync("clientOffer", { channelReceivingWindowSize: 16 }); const bigdata = 'ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEF\n'; diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts index 5ad2ea3f..52911f36 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts @@ -212,7 +212,7 @@ import { nextTick } from "process"; mx2.acceptChannelAsync("test"), ]); channels[0].stream.write("abc"); - expect(await getBufferFrom(channels[1].stream, 3)).toEqual(new Buffer("abc")); + expect(await getBufferFrom(channels[1].stream, 3)).toEqual(Buffer.from("abc")); }); it("Can exchange data over two channels", async () => { @@ -225,8 +225,8 @@ import { nextTick } from "process"; channels[0].stream.write("abc"); channels[3].stream.write("def"); channels[3].stream.write("ghi"); - expect(await getBufferFrom(channels[2].stream, 3)).toEqual(new Buffer("abc")); - expect(await getBufferFrom(channels[1].stream, 6)).toEqual(new Buffer("defghi")); + expect(await getBufferFrom(channels[2].stream, 3)).toEqual(Buffer.from("abc")); + expect(await getBufferFrom(channels[1].stream, 6)).toEqual(Buffer.from("defghi")); }); it("end of channel", async () => { @@ -249,6 +249,49 @@ import { nextTick } from "process"; await channels[1].completion; }); + it("channel terminated with error", async() => { + // Determine the error to complete the local channel with + const errorMsg : string = "Hello world"; + const error : Error = new Error(errorMsg); + + // Get the channels to send/receive data over + const channels = await Promise.all([ + mx1.offerChannelAsync("test"), + mx2.acceptChannelAsync("test"), + ]); + const localChannel = channels[0]; + const remoteChannel = channels[1]; + + const localChannelCompleted = new Deferred(); + const remoteChannelCompleted = new Deferred(); + + // Dispose the local channel + localChannel.dispose(error); + + // Ensure that the local channel is always completed with the expected error + localChannel.completion.then(response => { + localChannelCompleted.reject(); + throw new Error("Channel disposed with error didn't complete with error"); + }).catch(localChannelErr => { + localChannelCompleted.resolve(); + expect(localChannelErr.message).toContain(errorMsg); + }); + + // Ensure that the remote channel only throws an error for protocol version > 1 + remoteChannel.completion.then( response => { + remoteChannelCompleted.resolve(); + expect(protocolMajorVersion).toEqual(1); + }).catch(remoteChannelErr => { + remoteChannelCompleted.resolve(); + expect(protocolMajorVersion).toBeGreaterThan(1); + expect(remoteChannelErr.message).toContain(errorMsg); + }); + + // Ensure that we don't call multiplexing dispose too soon + await localChannelCompleted.promise; + await remoteChannelCompleted.promise; + }) + it("channels complete when mxstream is disposed", async () => { const channels = await Promise.all([ mx1.offerChannelAsync("test"), diff --git a/test/Nerdbank.Streams.Interop.Tests/Program.cs b/test/Nerdbank.Streams.Interop.Tests/Program.cs index d5b5a46e..3c72fb51 100644 --- a/test/Nerdbank.Streams.Interop.Tests/Program.cs +++ b/test/Nerdbank.Streams.Interop.Tests/Program.cs @@ -62,6 +62,8 @@ private async Task RunAsync(int protocolMajorVersion) { this.ClientOfferAsync().Forget(); this.ServerOfferAsync().Forget(); + this.ClientOffersErrorCompletedChannel().Forget(); + this.ServerOffersErrorCompletedChannel().Forget(); if (protocolMajorVersion >= 3) { @@ -71,6 +73,25 @@ private async Task RunAsync(int protocolMajorVersion) await this.mx.Completion; } + private async Task ClientOffersErrorCompletedChannel() + { + MultiplexingStream.Channel? expectedErrorChannel = await this.mx.AcceptChannelAsync("clientErrorOffer"); + MultiplexingStream.Channel? communicationChannel = await this.mx.AcceptChannelAsync("clientErrorOfferComm"); + (StreamReader _, StreamWriter writer) = CreateStreamIO(communicationChannel); + + string responseMessage = "Completed with no error"; + try + { + await expectedErrorChannel.Completion; + } + catch (Exception e) + { + responseMessage = e.Message; + } + + await writer.WriteLineAsync(responseMessage); + } + private async Task ClientOfferAsync() { MultiplexingStream.Channel? channel = await this.mx.AcceptChannelAsync("clientOffer"); @@ -79,6 +100,13 @@ private async Task ClientOfferAsync() await w.WriteLineAsync($"recv: {line}"); } + private async Task ServerOffersErrorCompletedChannel() + { + MultiplexingStream.Channel? expectedErrorChannel = await this.mx.OfferChannelAsync("serverErrorOffer"); + string errrorMessage = "Hello World"; + await expectedErrorChannel.Output.CompleteAsync(new Exception(errrorMessage)); + } + private async Task ServerOfferAsync() { MultiplexingStream.Channel? channel = await this.mx.OfferChannelAsync("serverOffer"); diff --git a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs index 83e6717c..46031565 100644 --- a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs +++ b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs @@ -196,6 +196,74 @@ public async Task OfferWriteOnlyPipe() await Task.WhenAll(ch1.Completion, ch2.Completion).WithCancellation(this.TimeoutToken); } + [Fact] + public async Task OfferErrorCompletedPipe() + { + // Prepare a readonly pipe that is completed with an error + string localErrMsg = "Hello World"; + string remoteErrMsg = "Received error from remote side: " + localErrMsg; + var pipe = new Pipe(); + await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken); + pipe.Writer.Complete(new ApplicationException(localErrMsg)); + + // Offer the error pipe to the remote side and get the remote side to try to accept the channel + MultiplexingStream.Channel localChannel = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(pipe.Reader) }); + await this.WaitForEphemeralChannelOfferToPropagateAsync(); + MultiplexingStream.Channel remoteChannel = this.mx2.AcceptChannel(localChannel.QualifiedId.Id); + + // The local channel should always complete with the error + await VerifyChannelCompleted(localChannel, localErrMsg); + + // The remote side should only receive the remote exception for protocol versions > 1 + await VerifyChannelCompleted(remoteChannel, this.ProtocolMajorVersion > 1 ? remoteErrMsg : null); + } + + [Fact] + public async Task OfferEmptyErrorCompletedPipe() + { + string localErrMsg = string.Empty; + string remoteErrMsg = "Received error from remote side: Exception of type 'System.IndexOutOfRangeException' was thrown"; + + // Prepare a readonly pipe that is completed with an error + var pipe = new Pipe(); + await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken); + pipe.Writer.Complete(new IndexOutOfRangeException(string.Empty)); + + // Offer the error pipe to the remote side and get the remote side to try to accept the channel + MultiplexingStream.Channel localChannel = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(pipe.Reader) }); + await this.WaitForEphemeralChannelOfferToPropagateAsync(); + MultiplexingStream.Channel remoteChannel = this.mx2.AcceptChannel(localChannel.QualifiedId.Id); + + // The local channel should always complete with the error + await VerifyChannelCompleted(localChannel, localErrMsg); + + // The remote side should only receive the remote exception for protocol versions > 1 + await VerifyChannelCompleted(remoteChannel, this.ProtocolMajorVersion > 1 ? remoteErrMsg : null); + } + + [Fact] + public async Task OfferNullErrorCompletedPipe() + { + string localErrMsg = "Exception of type 'System.NullReferenceException' was thrown."; + string remoteErrMsg = "Received error from remote side: Exception of type 'System.NullReferenceException' was thrown."; + + // Prepare a readonly pipe that is completed with an error + var pipe = new Pipe(); + await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken); + pipe.Writer.Complete(new NullReferenceException(null)); + + // Offer the error pipe to the remote side and get the remote side to try to accept the channel + MultiplexingStream.Channel localChannel = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(pipe.Reader) }); + await this.WaitForEphemeralChannelOfferToPropagateAsync(); + MultiplexingStream.Channel remoteChannel = this.mx2.AcceptChannel(localChannel.QualifiedId.Id); + + // The local channel should always complete with the error + await VerifyChannelCompleted(localChannel, localErrMsg); + + // The remote side should only receive the remote exception for protocol versions > 1 + await VerifyChannelCompleted(remoteChannel, this.ProtocolMajorVersion > 1 ? remoteErrMsg : null); + } + [Fact] public async Task Dispose_CancelsOutstandingOperations() { @@ -207,6 +275,30 @@ public async Task Dispose_CancelsOutstandingOperations() Assert.True(accept.IsCanceled); } + [Fact] + public async Task Dispose_CompleteWithErrorAfterwards() + { + string expectedLocalErrMsg = "User triggered disposal"; + string expectedRemoteErrMsg = "Received error from remote side: User triggered disposal"; + + // Create the local and remote channels using channel names + Task? localChannelTask = this.mx1.OfferChannelAsync("completeAfterwards", this.TimeoutToken); + Task? remoteChannelTask = this.mx2.AcceptChannelAsync("completeAfterwards", this.TimeoutToken); + await this.WaitForEphemeralChannelOfferToPropagateAsync(); + MultiplexingStream.Channel remoteChannel = await remoteChannelTask; + MultiplexingStream.Channel localChannel = await localChannelTask; + + // Dispose the local channel and then complete it later with an error + localChannel.Dispose(); + await localChannel.Output.CompleteAsync(new InvalidOperationException("Complete about dispose")); + + // Ensure that the local channel triggered through a user disposal + await VerifyChannelCompleted(localChannel, expectedLocalErrMsg); + + // Ensure that the remote channel received the user disposal for protocol version > 1 + await VerifyChannelCompleted(remoteChannel, this.ProtocolMajorVersion > 1 ? expectedRemoteErrMsg : null); + } + [Fact] public async Task Disposal_DisposesTransportStream() { @@ -220,7 +312,8 @@ public async Task Dispose_DisposesChannels() (MultiplexingStream.Channel channel1, MultiplexingStream.Channel channel2) = await this.EstablishChannelsAsync("A"); await this.mx1.DisposeAsync(); Assert.True(channel1.IsDisposed); - await channel1.Completion.WithCancellation(this.TimeoutToken); + await VerifyChannelCompleted(channel1, "User triggered disposal"); + #pragma warning disable CS0618 // Type or member is obsolete await channel1.Input.WaitForWriterCompletionAsync().WithCancellation(this.TimeoutToken); await channel1.Output.WaitForReaderCompletionAsync().WithCancellation(this.TimeoutToken); @@ -1179,6 +1272,19 @@ public async Task FaultingChannelReader() await this.ReadAtLeastAsync(mx2Baseline.Input, 3); } + protected static async Task VerifyChannelCompleted(MultiplexingStream.Channel channel, string? expectedErrMsg) + { + if (expectedErrMsg != null) + { + Exception completionException = await Assert.ThrowsAnyAsync(() => channel.Completion); + Assert.Equal(expectedErrMsg, completionException.Message); + } + else + { + await channel.Completion; + } + } + protected static Task CompleteChannelsAsync(params MultiplexingStream.Channel[] channels) { foreach (MultiplexingStream.Channel? channel in channels) From 728409065ba795bc55a51a4e69617ddd03ef5863 Mon Sep 17 00:00:00 2001 From: Andrew Arnott Date: Sat, 17 Dec 2022 16:19:43 -0700 Subject: [PATCH 2/2] Touch-up on Devesh's pull request --- .DS_Store | Bin 8196 -> 0 bytes .../MultiplexingStream.Channel.cs | 53 ++++++++---------- .../MultiplexingStream.Formatters.cs | 19 ++----- src/Nerdbank.Streams/MultiplexingStream.cs | 47 +++++++--------- src/nerdbank-streams/src/Channel.ts | 12 ++-- .../src/MultiplexingStream.ts | 37 ++++-------- .../src/MultiplexingStreamFormatters.ts | 22 +++----- .../tests/MultiplexingStream.Interop.spec.ts | 12 ++-- .../src/tests/MultiplexingStream.spec.ts | 10 ++-- .../Nerdbank.Streams.Interop.Tests/Program.cs | 4 +- .../MultiplexingStreamTests.cs | 24 ++++---- 11 files changed, 99 insertions(+), 141 deletions(-) delete mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 3f7458e99860d9668c451cb3fa5bc8992072f82a..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 8196 zcmeHM%We}f6g_SN1f{f<3ZfE1niV^O`UfFRgd!eAq+PHhFQSxbCdwp;fPn1z0apA1 zEB=7*V1dL(AR)oI_6XUYgsuumwJZBhJm*|rAKNp!Qvj+q>+J$N07|T4x5{jm6uB=o zSJtwgYqW}dppTmvW60Xip+iSdASe(N2nqxRf&%}80(@snDXn<#o4cV63IqktqyqAM zh*-r|GrKTWKRVdx6o9tKZe4iIJV0}z%vLkIFjiCP(_{}SqpB>$P=?O?k-%xInOzub z=un0Zm5r>j3`ObD$%`}`s%mU#g91T;yaJNDZ=#Ktm|^1N?=kcE5qr{RoOC;J(jig~ z*KO9yPhEHgZ2y^!3g$RQ?0NaFH>qe z!GIA@{h0JRe6ZG`z!@ayx?@6gc2Rx2z|=))oo!T^W5)E>;rKCiIpS=R-0FzFHmjs} z%#g8GsBnHgM8#EjYK~nuDy^4|zkqvO?L9QO;w>~>wCJHWEIlq0)u(zrJ&O^ZF{&10 zll5;ix|F|D&d%F&qL}07JpJtJea4a9F-12AyUr6c%hpRYpk{bVRR$i9$~nA(*80dp zQ7;FzfrpH|%bB$G7&3zuJ65!FxEFb18q}zc`?O0=3m%j4%F?f_m5Q}mDO*vCD!MV^ z6IpYI#g*Y29qJQ$K(9;2Qn$!@Yq5&-E#C~i-K*Xc_r}5%h@0e4KZsgbs7pjmdA22X zS%WcSSh|DPQZip#%p!Sw;`!$TcZvIE;m#44@Gs7t%Fjf8wUFz0PLJ|57!p5P6^Xh* z-WQ9VTj!#YTsvZY!YV~@ u3u85fjZVjDbvjP_`yYn9jtCV_HM0w2*~9YJKLl90OnCq6B?!$KRNxo0GP_m) diff --git a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs index 421aacff..ce5c2acc 100644 --- a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs +++ b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs @@ -97,7 +97,7 @@ public class Channel : IDisposableObservable, IDuplexPipe private long? localWindowSize; /// - /// Indicates whether the method has been called. + /// Indicates whether the method has been called. /// private bool isDisposed; @@ -322,28 +322,21 @@ private long RemoteWindowRemaining private bool BackpressureSupportEnabled => this.MultiplexingStream.protocolMajorVersion > 1; /// - /// Immediately terminates the channel and shutdowns any ongoing communication. + /// Immediately terminates the channel and shuts down any ongoing communication. /// /// /// Because this method may terminate the channel immediately and thus can cause previously queued content to not actually be received by the remote party, /// consider this method a "break glass" way of terminating a channel. The preferred method is that both sides "complete writing" and let the channel dispose itself. /// - public void Dispose() - { - string userDisposeMsg = "User triggered disposal"; - MultiplexingProtocolException userDisposeException = new MultiplexingProtocolException(userDisposeMsg); - - this.DisposeChannel(userDisposeException); - } + public void Dispose() => this.Dispose(null); /// /// Disposes the channel by releasing all resources associated with it. /// - /// The exception to dispose this channel with, - /// defaults to null. - internal void DisposeChannel(Exception? disposeException = null) + /// The exception to dispose this channel with. + internal void Dispose(Exception? disposeException) { - // Ensure that we don't call dispose more than once + // Ensure that we don't call dispose more than once. lock (this.SyncObject) { if (this.isDisposed) @@ -438,7 +431,7 @@ internal void DisposeChannel(Exception? disposeException = null) internal async Task OnChannelTerminatedAsync(Exception? remoteError = null) { - // Don't process the frame if the channel has already been diposed + // Don't process the frame if the channel has already been disposed. lock (this.SyncObject) { if (this.isDisposed) @@ -459,17 +452,14 @@ internal async Task OnChannelTerminatedAsync(Exception? remoteError = null) // We fell victim to a race condition. It's OK to just swallow it because the writer was never created, so it needn't be completed. } - // Terminate the channel + // Terminate the channel. this.DisposeSelfOnFailure(Task.Run(async delegate { - // Ensure that we processed the channel before terminating it + // Ensure that we processed the channel before terminating it. await this.OptionsApplied.ConfigureAwait(false); - // Record that we were terminated remotely this.IsRemotelyTerminated = true; - - // Dispose of the channel - this.DisposeChannel(remoteError); + this.Dispose(remoteError); })); } @@ -731,15 +721,16 @@ private void ApplyChannelOptions(ChannelOptions channelOptions) this.mxStreamIOReaderCompleted = this.ProcessOutboundTransmissionsAsync(); this.DisposeSelfOnFailure(this.mxStreamIOReaderCompleted); this.DisposeSelfOnFailure(this.AutoCloseOnPipesClosureAsync()); - - // Record that we have applied the channel options - this.optionsAppliedTaskSource?.TrySetResult(null); } catch (Exception ex) { this.optionsAppliedTaskSource?.TrySetException(ex); throw; } + finally + { + this.optionsAppliedTaskSource?.TrySetResult(null); + } } /// @@ -890,13 +881,13 @@ private async Task ProcessOutboundTransmissionsAsync() } } - // Add a trace indicating that we caught an exception in ProcessOutbound + // Add a trace indicating that we caught an exception. if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information)) { this.TraceSource.TraceEvent( TraceEventType.Error, 0, - "Rethrowing caught exception in ProcessOutboundAsync: {0}", + "Rethrowing caught exception in " + nameof(this.ProcessOutboundTransmissionsAsync) + ": {0}", ex.Message); } @@ -970,7 +961,7 @@ private async Task AutoCloseOnPipesClosureAsync() this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelAutoClosing, "Channel {0} \"{1}\" self-closing because both reader and writer are complete.", this.QualifiedId, this.Name); } - this.DisposeChannel(null); + this.Dispose(null); } private void Fault(Exception exception) @@ -988,12 +979,16 @@ private void Fault(Exception exception) this.faultingException ??= exception; } - if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Critical) ?? false) + if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Error) ?? false) { - this.TraceSource!.TraceEvent(TraceEventType.Warning, 0, "Channel faulting self due to exception: {0}", this.faultingException.Message); + this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelFatalError, "Channel faulted with exception: {0}", this.faultingException); + if (exception != this.faultingException) + { + this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelFatalError, "A subsequent fault exception was reported: {0}", exception); + } } - this.DisposeChannel(this.faultingException); + this.Dispose(this.faultingException); } private void DisposeSelfOnFailure(Task task) diff --git a/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs b/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs index ca14ca83..732ed4db 100644 --- a/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs +++ b/src/Nerdbank.Streams/MultiplexingStream.Formatters.cs @@ -411,7 +411,7 @@ internal override void WriteFrame(FrameHeader header, ReadOnlySequence pay } /// - /// Creates a payload to sent alongside . + /// Creates a payload for a frame. /// /// The exception to send to the remote side if there is one. /// The payload to send when a channel gets terminated. @@ -428,16 +428,7 @@ internal ReadOnlySequence SerializeException(Exception? exception) writer.WriteArrayHeader(1); // Get the exception to send to the remote side - string errorMessage = exception.Message; - if (errorMessage == null || errorMessage.Length == 0) - { - errorMessage = string.Format( - "Exception of type '{0}' was thrown", - exception.GetType().ToString()); - } - - // Write the error message to the payload being sent - writer.Write(errorMessage); + writer.Write($"{exception.GetType().Name}: {exception.Message}"); writer.Flush(); return sequence; @@ -450,7 +441,7 @@ internal ReadOnlySequence SerializeException(Exception? exception) /// The error message in this payload if there is one, null otherwise. internal Exception? DeserializeException(ReadOnlySequence payload) { - // If it is empty then return null + // An empty payload means the remote side closed the channel without an exception. if (payload.IsEmpty) { return null; @@ -459,13 +450,13 @@ internal ReadOnlySequence SerializeException(Exception? exception) var reader = new MessagePackReader(payload); int numElements = reader.ReadArrayHeader(); - // We received an empty payload + // We received an empty payload. if (numElements == 0) { return null; } - // Get the exception message and return it as an exception + // Get the exception message and return it as an exception. string remoteErrorMsg = reader.ReadString(); return new MultiplexingProtocolException($"Received error from remote side: {remoteErrorMsg}"); } diff --git a/src/Nerdbank.Streams/MultiplexingStream.cs b/src/Nerdbank.Streams/MultiplexingStream.cs index 18b72e81..37b9473b 100644 --- a/src/Nerdbank.Streams/MultiplexingStream.cs +++ b/src/Nerdbank.Streams/MultiplexingStream.cs @@ -173,6 +173,10 @@ private enum TraceEventId { HandshakeSuccessful = 1, HandshakeFailed, + + /// + /// A fatal error occurred at the overall multiplexing stream level, taking down the whole connection. + /// FatalError, UnexpectedChannelAccept, ChannelAutoClosing, @@ -209,6 +213,11 @@ private enum TraceEventId /// Raised when the protocol handshake is starting, to annouce the major version being used. /// HandshakeStarted, + + /// + /// A fatal exception occurred that took down one channel. + /// + ChannelFatalError, } /// @@ -238,11 +247,6 @@ private enum TraceEventId /// internal CancellationToken DisposalToken => this.disposalTokenSource.Token; - /// - /// Gets a value indicating whether channels in this stream supports sending/receiving termination exceptions. - /// - private bool SupportTerminatedExceptions => this.protocolMajorVersion > 1; - /// /// Gets a factory for instances to attach to a newly opened /// when its is . @@ -685,7 +689,7 @@ public async ValueTask DisposeAsync() { foreach (KeyValuePair entry in this.openChannels) { - entry.Value.Dispose(); + entry.Value.Dispose(new ObjectDisposedException(nameof(MultiplexingStream))); } foreach (KeyValuePair>> entry in this.acceptingChannels) @@ -919,22 +923,18 @@ private async Task OnChannelTerminatedAsync(QualifiedChannelId channelId, ReadOn if (channel is Channel) { - // Try to get the exception sent from the remote side if there was one sent - Exception? remoteException = null; - if (this.SupportTerminatedExceptions && !payload.IsEmpty) - { - V2Formatter castedFormatter = (V2Formatter)this.formatter; - remoteException = castedFormatter.DeserializeException(payload); - } + // Try to get the exception sent from the remote side if there was one sent. + Exception? remoteException = (this.formatter as V2Formatter)?.DeserializeException(payload); - if (remoteException != null && this.TraceSource.Switch.ShouldTrace(TraceEventType.Information)) + if (remoteException != null && this.TraceSource.Switch.ShouldTrace(TraceEventType.Error)) { this.TraceSource.TraceEvent( - TraceEventType.Warning, - (int)TraceEventId.ChannelDisposed, - "Received Channel Terminated for channel {0} with exception: {1}", + TraceEventType.Error, + (int)TraceEventId.ChannelFatalError, + "Received {2} for channel {0} with exception: {1}", channelId, - remoteException.Message); + remoteException.Message, + ControlCode.ChannelTerminated); } await channel.OnChannelTerminatedAsync(remoteException).ConfigureAwait(false); @@ -1117,10 +1117,10 @@ private void AcceptChannelOrThrow(Channel channel, ChannelOptions options) } /// - /// Raised when is called and any local transmission is completed. + /// Raised when is called and any local transmission is completed. /// /// The channel that is closing down. - /// The exception to sent to the remote side alongside the disposal. + /// The exception to send to the remote side alongside the disposal. private void OnChannelDisposed(Channel channel, Exception? exception = null) { Requires.NotNull(channel, nameof(channel)); @@ -1136,12 +1136,7 @@ private void OnChannelDisposed(Channel channel, Exception? exception = null) // If there is an error and we support sending errors then // serialize the exception and store it in the payload - ReadOnlySequence payload = default; - if (this.SupportTerminatedExceptions && exception != null) - { - V2Formatter castedFormatter = (V2Formatter)this.formatter; - payload = castedFormatter.SerializeException(exception); - } + ReadOnlySequence payload = (this.formatter as V2Formatter)?.SerializeException(exception) ?? default; if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information)) { diff --git a/src/nerdbank-streams/src/Channel.ts b/src/nerdbank-streams/src/Channel.ts index 0d093326..1dad2ebe 100644 --- a/src/nerdbank-streams/src/Channel.ts +++ b/src/nerdbank-streams/src/Channel.ts @@ -54,10 +54,10 @@ export abstract class Channel implements IDisposableObservable { } /** - * Closes this channel. If the channel with protocol versions > 1 - * are disposed with an error, then the error is transmitted to the remote side. + * Closes this channel. + * @param error An optional error to send to the remote side, if this multiplexing stream is using protocol versions >= 2. */ - public dispose(error : Error | null = null): void { + public dispose(error?: Error | null): void { // The interesting stuff is in the derived class. this._isDisposed = true; } @@ -248,7 +248,7 @@ export class ChannelClass extends Channel { } } - public dispose(error : Error | null = null): void { + public dispose(error?: Error | null): void { if (!this.isDisposed) { super.dispose(); @@ -269,7 +269,7 @@ export class ChannelClass extends Channel { } // Send the notification, but we can't await the result of this. - caught(this._multiplexingStream.onChannelDisposed(this, error)); + caught(this._multiplexingStream.onChannelDisposed(this, error ?? null)); } } @@ -286,4 +286,4 @@ export class ChannelClass extends Channel { } } } -} \ No newline at end of file +} diff --git a/src/nerdbank-streams/src/MultiplexingStream.ts b/src/nerdbank-streams/src/MultiplexingStream.ts index 0c6931d1..1a1ae7fb 100644 --- a/src/nerdbank-streams/src/MultiplexingStream.ts +++ b/src/nerdbank-streams/src/MultiplexingStream.ts @@ -106,15 +106,15 @@ export abstract class MultiplexingStream implements IDisposableObservable { * @param options Options to customize the behavior of the stream. * @returns The multiplexing stream. */ - public static Create( + public static Create( stream: NodeJS.ReadWriteStream, - options?: MultiplexingStreamOptions) : MultiplexingStream { + options?: MultiplexingStreamOptions): MultiplexingStream { options ??= { protocolMajorVersion: 3 }; options.protocolMajorVersion ??= 3; const formatter: MultiplexingStreamFormatter | undefined = options.protocolMajorVersion === 3 ? new MultiplexingStreamV3Formatter(stream) : - undefined; + undefined; if (!formatter) { throw new Error(`Protocol major version ${options.protocolMajorVersion} is not supported. Try CreateAsync instead.`); } @@ -587,22 +587,15 @@ export class MultiplexingStreamClass extends MultiplexingStream { } } - public async onChannelDisposed(channel: ChannelClass, error : Error | null = null) { + public async onChannelDisposed(channel: ChannelClass, error: Error | null) { if (!this._completionSource.isCompleted) { try { - // Determine the payload to send to the error - let payloadToSend : Buffer = Buffer.alloc(0); - if(this.protocolMajorVersion > 1 && error != null) { - // Get the formatter use to serialize the error - const castedFormatter = this.formatter as MultiplexingStreamV2Formatter; - - // Get the payload using the formatter - payloadToSend = castedFormatter.serializeException(error); - } + const payload = this.protocolMajorVersion > 1 && error + ? (this.formatter as MultiplexingStreamV2Formatter).serializeException(error) + : Buffer.alloc(0); - // Create the header and send the frame to the remote side const frameHeader = new FrameHeader(ControlCode.ChannelTerminated, channel.qualifiedId); - await this.sendFrameAsync(frameHeader, payloadToSend); + await this.sendFrameAsync(frameHeader, payload); } catch (err) { // Swallow exceptions thrown about channel disposal if the whole stream has been taken down. if (this.isDisposed) { @@ -749,17 +742,11 @@ export class MultiplexingStreamClass extends MultiplexingStream { this.deleteOpenChannel(channelId); this.removeChannelFromOfferedQueue(channel); - // Extract the exception that we received from the remote side - let remoteException : Error | null = null; - if(this.protocolMajorVersion > 1 && payload.length > 0) { - // Get the formatter - const castedFormatter = this.formatter as MultiplexingStreamV2Formatter; - - // Extract the error using the casted formatter - remoteException = castedFormatter.deserializeException(payload); - } + // Extract the exception that we received from the remote side. + const remoteException = this.protocolMajorVersion > 1 + ? (this.formatter as MultiplexingStreamV2Formatter).deserializeException(payload) + : null; - // Dispose the channel channel.dispose(remoteException); } } diff --git a/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts b/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts index e11245e1..e0d5a6bf 100644 --- a/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts +++ b/src/nerdbank-streams/src/MultiplexingStreamFormatters.ts @@ -293,26 +293,20 @@ export class MultiplexingStreamV2Formatter extends MultiplexingStreamFormatter { return msgpack.decode(payload)[0]; } - serializeException(error: Error | null) : Buffer { - // If the error doesn't exist then return an empty buffer + serializeException(error: Error | null): Buffer { + // If the error doesn't exist then return an empty buffer. if (!error) { return Buffer.alloc(0); } - // Determine the error message to add to the payload - let errorMsg : string = error.message; - if (!errorMsg) { - errorMsg = `Error of type ${error.name} was thrown`; - } - - // Return the buffer for this payload - const payload : any[] = [errorMsg]; + const errorMsg: string = `${error.name}: ${error.message}`; + const payload: any[] = [errorMsg]; return msgpack.encode(payload); } - deserializeException(payload: Buffer) : Error | null { - // If the payload is empty then return null - if(payload.length === 0) { + deserializeException(payload: Buffer): Error | null { + // If the payload is empty then return null. + if (payload.length === 0) { return null; } @@ -323,7 +317,7 @@ export class MultiplexingStreamV2Formatter extends MultiplexingStreamFormatter { } // Get error message and return the error to the remote side - let errorMsg : string = msgpack.decode(payload)[0]; + let errorMsg: string = msgpack.decode(payload)[0]; errorMsg = `Received error from remote side: ${errorMsg}`; return new Error(errorMsg); } diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts index 5e5f3044..fd0444bb 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.Interop.spec.ts @@ -73,8 +73,8 @@ import { ChannelOptions } from "../ChannelOptions"; }); it("Can offer error completed channel", async () => { - const errorMsg : string = "Hello world"; - const error : Error = new Error(errorMsg); + const errorMsg: string = "Hello world"; + const error: Error = new Error(errorMsg); const channelToCompleteWithError = await mx.offerChannelAsync("clientErrorOffer"); const communicationChannel = await mx.offerChannelAsync("clientErrorOfferComm"); @@ -82,12 +82,12 @@ import { ChannelOptions } from "../ChannelOptions"; channelToCompleteWithError.dispose(error); channelToCompleteWithError.completion.then(_ => { throw new Error("Channel disposed with error didn't complete with error"); - }).catch( (channelCompleteErr) => { + }).catch((channelCompleteErr) => { expect(channelCompleteErr.message).toContain(errorMsg); }); - let expectedErrMessage = `Received error from remote side: ${errorMsg}`; - if(protocolMajorVersion <= 1) { + let expectedErrMessage = `Received error from remote side: Error: ${errorMsg}`; + if (protocolMajorVersion <= 1) { expectedErrMessage = "Completed with no error"; } @@ -103,7 +103,7 @@ import { ChannelOptions } from "../ChannelOptions"; it("Can accept error completed channel", async () => { const channelCompletedWithError = await mx.acceptChannelAsync("serverErrorOffer"); - const errorExpectedMessage : string = "Received error from remote side: Hello World"; + const errorExpectedMessage: string = "Received error from remote side: Exception: Hello World"; const channelCompleted = new Deferred(); channelCompletedWithError.completion.then(async _ => { diff --git a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts index 94ee3d9d..1bfd251c 100644 --- a/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts +++ b/src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts @@ -249,10 +249,10 @@ import { nextTick } from "process"; await channels[1].completion; }); - it("channel terminated with error", async() => { - // Determine the error to complete the local channel with - const errorMsg : string = "Hello world"; - const error : Error = new Error(errorMsg); + it("channel terminated with error", async () => { + // Determine the error to complete the local channel with. + const errorMsg: string = "Hello world"; + const error: Error = new Error(errorMsg); // Get the channels to send/receive data over const channels = await Promise.all([ @@ -278,7 +278,7 @@ import { nextTick } from "process"; }); // Ensure that the remote channel only throws an error for protocol version > 1 - remoteChannel.completion.then( response => { + remoteChannel.completion.then(response => { remoteChannelCompleted.resolve(); expect(protocolMajorVersion).toEqual(1); }).catch(remoteChannelErr => { diff --git a/test/Nerdbank.Streams.Interop.Tests/Program.cs b/test/Nerdbank.Streams.Interop.Tests/Program.cs index 3c72fb51..657bc5c1 100644 --- a/test/Nerdbank.Streams.Interop.Tests/Program.cs +++ b/test/Nerdbank.Streams.Interop.Tests/Program.cs @@ -103,8 +103,8 @@ private async Task ClientOfferAsync() private async Task ServerOffersErrorCompletedChannel() { MultiplexingStream.Channel? expectedErrorChannel = await this.mx.OfferChannelAsync("serverErrorOffer"); - string errrorMessage = "Hello World"; - await expectedErrorChannel.Output.CompleteAsync(new Exception(errrorMessage)); + string errorMessage = "Hello World"; + await expectedErrorChannel.Output.CompleteAsync(new Exception(errorMessage)); } private async Task ServerOfferAsync() diff --git a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs index 46031565..101fc06b 100644 --- a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs +++ b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs @@ -201,7 +201,7 @@ public async Task OfferErrorCompletedPipe() { // Prepare a readonly pipe that is completed with an error string localErrMsg = "Hello World"; - string remoteErrMsg = "Received error from remote side: " + localErrMsg; + string remoteErrMsg = $"Received error from remote side: {nameof(ApplicationException)}: {localErrMsg}"; var pipe = new Pipe(); await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken); pipe.Writer.Complete(new ApplicationException(localErrMsg)); @@ -222,7 +222,7 @@ public async Task OfferErrorCompletedPipe() public async Task OfferEmptyErrorCompletedPipe() { string localErrMsg = string.Empty; - string remoteErrMsg = "Received error from remote side: Exception of type 'System.IndexOutOfRangeException' was thrown"; + string remoteErrMsg = $"Received error from remote side: {nameof(IndexOutOfRangeException)}: {localErrMsg}"; // Prepare a readonly pipe that is completed with an error var pipe = new Pipe(); @@ -245,7 +245,7 @@ public async Task OfferEmptyErrorCompletedPipe() public async Task OfferNullErrorCompletedPipe() { string localErrMsg = "Exception of type 'System.NullReferenceException' was thrown."; - string remoteErrMsg = "Received error from remote side: Exception of type 'System.NullReferenceException' was thrown."; + string remoteErrMsg = $"Received error from remote side: {nameof(NullReferenceException)}: {localErrMsg}"; // Prepare a readonly pipe that is completed with an error var pipe = new Pipe(); @@ -278,25 +278,21 @@ public async Task Dispose_CancelsOutstandingOperations() [Fact] public async Task Dispose_CompleteWithErrorAfterwards() { - string expectedLocalErrMsg = "User triggered disposal"; - string expectedRemoteErrMsg = "Received error from remote side: User triggered disposal"; - // Create the local and remote channels using channel names Task? localChannelTask = this.mx1.OfferChannelAsync("completeAfterwards", this.TimeoutToken); Task? remoteChannelTask = this.mx2.AcceptChannelAsync("completeAfterwards", this.TimeoutToken); - await this.WaitForEphemeralChannelOfferToPropagateAsync(); MultiplexingStream.Channel remoteChannel = await remoteChannelTask; MultiplexingStream.Channel localChannel = await localChannelTask; - // Dispose the local channel and then complete it later with an error + // Dispose the local channel and then complete the writer that *we* own later with an error. localChannel.Dispose(); - await localChannel.Output.CompleteAsync(new InvalidOperationException("Complete about dispose")); + await localChannel.Output.CompleteAsync(new InvalidOperationException("Complete after dispose")); - // Ensure that the local channel triggered through a user disposal - await VerifyChannelCompleted(localChannel, expectedLocalErrMsg); + // Ensure that the local channel completed without error (because we disposed before faulting the PipeWriter). + await VerifyChannelCompleted(localChannel, null); - // Ensure that the remote channel received the user disposal for protocol version > 1 - await VerifyChannelCompleted(remoteChannel, this.ProtocolMajorVersion > 1 ? expectedRemoteErrMsg : null); + // Ensure that the remote channel similarly did not receive notice of any fault. + await VerifyChannelCompleted(remoteChannel, null); } [Fact] @@ -312,7 +308,7 @@ public async Task Dispose_DisposesChannels() (MultiplexingStream.Channel channel1, MultiplexingStream.Channel channel2) = await this.EstablishChannelsAsync("A"); await this.mx1.DisposeAsync(); Assert.True(channel1.IsDisposed); - await VerifyChannelCompleted(channel1, "User triggered disposal"); + await VerifyChannelCompleted(channel1, new ObjectDisposedException(nameof(MultiplexingStream)).Message); #pragma warning disable CS0618 // Type or member is obsolete await channel1.Input.WaitForWriterCompletionAsync().WithCancellation(this.TimeoutToken);