From 36226ee5f6834b247cc26beb29eeefbdbb053003 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Thu, 15 Feb 2024 20:14:47 +0700 Subject: [PATCH] Websocket: Fix and simplify traffic monitor trafficMonitor had a check throttle at the end of the for loop to stop it just gobbling the (blocking) trafficAlert channel non-stop. That makes sense, except that nothing is sent to the trafficAlert channel if there's no listener. So that means that it's out by one second on the trafficAlert, because any traffic received during the pause doesn't try to send a traffic alert. The unstopped timer is deliberately leaked for later GC on shutdown. It won't delay/block anything, and it's a trivial memory leak during an infrequent event. Deliberately choosing to recreate the timer each time instead of using Stop, drain and reset. --- exchanges/stream/websocket.go | 78 +++++++---------- exchanges/stream/websocket_connection.go | 2 +- exchanges/stream/websocket_test.go | 102 +++++++++++++++++++---- 3 files changed, 115 insertions(+), 67 deletions(-) diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 409e0953251..bebc3abb595 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -16,12 +16,7 @@ import ( ) const ( - defaultJobBuffer = 5000 - // defaultTrafficPeriod defines a period of pause for the traffic monitor, - // as there are periods with large incoming traffic alerts which requires a - // timer reset, this limits work on this routine to a more effective rate - // of check. 
- defaultTrafficPeriod = time.Second + jobBuffer = 5000 ) // Public websocket errors @@ -37,6 +32,7 @@ var ( ErrNotConnected = errors.New("websocket is not connected") ) +// Private websocket errors var ( errAlreadyRunning = errors.New("connection monitor is already running") errExchangeConfigIsNil = errors.New("exchange config is nil") @@ -72,7 +68,10 @@ var ( errConnSetup = errors.New("error in connection setup") ) -var globalReporter Reporter +var ( + globalReporter Reporter + trafficCheckInterval = time.Second +) // SetupGlobalReporter sets a reporter interface to be used // for all exchange requests @@ -83,9 +82,9 @@ func SetupGlobalReporter(r Reporter) { // NewWebsocket initialises the websocket struct func NewWebsocket() *Websocket { return &Websocket{ - DataHandler: make(chan interface{}, defaultJobBuffer), - ToRoutine: make(chan interface{}, defaultJobBuffer), - TrafficAlert: make(chan struct{}), + DataHandler: make(chan interface{}, jobBuffer), + ToRoutine: make(chan interface{}, jobBuffer), + TrafficAlert: make(chan struct{}, 1), ReadMessageErrors: make(chan error), Subscribe: make(chan []subscription.Subscription), Unsubscribe: make(chan []subscription.Subscription), @@ -545,9 +544,9 @@ func (w *Websocket) FlushChannels() error { return w.Connect() } -// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires, -// it will reconnect if the TrafficAlert channel has not received any data. The -// trafficTimer will reset on each traffic alert +// trafficMonitor waits trafficCheckInterval before checking for a trafficAlert +// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic +// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. 
connectionMonitor is responsible for Connecting again func (w *Websocket) trafficMonitor() { if w.IsTrafficMonitorRunning() { return @@ -556,62 +555,45 @@ func (w *Websocket) trafficMonitor() { w.Wg.Add(1) go func() { - var trafficTimer = time.NewTimer(w.trafficTimeout) - pause := make(chan struct{}) + t := time.NewTimer(w.trafficTimeout) for { select { case <-w.ShutdownC: if w.verbose { log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName) } - trafficTimer.Stop() + t.Stop() w.setTrafficMonitorRunning(false) w.Wg.Done() return - case <-w.TrafficAlert: - if !trafficTimer.Stop() { - select { - case <-trafficTimer.C: - default: + case <-time.After(trafficCheckInterval): + select { + case <-w.TrafficAlert: + w.setState(connected) + if !t.Stop() { + <-t.C } + t.Reset(w.trafficTimeout) + default: + } + case <-t.C: + if w.IsConnecting() { + t.Reset(w.trafficTimeout) + break } - w.setState(connected) - trafficTimer.Reset(w.trafficTimeout) - case <-trafficTimer.C: // Falls through when timer runs out if w.verbose { log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. 
Reconnecting", w.exchangeName, w.trafficTimeout) } - trafficTimer.Stop() - w.setTrafficMonitorRunning(false) - w.Wg.Done() // without this the w.Shutdown() call below will deadlock - if !w.IsConnecting() && w.IsConnected() { + w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call + w.Wg.Done() // Without this the w.Shutdown() call below will deadlock + if w.IsConnected() { err := w.Shutdown() if err != nil { log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err) } } - return } - - if w.IsConnected() { - // Routine pausing mechanism - go func(p chan<- struct{}) { - time.Sleep(defaultTrafficPeriod) - select { - case p <- struct{}{}: - default: - } - }(pause) - select { - case <-w.ShutdownC: - trafficTimer.Stop() - w.setTrafficMonitorRunning(false) - w.Wg.Done() - return - case <-pause: - } - } } }() } diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 910142caa9a..b5c7e83d9e7 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -227,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response { select { case w.Traffic <- struct{}{}: - default: // causes contention, just bypass if there is no receiver. 
+ default: // Non-Blocking write ensuses 1 buffered signal per trafficCheckInterval to avoid flooding } var standardMessage []byte diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index 4725cc9b064..a04ba82b8a0 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -181,22 +181,95 @@ func TestTrafficMonitorTimeout(t *testing.T) { err := ws.Setup(defaultSetup) require.NoError(t, err, "Setup must not error") - ws.trafficTimeout = time.Millisecond * 42 + signal := struct{}{} + patience := 10 * time.Millisecond + trafficCheckInterval = 50 * time.Millisecond + ws.trafficTimeout = 200 * time.Millisecond ws.ShutdownC = make(chan struct{}) + + thenish := time.Now() ws.trafficMonitor() + assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running") + require.Equal(t, disconnected, ws.state.Load(), "websocket must be disconnected") - // Deploy traffic alert - ws.TrafficAlert <- struct{}{} - // try to add another traffic monitor + // Behaviour: Test multiple traffic alerts work and only process one trafficAlert per interval + for i := 0; i < 2; i++ { + ws.state.Store(disconnected) + + select { + case ws.TrafficAlert <- signal: + if i == 0 { + require.WithinDurationf(t, time.Now(), thenish, trafficCheckInterval, "First Non-blocking test must happen before the traffic is checked") + } + default: + require.Failf(t, "", "TrafficAlert should not block; Check #%d", i) + } + + select { + case ws.TrafficAlert <- signal: + require.Failf(t, "", "TrafficAlert should block after first slot used; Check #%d", i) + default: + if i == 0 { + require.WithinDuration(t, time.Now(), thenish, trafficCheckInterval, "First Blocking test must happen before the traffic is checked") + } + } + + require.EventuallyWithTf(t, func(c *assert.CollectT) { + assert.Truef(c, ws.IsConnected(), "state should been marked as connected; Check #%d", i) + assert.Emptyf(c, ws.TrafficAlert, "trafficAlert channel should be 
drained; Check #%d", i) + }, 2*trafficCheckInterval, patience, "trafficAlert should be read and state connected; Check #%d", i) + } + + thenish = time.Now() + + // Behaviour: Shuts down websocket and exits on timeout + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected") + assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down") + }, 2*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown") + + // Behaviour: connecting status doesn't trigger shutdown + ws.state.Store(connecting) + ws.trafficTimeout = 50 * time.Millisecond + ws.trafficMonitor() + assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running") + require.Equal(t, connecting, ws.state.Load(), "websocket must be connecting") + <-time.After(4 * ws.trafficTimeout) + require.Equal(t, connecting, ws.state.Load(), "websocket must still be connecting after several checks") + ws.state.Store(connected) + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected") + assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down") + }, 4*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown after connecting status changes") + + // Behaviour: shutdown is processed and waitgroup is cleared + trafficCheckInterval = 10 * time.Millisecond + ws.state.Store(connected) + ws.trafficTimeout = time.Minute ws.trafficMonitor() assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running") - // prevent shutdown routine - ws.setState(disconnected) - // await timeout closure - ws.Wg.Wait() - assert.False(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be not be running") + wgReady := make(chan bool) + go func() { + ws.Wg.Wait() + close(wgReady) + }() + select { + case <-wgReady: + require.Failf(t, "", "WaitGroup should be blocking still") + case 
<-time.After(1 * trafficCheckInterval): + } + + close(ws.ShutdownC) + + <-time.After(2 * trafficCheckInterval) + assert.False(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be shutdown") + select { + case <-wgReady: + default: + require.Failf(t, "", "WaitGroup should be freed now") + } } func TestIsDisconnectionError(t *testing.T) { @@ -1079,18 +1152,11 @@ func TestSetupNewConnection(t *testing.T) { err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"}) assert.ErrorIs(t, err, errTrafficAlertNil, "SetupNewConnection should error correctly") - nonsenseWebsock.TrafficAlert = make(chan struct{}) + nonsenseWebsock.TrafficAlert = make(chan struct{}, 1) err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"}) assert.ErrorIs(t, err, errReadMessageErrorsNil, "SetupNewConnection should error correctly") - web := Websocket{ - connector: connect, - Wg: new(sync.WaitGroup), - ShutdownC: make(chan struct{}), - TrafficAlert: make(chan struct{}), - ReadMessageErrors: make(chan error), - DataHandler: make(chan interface{}), - } + web := NewWebsocket() err = web.Setup(defaultSetup) assert.NoError(t, err, "Setup should not error")