Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiplexingStream transmit errors when shutting down channels #517

Merged
merged 86 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
2df4bb9
Read exception being received but not thrown
sarda-devesh May 30, 2022
439906b
Basic Error Test
sarda-devesh May 31, 2022
50aeb0f
Update test to make sure error message is received
sarda-devesh May 31, 2022
5410649
Added while loop in error test
sarda-devesh Jun 1, 2022
80b899e
Implemented basic interface for error serialization
sarda-devesh Jun 5, 2022
e9b529f
Added in locking for Dispose in MultiplexingStream.Channel and revert…
sarda-devesh Jun 9, 2022
e860c10
Merge branch 'AArnott:main' into main
sarda-devesh Jun 9, 2022
d2b6a00
Updated SDK version locally
sarda-devesh Jun 9, 2022
4cc7a2d
Implemented custom serialization using messagepack
sarda-devesh Jun 14, 2022
d3560e1
Added support for write error only in V2 and V3
sarda-devesh Jun 18, 2022
89d6744
Accepted all incoming changes
sarda-devesh Jun 23, 2022
409c07a
Merge branch 'AArnott-main' into main
sarda-devesh Jun 23, 2022
33be374
Removed all changes in current branch
sarda-devesh Jun 23, 2022
19fc8f3
Reimplemented formatter code for messpack to write error interaction
sarda-devesh Jun 23, 2022
8d3e2b8
Rewrote client side code of dealing content write error
sarda-devesh Jun 23, 2022
b379da9
Rewrote code on remote to process write error but not close stream
sarda-devesh Jun 23, 2022
738e67b
Implemented sketch solution to get OfferPipeWithError to pass
sarda-devesh Jun 23, 2022
61a6edd
Changed dispose to close writer with exception if exception passed in…
sarda-devesh Jun 25, 2022
2b86c2d
Changed to using faulting exception instead of custom created field
sarda-devesh Jun 28, 2022
1cc3272
Updated formatter for error message
sarda-devesh Jul 25, 2022
425a208
Modified formatter to handle errors
sarda-devesh Jul 25, 2022
90f0fd3
Implemented error handling in the channel
sarda-devesh Jul 25, 2022
d3d270d
Wrote unit test
sarda-devesh Jul 25, 2022
783068c
Passing basic error test
sarda-devesh Jul 25, 2022
7e1ba18
Merge branch 'main' of https://github.com/AArnott/Nerdbank.Streams in…
sarda-devesh Jul 25, 2022
c95905b
Resolved merge conflict?
sarda-devesh Jul 25, 2022
8b0e062
Merge pull request #3 from sarda-devesh/devesh/main
sarda-devesh Jul 25, 2022
9a28a50
Updated documentation
sarda-devesh Jul 25, 2022
d2ed543
Channels get completed with errors
sarda-devesh Jul 25, 2022
7d23ff7
Changed C# Stream to complete with errors
sarda-devesh Jul 30, 2022
75496f7
Added public field of remote exception to easily access remote error
sarda-devesh Jul 30, 2022
9116de1
Added interop test to ensure that error is sent properly
sarda-devesh Jul 30, 2022
7932c26
Changed C# code to not have public field
sarda-devesh Aug 3, 2022
e6ffbec
Code push
sarda-devesh Aug 5, 2022
9ba19a7
Code save
sarda-devesh Aug 5, 2022
1f6e252
Changed back to basic state
sarda-devesh Aug 5, 2022
317d1eb
Complete restore try
sarda-devesh Aug 5, 2022
1a64f90
Downloaded code from upstream master
sarda-devesh Aug 5, 2022
0e9656c
Changed formatter to support formatting error messages
sarda-devesh Aug 9, 2022
9448156
Added code on the remote to receive exceptions from the sender
sarda-devesh Aug 9, 2022
26add64
Remote side completes with an error test passing
sarda-devesh Aug 9, 2022
27d56a7
All MultiplexingStream tests passing
sarda-devesh Aug 10, 2022
61ca8ff
v2 Interop tests passing
sarda-devesh Aug 18, 2022
b8721f4
Passing all typescript tests
sarda-devesh Aug 18, 2022
e4f8cea
Including changes in Nerdbank.Stream
sarda-devesh Aug 18, 2022
626242e
All C# tests passsing
sarda-devesh Aug 18, 2022
00dd45c
Fixed style issues in C# code
sarda-devesh Aug 20, 2022
a8c5fbd
Fixed typescript style issue
sarda-devesh Aug 20, 2022
f9ed5a2
Added comments to C# test
sarda-devesh Aug 20, 2022
ed2d5d6
Added a writing error class
sarda-devesh Aug 20, 2022
270a484
Updated writing error class in C#
sarda-devesh Aug 20, 2022
da0f302
Merge branch 'AArnott:main' into main
sarda-devesh Aug 20, 2022
6cd7a62
Fixed linter issues
sarda-devesh Aug 20, 2022
1c92e32
Some doc and syntax touch-ups
AArnott Aug 25, 2022
6b3691e
More syntax touch-ups
AArnott Aug 25, 2022
2021f29
Fix a regression my recent syntax changes made
AArnott Aug 25, 2022
06adacf
Fix doc comment
AArnott Aug 25, 2022
1e87e00
Fixed style issues
sarda-devesh Aug 25, 2022
782ad6d
Revert namespace change
sarda-devesh Aug 25, 2022
d6ec961
Fixed build errors
sarda-devesh Aug 25, 2022
0586aeb
Switched to using protocol version for cast checking
sarda-devesh Aug 27, 2022
562caf2
Added fault method
sarda-devesh Aug 27, 2022
3e4baf8
Fixed formatter mismatch
sarda-devesh Aug 27, 2022
d8dcb0c
Moved newly added tests with other tests
sarda-devesh Aug 29, 2022
64a3e03
Merge branch 'AArnott:main' into main
sarda-devesh Sep 15, 2022
2aaea24
Added check for channel dispoal before sending of error
sarda-devesh Sep 17, 2022
d0a63e5
Code push to see error in pipeline
sarda-devesh Sep 17, 2022
bafac8a
Fixed uncessary lock acquisiton
sarda-devesh Sep 18, 2022
3b7352b
Tried to get error to show up locally
sarda-devesh Sep 19, 2022
ebf0903
Added trace statements for better error visibility
sarda-devesh Sep 19, 2022
b33e34e
Changed channel options error to be more verbose
sarda-devesh Sep 19, 2022
3de306f
Merge branch 'AArnott:main' into main
sarda-devesh Sep 19, 2022
fd44d7e
Tried to fix race condition
sarda-devesh Sep 20, 2022
738add2
Only swallow exception if it was not user specified
sarda-devesh Sep 20, 2022
a2eae37
Retrigger build pipeline
sarda-devesh Sep 20, 2022
53a142b
Add check in TryAcceptOffer that channel is disposed
sarda-devesh Sep 20, 2022
29fc1f8
Verify that it was an object disposed exception
sarda-devesh Sep 20, 2022
3ab4f46
Revert to old state to check for disposal
sarda-devesh Sep 20, 2022
b38d67b
Retrigger build pipeline try 2
sarda-devesh Sep 20, 2022
b0248f5
Checked for acceptance transition to faulted
sarda-devesh Sep 20, 2022
d9f4933
Added log message inside AcceptChannelOrThrow
sarda-devesh Sep 20, 2022
9c3360b
Retrigger build pipeline try 3
sarda-devesh Sep 20, 2022
9b40d83
Check for both acceptance and completion
sarda-devesh Sep 20, 2022
125e2fa
Pipeline rebuild
sarda-devesh Sep 20, 2022
f6819e9
Keep track of channel acceptance in process outbound
sarda-devesh Sep 20, 2022
a2aef17
Touch-ups
AArnott Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 60 additions & 13 deletions src/Nerdbank.Streams/MultiplexingStream.Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public class Channel : IDisposableObservable, IDuplexPipe
private Exception? faultingException;

