Skip to content

Commit

Permalink
Optimize AtLeastOnceDelivery by not scheduling ticks when not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Feb 11, 2019
1 parent 010dfd6 commit 56097a2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace Akka.Persistence
public Akka.Persistence.AtLeastOnceDeliverySemantic.Delivery IncrementedCopy() { }
public override string ToString() { }
}
public sealed class RedeliveryTick : Akka.Actor.INotInfluenceReceiveTimeout
public sealed class RedeliveryTick : Akka.Actor.INotInfluenceReceiveTimeout, Akka.Event.IDeadLetterSuppression
{
public static Akka.Persistence.AtLeastOnceDeliverySemantic.RedeliveryTick Instance { get; }
public override bool Equals(object obj) { }
Expand Down
17 changes: 10 additions & 7 deletions src/core/Akka.Persistence/AtLeastOnceDeliverySemantic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Serialization;

namespace Akka.Persistence
Expand Down Expand Up @@ -296,7 +297,7 @@ public override int GetHashCode()
}

[Serializable]
public sealed class RedeliveryTick : INotInfluenceReceiveTimeout
public sealed class RedeliveryTick : INotInfluenceReceiveTimeout, IDeadLetterSuppression
{
/// <summary>
/// The singleton instance of the redelivery tick
Expand Down Expand Up @@ -378,7 +379,8 @@ public AtLeastOnceDeliverySemantic(IActorContext context, PersistenceSettings.At

private void StartRedeliverTask()
{
var interval = new TimeSpan(RedeliverInterval.Ticks/2);
if (_redeliverScheduleCancelable != null) return;
var interval = new TimeSpan(RedeliverInterval.Ticks / 2);
_redeliverScheduleCancelable = _context.System.Scheduler.ScheduleTellRepeatedlyCancelable(interval, interval, _context.Self,
RedeliveryTick.Instance, _context.Self);
}
Expand Down Expand Up @@ -429,6 +431,7 @@ public bool ConfirmDelivery(long deliveryId)
{
var before = _unconfirmed;
_unconfirmed = _unconfirmed.Remove(deliveryId);
if (_unconfirmed.IsEmpty) Cancel();
return _unconfirmed.Count < before.Count;
}

Expand Down Expand Up @@ -462,11 +465,9 @@ var entry in _unconfirmed.Where(e => e.Value.Timestamp <= deadline).Take(Redeliv

private void Send(long deliveryId, Delivery delivery, DateTime timestamp)
{
ActorSelection destination = _context.ActorSelection(delivery.Destination);
destination.Tell(delivery.Message);

_unconfirmed = _unconfirmed.SetItem(deliveryId,
new Delivery(delivery.Destination, delivery.Message, timestamp, delivery.Attempt + 1));
_context.ActorSelection(delivery.Destination).Tell(delivery.Message);
_unconfirmed = _unconfirmed.SetItem(deliveryId, new Delivery(delivery.Destination, delivery.Message, timestamp, delivery.Attempt + 1));
StartRedeliverTask();
}

/// <summary>
Expand Down Expand Up @@ -513,6 +514,7 @@ public void Cancel()
{
// need a null check here, in case actor is terminated before StartRedeliverTask() is called
_redeliverScheduleCancelable?.Cancel();
_redeliverScheduleCancelable = null;
}


Expand All @@ -521,6 +523,7 @@ public void Cancel()
/// </summary>
public void OnReplaySuccess()
{
if (_unconfirmed.IsEmpty) return;
RedeliverOverdue();
StartRedeliverTask();
}
Expand Down

0 comments on commit 56097a2

Please sign in to comment.