From a4876ff8ab4b5b4c8aaeae9c741afa22dc729bb8 Mon Sep 17 00:00:00 2001 From: AndreSteenbergen Date: Thu, 21 Mar 2019 16:41:06 +0100 Subject: [PATCH] Added an UnboundStablePriorityMailbox (#3536) * Added an UnboundStablePriorityMailbox, sending messages according to priority. Messages with the same priority will be send using the same order as they appear. #2652 is the related issue. * added PropertyTest for StableListPriorityQueue * forgot to add spec * verified that the UnboundedStablePriorityMailbox can be loaded and used * validate4d that UnboundedStablePriorityMailbox supports stashing * added API approval for core --- .../CoreAPISpec.ApproveCore.approved.txt | 26 +++ src/core/Akka.Tests/Dispatch/MailboxesSpec.cs | 107 ++++++++++- .../Util/StableListPriorityQueueSpec.cs | 55 ++++++ src/core/Akka/Dispatch/Mailbox.cs | 42 ++++- .../UnboundedPriorityMessageQueue.cs | 3 +- .../UnboundedStablePriorityMessageQueue.cs | 89 +++++++++ src/core/Akka/Util/ListPriorityQueue.cs | 5 +- src/core/Akka/Util/StableListPriorityQueue.cs | 174 ++++++++++++++++++ 8 files changed, 492 insertions(+), 9 deletions(-) create mode 100644 src/core/Akka.Tests/Util/StableListPriorityQueueSpec.cs create mode 100644 src/core/Akka/Dispatch/MessageQueues/UnboundedStablePriorityMessageQueue.cs create mode 100644 src/core/Akka/Util/StableListPriorityQueue.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 10bc2a67dd7..8075a4e65d0 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -2612,6 +2612,14 @@ namespace Akka.Dispatch public virtual Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } protected abstract int PriorityGenerator(object message); } + public abstract class UnboundedStablePriorityMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + { + public const int DefaultCapacity = 11; + protected UnboundedStablePriorityMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } + public int InitialCapacity { get; } + public virtual Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } + protected abstract int PriorityGenerator(object message); + } } namespace Akka.Dispatch.MessageQueues { @@ -2687,6 +2695,14 @@ namespace Akka.Dispatch.MessageQueues protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { } protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { } } + public class UnboundedStablePriorityMessageQueue : Akka.Dispatch.MessageQueues.BlockingMessageQueue, Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics + { + public UnboundedStablePriorityMessageQueue(System.Func priorityGenerator, int initialCapacity) { } + protected override int LockedCount { get; } + public void EnqueueFirst(Akka.Actor.Envelope envelope) { } + protected override void LockedEnqueue(Akka.Actor.Envelope envelope) { } + protected override bool LockedTryDequeue(out Akka.Actor.Envelope envelope) { } + } } namespace Akka.Dispatch.SysMsg { @@ -4773,6 +4789,16 @@ namespace Akka.Util public static readonly bool IsMono; public static readonly bool IsWindows; } + public sealed class StableListPriorityQueue + { + public StableListPriorityQueue(int initialCapacity, System.Func priorityCalculator) { } + public int Count() { } + public Akka.Actor.Envelope Dequeue() { } + public void Enqueue(Akka.Actor.Envelope item) { } + public bool IsConsistent() { } + public Akka.Actor.Envelope Peek() { } + public override string ToString() { } + } public class static StandardOutWriter { public static void Write(string message, System.Nullable foregroundColor = null, System.Nullable backgroundColor = null) { } diff --git a/src/core/Akka.Tests/Dispatch/MailboxesSpec.cs b/src/core/Akka.Tests/Dispatch/MailboxesSpec.cs index f84e8aac1ba..a887887e362 100644 --- a/src/core/Akka.Tests/Dispatch/MailboxesSpec.cs +++ b/src/core/Akka.Tests/Dispatch/MailboxesSpec.cs @@ -47,6 +47,27 @@ public TestPriorityMailbox(Settings settings, Config config) : base(settings, co } } + public class TestStablePriorityMailbox : UnboundedStablePriorityMailbox + { + protected override int PriorityGenerator(object message) + { + if (message is string) + return 1; + if (message is bool) + return 2; + if (message is int) + return 3; + if (message is double) + return 4; + + return 5; + } + + public TestStablePriorityMailbox(Settings settings, Config config) : base(settings, config) + { + } + } + public class IntPriorityMailbox : UnboundedPriorityMailbox { protected override int PriorityGenerator(object message) @@ -138,6 +159,9 @@ private static string GetConfig() int-prio-mailbox { mailbox-type : """ + typeof(IntPriorityMailbox).AssemblyQualifiedName + @""" } +stable-prio-mailbox{ + mailbox-type : """ + typeof(TestStablePriorityMailbox).AssemblyQualifiedName + @""" +} "; } @@ -204,7 +228,45 @@ public void Can_use_unbounded_priority_mailbox() ExpectMsg(2.0); ExpectNoMsg(TimeSpan.FromSeconds(0.3)); - } + } + + [Fact] + public void Can_use_unbounded_stable_priority_mailbox() + { + var actor = (IInternalActorRef)Sys.ActorOf(EchoActor.Props(this).WithMailbox("stable-prio-mailbox"), "echo"); + + //pause mailbox until all messages have been told + actor.SendSystemMessage(new Suspend()); + + // wait until we can confirm that the mailbox is suspended before we begin sending messages + AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf().Mailbox.IsSuspended()); + + actor.Tell(true); + for (var i = 0; i < 30; i++) + { + actor.Tell(i); + } + actor.Tell("a"); + actor.Tell(2.0); + for (var i = 0; i < 30; i++) + { + actor.Tell(i + 30); + } + actor.SendSystemMessage(new Resume(null)); + + //resume mailbox, this prevents the mailbox from running to early + //priority mailbox is best effort only + + ExpectMsg("a"); + ExpectMsg(true); + for (var i = 0; i < 60; i++) + { + ExpectMsg(i); + } + ExpectMsg(2.0); + + ExpectNoMsg(TimeSpan.FromSeconds(0.3)); + } [Fact] public void Priority_mailbox_keeps_ordering_with_many_priority_values() @@ -286,6 +348,49 @@ public void Unbounded_Priority_Mailbox_Supports_Unbounded_Stashing() ExpectNoMsg(TimeSpan.FromSeconds(0.3)); } + + [Fact] + public void Unbounded_Stable_Priority_Mailbox_Supports_Unbounded_Stashing() + { + var actor = (IInternalActorRef)Sys.ActorOf(StashingActor.Props(this).WithMailbox("stable-prio-mailbox"), "echo"); + + //pause mailbox until all messages have been told + actor.SendSystemMessage(new Suspend()); + + AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf().Mailbox.IsSuspended()); + + var values = new int[10]; + var increment = (int)(UInt32.MaxValue / values.Length); + + for (var i = 0; i < values.Length; i++) + values[i] = Int32.MinValue + increment * i; + + // tell the actor in order + foreach (var value in values) + { + actor.Tell(value); + actor.Tell(value); + actor.Tell(value); + } + + actor.Tell(new StashingActor.Start()); + + //resume mailbox, this prevents the mailbox from running to early + actor.SendSystemMessage(new Resume(null)); + + this.Within(5.Seconds(), () => + { + // expect the messages in the original order + foreach (var value in values) + { + ExpectMsg(value); + ExpectMsg(value); + ExpectMsg(value); + } + }); + + ExpectNoMsg(TimeSpan.FromSeconds(0.3)); + } } } diff --git a/src/core/Akka.Tests/Util/StableListPriorityQueueSpec.cs b/src/core/Akka.Tests/Util/StableListPriorityQueueSpec.cs new file mode 100644 index 00000000000..fb6ab52c090 --- /dev/null +++ b/src/core/Akka.Tests/Util/StableListPriorityQueueSpec.cs @@ -0,0 +1,55 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2019 Lightbend Inc. +// // Copyright (C) 2013-2019 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Linq; +using Akka.Actor; +using Akka.Util; +using FsCheck; +using FsCheck.Xunit; + +namespace Akka.Tests.Util +{ + public class StableListPriorityQueueSpec + { + public static int Priority(object obj) + { + switch (obj) + { + case int i: + return i; + case string str: + return str.Length; + default: + return 1; + } + } + + [Property(MaxTest = 1000)] + public Property StableListPriorityQueue_must_be_stable(NonEmptyString[] values) + { + var sortedValues = values + .Select(x => x.Item) + .GroupBy(x => x.Length) + .OrderBy(x => x.Key) + .SelectMany(x => x).ToList(); + var pq = new StableListPriorityQueue(10, Priority); + + foreach (var i in values.Select(x => x.Item)) pq.Enqueue(new Envelope(i, ActorRefs.NoSender)); + + var isConsistent = pq.IsConsistent().ToProperty().Label("Expected queue to be consistent, but was not."); + + var queueValues = new List(); + while (pq.Count() > 0) queueValues.Add((string)pq.Dequeue().Message); + + var sequenceEqual = queueValues.SequenceEqual(sortedValues).ToProperty().Label( + $"Expected output to be [{string.Join(",", sortedValues)}] but was [{string.Join(",", queueValues)}]"); + + return sequenceEqual.And(isConsistent); + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Dispatch/Mailbox.cs b/src/core/Akka/Dispatch/Mailbox.cs index 416fd8eaf43..30c4e1d5925 100644 --- a/src/core/Akka/Dispatch/Mailbox.cs +++ b/src/core/Akka/Dispatch/Mailbox.cs @@ -682,7 +682,7 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system) /// Priority mailbox base class; unbounded mailbox that allows for prioritization of its contents. /// Extend this class and implement the method with your own prioritization. /// The value returned by the method will be used to order the message in the mailbox. - /// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order. + /// Lower values will be delivered first. Messages ordered by the same number will remain in delivered in undefined order. /// public abstract class UnboundedPriorityMailbox : MailboxType, IProducesMessageQueue { @@ -716,7 +716,45 @@ protected UnboundedPriorityMailbox(Settings settings, Config config) : base(sett } } - //todo: bounded priority mailbox; stable priority mailboxes + //todo: bounded priority mailbox; + + /// + /// Priority mailbox - an unbounded mailbox that allows for prioritization of its contents. + /// Extend this class and implement the method with your own prioritization. + /// The value returned by the method will be used to order the message in the mailbox. + /// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order. + /// + public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue + { + /// + /// Function responsible for generating the priority value of a message based on its type and content. + /// + /// The message to inspect. + /// An integer. The lower the value, the higher the priority. + protected abstract int PriorityGenerator(object message); + + /// + /// The initial capacity of the unbounded mailbox. + /// + public int InitialCapacity { get; } + + /// + /// The default capacity of an unbounded priority mailbox. + /// + public const int DefaultCapacity = 11; + + /// + public sealed override IMessageQueue Create(IActorRef owner, ActorSystem system) + { + return new UnboundedStablePriorityMessageQueue(PriorityGenerator, InitialCapacity); + } + + /// + protected UnboundedStablePriorityMailbox(Settings settings, Config config) : base(settings, config) + { + InitialCapacity = DefaultCapacity; + } + } /// /// UnboundedDequeBasedMailbox is an unbounded backed by a double-ended queue. Used for stashing. diff --git a/src/core/Akka/Dispatch/MessageQueues/UnboundedPriorityMessageQueue.cs b/src/core/Akka/Dispatch/MessageQueues/UnboundedPriorityMessageQueue.cs index 173a768ec5f..30240379647 100644 --- a/src/core/Akka/Dispatch/MessageQueues/UnboundedPriorityMessageQueue.cs +++ b/src/core/Akka/Dispatch/MessageQueues/UnboundedPriorityMessageQueue.cs @@ -94,5 +94,4 @@ public void EnqueueFirst(Envelope envelope) _prependBuffer.Push(envelope); } } -} - +} \ No newline at end of file diff --git a/src/core/Akka/Dispatch/MessageQueues/UnboundedStablePriorityMessageQueue.cs b/src/core/Akka/Dispatch/MessageQueues/UnboundedStablePriorityMessageQueue.cs new file mode 100644 index 00000000000..8d705096f41 --- /dev/null +++ b/src/core/Akka/Dispatch/MessageQueues/UnboundedStablePriorityMessageQueue.cs @@ -0,0 +1,89 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using Akka.Actor; +using Akka.Util; + +namespace Akka.Dispatch.MessageQueues +{ + /// + /// Base class for a message queue that uses a priority generator for messages + /// + public class UnboundedStablePriorityMessageQueue : BlockingMessageQueue, IUnboundedDequeBasedMessageQueueSemantics + { + private readonly StableListPriorityQueue _prioQueue; + // doesn't need to be threadsafe - only called from within actor + private readonly Stack _prependBuffer = new Stack(); + + + /// + /// Creates a new unbounded priority message queue. + /// + /// The calculator function for determining the priority of inbound messages. + /// The initial capacity of the queue. + public UnboundedStablePriorityMessageQueue(Func priorityGenerator, int initialCapacity) + { + _prioQueue = new StableListPriorityQueue(initialCapacity, priorityGenerator); + } + + /// + /// Unsafe method for computing the underlying message count. + /// + /// + /// Called from within a synchronization mechanism. + /// + protected override int LockedCount + { + get { return _prioQueue.Count(); } + } + + /// + /// Unsafe method for enqueuing a new message to the queue. + /// + /// The message to enqueue. + /// + /// Called from within a synchronization mechanism. + /// + protected override void LockedEnqueue(Envelope envelope) + { + _prioQueue.Enqueue(envelope); + } + + /// + /// Unsafe method for attempting to dequeue a message. + /// + /// The message that might be dequeued. + /// true if a message was available to be dequeued, false otherwise. + /// + /// Called from within a synchronization mechanism. + /// + protected override bool LockedTryDequeue(out Envelope envelope) + { + if (_prependBuffer.Count > 0) + { + envelope = _prependBuffer.Pop(); + return true; + } + + if (_prioQueue.Count() > 0) + { + envelope = _prioQueue.Dequeue(); + return true; + } + envelope = default(Envelope); + return false; + } + + public void EnqueueFirst(Envelope envelope) + { + _prependBuffer.Push(envelope); + } + } +} + diff --git a/src/core/Akka/Util/ListPriorityQueue.cs b/src/core/Akka/Util/ListPriorityQueue.cs index 640fc5cdab2..a745085d655 100644 --- a/src/core/Akka/Util/ListPriorityQueue.cs +++ b/src/core/Akka/Util/ListPriorityQueue.cs @@ -158,7 +158,4 @@ public bool IsConsistent() return true; // passed all checks } // IsConsistent } // ListPriorityQueue - - -} - +} \ No newline at end of file diff --git a/src/core/Akka/Util/StableListPriorityQueue.cs b/src/core/Akka/Util/StableListPriorityQueue.cs new file mode 100644 index 00000000000..ea8c7451de5 --- /dev/null +++ b/src/core/Akka/Util/StableListPriorityQueue.cs @@ -0,0 +1,174 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading; +using Akka.Actor; + +namespace Akka.Util +{ + /// + /// Priority queue implemented using a simple list with binary search for inserts. + /// This specific implementation is cheap in terms of memory but weak in terms of performance. + /// See http://visualstudiomagazine.com/articles/2012/11/01/priority-queues-with-c.aspx for original implementation + /// This specific version is adapted for Envelopes only and calculates a priority of envelope.Message + /// + public sealed class StableListPriorityQueue + { + private struct WrappedEnvelope + { + public WrappedEnvelope(Envelope envelope, int sequenceNumber) + { + Envelope = envelope; + SequenceNumber = sequenceNumber; + } + + public Envelope Envelope { get; } + public int SequenceNumber { get; } + } + + private class WrappedEnvelopeComparator + { + private readonly Func priorityCalculator; + + public WrappedEnvelopeComparator(Func priorityCalculator) + { + this.priorityCalculator = priorityCalculator; + } + + public int Compare(WrappedEnvelope x, WrappedEnvelope y) + { + var baseCompare = priorityCalculator(x.Envelope.Message).CompareTo(priorityCalculator(y.Envelope.Message)); + if (baseCompare != 0) return baseCompare; + return x.SequenceNumber.CompareTo(y.SequenceNumber); + } + } + + private readonly List _data; + private readonly WrappedEnvelopeComparator comparator; + + /// + /// The default priority generator. + /// + internal static readonly Func DefaultPriorityCalculator = message => 1; + private int sequenceNumber; + + /// + /// Creates a new priority queue. + /// + /// The initial capacity of the queue. + /// The calculator function for assigning message priorities. + public StableListPriorityQueue(int initialCapacity, Func priorityCalculator) + { + _data = new List(initialCapacity); + comparator = new WrappedEnvelopeComparator(priorityCalculator); + } + + /// + /// Enqueues a message into the priority queue. + /// + /// The item to enqueue. + public void Enqueue(Envelope item) + { + int seq = Interlocked.Increment(ref sequenceNumber); + var wrappedItem = new WrappedEnvelope(item, seq); + + _data.Add(wrappedItem); + var ci = _data.Count - 1; // child index; start at end + while (ci > 0) + { + var pi = (ci - 1) / 2; // parent index + if (comparator.Compare(_data[ci], _data[pi]) >= 0) break; // child item is larger than (or equal) parent so we're done + var tmp = _data[ci]; _data[ci] = _data[pi]; _data[pi] = tmp; + ci = pi; + } + } + + /// + /// Dequeues the highest priority message at the front of the priority queue. + /// + /// The highest priority message . + public Envelope Dequeue() + { + // assumes pq is not empty; up to calling code + var li = _data.Count - 1; // last index (before removal) + var frontItem = _data[0]; // fetch the front + _data[0] = _data[li]; + _data.RemoveAt(li); + + --li; // last index (after removal) + var pi = 0; // parent index. start at front of pq + while (true) + { + var ci = pi * 2 + 1; // left child index of parent + if (ci > li) break; // no children so done + var rc = ci + 1; // right child + if (rc <= li && comparator.Compare(_data[rc], _data[ci]) < 0) // if there is a rc (ci + 1), and it is smaller than left child, use the rc instead + ci = rc; + if (comparator.Compare(_data[pi], _data[ci]) <= 0) break; // parent is smaller than (or equal to) smallest child so done + var tmp = _data[pi]; _data[pi] = _data[ci]; _data[ci] = tmp; // swap parent and child + pi = ci; + } + return frontItem.Envelope; + } + + /// + /// Peek at the message at the front of the priority queue. + /// + /// The highest priority message . + public Envelope Peek() + { + return _data[0].Envelope; + } + + /// + /// Counts the number of items in the priority queue. + /// + /// The total number of items in the queue. + public int Count() + { + return _data.Count; + } + + /// + /// Converts the queue to a string representation. + /// + /// A string representation of the queue. + public override string ToString() + { + var s = ""; + for (var i = 0; i < _data.Count; ++i) + s += _data[i].ToString() + " "; + s += "count = " + _data.Count; + return s; + } + + /// + /// Checks the integrity of the StableListPriorityQueue. + /// + /// true if the list is consistent, false otherwise. + /// + /// WARNING: high performance impact. Call during testing only. + /// + public bool IsConsistent() + { + // is the heap property true for all data? + if (_data.Count == 0) return true; + var li = _data.Count - 1; // last index + for (var pi = 0; pi < _data.Count; ++pi) // each parent index + { + var lci = 2 * pi + 1; // left child index + var rci = 2 * pi + 2; // right child index + + if (lci <= li && comparator.Compare(_data[pi], _data[lci]) > 0) return false; // if lc exists and it's greater than parent then bad. + if (rci <= li && comparator.Compare(_data[pi], _data[rci]) > 0) return false; // check the right child too. + } + return true; // passed all checks + } + } +} \ No newline at end of file