Skip to content

Commit

Permalink
This should fix the linked bug
Browse files Browse the repository at this point in the history
More testing is necessary.

References #40
  • Loading branch information
andreashuber-lawo committed Apr 5, 2017
1 parent f627567 commit 8fcfa7b
Showing 1 changed file with 82 additions and 43 deletions.
125 changes: 82 additions & 43 deletions Lawo.EmberPlusSharp/Model/Consumer`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,18 @@ public int AutoSendInterval
/// <see cref="INode.ChildrenRetrievalPolicy"/> property set to a value other than
/// <see cref="ChildrenRetrievalPolicy.None">ChildrenRetrievalPolicy.None</see>.</remarks>
[SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1625:Element documentation must not be copied and pasted", Justification = "Intended, both exceptions can be thrown under the same circumstances.")]
public async Task SendAsync()
public Task SendAsync()
{
if (await this.SendCoreAsync())
{
await this.isVerifiedSource.Task;
}
var source = new TaskCompletionSource<bool>();

this.taskQueue.Enqueue(
async () =>
{
await this.SendCoreAsync();
source.SetResult(true);
});

return source.Task;
}

/// <summary>Stops synchronizing changes to the object tree accessible through the <see cref="Root"/> property
Expand All @@ -192,12 +198,13 @@ public void Dispose()
{
this.disposed = true;
this.root.SendRequired -= this.OnSendRequired;
this.client.ConnectionLost -= this.OnConnectionLost;
this.client.EmberDataReceived -= this.OnMessageReceived;
this.client.ConnectionLost -= this.receiveQueue.OnConnectionLost;
this.client.EmberDataReceived -= this.receiveQueue.OnMessageReceived;
this.isVerifiedSource.TrySetCanceled();
this.isSendRequiredSource.TrySetCanceled();
this.CancelAutoSendDelay();
this.receiveQueue.OnConnectionLost(this, new ConnectionLostEventArgs(null));
this.OnConnectionLost(this, new ConnectionLostEventArgs(null));
}
}

Expand All @@ -208,6 +215,7 @@ public void Dispose()
private static string GetVersion(IReadOnlyCollection<byte> applicationBytes) =>
string.Join(".", applicationBytes.Reverse().Select(b => b.ToString(InvariantCulture)));

private readonly TaskQueue taskQueue = new TaskQueue();
private readonly ReceiveQueue receiveQueue = new ReceiveQueue();
private readonly InvocationCollection pendingInvocations = new InvocationCollection();
private readonly StreamedParameterCollection streamedParameters = new StreamedParameterCollection();
Expand All @@ -217,8 +225,6 @@ private static string GetVersion(IReadOnlyCollection<byte> applicationBytes) =>
private readonly S101Message emberDataMessage;
private int autoSendInterval = 100;
private CancellationTokenSource autoSendDelayCancellationSource;
private TaskCompletionSource<bool> isSendRequiredSource;
private TaskCompletionSource<bool> isVerifiedSource;
private bool disposed;

private Consumer(S101Client client, int timeout, ChildrenRetrievalPolicy childrenRetrievalPolicy, byte slot)
Expand All @@ -231,7 +237,27 @@ private Consumer(S101Client client, int timeout, ChildrenRetrievalPolicy childre
this.client.ConnectionLost += this.receiveQueue.OnConnectionLost;
}

private async Task<bool> SendCoreAsync()
private void OnMessageReceived(object sender, MessageReceivedEventArgs e)
{
if (!e.IsAnotherMessageAvailable)
{
this.taskQueue.Enqueue(
async () =>
{
// We must not wait for changes here, because that would lead to a deadlock when client code
// calls SendAsync. Instead we just apply whatever is left in the queue and retrieve children
// (if there are any). If the queue is already empty (because client code has called SendAsync)
// and everything is complete, the two statements below don't do anything.
this.ApplyProviderChanges();
await this.RetrieveChildrenAsync();
});
}
}

private void OnConnectionLost(object sender, ConnectionLostEventArgs e) =>
this.taskQueue.Enqueue(() => this.WaitForProviderChangesAsync());

private async Task SendCoreAsync()
{
if (this.root.HasChanges)
{
Expand All @@ -247,7 +273,7 @@ private async Task<bool> SendCoreAsync()
await this.client.SendMessageAsync(this.emberDataMessage, stream.ToArray());
}

return await this.SendRequestAsync();
await this.RetrieveChildrenAsync();
}

private void CancelAutoSendDelay()
Expand All @@ -258,7 +284,13 @@ private void CancelAutoSendDelay()
}
}

private void OnSendRequired(object sender, EventArgs e) => this.isSendRequiredSource.TrySetResult(true);
private async void OnSendRequired(object sender, EventArgs e)
{
if ((this.autoSendInterval != Timeout.Infinite) && (await this.DelayAutoSend()))
{
this.taskQueue.Enqueue(this.SendCoreAsync);
}
}

private async Task RetrieveChildrenAsync()
{
Expand All @@ -283,33 +315,15 @@ private async void SendReceiveLoop()
{
Exception exception = null;
this.autoSendDelayCancellationSource = new CancellationTokenSource();
this.isSendRequiredSource = new TaskCompletionSource<bool>();
this.isVerifiedSource = new TaskCompletionSource<bool>();
this.client.EmberDataReceived += this.OnMessageReceived;
this.client.ConnectionLost += this.OnConnectionLost;
this.root.SendRequired += this.OnSendRequired;

try
{
var waitForSendRequiredTask = this.WaitForSendRequiredAsync();
var waitForProviderChangesTask = this.WaitForProviderChangesAsync();

while (true)
{
if (await Task.WhenAny(waitForSendRequiredTask, waitForProviderChangesTask) ==
waitForSendRequiredTask)
{
await waitForSendRequiredTask;
await this.SendCoreAsync();
waitForSendRequiredTask = this.WaitForSendRequiredAsync();
}
else
{
await waitForProviderChangesTask;
this.ApplyProviderChanges();
await this.RetrieveChildrenAsync();
this.isVerifiedSource.TrySetResult(false);
this.isVerifiedSource = new TaskCompletionSource<bool>();
waitForProviderChangesTask = this.WaitForProviderChangesAsync();
}
await this.taskQueue.ExecuteAsync();
}
}
catch (OperationCanceledException)
Expand All @@ -331,13 +345,6 @@ private async void SendReceiveLoop()
}
}

private async Task WaitForSendRequiredAsync()
{
await this.isSendRequiredSource.Task;
await this.DelayAutoSend();
this.isSendRequiredSource = new TaskCompletionSource<bool>();
}

private async Task RetrieveChildrenCoreAsync()
{
while (await this.SendRequestAsync())
Expand All @@ -357,20 +364,20 @@ private void ApplyProviderChanges()

private Task WaitForProviderChangesAsync() => this.receiveQueue.WaitForMessageAsync();

private async Task DelayAutoSend()
private async Task<bool> DelayAutoSend()
{
while (true)
{
try
{
await Task.Delay(this.autoSendInterval, this.autoSendDelayCancellationSource.Token);
return;
return true;
}
catch (OperationCanceledException)
{
if (this.disposed)
{
throw;
return false;
}
else
{
Expand Down Expand Up @@ -457,6 +464,38 @@ private bool WriteRequest(out MemoryStream stream)

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private sealed class TaskQueue
{
internal void Enqueue(Func<Task> task)
{
this.queue.Enqueue(task);

if (this.queue.Count == 1)
{
this.nonEmpty.SetResult(true);
}
}

internal async Task ExecuteAsync()
{
await this.nonEmpty.Task;
var tasks = new Func<Task>[this.queue.Count];
this.queue.CopyTo(tasks, 0);
this.queue.Clear();
this.nonEmpty = new TaskCompletionSource<bool>();

foreach (var task in tasks)
{
await task();
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////

private readonly Queue<Func<Task>> queue = new Queue<Func<Task>>();
private TaskCompletionSource<bool> nonEmpty = new TaskCompletionSource<bool>();
}

private sealed class ReceiveQueue
{
internal int MessageCount => this.queue.Count;
Expand Down

0 comments on commit 8fcfa7b

Please sign in to comment.