-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added an UnboundStablePriorityMailbox (#3536)
* Added an UnboundStablePriorityMailbox, sending messages according to priority. Messages with the same priority will be send using the same order as they appear. #2652 is the related issue. * added PropertyTest for StableListPriorityQueue * forgot to add spec * verified that the UnboundedStablePriorityMailbox can be loaded and used * validate4d that UnboundedStablePriorityMailbox supports stashing * added API approval for core
- Loading branch information
1 parent
da9ee86
commit a4876ff
Showing
8 changed files
with
492 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
// //----------------------------------------------------------------------- | ||
// // <copyright file="StableListPriorityQueueSpec.cs" company="Akka.NET Project"> | ||
// // Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com> | ||
// // Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// // </copyright> | ||
// //----------------------------------------------------------------------- | ||
|
||
using System.Collections.Generic; | ||
using System.Linq; | ||
using Akka.Actor; | ||
using Akka.Util; | ||
using FsCheck; | ||
using FsCheck.Xunit; | ||
|
||
namespace Akka.Tests.Util | ||
{ | ||
public class StableListPriorityQueueSpec | ||
{ | ||
public static int Priority(object obj) | ||
{ | ||
switch (obj) | ||
{ | ||
case int i: | ||
return i; | ||
case string str: | ||
return str.Length; | ||
default: | ||
return 1; | ||
} | ||
} | ||
|
||
[Property(MaxTest = 1000)] | ||
public Property StableListPriorityQueue_must_be_stable(NonEmptyString[] values) | ||
{ | ||
var sortedValues = values | ||
.Select(x => x.Item) | ||
.GroupBy(x => x.Length) | ||
.OrderBy(x => x.Key) | ||
.SelectMany(x => x).ToList(); | ||
var pq = new StableListPriorityQueue(10, Priority); | ||
|
||
foreach (var i in values.Select(x => x.Item)) pq.Enqueue(new Envelope(i, ActorRefs.NoSender)); | ||
|
||
var isConsistent = pq.IsConsistent().ToProperty().Label("Expected queue to be consistent, but was not."); | ||
|
||
var queueValues = new List<string>(); | ||
while (pq.Count() > 0) queueValues.Add((string)pq.Dequeue().Message); | ||
|
||
var sequenceEqual = queueValues.SequenceEqual(sortedValues).ToProperty().Label( | ||
$"Expected output to be [{string.Join(",", sortedValues)}] but was [{string.Join(",", queueValues)}]"); | ||
|
||
return sequenceEqual.And(isConsistent); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,5 +94,4 @@ public void EnqueueFirst(Envelope envelope) | |
_prependBuffer.Push(envelope); | ||
} | ||
} | ||
} | ||
|
||
} |
89 changes: 89 additions & 0 deletions
89
src/core/Akka/Dispatch/MessageQueues/UnboundedStablePriorityMessageQueue.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="UnboundedPriorityMessageQueue.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using Akka.Actor; | ||
using Akka.Util; | ||
|
||
namespace Akka.Dispatch.MessageQueues | ||
{ | ||
/// <summary> | ||
/// Base class for a message queue that uses a priority generator for messages | ||
/// </summary> | ||
public class UnboundedStablePriorityMessageQueue : BlockingMessageQueue, IUnboundedDequeBasedMessageQueueSemantics | ||
{ | ||
private readonly StableListPriorityQueue _prioQueue; | ||
// doesn't need to be threadsafe - only called from within actor | ||
private readonly Stack<Envelope> _prependBuffer = new Stack<Envelope>(); | ||
|
||
|
||
/// <summary> | ||
/// Creates a new unbounded priority message queue. | ||
/// </summary> | ||
/// <param name="priorityGenerator">The calculator function for determining the priority of inbound messages.</param> | ||
/// <param name="initialCapacity">The initial capacity of the queue.</param> | ||
public UnboundedStablePriorityMessageQueue(Func<object, int> priorityGenerator, int initialCapacity) | ||
{ | ||
_prioQueue = new StableListPriorityQueue(initialCapacity, priorityGenerator); | ||
} | ||
|
||
/// <summary> | ||
/// Unsafe method for computing the underlying message count. | ||
/// </summary> | ||
/// <remarks> | ||
/// Called from within a synchronization mechanism. | ||
/// </remarks> | ||
protected override int LockedCount | ||
{ | ||
get { return _prioQueue.Count(); } | ||
} | ||
|
||
/// <summary> | ||
/// Unsafe method for enqueuing a new message to the queue. | ||
/// </summary> | ||
/// <param name="envelope">The message to enqueue.</param> | ||
/// <remarks> | ||
/// Called from within a synchronization mechanism. | ||
/// </remarks> | ||
protected override void LockedEnqueue(Envelope envelope) | ||
{ | ||
_prioQueue.Enqueue(envelope); | ||
} | ||
|
||
/// <summary> | ||
/// Unsafe method for attempting to dequeue a message. | ||
/// </summary> | ||
/// <param name="envelope">The message that might be dequeued.</param> | ||
/// <returns><c>true</c> if a message was available to be dequeued, <c>false</c> otherwise.</returns> | ||
/// <remarks> | ||
/// Called from within a synchronization mechanism. | ||
/// </remarks> | ||
protected override bool LockedTryDequeue(out Envelope envelope) | ||
{ | ||
if (_prependBuffer.Count > 0) | ||
{ | ||
envelope = _prependBuffer.Pop(); | ||
return true; | ||
} | ||
|
||
if (_prioQueue.Count() > 0) | ||
{ | ||
envelope = _prioQueue.Dequeue(); | ||
return true; | ||
} | ||
envelope = default(Envelope); | ||
return false; | ||
} | ||
|
||
public void EnqueueFirst(Envelope envelope) | ||
{ | ||
_prependBuffer.Push(envelope); | ||
} | ||
} | ||
} | ||
|
Oops, something went wrong.