Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiplexingStream transmit errors when shutting down channels #517

Merged
merged 86 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
2df4bb9
Read exception being received but not thrown
sarda-devesh May 30, 2022
439906b
Basic Error Test
sarda-devesh May 31, 2022
50aeb0f
Update test to make sure error message is received
sarda-devesh May 31, 2022
5410649
Added while loop in error test
sarda-devesh Jun 1, 2022
80b899e
Implemented basic interface for error serialization
sarda-devesh Jun 5, 2022
e9b529f
Added in locking for Dispose in MultiplexingStream.Channel and revert…
sarda-devesh Jun 9, 2022
e860c10
Merge branch 'AArnott:main' into main
sarda-devesh Jun 9, 2022
d2b6a00
Updated SDK version locally
sarda-devesh Jun 9, 2022
4cc7a2d
Implemented custom serialization using messagepack
sarda-devesh Jun 14, 2022
d3560e1
Added support for write error only in V2 and V3
sarda-devesh Jun 18, 2022
89d6744
Accepted all incoming changes
sarda-devesh Jun 23, 2022
409c07a
Merge branch 'AArnott-main' into main
sarda-devesh Jun 23, 2022
33be374
Removed all changes in current branch
sarda-devesh Jun 23, 2022
19fc8f3
Reimplemented formatter code for messpack to write error interaction
sarda-devesh Jun 23, 2022
8d3e2b8
Rewrote client side code of dealing content write error
sarda-devesh Jun 23, 2022
b379da9
Rewrote code on remote to process write error but not close stream
sarda-devesh Jun 23, 2022
738e67b
Implemented sketch solution to get OfferPipeWithError to pass
sarda-devesh Jun 23, 2022
61a6edd
Changed dispose to close writer with exception if exception passed in…
sarda-devesh Jun 25, 2022
2b86c2d
Changed to using faulting exception instead of custom created field
sarda-devesh Jun 28, 2022
1cc3272
Updated formatter for error message
sarda-devesh Jul 25, 2022
425a208
Modified formatter to handle errors
sarda-devesh Jul 25, 2022
90f0fd3
Implemented error handling in the channel
sarda-devesh Jul 25, 2022
d3d270d
Wrote unit test
sarda-devesh Jul 25, 2022
783068c
Passing basic error test
sarda-devesh Jul 25, 2022
7e1ba18
Merge branch 'main' of https://github.com/AArnott/Nerdbank.Streams in…
sarda-devesh Jul 25, 2022
c95905b
Resolved merge conflict?
sarda-devesh Jul 25, 2022
8b0e062
Merge pull request #3 from sarda-devesh/devesh/main
sarda-devesh Jul 25, 2022
9a28a50
Updated documentation
sarda-devesh Jul 25, 2022
d2ed543
Channels get completed with errors
sarda-devesh Jul 25, 2022
7d23ff7
Changed C# Stream to complete with errors
sarda-devesh Jul 30, 2022
75496f7
Added public field of remote exception to easily access remote error
sarda-devesh Jul 30, 2022
9116de1
Added interop test to ensure that error is sent properly
sarda-devesh Jul 30, 2022
7932c26
Changed C# code to not have public field
sarda-devesh Aug 3, 2022
e6ffbec
Code push
sarda-devesh Aug 5, 2022
9ba19a7
Code save
sarda-devesh Aug 5, 2022
1f6e252
Changed back to basic state
sarda-devesh Aug 5, 2022
317d1eb
Complete restore try
sarda-devesh Aug 5, 2022
1a64f90
Downloaded code from upstream master
sarda-devesh Aug 5, 2022
0e9656c
Changed formatter to support formatting error messages
sarda-devesh Aug 9, 2022
9448156
Added code on the remote to receive exceptions from the sender
sarda-devesh Aug 9, 2022
26add64
Remote side completes with an error test passing
sarda-devesh Aug 9, 2022
27d56a7
All MultiplexingStream tests passing
sarda-devesh Aug 10, 2022
61ca8ff
v2 Interop tests passing
sarda-devesh Aug 18, 2022
b8721f4
Passing all typescript tests
sarda-devesh Aug 18, 2022
e4f8cea
Including changes in Nerdbank.Stream
sarda-devesh Aug 18, 2022
626242e
All C# tests passsing
sarda-devesh Aug 18, 2022
00dd45c
Fixed style issues in C# code
sarda-devesh Aug 20, 2022
a8c5fbd
Fixed typescript style issue
sarda-devesh Aug 20, 2022
f9ed5a2
Added comments to C# test
sarda-devesh Aug 20, 2022
ed2d5d6
Added a writing error class
sarda-devesh Aug 20, 2022
270a484
Updated writing error class in C#
sarda-devesh Aug 20, 2022
da0f302
Merge branch 'AArnott:main' into main
sarda-devesh Aug 20, 2022
6cd7a62
Fixed linter issues
sarda-devesh Aug 20, 2022
1c92e32
Some doc and syntax touch-ups
AArnott Aug 25, 2022
6b3691e
More syntax touch-ups
AArnott Aug 25, 2022
2021f29
Fix a regression my recent syntax changes made
AArnott Aug 25, 2022
06adacf
Fix doc comment
AArnott Aug 25, 2022
1e87e00
Fixed style issues
sarda-devesh Aug 25, 2022
782ad6d
Revert namespace change
sarda-devesh Aug 25, 2022
d6ec961
Fixed build errors
sarda-devesh Aug 25, 2022
0586aeb
Switched to using protocol version for cast checking
sarda-devesh Aug 27, 2022
562caf2
Added fault method
sarda-devesh Aug 27, 2022
3e4baf8
Fixed formatter mismatch
sarda-devesh Aug 27, 2022
d8dcb0c
Moved newly added tests with other tests
sarda-devesh Aug 29, 2022
64a3e03
Merge branch 'AArnott:main' into main
sarda-devesh Sep 15, 2022
2aaea24
Added check for channel dispoal before sending of error
sarda-devesh Sep 17, 2022
d0a63e5
Code push to see error in pipeline
sarda-devesh Sep 17, 2022
bafac8a
Fixed uncessary lock acquisiton
sarda-devesh Sep 18, 2022
3b7352b
Tried to get error to show up locally
sarda-devesh Sep 19, 2022
ebf0903
Added trace statements for better error visibility
sarda-devesh Sep 19, 2022
b33e34e
Changed channel options error to be more verbose
sarda-devesh Sep 19, 2022
3de306f
Merge branch 'AArnott:main' into main
sarda-devesh Sep 19, 2022
fd44d7e
Tried to fix race condition
sarda-devesh Sep 20, 2022
738add2
Only swallow exception if it was not user specified
sarda-devesh Sep 20, 2022
a2eae37
Retrigger build pipeline
sarda-devesh Sep 20, 2022
53a142b
Add check in TryAcceptOffer that channel is disposed
sarda-devesh Sep 20, 2022
29fc1f8
Verify that it was an object disposed exception
sarda-devesh Sep 20, 2022
3ab4f46
Revert to old state to check for disposal
sarda-devesh Sep 20, 2022
b38d67b
Retrigger build pipeline try 2
sarda-devesh Sep 20, 2022
b0248f5
Checked for acceptance transition to faulted
sarda-devesh Sep 20, 2022
d9f4933
Added log message inside AcceptChannelOrThrow
sarda-devesh Sep 20, 2022
9c3360b
Retrigger build pipeline try 3
sarda-devesh Sep 20, 2022
9b40d83
Check for both acceptance and completion
sarda-devesh Sep 20, 2022
125e2fa
Pipeline rebuild
sarda-devesh Sep 20, 2022
f6819e9
Keep track of channel acceptance in process outbound
sarda-devesh Sep 20, 2022
a2aef17
Touch-ups
AArnott Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 137 additions & 31 deletions src/Nerdbank.Streams/MultiplexingStream.Channel.cs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.ChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public ChannelOptions()
/// The <see cref="PipeWriter"/> specified in <see cref="IDuplexPipe.Output"/> *must* be created with <see cref="PipeOptions.PauseWriterThreshold"/> that *exceeds*
/// the value of <see cref="ChannelReceivingWindowSize"/> and <see cref="Options.DefaultChannelReceivingWindowSize"/>.
/// </para>
/// <para>
/// A faulted <see cref="IDuplexPipe.Input" /> (one where <see cref="PipeReader.CompleteAsync(Exception)" /> is called with an exception)
/// will have its exception relayed to the remote party before closing the channel.
/// </para>
/// </remarks>
/// <exception cref="ArgumentException">Thrown if set to an <see cref="IDuplexPipe"/> that returns <c>null</c> for either of its properties.</exception>
public IDuplexPipe? ExistingPipe
Expand Down
7 changes: 7 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ internal enum ControlCode : byte
/// allowing them to send more data.
/// </summary>
ContentProcessed,

