-
Notifications
You must be signed in to change notification settings - Fork 1k
/
EventBusUnsubscriber.cs
184 lines (165 loc) · 6.28 KB
/
EventBusUnsubscriber.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
//-----------------------------------------------------------------------
// <copyright file="EventBusUnsubscriber.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Annotations;
using Akka.Dispatch;
using Akka.Util.Internal;
namespace Akka.Event
{
/// <summary>
/// INTERNAL API
///
/// Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated.
///
/// Assumptions note:
/// We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor,
/// thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to
/// needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down
/// subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes
/// watching a few actors too much - we opt for the 2nd choice here.
/// </summary>
[InternalApi]
class EventStreamUnsubscriber : ActorBase
{
private readonly EventStream _eventStream;
private readonly bool _debug;
private readonly ActorSystem _system;
/// <summary>
/// TBD
/// </summary>
/// <param name="eventStream">TBD</param>
/// <param name="system">TBD</param>
/// <param name="debug">TBD</param>
public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool debug)
{
_eventStream = eventStream;
_system = system;
_debug = debug;
}
/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
protected override bool Receive(object message)
{
return message.Match().With<Register>(register =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
Context.Watch(register.Actor);
}).With<UnregisterIfNoMoreSubscribedChannels>(unregister =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
Context.Unwatch(unregister.Actor);
}).With<Terminated>(terminated =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
_eventStream.Unsubscribe(terminated.Actor);
})
.WasHandled;
}
/// <summary>
/// TBD
/// </summary>
protected override void PreStart()
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("registering unsubscriber with {0}", _eventStream)));
_eventStream.InitUnsubscriber(Self, _system);
}
/// <summary>
/// TBD
/// </summary>
internal class Register
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public Register(IActorRef actor)
{
Actor = actor;
}
/// <summary>
/// TBD
/// </summary>
public IActorRef Actor { get; private set; }
}
/// <summary>
/// TBD
/// </summary>
internal class Terminated
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public Terminated(IActorRef actor)
{
Actor = actor;
}
/// <summary>
/// TBD
/// </summary>
public IActorRef Actor { get; private set; }
}
/// <summary>
/// TBD
/// </summary>
internal class UnregisterIfNoMoreSubscribedChannels
{
/// <summary>
/// TBD
/// </summary>
/// <param name="actor">TBD</param>
public UnregisterIfNoMoreSubscribedChannels(IActorRef actor)
{
Actor = actor;
}
/// <summary>
/// TBD
/// </summary>
public IActorRef Actor { get; private set; }
}
}
/// <summary>
/// Provides factory for Akka.Event.EventStreamUnsubscriber actors with unique names.
/// This is needed if someone spins up more EventStreams using the same ActorSystem,
/// each stream gets it's own unsubscriber.
/// </summary>
class EventStreamUnsubscribersProvider
{
private readonly AtomicCounter _unsubscribersCounter = new AtomicCounter(0);
private static readonly EventStreamUnsubscribersProvider _instance = new EventStreamUnsubscribersProvider();
/// <summary>
/// TBD
/// </summary>
public static EventStreamUnsubscribersProvider Instance
{
get { return _instance; }
}
/// <summary>
/// TBD
/// </summary>
/// <param name="system">TBD</param>
/// <param name="eventStream">TBD</param>
/// <param name="debug">TBD</param>
public void Start(ActorSystemImpl system, EventStream eventStream, bool debug)
{
system.SystemActorOf(Props.Create<EventStreamUnsubscriber>(eventStream, system, debug).WithDispatcher(Dispatchers.InternalDispatcherId),
string.Format("EventStreamUnsubscriber-{0}", _unsubscribersCounter.IncrementAndGet()));
}
}
}