Skip to content

Commit

Permalink
Websocket: Fix and simplify traffic monitor
Browse files Browse the repository at this point in the history
trafficMonitor had a check throttle at the end of the for loop to stop it just gobbling the (blocking) trafficAlert channel non-stop.
That makes sense, except that nothing is sent to the trafficAlert channel if there's no listener.
So that means that it's out by one second on the trafficAlert, because any traffic received during the pause is doesn't try to send a traffic alert.
  • Loading branch information
gbjk committed Feb 15, 2024
1 parent 1c7441c commit 445ef2b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 48 deletions.
68 changes: 21 additions & 47 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ import (
)

const (
defaultJobBuffer = 5000
// defaultTrafficPeriod defines a period of pause for the traffic monitor,
// as there are periods with large incoming traffic alerts which requires a
// timer reset, this limits work on this routine to a more effective rate
// of check.
defaultTrafficPeriod = time.Second
jobBuffer = 5000
trafficCheckInterval = time.Second
)

// Public websocket errors
Expand Down Expand Up @@ -83,9 +79,9 @@ func SetupGlobalReporter(r Reporter) {
// NewWebsocket initialises the websocket struct
func NewWebsocket() *Websocket {
return &Websocket{
DataHandler: make(chan interface{}, defaultJobBuffer),
ToRoutine: make(chan interface{}, defaultJobBuffer),
TrafficAlert: make(chan struct{}),
DataHandler: make(chan interface{}, jobBuffer),
ToRoutine: make(chan interface{}, jobBuffer),
TrafficAlert: make(chan struct{}, 1),
ReadMessageErrors: make(chan error),
Subscribe: make(chan []subscription.Subscription),
Unsubscribe: make(chan []subscription.Subscription),
Expand Down Expand Up @@ -541,9 +537,9 @@ func (w *Websocket) FlushChannels() error {
return w.Connect()
}

// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires,
// it will reconnect if the TrafficAlert channel has not received any data. The
// trafficTimer will reset on each traffic alert
// trafficMonitor waits trafficCheckInterval before checking for a trafficAlert
// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic
// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. connectionMonitor is responsible for Connecting again
func (w *Websocket) trafficMonitor() {
if w.IsTrafficMonitorRunning() {
return
Expand All @@ -552,9 +548,8 @@ func (w *Websocket) trafficMonitor() {
w.Wg.Add(1)

go func() {
var trafficTimer = time.NewTimer(w.trafficTimeout)
pause := make(chan struct{})
for {
trafficTimer := time.NewTimer(w.trafficTimeout)
select {
case <-w.ShutdownC:
if w.verbose {
Expand All @@ -564,50 +559,29 @@ func (w *Websocket) trafficMonitor() {
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-w.TrafficAlert:
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
}
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
w.setState(connected)
default:
}
case <-trafficTimer.C:
if w.IsConnecting() {
break
}
w.setState(connected)
trafficTimer.Reset(w.trafficTimeout)
case <-trafficTimer.C: // Falls through when timer runs out
if w.verbose {
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done() // without this the w.Shutdown() call below will deadlock
if !w.IsConnecting() && w.IsConnected() {
w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
}

return
}

if w.IsConnected() {
// Routine pausing mechanism
go func(p chan<- struct{}) {
time.Sleep(defaultTrafficPeriod)
select {
case p <- struct{}{}:
default:
}
}(pause)
select {
case <-w.ShutdownC:
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-pause:
}
}
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/stream/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response {

select {
case w.Traffic <- struct{}{}:
default: // causes contention, just bypass if there is no receiver.
default: // Non-Blocking write ensuses 1 buffered signal per trafficCheckInterval to avoid flooding
}

var standardMessage []byte
Expand Down

0 comments on commit 445ef2b

Please sign in to comment.