/// <summary>
/// The <see cref="PipeReader"/> to use to get data to be transmitted over the <see cref="Streams.MultiplexingStream"/>.
/// The <see cref="PipeReader"/> to use to get data to be transmitted over the <see cref="Streams.MultiplexingStream"/>. Any errors passed to this
/// <see cref="PipeReader"/> are transmitted to the remote side.
/// </summary>
private PipeReader? mxStreamIOReader;

Expand Down Expand Up @@ -142,6 +143,11 @@ public class Channel : IDisposableObservable, IDuplexPipe
/// </summary>
private bool? existingPipeGiven;

/// <summary>
/// A value indicating whether this <see cref="Channel"/> received an error from a remote party in <see cref="OnContentWritingCompleted(MultiplexingProtocolException?)"/>.
/// </summary>
private bool receivedRemoteException;

/// <summary>
/// Initializes a new instance of the <see cref="Channel"/> class.
/// </summary>
Expand Down Expand Up @@ -361,7 +367,7 @@ public void Dispose()
mxStreamIOWriter = self.mxStreamIOWriter;
}

mxStreamIOWriter?.Complete();
mxStreamIOWriter?.Complete(self.GetRemoteException());
self.mxStreamIOWriterCompleted.Set();
}
finally
Expand Down Expand Up @@ -401,7 +407,17 @@ public void Dispose()
this.remoteWindowHasCapacity.Set();

