From a143ddcd8a63e621bff6973523cf54319e3f3fa9 Mon Sep 17 00:00:00 2001 From: Beadko Date: Thu, 8 Feb 2024 17:24:22 +0700 Subject: [PATCH] Bitfinex: Add Subscription configuration --- exchanges/bitfinex/bitfinex.go | 4 +- exchanges/bitfinex/bitfinex_test.go | 106 +++++---- exchanges/bitfinex/bitfinex_websocket.go | 216 +++++++++--------- exchanges/bitfinex/bitfinex_wrapper.go | 8 + .../testing/subscriptions/subscriptions.go | 2 +- 5 files changed, 180 insertions(+), 156 deletions(-) diff --git a/exchanges/bitfinex/bitfinex.go b/exchanges/bitfinex/bitfinex.go index a8da20915d0..540fa7be749 100644 --- a/exchanges/bitfinex/bitfinex.go +++ b/exchanges/bitfinex/bitfinex.go @@ -103,9 +103,7 @@ const ( bitfinexChecksumFlag = 131072 bitfinexWsSequenceFlag = 65536 - // CandlesTimeframeKey configures the timeframe in subscription.Subscription.Params - CandlesTimeframeKey = "_timeframe" - // CandlesPeriodKey configures the aggregated period in subscription.Subscription.Params + // CandlesPeriodKey configures the Candles aggregated period for MarginFunding in subscription.Subscription.Params CandlesPeriodKey = "_period" ) diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 30a2c85b35f..06f95756984 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -26,6 +26,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -1137,13 +1138,36 @@ func TestWsAuth(t *testing.T) { } } +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + + b.Websocket.SetCanUseAuthenticatedEndpoints(true) + require.True(t, b.Websocket.CanUseAuthenticatedEndpoints(), "CanUseAuthenticatedEndpoints must return true") + subs, err := b.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + exp := subscription.List{} + for _, a := range b.GetAssetTypes(true) { + pairs, err := b.GetEnabledPairs(a) + require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a) + for _, p := range pairs { + exp = append(exp, + &subscription.Subscription{Channel: subscription.TickerChannel, Asset: a, Pairs: currency.Pairs{p}}, + &subscription.Subscription{Channel: subscription.CandlesChannel, Asset: a, Pairs: currency.Pairs{p}, Interval: kline.OneMin}, + &subscription.Subscription{Channel: subscription.OrderbookChannel, Asset: a, Pairs: currency.Pairs{p}, Levels: 100}, + &subscription.Subscription{Channel: subscription.AllTradesChannel, Asset: a, Pairs: currency.Pairs{p}}, + ) + } + } + testsubs.EqualLists(t, exp, subs) +} + // TestWsSubscribe tests Subscribe and Unsubscribe functionality // See also TestSubscribeReq which covers key and symbol conversion func TestWsSubscribe(t *testing.T) { b := new(Bitfinex) //nolint:govet // Intentional shadow of b to avoid future copy/paste mistakes require.NoError(t, testexch.Setup(b), "TestInstance must not error") testexch.SetupWs(t, b) - err := b.Subscribe(subscription.List{{Channel: wsTicker, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) + err := b.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) require.NoError(t, err, "Subrcribe should not error") catcher := func() (ok bool) { i := <-b.Websocket.ToRoutine @@ -1156,19 +1180,15 @@ func TestWsSubscribe(t *testing.T) { require.NoError(t, err, "GetSubscriptions should not error") require.Len(t, subs, 1, "We should only have 1 subscription; subID subscription should have been Removed by subscribeToChan") - err = b.Subscribe(subscription.List{{Channel: wsTicker, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) - require.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Duplicate subscription should error correctly") - catcher = func() bool { + err = b.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) + require.ErrorContains(t, err, "subscribe: dup (code: 10301)", "Duplicate subscription should error correctly") + + assert.EventuallyWithT(t, func(t *assert.CollectT) { i := <-b.Websocket.ToRoutine - if e, ok := i.(error); ok { - if assert.ErrorIs(t, e, stream.ErrSubscriptionFailure, "Error should go to DataHandler") { - assert.ErrorContains(t, e, "subscribe: dup (code: 10301)", "Error should contain message and code") - return true - } - } - return false - } - assert.Eventually(t, catcher, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "error response should arrive") + e, ok := i.(error) + require.True(t, ok, "must find an error") + assert.ErrorContains(t, e, "subscribe: dup (code: 10301)", "error should be correct") + }, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "error response should go to ToRoutine") subs, err = b.GetSubscriptions() require.NoError(t, err, "GetSubscriptions should not error") @@ -1181,52 +1201,52 @@ func TestWsSubscribe(t *testing.T) { assert.True(t, ok, "sub.Key should be an int") err = b.Unsubscribe(subs) - assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Unsubscribe should error") assert.ErrorContains(t, err, strconv.Itoa(chanID), "Unsubscribe should contain correct chanId") assert.ErrorContains(t, err, "unsubscribe: invalid (code: 10400)", "Unsubscribe should contain correct upstream error") err = b.Subscribe(subscription.List{{ - Channel: wsTicker, + Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot, - Params: map[string]interface{}{"key": "tBTCUSD"}, + Params: map[string]any{"key": "tBTCUSD"}, }}) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Trying to use a 'key' param should error ErrSubscriptionFailure") assert.ErrorIs(t, err, errParamNotAllowed, "Trying to use a 'key' param should error errParamNotAllowed") } // TestSubscribeReq tests the channel to request map marshalling func TestSubscribeReq(t *testing.T) { - c := &subscription.Subscription{ - Channel: wsCandles, - Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, - Asset: asset.MarginFunding, - Params: map[string]interface{}{ - CandlesPeriodKey: "30", - }, + s := &subscription.Subscription{ + Channel: subscription.CandlesChannel, + Asset: asset.Spot, + Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, + Interval: kline.OneMin, } - r, err := subscribeReq(c) + + r, err := subscribeReq(s) assert.NoError(t, err, "subscribeReq should not error") - assert.Equal(t, "trade:1m:fBTCUSD:p30", r["key"], "key contain period and default timeframe") + assert.Equal(t, "trade:1m:tBTCUSD", r["key"], "key should be contain specific timeframe and no period") - c.Params = map[string]interface{}{ - CandlesTimeframeKey: "15m", - } - r, err = subscribeReq(c) + s.Interval = kline.FifteenMin + s.Asset = asset.MarginFunding + s.Params = map[string]any{CandlesPeriodKey: "p30"} + + r, err = subscribeReq(s) assert.NoError(t, err, "subscribeReq should not error") - assert.Equal(t, "trade:15m:fBTCUSD", r["key"], "key should be contain specific timeframe and no period") + assert.Equal(t, "trade:15m:fBTCUSD:p30", r["key"], "key contain period and no timeframe") + + s.Interval = kline.FifteenMin - c = &subscription.Subscription{ - Channel: wsBook, + s = &subscription.Subscription{ + Channel: subscription.OrderbookChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.DOGE)}, Asset: asset.Spot, } - r, err = subscribeReq(c) + r, err = subscribeReq(s) assert.NoError(t, err, "subscribeReq should not error") assert.Equal(t, "tBTC:DOGE", r["symbol"], "symbol should use colon delimiter if a currency is > 3 chars") - c.Pairs = currency.Pairs{currency.NewPair(currency.BTC, currency.LTC)} - r, err = subscribeReq(c) + s.Pairs = currency.Pairs{currency.NewPair(currency.BTC, currency.LTC)} + r, err = subscribeReq(s) assert.NoError(t, err, "subscribeReq should not error") assert.Equal(t, "tBTCLTC", r["symbol"], "symbol should not use colon delimiter if both currencies < 3 chars") } @@ -1339,7 +1359,7 @@ func TestWsSubscribedResponse(t *testing.T) { } func TestWsOrderBook(t *testing.T) { - err := b.Websocket.AddSubscriptions(&subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsBook}) + err := b.Websocket.AddSubscriptions(&subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.OrderbookChannel}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[23405,[[38334303613,9348.8,0.53],[38334308111,9348.8,5.98979404],[38331335157,9344.1,1.28965787],[38334302803,9343.8,0.08230094],[38334279092,9343,0.8],[38334307036,9342.938663676,0.8],[38332749107,9342.9,0.2],[38332277330,9342.8,0.85],[38329406786,9342,0.1432012],[38332841570,9341.947288638,0.3],[38332163238,9341.7,0.3],[38334303384,9341.6,0.324],[38332464840,9341.4,0.5],[38331935870,9341.2,0.5],[38334312082,9340.9,0.02126899],[38334261292,9340.8,0.26763],[38334138680,9340.625455254,0.12],[38333896802,9339.8,0.85],[38331627527,9338.9,1.57863959],[38334186713,9338.9,0.26769],[38334305819,9338.8,2.999],[38334211180,9338.75285796,3.999],[38334310699,9337.8,0.10679883],[38334307414,9337.5,1],[38334179822,9337.1,0.26773],[38334306600,9336.659955102,1.79],[38334299667,9336.6,1.1],[38334306452,9336.6,0.13979771],[38325672859,9336.3,1.25],[38334311646,9336.2,1],[38334258509,9336.1,0.37],[38334310592,9336,1.79],[38334310378,9335.6,1.43],[38334132444,9335.2,0.26777],[38331367325,9335,0.07],[38334310703,9335,0.10680562],[38334298209,9334.7,0.08757301],[38334304857,9334.456899462,0.291],[38334309940,9334.088390727,0.0725],[38334310377,9333.7,1.2868],[38334297615,9333.607784,0.1108],[38334095188,9333.3,0.26785],[38334228913,9332.7,0.40861186],[38334300526,9332.363996604,0.3884],[38334310701,9332.2,0.10680562],[38334303548,9332.005382871,0.07],[38334311798,9331.8,0.41285228],[38334301012,9331.7,1.7952],[38334089877,9331.4,0.2679],[38321942150,9331.2,0.2],[38334310670,9330,1.069],[38334063096,9329.6,0.26796],[38334310700,9329.4,0.10680562],[38334310404,9329.3,1],[38334281630,9329.1,6.57150597],[38334036864,9327.7,0.26801],[38334310702,9326.6,0.10680562],[38334311799,9326.1,0.50220625],[38334164163,9326,0.219638],[38334309722,9326,1.5],[38333051682,9325.8,0.26807],[38334302027,9325.7,0.75],[38334203435,9325.366592,0.32397696],[38321967613,9325,0.05],[38334298787,9324.9,0.3],[38334301719,9324.8,3.6227592],[38331316716,9324.763454646,0.71442],[38334310698,9323.8,0.10680562],[38334035499,9323.7,0.23431017],[38334223472,9322.670551788,0.42150603],[38334163459,9322.560399006,0.143967],[38321825171,9320.8,2],[38334075805,9320.467496148,0.30772633],[38334075800,9319.916732238,0.61457592],[38333682302,9319.7,0.0011],[38331323088,9319.116771762,0.12913],[38333677480,9319,0.0199],[38334277797,9318.6,0.89],[38325235155,9318.041088,1.20249],[38334310910,9317.82382938,1.79],[38334311811,9317.2,0.61079138],[38334311812,9317.2,0.71937652],[38333298214,9317.1,50],[38334306359,9317,1.79],[38325531545,9316.382823951,0.21263],[38333727253,9316.3,0.02316372],[38333298213,9316.1,45],[38333836479,9316,2.135],[38324520465,9315.9,2.7681],[38334307411,9315.5,1],[38330313617,9315.3,0.84455],[38334077770,9315.294024,0.01248397],[38334286663,9315.294024,1],[38325533762,9315.290315394,2.40498],[38334310018,9315.2,3],[38333682617,9314.6,0.0011],[38334304794,9314.6,0.76364676],[38334304798,9314.3,0.69242113],[38332915733,9313.8,0.0199],[38334084411,9312.8,1],[38334311893,9350.1,-1.015],[38334302734,9350.3,-0.26737],[38334300732,9350.8,-5.2],[38333957619,9351,-0.90677089],[38334300521,9351,-1.6457],[38334301600,9351.012829557,-0.0523],[38334308878,9351.7,-2.5],[38334299570,9351.921544,-0.1015],[38334279367,9352.1,-0.26732],[38334299569,9352.411802928,-0.4036],[38334202773,9353.4,-0.02139404],[38333918472,9353.7,-1.96412776],[38334278782,9354,-0.26731],[38334278606,9355,-1.2785],[38334302105,9355.439221251,-0.79191542],[38313897370,9355.569409242,-0.43363],[38334292995,9355.584296,-0.0979],[38334216989,9355.8,-0.03686414],[38333894025,9355.9,-0.26721],[38334293798,9355.936691952,-0.4311],[38331159479,9356,-0.4204022],[38333918888,9356.1,-1.10885563],[38334298205,9356.4,-0.20124428],[38328427481,9356.5,-0.1],[38333343289,9356.6,-0.41034213],[38334297205,9356.6,-0.08835018],[38334277927,9356.741101161,-0.0737],[38334311645,9356.8,-0.5],[38334309002,9356.9,-5],[38334309736,9357,-0.10680107],[38334306448,9357.4,-0.18645275],[38333693302,9357.7,-0.2672],[38332815159,9357.8,-0.0011],[38331239824,9358.2,-0.02],[38334271608,9358.3,-2.999],[38334311971,9358.4,-0.55],[38333919260,9358.5,-1.9972841],[38334265365,9358.5,-1.7841],[38334277960,9359,-3],[38334274601,9359.020969848,-3],[38326848839,9359.1,-0.84],[38334291080,9359.247048,-0.16199869],[38326848844,9359.4,-1.84],[38333680200,9359.6,-0.26713],[38331326606,9359.8,-0.84454],[38334309738,9359.8,-0.10680107],[38331314707,9359.9,-0.2],[38333919803,9360.9,-1.41177599],[38323651149,9361.33417827,-0.71442],[38333656906,9361.5,-0.26705],[38334035500,9361.5,-0.40861586],[38334091886,9362.4,-6.85940815],[38334269617,9362.5,-4],[38323629409,9362.545858872,-2.40497],[38334309737,9362.7,-0.10680107],[38334312380,9362.7,-3],[38325280830,9362.8,-1.75123],[38326622800,9362.8,-1.05145],[38333175230,9363,-0.0011],[38326848745,9363.2,-0.79],[38334308960,9363.206775564,-0.12],[38333920234,9363.3,-1.25318113],[38326848843,9363.4,-1.29],[38331239823,9363.4,-0.02],[38333209613,9363.4,-0.26719],[38334299964,9364,-0.05583123],[38323470224,9364.161816648,-0.12912],[38334284711,9365,-0.21346019],[38334299594,9365,-2.6757062],[38323211816,9365.073132585,-0.21262],[38334312456,9365.1,-0.11167861],[38333209612,9365.2,-0.26719],[38327770474,9365.3,-0.0073],[38334298788,9365.3,-0.3],[38334075803,9365.409831204,-0.30772637],[38334309740,9365.5,-0.10680107],[38326608767,9365.7,-2.76809],[38333920657,9365.7,-1.25848083],[38329594226,9366.6,-0.02587],[38334311813,9366.7,-4.72290945],[38316386301,9367.39258128,-2.37581],[38334302026,9367.4,-4.5],[38334228915,9367.9,-0.81725458],[38333921381,9368.1,-1.72213641],[38333175678,9368.2,-0.0011],[38334301150,9368.2,-2.654604],[38334297208,9368.3,-0.78036466],[38334309739,9368.3,-0.10680107],[38331227515,9368.7,-0.02],[38331184470,9369,-0.003975],[38334203436,9369.319616,-0.32397695],[38334269964,9369.7,-0.5],[38328386732,9370,-4.11759935],[38332719555,9370,-0.025],[38333921935,9370.5,-1.2224398],[38334258511,9370.5,-0.35],[38326848842,9370.8,-0.34],[38333985038,9370.9,-0.8551502],[38334283018,9370.9,-1],[38326848744,9371,-1.34]],5]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1357,7 +1377,7 @@ func TestWsOrderBook(t *testing.T) { } func TestWsTradeResponse(t *testing.T) { - err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTrades, Key: 18788}) + err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.AllTradesChannel, Key: 18788}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1367,7 +1387,7 @@ func TestWsTradeResponse(t *testing.T) { } func TestWsTickerResponse(t *testing.T) { - err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTicker, Key: 11534}) + err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.TickerChannel, Key: 11534}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[11534,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1378,7 +1398,7 @@ func TestWsTickerResponse(t *testing.T) { if err != nil { t.Error(err) } - err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123412}) + err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: subscription.TickerChannel, Key: 123412}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON = `[123412,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1389,7 +1409,7 @@ func TestWsTickerResponse(t *testing.T) { if err != nil { t.Error(err) } - err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123413}) + err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: subscription.TickerChannel, Key: 123413}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON = `[123413,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1400,7 +1420,7 @@ func TestWsTickerResponse(t *testing.T) { if err != nil { t.Error(err) } - err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123414}) + err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: subscription.TickerChannel, Key: 123414}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON = `[123414,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1410,7 +1430,7 @@ func TestWsTickerResponse(t *testing.T) { } func TestWsCandleResponse(t *testing.T) { - err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsCandles, Key: 343351}) + err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.CandlesChannel, Key: 343351}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[343351,[[1574698260000,7379.785503,7383.8,7388.3,7379.785503,1.68829482]]]` err = b.wsHandleData([]byte(pressXToJSON)) diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 24719c57cc0..42fec639c6a 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "text/template" "time" "github.com/buger/jsonparser" @@ -40,6 +41,13 @@ type checksum struct { var checksumStore = make(map[int]*checksum) var cMtx sync.Mutex +var subscriptionNames = map[string]string{ + subscription.TickerChannel: wsTicker, + subscription.OrderbookChannel: wsBook, + subscription.CandlesChannel: wsCandles, + subscription.AllTradesChannel: wsTrades, +} + // WsConnect starts a new websocket connection func (b *Bitfinex) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { @@ -524,35 +532,35 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { return nil } -func (b *Bitfinex) handleWSChannelUpdate(c *subscription.Subscription, eventType string, d []interface{}) error { - if c == nil { +func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error { + if s == nil { return fmt.Errorf("%w: Subscription param", common.ErrNilPointer) } if eventType == wsChecksum { - return b.handleWSChecksum(c, d) + return b.handleWSChecksum(s, d) } if eventType == wsHeartbeat { return nil } - if len(c.Pairs) != 1 { + if len(s.Pairs) != 1 { return subscription.ErrNotSinglePair } - switch c.Channel { - case wsBook: - return b.handleWSBookUpdate(c, d) - case wsCandles: - return b.handleWSCandleUpdate(c, d) - case wsTicker: - return b.handleWSTickerUpdate(c, d) - case wsTrades: - return b.handleWSTradesUpdate(c, eventType, d) + switch s.Channel { + case subscription.OrderbookChannel: + return b.handleWSBookUpdate(s, d) + case subscription.CandlesChannel: + return b.handleWSCandleUpdate(s, d) + case subscription.TickerChannel: + return b.handleWSTickerUpdate(s, d) + case subscription.AllTradesChannel: + return b.handleWSTradesUpdate(s, eventType, d) } - return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel) + return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel) } func (b *Bitfinex) handleWSChecksum(c *subscription.Subscription, d []interface{}) error { @@ -1668,44 +1676,18 @@ func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) error { return nil } -// generateSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() +// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature func (b *Bitfinex) generateSubscriptions() (subscription.List, error) { - var channels = []string{wsBook, wsTrades, wsTicker, wsCandles} - - var subscriptions subscription.List - assets := b.GetAssetTypes(true) - for i := range assets { - if !b.IsAssetWebsocketSupported(assets[i]) { - continue - } - enabledPairs, err := b.GetEnabledPairs(assets[i]) - if err != nil { - return nil, err - } - - for j := range channels { - for k := range enabledPairs { - params := make(map[string]interface{}) - if channels[j] == wsBook { - params["prec"] = "R0" - params["len"] = "100" - } - - if channels[j] == wsCandles && assets[i] == asset.MarginFunding { - params[CandlesPeriodKey] = "30" - } - - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: channels[j], - Pairs: currency.Pairs{enabledPairs[k]}, - Params: params, - Asset: assets[i], - }) - } - } - } + return b.Features.Subscriptions.ExpandTemplates(b) +} - return subscriptions, nil +// GetSubscriptionTemplate returns a subscription channel template +func (b *Bitfinex) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl"). + Funcs(template.FuncMap{ + "channelName": channelName, + }). + Parse(subTplText) } // ConfigureWS to send checksums and sequence numbers @@ -1717,26 +1699,26 @@ func (b *Bitfinex) ConfigureWS() error { } // Subscribe sends a websocket message to receive data from channels -func (b *Bitfinex) Subscribe(channels subscription.List) error { - return b.ParallelChanOp(channels, b.subscribeToChan, 1) +func (b *Bitfinex) Subscribe(subs subscription.List) error { + return b.ParallelChanOp(subs, b.subscribeToChan, 1) } // Unsubscribe sends a websocket message to stop receiving data from channels -func (b *Bitfinex) Unsubscribe(channels subscription.List) error { - return b.ParallelChanOp(channels, b.unsubscribeFromChan, 1) +func (b *Bitfinex) Unsubscribe(subs subscription.List) error { + return b.ParallelChanOp(subs, b.unsubscribeFromChan, 1) } // subscribeToChan handles a single subscription and parses the result // on success it adds the subscription to the websocket -func (b *Bitfinex) subscribeToChan(chans subscription.List) error { - if len(chans) != 1 { +func (b *Bitfinex) subscribeToChan(subs subscription.List) error { + if len(subs) != 1 { return errors.New("subscription batching limited to 1") } - c := chans[0] - req, err := subscribeReq(c) + s := subs[0] + req, err := subscribeReq(s) if err != nil { - return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) + return fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs) } // subId is a single round-trip identifier that provides linking sub requests to chanIDs @@ -1746,23 +1728,23 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error { // Add a temporary Key so we can find this Sub when we get the resp without delay or context switch // Otherwise we might drop the first messages after the subscribed resp - c.Key = subID // Note subID string type avoids conflicts with later chanID key - if err = b.Websocket.AddSubscriptions(c); err != nil { - return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err) + s.Key = subID // Note subID string type avoids conflicts with later chanID key + if err = b.Websocket.AddSubscriptions(s); err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s", err, s.Channel, s.Pairs) } // Always remove the temporary subscription keyed by subID defer func() { - _ = b.Websocket.RemoveSubscriptions(c) + _ = b.Websocket.RemoveSubscriptions(s) }() respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req) if err != nil { - return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) + return fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs) } if err = b.getErrResp(respRaw); err != nil { - wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) + wErr := fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs) b.Websocket.DataHandler <- wErr return wErr } @@ -1771,77 +1753,72 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error { } // subscribeReq returns a map of request params for subscriptions -func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error) { - if c == nil { +func subscribeReq(s *subscription.Subscription) (map[string]interface{}, error) { + if s == nil { return nil, fmt.Errorf("%w: Subscription param", common.ErrNilPointer) } - if len(c.Pairs) != 1 { + if len(s.Pairs) > 1 { return nil, subscription.ErrNotSinglePair } - pair := c.Pairs[0] + + c := channelName(s) req := map[string]interface{}{ "event": "subscribe", - "channel": c.Channel, + "channel": c, } - for k, v := range c.Params { + var fundingPeriod string + for k, v := range s.Params { switch k { - case CandlesPeriodKey, CandlesTimeframeKey: - // Skip these internal Params - case "key", "symbol": - // Ensure user's Params aren't silently overwritten - return nil, fmt.Errorf("%s %w", k, errParamNotAllowed) + case CandlesPeriodKey: + if s, ok := v.(string); !ok { + return nil, common.GetTypeAssertError("string", v, "subscription.CandlesPeriodKey") + } else { + fundingPeriod = ":" + s + } + case "key", "symbol", "len": + return nil, fmt.Errorf("%w: %s", errParamNotAllowed, k) // Ensure user's Params aren't silently overwritten default: req[k] = v } } - prefix := "t" - if c.Asset == asset.MarginFunding { - prefix = "f" + if s.Levels != 0 { + req["len"] = s.Levels } - needsDelimiter := pair.Len() > 6 - - var formattedPair string - if needsDelimiter { - formattedPair = pair.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String() - } else { - formattedPair = currency.PairFormat{Uppercase: true}.Format(pair) + prefix := "t" + if s.Asset == asset.MarginFunding { + prefix = "f" } - if c.Channel == wsCandles { - timeframe := "1m" - if t, ok := c.Params[CandlesTimeframeKey]; ok { - if timeframe, ok = t.(string); !ok { - return nil, common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey") - } + if len(s.Pairs) == 1 { + var pairFmt currency.PairFormat + if needsDelimiter := s.Pairs[0].Len() > 6; needsDelimiter { + pairFmt = currency.PairFormat{Uppercase: true, Delimiter: ":"} + } else { + pairFmt = currency.PairFormat{Uppercase: true} } - fundingPeriod := "" - if p, ok := c.Params[CandlesPeriodKey]; ok { - s, cOk := p.(string) - if !cOk { - return nil, common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey") - } - fundingPeriod = ":p" + s + symbol := s.Pairs.Format(pairFmt).Join() + if c == wsCandles { + req["key"] = "trade:" + s.Interval.Short() + ":" + prefix + symbol + fundingPeriod + } else { + req["symbol"] = prefix + symbol } - req["key"] = "trade:" + timeframe + ":" + prefix + formattedPair + fundingPeriod - } else { - req["symbol"] = prefix + formattedPair } return req, nil } // unsubscribeFromChan sends a websocket message to stop receiving data from a channel -func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error { - if len(chans) != 1 { +func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error { + if len(subs) != 1 { return errors.New("subscription batching limited to 1") } - c := chans[0] - chanID, ok := c.Key.(int) + s := subs[0] + chanID, ok := s.Key.(int) if !ok { - return common.GetTypeAssertError("int", c.Key, "chanID") + return common.GetTypeAssertError("int", s.Key, "subscription.Key") } req := map[string]interface{}{ @@ -1855,12 +1832,12 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error { } if err := b.getErrResp(respRaw); err != nil { - wErr := fmt.Errorf("%w from ChanId: %v; %w", stream.ErrUnsubscribeFailure, chanID, err) + wErr := fmt.Errorf("%w: ChanId: %v", err, chanID) b.Websocket.DataHandler <- wErr return wErr } - return b.Websocket.RemoveSubscriptions(c) + return b.Websocket.RemoveSubscriptions(s) } // getErrResp takes a json response string and looks for an error event type @@ -2233,3 +2210,24 @@ subSort: break } } + +func channelName(s *subscription.Subscription) string { + if name, ok := subscriptionNames[s.Channel]; ok { + return name + } + return s.Channel +} + +const subTplText = ` +{{- if $.S.Asset -}} + {{ range $asset, $pairs := $.AssetPairs }} + {{- range $p := $pairs -}} + {{- channelName $.S }} + {{- $.PairSeparator }} + {{- end -}} + {{ $.AssetSeparator }} + {{- end -}} +{{- else -}} + {{- channelName $.S }} +{{- end }} +` diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index 1ae452b4c84..6c5ac1a7e67 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -27,6 +27,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -159,6 +160,13 @@ func (b *Bitfinex) SetDefaults() { GlobalResultLimit: 10000, }, }, + Subscriptions: []*subscription.Subscription{ + {Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All}, + {Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All}, + {Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.OneMin}, + {Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.MarginFunding, Interval: kline.OneMin, Params: map[string]any{CandlesPeriodKey: "p30"}}, + {Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.All, Levels: 100, Params: map[string]any{"prec": "R0"}}, + }, } b.Requester, err = request.New(b.Name, diff --git a/internal/testing/subscriptions/subscriptions.go b/internal/testing/subscriptions/subscriptions.go index bd032e299ed..f32ee1acd42 100644 --- a/internal/testing/subscriptions/subscriptions.go +++ b/internal/testing/subscriptions/subscriptions.go @@ -1,4 +1,4 @@ -package subscriptionstest +package subscription import ( "maps"