/// <summary>
/// Sent when one party experiences an exception related to a particular channel and carries details regarding the error,
/// when using protocol version 2 or later.
/// This is sent before a <see cref="ContentWritingCompleted"/> frame closes that channel.
/// </summary>
ContentWritingError,
}
}
}
48 changes: 48 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.Formatters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Nerdbank.Streams
{
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
Expand Down Expand Up @@ -495,6 +496,53 @@ internal override Channel.AcceptanceParameters DeserializeAcceptanceParameters(R
return new Channel.AcceptanceParameters(remoteWindowSize);
}

/// <summary>
/// Returns the serialized representation of a <see cref="WriteError"/> object using <see cref="MessagePack"/>.
/// </summary>
/// <param name="error">An instance of <see cref="WriteError"/> that we want to seralize.</param>
/// <returns>A <see cref="Sequence{T}"/> which is the serialized version of the error.</returns>
internal ReadOnlySequence<byte> SerializeWriteError(WriteError error)
{
// Create the payload
Sequence<byte> errorSequence = new();
MessagePackWriter writer = new(errorSequence);

// Write the error message and the protocol version to the payload
writer.WriteArrayHeader(1);
writer.Write(error.Message);

// Return the payload to the caller
writer.Flush();
return errorSequence.AsReadOnlySequence;
}

/// <summary>
/// Extracts an <see cref="WriteError"/> object from the payload using <see cref="MessagePack"/>.
/// </summary>
/// <param name="serializedError">The payload we are trying to extract the error object from.</param>
/// <returns>A <see cref="WriteError"/> object.</returns>
internal WriteError DeserializeWriteError(ReadOnlySequence<byte> serializedError)
{
MessagePackReader reader = new(serializedError);
int numElements = reader.ReadArrayHeader();

string? errorMessage = null;
for (int i = 0; i < numElements; i++)
{
switch (i)
{
case 0:
errorMessage = reader.ReadString();
break;
default:
reader.Skip();
break;
}
}

return new WriteError(errorMessage ?? string.Empty);
}

protected virtual (FrameHeader Header, ReadOnlySequence<byte> Payload) DeserializeFrame(ReadOnlySequence<byte> frameSequence)
{
var reader = new MessagePackReader(frameSequence);
Expand Down
32 changes: 32 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.WriteError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Andrew Arnott. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Nerdbank.Streams
{
/// <content>
/// Contains the <see cref="WriteError"/> nested type.
/// </content>
public partial class MultiplexingStream
{
/// <summary>
/// A class containing information about a write error and which is sent to the
/// remote alongside <see cref="MultiplexingStream.ControlCode.ContentWritingError"/>.
/// </summary>
internal class WriteError
{
/// <summary>
/// Initializes a new instance of the <see cref="WriteError"/> class.
/// </summary>
/// <param name="message">The error message we want to send to the receiver.</param>
internal WriteError(string? message)
{
this.Message = message;
}

/// <summary>
/// Gets the error message associated with this error.
/// </summary>
internal string? Message { get; }
}
}
}
130 changes: 121 additions & 9 deletions src/Nerdbank.Streams/MultiplexingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ private enum TraceEventId
/// Raised when the protocol handshake is starting, to annouce the major version being used.
/// </summary>
HandshakeStarted,

/// <summary>
/// Raised when receiving or sending a <see cref="ControlCode.ContentWritingError"/>.
/// </summary>
WriteError,

/// <summary>
/// An error occurred that is likely not fatal.
/// </summary>
NonFatalInternalError,
}

/// <summary>
Expand All @@ -220,7 +230,7 @@ private enum TraceEventId
/// Gets the logger used by this instance.
/// </summary>
/// <value>Never null.</value>
public TraceSource TraceSource { get; }
public TraceSource TraceSource { get; private set; }

/// <summary>
/// Gets the default window size used for new channels that do not specify a value for <see cref="ChannelOptions.ChannelReceivingWindowSize"/>.
Expand All @@ -244,6 +254,11 @@ private enum TraceEventId
/// </summary>
private Func<QualifiedChannelId, string, TraceSource?>? DefaultChannelTraceSourceFactory { get; }

/// <summary>
/// Gets a value indicating whether this stream can send or receive frames of type<see cref="ControlCode.ContentWritingError"/>.
/// </summary>
private bool ContentWritingErrorSupported => this.protocolMajorVersion > 1;

/// <summary>
/// Initializes a new instance of the <see cref="MultiplexingStream"/> class
/// with <see cref="Options.ProtocolMajorVersion"/> set to 3.
Expand Down Expand Up @@ -837,6 +852,9 @@ private async Task ReadStreamAsync()
case ControlCode.ChannelTerminated:
await this.OnChannelTerminatedAsync(header.RequiredChannelId).ConfigureAwait(false);
break;
case ControlCode.ContentWritingError:
this.OnContentWritingError(header.RequiredChannelId, frame.Value.Payload);
break;
default:
break;
}
Expand Down Expand Up @@ -919,6 +937,56 @@ private async Task OnChannelTerminatedAsync(QualifiedChannelId channelId)
}
}