this.disposalTokenSource.Cancel();
this.completionSource.TrySetResult(null);

// If we are disposing due to receiving or sending an exception, relay that to our client.
if (this.faultingException != null)
{
this.completionSource.TrySetException(this.faultingException);
}
else
{
this.completionSource.TrySetResult(null);
}

this.MultiplexingStream.OnChannelDisposed(this);
}
}
Expand All @@ -418,7 +434,7 @@ internal async Task OnChannelTerminatedAsync()
// We Complete the writer because only the writing (logical) thread should complete it
// to avoid race conditions, and Channel.Dispose can be called from any thread.
using PipeWriterRental writerRental = await this.GetReceivedMessagePipeWriterAsync().ConfigureAwait(false);
await writerRental.Writer.CompleteAsync().ConfigureAwait(false);
await writerRental.Writer.CompleteAsync(this.GetRemoteException()).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -497,32 +513,49 @@ internal async ValueTask OnContentAsync(ReadOnlySequence<byte> payload, Cancella
/// <summary>
/// Called by the <see cref="MultiplexingStream"/> when when it will not be writing any more data to the channel.
/// </summary>
internal void OnContentWritingCompleted()
/// <param name="error">The error in writing that originated on the remote side, if applicable.</param>
internal void OnContentWritingCompleted(MultiplexingProtocolException? error = null)
{
// If we have already received an error from the remote side then no need to complete the channel again.
if (this.receivedRemoteException)
{
return;
}

// Set the state of the channel based on whether we are completing due to an error.
lock (this.SyncObject)
{
this.faultingException ??= error;
this.receivedRemoteException = error != null;
}

this.DisposeSelfOnFailure(Task.Run(async delegate
{
if (!this.IsDisposed)
{
try
{
// If the channel is not disposed, then first try to close the writer used by the channel owner
using PipeWriterRental writerRental = await this.GetReceivedMessagePipeWriterAsync().ConfigureAwait(false);
await writerRental.Writer.CompleteAsync().ConfigureAwait(false);
await writerRental.Writer.CompleteAsync(this.GetRemoteException()).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
// If not, try to close the underlying writer.
if (this.mxStreamIOWriter != null)
{
using AsyncSemaphore.Releaser releaser = await this.mxStreamIOWriterSemaphore.EnterAsync().ConfigureAwait(false);
await this.mxStreamIOWriter.CompleteAsync().ConfigureAwait(false);
await this.mxStreamIOWriter.CompleteAsync(this.GetRemoteException()).ConfigureAwait(false);
}
}
}
else
{
// If the channel has not been disposed then just close the underlying writer
if (this.mxStreamIOWriter != null)
{
using AsyncSemaphore.Releaser releaser = await this.mxStreamIOWriterSemaphore.EnterAsync().ConfigureAwait(false);
await this.mxStreamIOWriter.CompleteAsync().ConfigureAwait(false);
await this.mxStreamIOWriter.CompleteAsync(this.GetRemoteException()).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -644,6 +677,15 @@ private async ValueTask<PipeWriterRental> GetReceivedMessagePipeWriterAsync(Canc
}
}

/// <summary>
/// Gets the <see cref="Exception"/> exception that we received from the remote side when completing this channel.
/// </summary>
/// <returns>The exception sent from the remote if there is one, null otherwise.</returns>
private Exception? GetRemoteException()
sarda-devesh marked this conversation as resolved.
Show resolved Hide resolved
{
return this.receivedRemoteException ? this.faultingException : null;
}

/// <summary>
/// Apply channel options to this channel, including setting up or linking to an user-supplied pipe writer/reader pair.
/// </summary>
Expand Down Expand Up @@ -832,13 +874,22 @@ private async Task ProcessOutboundTransmissionsAsync()
}
catch (Exception ex)
{
// If the operation had been cancelled then we are expecting to receive this error so don't transmit it.
if (ex is OperationCanceledException && this.DisposalToken.IsCancellationRequested)
{
await mxStreamIOReader!.CompleteAsync().ConfigureAwait(false);
}
else
{
// If not record it as the error to dispose this channel with
lock (this.SyncObject)
{
this.faultingException = ex;
sarda-devesh marked this conversation as resolved.
Show resolved Hide resolved
}

// Since we're not expecting to receive this error, transmit the error to the remote side.
await mxStreamIOReader!.CompleteAsync(ex).ConfigureAwait(false);
this.MultiplexingStream.OnChannelWritingError(this, ex);
}

throw;
Expand Down Expand Up @@ -916,11 +967,7 @@ private async Task AutoCloseOnPipesClosureAsync()

private void Fault(Exception exception)
{
if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Critical) ?? false)
{
this.TraceSource!.TraceEvent(TraceEventType.Critical, (int)TraceEventId.FatalError, "Channel Closing self due to exception: {0}", exception);
}

// Record the faulting exception unless it is not the original exception.
lock (this.SyncObject)
{
this.faultingException ??= exception;
Expand Down
4 changes: 4 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.ChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public ChannelOptions()
/// The <see cref="PipeWriter"/> specified in <see cref="IDuplexPipe.Output"/> *must* be created with <see cref="PipeOptions.PauseWriterThreshold"/> that *exceeds*
/// the value of <see cref="ChannelReceivingWindowSize"/> and <see cref="Options.DefaultChannelReceivingWindowSize"/>.
/// </para>
/// <para>
/// A faulted <see cref="IDuplexPipe.Input" /> (one where <see cref="PipeReader.CompleteAsync(Exception)" /> is called with an exception)
/// will have its exception relayed to the remote party before closing the channel.
/// </para>
/// </remarks>
/// <exception cref="ArgumentException">Thrown if set to an <see cref="IDuplexPipe"/> that returns <c>null</c> for either of its properties.</exception>
public IDuplexPipe? ExistingPipe
Expand Down
6 changes: 6 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ internal enum ControlCode : byte
/// allowing them to send more data.
/// </summary>
ContentProcessed,

/// <summary>
/// Sent when one party experiences an exception related to a particular channel and carries details regarding the error.
/// This is sent right before a <see cref="ContentWritingCompleted"/> frame closes that channel.
/// </summary>
ContentWritingError,
}
}
}
51 changes: 51 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.Formatters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,57 @@ internal override Channel.AcceptanceParameters DeserializeAcceptanceParameters(R
return new Channel.AcceptanceParameters(remoteWindowSize);
}

/// <summary>
/// Returns the serialized representation of a <see cref="WriteError"/> object using <see cref="MessagePack"/>.
/// </summary>
/// <param name="protocolVersion">The protocol version to include in the serialized error buffer.</param>
sarda-devesh marked this conversation as resolved.
Show resolved Hide resolved
/// <param name="error">An instance of <see cref="WriteError"/> that we want to seralize.</param>
/// <returns>A <see cref="Sequence{T}"/> which is the serialized version of the error.</returns>
internal ReadOnlySequence<byte> SerializeWriteError(int protocolVersion, WriteError error)
{
// Create the payload
using Sequence<byte> errorSequence = new(ArrayPool<byte>.Shared);
MessagePackWriter writer = new(errorSequence);

// Write the error message and the protocol version to the payload
writer.WriteArrayHeader(2);
writer.WriteInt32(protocolVersion);
writer.Write(error.ErrorMessage);

// Return the payload to the caller
writer.Flush();
return errorSequence.AsReadOnlySequence;
}

/// <summary>
/// Extracts an <see cref="WriteError"/> object from the payload using <see cref="MessagePack"/>.
/// </summary>
/// <param name="serializedError">The payload we are trying to extract the error object from.</param>
/// <param name="expectedVersion">The protocol version we expect to be associated with the error object.</param>
/// <returns>A <see cref="WriteError"/> object if the payload is correctly formatted and has the expected protocol version,
/// null otherwise. </returns>
internal WriteError? DeserializeWriteError(ReadOnlySequence<byte> serializedError, int expectedVersion)
{
MessagePackReader reader = new(serializedError);

// The payload should only have the error message and the protocol version.
if (reader.ReadArrayHeader() != 2)
{
return null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would probably be worth tracing about. Also, generally speaking we should read as much as we recognize, and disregard additional fields. For example, seeing 3 fields shouldn't change our ability to deserialize the first two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see but then if we see three fields where we expect to see only two should be communicated to the user. I think as a middle ground, I will add a trace to warn the user that we encountered additional fields but not make it a blocking error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. If you trace it at all, please use Verbose as the tracing level, as it is perfectly acceptable for some future version of the protocol to add a 3rd field and interop with a remote party that only supports 2 and for them to interact without errors or excessive logging.

}

// Verify that the protocol version of the payload matches our expected value
int senderVersion = reader.ReadInt32();
if (senderVersion != expectedVersion)
{
return null;
}

// Extract the error message and use that to create the write error object
string errorMessage = reader.ReadString();
return new WriteError(errorMessage);
}

protected virtual (FrameHeader Header, ReadOnlySequence<byte> Payload) DeserializeFrame(ReadOnlySequence<byte> frameSequence)
{
var reader = new MessagePackReader(frameSequence);
Expand Down
32 changes: 32 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.WriteError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Andrew Arnott. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Nerdbank.Streams
{
/// <content>
/// Contains the <see cref="WriteError"/> nested type.
/// </content>
public partial class MultiplexingStream
{
/// <summary>
/// A class containing information about a write error and which is sent to the
/// remote alongside <see cref="MultiplexingStream.ControlCode.ContentWritingError"/>.
/// </summary>
internal class WriteError
{
/// <summary>
/// Initializes a new instance of the <see cref="WriteError"/> class.
/// </summary>
/// <param name="message">The error message we want to send to the receiver.</param>
internal WriteError(string message)
{
this.ErrorMessage = message;
}

/// <summary>
/// Gets the error message associated with this error.
/// </summary>
internal string ErrorMessage { get; }
}
}
}
Loading