Skip to content

Commit

Permalink
Kraken: Subscription State instead of pending
Browse files Browse the repository at this point in the history
We were still getting resub loops and errors due to LUNA/USD checksum
failures. This change allows more clarity about the current state and
checks for specifically already Unsubing
  • Loading branch information
gbjk committed Oct 21, 2023
1 parent f40636d commit b3671a3
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 40 deletions.
3 changes: 2 additions & 1 deletion exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,8 @@ func TestWsSubscribe(t *testing.T) {
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")

err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}})
assert.NoError(t, err, "Resubscribing to the same channel shouldn't error")
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Resubscribing to the same channel should error with SubFailure")
assert.ErrorIs(t, err, stream.ErrSubscribedAlready, "Resubscribing to the same channel should error with SubscribedAlready")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")

err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}})
Expand Down
28 changes: 24 additions & 4 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ func (k *Kraken) wsReadDataResponse(c *stream.ChannelSubscription, response Webs
}
return k.wsProcessCandles(c, o)
case krakenWsOrderbook:
if c.State == stream.ChannelUnsubscribing {
return nil
}
ob, ok := response[1].(map[string]interface{})
if !ok {
return errors.New("received invalid orderbook data")
Expand Down Expand Up @@ -762,7 +765,7 @@ func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[stri
// This was locking the main websocket reader routine and a
// backlog occurred. So put this into it's own go routine.
errResub := k.Websocket.ResubscribeToChannel(resub)
if errResub != nil {
if errResub != nil && errResub != stream.ErrChannelInStateAlready {
log.Errorf(log.WebsocketMgr,
"resubscription failure for %v: %v",
resub,
Expand Down Expand Up @@ -1205,7 +1208,8 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error {
return err
}

err = k.Websocket.AddPendingSubscription(c)
c.State = stream.ChannelSubscribing
err = k.Websocket.AddSubscription(c)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
}
Expand All @@ -1229,7 +1233,10 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error {
return wErr
}

k.Websocket.AddSuccessfulSubscriptions(*c)
if err = k.Websocket.SetSubscriptionState(c, stream.ChannelSubscribed); err != nil {
log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, err)
}

if k.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency)
}
Expand All @@ -1246,7 +1253,9 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error {

c.EnsureKeyed()

if err = k.Websocket.SetSubscriptionPending(c); err != nil {
if err = k.Websocket.SetSubscriptionState(c, stream.ChannelUnsubscribing); err != nil {
// err is probably ErrChannelInStateAlready, but we want to bubble it up to prevent an attempt to Subscribe again
// We can catch and ignore it in our call to resub
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err)
}

Expand All @@ -1258,12 +1267,18 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error {

respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r)
if err != nil {
if e2 := k.Websocket.SetSubscriptionState(c, stream.ChannelSubscribed); e2 != nil {
log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2)
}
return err
}

if err = k.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err)
k.Websocket.DataHandler <- wErr
if e2 := k.Websocket.SetSubscriptionState(c, stream.ChannelSubscribed); e2 != nil {
log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2)
}
return wErr
}

