Skip to content

Commit

Permalink
Touch-up on Devesh's pull request
Browse files Browse the repository at this point in the history
  • Loading branch information
AArnott committed Dec 17, 2022
1 parent 4999e11 commit 7284090
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 141 deletions.
Binary file removed .DS_Store
Binary file not shown.
53 changes: 24 additions & 29 deletions src/Nerdbank.Streams/MultiplexingStream.Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class Channel : IDisposableObservable, IDuplexPipe
private long? localWindowSize;

/// <summary>
/// Indicates whether the <see cref="Dispose"/> method has been called.
/// Indicates whether the <see cref="Dispose(Exception)"/> method has been called.
/// </summary>
private bool isDisposed;

Expand Down Expand Up @@ -322,28 +322,21 @@ private long RemoteWindowRemaining
private bool BackpressureSupportEnabled => this.MultiplexingStream.protocolMajorVersion > 1;

/// <summary>
/// Immediately terminates the channel and shutdowns any ongoing communication.
/// Immediately terminates the channel and shuts down any ongoing communication.
/// </summary>
/// <remarks>
/// Because this method may terminate the channel immediately and thus can cause previously queued content to not actually be received by the remote party,
/// consider this method a "break glass" way of terminating a channel. The preferred method is that both sides "complete writing" and let the channel dispose itself.
/// </remarks>
public void Dispose()
{
string userDisposeMsg = "User triggered disposal";
MultiplexingProtocolException userDisposeException = new MultiplexingProtocolException(userDisposeMsg);

this.DisposeChannel(userDisposeException);
}
public void Dispose() => this.Dispose(null);

/// <summary>
/// Disposes the channel by releasing all resources associated with it.
/// </summary>
/// <param name="disposeException">The exception to dispose this channel with,
/// defaults to null.</param>
internal void DisposeChannel(Exception? disposeException = null)
/// <param name="disposeException">The exception to dispose this channel with.</param>
internal void Dispose(Exception? disposeException)
{
// Ensure that we don't call dispose more than once
// Ensure that we don't call dispose more than once.
lock (this.SyncObject)
{
if (this.isDisposed)
Expand Down Expand Up @@ -438,7 +431,7 @@ internal void DisposeChannel(Exception? disposeException = null)

internal async Task OnChannelTerminatedAsync(Exception? remoteError = null)
{
// Don't process the frame if the channel has already been diposed
// Don't process the frame if the channel has already been disposed.
lock (this.SyncObject)
{
if (this.isDisposed)
Expand All @@ -459,17 +452,14 @@ internal async Task OnChannelTerminatedAsync(Exception? remoteError = null)
// We fell victim to a race condition. It's OK to just swallow it because the writer was never created, so it needn't be completed.
}

// Terminate the channel
// Terminate the channel.
this.DisposeSelfOnFailure(Task.Run(async delegate
{
// Ensure that we processed the channel before terminating it
// Ensure that we processed the channel before terminating it.
await this.OptionsApplied.ConfigureAwait(false);

// Record that we were terminated remotely
this.IsRemotelyTerminated = true;

// Dispose of the channel
this.DisposeChannel(remoteError);
this.Dispose(remoteError);
}));
}

Expand Down Expand Up @@ -731,15 +721,16 @@ private void ApplyChannelOptions(ChannelOptions channelOptions)
this.mxStreamIOReaderCompleted = this.ProcessOutboundTransmissionsAsync();
this.DisposeSelfOnFailure(this.mxStreamIOReaderCompleted);
this.DisposeSelfOnFailure(this.AutoCloseOnPipesClosureAsync());

// Record that we have applied the channel options
this.optionsAppliedTaskSource?.TrySetResult(null);
}
catch (Exception ex)
{
this.optionsAppliedTaskSource?.TrySetException(ex);
throw;
}
finally
{
this.optionsAppliedTaskSource?.TrySetResult(null);
}
}

/// <summary>
Expand Down Expand Up @@ -890,13 +881,13 @@ private async Task ProcessOutboundTransmissionsAsync()
}
}

