Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback interface to IEndpointInfoSource implementations #743

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,16 @@ public async Task ServerSourceThrowsWhenMultipleStartTest()
[MemberData(nameof(GetTfmsSupportingPortListener))]
public async Task ServerSourceAddRemoveSingleConnectionTest(TargetFrameworkMoniker appTfm)
{
await using var source = CreateServerSource(out string transportName);
ServerEndpointInfoCallback callback = new(_outputHelper);
await using var source = CreateServerSource(out string transportName, callback);
source.Start();

var endpointInfos = await GetEndpointInfoAsync(source);
Assert.Empty(endpointInfos);

AppRunner runner = CreateAppRunner(transportName, appTfm);

Task newEndpointInfoTask = source.WaitForNewEndpointInfoAsync(runner, CommonTestTimeouts.StartProcess);
Task newEndpointInfoTask = callback.WaitForNewEndpointInfoAsync(runner, CommonTestTimeouts.StartProcess);

await runner.ExecuteAsync(async () =>
{
Expand Down Expand Up @@ -149,7 +150,8 @@ await runner.ExecuteAsync(async () =>
[MemberData(nameof(GetTfmsSupportingPortListener))]
public async Task ServerSourceAddRemoveMultipleConnectionTest(TargetFrameworkMoniker appTfm)
{
await using var source = CreateServerSource(out string transportName);
ServerEndpointInfoCallback callback = new(_outputHelper);
await using var source = CreateServerSource(out string transportName, callback);
source.Start();

var endpointInfos = await GetEndpointInfoAsync(source);
Expand All @@ -163,7 +165,7 @@ public async Task ServerSourceAddRemoveMultipleConnectionTest(TargetFrameworkMon
for (int i = 0; i < appCount; i++)
{
runners[i] = CreateAppRunner(transportName, appTfm, appId: i + 1);
newEndpointInfoTasks[i] = source.WaitForNewEndpointInfoAsync(runners[i], CommonTestTimeouts.StartProcess);
newEndpointInfoTasks[i] = callback.WaitForNewEndpointInfoAsync(runners[i], CommonTestTimeouts.StartProcess);
}

await runners.ExecuteAsync(async () =>
Expand Down Expand Up @@ -208,11 +210,17 @@ public static IEnumerable<object[]> GetTfmsSupportingPortListener()
yield return new object[] { TargetFrameworkMoniker.Net60 };
}

private TestServerEndpointInfoSource CreateServerSource(out string transportName)
private ServerEndpointInfoSource CreateServerSource(out string transportName, ServerEndpointInfoCallback callback = null)
{
DiagnosticPortHelper.Generate(DiagnosticPortConnectionMode.Listen, out _, out transportName);
_outputHelper.WriteLine("Starting server endpoint info source at '" + transportName + "'.");
return new TestServerEndpointInfoSource(transportName, _outputHelper);

List<IEndpointInfoSourceCallbacks> callbacks = new();
if (null != callback)
{
callbacks.Add(callback);
}
return new ServerEndpointInfoSource(transportName, callbacks);
}

private AppRunner CreateAppRunner(string transportName, TargetFrameworkMoniker tfm, int appId = 1)
Expand Down Expand Up @@ -243,56 +251,60 @@ private static void VerifyConnection(AppRunner runner, IEndpointInfo endpointInf
Assert.NotNull(endpointInfo.Endpoint);
}

private sealed class TestServerEndpointInfoSource : ServerEndpointInfoSource
private sealed class ServerEndpointInfoCallback : IEndpointInfoSourceCallbacks
{
private readonly ITestOutputHelper _outputHelper;
private readonly List<Tuple<AppRunner, TaskCompletionSource<EndpointInfo>>> _addedEndpointInfoSources = new();
private readonly List<(AppRunner Runner, TaskCompletionSource<IEndpointInfo> CompletionSource)> _addedEndpointInfoSources = new();

public TestServerEndpointInfoSource(string transportPath, ITestOutputHelper outputHelper)
: base(transportPath)
public ServerEndpointInfoCallback(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}

public async Task<EndpointInfo> WaitForNewEndpointInfoAsync(AppRunner runner, TimeSpan timeout)
public async Task<IEndpointInfo> WaitForNewEndpointInfoAsync(AppRunner runner, TimeSpan timeout)
{
TaskCompletionSource<EndpointInfo> addedEndpointInfoSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<IEndpointInfo> addedEndpointInfoSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
using CancellationTokenSource timeoutCancellation = new();
var token = timeoutCancellation.Token;
using var _ = token.Register(() => addedEndpointInfoSource.TrySetCanceled(token));

lock (_addedEndpointInfoSources)
{
_addedEndpointInfoSources.Add(Tuple.Create(runner, addedEndpointInfoSource));
_addedEndpointInfoSources.Add(new (runner, addedEndpointInfoSource));
_outputHelper.WriteLine($"[Wait] Register App{runner.AppId}");
}

_outputHelper.WriteLine($"[Wait] Wait for App{runner.AppId} notification");
timeoutCancellation.CancelAfter(timeout);
EndpointInfo endpointInfo = await addedEndpointInfoSource.Task;
IEndpointInfo endpointInfo = await addedEndpointInfoSource.Task;
_outputHelper.WriteLine($"[Wait] Received App{runner.AppId} notification");

return endpointInfo;
}

internal override void OnAddedEndpointInfo(EndpointInfo info)
public Task OnBeforeResumeAsync(IEndpointInfo endpointInfo, CancellationToken cancellationToken)
{
_outputHelper.WriteLine($"[Source] Added: {info.DebuggerDisplay}");
return Task.CompletedTask;
}

public void OnAddedEndpointInfo(IEndpointInfo info)
{
_outputHelper.WriteLine($"[Source] Added: {ToOutputString(info)}");

lock (_addedEndpointInfoSources)
{
jander-msft marked this conversation as resolved.
Show resolved Hide resolved
_outputHelper.WriteLine($"[Source] Start notifications for process {info.ProcessId}");

foreach (var sourceTuple in _addedEndpointInfoSources)
foreach (var sourceTuple in _addedEndpointInfoSources.ToList())
{
AppRunner runner = sourceTuple.Item1;
AppRunner runner = sourceTuple.Runner;
_outputHelper.WriteLine($"[Source] Checking App{runner.AppId}");
try
{
if (info.ProcessId == sourceTuple.Item1.ProcessId)
if (info.ProcessId == runner.ProcessId)
{
_outputHelper.WriteLine($"[Source] Notifying App{runner.AppId}");
sourceTuple.Item2.TrySetResult(info);
sourceTuple.CompletionSource.TrySetResult(info);
_addedEndpointInfoSources.Remove(sourceTuple);
break;
}
Expand All @@ -308,9 +320,14 @@ internal override void OnAddedEndpointInfo(EndpointInfo info)
}
}

internal override void OnRemovedEndpointInfo(EndpointInfo info)
public void OnRemovedEndpointInfo(IEndpointInfo info)
{
_outputHelper.WriteLine($"[Source] Removed: {ToOutputString(info)}");
}

private static string ToOutputString(IEndpointInfo info)
{
_outputHelper.WriteLine($"[Source] Removed: {info.DebuggerDisplay}");
return FormattableString.Invariant($"PID={info.ProcessId}, Cookie={info.RuntimeInstanceCookie}");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Diagnostics.Monitoring.WebApi;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.Tools.Monitor
{
/// <summary>
/// Callback interface for notifications on state changes for an IEndpointInfoSource implementation.
/// </summary>
internal interface IEndpointInfoSourceCallbacks
{
Task OnBeforeResumeAsync(IEndpointInfo endpointInfo, CancellationToken cancellationToken);

void OnAddedEndpointInfo(IEndpointInfo endpointInfo);

void OnRemovedEndpointInfo(IEndpointInfo endpointInfo);
}
}
43 changes: 25 additions & 18 deletions src/Tools/dotnet-monitor/EndpointInfo/ServerEndpointInfoSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ namespace Microsoft.Diagnostics.Tools.Monitor
/// <summary>
/// Aggregates diagnostic endpoints that are established at a transport path via a reversed server.
/// </summary>
internal class ServerEndpointInfoSource : IEndpointInfoSourceInternal, IAsyncDisposable
internal sealed class ServerEndpointInfoSource : IEndpointInfoSourceInternal, IAsyncDisposable
{
// The amount of time to wait when checking if the a endpoint info should be
// pruned from the list of endpoint infos. If the runtime doesn't have a viable connection within
// this time, it will be pruned from the list.
private static readonly TimeSpan PruneWaitForConnectionTimeout = TimeSpan.FromMilliseconds(250);

private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
private readonly IEnumerable<IEndpointInfoSourceCallbacks> _callbacks;
private readonly IList<EndpointInfo> _endpointInfos = new List<EndpointInfo>();
private readonly SemaphoreSlim _endpointInfosSemaphore = new SemaphoreSlim(1);
private readonly string _transportPath;
Expand All @@ -41,8 +42,9 @@ internal class ServerEndpointInfoSource : IEndpointInfoSourceInternal, IAsyncDis
/// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
/// On all other systems, this must be the full file path of the socket.
/// </param>
public ServerEndpointInfoSource(string transportPath)
public ServerEndpointInfoSource(string transportPath, IEnumerable<IEndpointInfoSourceCallbacks> callbacks = null)
{
_callbacks = callbacks ?? Enumerable.Empty<IEndpointInfoSourceCallbacks>();
_transportPath = transportPath;
}

Expand Down Expand Up @@ -134,13 +136,18 @@ public async Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationT
await Task.WhenAll(checkMap.Values).ConfigureAwait(false);

// Remove connections for failed checks
foreach (KeyValuePair<EndpointInfo, Task<bool>> entry in checkMap)
foreach ((EndpointInfo endpointInfo, Task<bool> checkTask) in checkMap)
{
if (entry.Value.Result)
if (checkTask.Result)
{
_endpointInfos.Remove(entry.Key);
OnRemovedEndpointInfo(entry.Key);
_server?.RemoveConnection(entry.Key.RuntimeInstanceCookie);
_endpointInfos.Remove(endpointInfo);

foreach (IEndpointInfoSourceCallbacks callback in _callbacks)
{
callback.OnRemovedEndpointInfo(endpointInfo);
}

_server?.RemoveConnection(endpointInfo.RuntimeInstanceCookie);
}
}

Expand Down Expand Up @@ -208,6 +215,13 @@ private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, Cancellation
{
try
{
EndpointInfo endpointInfo = EndpointInfo.FromIpcEndpointInfo(info);

foreach (IEndpointInfoSourceCallbacks callback in _callbacks)
{
await callback.OnBeforeResumeAsync(endpointInfo, token).ConfigureAwait(false);
}

// Send ResumeRuntime message for runtime instances that connect to the server. This will allow
// those instances that are configured to pause on start to resume after the diagnostics
// connection has been made. Instances that are not configured to pause on startup will ignore
Expand All @@ -222,14 +236,15 @@ private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, Cancellation
// The runtime likely doesn't understand the ResumeRuntime command.
}

EndpointInfo endpointInfo = EndpointInfo.FromIpcEndpointInfo(info);

await _endpointInfosSemaphore.WaitAsync(token).ConfigureAwait(false);
try
{
_endpointInfos.Add(endpointInfo);

OnAddedEndpointInfo(endpointInfo);
foreach (IEndpointInfoSourceCallbacks callback in _callbacks)
{
callback.OnAddedEndpointInfo(endpointInfo);
}
}
finally
{
Expand All @@ -244,14 +259,6 @@ private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, Cancellation
}
}

internal virtual void OnAddedEndpointInfo(EndpointInfo info)
{
}

internal virtual void OnRemovedEndpointInfo(EndpointInfo info)
{
}

private void VerifyNotDisposed()
{
if (_disposed)
Expand Down