-
Notifications
You must be signed in to change notification settings - Fork 0
/
multiplexer.go
102 lines (93 loc) · 2.28 KB
/
multiplexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"log"
)
const (
CLIENT_CONNECT = iota
CLIENT_DISCONNECT
// TODO: See if this state is useful
QUIT
)
type Op struct {
action int
channels []string
resp chan<- *ChannelEvent
}
func NewMultiplexer(sub BrokerSubscriber) chan<- Op {
c := make(chan Op)
go multiplexer(sub, c)
return c
}
func multiplexer(sub BrokerSubscriber, comm <-chan Op) {
registry := make(map[string]map[chan<- *ChannelEvent]bool)
buffers := make(map[string]*ChannelBuffer)
for {
select {
case state := <-sub.StateChange():
switch state {
case BROKER_CONNECTED:
// Re-subscribe
for c, _ := range registry {
sub.Subscribe(c)
}
log.Println("multiplexer: broker connected")
case BROKER_DISCONNECTED:
log.Println("multiplexer: broker disconnected")
}
case op := <-comm:
switch op.action {
case CLIENT_CONNECT:
log.Println("multiplexer: CONNECT", op)
for _, c := range op.channels {
var b *ChannelBuffer
_, ok := registry[c]
if !ok {
b = NewChannelBuffer()
registry[c] = make(map[chan<- *ChannelEvent]bool)
buffers[c] = b
sub.Subscribe(c)
}
registry[c][op.resp] = true
b = buffers[c]
// TODO: If the client disconnects at this point it's annoying.
// TODO: It might also slow down the loop if the client is slow
// to receive these messages.
// The buffer might flush itself if it's too old
for _, ev := range b.All() {
op.resp <- &ChannelEvent{c, ev}
}
}
case CLIENT_DISCONNECT:
log.Println("multiplexer: DISCONNECT", op)
for _, c := range op.channels {
if registry[c] == nil {
panic("BUG: missing registry channel " + c)
}
if buffers[c] == nil {
panic("BUG: missing buffers channel " + c)
}
delete(registry[c], op.resp)
if len(registry[c]) == 0 {
delete(registry, c)
delete(buffers, c)
sub.Unsubscribe(c)
}
}
close(op.resp)
}
case ce := <-sub.ChannelEvent():
targets, ok := registry[ce.Channel]
if !ok {
log.Println("multiplexer got an event on a unused channel ", ce.Channel)
} else {
if buffers[ce.Channel].Add(ce.Event) {
for c, _ := range targets {
c <- ce
}
} else {
log.Println("mulitplexer: ignoring", ce)
}
}
}
}
}