diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 409e0953251..dca75335629 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -16,12 +16,7 @@ import ( ) const ( - defaultJobBuffer = 5000 - // defaultTrafficPeriod defines a period of pause for the traffic monitor, - // as there are periods with large incoming traffic alerts which requires a - // timer reset, this limits work on this routine to a more effective rate - // of check. - defaultTrafficPeriod = time.Second + jobBuffer = 5000 ) // Public websocket errors @@ -37,6 +32,7 @@ var ( ErrNotConnected = errors.New("websocket is not connected") ) +// Private websocket errors var ( errAlreadyRunning = errors.New("connection monitor is already running") errExchangeConfigIsNil = errors.New("exchange config is nil") @@ -72,7 +68,10 @@ var ( errConnSetup = errors.New("error in connection setup") ) -var globalReporter Reporter +var ( + globalReporter Reporter + trafficCheckInterval = time.Second +) // SetupGlobalReporter sets a reporter interface to be used // for all exchange requests @@ -83,9 +82,9 @@ func SetupGlobalReporter(r Reporter) { // NewWebsocket initialises the websocket struct func NewWebsocket() *Websocket { return &Websocket{ - DataHandler: make(chan interface{}, defaultJobBuffer), - ToRoutine: make(chan interface{}, defaultJobBuffer), - TrafficAlert: make(chan struct{}), + DataHandler: make(chan interface{}, jobBuffer), + ToRoutine: make(chan interface{}, jobBuffer), + TrafficAlert: make(chan struct{}, 1), ReadMessageErrors: make(chan error), Subscribe: make(chan []subscription.Subscription), Unsubscribe: make(chan []subscription.Subscription), @@ -545,9 +544,9 @@ func (w *Websocket) FlushChannels() error { return w.Connect() } -// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires, -// it will reconnect if the TrafficAlert channel has not received any data. 
The -// trafficTimer will reset on each traffic alert +// trafficMonitor waits trafficCheckInterval before checking for a trafficAlert +// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic +// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. connectionMonitor is responsible for Connecting again func (w *Websocket) trafficMonitor() { if w.IsTrafficMonitorRunning() { return @@ -556,62 +555,48 @@ func (w *Websocket) trafficMonitor() { w.Wg.Add(1) go func() { - var trafficTimer = time.NewTimer(w.trafficTimeout) - pause := make(chan struct{}) + t := time.NewTimer(w.trafficTimeout) + reset := func() { + if !t.Stop() { + <-t.C + } + t.Reset(w.trafficTimeout) + } for { select { case <-w.ShutdownC: if w.verbose { log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName) } - trafficTimer.Stop() + t.Stop() w.setTrafficMonitorRunning(false) w.Wg.Done() return - case <-w.TrafficAlert: - if !trafficTimer.Stop() { - select { - case <-trafficTimer.C: - default: - } + case <-time.After(trafficCheckInterval): + select { + case <-w.TrafficAlert: + w.setState(connected) + reset() + default: + } + case <-t.C: + if w.IsConnecting() { + reset() + break } - w.setState(connected) - trafficTimer.Reset(w.trafficTimeout) - case <-trafficTimer.C: // Falls through when timer runs out if w.verbose { log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. 
Reconnecting", w.exchangeName, w.trafficTimeout) } - trafficTimer.Stop() - w.setTrafficMonitorRunning(false) - w.Wg.Done() // without this the w.Shutdown() call below will deadlock - if !w.IsConnecting() && w.IsConnected() { + w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call + w.Wg.Done() // Without this the w.Shutdown() call below will deadlock + if w.IsConnected() { err := w.Shutdown() if err != nil { log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err) } } - return } - - if w.IsConnected() { - // Routine pausing mechanism - go func(p chan<- struct{}) { - time.Sleep(defaultTrafficPeriod) - select { - case p <- struct{}{}: - default: - } - }(pause) - select { - case <-w.ShutdownC: - trafficTimer.Stop() - w.setTrafficMonitorRunning(false) - w.Wg.Done() - return - case <-pause: - } - } } }() } diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 910142caa9a..b5c7e83d9e7 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -227,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response { select { case w.Traffic <- struct{}{}: - default: // causes contention, just bypass if there is no receiver. 
+ default: // Non-Blocking write ensures 1 buffered signal per trafficCheckInterval to avoid flooding } var standardMessage []byte diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index 4725cc9b064..b16a6eec3b9 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -181,22 +181,65 @@ func TestTrafficMonitorTimeout(t *testing.T) { err := ws.Setup(defaultSetup) require.NoError(t, err, "Setup must not error") - ws.trafficTimeout = time.Millisecond * 42 + signal := struct{}{} + patience := 10 * time.Millisecond + trafficCheckInterval = 50 * time.Millisecond + ws.trafficTimeout = 200 * time.Millisecond ws.ShutdownC = make(chan struct{}) - ws.trafficMonitor() - assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running") - // Deploy traffic alert - ws.TrafficAlert <- struct{}{} - // try to add another traffic monitor + thenish := time.Now() ws.trafficMonitor() + assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running") + require.Equal(t, disconnected, ws.state.Load(), "websocket must be disconnected") + + for i := 0; i < 2; i++ { + ws.state.Store(disconnected) + select { + case ws.TrafficAlert <- signal: + if i == 0 { + require.WithinDurationf(t, time.Now(), thenish, trafficCheckInterval, "First Non-blocking test must happen before the traffic is checked") + } + default: + require.Failf(t, "", "TrafficAlert should not block; Check #%d", i) + } + + select { + case ws.TrafficAlert <- signal: + require.Failf(t, "", "TrafficAlert should block after first slot used; Check #%d", i) + default: + if i == 0 { + require.WithinDuration(t, time.Now(), thenish, trafficCheckInterval, "First Blocking test must happen before the traffic is checked") + } + } + + require.EventuallyWithTf(t, func(c *assert.CollectT) { + assert.Truef(c, ws.IsConnected(), "state should been marked as connected; Check #%d", i) + assert.Emptyf(c, ws.TrafficAlert, "trafficAlert channel should be 
drained; Check #%d", i) + }, trafficCheckInterval*2, patience, "trafficAlert should be read and state connected; Check #%d", i) + } + + thenish = time.Now() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected") + assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down") + }, ws.trafficTimeout*2, patience, "trafficTimeout should trigger a shutdown") + + return // prevent shutdown routine ws.setState(disconnected) // await timeout closure ws.Wg.Wait() assert.False(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be not be running") + + // Multiple traffic alerts work + // Only processes a TrafficAlert once per interval + // Does not block + // Reconnects on timeout + // Exits on shutdown + // Connecting state doesn't hamper } func TestIsDisconnectionError(t *testing.T) { @@ -1079,18 +1122,11 @@ func TestSetupNewConnection(t *testing.T) { err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"}) assert.ErrorIs(t, err, errTrafficAlertNil, "SetupNewConnection should error correctly") - nonsenseWebsock.TrafficAlert = make(chan struct{}) + nonsenseWebsock.TrafficAlert = make(chan struct{}, 1) err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"}) assert.ErrorIs(t, err, errReadMessageErrorsNil, "SetupNewConnection should error correctly") - web := Websocket{ - connector: connect, - Wg: new(sync.WaitGroup), - ShutdownC: make(chan struct{}), - TrafficAlert: make(chan struct{}), - ReadMessageErrors: make(chan error), - DataHandler: make(chan interface{}), - } + web := NewWebsocket() err = web.Setup(defaultSetup) assert.NoError(t, err, "Setup should not error")