-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis_broker.go
194 lines (166 loc) · 4.01 KB
/
redis_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package main
import (
"github.com/garyburd/redigo/redis"
"log"
"sync"
"time"
)
// RedisPublisher publishes events to a Redis server.
type RedisPublisher struct {
	addr string // server address passed to redis.Dial by connect
}

// NewRedisPublisher returns a publisher bound to addr. No connection is
// opened here.
func NewRedisPublisher(addr string) *RedisPublisher {
	p := new(RedisPublisher)
	p.addr = addr
	return p
}
// Publish stores event under channel and broadcasts it to live subscribers.
//
// Each Publish initiates a new connection, which is closed before returning.
// On that connection a single MULTI/EXEC transaction:
//   - ZADDs the serialized event to the sorted set named channel, with score
//     -1*event.At;
//   - trims the set with ZREMRANGEBYRANK channel event.Size -1;
//   - sets the key's EXPIRE to event.TTL;
//   - PUBLISHes the serialized event on channel.
//
// It returns the first error met while dialing, queuing a command or running
// EXEC. An error reply carried inside the EXEC result is also returned, so a
// command that failed at execution time is not reported as success.
func (p *RedisPublisher) Publish(channel string, event *Event) (err error) {
	c, err := p.connect()
	if err != nil {
		return err
	}
	defer c.Close()

	data := DumpEvent(event)
	if err = c.Send("MULTI"); err != nil {
		return err
	}
	if err = c.Send("ZADD", channel, -1*event.At, data); err != nil {
		return err
	}
	if err = c.Send("ZREMRANGEBYRANK", channel, event.Size, -1); err != nil {
		return err
	}
	if err = c.Send("EXPIRE", channel, event.TTL); err != nil {
		return err
	}
	if err = c.Send("PUBLISH", channel, data); err != nil {
		return err
	}

	reply, err := c.Do("EXEC")
	if err != nil {
		return err
	}
	// EXEC reports a command that failed at execution time as an error
	// element inside its array reply, not as the error returned by Do.
	if results, ok := reply.([]interface{}); ok {
		for _, r := range results {
			if cmdErr, ok := r.(redis.Error); ok {
				return cmdErr
			}
		}
	}
	return nil
}
// connect dials a new TCP connection to the publisher's Redis address and
// returns it together with any dial error. Closing the connection is left to
// the caller.
func (p *RedisPublisher) connect() (redis.Conn, error) {
	conn, err := redis.Dial("tcp", p.addr)
	return conn, err
}
// RedisSubscriber keeps a pair of Redis connections alive (one for regular
// commands, one for pub/sub) and exposes received events and connection
// state changes through channels.
type RedisSubscriber struct {
// addr is the Redis server address given to NewRedisSubscriber.
addr string
// state is BROKER_DISCONNECTED or BROKER_CONNECTED; it is written by
// connect and disconnect.
state int
// stateChange receives the new state after every connect/disconnect.
// It is created unbuffered, so those calls block until it is read.
stateChange chan int
// event carries events decoded from pub/sub messages and from the stored
// history replayed by Subscribe. It is created unbuffered.
event chan *ChannelEvent
// mu is held by connect and disconnect for the whole state transition,
// including the send on stateChange.
mu sync.Mutex
// c is the connection used for regular commands (ZREVRANGE in Subscribe).
c redis.Conn
// psc is the pub/sub connection used by Subscribe, Unsubscribe and the
// receive loop started in NewRedisSubscriber.
psc redis.PubSubConn
}
// NewRedisSubscriber creates a subscriber for the Redis server at addr and
// starts a background goroutine that drives it. The goroutine never exits.
//
// While the subscriber is disconnected the goroutine dials two connections
// (one for regular commands, one wrapped as a pub/sub connection), sleeping
// two seconds after a failed dial before trying again. While connected it
// blocks on the pub/sub connection: messages are decoded with LoadEvent and
// forwarded on the event channel, subscription notices are ignored, and a
// receive error tears the connections down so the next iteration reconnects.
//
// Both the state-change and event channels are unbuffered, so something must
// be reading StateChange() and ChannelEvent() for the goroutine to progress.
//
// NOTE(review): the loop reads the state and psc fields without holding mu
// while connect/disconnect write them under mu — confirm whether that is
// acceptable for the callers of Subscribe/Unsubscribe.
func NewRedisSubscriber(addr string) *RedisSubscriber {
	s := new(RedisSubscriber)
	s.addr = addr
	s.state = BROKER_DISCONNECTED
	s.stateChange = make(chan int)
	s.event = make(chan *ChannelEvent)

	// reconnect makes one attempt at opening both connections.
	reconnect := func() {
		log.Println("redis sub: BROKER_DISCONNECTED")
		cmdConn, err := redis.Dial("tcp", addr)
		if err != nil {
			log.Println("redis subscriber conn ERR: ", err)
			time.Sleep(2 * time.Second)
			return
		}
		subConn, err := redis.Dial("tcp", addr)
		if err != nil {
			log.Println("redis sub conn ERR2: ", err)
			// Do not leak the first connection when the second dial fails.
			cmdConn.Close()
			time.Sleep(2 * time.Second)
			return
		}
		s.connect(cmdConn, redis.PubSubConn{subConn})
	}

	// receive handles exactly one item read from the pub/sub connection.
	receive := func() {
		msg := s.psc.Receive()
		log.Println("redis sub: msg", msg)
		switch m := msg.(type) {
		case redis.Message:
			ev, err := LoadEvent(m.Data)
			if err != nil {
				// Just log the error, there is an issue in the redis storage
				log.Println("redis sub LoadEvent2:", m.Data, err)
				return
			}
			s.event <- &ChannelEvent{m.Channel, ev}
		case redis.Subscription:
			// Nothing to do
		case error:
			log.Println("redis sub redis err:", msg)
			s.disconnect()
		default:
			panic("BUG: case missing")
		}
	}

	go func() {
		for {
			switch s.state {
			case BROKER_DISCONNECTED:
				reconnect()
			case BROKER_CONNECTED:
				receive()
			}
		}
	}()
	return s
}
// Subscribe subscribes the pub/sub connection to channel and then replays the
// events already stored in the sorted set named channel.
//
// It does nothing while the subscriber is disconnected. If subscribing or
// fetching the stored values fails, the error is logged and the subscriber is
// disconnected (see ok). Stored values are read with ZREVRANGE channel 0 -1
// and delivered on the event channel, in that order, from a separate
// goroutine so this call does not block on the consumer.
//
// Don't subscribe twice to the same channel.
func (s *RedisSubscriber) Subscribe(channel string) {
	if s.state == BROKER_DISCONNECTED {
		return
	}
	if subErr := s.psc.Subscribe(channel); !s.ok(subErr) {
		return
	}

	// Fetch the old values that will be sent on the channel.
	history, err := redis.Strings(s.c.Do("ZREVRANGE", channel, 0, -1))
	if !s.ok(err) {
		return
	}

	// FIXME: Is there a cleaner way to do that ?
	replay := func() {
		for i := range history {
			raw := history[i]
			ev, loadErr := LoadEvent([]byte(raw))
			if loadErr != nil {
				// TODO: Handle error
				log.Println("multiplexer: LoadEvent2:", raw, loadErr)
				continue
			}
			s.event <- &ChannelEvent{channel, ev}
		}
	}
	go replay()
}
// Unsubscribe removes channel from the pub/sub connection. It does nothing
// while the subscriber is disconnected; a failure is logged and disconnects
// the subscriber (see ok).
func (s *RedisSubscriber) Unsubscribe(channel string) {
	if s.state != BROKER_DISCONNECTED {
		s.ok(s.psc.Unsubscribe(channel))
	}
}
// ok reports whether err is nil. A non-nil err is logged and the subscriber
// is disconnected before false is returned.
func (s *RedisSubscriber) ok(err error) bool {
	if err == nil {
		return true
	}
	log.Println("redis subscriber:", err)
	s.disconnect()
	return false
}
// disconnect closes both Redis connections, marks the subscriber as
// BROKER_DISCONNECTED and announces the new state on the state-change
// channel. It is a no-op when the subscriber is already disconnected.
//
// The mutex stays held during the channel send, so the call blocks until the
// state change has been received.
func (s *RedisSubscriber) disconnect() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.state != BROKER_DISCONNECTED {
		s.c.Close()
		s.psc.Close()
		s.state = BROKER_DISCONNECTED
		s.stateChange <- BROKER_DISCONNECTED
	}
}
// connect installs c as the command connection and psc as the pub/sub
// connection, marks the subscriber as BROKER_CONNECTED and announces the new
// state on the state-change channel.
//
// Calling it while already connected is treated as a programming error and
// panics. The mutex stays held during the channel send, so the call blocks
// until the state change has been received.
func (s *RedisSubscriber) connect(c redis.Conn, psc redis.PubSubConn) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.state == BROKER_CONNECTED {
		panic("BUG: how could that happen ?")
	}
	s.c, s.psc = c, psc
	s.state = BROKER_CONNECTED
	s.stateChange <- BROKER_CONNECTED
}
// StateChange returns the receive-only side of the channel on which every
// transition to BROKER_CONNECTED or BROKER_DISCONNECTED is sent.
func (s *RedisSubscriber) StateChange() <-chan int {
	var changes <-chan int = s.stateChange
	return changes
}
// ChannelEvent returns the receive-only side of the channel on which events
// from pub/sub messages and replayed history are delivered.
func (s *RedisSubscriber) ChannelEvent() <-chan *ChannelEvent {
	var events <-chan *ChannelEvent = s.event
	return events
}