Skip to content

Commit

Permalink
Refactor IPC communication to allow for async and cancellation.
Browse files Browse the repository at this point in the history
Refactor tests to flex both non-async and async methods.
  • Loading branch information
jander-msft committed Aug 5, 2021
1 parent c9657f9 commit 4c70bbb
Show file tree
Hide file tree
Showing 16 changed files with 755 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public EventPipeStreamProvider(MonitoringSourceConfiguration sourceConfig)
_stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
public async Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

EventPipeSession session = null;
try
{
session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
session = await client.StartEventPipeSessionAsync(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB, cancellationToken).ConfigureAwait(false);
}
catch (EndOfStreamException e)
{
Expand All @@ -49,22 +49,23 @@ public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, C
// Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
// using exceptions for normal termination of event stream.
await _stopProcessingSource.Task.ConfigureAwait(false);
StopSession(session);

await StopSessionAsync(session).ConfigureAwait(false);
});

return Task.FromResult(session.EventStream);
return session.EventStream;
}

public void StopProcessing()
{
_stopProcessingSource.TrySetResult(null);
}

private static void StopSession(EventPipeSession session)
private static async Task StopSessionAsync(EventPipeSession session)
{
try
{
session.Stop();
await session.StopAsync(CancellationToken.None).ConfigureAwait(false);
}
catch (EndOfStreamException)
{
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,83 +6,125 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.NETCore.Client
{
public class EventPipeSession : IDisposable
{
private IEnumerable<EventPipeProvider> _providers;
private bool _requestRundown;
private int _circularBufferMB;
private long _sessionId;
private IpcEndpoint _endpoint;
private bool _disposedValue = false; // To detect redundant calls
private bool _stopped = false; // To detect redundant calls
private readonly IpcResponse _response;

internal EventPipeSession(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
private EventPipeSession(IpcEndpoint endpoint, IpcResponse response, long sessionId)
{
_endpoint = endpoint;
_providers = providers;
_requestRundown = requestRundown;
_circularBufferMB = circularBufferMB;

var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
var message = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
EventStream = IpcClient.SendMessage(endpoint, message, out var response);
switch ((DiagnosticsServerResponseId)response.Header.CommandId)
{
case DiagnosticsServerResponseId.OK:
_sessionId = BitConverter.ToInt64(response.Payload, 0);
break;
case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"EventPipe session start failed (HRESULT: 0x{hr:X8})");
default:
throw new ServerErrorException($"EventPipe session start failed - Server responded with unknown command");
}
_response = response;
_sessionId = sessionId;
}

public Stream EventStream { get; }
public Stream EventStream => _response.Continuation;

internal static EventPipeSession Start(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
{
IpcMessage requestMessage = CreateStartMessage(providers, requestRundown, circularBufferMB);
IpcResponse? response = IpcClient.SendMessageGetContinuation(endpoint, requestMessage);
return CreateSessionFromResponse(endpoint, ref response, nameof(Start));
}

internal static async Task<EventPipeSession> StartAsync(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB, CancellationToken cancellationToken)
{
IpcMessage requestMessage = CreateStartMessage(providers, requestRundown, circularBufferMB);
IpcResponse? response = await IpcClient.SendMessageGetContinuationAsync(endpoint, requestMessage, cancellationToken).ConfigureAwait(false);
return CreateSessionFromResponse(endpoint, ref response, nameof(StartAsync));
}

///<summary>
/// Stops the given session
///</summary>
public void Stop()
{
Debug.Assert(_sessionId > 0);

// Do not issue another Stop command if it has already been issued for this session instance.
if (_stopped)
if (TryCreateStopMessage(out IpcMessage requestMessage))
{
return;
try
{
IpcMessage response = IpcClient.SendMessage(_endpoint, requestMessage);

DiagnosticsClient.ValidateResponseMessage(response, nameof(Stop));
}
// On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
catch (IOException)
{
throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
}
}
else
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if (TryCreateStopMessage(out IpcMessage requestMessage))
{
_stopped = true;
try
{
IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, requestMessage, cancellationToken).ConfigureAwait(false);

DiagnosticsClient.ValidateResponseMessage(response, nameof(StopAsync));
}
// On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
catch (IOException)
{
throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
}
}
}

byte[] payload = BitConverter.GetBytes(_sessionId);
IpcMessage response;
private static IpcMessage CreateStartMessage(IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
{
var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
return new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
}

private static EventPipeSession CreateSessionFromResponse(IpcEndpoint endpoint, ref IpcResponse? response, string operationName)
{
try
{
response = IpcClient.SendMessage(_endpoint, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
DiagnosticsClient.ValidateResponseMessage(response.Value.Message, operationName);

long sessionId = BitConverter.ToInt64(response.Value.Message.Payload, 0);

var session = new EventPipeSession(endpoint, response.Value, sessionId);
response = null;
return session;
}
// On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
catch (IOException)
finally
{
throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
response?.Dispose();
}
}

switch ((DiagnosticsServerResponseId)response.Header.CommandId)
private bool TryCreateStopMessage(out IpcMessage stopMessage)
{
Debug.Assert(_sessionId > 0);

// Do not issue another Stop command if it has already been issued for this session instance.
if (_stopped)
{
case DiagnosticsServerResponseId.OK:
return;
case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"EventPipe session stop failed (HRESULT: 0x{hr:X8})");
default:
throw new ServerErrorException($"EventPipe session stop failed - Server responded with unknown command");
stopMessage = null;
return false;
}
else
{
_stopped = true;
}

byte[] payload = BitConverter.GetBytes(_sessionId);

stopMessage = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload);

return true;
}

protected virtual void Dispose(bool disposing)
Expand All @@ -101,7 +143,7 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
EventStream?.Dispose();
_response.Dispose();
}
_disposedValue = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.NETCore.Client
{
Expand All @@ -19,45 +20,104 @@ internal class IpcClient
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
/// <returns>An <see cref="IpcMessage"/> that is the response message.</returns>
public static IpcMessage SendMessage(IpcEndpoint endpoint, IpcMessage message)
{
using (var stream = endpoint.Connect(ConnectTimeout))
using IpcResponse response = SendMessageGetContinuation(endpoint, message);
return response.Message;
}

/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <returns>An <see cref="IpcResponse"/> containing the response message and continuation stream.</returns>
public static IpcResponse SendMessageGetContinuation(IpcEndpoint endpoint, IpcMessage message)
{
Stream stream = null;
try
{
stream = endpoint.Connect(ConnectTimeout);

Write(stream, message);
return Read(stream);

IpcMessage response = Read(stream);

return new IpcResponse(response, Exchange(ref stream, null));
}
finally
{
stream?.Dispose();
}
}

/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId
/// and returns the Stream for reuse in Optional Continuations.
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <param name="response">out var for response message</param>
/// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
public static Stream SendMessage(IpcEndpoint endpoint, IpcMessage message, out IpcMessage response)
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An <see cref="IpcMessage"/> that is the response message.</returns>
public static async Task<IpcMessage> SendMessageAsync(IpcEndpoint endpoint, IpcMessage message, CancellationToken cancellationToken)
{
var stream = endpoint.Connect(ConnectTimeout);
Write(stream, message);
response = Read(stream);
return stream;
using IpcResponse response = await SendMessageGetContinuationAsync(endpoint, message, cancellationToken).ConfigureAwait(false);
return response.Message;
}

private static void Write(Stream stream, byte[] buffer)
/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An <see cref="IpcResponse"/> containing the response message and continuation stream.</returns>
public static async Task<IpcResponse> SendMessageGetContinuationAsync(IpcEndpoint endpoint, IpcMessage message, CancellationToken cancellationToken)
{
stream.Write(buffer, 0, buffer.Length);
Stream stream = null;
try
{
stream = await endpoint.ConnectAsync(cancellationToken).ConfigureAwait(false);

await WriteAsync(stream, message, cancellationToken).ConfigureAwait(false);

IpcMessage response = await ReadAsync(stream, cancellationToken).ConfigureAwait(false);

return new IpcResponse(response, Exchange(ref stream, null));
}
finally
{
stream?.Dispose();
}
}

private static void Write(Stream stream, IpcMessage message)
{
Write(stream, message.Serialize());
byte[] buffer = message.Serialize();
stream.Write(buffer, 0, buffer.Length);
}

private static Task WriteAsync(Stream stream, IpcMessage message, CancellationToken cancellationToken)
{
byte[] buffer = message.Serialize();
return stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}

private static IpcMessage Read(Stream stream)
{
return IpcMessage.Parse(stream);
}

private static Task<IpcMessage> ReadAsync(Stream stream, CancellationToken cancellationToken)
{
return IpcMessage.ParseAsync(stream, cancellationToken);
}

private static Stream Exchange(ref Stream stream1, Stream stream2)
{
Stream intermediate = stream1;
stream1 = stream2;
return intermediate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ internal enum DiagnosticsServerCommandSet : byte
Server = 0xFF,
}

// For .NET 5 Preview 7 and Preview 8, use this with the
// DiagnosticsServerCommandSet.Server command set.
// For .NET 5 RC and later, use ProcessCommandId.ResumeRuntime with
// the DiagnosticsServerCommandSet.Process command set.
internal enum DiagnosticServerCommandId : byte
{
// 0x00 used in DiagnosticServerResponseId
ResumeRuntime = 0x01,
// 0xFF used DiagnosticServerResponseId
};

internal enum DiagnosticsServerResponseId : byte
{
OK = 0x00,
Expand Down
Loading

0 comments on commit 4c70bbb

Please sign in to comment.