/// <summary>
/// Occurs when the channel receives a frame with code <see cref="ControlCode.ContentWritingError"/> from the remote.
/// </summary>
/// <param name="channelId">The channel id of the sender of the frame.</param>
/// <param name="payload">The payload that the sender sent in the frame.</param>
private void OnContentWritingError(QualifiedChannelId channelId, ReadOnlySequence<byte> payload)
{
// Get the channel that send this frame
Channel channel;
lock (this.syncObject)
{
channel = this.openChannels[channelId];
}

// Determines if the channel is in a state to receive messages
bool channelInValidState = channelId.Source != ChannelSource.Local || channel.IsAccepted;

// If the channel is in a valid state and we have a valid protocol version, then process the message
if (channelInValidState && this.ContentWritingErrorSupported)
{
// Deserialize the payload and verify that it was in an expected state
V2Formatter errorDeserializingFormattter = (V2Formatter)this.formatter;
WriteError error = errorDeserializingFormattter.DeserializeWriteError(payload);

// Get the error message and complete the channel using it
string errorMessage = error.Message ?? "<unspecified>";
MultiplexingProtocolException channelClosingException = new MultiplexingProtocolException($"Remote party indicated writing error: {errorMessage}");
channel.OnContentWritingCompleted(channelClosingException);
}
else if (channelInValidState && !this.ContentWritingErrorSupported)
{
// The channel is in a valid state but we have a protocol version that doesn't support processing errrors
// so don't do anything.
if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Warning) ?? false)
{
this.TraceSource.TraceEvent(
TraceEventType.Warning,
(int)TraceEventId.WriteError,
"Rejecting writing error from channel {0} as MultiplexingStream has protocol version of {1}",
channelId,
this.protocolMajorVersion);
}
}
else
{
// The channel is in an invalid state so throw an error indicating so
throw new MultiplexingProtocolException($"Remote party indicated they encountered errors writing to channel {channelId} before accepting it.");
}
}