// Add a trace indicating that we caught an exception in ProcessOutbound
// Add a trace indicating that we caught an exception.
if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceEvent(
TraceEventType.Error,
0,
"Rethrowing caught exception in ProcessOutboundAsync: {0}",
"Rethrowing caught exception in " + nameof(this.ProcessOutboundTransmissionsAsync) + ": {0}",
ex.Message);
}

Expand Down Expand Up @@ -970,7 +961,7 @@ private async Task AutoCloseOnPipesClosureAsync()
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelAutoClosing, "Channel {0} \"{1}\" self-closing because both reader and writer are complete.", this.QualifiedId, this.Name);
}

this.DisposeChannel(null);
this.Dispose(null);
}

private void Fault(Exception exception)
Expand All @@ -988,12 +979,16 @@ private void Fault(Exception exception)
this.faultingException ??= exception;
}

if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Critical) ?? false)
if (this.TraceSource?.Switch.ShouldTrace(TraceEventType.Error) ?? false)
{
this.TraceSource!.TraceEvent(TraceEventType.Warning, 0, "Channel faulting self due to exception: {0}", this.faultingException.Message);
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelFatalError, "Channel faulted with exception: {0}", this.faultingException);
if (exception != this.faultingException)
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.ChannelFatalError, "A subsequent fault exception was reported: {0}", exception);
}
}

this.DisposeChannel(this.faultingException);
this.Dispose(this.faultingException);
}

private void DisposeSelfOnFailure(Task task)
Expand Down
19 changes: 5 additions & 14 deletions src/Nerdbank.Streams/MultiplexingStream.Formatters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ internal override void WriteFrame(FrameHeader header, ReadOnlySequence<byte> pay
}

/// <summary>
/// Creates a payload to sent alongside <see cref="ControlCode.ChannelTerminated"/>.
/// Creates a payload for a <see cref="ControlCode.ChannelTerminated"/> frame.
/// </summary>
/// <param name="exception">The exception to send to the remote side if there is one.</param>
/// <returns>The payload to send when a channel gets terminated.</returns>
Expand All @@ -428,16 +428,7 @@ internal ReadOnlySequence<byte> SerializeException(Exception? exception)
writer.WriteArrayHeader(1);

// Get the exception to send to the remote side
string errorMessage = exception.Message;
if (errorMessage == null || errorMessage.Length == 0)
{
errorMessage = string.Format(
"Exception of type '{0}' was thrown",
exception.GetType().ToString());
}

// Write the error message to the payload being sent
writer.Write(errorMessage);
writer.Write($"{exception.GetType().Name}: {exception.Message}");
writer.Flush();

