Skip to content

Commit

Permalink
Websocket: Remove trafficMonitor connected status
Browse files Browse the repository at this point in the history
trafficMonitor does not need to set the connection to be connected.
Connect() does that. Anything after that should result in a full
shutdown and restart. It can't and shouldn't become connected
unexpectedly, and this is most likely a race anyway.

Also dropped trafficCheckInterval to 100ms to mitigate races of traffic
alerts being buffered for too long.
  • Loading branch information
gbjk committed Feb 23, 2024
1 parent 41280d0 commit 40333fe
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
11 changes: 8 additions & 3 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (

var (
globalReporter Reporter
trafficCheckInterval = time.Second
trafficCheckInterval = 100 * time.Millisecond
)

// SetupGlobalReporter sets a reporter interface to be used
Expand Down Expand Up @@ -569,15 +569,20 @@ func (w *Websocket) trafficMonitor() {
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
w.setState(connected)
if !t.Stop() {
<-t.C
}
t.Reset(w.trafficTimeout)
default:
}
case <-t.C:
if w.IsConnecting() {
checkAgain := w.IsConnecting()
select {
case <-w.TrafficAlert:
checkAgain = true
default:
}
if checkAgain {
t.Reset(w.trafficTimeout)
break
}
Expand Down
15 changes: 7 additions & 8 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,15 @@ func TestTrafficMonitorTrafficAlerts(t *testing.T) {
patience := 10 * time.Millisecond
ws.trafficTimeout = 200 * time.Millisecond
ws.ShutdownC = make(chan struct{})
ws.state.Store(connected)

thenish := time.Now()
ws.trafficMonitor()

assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")
require.Equal(t, disconnected, ws.state.Load(), "websocket must be disconnected")

for i := 0; i < 2; i++ {
ws.state.Store(disconnected)
require.Equal(t, connected, ws.state.Load(), "websocket must be connected")

for i := 0; i < 6; i++ { // Timeout will happen at 200ms so we want 6 * 50ms checks to pass
select {
case ws.TrafficAlert <- signal:
if i == 0 {
Expand All @@ -223,9 +222,9 @@ func TestTrafficMonitorTrafficAlerts(t *testing.T) {
}

require.EventuallyWithTf(t, func(c *assert.CollectT) {
assert.Truef(c, ws.IsConnected(), "state should been marked as connected; Check #%d", i)
assert.Truef(c, ws.IsConnected(), "state should still be connected; Check #%d", i)
assert.Emptyf(c, ws.TrafficAlert, "trafficAlert channel should be drained; Check #%d", i)
}, 2*trafficCheckInterval, patience, "trafficAlert should be read and state connected; Check #%d", i)
}, 2*trafficCheckInterval, patience, "trafficAlert should be read; Check #%d", i)
}

require.EventuallyWithT(t, func(c *assert.CollectT) {
Expand All @@ -234,7 +233,7 @@ func TestTrafficMonitorTrafficAlerts(t *testing.T) {
}, 2*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown once we stop feeding trafficAlerts")
}

// TestTrafficMonitorConnecting ensure connecting status doesn't trigger shutdown
// TestTrafficMonitorConnecting ensures connecting status doesn't trigger shutdown
func TestTrafficMonitorConnecting(t *testing.T) {
t.Parallel()
ws := NewWebsocket()
Expand All @@ -256,7 +255,7 @@ func TestTrafficMonitorConnecting(t *testing.T) {
}, 4*ws.trafficTimeout, 10*time.Millisecond, "trafficTimeout should trigger a shutdown after connecting status changes")
}

// TestTrafficMonitorShutdown ensure shutdown is processed and waitgroup is cleared
// TestTrafficMonitorShutdown ensures shutdown is processed and waitgroup is cleared
func TestTrafficMonitorShutdown(t *testing.T) {
t.Parallel()
ws := NewWebsocket()
Expand Down

0 comments on commit 40333fe

Please sign in to comment.