private void OnContentWritingCompleted(QualifiedChannelId channelId)
{
Channel channel;
Expand Down Expand Up @@ -1064,12 +1132,7 @@ private bool TryAcceptChannel(Channel channel, ChannelOptions options)
Requires.NotNull(channel, nameof(channel));
Requires.NotNull(options, nameof(options));

if (channel.TryAcceptOffer(options))
{
return true;
}

return false;
return channel.TryAcceptOffer(options);
}

private void AcceptChannelOrThrow(Channel channel, ChannelOptions options)
Expand All @@ -1079,7 +1142,12 @@ private void AcceptChannelOrThrow(Channel channel, ChannelOptions options)

if (!this.TryAcceptChannel(channel, options))
{
if (channel.IsAccepted)
// If we disposed of the channel due to a user provided error then ignore the error.
if (channel.IsDisposed && (channel.Completion.IsFaulted || channel.Acceptance.IsFaulted))
{
return;
}
else if (channel.IsAccepted)
{
throw new InvalidOperationException("Channel is already accepted.");
}
Expand Down Expand Up @@ -1113,6 +1181,50 @@ private void OnChannelDisposed(Channel channel)
}
}

/// <summary>
/// Informs the remote party of a local error that prevents sending all the required data to this channel
/// by transmitting a <see cref="ControlCode.ContentWritingError"/> frame.
/// </summary>
/// <param name="channel">The channel whose writing was halted.</param>
/// <param name="exception">The exception that caused the writing to be haulted.</param>
private void OnChannelWritingError(Channel channel, Exception exception)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Error))
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.WriteError, "Local channel {0} encountered write error {1}", channel.QualifiedId, exception.Message);
}

