Skip to content

Commit

Permalink
Websocket: Fix race on DataHandler during shutdown
Browse files Browse the repository at this point in the history
Previously, shutdown could race with DataHandler processing and fail
TestConnectionMessageErrors, demonstrating that consumers might not
be told why their connection was closing.

* Do not drain DataHandler on dataMonitor shutdown
Consumers should be allowed to process whatever messages were in flight
prior to a socket shutdown.

* Ensure all DataHandler messages are sent to ToRoutine before shutdown
This avoids a race where we'd read a message from DataHandler, but then
process a shutdown before trying to relay it. The relay is non-blocking
anyway, so we can trust that we'd pick up the shutdown during the next
loop iteration.

* Send errors to DataHandler before starting a shutdown
This just reduces the chance of a race on shutdown processing.
  • Loading branch information
gbjk committed Feb 26, 2024
1 parent cdbead3 commit bb38136
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
33 changes: 11 additions & 22 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,34 +347,24 @@ func (w *Websocket) dataMonitor() {
w.Wg.Add(1)

go func() {
defer func() {
for {
// Bleeds data from the websocket connection if needed
select {
case <-w.DataHandler:
default:
w.setDataMonitorRunning(false)
w.Wg.Done()
return
}
}
}()

defer w.setDataMonitorRunning(false)
defer w.Wg.Done()
full := false
for {
select {
case <-w.ShutdownC:
return
case d := <-w.DataHandler:
select {
case w.ToRoutine <- d:
case <-w.ShutdownC:
return
if full {
full = false
}
default:
log.Warnf(log.WebsocketMgr, "%s exchange backlog in websocket processing detected", w.exchangeName)
select {
case w.ToRoutine <- d:
case <-w.ShutdownC:
return
if !full {
full = true
// If this becomes prone to flapping we could drain the buffer, but that's extreme and we'd like to avoid it if possible
log.Warnf(log.WebsocketMgr, "%s exchange backlog in websocket ToRoutine consumer channel; dropping messages", w.exchangeName)
}
}
}
Expand Down Expand Up @@ -413,6 +403,7 @@ func (w *Websocket) connectionMonitor() error {
}
select {
case err := <-w.ReadMessageErrors:
w.DataHandler <- err
if IsDisconnectionError(err) {
log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", w.exchangeName, err)
if w.IsConnected() {
Expand All @@ -421,8 +412,6 @@ func (w *Websocket) connectionMonitor() error {
}
}
}

w.DataHandler <- err
case <-timer.C:
if !w.IsConnecting() && !w.IsConnected() {
err := w.Connect()
Expand Down
3 changes: 1 addition & 2 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,6 @@ func TestConnectionMessageErrors(t *testing.T) {
err = ws.Connect()
require.NoError(t, err, "Connect must not error")

ws.TrafficAlert <- struct{}{}

c := func(tb *assert.CollectT) {
select {
case v, ok := <-ws.ToRoutine:
Expand All @@ -345,6 +343,7 @@ func TestConnectionMessageErrors(t *testing.T) {
}
}

ws.TrafficAlert <- struct{}{}
ws.ReadMessageErrors <- errDastardlyReason
assert.EventuallyWithT(t, c, 2*time.Second, 10*time.Millisecond, "Should get an error down the routine")

Expand Down

0 comments on commit bb38136

Please sign in to comment.