-
Notifications
You must be signed in to change notification settings - Fork 0
/
actor.go
96 lines (80 loc) · 2.2 KB
/
actor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package eventual2go
// Internal envelope types that travel on an actor's stream.
type (
	// message wraps a user payload destined for Actor.OnMessage.
	message struct {
		data Data
	}

	// loop is a payload-free tick that triggers one LoopActor.Loop call.
	loop struct{}

	// shutdown carries the payload of a shutdown signal; it has the same
	// layout as message but is a distinct type.
	shutdown message
)
// ActorMessageStream is the handle used to send messages to an actor.
// It bundles the stream controller that feeds the actor's handler with the
// future that delivers the actor's final shutdown error.
type ActorMessageStream struct {
// streamController receives the message, loop and shutdown items for the actor.
streamController *StreamController[Data]
// finalErr is read by Shutdown to obtain the actor's final error.
finalErr *Future[error]
}
// newActorMessageStream returns an ActorMessageStream backed by a freshly
// created stream controller and the given final-error future.
func newActorMessageStream(finalErr *Future[error]) (ams ActorMessageStream) {
	controller := NewStreamController[Data]()
	return ActorMessageStream{streamController: controller, finalErr: finalErr}
}
// Send wraps data in a message envelope and adds it to the actor's stream.
func (ams ActorMessageStream) Send(data Data) {
	envelope := message{data: data}
	ams.streamController.Add(envelope)
}
// Shutdown adds a shutdown signal carrying data to the actor's stream and
// blocks until the actor's final error is available. Messages sent before the
// shutdown signal are guaranteed to be handled. The returned error is nil when
// the actor finished without an error.
func (ams ActorMessageStream) Shutdown(data Data) (err error) {
	ams.streamController.Add(shutdown{data: data})
	if result := <-ams.finalErr.AsChan(); result != nil {
		err = result.(error)
	}
	return err
}
// Actor is the minimal actor contract.
//
// Init is called once by SpawnActor before the actor's message stream is
// created; a non-nil error aborts the spawn. OnMessage is called with the
// payload of each message sent through the actor's ActorMessageStream.
type Actor interface {
Init() error
OnMessage(d Data)
}
// ShutdownActor is an actor with a Shutdown method, which is called upon actor shutdown.
//
// Shutdown receives the data passed along with the shutdown signal. The error
// it returns becomes the actor's final error, which ActorMessageStream.Shutdown
// hands back to its caller.
type ShutdownActor interface {
Actor
Shutdown(Data) error
}
// LoopActor is an actor with a Loop method which is called repeatedly.
// Messages are handled in between loop repetitions.
//
// When Loop returns true another repetition is queued on the actor's stream;
// returning false ends the loop. Loop is no longer called once the actor's
// final error has been set by a shutdown.
type LoopActor interface {
Actor
Loop() (cont bool)
}
// SpawnActor initializes a and, on success, wires it to a new message stream.
//
// If a.Init fails, its error is returned together with a zero
// ActorMessageStream and nothing else is set up. Otherwise a handler for a is
// subscribed to the new stream, and when a implements LoopActor a first loop
// item is queued so that Loop starts running.
func SpawnActor(a Actor) (messages ActorMessageStream, err error) {
	if initErr := a.Init(); initErr != nil {
		return ActorMessageStream{}, initErr
	}

	completer := NewCompleter[error]()
	stream := newActorMessageStream(completer.Future())
	controller := stream.streamController
	controller.Stream().Listen(messageHandler(a, controller, completer))

	if _, isLoop := a.(LoopActor); isLoop {
		controller.Add(loop{})
	}
	return stream, nil
}
// messageHandler builds the stream subscriber that dispatches incoming items
// to the actor a.
//
// The subscriber handles three kinds of items:
//   - message: the payload is passed to a.OnMessage.
//   - loop: if a implements LoopActor and finalErr is not yet completed, Loop
//     is called once; when it returns true another loop item is added to msg.
//   - shutdown: if a implements ShutdownActor its Shutdown hook is called with
//     the payload; the resulting error (nil otherwise) completes finalErr.
//     Shutdown items arriving after finalErr is completed are ignored, so the
//     hook runs at most once and finalErr is completed at most once.
//
// Items of any other type are dropped.
func messageHandler(a Actor, msg *StreamController[Data], finalErr *Completer[error]) Subscriber[Data] {
	// Resolve the optional interfaces once instead of on every item. The
	// comma-ok form keeps a stray loop item from panicking on a plain Actor.
	looper, canLoop := a.(LoopActor)
	stopper, canStop := a.(ShutdownActor)

	return func(d Data) {
		// Bind the concrete value in the switch so the cases need no
		// second type assertion.
		switch m := d.(type) {
		case message:
			a.OnMessage(m.data)
		case loop:
			if !canLoop || finalErr.Completed() {
				return
			}
			if looper.Loop() {
				msg.Add(loop{})
			}
		case shutdown:
			if finalErr.Completed() {
				return
			}
			var err error
			if canStop {
				err = stopper.Shutdown(m.data)
			}
			finalErr.Complete(err)
		}
	}
}