Skip to content

Commit

Permalink
Websocket: Fix and simplify traffic monitor
Browse files Browse the repository at this point in the history
trafficMonitor had a check throttle at the end of the for loop to stop it just gobbling the (blocking) trafficAlert channel non-stop.
That makes sense, except that nothing is sent to the trafficAlert channel if there's no listener.
So that means that it's out by one second on the trafficAlert, because any traffic received during the pause doesn't result in a traffic alert being sent.

The unstopped timer is deliberately leaked, to be garbage collected later, when the monitor shuts down.
It won't delay/block anything, and it's a trivial memory leak during an infrequent event.

Deliberately choosing to recreate the timer each time instead of using Stop, drain and Reset.
  • Loading branch information
gbjk committed Feb 16, 2024
1 parent 3e19e83 commit 36226ee
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 67 deletions.
78 changes: 30 additions & 48 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ import (
)

const (
defaultJobBuffer = 5000
// defaultTrafficPeriod defines a period of pause for the traffic monitor,
// as there are periods with large incoming traffic alerts which requires a
// timer reset, this limits work on this routine to a more effective rate
// of check.
defaultTrafficPeriod = time.Second
jobBuffer = 5000
)

// Public websocket errors
Expand All @@ -37,6 +32,7 @@ var (
ErrNotConnected = errors.New("websocket is not connected")
)

