Skip to content

Commit

Permalink
Websocket: Fix and simplify traffic monitor
Browse files Browse the repository at this point in the history
trafficMonitor had a check throttle at the end of the for loop to stop it just gobbling the (blocking) trafficAlert channel non-stop.
That makes sense, except that nothing is sent to the trafficAlert channel if there's no listener.
So that means that it's out by one second on the trafficAlert, because any traffic received during the pause is doesn't try to send a traffic alert.

The unstopped timer is deliberately leaked for later GC when shutdown.
It won't delay/block anything, and it's a trivial memory leak during an infrequent event.

Deliberately Choosing to recreate the timer each time instead of using Stop, drain and reset
  • Loading branch information
gbjk committed Feb 16, 2024
1 parent 3e19e83 commit 350ef5b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 65 deletions.
83 changes: 34 additions & 49 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ import (
)

const (
defaultJobBuffer = 5000
// defaultTrafficPeriod defines a period of pause for the traffic monitor,
// as there are periods with large incoming traffic alerts which requires a
// timer reset, this limits work on this routine to a more effective rate
// of check.
defaultTrafficPeriod = time.Second
jobBuffer = 5000
)

// Public websocket errors
Expand All @@ -37,6 +32,7 @@ var (
ErrNotConnected = errors.New("websocket is not connected")
)

