Skip to content

Commit

Permalink
Refactor IPC communication to allow for async and cancellation. (#2350)
Browse files Browse the repository at this point in the history
* Refactor IPC communication to allow for async and cancellation.
Refactor tests to flex both non-async and async methods.
  • Loading branch information
jander-msft authored Aug 31, 2021
1 parent a2106e1 commit e0189ea
Show file tree
Hide file tree
Showing 18 changed files with 792 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public EventPipeStreamProvider(MonitoringSourceConfiguration sourceConfig)
_stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
public async Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

EventPipeSession session = null;
try
{
session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
session = await client.StartEventPipeSessionAsync(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB, cancellationToken).ConfigureAwait(false);
}
catch (EndOfStreamException e)
{
Expand All @@ -49,22 +49,25 @@ public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, C
// Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
// using exceptions for normal termination of event stream.
await _stopProcessingSource.Task.ConfigureAwait(false);
StopSession(session);

await StopSessionAsync(session).ConfigureAwait(false);
});

return Task.FromResult(session.EventStream);
return session.EventStream;
}

public void StopProcessing()
{
_stopProcessingSource.TrySetResult(null);
}

private static void StopSession(EventPipeSession session)
private static async Task StopSessionAsync(EventPipeSession session)
{
// Cancel after a generous amount of time if process ended before command is sent.
using CancellationTokenSource cancellationSource = new(IpcClient.ConnectTimeout);
try
{
session.Stop();
await session.StopAsync(cancellationSource.Token).ConfigureAwait(false);
}
catch (EndOfStreamException)
{
Expand All @@ -74,6 +77,10 @@ private static void StopSession(EventPipeSession session)
catch (TimeoutException)
{
}
// We may time out if the process ended before we sent StopTracing command. We can just exit in that case.
catch (OperationCanceledException)
{
}
// On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library
// does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited
// before collection started and got rid of a pipe that once existed.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,83 +6,125 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.NETCore.Client
{
public class EventPipeSession : IDisposable
{
private IEnumerable<EventPipeProvider> _providers;
private bool _requestRundown;
private int _circularBufferMB;
private long _sessionId;
private IpcEndpoint _endpoint;
private bool _disposedValue = false; // To detect redundant calls
private bool _stopped = false; // To detect redundant calls
private readonly IpcResponse _response;

internal EventPipeSession(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
private EventPipeSession(IpcEndpoint endpoint, IpcResponse response, long sessionId)
{
_endpoint = endpoint;
_providers = providers;
_requestRundown = requestRundown;
_circularBufferMB = circularBufferMB;

var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
var message = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
EventStream = IpcClient.SendMessage(endpoint, message, out var response);
switch ((DiagnosticsServerResponseId)response.Header.CommandId)
{
case DiagnosticsServerResponseId.OK:
_sessionId = BitConverter.ToInt64(response.Payload, 0);
break;
case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"EventPipe session start failed (HRESULT: 0x{hr:X8})");
default:
throw new ServerErrorException($"EventPipe session start failed - Server responded with unknown command");
}
_response = response;
_sessionId = sessionId;
}

public Stream EventStream { get; }
public Stream EventStream => _response.Continuation;

internal static EventPipeSession Start(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
{
IpcMessage requestMessage = CreateStartMessage(providers, requestRundown, circularBufferMB);
IpcResponse? response = IpcClient.SendMessageGetContinuation(endpoint, requestMessage);
return CreateSessionFromResponse(endpoint, ref response, nameof(Start));
}

internal static async Task<EventPipeSession> StartAsync(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB, CancellationToken cancellationToken)
{
IpcMessage requestMessage = CreateStartMessage(providers, requestRundown, circularBufferMB);
IpcResponse? response = await IpcClient.SendMessageGetContinuationAsync(endpoint, requestMessage, cancellationToken).ConfigureAwait(false);
return CreateSessionFromResponse(endpoint, ref response, nameof(StartAsync));
}

///<summary>
/// Stops the given session
///</summary>
public void Stop()
{
Debug.Assert(_sessionId > 0);

// Do not issue another Stop command if it has already been issued for this session instance.
if (_stopped)
if (TryCreateStopMessage(out IpcMessage requestMessage))
{
return;
try
{
IpcMessage response = IpcClient.SendMessage(_endpoint, requestMessage);

DiagnosticsClient.ValidateResponseMessage(response, nameof(Stop));
}
// On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
catch (IOException)
{
throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
}
}
else
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if (TryCreateStopMessage(out IpcMessage requestMessage))
{
_stopped = true;
try
{
IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, requestMessage, cancellationToken).ConfigureAwait(false);

DiagnosticsClient.ValidateResponseMessage(response, nameof(StopAsync));
}
// On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
catch (IOException)
{
throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
}
}
}

byte[] payload = BitConverter.GetBytes(_sessionId);
IpcMessage response;
private static IpcMessage CreateStartMessage(IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
{
var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
return new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
}

private static EventPipeSession CreateSessionFromResponse(IpcEndpoint endpoint, ref IpcResponse? response, string operationName)
{
try
{
response = IpcClient.SendMessage(_endpoint, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
DiagnosticsClient.ValidateResponseMessage(response.Value.Message, operationName);

long sessionId = BitConverter.ToInt64(response.Value.Message.Payload, 0);

var session = new EventPipeSession(endpoint, response.Value, sessionId);
response = null;
return session;
}
// On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
catch (IOException)
finally
{
throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
response?.Dispose();
}
}

switch ((DiagnosticsServerResponseId)response.Header.CommandId)
private bool TryCreateStopMessage(out IpcMessage stopMessage)
{
Debug.Assert(_sessionId > 0);

// Do not issue another Stop command if it has already been issued for this session instance.
if (_stopped)
{
case DiagnosticsServerResponseId.OK:
return;
case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"EventPipe session stop failed (HRESULT: 0x{hr:X8})");
default:
throw new ServerErrorException($"EventPipe session stop failed - Server responded with unknown command");
stopMessage = null;
return false;
}
else
{
_stopped = true;
}

byte[] payload = BitConverter.GetBytes(_sessionId);

stopMessage = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload);

return true;
}

protected virtual void Dispose(bool disposing)
Expand All @@ -101,7 +143,7 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
EventStream?.Dispose();
_response.Dispose();
}
_disposedValue = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,59 +5,119 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.NETCore.Client
{
internal class IpcClient
{
// The amount of time to wait for a stream to be available for consumption by the Connect method.
// Normally expect the runtime to respond quickly but resource constrained machines may take longer.
private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(30);
internal static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(30);

/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
/// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
/// <returns>An <see cref="IpcMessage"/> that is the response message.</returns>
public static IpcMessage SendMessage(IpcEndpoint endpoint, IpcMessage message)
{
using (var stream = endpoint.Connect(ConnectTimeout))
using IpcResponse response = SendMessageGetContinuation(endpoint, message);
return response.Message;
}

/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <returns>An <see cref="IpcResponse"/> containing the response message and continuation stream.</returns>
public static IpcResponse SendMessageGetContinuation(IpcEndpoint endpoint, IpcMessage message)
{
Stream stream = null;
try
{
stream = endpoint.Connect(ConnectTimeout);

Write(stream, message);
return Read(stream);

IpcMessage response = Read(stream);

return new IpcResponse(response, Release(ref stream));
}
finally
{
stream?.Dispose();
}
}

/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId
/// and returns the Stream for reuse in Optional Continuations.
/// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <param name="response">out var for response message</param>
/// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
public static Stream SendMessage(IpcEndpoint endpoint, IpcMessage message, out IpcMessage response)
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An <see cref="IpcMessage"/> that is the response message.</returns>
public static async Task<IpcMessage> SendMessageAsync(IpcEndpoint endpoint, IpcMessage message, CancellationToken cancellationToken)
{
var stream = endpoint.Connect(ConnectTimeout);
Write(stream, message);
response = Read(stream);
return stream;
using IpcResponse response = await SendMessageGetContinuationAsync(endpoint, message, cancellationToken).ConfigureAwait(false);
return response.Message;
}

private static void Write(Stream stream, byte[] buffer)
/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
/// </summary>
/// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An <see cref="IpcResponse"/> containing the response message and continuation stream.</returns>
public static async Task<IpcResponse> SendMessageGetContinuationAsync(IpcEndpoint endpoint, IpcMessage message, CancellationToken cancellationToken)
{
stream.Write(buffer, 0, buffer.Length);
Stream stream = null;
try
{
stream = await endpoint.ConnectAsync(cancellationToken).ConfigureAwait(false);

await WriteAsync(stream, message, cancellationToken).ConfigureAwait(false);

IpcMessage response = await ReadAsync(stream, cancellationToken).ConfigureAwait(false);

return new IpcResponse(response, Release(ref stream));
}
finally
{
stream?.Dispose();
}
}

private static void Write(Stream stream, IpcMessage message)
{
Write(stream, message.Serialize());
byte[] buffer = message.Serialize();
stream.Write(buffer, 0, buffer.Length);
}

private static Task WriteAsync(Stream stream, IpcMessage message, CancellationToken cancellationToken)
{
byte[] buffer = message.Serialize();
return stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}

private static IpcMessage Read(Stream stream)
{
return IpcMessage.Parse(stream);
}

private static Task<IpcMessage> ReadAsync(Stream stream, CancellationToken cancellationToken)
{
return IpcMessage.ParseAsync(stream, cancellationToken);
}

private static Stream Release(ref Stream stream1)
{
Stream intermediate = stream1;
stream1 = null;
return intermediate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ internal enum DiagnosticsServerCommandSet : byte
Server = 0xFF,
}

// For .NET 5 Preview 7 and Preview 8, use this with the
// DiagnosticsServerCommandSet.Server command set.
// For .NET 5 RC and later, use ProcessCommandId.ResumeRuntime with
// the DiagnosticsServerCommandSet.Process command set.
internal enum DiagnosticServerCommandId : byte
{
// 0x00 used in DiagnosticServerResponseId
ResumeRuntime = 0x01,
// 0xFF used DiagnosticServerResponseId
};

internal enum DiagnosticsServerResponseId : byte
{
OK = 0x00,
Expand Down
Loading

0 comments on commit e0189ea

Please sign in to comment.