// Private websocket errors
var (
errAlreadyRunning = errors.New("connection monitor is already running")
errExchangeConfigIsNil = errors.New("exchange config is nil")
Expand Down Expand Up @@ -72,7 +68,10 @@ var (
errConnSetup = errors.New("error in connection setup")
)

var globalReporter Reporter
var (
globalReporter Reporter
trafficCheckInterval = time.Second
)

// SetupGlobalReporter sets a reporter interface to be used
// for all exchange requests
Expand All @@ -83,9 +82,9 @@ func SetupGlobalReporter(r Reporter) {
// NewWebsocket initialises the websocket struct
func NewWebsocket() *Websocket {
return &Websocket{
DataHandler: make(chan interface{}, defaultJobBuffer),
ToRoutine: make(chan interface{}, defaultJobBuffer),
TrafficAlert: make(chan struct{}),
DataHandler: make(chan interface{}, jobBuffer),
ToRoutine: make(chan interface{}, jobBuffer),
TrafficAlert: make(chan struct{}, 1),
ReadMessageErrors: make(chan error),
Subscribe: make(chan []subscription.Subscription),
Unsubscribe: make(chan []subscription.Subscription),
Expand Down Expand Up @@ -545,9 +544,9 @@ func (w *Websocket) FlushChannels() error {
return w.Connect()
}

// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires,
// it will reconnect if the TrafficAlert channel has not received any data. The
// trafficTimer will reset on each traffic alert
// trafficMonitor waits trafficCheckInterval before checking for a signal on TrafficAlert.
// The 1-slot buffer means that the connection will only write to TrafficAlert once per trafficCheckInterval, which avoids a read/write flood under high traffic.
// If no traffic is seen for trafficTimeout, the connection is shut down, unless it is currently connecting. connectionMonitor is responsible for connecting again.
func (w *Websocket) trafficMonitor() {
if w.IsTrafficMonitorRunning() {
return
Expand All @@ -556,62 +555,45 @@ func (w *Websocket) trafficMonitor() {
w.Wg.Add(1)

go func() {
var trafficTimer = time.NewTimer(w.trafficTimeout)
pause := make(chan struct{})
t := time.NewTimer(w.trafficTimeout)
for {
select {
case <-w.ShutdownC:
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName)
}
trafficTimer.Stop()
t.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-w.TrafficAlert:
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
w.setState(connected)
if !t.Stop() {
<-t.C
}
t.Reset(w.trafficTimeout)
default:
}
case <-t.C:
if w.IsConnecting() {
t.Reset(w.trafficTimeout)
break
}
w.setState(connected)
trafficTimer.Reset(w.trafficTimeout)
case <-trafficTimer.C: // Falls through when timer runs out
if w.verbose {
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done() // without this the w.Shutdown() call below will deadlock
if !w.IsConnecting() && w.IsConnected() {
w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
}

return
}

if w.IsConnected() {
// Routine pausing mechanism
go func(p chan<- struct{}) {
time.Sleep(defaultTrafficPeriod)
select {
case p <- struct{}{}:
default:
}
}(pause)
select {
case <-w.ShutdownC:
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-pause:
}
}
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/stream/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response {

select {
case w.Traffic <- struct{}{}:
default: // causes contention, just bypass if there is no receiver.
default: // Non-blocking write ensures 1 buffered signal per trafficCheckInterval to avoid flooding
}

var standardMessage []byte
Expand Down
102 changes: 84 additions & 18 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,95 @@ func TestTrafficMonitorTimeout(t *testing.T) {
err := ws.Setup(defaultSetup)
require.NoError(t, err, "Setup must not error")

ws.trafficTimeout = time.Millisecond * 42
signal := struct{}{}
patience := 10 * time.Millisecond
trafficCheckInterval = 50 * time.Millisecond
ws.trafficTimeout = 200 * time.Millisecond
ws.ShutdownC = make(chan struct{})

thenish := time.Now()
ws.trafficMonitor()

assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")
require.Equal(t, disconnected, ws.state.Load(), "websocket must be disconnected")

// Deploy traffic alert
ws.TrafficAlert <- struct{}{}
// try to add another traffic monitor
// Behaviour: Test multiple traffic alerts work and only process one trafficAlert per interval
for i := 0; i < 2; i++ {
ws.state.Store(disconnected)

select {
case ws.TrafficAlert <- signal:
if i == 0 {
require.WithinDurationf(t, time.Now(), thenish, trafficCheckInterval, "First Non-blocking test must happen before the traffic is checked")
}
default:
require.Failf(t, "", "TrafficAlert should not block; Check #%d", i)
}

select {
case ws.TrafficAlert <- signal:
require.Failf(t, "", "TrafficAlert should block after first slot used; Check #%d", i)
default:
if i == 0 {
require.WithinDuration(t, time.Now(), thenish, trafficCheckInterval, "First Blocking test must happen before the traffic is checked")
}
}

require.EventuallyWithTf(t, func(c *assert.CollectT) {
assert.Truef(c, ws.IsConnected(), "state should been marked as connected; Check #%d", i)
assert.Emptyf(c, ws.TrafficAlert, "trafficAlert channel should be drained; Check #%d", i)
}, 2*trafficCheckInterval, patience, "trafficAlert should be read and state connected; Check #%d", i)
}

thenish = time.Now()

Check failure on line 224 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to thenish (ineffassign)

// Behaviour: Shuts down websocket and exits on timeout
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected")
assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down")
}, 2*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown")

// Behaviour: connecting status doesn't trigger shutdown
ws.state.Store(connecting)
ws.trafficTimeout = 50 * time.Millisecond
ws.trafficMonitor()
assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")
require.Equal(t, connecting, ws.state.Load(), "websocket must be connecting")
<-time.After(4 * ws.trafficTimeout)
require.Equal(t, connecting, ws.state.Load(), "websocket must still be connecting after several checks")
ws.state.Store(connected)
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected")
assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down")
}, 4*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown after connecting status changes")

// Behaviour: shutdown is processed and waitgroup is cleared
trafficCheckInterval = 10 * time.Millisecond
ws.state.Store(connected)
ws.trafficTimeout = time.Minute
ws.trafficMonitor()
assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")

// prevent shutdown routine
ws.setState(disconnected)
// await timeout closure
ws.Wg.Wait()
assert.False(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be not be running")
wgReady := make(chan bool)
go func() {
ws.Wg.Wait()
close(wgReady)
}()
select {
case <-wgReady:
require.Failf(t, "", "WaitGroup should be blocking still")
case <-time.After(1 * trafficCheckInterval):
}

close(ws.ShutdownC)

<-time.After(2 * trafficCheckInterval)
assert.False(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be shutdown")
select {
case <-wgReady:
default:
require.Failf(t, "", "WaitGroup should be freed now")
}
}

func TestIsDisconnectionError(t *testing.T) {
Expand Down Expand Up @@ -1079,18 +1152,11 @@ func TestSetupNewConnection(t *testing.T) {
err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"})
assert.ErrorIs(t, err, errTrafficAlertNil, "SetupNewConnection should error correctly")

nonsenseWebsock.TrafficAlert = make(chan struct{})
nonsenseWebsock.TrafficAlert = make(chan struct{}, 1)
err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"})
assert.ErrorIs(t, err, errReadMessageErrorsNil, "SetupNewConnection should error correctly")

web := Websocket{
connector: connect,
Wg: new(sync.WaitGroup),
ShutdownC: make(chan struct{}),
TrafficAlert: make(chan struct{}),
ReadMessageErrors: make(chan error),
DataHandler: make(chan interface{}),
}
web := NewWebsocket()

err = web.Setup(defaultSetup)
assert.NoError(t, err, "Setup should not error")
Expand Down

0 comments on commit 36226ee

Please sign in to comment.