-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix HealthCheck.Persistence littering journal and snapshot store #206
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,16 +71,18 @@ public class AkkaPersistenceLivenessProbe : ActorBase | |
{ | ||
private readonly ILoggingAdapter _log = Context.GetLogger(); | ||
private readonly HashSet<IActorRef> _subscribers = new HashSet<IActorRef>(); | ||
private PersistenceLivenessStatus _currentLivenessStatus = new PersistenceLivenessStatus(message: "Persistence is still starting up"); | ||
private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined"); | ||
private IActorRef? _probe; | ||
private int _probeCounter; | ||
private readonly TimeSpan _delay; | ||
private readonly string _id; | ||
private readonly Cancelable _shutdownCancellable; | ||
|
||
public AkkaPersistenceLivenessProbe(TimeSpan delay) | ||
{ | ||
_delay = delay; | ||
_id = Guid.NewGuid().ToString("N"); | ||
_shutdownCancellable = new Cancelable(Context.System.Scheduler); | ||
} | ||
public AkkaPersistenceLivenessProbe() : this(TimeSpan.FromSeconds(10)) | ||
{ | ||
|
@@ -93,6 +95,13 @@ public static Props PersistentHealthCheckProps() | |
.WithSupervisorStrategy(Actor.SupervisorStrategy.StoppingStrategy); | ||
} | ||
|
||
protected override void PostStop() | ||
{ | ||
_shutdownCancellable.Cancel(); | ||
_shutdownCancellable.Dispose(); | ||
base.PostStop(); | ||
} | ||
|
||
private bool HandleSubscriptions(object msg) | ||
{ | ||
switch (msg) | ||
|
@@ -124,8 +133,9 @@ private bool Started(object message) | |
switch (message) | ||
{ | ||
case Terminated t when t.ActorRef.Equals(_probe): | ||
Context.Unwatch(_probe); | ||
_log.Info("Persistence probe terminated. Recreating..."); | ||
CreateProbe(false); | ||
CreateProbe(); | ||
Become(obj => Recreating(obj) || HandleSubscriptions(obj)); | ||
return true; | ||
case PersistenceLivenessStatus status: | ||
|
@@ -148,8 +158,9 @@ private bool Recreating(object message) | |
switch (message) | ||
{ | ||
case Terminated t when t.ActorRef.Equals(_probe): | ||
Context.Unwatch(_probe); | ||
_log.Debug("Persistence probe terminated. Recreating..."); | ||
CreateProbe(false); | ||
CreateProbe(); | ||
return true; | ||
case PersistenceLivenessStatus status: | ||
HandleRecoveryStatus(status); | ||
|
@@ -166,21 +177,23 @@ protected override bool Receive(object message) | |
|
||
protected override void PreStart() | ||
{ | ||
CreateProbe(true); | ||
CreateProbe(); | ||
} | ||
|
||
private void CreateProbe(bool firstTime) | ||
private void CreateProbe() | ||
{ | ||
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, firstTime, _id))); | ||
if(firstTime) | ||
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id))); | ||
Context.Watch(_probe); | ||
|
||
if(_probeCounter == 0) | ||
{ | ||
_probe.Tell("hit" + _probeCounter++); | ||
_probe.Tell("hit" + _probeCounter); | ||
} | ||
else | ||
{ | ||
Context.System.Scheduler.ScheduleTellOnce(_delay, _probe, "hit" + _probeCounter++, Self); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why don't we version this any longer? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't see the merit of versioning this; we already have "versioning" via the sequence number. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Chesterton's Fence - why remove it if it's not hurting anything? I can't tell you, given how old the code is, why it was there in the first place. |
||
Context.System.Scheduler.ScheduleTellOnce(_delay, _probe, "hit" + _probeCounter, Self, _shutdownCancellable); | ||
} | ||
Context.Watch(_probe); | ||
_probeCounter++; | ||
} | ||
|
||
private void PublishStatusUpdates() | ||
|
@@ -221,6 +234,11 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) | |
{ | ||
_recoveredSnapshotStore = true; | ||
}); | ||
Recover<RecoveryCompleted>(_ => | ||
{ | ||
DeleteMessages(long.MaxValue); | ||
DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue)); | ||
}); | ||
|
||
Command<string>(str => | ||
{ | ||
|
@@ -235,12 +253,6 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) | |
s => | ||
{ | ||
_persistedJournal = true; | ||
|
||
if (!_firstAttempt) | ||
{ | ||
DeleteMessages(save.Metadata.SequenceNr - 1); | ||
Arkatufus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
DeleteSnapshots(new SnapshotSelectionCriteria(save.Metadata.SequenceNr - 1)); | ||
} | ||
SendRecoveryStatusWhenFinished(); | ||
}); | ||
}); | ||
|
@@ -257,8 +269,6 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) | |
_persistedJournal = true; | ||
SendRecoveryStatusWhenFinished(); | ||
}); | ||
|
||
SendRecoveryStatusWhenFinished(); | ||
}); | ||
|
||
Command<DeleteMessagesSuccess>(_ => | ||
|
@@ -306,13 +316,16 @@ private void SendRecoveryStatusWhenFinished() | |
&& _persistedSnapshotStore is { }) | ||
{ | ||
var msg = _persistedJournal == true && _persistedSnapshotStore == true | ||
? "Warming up probe. Recovery status is still undefined" : null; | ||
? "Warming up probe. Recovery status is still undefined" | ||
: null; | ||
_probe.Tell(CreateStatus(msg)); | ||
Context.Stop(Self); | ||
} | ||
|
||
// Third case, all fields should be populated | ||
if (_persistedJournal is { } | ||
if (_recoveredJournal is { } | ||
&& _recoveredSnapshotStore is { } | ||
&& _persistedJournal is { } | ||
&& _persistedSnapshotStore is { } | ||
&& _deletedJournal is { } | ||
&& _deletedSnapshotStore is { }) | ||
|
@@ -328,15 +341,16 @@ protected override void OnPersistFailure(Exception cause, object @event, long se | |
_failures.Add(cause); | ||
_persistedJournal = false; | ||
_probe.Tell(CreateStatus("Journal persist failure")); | ||
// Actor is automatically killed here. | ||
Context.Stop(Self); | ||
} | ||
|
||
protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr) | ||
{ | ||
_log.Error(cause, "Journal persist rejected"); | ||
_failures.Add(cause); | ||
_persistedJournal = false; | ||
SendRecoveryStatusWhenFinished(); | ||
_probe.Tell(CreateStatus("Journal persist rejected")); | ||
Context.Stop(Self); | ||
} | ||
|
||
protected override void OnRecoveryFailure(Exception reason, object? message = null) | ||
|
@@ -346,7 +360,7 @@ protected override void OnRecoveryFailure(Exception reason, object? message = nu | |
|
||
_failures.Add(reason); | ||
_probe.Tell(CreateStatus(msg)); | ||
// Actor is automatically killed here. | ||
Context.Stop(Self); | ||
} | ||
|
||
private PersistenceLivenessStatus CreateStatus(string? message = null) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put the counter back in.