Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in EndpointInfoSourceTests. #776

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public sealed class AppRunner : IAsyncDisposable

public int ProcessId => _adapter.ProcessId;
jander-msft marked this conversation as resolved.
Show resolved Hide resolved

public Task<int> ProcessIdTask => _adapter.ProcessIdTask;

/// <summary>
/// Name of the scenario to run in the application.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public sealed class LoggingRunnerAdapter : IAsyncDisposable
{
private readonly CancellationTokenSource _cancellation = new();
private readonly ITestOutputHelper _outputHelper;
private readonly TaskCompletionSource<int> _processIdSource =
new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly DotNetRunner _runner;
private readonly List<string> _standardErrorLines = new();
private readonly List<string> _standardOutputLines = new();
Expand All @@ -30,9 +32,14 @@ public sealed class LoggingRunnerAdapter : IAsyncDisposable
public int ExitCode => _exitCode.HasValue ?
_exitCode.Value : throw new InvalidOperationException("Must call WaitForExitAsync before getting exit code.");

/// <remarks>
/// Only access this property after the <see cref="StartAsync(CancellationToken)"/> method has completed.
/// </remarks>
public int ProcessId => _processId.HasValue ?
_processId.Value : throw new InvalidOperationException("Process was not started.");

public Task<int> ProcessIdTask => _processIdSource.Task;

public event Action<string> ReceivedStandardErrorLine;

public event Action<string> ReceivedStandardOutputLine;
Expand All @@ -56,6 +63,8 @@ public async ValueTask DisposeAsync()

_cancellation.Cancel();

_processIdSource.TrySetCanceled(_cancellation.Token);

// Shutdown the runner
_outputHelper.WriteLine("Stopping...");
_runner.ForceClose();
Expand Down Expand Up @@ -96,11 +105,15 @@ public async Task StartAsync(CancellationToken token)
}
_outputHelper.WriteLine("End Environment:");

_outputHelper.WriteLine("Starting...");
await _runner.StartAsync(token).ConfigureAwait(false);
using (var _ = token.Register(() => _processIdSource.TrySetCanceled(token)))
{
_outputHelper.WriteLine("Starting...");
await _runner.StartAsync(token).ConfigureAwait(false);
}

_processId = _runner.ProcessId;
_outputHelper.WriteLine("Process ID: {0}", _runner.ProcessId);
_processId = _runner.ProcessId;
_processIdSource.TrySetResult(_runner.ProcessId);

_standardErrorTask = ReadLinesAsync(_runner.StandardError, _standardErrorLines, ReceivedStandardErrorLine, _cancellation.Token);
_standardOutputTask = ReadLinesAsync(_runner.StandardOutput, _standardOutputLines, ReceivedStandardOutputLine, _cancellation.Token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,12 @@ public static async Task WithCancellation(this Task task, CancellationToken toke
localTokenSource.Cancel();
}
}

