Skip to content

Commit

Permalink
Websocket: Fix Resubscribe erroring Duplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Aug 15, 2024
1 parent 2813a9f commit 3b139e2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
29 changes: 29 additions & 0 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,35 @@ func TestWsSubscribe(t *testing.T) {
}
}

// TestWsResubscribe tests websocket resubscription
func TestWsResubscribe(t *testing.T) {
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
testexch.SetupWs(t, k)

err := k.Subscribe(subscription.List{{Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 1000}})
require.NoError(t, err, "Subscribe must not error")
subs := k.Websocket.GetSubscriptions()
require.Len(t, subs, 1, "Should add 1 Subscription")
require.Equal(t, subscription.SubscribedState, subs[0].State(), "Subscription should be subscribed state")

require.Eventually(t, func() bool {
b, err := k.Websocket.Orderbook.GetOrderbook(xbtusdPair, asset.Spot)

Check failure on line 1036 in exchanges/kraken/kraken_test.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 1029 (govet)
if err == nil {
return !b.LastUpdated.IsZero()
}
return false
}, time.Second*4, time.Millisecond*10, "orderbook must start streaming")

// Set the state to Unsub so we definitely know Resub worked
err = subs[0].SetState(subscription.UnsubscribingState)
require.NoError(t, err)

err = k.Websocket.ResubscribeToChannel(subs[0])
require.NoError(t, err, "Resubscribe must not error")
require.Equal(t, subscription.SubscribedState, subs[0].State(), "subscription must be subscribed again")
}

// TestWsOrderbookSub tests orderbook subscriptions for MaxDepth params
func TestWsOrderbookSub(t *testing.T) {
t.Parallel()
Expand Down
3 changes: 3 additions & 0 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,9 @@ func (w *Websocket) checkSubscriptions(subs subscription.List) error {
}

for _, s := range subs {
if s.State() == subscription.ResubscribingState {
continue
}
if found := w.subscriptions.Get(s); found != nil {
return fmt.Errorf("%w: %s", subscription.ErrDuplicate, s)
}
Expand Down

0 comments on commit 3b139e2

Please sign in to comment.