From ade2d9c5d2c8a4020bdd3a0a12316fe3602f709a Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Thu, 14 Sep 2023 06:38:01 +0100 Subject: [PATCH] Bitfinex: Resubscribe orderbook after checksum err (#1303) * Bitfinex: Resubscribe orderbook after checksum err * Bitfinex: chanForSub return only first match --- exchanges/bitfinex/bitfinex_test.go | 18 ++++++++++++ exchanges/bitfinex/bitfinex_types.go | 3 +- exchanges/bitfinex/bitfinex_websocket.go | 37 ++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index a2c7520ac68..40f3a0cb0a1 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/core" @@ -1854,3 +1855,20 @@ func TestCancelMultipleOrdersV2(t *testing.T) { t.Error(err) } } + +func TestChanForSub(t *testing.T) { + t.Parallel() + p := currency.NewPairWithDelimiter("DOGE", "XLM", "-") + s, err := b.chanForSub(wsBook, asset.Spot, p) + assert.ErrorIs(t, err, errSubNotFound, "Correct error returned when stream when sub not found") + assert.Nil(t, s, "No stream returned when sub not found") + + // Add a spare sub to ensure we don't get only-answer-is-right syndrome + b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker}) + + want := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook} + b.Websocket.AddSuccessfulSubscriptions(want) + s, err = b.chanForSub(wsBook, asset.Spot, p) + assert.Nil(t, err, "No error returned when sub found") + assert.EqualValues(t, want, *s, "Correct Sub found") +} diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index e35521afe71..6b2856d2a8d 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -11,8 +11,9 @@ import ( ) var ( - errTypeAssert = errors.New("type assertion failed") errSetCannotBeEmpty = errors.New("set cannot be empty") + errSubNotFound = errors.New("could not find matching subscription") + errTypeAssert = errors.New("type assertion failed") ) // AccountV2Data stores account v2 data diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 3a5c617436f..aea8e68cbab 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -1525,8 +1525,11 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book err) } - err = validateCRC32(ob, checkme.Token) - if err != nil { + if err = validateCRC32(ob, checkme.Token); err != nil { + log.Errorf(log.WebsocketMgr, "%s websocket orderbook update error, will resubscribe orderbook: %v", b.Name, err) + if suberr := b.resubOrderbook(p, assetType); suberr != nil { + log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, suberr) + } return err } } @@ -1534,6 +1537,36 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book return b.Websocket.Orderbook.Update(&orderbookUpdate) } +// resubOrderbook resubscribes the orderbook after a consistency error, probably a failed checksum, +// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting. +func (b *Bitfinex) resubOrderbook(p currency.Pair, assetType asset.Item) error { + if err := b.Websocket.Orderbook.FlushOrderbook(p, assetType); err != nil { + return err + } + + c, err := b.chanForSub(wsBook, assetType, p) + if err != nil { + return err + } + return b.Websocket.ResubscribeToChannel(c) +} + +// chanForSub returns an existing channel subscription for a given channel/asset/pair +func (b *Bitfinex) chanForSub(cName string, assetType asset.Item, pair currency.Pair) (*stream.ChannelSubscription, error) { + want := &stream.ChannelSubscription{ + Channel: cName, + Currency: pair, + Asset: assetType, + } + subs := b.Websocket.GetSubscriptions() + for i := range subs { + if subs[i].Equal(want) { + return &subs[i], nil + } + } + return nil, errSubNotFound +} + // GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) { var wsPairFormat = currency.PairFormat{Uppercase: true}