Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include an error message payload alongside ChannelTerminated frames #536

Merged
merged 3 commits into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 135 additions & 74 deletions src/Nerdbank.Streams/MultiplexingStream.Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class Channel : IDisposableObservable, IDuplexPipe
private long? localWindowSize;

/// <summary>
/// Indicates whether the <see cref="Dispose"/> method has been called.
/// Indicates whether the <see cref="Dispose(Exception)"/> method has been called.
/// </summary>
private bool isDisposed;

Expand Down Expand Up @@ -322,95 +322,122 @@ private long RemoteWindowRemaining
private bool BackpressureSupportEnabled => this.MultiplexingStream.protocolMajorVersion > 1;

/// <summary>
/// Closes this channel and releases all resources associated with it.
/// Immediately terminates the channel and shuts down any ongoing communication.
/// </summary>
/// <remarks>
/// Because this method may terminate the channel immediately and thus can cause previously queued content to not actually be received by the remote party,
/// consider this method a "break glass" way of terminating a channel. The preferred method is that both sides "complete writing" and let the channel dispose itself.
/// </remarks>
public void Dispose()
public void Dispose() => this.Dispose(null);

/// <summary>
/// Disposes the channel by releasing all resources associated with it.
/// </summary>
/// <param name="disposeException">The exception to dispose this channel with.</param>
internal void Dispose(Exception? disposeException)
{
if (!this.IsDisposed)
// Ensure that we don't call dispose more than once.
lock (this.SyncObject)
{
this.acceptanceSource.TrySetCanceled();
this.optionsAppliedTaskSource?.TrySetCanceled();

PipeWriter? mxStreamIOWriter;
lock (this.SyncObject)
if (this.isDisposed)
{
this.isDisposed = true;
mxStreamIOWriter = this.mxStreamIOWriter;
return;
}

// Complete writing so that the mxstream cannot write to this channel any more.
// We must also cancel a pending flush since no one is guaranteed to be reading this any more
// and we don't want to deadlock on a full buffer in a disposed channel's pipe.
if (mxStreamIOWriter is not null)
{
mxStreamIOWriter.CancelPendingFlush();
_ = this.mxStreamIOWriterSemaphore.EnterAsync().ContinueWith(
static (releaser, state) =>
{
try
{
Channel self = (Channel)state;
// First call to dispose
this.isDisposed = true;
}

PipeWriter? mxStreamIOWriter;
lock (self.SyncObject)
{
mxStreamIOWriter = self.mxStreamIOWriter;
}
this.acceptanceSource.TrySetCanceled();
this.optionsAppliedTaskSource?.TrySetCanceled();

mxStreamIOWriter?.Complete();
self.mxStreamIOWriterCompleted.Set();
}
finally
PipeWriter? mxStreamIOWriter;
lock (this.SyncObject)
{
mxStreamIOWriter = this.mxStreamIOWriter;
}

// Complete writing so that the mxstream cannot write to this channel any more.
// We must also cancel a pending flush since no one is guaranteed to be reading this any more
// and we don't want to deadlock on a full buffer in a disposed channel's pipe.
if (mxStreamIOWriter is not null)
{
mxStreamIOWriter.CancelPendingFlush();
_ = this.mxStreamIOWriterSemaphore.EnterAsync().ContinueWith(
static (releaser, state) =>
{
try
{
Channel self = (Channel)state;

PipeWriter? mxStreamIOWriter;
lock (self.SyncObject)
{
releaser.Result.Dispose();
mxStreamIOWriter = self.mxStreamIOWriter;
}
},
this,
CancellationToken.None,
TaskContinuationOptions.OnlyOnRanToCompletion,
TaskScheduler.Default);
}

if (this.mxStreamIOReader is not null)
{
// We don't own the user's PipeWriter to complete it (so they can't write anything more to this channel).
// We can't know whether there is or will be more bytes written to the user's PipeWriter,
// but we need to terminate our reader for their writer as part of reclaiming resources.
// Cancel the pending or next read operation so the reader loop will immediately notice and shutdown.
this.mxStreamIOReader.CancelPendingRead();

// Only Complete the reader if our async reader doesn't own it to avoid thread-safety bugs.
PipeReader? mxStreamIOReader = null;
lock (this.SyncObject)
{
if (this.mxStreamIOReader is not UnownedPipeReader)
mxStreamIOWriter?.Complete();
self.mxStreamIOWriterCompleted.Set();
}
finally
{
mxStreamIOReader = this.mxStreamIOReader;
this.mxStreamIOReader = null;
releaser.Result.Dispose();
}
}
},
this,
CancellationToken.None,
TaskContinuationOptions.OnlyOnRanToCompletion,
TaskScheduler.Default);
}

mxStreamIOReader?.Complete();
if (this.mxStreamIOReader is not null)
{
// We don't own the user's PipeWriter to complete it (so they can't write anything more to this channel).
// We can't know whether there is or will be more bytes written to the user's PipeWriter,
// but we need to terminate our reader for their writer as part of reclaiming resources.
// Cancel the pending or next read operation so the reader loop will immediately notice and shutdown.
this.mxStreamIOReader.CancelPendingRead();

// Only Complete the reader if our async reader doesn't own it to avoid thread-safety bugs.
PipeReader? mxStreamIOReader = null;
lock (this.SyncObject)
{
if (this.mxStreamIOReader is not UnownedPipeReader)
{
mxStreamIOReader = this.mxStreamIOReader;
this.mxStreamIOReader = null;
}
}

// Unblock the reader that might be waiting on this.
this.remoteWindowHasCapacity.Set();
mxStreamIOReader?.Complete();
}

this.disposalTokenSource.Cancel();
// Set the completion source based on whether we are disposing due to an error
if (disposeException != null)
{
this.completionSource.TrySetException(disposeException);
}
else
{
this.completionSource.TrySetResult(null);
this.MultiplexingStream.OnChannelDisposed(this);
}

// Unblock the reader that might be waiting on this.
this.remoteWindowHasCapacity.Set();

this.disposalTokenSource.Cancel();
this.MultiplexingStream.OnChannelDisposed(this, disposeException);
}

internal async Task OnChannelTerminatedAsync()
internal async Task OnChannelTerminatedAsync(Exception? remoteError = null)
{
if (this.IsDisposed)
// Don't process the frame if the channel has already been disposed.
lock (this.SyncObject)
{
return;
if (this.isDisposed)
{
return;
}
}

try
Expand All @@ -424,6 +451,16 @@ internal async Task OnChannelTerminatedAsync()
{
// We fell victim to a race condition. It's OK to just swallow it because the writer was never created, so it needn't be completed.
}

// Terminate the channel.
this.DisposeSelfOnFailure(Task.Run(async delegate
{
// Ensure that we processed the channel before terminating it.
await this.OptionsApplied.ConfigureAwait(false);

this.IsRemotelyTerminated = true;
this.Dispose(remoteError);
}));
}

/// <summary>
Expand Down Expand Up @@ -771,6 +808,7 @@ private async Task ProcessOutboundTransmissionsAsync()

// We don't use a CancellationToken on this call because we prefer the exception-free cancellation path used by our Dispose method (CancelPendingRead).
ReadResult result = await mxStreamIOReader.ReadAsync().ConfigureAwait(false);

if (result.IsCanceled)
{
// We've been asked to cancel. Presumably the channel has faulted or been disposed.
Expand Down Expand Up @@ -832,13 +870,25 @@ private async Task ProcessOutboundTransmissionsAsync()
}
catch (Exception ex)
{
if (ex is OperationCanceledException && this.DisposalToken.IsCancellationRequested)
await mxStreamIOReader!.CompleteAsync(ex).ConfigureAwait(false);

// Record this as a faulting exception if the channel hasn't been disposed.
lock (this.SyncObject)
{
await mxStreamIOReader!.CompleteAsync().ConfigureAwait(false);
if (!this.IsDisposed)
{
this.faultingException ??= ex;
}
}
else

// Add a trace indicating that we caught an exception.
if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
{
await mxStreamIOReader!.CompleteAsync(ex).ConfigureAwait(false);
this.TraceSource.TraceEvent(
TraceEventType.Error,
0,
"Rethrowing caught exception in " + nameof(this.ProcessOutboundTransmissionsAsync) + ": {0}",
ex.Message);
}

throw;
Expand Down Expand Up @@ -911,23 +961,34 @@ private async Task AutoCloseOnPipesClosureAsync()
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelAutoClosing, "Channel {0} \"{1}\" self-closing because both reader and writer are complete.", this.QualifiedId, this.Name);
}