Expand Down Expand Up @@ -1298,6 +1313,11 @@ func (k *Kraken) reqForSub(e string, c *stream.ChannelSubscription) (*WebsocketS

// ensureChannelKeyed wraps the channel EnsureKeyed to add channel name suffixes for Depth and Interval
func ensureChannelKeyed(c *stream.ChannelSubscription, r *WebsocketSubRequest) error {
if c.Key != nil {
// Avoid double wrapping the Key.Channel in resubscriptions
return nil
}

key, ok := c.EnsureKeyed().(stream.DefaultChannelKey)
if !ok {
return common.GetTypeAssertError("stream.DefaultChannelKey", c.Key, "subscription.Key") // Should be impossible
Expand Down
14 changes: 12 additions & 2 deletions exchanges/stream/stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,24 @@ type DefaultChannelKey struct {
Asset asset.Item
}

// ChannelSubscription container for streaming subscriptions
// ChannelState tracks the status of a subscription channel
type ChannelState int

const (
ChannelStateUnknown ChannelState = iota // ChannelStateUnknown means subscription state is not registered, but doesn't imply Inactive
ChannelSubscribing // ChannelSubscribing means channel is in the process of subscribing
ChannelSubscribed // ChannelSubscribed means the channel has finished a successful and acknowledged subscription
ChannelUnsubscribing // ChannelUnsubscribing means the channel has started to unsubscribe, but not yet confirmed
)

// ChannelSubscription container for streaming subscription channels
type ChannelSubscription struct {
Key any
Channel string
Currency currency.Pair
Asset asset.Item
Params map[string]interface{}
pending bool
State ChannelState
}

// ConnectionSetup defines variables for an individual stream connection
Expand Down
43 changes: 20 additions & 23 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ const (
var (
// ErrSubscriptionNotFound defines an error when a subscription is not found
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrSubscribedAlready defines an error when a channel is already subscribed
ErrSubscribedAlready = errors.New("duplicate subscription")
// ErrSubscriptionFailure defines an error when a subscription fails
ErrSubscriptionFailure = errors.New("subscription failure")
// ErrUnsubscribeFailure defines an error when a unsubscribe fails
ErrUnsubscribeFailure = errors.New("unsubscribe failure")
// ErrSubscriptionPending defines an error when a subscription is already pending an operation result
ErrSubscriptionPending = errors.New("subscription update already happening")
// ErrChannelInStateAlready defines an error when a subscription channel is already in a new state
ErrChannelInStateAlready = errors.New("channel already in state")
// ErrAlreadyDisabled is returned when you double-disable the websocket
ErrAlreadyDisabled = errors.New("websocket already disabled")
// ErrNotConnected defines an error when websocket is not connected
Expand Down Expand Up @@ -929,33 +931,28 @@ func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error {
return nil
}

// AddPendingSubscription adds a subscription to the subscription lists, or updates it if it exists.
// The pending operation could be subscribe or unsubscribe. The purpose of setting this state is to block other similar operations
// If the subscription already exists then the state is changed to pending but no other fields are merged
// Returns error if the subscription is already pending
func (w *Websocket) AddPendingSubscription(c *ChannelSubscription) error {
// AddSubscription adds a subscription to the subscription lists
// Unlike AddSubscriptions this method will error if the subscription already exists
func (w *Websocket) AddSubscription(c *ChannelSubscription) error {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
key := c.EnsureKeyed()
p, ok := w.subscriptions[key]
if !ok {
n := *c // Fresh copy; we don't want to use the pointer we were given and allow encapsulation/locks to be bypassed
p = &n
w.subscriptions[key] = p
}
if p.pending {
return ErrSubscriptionPending
if _, ok := w.subscriptions[key]; ok {
return ErrSubscribedAlready
}
p.pending = true

n := *c // Fresh copy; we don't want to use the pointer we were given and allow encapsulation/locks to be bypassed
w.subscriptions[key] = &n

return nil
}

// SetSubscriptionPending sets an existing subscription to be Pending
// returns an error if the subscription is not found or if it's already Pending
func (w *Websocket) SetSubscriptionPending(c *ChannelSubscription) error {
// SetSubscriptionState sets an existing subscription state
// returns an error if the subscription is not found, or the new state is already set
func (w *Websocket) SetSubscriptionState(c *ChannelSubscription, state ChannelState) error {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
Expand All @@ -966,10 +963,10 @@ func (w *Websocket) SetSubscriptionPending(c *ChannelSubscription) error {
if !ok {
return ErrSubscriptionNotFound
}
if p.pending {
return ErrSubscriptionPending
if state == p.State {
return ErrChannelInStateAlready
}
p.pending = true
p.State = state
return nil
}

Expand All @@ -991,7 +988,7 @@ func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription)
for _, cN := range channels {
c := cN // cN is an iteration var; Not safe to make a pointer to
key := c.EnsureKeyed()
c.pending = false
c.State = ChannelSubscribed
w.subscriptions[key] = &c
}
}
Expand Down
24 changes: 14 additions & 10 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,25 +574,29 @@ func TestResubscribe(t *testing.T) {
assert.NoError(t, ws.ResubscribeToChannel(&channel[0]), "Resubscribe should not error now the channel is subscribed")
}

// TestPendingSubscriptions tests AddPendingSubscription and IsPending
func TestPendingSubscriptions(t *testing.T) {
// TestSubscriptionState tests Subscription state changes
func TestSubscriptionState(t *testing.T) {
t.Parallel()
ws := New()

c := &ChannelSubscription{Key: 42, Channel: "Gophers"}
assert.ErrorIs(t, ws.SetSubscriptionPending(c), ErrSubscriptionNotFound, "Setting an imaginary sub should error")
c := &ChannelSubscription{Key: 42, Channel: "Gophers", State: ChannelSubscribing}
assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelUnsubscribing), ErrSubscriptionNotFound, "Setting an imaginary sub should error")

assert.NoError(t, ws.AddPendingSubscription(c), "Adding first subscription should not error")
assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error")
found := ws.GetSubscription(42)
assert.NotNil(t, found, "Should find the subscription")
assert.True(t, found.pending, "Subscription should be pending")
assert.ErrorIs(t, ws.AddPendingSubscription(c), ErrSubscriptionPending, "Adding an already pending sub should error")
assert.ErrorIs(t, ws.SetSubscriptionPending(c), ErrSubscriptionPending, "Setting an already pending sub should error")
assert.Equal(t, ChannelSubscribing, found.State, "Subscription should be Subscribing")
assert.ErrorIs(t, ws.AddSubscription(c), ErrSubscribedAlready, "Adding an already existing sub should error")
assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelSubscribing), ErrChannelInStateAlready, "Setting Same state should error")

ws.AddSuccessfulSubscriptions(*c)
found = ws.GetSubscription(42)
assert.NotNil(t, found, "Should find the subscription")
assert.False(t, found.pending, "Subscription should no longer be pending")
assert.Equal(t, found.State, ChannelSubscribed, "Subscription should be subscribed state")

assert.NoError(t, ws.SetSubscriptionState(c, ChannelUnsubscribing), "Setting Unsub state should not error")
found = ws.GetSubscription(42)
assert.Equal(t, found.State, ChannelUnsubscribing, "Subscription should be unsubscribing state")
}

// TestRemoveSubscription tests removing a subscription
Expand All @@ -601,7 +605,7 @@ func TestRemoveSubscription(t *testing.T) {
ws := New()

c := &ChannelSubscription{Key: 42, Channel: "Unite!"}
assert.NoError(t, ws.AddPendingSubscription(c), "Adding first subscription should not error")
assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error")
assert.NotNil(t, ws.GetSubscription(42), "Added subscription should be findable")

ws.RemoveSubscription(c)
Expand Down

0 comments on commit b3671a3

Please sign in to comment.