Skip to content

Commit

Permalink
Websocket: Remove trafficMonitor connected status
Browse files Browse the repository at this point in the history
trafficMonitor does not need to set the connection to be connected.
Connect() does that. Anything after that should result in a full
shutdown and restart. It can't and shouldn't become connected
unexpectedly, and this is most likely a race anyway.

Also dropped trafficCheckInterval to 100ms to mitigate races of traffic
alerts being buffered for too long.
  • Loading branch information
gbjk committed Feb 20, 2024
1 parent d691839 commit c6dc10c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
11 changes: 8 additions & 3 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (

var (
globalReporter Reporter
trafficCheckInterval = time.Second
trafficCheckInterval = 100 * time.Millisecond
)

// SetupGlobalReporter sets a reporter interface to be used
Expand Down Expand Up @@ -569,15 +569,20 @@ func (w *Websocket) trafficMonitor() {
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
w.setState(connected)
if !t.Stop() {
<-t.C
}
t.Reset(w.trafficTimeout)
default:
}
case <-t.C:
if w.IsConnecting() {
checkAgain := w.IsConnecting()
select {
case <-w.TrafficAlert:
checkAgain = true
default:
}
if checkAgain {
t.Reset(w.trafficTimeout)
break
}
Expand Down
18 changes: 9 additions & 9 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,15 @@ func TestTrafficMonitorTrafficAlerts(t *testing.T) {
patience := 10 * time.Millisecond
ws.trafficTimeout = 200 * time.Millisecond
ws.ShutdownC = make(chan struct{})
ws.state.Store(connected)

thenish := time.Now()
ws.trafficMonitor()

assert.True(t, ws.IsTrafficMonitorRunning(), "traffic monitor should be running")
require.Equal(t, disconnected, ws.state.Load(), "websocket must be disconnected")

for i := 0; i < 2; i++ {
ws.state.Store(disconnected)
require.Equal(t, connected, ws.state.Load(), "websocket must be connected")

for i := 0; i < 6; i++ { // Timeout wil happen at 200ms so we want 6 * 50ms checks to pass

Check failure on line 205 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / Spell checker

wil ==> will, well
select {
case ws.TrafficAlert <- signal:
if i == 0 {
Expand All @@ -222,19 +221,20 @@ func TestTrafficMonitorTrafficAlerts(t *testing.T) {
}
}

// Timeout wil happen at 200ms

Check failure on line 224 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / Spell checker

wil ==> will, well

Check failure on line 224 in exchanges/stream/websocket_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
require.EventuallyWithTf(t, func(c *assert.CollectT) {
assert.Truef(c, ws.IsConnected(), "state should been marked as connected; Check #%d", i)
assert.Truef(c, ws.IsConnected(), "state should still be connected; Check #%d", i)
assert.Emptyf(c, ws.TrafficAlert, "trafficAlert channel should be drained; Check #%d", i)
}, 2*trafficCheckInterval, patience, "trafficAlert should be read and state connected; Check #%d", i)
}, 2*trafficCheckInterval, patience, "trafficAlert should be read; Check #%d", i)
}

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, disconnected, ws.state.Load(), "websocket must be disconnected")
assert.False(c, ws.IsTrafficMonitorRunning(), "trafficMonitor shound be shut down")
}, 2*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown")
}, 2*ws.trafficTimeout, patience, "trafficTimeout should trigger a shutdown once we stop feeding trafficAlerts")
}

// TestTrafficMonitorConnecting ensure connecting status doesn't trigger shutdown
// TestTrafficMonitorConnecting ensures connecting status doesn't trigger shutdown
func TestTrafficMonitorConnecting(t *testing.T) {
t.Parallel()
ws := NewWebsocket()
Expand All @@ -256,7 +256,7 @@ func TestTrafficMonitorConnecting(t *testing.T) {
}, 4*ws.trafficTimeout, 10*time.Millisecond, "trafficTimeout should trigger a shutdown after connecting status changes")
}

// TestTrafficMonitorShutdown ensure shutdown is processed and waitgroup is cleared
// TestTrafficMonitorShutdown ensures shutdown is processed and waitgroup is cleared
func TestTrafficMonitorShutdown(t *testing.T) {
t.Parallel()
ws := NewWebsocket()
Expand Down

0 comments on commit c6dc10c

Please sign in to comment.