Skip to content

Commit

Permalink
Fix HealthCheck.Persistence littering journal and snapshot store (#206)
Browse files Browse the repository at this point in the history
* Fix HealthCheck.Persistence littering journal and snapshot store

* Refactor delete, move to RecoveryCompleted

* Code cleanup

* Put the counter back in
  • Loading branch information
Arkatufus authored Mar 8, 2023
1 parent 2480b75 commit b02cbbc
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<PackageReference Include="xunit.runner.visualstudio" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.HealthCheck.Persistence\Akka.HealthCheck.Persistence.csproj" />
<ItemGroup>
<ProjectReference Include="..\Akka.HealthCheck.Persistence\Akka.HealthCheck.Persistence.csproj" />
</ItemGroup>

</Project>
8 changes: 5 additions & 3 deletions src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace Akka.HealthCheck.Persistence.Tests
{
public class ProbeFailureSpec: PersistenceTestKit
{

private readonly string _id = Guid.NewGuid().ToString("N");
private int _count;

Expand Down Expand Up @@ -224,14 +225,15 @@ await WithJournalDelete(delete => delete.Fail(), () =>

private PersistenceLivenessStatus PerformProbe()
{
var first = _count == 0;
_count++;
var liveProbe = ActorOf(() => new SuicideProbe(TestActor, _count == 1, _id));
var liveProbe = ActorOf(() => new SuicideProbe(TestActor, first, _id));
Watch(liveProbe);
liveProbe.Tell($"hit-{_count}");
liveProbe.Tell("hit");
var status = ExpectMsg<PersistenceLivenessStatus>();
ExpectTerminated(liveProbe);
Unwatch(liveProbe);

return status;
}

Expand Down
60 changes: 37 additions & 23 deletions src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,18 @@ public class AkkaPersistenceLivenessProbe : ActorBase
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly HashSet<IActorRef> _subscribers = new HashSet<IActorRef>();
private PersistenceLivenessStatus _currentLivenessStatus = new PersistenceLivenessStatus(message: "Persistence is still starting up");
private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined");
private IActorRef? _probe;
private int _probeCounter;
private readonly TimeSpan _delay;
private readonly string _id;
private readonly Cancelable _shutdownCancellable;

/// <summary>
/// Creates the liveness probe actor with the given delay between probe attempts.
/// </summary>
/// <param name="delay">
/// Delay handed to the scheduler when a "hit" message is scheduled for a
/// re-created probe instead of being sent immediately.
/// </param>
public AkkaPersistenceLivenessProbe(TimeSpan delay)
{
_delay = delay;
// One id per liveness-probe actor instance; it is passed to every SuicideProbe this actor creates.
_id = Guid.NewGuid().ToString("N");
// Cancelable bound to the actor system scheduler; it is cancelled and disposed in PostStop.
_shutdownCancellable = new Cancelable(Context.System.Scheduler);
}
public AkkaPersistenceLivenessProbe() : this(TimeSpan.FromSeconds(10))
{
Expand All @@ -93,6 +95,13 @@ public static Props PersistentHealthCheckProps()
.WithSupervisorStrategy(Actor.SupervisorStrategy.StoppingStrategy);
}

/// <summary>
/// Cancels and disposes the shutdown cancelable when this actor stops, then
/// runs the base class PostStop logic.
/// </summary>
protected override void PostStop()
{
// This is the cancelable passed to ScheduleTellOnce in CreateProbe; cancelling it here
// presumably keeps a still-pending delayed "hit" from firing after the actor has stopped.
_shutdownCancellable.Cancel();
_shutdownCancellable.Dispose();
base.PostStop();
}

private bool HandleSubscriptions(object msg)
{
switch (msg)
Expand Down Expand Up @@ -124,8 +133,9 @@ private bool Started(object message)
switch (message)
{
case Terminated t when t.ActorRef.Equals(_probe):
Context.Unwatch(_probe);
_log.Info("Persistence probe terminated. Recreating...");
CreateProbe(false);
CreateProbe();
Become(obj => Recreating(obj) || HandleSubscriptions(obj));
return true;
case PersistenceLivenessStatus status:
Expand All @@ -148,8 +158,9 @@ private bool Recreating(object message)
switch (message)
{
case Terminated t when t.ActorRef.Equals(_probe):
Context.Unwatch(_probe);
_log.Debug("Persistence probe terminated. Recreating...");
CreateProbe(false);
CreateProbe();
return true;
case PersistenceLivenessStatus status:
HandleRecoveryStatus(status);
Expand All @@ -166,21 +177,23 @@ protected override bool Receive(object message)

protected override void PreStart()
{
CreateProbe(true);
CreateProbe();
}

private void CreateProbe(bool firstTime)
private void CreateProbe()
{
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, firstTime, _id)));
if(firstTime)
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id)));
Context.Watch(_probe);

if(_probeCounter == 0)
{
_probe.Tell("hit" + _probeCounter++);
_probe.Tell("hit" + _probeCounter);
}
else
{
Context.System.Scheduler.ScheduleTellOnce(_delay, _probe, "hit" + _probeCounter++, Self);
Context.System.Scheduler.ScheduleTellOnce(_delay, _probe, "hit" + _probeCounter, Self, _shutdownCancellable);
}
Context.Watch(_probe);
_probeCounter++;
}

private void PublishStatusUpdates()
Expand Down Expand Up @@ -221,6 +234,11 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id)
{
_recoveredSnapshotStore = true;
});
Recover<RecoveryCompleted>(_ =>
{
DeleteMessages(long.MaxValue);
DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue));
});

