-
Notifications
You must be signed in to change notification settings - Fork 6
/
channel.go
290 lines (262 loc) · 8.22 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
// Copyright (c) 2015-2017. Oleg Sklyar & teris.io. All rights reserved.
// See the LICENSE file in the project root for licensing information.
package longpoll
import (
"errors"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/teris-io/shortid"
)
// Channel represents a single channel for publishing and receiving data over a long-polling
// subscription. Data published to any of the subscribed topics is received by the client
// asking for new data. Received data is not split by topic.
//
// The subscription is set up to time out if no Get request is made before the end of the timeout
// period provided at construction. Every Get request extends the lifetime of the subscription for
// the duration of the timeout.
type Channel struct {
// mx guards data and notif; Publish, Get, Drop and QueueSize take it (IsGetWaiting reads notif
// without it).
mx sync.Mutex
// id is the subscription identifier generated in NewChannel and returned by ID.
id string
// onClose is the optional exit handler; Drop calls it with id when it is non-nil.
onClose func(id string)
// topics is the set of subscribed topic names; filled in NewChannel and only read afterwards.
topics map[string]bool
// data holds published samples that have not yet been handed to a Get request.
data []interface{}
// alive is the up/down flag, accessed only through sync/atomic and compared against the
// package-level yes/no values.
alive int32
// notif is the notifier of the Get request currently waiting for data, nil when none is waiting.
notif *getnotifier
// tor is the subscription timeout created in NewChannel with Drop as its callback; Get pings it
// and Drop drops it.
tor *Timeout
}
// getnotifier is the wake-up handle of a single waiting Get request.
type getnotifier struct {
// ping is signalled to make the waiting Get stop waiting (Get creates it with a buffer of one).
ping chan bool
// pinged records that ping has already been signalled; senders check it so that they signal
// at most once.
pinged bool
}
// NewChannel constructs a new long-polling pubsub channel with the given timeout, optional exit
// handler, and subscription to the given topics. At least one topic is required, otherwise an
// error is returned. Every new channel is assigned a channel/subscription id generated with
// shortid.Generate.
//
// The subscription timeout is created here via NewTimeout with the channel's Drop as its
// callback, so the first Get request is expected to follow within the timeout window. Errors
// from id generation or from NewTimeout are returned unchanged.
func NewChannel(timeout time.Duration, onClose func(id string), topics ...string) (*Channel, error) {
	if len(topics) == 0 {
		return nil, errors.New("at least one topic expected")
	}

	id, err := shortid.Generate()
	if err != nil {
		return nil, err
	}

	// build the topic set up front; it is never modified after construction
	subscribed := make(map[string]bool)
	for _, name := range topics {
		subscribed[name] = true
	}

	ch := &Channel{
		id:      id,
		onClose: onClose,
		topics:  subscribed,
		alive:   yes,
	}

	tor, err := NewTimeout(timeout, ch.Drop)
	if err != nil {
		return nil, err
	}
	ch.tor = tor

	return ch, nil
}
// MustNewChannel behaves like NewChannel but panics with the construction error instead of
// returning it.
func MustNewChannel(timeout time.Duration, onClose func(id string), topics ...string) *Channel {
	ch, err := NewChannel(timeout, onClose, topics...)
	if err != nil {
		panic(err)
	}
	return ch
}
// Publish queues data on the channel without blocking the caller, provided the topic is one of
// those given at construction. Data published to any other topic is silently ignored (nil is
// returned). The topic itself is not stored with the data.
//
// An error is returned only when the channel is already down at the time of the call.
func (ch *Channel) Publish(data interface{}, topic string) error {
	if !ch.IsAlive() {
		return errors.New("subscription channel is down")
	}

	// the topic set is read-only after construction, hence no locking
	_, subscribed := ch.topics[topic]
	if !subscribed {
		return nil
	}

	go func() {
		ch.mx.Lock()
		defer ch.mx.Unlock()

		// the channel may have been dropped between the check above and acquiring the lock
		if !ch.IsAlive() {
			return
		}

		ch.data = append(ch.data, data)

		// wake the waiting Get, but only once per notifier
		if waiter := ch.notif; waiter != nil && !waiter.pinged {
			waiter.pinged = true
			waiter.ping <- true
		}
	}()

	// Publish is likely to be called from a goroutine; under non-stop publishing, yielding here
	// gives Get requests a chance to receive data
	runtime.Gosched()
	return nil
}
// Get requests data published on all of the channel topics. It returns a channel on which
// exactly one data set (possibly nil) is delivered.
//
// The request is held until data becomes available (published to a matching topic). Upon new
// data, or if data has been waiting at the time of the call, the request is answered
// immediately. Otherwise it waits for the `polltime` duration and answers with nil if no new data
// arrives. It is expected that a new Get request is made immediately afterwards to receive
// further data and prevent channel timeout.
//
// Multiple Get requests to the channel can be made concurrently, however, every data sample
// will be delivered to only one request issuer. It is not guaranteed to which one, although
// every new incoming request triggers a return of any earlier one.
//
// An error is returned if the channel is down or if polltime is not positive.
func (ch *Channel) Get(polltime time.Duration) (chan []interface{}, error) {
	if !ch.IsAlive() {
		return nil, errors.New("subscription channel is down")
	}
	if polltime <= 0 {
		return nil, errors.New("positive polltime value expected")
	}

	// buffered with one slot: the single answer never blocks the worker goroutine, even if the
	// caller does not read it
	resp := make(chan []interface{}, 1)
	go func() {
		// every Get pings the subscription timeout
		ch.tor.Ping()

		ch.mx.Lock()
		// ch could have died between the check above and entering the lock
		if !ch.IsAlive() {
			// next request will result in an error
			resp <- nil
			ch.mx.Unlock()
			return
		}

		// notify existing Get to terminate immediately (it will wait for the lock)
		if ch.notif != nil && !ch.notif.pinged {
			ch.notif.pinged = true
			ch.notif.ping <- true
		}

		// ch.notif is reset either here, ...
		if ch.onDataWaiting(resp) {
			ch.mx.Unlock()
			return
		}

		// ...or here. Set this one to be notified by Publish
		notif := &getnotifier{ping: make(chan bool, 1), pinged: false}
		ch.notif = notif
		ch.mx.Unlock()

		// a single timer bounds the wait; stopping it on exit releases it as soon as the request
		// is answered, with no helper goroutine and no periodic wake-ups
		timer := time.NewTimer(polltime)
		defer timer.Stop()

		select {
		case <-notif.ping:
			ch.onNewDataLocking(resp, notif)
		case <-timer.C:
			ch.onLongpollTimeoutLocking(resp, notif)
		}
	}()
	return resp, nil
}
// startLongpollTimer signals on pollend once polltime has elapsed, unless *gotdata is set to yes
// beforehand, in which case it returns without signalling. The flag is polled atomically in
// steps of one hundredth of polltime so that the routine can quit soon after it becomes
// irrelevant.
func (ch *Channel) startLongpollTimer(polltime time.Duration, pollend chan bool, gotdata *int32) {
	step := polltime / 100
	for deadline := time.Now().Add(polltime); time.Now().Before(deadline); time.Sleep(step) {
		// the request has already been answered: nothing to time out
		if atomic.LoadInt32(gotdata) == yes {
			return
		}
	}
	pollend <- true
}
// onDataWaiting answers resp with the queued data if there is any and reports whether it did.
// It does not lock; Get calls it while holding ch.mx.
func (ch *Channel) onDataWaiting(resp chan []interface{}) bool {
	if len(ch.data) == 0 {
		return false
	}

	// hand over everything queued so far
	resp <- ch.data

	// the data has been sent, so the queue is emptied; this request returns immediately, so no
	// notifier is left for Publish (an earlier Get gets nothing)
	ch.data, ch.notif = nil, nil
	return true
}
// onNewDataLocking answers resp with whatever data is queued at this point (taking ch.mx itself)
// and empties the queue. If notif is still the registered notifier it is unregistered, as its
// Get request is now processed.
func (ch *Channel) onNewDataLocking(resp chan []interface{}, notif *getnotifier) {
	ch.mx.Lock()
	defer ch.mx.Unlock()

	// take the queued data out of the channel and hand it over
	batch := ch.data
	ch.data = nil
	resp <- batch

	// a newer Get may have registered its own notifier meanwhile; leave that one alone
	if ch.notif != notif {
		return
	}
	ch.notif = nil
}
// onLongpollTimeoutLocking answers resp with no data (taking ch.mx itself). If notif is still the
// registered notifier it is unregistered, as its Get request is now processed.
func (ch *Channel) onLongpollTimeoutLocking(resp chan []interface{}, notif *getnotifier) {
	ch.mx.Lock()
	defer ch.mx.Unlock()

	// the poll time ran out: answer empty
	resp <- nil

	// a newer Get may have registered its own notifier meanwhile; leave that one alone
	if ch.notif != notif {
		return
	}
	ch.notif = nil
}
// IsAlive reports whether the channel is up, i.e. its atomically read state flag equals yes.
func (ch *Channel) IsAlive() bool {
	state := atomic.LoadInt32(&ch.alive)
	return state == yes
}
// Drop terminates any publishing and receiving on the channel, signals the currently waiting Get
// request to return empty, terminates the timeout timer and runs the exit handler if supplied.
//
// Drop is safe to call repeatedly and concurrently: only the call that flips the channel from
// alive to down performs the teardown, all others return immediately. The teardown itself runs
// asynchronously in its own goroutine.
func (ch *Channel) Drop() {
	// a single atomic compare-and-swap instead of a separate check and store, so that two
	// concurrent calls cannot both pass the check and run the teardown (and onClose) twice
	if !atomic.CompareAndSwapInt32(&ch.alive, yes, no) {
		return
	}
	go func() {
		// prevent any external changes to data, new subscriptions
		ch.mx.Lock()
		defer ch.mx.Unlock()
		// signal timeout handler to quit
		ch.tor.Drop()
		// clear data: no subscription gets anything
		ch.data = nil
		// let current get know that it should quit (with no data, see above)
		if ch.notif != nil && !ch.notif.pinged {
			ch.notif.ping <- true
		}
		// tell publish that there is no get listening, let it quit
		ch.notif = nil
		// execute callback (e.g. removing from pubsub subscriptions map); it runs while ch.mx is
		// held, so it must not call channel methods that take the lock (e.g. QueueSize)
		if ch.onClose != nil {
			ch.onClose(ch.id)
		}
	}()
}
// ID returns the channel/subscription id assigned at construction.
func (ch *Channel) ID() string {
	id := ch.id
	return id
}
// Topics returns the names of the topics the channel is subscribed to, in no particular order
// (they are collected by ranging over a map).
func (ch *Channel) Topics() []string {
	var names []string
	// the topic set is read-only after construction, hence no locking
	for name := range ch.topics {
		names = append(names, name)
	}
	return names
}
// QueueSize returns the number of data samples currently queued on the channel, read under the
// channel lock (the queue is only non-empty while no Get request is waiting).
func (ch *Channel) QueueSize() int {
	ch.mx.Lock()
	defer ch.mx.Unlock()
	return len(ch.data)
}
// IsGetWaiting reports whether a Get request is currently registered as waiting for data.
func (ch *Channel) IsGetWaiting() bool {
	// deliberately read without taking the channel lock
	waiter := ch.notif
	return waiter != nil
}