Skip to content

Commit

Permalink
reproduced akkadotnet#5717
Browse files Browse the repository at this point in the history
Reproduced `IActorRef` leak inside the `EventStream`
  • Loading branch information
Aaronontheweb committed Mar 14, 2022
1 parent 7123d0f commit e805350
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 29 deletions.
49 changes: 49 additions & 0 deletions src/core/Akka.Tests/Event/Bugfix5717Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// //-----------------------------------------------------------------------
// // <copyright file="Bugfix5717Specs.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using Akka.TestKit;
using Xunit;

namespace Akka.Tests.Event
{
public class Bugfix5717Specs : AkkaSpec
{
/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/5717
/// </summary>
[Fact]
public void Should_unsubscribe_from_all_topics_on_Terminate()
{
var es = Sys.EventStream;
var tm1 = 1;
var tm2 = "FOO";
var a1 = CreateTestProbe();
var a2 = CreateTestProbe();

es.Subscribe(a1.Ref, typeof(int));
es.Subscribe(a2.Ref, typeof(int));
es.Subscribe(a2.Ref, typeof(string));
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg(tm1);
a2.ExpectMsg(tm1);
a2.ExpectMsg(tm2);

// kill second test probe
Watch(a2);
Sys.Stop(a2);
ExpectTerminated(a2);

EventFilter.DeadLetter().Expect(0, () =>
{
es.Publish(tm1);
es.Publish(tm2);
a1.ExpectMsg(tm1);
});
}
}
}
61 changes: 32 additions & 29 deletions src/core/Akka/Event/EventBusUnsubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace Akka.Event
/// watching a few actors too much - we opt for the 2nd choice here.
/// </summary>
[InternalApi]
class EventStreamUnsubscriber : ActorBase
internal class EventStreamUnsubscriber : ActorBase
{
private readonly EventStream _eventStream;
private readonly bool _debug;
Expand All @@ -45,39 +45,42 @@ public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool
_debug = debug;

}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>

protected override bool Receive(object message)
{
return message.Match().With<Register>(register =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
Context.Watch(register.Actor);
}).With<UnregisterIfNoMoreSubscribedChannels>(unregister =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
Context.Unwatch(unregister.Actor);
}).With<Terminated>(terminated =>
switch (message)
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
_eventStream.Unsubscribe(terminated.Actor);
})
.WasHandled;
case Register register:
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
Context.Watch(register.Actor);
break;
}
case UnregisterIfNoMoreSubscribedChannels unregister:
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
Context.Unwatch(unregister.Actor);
break;
}
case Terminated terminated:
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
_eventStream.Unsubscribe(terminated.Actor);
break;
}
default:
return false;
}

return true;
}

/// <summary>
/// TBD
/// </summary>
protected override void PreStart()
{
if (_debug)
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Event/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system)
{
return false;
}

return _initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, Either.Right(unsubscriber)))
Expand Down

0 comments on commit e805350

Please sign in to comment.