return sequence;
Expand All @@ -450,7 +441,7 @@ internal ReadOnlySequence<byte> SerializeException(Exception? exception)
/// <returns>The error message in this payload if there is one, null otherwise.</returns>
internal Exception? DeserializeException(ReadOnlySequence<byte> payload)
{
// If it is empty then return null
// An empty payload means the remote side closed the channel without an exception.
if (payload.IsEmpty)
{
return null;
Expand All @@ -459,13 +450,13 @@ internal ReadOnlySequence<byte> SerializeException(Exception? exception)
var reader = new MessagePackReader(payload);
int numElements = reader.ReadArrayHeader();

// We received an empty payload
// We received an empty payload.
if (numElements == 0)
{
return null;
}

// Get the exception message and return it as an exception
// Get the exception message and return it as an exception.
string remoteErrorMsg = reader.ReadString();
return new MultiplexingProtocolException($"Received error from remote side: {remoteErrorMsg}");
}
Expand Down
47 changes: 21 additions & 26 deletions src/Nerdbank.Streams/MultiplexingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ private enum TraceEventId
{
HandshakeSuccessful = 1,
HandshakeFailed,

/// <summary>
/// A fatal error occurred at the overall multiplexing stream level, taking down the whole connection.
/// </summary>
FatalError,
UnexpectedChannelAccept,
ChannelAutoClosing,
Expand Down Expand Up @@ -209,6 +213,11 @@ private enum TraceEventId
/// Raised when the protocol handshake is starting, to annouce the major version being used.
/// </summary>
HandshakeStarted,

/// <summary>
/// A fatal exception occurred that took down one channel.
/// </summary>
ChannelFatalError,
}

/// <summary>
Expand Down Expand Up @@ -238,11 +247,6 @@ private enum TraceEventId
/// </summary>
internal CancellationToken DisposalToken => this.disposalTokenSource.Token;

/// <summary>
/// Gets a value indicating whether channels in this stream supports sending/receiving termination exceptions.
/// </summary>
private bool SupportTerminatedExceptions => this.protocolMajorVersion > 1;

/// <summary>
/// Gets a factory for <see cref="TraceSource"/> instances to attach to a newly opened <see cref="Channel"/>
/// when its <see cref="ChannelOptions.TraceSource"/> is <see langword="null"/>.
Expand Down Expand Up @@ -685,7 +689,7 @@ public async ValueTask DisposeAsync()
{
foreach (KeyValuePair<QualifiedChannelId, Channel> entry in this.openChannels)
{
entry.Value.Dispose();
entry.Value.Dispose(new ObjectDisposedException(nameof(MultiplexingStream)));
}

foreach (KeyValuePair<string, Queue<TaskCompletionSource<Channel>>> entry in this.acceptingChannels)
Expand Down Expand Up @@ -919,22 +923,18 @@ private async Task OnChannelTerminatedAsync(QualifiedChannelId channelId, ReadOn

if (channel is Channel)
{
// Try to get the exception sent from the remote side if there was one sent
Exception? remoteException = null;
if (this.SupportTerminatedExceptions && !payload.IsEmpty)
{
V2Formatter castedFormatter = (V2Formatter)this.formatter;
remoteException = castedFormatter.DeserializeException(payload);
}
// Try to get the exception sent from the remote side if there was one sent.
Exception? remoteException = (this.formatter as V2Formatter)?.DeserializeException(payload);

if (remoteException != null && this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
if (remoteException != null && this.TraceSource.Switch.ShouldTrace(TraceEventType.Error))
{
this.TraceSource.TraceEvent(
TraceEventType.Warning,
(int)TraceEventId.ChannelDisposed,
"Received Channel Terminated for channel {0} with exception: {1}",
TraceEventType.Error,
(int)TraceEventId.ChannelFatalError,
"Received {2} for channel {0} with exception: {1}",
channelId,
remoteException.Message);
remoteException.Message,
ControlCode.ChannelTerminated);
}

await channel.OnChannelTerminatedAsync(remoteException).ConfigureAwait(false);
Expand Down Expand Up @@ -1117,10 +1117,10 @@ private void AcceptChannelOrThrow(Channel channel, ChannelOptions options)
}

/// <summary>
/// Raised when <see cref="Channel.Dispose"/> is called and any local transmission is completed.
/// Raised when <see cref="Channel.Dispose(Exception?)"/> is called and any local transmission is completed.
/// </summary>
/// <param name="channel">The channel that is closing down.</param>
/// <param name="exception">The exception to sent to the remote side alongside the disposal.</param>
/// <param name="exception">The exception to send to the remote side alongside the disposal.</param>
private void OnChannelDisposed(Channel channel, Exception? exception = null)
{
Requires.NotNull(channel, nameof(channel));
Expand All @@ -1136,12 +1136,7 @@ private void OnChannelDisposed(Channel channel, Exception? exception = null)

// If there is an error and we support sending errors then
// serialize the exception and store it in the payload
ReadOnlySequence<byte> payload = default;
if (this.SupportTerminatedExceptions && exception != null)
{
V2Formatter castedFormatter = (V2Formatter)this.formatter;
payload = castedFormatter.SerializeException(exception);
}
ReadOnlySequence<byte> payload = (this.formatter as V2Formatter)?.SerializeException(exception) ?? default;

if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
Expand Down
12 changes: 6 additions & 6 deletions src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ export abstract class Channel implements IDisposableObservable {
}

/**
* Closes this channel. If the channel with protocol versions > 1
* are disposed with an error, then the error is transmitted to the remote side.
* Closes this channel.
* @param error An optional error to send to the remote side, if this multiplexing stream is using protocol versions >= 2.
*/
public dispose(error : Error | null = null): void {
public dispose(error?: Error | null): void {
// The interesting stuff is in the derived class.
this._isDisposed = true;
}
Expand Down Expand Up @@ -248,7 +248,7 @@ export class ChannelClass extends Channel {
}
}

public dispose(error : Error | null = null): void {
public dispose(error?: Error | null): void {
if (!this.isDisposed) {
super.dispose();

Expand All @@ -269,7 +269,7 @@ export class ChannelClass extends Channel {
}

// Send the notification, but we can't await the result of this.
caught(this._multiplexingStream.onChannelDisposed(this, error));
caught(this._multiplexingStream.onChannelDisposed(this, error ?? null));
}
}

Expand All @@ -286,4 +286,4 @@ export class ChannelClass extends Channel {
}
}
}
}
}
37 changes: 12 additions & 25 deletions src/nerdbank-streams/src/MultiplexingStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ export abstract class MultiplexingStream implements IDisposableObservable {
* @param options Options to customize the behavior of the stream.
* @returns The multiplexing stream.
*/
public static Create(
public static Create(
stream: NodeJS.ReadWriteStream,
options?: MultiplexingStreamOptions) : MultiplexingStream {
options?: MultiplexingStreamOptions): MultiplexingStream {
options ??= { protocolMajorVersion: 3 };
options.protocolMajorVersion ??= 3;

const formatter: MultiplexingStreamFormatter | undefined =
options.protocolMajorVersion === 3 ? new MultiplexingStreamV3Formatter(stream) :
undefined;
undefined;
if (!formatter) {
throw new Error(`Protocol major version ${options.protocolMajorVersion} is not supported. Try CreateAsync instead.`);
}
Expand Down Expand Up @@ -587,22 +587,15 @@ export class MultiplexingStreamClass extends MultiplexingStream {
}
}

public async onChannelDisposed(channel: ChannelClass, error : Error | null = null) {
public async onChannelDisposed(channel: ChannelClass, error: Error | null) {
if (!this._completionSource.isCompleted) {
try {
// Determine the payload to send to the error
let payloadToSend : Buffer = Buffer.alloc(0);
if(this.protocolMajorVersion > 1 && error != null) {
// Get the formatter use to serialize the error
const castedFormatter = this.formatter as MultiplexingStreamV2Formatter;

// Get the payload using the formatter
payloadToSend = castedFormatter.serializeException(error);
}
const payload = this.protocolMajorVersion > 1 && error
? (this.formatter as MultiplexingStreamV2Formatter).serializeException(error)
: Buffer.alloc(0);

// Create the header and send the frame to the remote side
const frameHeader = new FrameHeader(ControlCode.ChannelTerminated, channel.qualifiedId);
await this.sendFrameAsync(frameHeader, payloadToSend);
await this.sendFrameAsync(frameHeader, payload);
} catch (err) {
// Swallow exceptions thrown about channel disposal if the whole stream has been taken down.
if (this.isDisposed) {
Expand Down Expand Up @@ -749,17 +742,11 @@ export class MultiplexingStreamClass extends MultiplexingStream {
this.deleteOpenChannel(channelId);
this.removeChannelFromOfferedQueue(channel);

// Extract the exception that we received from the remote side
let remoteException : Error | null = null;
if(this.protocolMajorVersion > 1 && payload.length > 0) {
// Get the formatter
const castedFormatter = this.formatter as MultiplexingStreamV2Formatter;

// Extract the error using the casted formatter
remoteException = castedFormatter.deserializeException(payload);
}
// Extract the exception that we received from the remote side.
const remoteException = this.protocolMajorVersion > 1
? (this.formatter as MultiplexingStreamV2Formatter).deserializeException(payload)
: null;

// Dispose the channel
channel.dispose(remoteException);
}
}
Expand Down
Loading

0 comments on commit 7284090

Please sign in to comment.