// Verify that we can send a message over this channel.
// The race condition here is handled within SendFrameAsync which will drop the frame
// if the conditions we're checking for here change after we check them, so our check here
// is just an optimization to avoid work when we can predict its failure.
bool channelInValidState = true;
lock (this.syncObject)
{
channelInValidState = !this.channelsPendingTermination.Contains(channel.QualifiedId)
&& this.openChannels.ContainsKey(channel.QualifiedId);
}

// If we can send messages over this channel and we have the correct protocol version then send the error
if (channelInValidState && this.ContentWritingErrorSupported)
{
// Create the payload to send to the remote side
V2Formatter errorSerializationFormatter = (V2Formatter)this.formatter;
WriteError error = new(exception.Message);
ReadOnlySequence<byte> serializedError = errorSerializationFormatter.SerializeWriteError(error);

// Create the frame header indicating that we encountered a content writing error
FrameHeader header = new FrameHeader
{
Code = ControlCode.ContentWritingError,
ChannelId = channel.QualifiedId,
};

// Send the frame alongside the payload to the remote side
this.SendFrame(header, serializedError, CancellationToken.None);
}
}

/// <summary>
/// Indicates that the local end will not be writing any more data to this channel,
/// leading to the transmission of a <see cref="ControlCode.ContentWritingCompleted"/> frame being sent for this channel.
Expand Down Expand Up @@ -1191,7 +1303,7 @@ private async Task SendFrameAsync(FrameHeader header, ReadOnlySequence<byte> pay
// In such cases, we should just suppress transmission of the frame because the other side does not care.
// ContentWritingCompleted can be sent to SendFrame after a ChannelTerminated message such that neither have been transmitted yet
// and thus wasn't in the termination collection until later, so forgive that too.
if (header.Code is ControlCode.ContentProcessed or ControlCode.ContentWritingCompleted)
if (header.Code is ControlCode.ContentProcessed or ControlCode.ContentWritingCompleted or ControlCode.ContentWritingError)
{
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.FrameSendSkipped, "Skipping {0} frame for channel {1} because we're about to terminate it.", header.Code, header.ChannelId);
return;
Expand Down
47 changes: 44 additions & 3 deletions src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ export abstract class Channel implements IDisposableObservable {
return this._isDisposed;
}

/**
* Closes this channel after transmitting an error to the remote party.
* @param error The error to transmit to the remote party.
*/
public fault(error: Error): Promise<void> {
// The interesting stuff is in the derived class.
return Promise.resolve();
}

/**
* Closes this channel.
*/
Expand All @@ -71,6 +80,7 @@ export class ChannelClass extends Channel {
private readonly _completion = new Deferred<void>();
public localWindowSize?: number;
private remoteWindowSize?: number;
private remoteError?: Error;

/**
* The number of bytes transmitted from here but not yet acknowledged as processed from there,
Expand Down Expand Up @@ -214,7 +224,13 @@ export class ChannelClass extends Channel {
return this._acceptance.resolve();
}

public onContent(buffer: Buffer | null) {
public onContent(buffer: Buffer | null, error?: Error) {
// If we have already received an error from the remote party, then don't process any future messages.
if (this.remoteError) {
return;
}

this.remoteError = error;
this._duplex.push(buffer);

// We should find a way to detect when we *actually* share the received buffer with the Channel's user
Expand Down Expand Up @@ -244,18 +260,43 @@ export class ChannelClass extends Channel {
}
}

public async fault(error: Error) {
// If the channel is already disposed then don't do anything
if (this.isDisposed) {
return;
}

// Send the error message to the remote side
await this._multiplexingStream.onChannelWritingError(this, error);

// Set the remote exception to the passed in error so that the channel is
// completed with this error
this.remoteError = error;

// Dispose of the channel
await this.dispose();
}

public async dispose() {
if (!this.isDisposed) {
super.dispose();

this._acceptance.reject(new CancellationToken.CancellationError("disposed"));

// For the pipes, we Complete *our* ends, and leave the user's ends alone.
// The completion will propagate when it's ready to.
// The completion will propagate when it's ready to. No need to destroy the duplex
// as the frame containing the error message has already been sent.
this._duplex.end();
this._duplex.push(null);

this._completion.resolve();
// If we are sending an error to the remote side or received an error from the remote,
// relay that information to the clients.
if (this.remoteError) {
this._completion.reject(this.remoteError);
} else {
this._completion.resolve();
}

await this._multiplexingStream.onChannelDisposed(this);
}
}
Expand Down
Loading