From 2c356b7e375bbef0a40cd97838b8cf7b7f946f9e Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Sun, 6 Aug 2023 20:32:05 +0700 Subject: [PATCH] Bitfinex: Resubscribe orderbook after checksum err --- exchanges/bitfinex/bitfinex_test.go | 24 +++++++++++++ exchanges/bitfinex/bitfinex_types.go | 6 ++-- exchanges/bitfinex/bitfinex_websocket.go | 45 ++++++++++++++++++++++-- 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index a2c7520ac68..ccd7215e9f6 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/core" @@ -1854,3 +1855,26 @@ func TestCancelMultipleOrdersV2(t *testing.T) { t.Error(err) } } + +func TestChanForSub(t *testing.T) { + t.Parallel() + p := currency.NewPairWithDelimiter("DOGE", "XLM", "-") + s, err := b.chanForSub(wsBook, asset.Spot, p) + assert.ErrorIs(t, err, errSubNotFound, "Correct error returned when stream when sub not found") + assert.Nil(t, s, "No stream returned when sub not found") + + // Add a spare sub to ensure we don't get only-answer-is-right syndrome + b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker}) + + want := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook} + b.Websocket.AddSuccessfulSubscriptions(want) + s, err = b.chanForSub(wsBook, asset.Spot, p) + assert.Nil(t, err, "No error returned when sub found") + assert.EqualValues(t, want, *s, "Correct Sub found") + + dup := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook, Params: map[string]interface{}{"muffins": "yummy"}} + b.Websocket.AddSuccessfulSubscriptions(dup) + s, err = b.chanForSub(wsBook, asset.Spot, p) + assert.ErrorIs(t, err, errTooManyMatchingSubs, "Correct error returns when too many subs found") + assert.Nil(t, s, "No stream returned when too many subs found") +} diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index e35521afe71..e55b90ff669 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -11,8 +11,10 @@ import ( ) var ( - errTypeAssert = errors.New("type assertion failed") - errSetCannotBeEmpty = errors.New("set cannot be empty") + errSetCannotBeEmpty = errors.New("set cannot be empty") + errSubNotFound = errors.New("could not find matching subscription") + errTooManyMatchingSubs = errors.New("too many matching subscriptions") + errTypeAssert = errors.New("type assertion failed") ) // AccountV2Data stores account v2 data diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 3a5c617436f..796a124fd46 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -1525,8 +1525,11 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book err) } - err = validateCRC32(ob, checkme.Token) - if err != nil { + if err = validateCRC32(ob, checkme.Token); err != nil { + log.Errorf(log.WebsocketMgr, "%s websocket orderbook update error, will resubscribe orderbook: %v", b.Name, err) + if suberr := b.resubOrderbook(p, assetType); suberr != nil { + log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, suberr) + } return err } } @@ -1534,6 +1537,44 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book return b.Websocket.Orderbook.Update(&orderbookUpdate) } +// resubOrderbook resubscribes the orderbook after a consistency error, probably a failed checksum, +// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting. +func (b *Bitfinex) resubOrderbook(p currency.Pair, assetType asset.Item) error { + if err := b.Websocket.Orderbook.FlushOrderbook(p, assetType); err != nil { + return err + } + + c, err := b.chanForSub(wsBook, assetType, p) + if err != nil { + return err + } + return b.Websocket.ResubscribeToChannel(c) +} + +// chanForSub returns an existing channel subscription for a given channel/asset/pair +func (b *Bitfinex) chanForSub(cName string, assetType asset.Item, pair currency.Pair) (*stream.ChannelSubscription, error) { + var c *stream.ChannelSubscription + want := &stream.ChannelSubscription{ + Channel: cName, + Currency: pair, + Asset: assetType, + } + subs := b.Websocket.GetSubscriptions() + for i := range subs { + if subs[i].Equal(want) { + if c != nil { + return nil, errTooManyMatchingSubs + } + c = &subs[i] + } + } + if c == nil { + return nil, errSubNotFound + } + + return c, nil +} + // GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) { var wsPairFormat = currency.PairFormat{Uppercase: true}