Skip to content

Commit

Permalink
Bitfinex: Switch Subs to using Websocket Keys
Browse files Browse the repository at this point in the history
This removes the bespoke subscribed channel id mapping and uses the new
centralised GetSubscription() with Channel.Key

Fixes concurrency panics accessing local WebsocketSubdChannels
  • Loading branch information
gbjk committed Sep 24, 2023
1 parent 3af169e commit 872b0fc
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 94 deletions.
4 changes: 0 additions & 4 deletions exchanges/bitfinex/bitfinex.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/common"
Expand All @@ -22,7 +21,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -107,8 +105,6 @@ const (
// Bitfinex is the overarching type across the bitfinex package
type Bitfinex struct {
exchange.Base
wsSubChannels map[int]*stream.ChannelSubscription
wsSubMutex sync.RWMutex
}

// GetPlatformStatus returns the Bifinex platform status
Expand Down
46 changes: 7 additions & 39 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func TestMain(m *testing.M) {
b.API.AuthenticatedSupport = true
b.API.AuthenticatedWebsocketSupport = true
}
b.wsSubChannels = make(map[int]*stream.ChannelSubscription)

btcusdPair, err = currency.NewPairFromString("BTCUSD")
if err != nil {
Expand Down Expand Up @@ -1198,9 +1197,7 @@ func TestWsSubscribedResponse(t *testing.T) {
}

func TestWsTradingPairSnapshot(t *testing.T) {
b.wsSubMutex.Lock()
b.wsSubChannels[23405] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsBook, Params: map[string]interface{}{"chanId": 23405}}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsBook, Params: map[string]interface{}{"chanId": 23405}, Key: 23405})
pressXToJSON := `[23405,[[38334303613,9348.8,0.53],[38334308111,9348.8,5.98979404],[38331335157,9344.1,1.28965787],[38334302803,9343.8,0.08230094],[38334279092,9343,0.8],[38334307036,9342.938663676,0.8],[38332749107,9342.9,0.2],[38332277330,9342.8,0.85],[38329406786,9342,0.1432012],[38332841570,9341.947288638,0.3],[38332163238,9341.7,0.3],[38334303384,9341.6,0.324],[38332464840,9341.4,0.5],[38331935870,9341.2,0.5],[38334312082,9340.9,0.02126899],[38334261292,9340.8,0.26763],[38334138680,9340.625455254,0.12],[38333896802,9339.8,0.85],[38331627527,9338.9,1.57863959],[38334186713,9338.9,0.26769],[38334305819,9338.8,2.999],[38334211180,9338.75285796,3.999],[38334310699,9337.8,0.10679883],[38334307414,9337.5,1],[38334179822,9337.1,0.26773],[38334306600,9336.659955102,1.79],[38334299667,9336.6,1.1],[38334306452,9336.6,0.13979771],[38325672859,9336.3,1.25],[38334311646,9336.2,1],[38334258509,9336.1,0.37],[38334310592,9336,1.79],[38334310378,9335.6,1.43],[38334132444,9335.2,0.26777],[38331367325,9335,0.07],[38334310703,9335,0.10680562],[38334298209,9334.7,0.08757301],[38334304857,9334.456899462,0.291],[38334309940,9334.088390727,0.0725],[38334310377,9333.7,1.2868],[38334297615,9333.607784,0.1108],[38334095188,9333.3,0.26785],[38334228913,9332.7,0.40861186],[38334300526,9332.363996604,0.3884],[38334310701,9332.2,0.10680562],[38334303548,9332.005382871,0.07],[38334311798,9331.8,0.41285228],[38334301012,9331.7,1.7952],[38334089877,9331.4,0.2679],[38321942150,9331.2,0.2],[38334310670,9330,1.069],[38334063096,9329.6,0.26796],[38334310700,9329.4,0.10680562],[38334310404,9329.3,1],[38334281630,9329.1,6.57150597],[38334036864,9327.7,0.26801],[38334310702,9326.6,0.10680562],[38334311799,9326.1,0.50220625],[38334164163,9326,0.219638],[38334309722,9326,1.5],[38333051682,9325.8,0.26807],[38334302027,9325.7,0.75],[38334203435,9325.366592,0.32397696],[38321967613,9325,0.05],[38334298787,9324.9,0.3],[38334301719,9324.8,3.6227592],[38331316716,9324.763454646,0.71442],[38334310698,9323.8,0.10680562],[38334035499,9323.7,0.23431017],[38334223472,9322.670551788,0.42150603],[38334163459,9322.560399006,0.143967],[38321825171,9320.8,2],[38334075805,9320.467496148,0.30772633],[38334075800,9319.916732238,0.61457592],[38333682302,9319.7,0.0011],[38331323088,9319.116771762,0.12913],[38333677480,9319,0.0199],[38334277797,9318.6,0.89],[38325235155,9318.041088,1.20249],[38334310910,9317.82382938,1.79],[38334311811,9317.2,0.61079138],[38334311812,9317.2,0.71937652],[38333298214,9317.1,50],[38334306359,9317,1.79],[38325531545,9316.382823951,0.21263],[38333727253,9316.3,0.02316372],[38333298213,9316.1,45],[38333836479,9316,2.135],[38324520465,9315.9,2.7681],[38334307411,9315.5,1],[38330313617,9315.3,0.84455],[38334077770,9315.294024,0.01248397],[38334286663,9315.294024,1],[38325533762,9315.290315394,2.40498],[38334310018,9315.2,3],[38333682617,9314.6,0.0011],[38334304794,9314.6,0.76364676],[38334304798,9314.3,0.69242113],[38332915733,9313.8,0.0199],[38334084411,9312.8,1],[38334311893,9350.1,-1.015],[38334302734,9350.3,-0.26737],[38334300732,9350.8,-5.2],[38333957619,9351,-0.90677089],[38334300521,9351,-1.6457],[38334301600,9351.012829557,-0.0523],[38334308878,9351.7,-2.5],[38334299570,9351.921544,-0.1015],[38334279367,9352.1,-0.26732],[38334299569,9352.411802928,-0.4036],[38334202773,9353.4,-0.02139404],[38333918472,9353.7,-1.96412776],[38334278782,9354,-0.26731],[38334278606,9355,-1.2785],[38334302105,9355.439221251,-0.79191542],[38313897370,9355.569409242,-0.43363],[38334292995,9355.584296,-0.0979],[38334216989,9355.8,-0.03686414],[38333894025,9355.9,-0.26721],[38334293798,9355.936691952,-0.4311],[38331159479,9356,-0.4204022],[38333918888,9356.1,-1.10885563],[38334298205,9356.4,-0.20124428],[38328427481,9356.5,-0.1],[38333343289,9356.6,-0.41034213],[38334297205,9356.6,-0.08835018],[38334277927,9356.741101161,-0.0737],[38334311645,9356.8,-0.5],[38334309002,9356.9,-5],[38334309736,9357,-0.10680107],[38334306448,9357.4,-0.18645275],[38333693302,9357.7,-0.2672],[38332815159,9357.8,-0.0011],[38331239824,9358.2,-0.02],[38334271608,9358.3,-2.999],[38334311971,9358.4,-0.55],[38333919260,9358.5,-1.9972841],[38334265365,9358.5,-1.7841],[38334277960,9359,-3],[38334274601,9359.020969848,-3],[38326848839,9359.1,-0.84],[38334291080,9359.247048,-0.16199869],[38326848844,9359.4,-1.84],[38333680200,9359.6,-0.26713],[38331326606,9359.8,-0.84454],[38334309738,9359.8,-0.10680107],[38331314707,9359.9,-0.2],[38333919803,9360.9,-1.41177599],[38323651149,9361.33417827,-0.71442],[38333656906,9361.5,-0.26705],[38334035500,9361.5,-0.40861586],[38334091886,9362.4,-6.85940815],[38334269617,9362.5,-4],[38323629409,9362.545858872,-2.40497],[38334309737,9362.7,-0.10680107],[38334312380,9362.7,-3],[38325280830,9362.8,-1.75123],[38326622800,9362.8,-1.05145],[38333175230,9363,-0.0011],[38326848745,9363.2,-0.79],[38334308960,9363.206775564,-0.12],[38333920234,9363.3,-1.25318113],[38326848843,9363.4,-1.29],[38331239823,9363.4,-0.02],[38333209613,9363.4,-0.26719],[38334299964,9364,-0.05583123],[38323470224,9364.161816648,-0.12912],[38334284711,9365,-0.21346019],[38334299594,9365,-2.6757062],[38323211816,9365.073132585,-0.21262],[38334312456,9365.1,-0.11167861],[38333209612,9365.2,-0.26719],[38327770474,9365.3,-0.0073],[38334298788,9365.3,-0.3],[38334075803,9365.409831204,-0.30772637],[38334309740,9365.5,-0.10680107],[38326608767,9365.7,-2.76809],[38333920657,9365.7,-1.25848083],[38329594226,9366.6,-0.02587],[38334311813,9366.7,-4.72290945],[38316386301,9367.39258128,-2.37581],[38334302026,9367.4,-4.5],[38334228915,9367.9,-0.81725458],[38333921381,9368.1,-1.72213641],[38333175678,9368.2,-0.0011],[38334301150,9368.2,-2.654604],[38334297208,9368.3,-0.78036466],[38334309739,9368.3,-0.10680107],[38331227515,9368.7,-0.02],[38331184470,9369,-0.003975],[38334203436,9369.319616,-0.32397695],[38334269964,9369.7,-0.5],[38328386732,9370,-4.11759935],[38332719555,9370,-0.025],[38333921935,9370.5,-1.2224398],[38334258511,9370.5,-0.35],[38326848842,9370.8,-0.34],[38333985038,9370.9,-0.8551502],[38334283018,9370.9,-1],[38326848744,9371,-1.34]],5]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand All @@ -1214,9 +1211,7 @@ func TestWsTradingPairSnapshot(t *testing.T) {
}

func TestWsTradeResponse(t *testing.T) {
b.wsSubMutex.Lock()
b.wsSubChannels[18788] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTrades}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTrades, Key: 18788})
pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand All @@ -1225,9 +1220,7 @@ func TestWsTradeResponse(t *testing.T) {
}

func TestWsTickerResponse(t *testing.T) {
b.wsSubMutex.Lock()
b.wsSubChannels[11534] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker, Key: 11534})
pressXToJSON := `[11534,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand All @@ -1237,9 +1230,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
b.wsSubMutex.Lock()
b.wsSubChannels[123412] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsTicker}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsTicker, Key: 123412})
pressXToJSON = `[123412,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand All @@ -1249,9 +1240,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
b.wsSubMutex.Lock()
b.wsSubChannels[123413] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsTicker}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsTicker, Key: 123413})
pressXToJSON = `[123413,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand All @@ -1261,9 +1250,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
b.wsSubMutex.Lock()
b.wsSubChannels[123414] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsTicker}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsTicker, Key: 123414})
pressXToJSON = `[123414,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand All @@ -1272,9 +1259,7 @@ func TestWsTickerResponse(t *testing.T) {
}

func TestWsCandleResponse(t *testing.T) {
b.wsSubMutex.Lock()
b.wsSubChannels[343351] = &stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsCandles}
b.wsSubMutex.Unlock()
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsCandles, Key: 343351})
pressXToJSON := `[343351,[[1574698260000,7379.785503,7383.8,7388.3,7379.785503,1.68829482]]]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand Down Expand Up @@ -1820,23 +1805,6 @@ func TestCancelMultipleOrdersV2(t *testing.T) {
}
}

func TestChanForSub(t *testing.T) {
t.Parallel()
p := currency.NewPairWithDelimiter("DOGE", "XLM", "-")
s, err := b.chanForSub(wsBook, asset.Spot, p)
assert.ErrorIs(t, err, errSubNotFound, "Correct error returned when stream when sub not found")
assert.Nil(t, s, "No stream returned when sub not found")

// Add a spare sub to ensure we don't get only-answer-is-right syndrome
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker})

want := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook}
b.Websocket.AddSuccessfulSubscriptions(want)
s, err = b.chanForSub(wsBook, asset.Spot, p)
assert.Nil(t, err, "No error returned when sub found")
assert.EqualValues(t, want, *s, "Correct Sub found")
}

// setupWs is a helper function to connect both auth and normal websockets
// It will skip the test if websockets are not enabled
// It's up to the test to skip if it requires creds, though
Expand Down
Loading

0 comments on commit 872b0fc

Please sign in to comment.