From 56097a26e1f041c4f46f74914cb907b63b73d37b Mon Sep 17 00:00:00 2001 From: Ismael Hamed Date: Sat, 9 Feb 2019 10:00:35 +0100 Subject: [PATCH] Optimize AtLeastOnceDelivery by not scheduling ticks when not needed --- .../CoreAPISpec.ApprovePersistence.approved.txt | 2 +- .../AtLeastOnceDeliverySemantic.cs | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt index 8c59631138f..2389db69aa1 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt @@ -77,7 +77,7 @@ namespace Akka.Persistence public Akka.Persistence.AtLeastOnceDeliverySemantic.Delivery IncrementedCopy() { } public override string ToString() { } } - public sealed class RedeliveryTick : Akka.Actor.INotInfluenceReceiveTimeout + public sealed class RedeliveryTick : Akka.Actor.INotInfluenceReceiveTimeout, Akka.Event.IDeadLetterSuppression { public static Akka.Persistence.AtLeastOnceDeliverySemantic.RedeliveryTick Instance { get; } public override bool Equals(object obj) { } diff --git a/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs b/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs index e6e726f6b9d..a5372c7da31 100644 --- a/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs +++ b/src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Runtime.Serialization; using Akka.Actor; +using Akka.Event; using Akka.Persistence.Serialization; namespace Akka.Persistence @@ -296,7 +297,7 @@ public override int GetHashCode() } [Serializable] - public sealed class RedeliveryTick : INotInfluenceReceiveTimeout + public sealed class RedeliveryTick : INotInfluenceReceiveTimeout, IDeadLetterSuppression { /// /// The singleton instance of the redelivery tick @@ -378,7 +379,8 @@ public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.At private void StartRedeliverTask() { - var interval = new TimeSpan(RedeliverInterval.Ticks/2); + if (_redeliverScheduleCancelable != null) return; + var interval = new TimeSpan(RedeliverInterval.Ticks / 2); _redeliverScheduleCancelable = _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(interval, interval, _context.Self, RedeliveryTick.Instance, _context.Self); } @@ -429,6 +431,7 @@ public bool ConfirmDelivery(long deliveryId) { var before = _unconfirmed; _unconfirmed = _unconfirmed.Remove(deliveryId); + if (_unconfirmed.IsEmpty) Cancel(); return _unconfirmed.Count < before.Count; } @@ -462,11 +465,9 @@ var entry in _unconfirmed.Where(e => e.Value.Timestamp <= deadline).Take(Redeliv private void Send(long deliveryId, Delivery delivery, DateTime timestamp) { - ActorSelection destination = _context.ActorSelection(delivery.Destination); - destination.Tell(delivery.Message); - - _unconfirmed = _unconfirmed.SetItem(deliveryId, - new Delivery(delivery.Destination, delivery.Message, timestamp, delivery.Attempt + 1)); + _context.ActorSelection(delivery.Destination).Tell(delivery.Message); + _unconfirmed = _unconfirmed.SetItem(deliveryId, new Delivery(delivery.Destination, delivery.Message, timestamp, delivery.Attempt + 1)); + StartRedeliverTask(); } /// @@ -513,6 +514,7 @@ public void Cancel() { // need a null check here, in case actor is terminated before StartRedeliverTask() is called _redeliverScheduleCancelable?.Cancel(); + _redeliverScheduleCancelable = null; } @@ -521,6 +523,7 @@ public void Cancel() /// public void OnReplaySuccess() { + if (_unconfirmed.IsEmpty) return; RedeliverOverdue(); StartRedeliverTask(); }