Skip to content

Commit

Permalink
WS: Relay disconnect errors to subscribers
Browse files Browse the repository at this point in the history
Subscribers probably care when the WS got disconncted.
Tell them and expose a method to test the error for matching
  • Loading branch information
gbjk committed Sep 11, 2023
1 parent 3218982 commit 4a514bd
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
9 changes: 4 additions & 5 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,16 +391,15 @@ func (w *Websocket) connectionMonitor() error {
}
select {
case err := <-w.ReadMessageErrors:
if isDisconnectionError(err) {
if IsDisconnectionError(err) {
w.setInit(false)
log.Warnf(log.WebsocketMgr,
"%v websocket has been disconnected. Reason: %v",
w.exchangeName, err)
w.setConnectedStatus(false)
} else {
// pass off non disconnect errors to datahandler to manage
w.DataHandler <- err
}

w.DataHandler <- err
case <-timer.C:
if !w.IsConnecting() && !w.IsConnected() {
err := w.Connect()
Expand Down Expand Up @@ -984,7 +983,7 @@ func (w *Websocket) CanUseAuthenticatedEndpoints() bool {
}

// isDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error

Check warning on line 985 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / lint

exported: comment on exported function IsDisconnectionError should be of the form "IsDisconnectionError ..." (revive)
func isDisconnectionError(err error) bool {
func IsDisconnectionError(err error) bool {
if websocket.IsUnexpectedCloseError(err) {
return true
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/stream/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (w *WebsocketConnection) IsConnected() bool {
func (w *WebsocketConnection) ReadMessage() Response {
mType, resp, err := w.Connection.ReadMessage()
if err != nil {
if isDisconnectionError(err) {
if IsDisconnectionError(err) {
if w.setConnectedStatus(false) {
// NOTE: When w.setConnectedStatus() returns true the underlying
// state was changed and this infers that the connection was
Expand Down
14 changes: 8 additions & 6 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,26 +230,26 @@ func TestTrafficMonitorTimeout(t *testing.T) {

func TestIsDisconnectionError(t *testing.T) {
t.Parallel()
isADisconnectionError := isDisconnectionError(errors.New("errorText"))
isADisconnectionError := IsDisconnectionError(errors.New("errorText"))
if isADisconnectionError {
t.Error("Its not")
}
isADisconnectionError = isDisconnectionError(&websocket.CloseError{
isADisconnectionError = IsDisconnectionError(&websocket.CloseError{
Code: 1006,
Text: "errorText",
})
if !isADisconnectionError {
t.Error("It is")
}

isADisconnectionError = isDisconnectionError(&net.OpError{
isADisconnectionError = IsDisconnectionError(&net.OpError{
Err: errClosedConnection,
})
if isADisconnectionError {
t.Error("It's not")
}

isADisconnectionError = isDisconnectionError(&net.OpError{
isADisconnectionError = IsDisconnectionError(&net.OpError{
Err: errors.New("errText"),
})
if !isADisconnectionError {
Expand Down Expand Up @@ -321,8 +321,10 @@ func TestConnectionMessageErrors(t *testing.T) {
outer:
for {
select {
case <-ws.ToRoutine:
t.Fatal("Error is a disconnection error")
case err := <-ws.ToRoutine:
if _, ok := err.(*websocket.CloseError); !ok {
t.Errorf("Error is not a disconnection error: %v", err)
}
case <-timer.C:
break outer
}
Expand Down

0 comments on commit 4a514bd

Please sign in to comment.