// Private websocket errors
var (
errAlreadyRunning = errors.New("connection monitor is already running")
errExchangeConfigIsNil = errors.New("exchange config is nil")
Expand Down Expand Up @@ -72,7 +68,10 @@ var (
errConnSetup = errors.New("error in connection setup")
)

var globalReporter Reporter
var (
globalReporter Reporter
trafficCheckInterval = time.Second
)

// SetupGlobalReporter sets a reporter interface to be used
// for all exchange requests
Expand All @@ -83,9 +82,9 @@ func SetupGlobalReporter(r Reporter) {
// NewWebsocket initialises the websocket struct
func NewWebsocket() *Websocket {
return &Websocket{
DataHandler: make(chan interface{}, defaultJobBuffer),
ToRoutine: make(chan interface{}, defaultJobBuffer),
TrafficAlert: make(chan struct{}),
DataHandler: make(chan interface{}, jobBuffer),
ToRoutine: make(chan interface{}, jobBuffer),
TrafficAlert: make(chan struct{}, 1),
ReadMessageErrors: make(chan error),
Subscribe: make(chan []subscription.Subscription),
Unsubscribe: make(chan []subscription.Subscription),
Expand Down Expand Up @@ -545,9 +544,9 @@ func (w *Websocket) FlushChannels() error {
return w.Connect()
}

// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires,
// it will reconnect if the TrafficAlert channel has not received any data. The
// trafficTimer will reset on each traffic alert
// trafficMonitor waits trafficCheckInterval before checking for a trafficAlert
// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic
// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. connectionMonitor is responsible for Connecting again
func (w *Websocket) trafficMonitor() {
if w.IsTrafficMonitorRunning() {
return
Expand All @@ -556,62 +555,48 @@ func (w *Websocket) trafficMonitor() {
w.Wg.Add(1)

go func() {
var trafficTimer = time.NewTimer(w.trafficTimeout)
pause := make(chan struct{})
t := time.NewTimer(w.trafficTimeout)
reset := func() {
if !t.Stop() {
<-t.C
}
t.Reset(w.trafficTimeout)
}
for {
select {
case <-w.ShutdownC:
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName)
}
trafficTimer.Stop()
t.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-w.TrafficAlert:
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
}
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
w.setState(connected)
reset()
default:
}
case <-t.C:
if w.IsConnecting() {
reset()
break
}
w.setState(connected)
trafficTimer.Reset(w.trafficTimeout)
case <-trafficTimer.C: // Falls through when timer runs out
if w.verbose {
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done() // without this the w.Shutdown() call below will deadlock
if !w.IsConnecting() && w.IsConnected() {
w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
}

return
}

if w.IsConnected() {
// Routine pausing mechanism
go func(p chan<- struct{}) {
time.Sleep(defaultTrafficPeriod)
select {
case p <- struct{}{}:
default:
}
}(pause)
select {
case <-w.ShutdownC:
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-pause:
}
}
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/stream/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response {

select {
case w.Traffic <- struct{}{}:
default: // causes contention, just bypass if there is no receiver.
default: // Non-Blocking write ensuses 1 buffered signal per trafficCheckInterval to avoid flooding
}

var standardMessage []byte
Expand Down
66 changes: 51 additions & 15 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,65 @@ func TestTrafficMonitorTimeout(t *testing.T) {
err := ws.Setup(defaultSetup)
require.NoError(t, err, "Setup must not error")

ws.trafficTimeout = time.Millisecond * 42
signal := struct{}{}
patience := 10 * time.Millisecond
trafficCheckInterval = 50 * time.Millisecond
ws.trafficTimeout = 200 * time.Millisecond
ws.ShutdownC = make(chan struct{})
ws.trafficMonitor()
assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")

// Deploy traffic alert
ws.TrafficAlert <- struct{}{}
// try to add another traffic monitor
thenish := time.Now()
ws.trafficMonitor()

assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")
require.Equal(t, disconnected, ws.state.Load(), "websocket must be disconnected")

for i := 0; i < 2; i++ {
ws.state.Store(disconnected)

select {
case ws.TrafficAlert <- signal:
if i == 0 {
require.WithinDurationf(t, time.Now(), thenish, trafficCheckInterval, "First Non-blocking test must happen before the traffic is checked")
}
default:
require.Failf(t, "", "TrafficAlert should not block; Check #%d", i)
}

select {
case ws.TrafficAlert <- signal:
require.Failf(t, "", "TrafficAlert should block after first slot used; Check #%d", i)
default:
if i == 0 {
require.WithinDuration(t, time.Now(), thenish, trafficCheckInterval, "First Blocking test must happen before the traffic is checked")
}
}

require.EventuallyWithTf(t, func(c *assert.CollectT) {
assert.Truef(c, ws.IsConnected(), "state should been marked as connected; Check #%d", i)
assert.Emptyf(c, ws.TrafficAlert, "trafficAlert channel should be drained; Check #%d", i)
}, trafficCheckInterval*2, patience, "trafficAlert should be read and state connected; Check #%d", i)
}

thenish = time.Now()

Check failure on line 223 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to thenish (ineffassign)

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected")
assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down")
}, ws.trafficTimeout*2, patience, "trafficTimeout should trigger a shutdown")

return

Check warning on line 230 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / lint

unreachable-code: unreachable code after this statement (revive)
// prevent shutdown routine
ws.setState(disconnected)

Check failure on line 232 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / lint

unreachable: unreachable code (govet)
// await timeout closure
ws.Wg.Wait()
assert.False(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be not be running")

// Multiple traffic alerts work
// Only processes a TrafficAlert once per interval
// Does not block
// Reconnects on timeout
// Exits on shutdown
// Connecting state doesn't hamper
}

func TestIsDisconnectionError(t *testing.T) {
Expand Down Expand Up @@ -1079,18 +1122,11 @@ func TestSetupNewConnection(t *testing.T) {
err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"})
assert.ErrorIs(t, err, errTrafficAlertNil, "SetupNewConnection should error correctly")

nonsenseWebsock.TrafficAlert = make(chan struct{})
nonsenseWebsock.TrafficAlert = make(chan struct{}, 1)
err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"})
assert.ErrorIs(t, err, errReadMessageErrorsNil, "SetupNewConnection should error correctly")

web := Websocket{
connector: connect,
Wg: new(sync.WaitGroup),
ShutdownC: make(chan struct{}),
TrafficAlert: make(chan struct{}),
ReadMessageErrors: make(chan error),
DataHandler: make(chan interface{}),
}
web := NewWebsocket()

err = web.Setup(defaultSetup)
assert.NoError(t, err, "Setup should not error")
Expand Down

0 comments on commit 350ef5b

Please sign in to comment.