this.Dispose();
this.Dispose(null);
}

private void Fault(Exception exception)
{
if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Critical) ?? false)
{
this.TraceSource!.TraceEvent(TraceEventType.Critical, (int)TraceEventId.FatalError, "Channel Closing self due to exception: {0}", exception);
}
this.mxStreamIOReader?.CancelPendingRead();

// If the channel has already been disposed then only cancel the reader
lock (this.SyncObject)
{
if (this.isDisposed)
{
return;
}

this.faultingException ??= exception;
}

this.mxStreamIOReader?.CancelPendingRead();
this.Dispose();
if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Error) ?? false)
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelFatalError, "Channel faulted with exception: {0}", this.faultingException);
if (exception != this.faultingException)
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelFatalError, "A subsequent fault exception was reported: {0}", exception);
}
}

this.Dispose(this.faultingException);
}

private void DisposeSelfOnFailure(Task task)
Expand Down
51 changes: 51 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.Formatters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,57 @@ internal override void WriteFrame(FrameHeader header, ReadOnlySequence<byte> pay
}
}

/// <summary>
/// Creates a payload for a <see cref="ControlCode.ChannelTerminated"/> frame.
/// </summary>
/// <param name="exception">The exception to send to the remote side if there is one.</param>
/// <returns>The payload to send when a channel gets terminated.</returns>
internal ReadOnlySequence<byte> SerializeException(Exception? exception)
{
if (exception == null)
{
return ReadOnlySequence<byte>.Empty;
}

var sequence = new Sequence<byte>();
var writer = new MessagePackWriter(sequence);

writer.WriteArrayHeader(1);

// Get the exception to send to the remote side
writer.Write($"{exception.GetType().Name}: {exception.Message}");
writer.Flush();

return sequence;
}

/// <summary>
/// Gets the error message in the payload if there is one.
/// </summary>
/// <param name="payload">The payload that could contain an error message.</param>
/// <returns>The error message in this payload if there is one, null otherwise.</returns>
internal Exception? DeserializeException(ReadOnlySequence<byte> payload)
{
// An empty payload means the remote side closed the channel without an exception.
if (payload.IsEmpty)
{
return null;
}

var reader = new MessagePackReader(payload);
int numElements = reader.ReadArrayHeader();
sarda-devesh marked this conversation as resolved.
Show resolved Hide resolved

// We received an empty payload.
if (numElements == 0)
{
return null;
}

// Get the exception message and return it as an exception.
string remoteErrorMsg = reader.ReadString();
return new MultiplexingProtocolException($"Received error from remote side: {remoteErrorMsg}");
}

internal override ReadOnlySequence<byte> SerializeContentProcessed(long bytesProcessed)
{
var sequence = new Sequence<byte>();
Expand Down
Loading