Command<string>(str =>
{
Expand All @@ -235,12 +253,6 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id)
s =>
{
_persistedJournal = true;
if (!_firstAttempt)
{
DeleteMessages(save.Metadata.SequenceNr - 1);
DeleteSnapshots(new SnapshotSelectionCriteria(save.Metadata.SequenceNr - 1));
}
SendRecoveryStatusWhenFinished();
});
});
Expand All @@ -257,8 +269,6 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id)
_persistedJournal = true;
SendRecoveryStatusWhenFinished();
});
SendRecoveryStatusWhenFinished();
});

Command<DeleteMessagesSuccess>(_ =>
Expand Down Expand Up @@ -306,13 +316,16 @@ private void SendRecoveryStatusWhenFinished()
&& _persistedSnapshotStore is { })
{
var msg = _persistedJournal == true && _persistedSnapshotStore == true
? "Warming up probe. Recovery status is still undefined" : null;
? "Warming up probe. Recovery status is still undefined"
: null;
_probe.Tell(CreateStatus(msg));
Context.Stop(Self);
}

// Third case, all fields should be populated
if (_persistedJournal is { }
if (_recoveredJournal is { }
&& _recoveredSnapshotStore is { }
&& _persistedJournal is { }
&& _persistedSnapshotStore is { }
&& _deletedJournal is { }
&& _deletedSnapshotStore is { })
Expand All @@ -328,15 +341,16 @@ protected override void OnPersistFailure(Exception cause, object @event, long se
_failures.Add(cause);
_persistedJournal = false;
_probe.Tell(CreateStatus("Journal persist failure"));
// Actor is automatically killed here.
Context.Stop(Self);
}

protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
{
_log.Error(cause, "Journal persist rejected");
_failures.Add(cause);
_persistedJournal = false;
SendRecoveryStatusWhenFinished();
_probe.Tell(CreateStatus("Journal persist rejected"));
Context.Stop(Self);
}

protected override void OnRecoveryFailure(Exception reason, object? message = null)
Expand All @@ -346,7 +360,7 @@ protected override void OnRecoveryFailure(Exception reason, object? message = nu

_failures.Add(reason);
_probe.Tell(CreateStatus(msg));
// Actor is automatically killed here.
Context.Stop(Self);
}

private PersistenceLivenessStatus CreateStatus(string? message = null)
Expand Down

0 comments on commit b02cbbc

Please sign in to comment.