diff --git a/Lawo.EmberPlusSharp/Model/Consumer`1.cs b/Lawo.EmberPlusSharp/Model/Consumer`1.cs
index a4ff1e2c..fe9d28fa 100644
--- a/Lawo.EmberPlusSharp/Model/Consumer`1.cs
+++ b/Lawo.EmberPlusSharp/Model/Consumer`1.cs
@@ -176,12 +176,18 @@ public int AutoSendInterval
/// property set to a value other than
/// ChildrenRetrievalPolicy.None.
[SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1625:Element documentation must not be copied and pasted", Justification = "Intended, both exceptions can be thrown under the same circumstances.")]
- public async Task SendAsync()
+ public Task SendAsync()
{
- if (await this.SendCoreAsync())
- {
- await this.isVerifiedSource.Task;
- }
+ var source = new TaskCompletionSource();
+
+ this.taskQueue.Enqueue(
+ async () =>
+ {
+ await this.SendCoreAsync();
+ source.SetResult(true);
+ });
+
+ return source.Task;
}
/// Stops synchronizing changes to the object tree accessible through the property
@@ -192,12 +198,13 @@ public void Dispose()
{
this.disposed = true;
this.root.SendRequired -= this.OnSendRequired;
+ this.client.ConnectionLost -= this.OnConnectionLost;
+ this.client.EmberDataReceived -= this.OnMessageReceived;
this.client.ConnectionLost -= this.receiveQueue.OnConnectionLost;
this.client.EmberDataReceived -= this.receiveQueue.OnMessageReceived;
- this.isVerifiedSource.TrySetCanceled();
- this.isSendRequiredSource.TrySetCanceled();
this.CancelAutoSendDelay();
this.receiveQueue.OnConnectionLost(this, new ConnectionLostEventArgs(null));
+ this.OnConnectionLost(this, new ConnectionLostEventArgs(null));
}
}
@@ -208,6 +215,7 @@ public void Dispose()
private static string GetVersion(IReadOnlyCollection applicationBytes) =>
string.Join(".", applicationBytes.Reverse().Select(b => b.ToString(InvariantCulture)));
+ private readonly TaskQueue taskQueue = new TaskQueue();
private readonly ReceiveQueue receiveQueue = new ReceiveQueue();
private readonly InvocationCollection pendingInvocations = new InvocationCollection();
private readonly StreamedParameterCollection streamedParameters = new StreamedParameterCollection();
@@ -217,8 +225,6 @@ private static string GetVersion(IReadOnlyCollection applicationBytes) =>
private readonly S101Message emberDataMessage;
private int autoSendInterval = 100;
private CancellationTokenSource autoSendDelayCancellationSource;
- private TaskCompletionSource isSendRequiredSource;
- private TaskCompletionSource isVerifiedSource;
private bool disposed;
private Consumer(S101Client client, int timeout, ChildrenRetrievalPolicy childrenRetrievalPolicy, byte slot)
@@ -231,7 +237,27 @@ private Consumer(S101Client client, int timeout, ChildrenRetrievalPolicy childre
this.client.ConnectionLost += this.receiveQueue.OnConnectionLost;
}
- private async Task SendCoreAsync()
+ private void OnMessageReceived(object sender, MessageReceivedEventArgs e)
+ {
+ if (!e.IsAnotherMessageAvailable)
+ {
+ this.taskQueue.Enqueue(
+ async () =>
+ {
+ // We must not wait for changes here, because that would lead to a deadlock when client code
+ // calls SendAsync. Instead we just apply whatever is left in the queue and retrieve children
+ // (if there are any). If the queue is already empty (because client code has called SendAsync)
+ // and everything is complete, the two statements below don't do anything.
+ this.ApplyProviderChanges();
+ await this.RetrieveChildrenAsync();
+ });
+ }
+ }
+
+ private void OnConnectionLost(object sender, ConnectionLostEventArgs e) =>
+ this.taskQueue.Enqueue(() => this.WaitForProviderChangesAsync());
+
+ private async Task SendCoreAsync()
{
if (this.root.HasChanges)
{
@@ -247,7 +273,7 @@ private async Task SendCoreAsync()
await this.client.SendMessageAsync(this.emberDataMessage, stream.ToArray());
}
- return await this.SendRequestAsync();
+ await this.RetrieveChildrenAsync();
}
private void CancelAutoSendDelay()
@@ -258,7 +284,13 @@ private void CancelAutoSendDelay()
}
}
- private void OnSendRequired(object sender, EventArgs e) => this.isSendRequiredSource.TrySetResult(true);
+ private async void OnSendRequired(object sender, EventArgs e)
+ {
+ if ((this.autoSendInterval != Timeout.Infinite) && (await this.DelayAutoSend()))
+ {
+ this.taskQueue.Enqueue(this.SendCoreAsync);
+ }
+ }
private async Task RetrieveChildrenAsync()
{
@@ -283,33 +315,15 @@ private async void SendReceiveLoop()
{
Exception exception = null;
this.autoSendDelayCancellationSource = new CancellationTokenSource();
- this.isSendRequiredSource = new TaskCompletionSource();
- this.isVerifiedSource = new TaskCompletionSource();
+ this.client.EmberDataReceived += this.OnMessageReceived;
+ this.client.ConnectionLost += this.OnConnectionLost;
this.root.SendRequired += this.OnSendRequired;
try
{
- var waitForSendRequiredTask = this.WaitForSendRequiredAsync();
- var waitForProviderChangesTask = this.WaitForProviderChangesAsync();
-
while (true)
{
- if (await Task.WhenAny(waitForSendRequiredTask, waitForProviderChangesTask) ==
- waitForSendRequiredTask)
- {
- await waitForSendRequiredTask;
- await this.SendCoreAsync();
- waitForSendRequiredTask = this.WaitForSendRequiredAsync();
- }
- else
- {
- await waitForProviderChangesTask;
- this.ApplyProviderChanges();
- await this.RetrieveChildrenAsync();
- this.isVerifiedSource.TrySetResult(false);
- this.isVerifiedSource = new TaskCompletionSource();
- waitForProviderChangesTask = this.WaitForProviderChangesAsync();
- }
+ await this.taskQueue.ExecuteAsync();
}
}
catch (OperationCanceledException)
@@ -331,13 +345,6 @@ private async void SendReceiveLoop()
}
}
- private async Task WaitForSendRequiredAsync()
- {
- await this.isSendRequiredSource.Task;
- await this.DelayAutoSend();
- this.isSendRequiredSource = new TaskCompletionSource();
- }
-
private async Task RetrieveChildrenCoreAsync()
{
while (await this.SendRequestAsync())
@@ -357,20 +364,20 @@ private void ApplyProviderChanges()
private Task WaitForProviderChangesAsync() => this.receiveQueue.WaitForMessageAsync();
- private async Task DelayAutoSend()
+ private async Task DelayAutoSend()
{
while (true)
{
try
{
await Task.Delay(this.autoSendInterval, this.autoSendDelayCancellationSource.Token);
- return;
+ return true;
}
catch (OperationCanceledException)
{
if (this.disposed)
{
- throw;
+ return false;
}
else
{
@@ -457,6 +464,38 @@ private bool WriteRequest(out MemoryStream stream)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ private sealed class TaskQueue
+ {
+ internal void Enqueue(Func task)
+ {
+ this.queue.Enqueue(task);
+
+ if (this.queue.Count == 1)
+ {
+ this.nonEmpty.SetResult(true);
+ }
+ }
+
+ internal async Task ExecuteAsync()
+ {
+ await this.nonEmpty.Task;
+ var tasks = new Func[this.queue.Count];
+ this.queue.CopyTo(tasks, 0);
+ this.queue.Clear();
+ this.nonEmpty = new TaskCompletionSource();
+
+ foreach (var task in tasks)
+ {
+ await task();
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ private readonly Queue> queue = new Queue>();
+ private TaskCompletionSource nonEmpty = new TaskCompletionSource();
+ }
+
private sealed class ReceiveQueue
{
internal int MessageCount => this.queue.Count;