Skip to content

Commit

Permalink
Websocket: Fix and simplify traffic monitor
Browse files Browse the repository at this point in the history
trafficMonitor had a check throttle at the end of the for loop to stop it just gobbling the (blocking) trafficAlert channel non-stop.
That makes sense, except that nothing is sent to the trafficAlert channel if there's no listener.
So that means that it's out by one second on the trafficAlert, because any traffic received during the pause is doesn't try to send a traffic alert.
  • Loading branch information
gbjk committed Feb 15, 2024
1 parent 1c7441c commit 5335be3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 57 deletions.
69 changes: 22 additions & 47 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ import (
)

const (
defaultJobBuffer = 5000
// defaultTrafficPeriod defines a period of pause for the traffic monitor,
// as there are periods with large incoming traffic alerts which requires a
// timer reset, this limits work on this routine to a more effective rate
// of check.
defaultTrafficPeriod = time.Second
jobBuffer = 5000
trafficCheckInterval = time.Second
)

// Public websocket errors
Expand Down Expand Up @@ -83,9 +79,9 @@ func SetupGlobalReporter(r Reporter) {
// NewWebsocket initialises the websocket struct
func NewWebsocket() *Websocket {
return &Websocket{
DataHandler: make(chan interface{}, defaultJobBuffer),
ToRoutine: make(chan interface{}, defaultJobBuffer),
TrafficAlert: make(chan struct{}),
DataHandler: make(chan interface{}, jobBuffer),
ToRoutine: make(chan interface{}, jobBuffer),
TrafficAlert: make(chan struct{}, 1),
ReadMessageErrors: make(chan error),
Subscribe: make(chan []subscription.Subscription),
Unsubscribe: make(chan []subscription.Subscription),
Expand Down Expand Up @@ -541,9 +537,9 @@ func (w *Websocket) FlushChannels() error {
return w.Connect()
}

// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires,
// it will reconnect if the TrafficAlert channel has not received any data. The
// trafficTimer will reset on each traffic alert
// trafficMonitor waits trafficCheckInterval before checking for a trafficAlert
// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic
// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. connectionMonitor is responsible for Connecting again
func (w *Websocket) trafficMonitor() {
if w.IsTrafficMonitorRunning() {
return
Expand All @@ -552,9 +548,8 @@ func (w *Websocket) trafficMonitor() {
w.Wg.Add(1)

go func() {
var trafficTimer = time.NewTimer(w.trafficTimeout)
pause := make(chan struct{})
for {
trafficTimer := time.NewTimer(w.trafficTimeout)
select {
case <-w.ShutdownC:
if w.verbose {
Expand All @@ -564,50 +559,30 @@ func (w *Websocket) trafficMonitor() {
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-w.TrafficAlert:
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
}
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
w.setState(connected)
default:
}
case <-trafficTimer.C:
if w.IsConnecting() {
break
}
w.setState(connected)
trafficTimer.Reset(w.trafficTimeout)
case <-trafficTimer.C: // Falls through when timer runs out
if w.verbose {
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done() // without this the w.Shutdown() call below will deadlock
if !w.IsConnecting() && w.IsConnected() {
w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
}

return
}

if w.IsConnected() {
// Routine pausing mechanism
go func(p chan<- struct{}) {
time.Sleep(defaultTrafficPeriod)
select {
case p <- struct{}{}:
default:
}
}(pause)
select {
case <-w.ShutdownC:
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-pause:
}
}
trafficTimer.Stop()
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/stream/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response {

select {
case w.Traffic <- struct{}{}:
default: // causes contention, just bypass if there is no receiver.
default: // Non-Blocking write ensuses 1 buffered signal per trafficCheckInterval to avoid flooding
}

var standardMessage []byte
Expand Down
11 changes: 2 additions & 9 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,18 +1079,11 @@ func TestSetupNewConnection(t *testing.T) {
err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"})
assert.ErrorIs(t, err, errTrafficAlertNil, "SetupNewConnection should error correctly")

nonsenseWebsock.TrafficAlert = make(chan struct{})
nonsenseWebsock.TrafficAlert = make(chan struct{}, 1)
err = nonsenseWebsock.SetupNewConnection(ConnectionSetup{URL: "urlstring"})
assert.ErrorIs(t, err, errReadMessageErrorsNil, "SetupNewConnection should error correctly")

web := Websocket{
connector: connect,
Wg: new(sync.WaitGroup),
ShutdownC: make(chan struct{}),
TrafficAlert: make(chan struct{}),
ReadMessageErrors: make(chan error),
DataHandler: make(chan interface{}),
}
web := New()

Check failure on line 1086 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: New (typecheck)

Check failure on line 1086 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

undefined: New

Check failure on line 1086 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

undefined: New

Check failure on line 1086 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

undefined: New

Check failure on line 1086 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

undefined: New

Check failure on line 1086 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

undefined: New

err = web.Setup(defaultSetup)
assert.NoError(t, err, "Setup should not error")
Expand Down

0 comments on commit 5335be3

Please sign in to comment.