Skip to content

Commit

Permalink
Bitfinex: Resubscribe orderbook after checksum err
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Sep 14, 2023
1 parent 3218982 commit 2c356b7
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
24 changes: 24 additions & 0 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/core"
Expand Down Expand Up @@ -1854,3 +1855,26 @@ func TestCancelMultipleOrdersV2(t *testing.T) {
t.Error(err)
}
}

func TestChanForSub(t *testing.T) {
t.Parallel()
p := currency.NewPairWithDelimiter("DOGE", "XLM", "-")
s, err := b.chanForSub(wsBook, asset.Spot, p)
assert.ErrorIs(t, err, errSubNotFound, "Correct error returned when stream when sub not found")
assert.Nil(t, s, "No stream returned when sub not found")

// Add a spare sub to ensure we don't get only-answer-is-right syndrome
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker})

want := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook}
b.Websocket.AddSuccessfulSubscriptions(want)
s, err = b.chanForSub(wsBook, asset.Spot, p)
assert.Nil(t, err, "No error returned when sub found")
assert.EqualValues(t, want, *s, "Correct Sub found")

dup := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook, Params: map[string]interface{}{"muffins": "yummy"}}
b.Websocket.AddSuccessfulSubscriptions(dup)
s, err = b.chanForSub(wsBook, asset.Spot, p)
assert.ErrorIs(t, err, errTooManyMatchingSubs, "Correct error returns when too many subs found")
assert.Nil(t, s, "No stream returned when too many subs found")
}
6 changes: 4 additions & 2 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
)

var (
errTypeAssert = errors.New("type assertion failed")
errSetCannotBeEmpty = errors.New("set cannot be empty")
errSetCannotBeEmpty = errors.New("set cannot be empty")
errSubNotFound = errors.New("could not find matching subscription")
errTooManyMatchingSubs = errors.New("too many matching subscriptions")
errTypeAssert = errors.New("type assertion failed")
)

// AccountV2Data stores account v2 data
Expand Down
45 changes: 43 additions & 2 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1525,15 +1525,56 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
err)
}

err = validateCRC32(ob, checkme.Token)
if err != nil {
if err = validateCRC32(ob, checkme.Token); err != nil {
log.Errorf(log.WebsocketMgr, "%s websocket orderbook update error, will resubscribe orderbook: %v", b.Name, err)
if suberr := b.resubOrderbook(p, assetType); suberr != nil {
log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, suberr)
}
return err
}
}

return b.Websocket.Orderbook.Update(&orderbookUpdate)
}

// resubOrderbook resubscribes the orderbook after a consistency error, probably a failed checksum,
// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting.
func (b *Bitfinex) resubOrderbook(p currency.Pair, assetType asset.Item) error {
if err := b.Websocket.Orderbook.FlushOrderbook(p, assetType); err != nil {
return err
}

c, err := b.chanForSub(wsBook, assetType, p)
if err != nil {
return err
}
return b.Websocket.ResubscribeToChannel(c)
}

// chanForSub returns an existing channel subscription for a given channel/asset/pair
func (b *Bitfinex) chanForSub(cName string, assetType asset.Item, pair currency.Pair) (*stream.ChannelSubscription, error) {
var c *stream.ChannelSubscription
want := &stream.ChannelSubscription{
Channel: cName,
Currency: pair,
Asset: assetType,
}
subs := b.Websocket.GetSubscriptions()
for i := range subs {
if subs[i].Equal(want) {
if c != nil {
return nil, errTooManyMatchingSubs
}
c = &subs[i]
}
}
if c == nil {
return nil, errSubNotFound
}

return c, nil
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
var wsPairFormat = currency.PairFormat{Uppercase: true}
Expand Down

0 comments on commit 2c356b7

Please sign in to comment.