From b9e5737c01b91cb40f368012a05ee46674074843 Mon Sep 17 00:00:00 2001 From: Joshua Benjamin Date: Fri, 24 Apr 2015 09:17:59 -0700 Subject: [PATCH] Fixed to notification on DeathWatch watching. --- src/core/Akka/Actor/ActorCell.DeathWatch.cs | 88 +++++++++++---------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/src/core/Akka/Actor/ActorCell.DeathWatch.cs b/src/core/Akka/Actor/ActorCell.DeathWatch.cs index 27fbd5d91d2..ff6cbcfbe1d 100644 --- a/src/core/Akka/Actor/ActorCell.DeathWatch.cs +++ b/src/core/Akka/Actor/ActorCell.DeathWatch.cs @@ -25,7 +25,7 @@ public IActorRef Watch(IActorRef subject) MaintainAddressTerminatedSubscription(() => { a.Tell(new Watch(a, Self)); // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS - _state = _state.AddWatching(a); + _state = _state.AddWatching(a); }, a); } return a; @@ -34,12 +34,12 @@ public IActorRef Watch(IActorRef subject) public IActorRef Unwatch(IActorRef subject) { var a = (IInternalActorRef)subject; - if (! a.Equals(Self) && WatchingContains(a)) + if (!a.Equals(Self) && WatchingContains(a)) { a.Tell(new Unwatch(a, Self)); MaintainAddressTerminatedSubscription(() => { - _state = _state.RemoveWatching(a); + _state = _state.RemoveWatching(a); }, a); } _state = _state.RemoveTerminated(a); @@ -48,11 +48,11 @@ public IActorRef Unwatch(IActorRef subject) protected void ReceivedTerminated(Terminated t) { - if (_state.ContainsTerminated(t.ActorRef)) - { - _state = _state.RemoveTerminated(t.ActorRef); // here we know that it is the SAME ref which was put in - ReceiveMessage(t); - } + if (!_state.ContainsTerminated(t.ActorRef)) + return; + + _state = _state.RemoveTerminated(t.ActorRef); // here we know that it is the SAME ref which was put in + ReceiveMessage(t); } /// @@ -87,12 +87,15 @@ public void TerminatedQueuedFor(IActorRef subject) private bool WatchingContains(IActorRef subject) { return _state.ContainsWatching(subject) || - (subject.Path.Uid != ActorCell.UndefinedUid && _state.ContainsWatching(new UndefinedUidActorRef(subject))); + (subject.Path.Uid != UndefinedUid && _state.ContainsWatching(new UndefinedUidActorRef(subject))); } protected void TellWatchersWeDied() { - var watchedBy = _state.GetWatchedBy(); + var watchedBy = _state + .GetWatchedBy() + .ToList(); + if (!watchedBy.Any()) return; try { @@ -124,26 +127,28 @@ private void SendTerminated(bool ifLocal, IActorRef watcher) { if (((IActorRefScope)watcher).IsLocal == ifLocal && !watcher.Equals(Parent)) { - ((IInternalActorRef)watcher).Tell(new DeathWatchNotification(Self, true, false)); + watcher.Tell(new DeathWatchNotification(Self, true, false)); } } protected void UnwatchWatchedActors(ActorBase actor) { - var watching = _state.GetWatching(); - if(!watching.Any()) return; + var watching = _state + .GetWatching() + .ToList(); + + if (!watching.Any()) return; + MaintainAddressTerminatedSubscription(() => { try { - foreach ( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS - var watchee in watching.OfType()) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS + foreach (var watchee in watching.OfType()) watchee.Tell(new Unwatch(watchee, Self)); } finally { - //_watching = new HashSet(); - //_terminatedQueue = new HashSet(); _state = _state.ClearWatching(); _state = _state.ClearTerminated(); } @@ -157,13 +162,13 @@ protected void AddWatcher(IActorRef watchee, IActorRef watcher) if (watcheeSelf && !watcherSelf) { - if(!_state.ContainsWatchedBy(watcher)) MaintainAddressTerminatedSubscription(() => - { - //_watchedBy.Add(watcher); - _state = _state.AddWatchedBy(watcher); - - if(System.Settings.DebugLifecycle) Publish(new Debug(Self.Path.ToString(), Actor.GetType(), string.Format("now watched by {0}", watcher))); - }, watcher); + if (!_state.ContainsWatchedBy(watcher)) MaintainAddressTerminatedSubscription(() => + { + //_watchedBy.Add(watcher); + _state = _state.AddWatchedBy(watcher); + + if (System.Settings.DebugLifecycle) Publish(new Debug(Self.Path.ToString(), Actor.GetType(), string.Format("now watched by {0}", watcher))); + }, watcher); } else if (!watcheeSelf && watcherSelf) { @@ -182,13 +187,13 @@ protected void RemWatcher(IActorRef watchee, IActorRef watcher) if (watcheeSelf && !watcherSelf) { - if( _state.ContainsWatchedBy(watcher)) MaintainAddressTerminatedSubscription(() => + if (_state.ContainsWatchedBy(watcher)) MaintainAddressTerminatedSubscription(() => { //_watchedBy.Remove(watcher); _state = _state.RemoveWatchedBy(watcher); - + if (System.Settings.DebugLifecycle) Publish(new Debug(Self.Path.ToString(), Actor.GetType(), string.Format("no longer watched by {0}", watcher))); - } , watcher); + }, watcher); } else if (!watcheeSelf && watcherSelf) { @@ -202,27 +207,23 @@ protected void RemWatcher(IActorRef watchee, IActorRef watcher) protected void AddressTerminated(Address address) { - var watchedBy = _state.GetWatchedBy(); // cleanup watchedBy since we know they are dead MaintainAddressTerminatedSubscription(() => { - foreach (var a in watchedBy.Where(a => a.Path.Address == address)) + foreach (var a in _state.GetWatchedBy().Where(a => a.Path.Address == address)) { //_watchedBy.Remove(a); _state = _state.RemoveWatchedBy(a); } }); - // - watchedBy = _state.GetWatchedBy(); - // send DeathWatchNotification to self for all matching subjects // that are not child with existenceConfirmed = false because we could have been watching a // non-local ActorRef that had never resolved before the other node went down // When a parent is watching a child and it terminates due to AddressTerminated // it is removed by sending DeathWatchNotification with existenceConfirmed = true to support // immediate creation of child with same name. - foreach (var a in watchedBy.Where(a => a.Path.Address == address)) + foreach (var a in _state.GetWatching().Where(a => a.Path.Address == address)) { Self.Tell(new DeathWatchNotification(a, true /*TODO: childrenRefs.getByRef(a).isDefined*/, true)); } @@ -234,15 +235,18 @@ protected void AddressTerminated(Address address) /// Ends subscription to AddressTerminated if subscribing and the /// block removes the last non-local ref from watching and watchedBy. /// - private void MaintainAddressTerminatedSubscription(Action block, IActorRef change= null) + private void MaintainAddressTerminatedSubscription(Action block, IActorRef change = null) { if (IsNonLocal(change)) { var had = HasNonLocalAddress(); block(); var has = HasNonLocalAddress(); - if (had && !has) UnsubscribeAddressTerminated(); - else if (!had && has) SubscribeAddressTerminated(); + + if (had && !has) + UnsubscribeAddressTerminated(); + else if (!had && has) + SubscribeAddressTerminated(); } else { @@ -252,10 +256,11 @@ private void MaintainAddressTerminatedSubscription(Action block, IActorRef chang private static bool IsNonLocal(IActorRef @ref) { - if (@ref == null) return true; + if (@ref == null) + return true; + var a = @ref as IInternalActorRef; - if (a != null && !a.IsLocal) return true; - return false; + return a != null && !a.IsLocal; } private bool HasNonLocalAddress() @@ -298,7 +303,4 @@ public override IActorRefProvider Provider } } } - - -} - +} \ No newline at end of file