public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken token)
{
await WithCancellation((Task)task, token);

return task.Result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public async Task ServerSourceAddRemoveSingleConnectionTest(TargetFrameworkMonik

AppRunner runner = CreateAppRunner(transportName, appTfm);

Task newEndpointInfoTask = callback.WaitForNewEndpointInfoAsync(runner, CommonTestTimeouts.StartProcess);
using CancellationTokenSource cancellation = new(CommonTestTimeouts.StartProcess);
Task newEndpointInfoTask = callback.WaitForNewEndpointInfoAsync(runner, cancellation.Token);

await runner.ExecuteAsync(async () =>
{
Expand Down Expand Up @@ -162,10 +163,11 @@ public async Task ServerSourceAddRemoveMultipleConnectionTest(TargetFrameworkMon
Task[] newEndpointInfoTasks = new Task[appCount];

// Start all app instances
using CancellationTokenSource cancellation = new(CommonTestTimeouts.StartProcess);
for (int i = 0; i < appCount; i++)
{
runners[i] = CreateAppRunner(transportName, appTfm, appId: i + 1);
newEndpointInfoTasks[i] = callback.WaitForNewEndpointInfoAsync(runners[i], CommonTestTimeouts.StartProcess);
newEndpointInfoTasks[i] = callback.WaitForNewEndpointInfoAsync(runners[i], cancellation.Token);
}

await runners.ExecuteAsync(async () =>
Expand Down Expand Up @@ -254,70 +256,89 @@ private static void VerifyConnection(AppRunner runner, IEndpointInfo endpointInf
private sealed class ServerEndpointInfoCallback : IEndpointInfoSourceCallbacks
{
private readonly ITestOutputHelper _outputHelper;
private readonly List<(AppRunner Runner, TaskCompletionSource<IEndpointInfo> CompletionSource)> _addedEndpointInfoSources = new();
private readonly SemaphoreSlim _completionEntriesSemaphore = new(1);
jander-msft marked this conversation as resolved.
Show resolved Hide resolved
private readonly List<CompletionEntry> _completionEntries = new();

public ServerEndpointInfoCallback(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}

public async Task<IEndpointInfo> WaitForNewEndpointInfoAsync(AppRunner runner, TimeSpan timeout)
public async Task<IEndpointInfo> WaitForNewEndpointInfoAsync(AppRunner runner, CancellationToken token)
{
TaskCompletionSource<IEndpointInfo> addedEndpointInfoSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
using CancellationTokenSource timeoutCancellation = new();
var token = timeoutCancellation.Token;
using var _ = token.Register(() => addedEndpointInfoSource.TrySetCanceled(token));
CompletionEntry entry = new(runner);
using var _ = token.Register(() => entry.CompletionSource.TrySetCanceled(token));

lock (_addedEndpointInfoSources)
await _completionEntriesSemaphore.WaitAsync(token);
try
{
_addedEndpointInfoSources.Add(new (runner, addedEndpointInfoSource));
_completionEntries.Add(entry);
_outputHelper.WriteLine($"[Wait] Register App{runner.AppId}");
}
finally
{
_completionEntriesSemaphore.Release();
}

_outputHelper.WriteLine($"[Wait] Wait for App{runner.AppId} notification");
timeoutCancellation.CancelAfter(timeout);
IEndpointInfo endpointInfo = await addedEndpointInfoSource.Task;
IEndpointInfo endpointInfo = await entry.CompletionSource.Task;
_outputHelper.WriteLine($"[Wait] Received App{runner.AppId} notification");

return endpointInfo;
}

public Task OnBeforeResumeAsync(IEndpointInfo endpointInfo, CancellationToken cancellationToken)
public Task OnBeforeResumeAsync(IEndpointInfo endpointInfo, CancellationToken token)
{
return Task.CompletedTask;
}

public void OnAddedEndpointInfo(IEndpointInfo info)
public async Task OnAddedEndpointInfoAsync(IEndpointInfo info, CancellationToken token)
{
_outputHelper.WriteLine($"[Source] Added: {ToOutputString(info)}");

lock (_addedEndpointInfoSources)

await _completionEntriesSemaphore.WaitAsync(token);
try
{
_outputHelper.WriteLine($"[Source] Start notifications for process {info.ProcessId}");

foreach (var sourceTuple in _addedEndpointInfoSources.ToList())
// Create a mapping of the process ID tasks to the completion entries
IDictionary<Task<int>, CompletionEntry> map = new Dictionary<Task<int>, CompletionEntry>(_completionEntries.Count);
foreach (CompletionEntry entry in _completionEntries)
{
map.Add(entry.Runner.ProcessIdTask.WithCancellation(token), entry);
}

while (map.Count > 0)
{
AppRunner runner = sourceTuple.Runner;
_outputHelper.WriteLine($"[Source] Checking App{runner.AppId}");
try
// Wait for any of the process ID tasks to complete.
Task<int> completedTask = await Task.WhenAny(map.Keys);

map.Remove(completedTask, out CompletionEntry entry);

_outputHelper.WriteLine($"[Source] Checking App{entry.Runner.AppId}");

if (completedTask.IsCompletedSuccessfully)
jander-msft marked this conversation as resolved.
Show resolved Hide resolved
{
if (info.ProcessId == runner.ProcessId)
// If the process ID matches the one that was reported via the callback,
// then signal its completion source.
if (info.ProcessId == completedTask.Result)
{
_outputHelper.WriteLine($"[Source] Notifying App{runner.AppId}");
sourceTuple.CompletionSource.TrySetResult(info);
_addedEndpointInfoSources.Remove(sourceTuple);
_outputHelper.WriteLine($"[Source] Notifying App{entry.Runner.AppId}");
entry.CompletionSource.TrySetResult(info);

_completionEntries.Remove(entry);

break;
}
}
catch (InvalidOperationException)
{
// Thrown if app runner hasn't started process yet.
_outputHelper.WriteLine($"[Source] App{runner.AppId} has not start yet.");
}
}

_outputHelper.WriteLine($"[Source] Finished notifications for process {info.ProcessId}");
}
finally
{
_completionEntriesSemaphore.Release();
}
}

public void OnRemovedEndpointInfo(IEndpointInfo info)
Expand All @@ -329,6 +350,19 @@ private static string ToOutputString(IEndpointInfo info)
{
return FormattableString.Invariant($"PID={info.ProcessId}, Cookie={info.RuntimeInstanceCookie}");
}

private sealed class CompletionEntry
{
public CompletionEntry(AppRunner runner)
{
Runner = runner;
CompletionSource = new TaskCompletionSource<IEndpointInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public AppRunner Runner { get; }

public TaskCompletionSource<IEndpointInfo> CompletionSource { get; }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal interface IEndpointInfoSourceCallbacks
{
Task OnBeforeResumeAsync(IEndpointInfo endpointInfo, CancellationToken cancellationToken);

void OnAddedEndpointInfo(IEndpointInfo endpointInfo);
Task OnAddedEndpointInfoAsync(IEndpointInfo endpointInfo, CancellationToken cancellationToken);

void OnRemovedEndpointInfo(IEndpointInfo endpointInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, Cancellation

foreach (IEndpointInfoSourceCallbacks callback in _callbacks)
{
callback.OnAddedEndpointInfo(endpointInfo);
await callback.OnAddedEndpointInfoAsync(endpointInfo, token).ConfigureAwait(false);
}
}
finally
Expand Down