diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 1cc5e70fec0..cd1efb10d88 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1218,7 +1218,8 @@ func TestWsSubscribe(t *testing.T) { assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription") err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) - assert.NoError(t, err, "Resubscribing to the same channel shouldn't error") + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Resubscribing to the same channel should error with SubFailure") + assert.ErrorIs(t, err, stream.ErrSubscribedAlready, "Resubscribing to the same channel should error with SubscribedAlready") assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error") err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}) diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index dd25cf43a15..4cb7363e512 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -408,6 +408,9 @@ func (k *Kraken) wsReadDataResponse(c *stream.ChannelSubscription, response Webs } return k.wsProcessCandles(c, o) case krakenWsOrderbook: + if c.State == stream.ChannelUnsubscribing { + return nil + } ob, ok := response[1].(map[string]interface{}) if !ok { return errors.New("received invalid orderbook data") @@ -762,7 +765,7 @@ func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[stri // This was locking the main websocket reader routine and a // backlog occurred. So put this into it's own go routine. errResub := k.Websocket.ResubscribeToChannel(resub) - if errResub != nil { + if errResub != nil && errResub != stream.ErrChannelInStateAlready { log.Errorf(log.WebsocketMgr, "resubscription failure for %v: %v", resub, @@ -1205,7 +1208,8 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { return err } - err = k.Websocket.AddPendingSubscription(c) + c.State = stream.ChannelSubscribing + err = k.Websocket.AddSubscription(c) if err != nil { return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) } @@ -1229,7 +1233,10 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { return wErr } - k.Websocket.AddSuccessfulSubscriptions(*c) + if err = k.Websocket.SetSubscriptionState(c, stream.ChannelSubscribed); err != nil { + log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, err) + } + if k.Verbose { log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency) } @@ -1246,7 +1253,9 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { c.EnsureKeyed() - if err = k.Websocket.SetSubscriptionPending(c); err != nil { + if err = k.Websocket.SetSubscriptionState(c, stream.ChannelUnsubscribing); err != nil { + // err is probably ErrChannelInStateAlready, but we want to bubble it up to prevent an attempt to Subscribe again + // We can catch and ignore it in our call to resub return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err) } @@ -1258,12 +1267,18 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) if err != nil { + if e2 := k.Websocket.SetSubscriptionState(c, stream.ChannelSubscribed); e2 != nil { + log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2) + } return err } if err = k.getErrResp(respRaw); err != nil { wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err) k.Websocket.DataHandler <- wErr + if e2 := k.Websocket.SetSubscriptionState(c, stream.ChannelSubscribed); e2 != nil { + log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2) + } return wErr } @@ -1298,6 +1313,11 @@ func (k *Kraken) reqForSub(e string, c *stream.ChannelSubscription) (*WebsocketS // ensureChannelKeyed wraps the channel EnsureKeyed to add channel name suffixes for Depth and Interval func ensureChannelKeyed(c *stream.ChannelSubscription, r *WebsocketSubRequest) error { + if c.Key != nil { + // Avoid double wrapping the Key.Channel in resubscriptions + return nil + } + key, ok := c.EnsureKeyed().(stream.DefaultChannelKey) if !ok { return common.GetTypeAssertError("stream.DefaultChannelKey", c.Key, "subscription.Key") // Should be impossible diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index 156dba2d9ef..4daf68bbe66 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -38,14 +38,24 @@ type DefaultChannelKey struct { Asset asset.Item } -// ChannelSubscription container for streaming subscriptions +// ChannelState tracks the status of a subscription channel +type ChannelState int + +const ( + ChannelStateUnknown ChannelState = iota // ChannelStateUnknown means subscription state is not registered, but doesn't imply Inactive + ChannelSubscribing // ChannelSubscribing means channel is in the process of subscribing + ChannelSubscribed // ChannelSubscribed means the channel has finished a successful and acknowledged subscription + ChannelUnsubscribing // ChannelUnsubscribing means the channel has started to unsubscribe, but not yet confirmed +) + +// ChannelSubscription container for streaming subscription channels type ChannelSubscription struct { Key any Channel string Currency currency.Pair Asset asset.Item Params map[string]interface{} - pending bool + State ChannelState } // ConnectionSetup defines variables for an individual stream connection diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index e0a8d1f5fbe..f8ade9250c4 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -26,12 +26,14 @@ const ( var ( // ErrSubscriptionNotFound defines an error when a subscription is not found ErrSubscriptionNotFound = errors.New("subscription not found") + // ErrSubscribedAlready defines an error when a channel is already subscribed + ErrSubscribedAlready = errors.New("duplicate subscription") // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") // ErrUnsubscribeFailure defines an error when a unsubscribe fails ErrUnsubscribeFailure = errors.New("unsubscribe failure") - // ErrSubscriptionPending defines an error when a subscription is already pending an operation result - ErrSubscriptionPending = errors.New("subscription update already happening") + // ErrChannelInStateAlready defines an error when a subscription channel is already in a new state + ErrChannelInStateAlready = errors.New("channel already in state") // ErrAlreadyDisabled is returned when you double-disable the websocket ErrAlreadyDisabled = errors.New("websocket already disabled") // ErrNotConnected defines an error when websocket is not connected @@ -929,33 +931,28 @@ func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error { return nil } -// AddPendingSubscription adds a subscription to the subscription lists, or updates it if it exists. -// The pending operation could be subscribe or unsubscribe. The purpose of setting this state is to block other similar operations -// If the subscription already exists then the state is changed to pending but no other fields are merged -// Returns error if the subscription is already pending -func (w *Websocket) AddPendingSubscription(c *ChannelSubscription) error { +// AddSubscription adds a subscription to the subscription lists +// Unlike AddSubscriptions this method will error if the subscription already exists +func (w *Websocket) AddSubscription(c *ChannelSubscription) error { w.subscriptionMutex.Lock() defer w.subscriptionMutex.Unlock() if w.subscriptions == nil { w.subscriptions = subscriptionMap{} } key := c.EnsureKeyed() - p, ok := w.subscriptions[key] - if !ok { - n := *c // Fresh copy; we don't want to use the pointer we were given and allow encapsulation/locks to be bypassed - p = &n - w.subscriptions[key] = p - } - if p.pending { - return ErrSubscriptionPending + if _, ok := w.subscriptions[key]; ok { + return ErrSubscribedAlready } - p.pending = true + + n := *c // Fresh copy; we don't want to use the pointer we were given and allow encapsulation/locks to be bypassed + w.subscriptions[key] = &n + return nil } -// SetSubscriptionPending sets an existing subscription to be Pending -// returns an error if the subscription is not found or if it's already Pending -func (w *Websocket) SetSubscriptionPending(c *ChannelSubscription) error { +// SetSubscriptionState sets an existing subscription state +// returns an error if the subscription is not found, or the new state is already set +func (w *Websocket) SetSubscriptionState(c *ChannelSubscription, state ChannelState) error { w.subscriptionMutex.Lock() defer w.subscriptionMutex.Unlock() if w.subscriptions == nil { @@ -966,10 +963,10 @@ func (w *Websocket) SetSubscriptionPending(c *ChannelSubscription) error { if !ok { return ErrSubscriptionNotFound } - if p.pending { - return ErrSubscriptionPending + if state == p.State { + return ErrChannelInStateAlready } - p.pending = true + p.State = state return nil } @@ -991,7 +988,7 @@ func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription) for _, cN := range channels { c := cN // cN is an iteration var; Not safe to make a pointer to key := c.EnsureKeyed() - c.pending = false + c.State = ChannelSubscribed w.subscriptions[key] = &c } } diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index a191fb12a41..7bcbeb24a77 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -574,25 +574,29 @@ func TestResubscribe(t *testing.T) { assert.NoError(t, ws.ResubscribeToChannel(&channel[0]), "Resubscribe should not error now the channel is subscribed") } -// TestPendingSubscriptions tests AddPendingSubscription and IsPending -func TestPendingSubscriptions(t *testing.T) { +// TestSubscriptionState tests Subscription state changes +func TestSubscriptionState(t *testing.T) { t.Parallel() ws := New() - c := &ChannelSubscription{Key: 42, Channel: "Gophers"} - assert.ErrorIs(t, ws.SetSubscriptionPending(c), ErrSubscriptionNotFound, "Setting an imaginary sub should error") + c := &ChannelSubscription{Key: 42, Channel: "Gophers", State: ChannelSubscribing} + assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelUnsubscribing), ErrSubscriptionNotFound, "Setting an imaginary sub should error") - assert.NoError(t, ws.AddPendingSubscription(c), "Adding first subscription should not error") + assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error") found := ws.GetSubscription(42) assert.NotNil(t, found, "Should find the subscription") - assert.True(t, found.pending, "Subscription should be pending") - assert.ErrorIs(t, ws.AddPendingSubscription(c), ErrSubscriptionPending, "Adding an already pending sub should error") - assert.ErrorIs(t, ws.SetSubscriptionPending(c), ErrSubscriptionPending, "Setting an already pending sub should error") + assert.Equal(t, ChannelSubscribing, found.State, "Subscription should be Subscribing") + assert.ErrorIs(t, ws.AddSubscription(c), ErrSubscribedAlready, "Adding an already existing sub should error") + assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelSubscribing), ErrChannelInStateAlready, "Setting Same state should error") ws.AddSuccessfulSubscriptions(*c) found = ws.GetSubscription(42) assert.NotNil(t, found, "Should find the subscription") - assert.False(t, found.pending, "Subscription should no longer be pending") + assert.Equal(t, found.State, ChannelSubscribed, "Subscription should be subscribed state") + + assert.NoError(t, ws.SetSubscriptionState(c, ChannelUnsubscribing), "Setting Unsub state should not error") + found = ws.GetSubscription(42) + assert.Equal(t, found.State, ChannelUnsubscribing, "Subscription should be unsubscribing state") } // TestRemoveSubscription tests removing a subscription @@ -601,7 +605,7 @@ func TestRemoveSubscription(t *testing.T) { ws := New() c := &ChannelSubscription{Key: 42, Channel: "Unite!"} - assert.NoError(t, ws.AddPendingSubscription(c), "Adding first subscription should not error") + assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error") assert.NotNil(t, ws.GetSubscription(42), "Added subscription should be findable") ws.RemoveSubscription(c)