From ba77c9946d4772574eebebed509c6edef21c5d9e Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Fri, 18 Oct 2024 00:23:15 +0200 Subject: [PATCH 1/7] Bitfinex: Add subscription configuration and templating (#1597) * Bitfinex: Correct comment about R0 OB * Bitfinex: Test config updates * Bitfinex: Add missing assets to configtest * Bitfinex: Rename GenerateDefaultSubscriptions * Bitfinex: Add Subscription configuration * Subscriptions: Document panic in templates --- exchanges/bitfinex/bitfinex.go | 4 +- exchanges/bitfinex/bitfinex_test.go | 130 ++++---- exchanges/bitfinex/bitfinex_websocket.go | 288 +++++++++--------- exchanges/bitfinex/bitfinex_wrapper.go | 3 +- exchanges/subscription/README.md | 5 +- .../testing/subscriptions/subscriptions.go | 2 +- testdata/configtest.json | 40 ++- 7 files changed, 269 insertions(+), 203 deletions(-) diff --git a/exchanges/bitfinex/bitfinex.go b/exchanges/bitfinex/bitfinex.go index 4ef8f877b3c..996ab3cb5ee 100644 --- a/exchanges/bitfinex/bitfinex.go +++ b/exchanges/bitfinex/bitfinex.go @@ -104,9 +104,7 @@ const ( bitfinexChecksumFlag = 131072 bitfinexWsSequenceFlag = 65536 - // CandlesTimeframeKey configures the timeframe in subscription.Subscription.Params - CandlesTimeframeKey = "_timeframe" - // CandlesPeriodKey configures the aggregated period in subscription.Subscription.Params + // CandlesPeriodKey configures the Candles aggregated period for MarginFunding in subscription.Subscription.Params CandlesPeriodKey = "_period" ) diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 1445a3ac4ad..854d7a46761 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -26,6 +26,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -541,8 +542,7 @@ func TestUpdateTickers(t *testing.T) { require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") testexch.UpdatePairsOnce(t, b) - assets := b.GetAssetTypes(false) - for _, a := range assets { + for _, a := range b.GetAssetTypes(true) { avail, err := b.GetAvailablePairs(a) require.NoError(t, err, "GetAvailablePairs should not error") @@ -1136,13 +1136,49 @@ func TestWsAuth(t *testing.T) { } } +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + + b.Websocket.SetCanUseAuthenticatedEndpoints(true) + require.True(t, b.Websocket.CanUseAuthenticatedEndpoints(), "CanUseAuthenticatedEndpoints must return true") + subs, err := b.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + exp := subscription.List{} + for _, baseSub := range b.Features.Subscriptions { + for _, a := range b.GetAssetTypes(true) { + if baseSub.Asset != asset.All && baseSub.Asset != a { + continue + } + pairs, err := b.GetEnabledPairs(a) + require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a) + for _, p := range pairs.Format(currency.PairFormat{Uppercase: true}) { + s := baseSub.Clone() + s.Asset = a + s.Pairs = currency.Pairs{p} + switch s.Channel { + case subscription.TickerChannel: + s.QualifiedChannel = `{"channel":"ticker","symbol":"t` + p.String() + `"}` + case subscription.CandlesChannel: + s.QualifiedChannel = `{"channel":"candles","key":"trade:1m:t` + p.String() + `"}` + case subscription.OrderbookChannel: + s.QualifiedChannel = `{"channel":"book","len":100,"prec":"R0","symbol":"t` + p.String() + `"}` + case subscription.AllTradesChannel: + s.QualifiedChannel = `{"channel":"trades","symbol":"t` + p.String() + `"}` + } + exp = append(exp, s) + } + } + } + testsubs.EqualLists(t, exp, subs) +} + // TestWsSubscribe tests Subscribe and Unsubscribe functionality // See also TestSubscribeReq which covers key and symbol conversion func TestWsSubscribe(t *testing.T) { b := new(Bitfinex) //nolint:govet // Intentional shadow of b to avoid future copy/paste mistakes require.NoError(t, testexch.Setup(b), "TestInstance must not error") testexch.SetupWs(t, b) - err := b.Subscribe(subscription.List{{Channel: wsTicker, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) + err := b.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) require.NoError(t, err, "Subrcribe should not error") catcher := func() (ok bool) { i := <-b.Websocket.ToRoutine @@ -1155,19 +1191,15 @@ func TestWsSubscribe(t *testing.T) { require.NoError(t, err, "GetSubscriptions should not error") require.Len(t, subs, 1, "We should only have 1 subscription; subID subscription should have been Removed by subscribeToChan") - err = b.Subscribe(subscription.List{{Channel: wsTicker, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) - require.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Duplicate subscription should error correctly") - catcher = func() bool { + err = b.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot}}) + require.ErrorContains(t, err, "subscribe: dup (code: 10301)", "Duplicate subscription should error correctly") + + assert.EventuallyWithT(t, func(t *assert.CollectT) { i := <-b.Websocket.ToRoutine - if e, ok := i.(error); ok { - if assert.ErrorIs(t, e, stream.ErrSubscriptionFailure, "Error should go to DataHandler") { - assert.ErrorContains(t, e, "subscribe: dup (code: 10301)", "Error should contain message and code") - return true - } - } - return false - } - assert.Eventually(t, catcher, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "error response should arrive") + e, ok := i.(error) + require.True(t, ok, "must find an error") + assert.ErrorContains(t, e, "subscribe: dup (code: 10301)", "error should be correct") + }, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "error response should go to ToRoutine") subs, err = b.GetSubscriptions() require.NoError(t, err, "GetSubscriptions should not error") @@ -1180,53 +1212,49 @@ func TestWsSubscribe(t *testing.T) { assert.True(t, ok, "sub.Key should be an int") err = b.Unsubscribe(subs) - assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Unsubscribe should error") assert.ErrorContains(t, err, strconv.Itoa(chanID), "Unsubscribe should contain correct chanId") assert.ErrorContains(t, err, "unsubscribe: invalid (code: 10400)", "Unsubscribe should contain correct upstream error") err = b.Subscribe(subscription.List{{ - Channel: wsTicker, + Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, Asset: asset.Spot, - Params: map[string]interface{}{"key": "tBTCUSD"}, + Params: map[string]any{"key": "tBTCUSD"}, }}) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Trying to use a 'key' param should error ErrSubscriptionFailure") assert.ErrorIs(t, err, errParamNotAllowed, "Trying to use a 'key' param should error errParamNotAllowed") } -// TestSubscribeReq tests the channel to request map marshalling -func TestSubscribeReq(t *testing.T) { - c := &subscription.Subscription{ - Channel: wsCandles, - Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, - Asset: asset.MarginFunding, - Params: map[string]interface{}{ - CandlesPeriodKey: "30", - }, +// TestSubToMap tests the channel to request map marshalling +func TestSubToMap(t *testing.T) { + s := &subscription.Subscription{ + Channel: subscription.CandlesChannel, + Asset: asset.Spot, + Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.USD)}, + Interval: kline.OneMin, } - r, err := subscribeReq(c) - assert.NoError(t, err, "subscribeReq should not error") - assert.Equal(t, "trade:1m:fBTCUSD:p30", r["key"], "key contain period and default timeframe") - c.Params = map[string]interface{}{ - CandlesTimeframeKey: "15m", - } - r, err = subscribeReq(c) - assert.NoError(t, err, "subscribeReq should not error") - assert.Equal(t, "trade:15m:fBTCUSD", r["key"], "key should be contain specific timeframe and no period") + r := subToMap(s, s.Asset, s.Pairs[0]) + assert.Equal(t, "trade:1m:tBTCUSD", r["key"], "key should contain a specific timeframe and no period") + + s.Interval = kline.FifteenMin + s.Asset = asset.MarginFunding + s.Params = map[string]any{CandlesPeriodKey: "p30"} + + r = subToMap(s, s.Asset, s.Pairs[0]) + assert.Equal(t, "trade:15m:fBTCUSD:p30", r["key"], "key should contain a period and specific timeframe") + + s.Interval = kline.FifteenMin - c = &subscription.Subscription{ - Channel: wsBook, + s = &subscription.Subscription{ + Channel: subscription.OrderbookChannel, Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.DOGE)}, Asset: asset.Spot, } - r, err = subscribeReq(c) - assert.NoError(t, err, "subscribeReq should not error") + r = subToMap(s, s.Asset, s.Pairs[0]) assert.Equal(t, "tBTC:DOGE", r["symbol"], "symbol should use colon delimiter if a currency is > 3 chars") - c.Pairs = currency.Pairs{currency.NewPair(currency.BTC, currency.LTC)} - r, err = subscribeReq(c) - assert.NoError(t, err, "subscribeReq should not error") + s.Pairs = currency.Pairs{currency.NewPair(currency.BTC, currency.LTC)} + r = subToMap(s, s.Asset, s.Pairs[0]) assert.Equal(t, "tBTCLTC", r["symbol"], "symbol should not use colon delimiter if both currencies < 3 chars") } @@ -1337,7 +1365,7 @@ func TestWsSubscribedResponse(t *testing.T) { } func TestWsOrderBook(t *testing.T) { - err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsBook}) + err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.OrderbookChannel}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[23405,[[38334303613,9348.8,0.53],[38334308111,9348.8,5.98979404],[38331335157,9344.1,1.28965787],[38334302803,9343.8,0.08230094],[38334279092,9343,0.8],[38334307036,9342.938663676,0.8],[38332749107,9342.9,0.2],[38332277330,9342.8,0.85],[38329406786,9342,0.1432012],[38332841570,9341.947288638,0.3],[38332163238,9341.7,0.3],[38334303384,9341.6,0.324],[38332464840,9341.4,0.5],[38331935870,9341.2,0.5],[38334312082,9340.9,0.02126899],[38334261292,9340.8,0.26763],[38334138680,9340.625455254,0.12],[38333896802,9339.8,0.85],[38331627527,9338.9,1.57863959],[38334186713,9338.9,0.26769],[38334305819,9338.8,2.999],[38334211180,9338.75285796,3.999],[38334310699,9337.8,0.10679883],[38334307414,9337.5,1],[38334179822,9337.1,0.26773],[38334306600,9336.659955102,1.79],[38334299667,9336.6,1.1],[38334306452,9336.6,0.13979771],[38325672859,9336.3,1.25],[38334311646,9336.2,1],[38334258509,9336.1,0.37],[38334310592,9336,1.79],[38334310378,9335.6,1.43],[38334132444,9335.2,0.26777],[38331367325,9335,0.07],[38334310703,9335,0.10680562],[38334298209,9334.7,0.08757301],[38334304857,9334.456899462,0.291],[38334309940,9334.088390727,0.0725],[38334310377,9333.7,1.2868],[38334297615,9333.607784,0.1108],[38334095188,9333.3,0.26785],[38334228913,9332.7,0.40861186],[38334300526,9332.363996604,0.3884],[38334310701,9332.2,0.10680562],[38334303548,9332.005382871,0.07],[38334311798,9331.8,0.41285228],[38334301012,9331.7,1.7952],[38334089877,9331.4,0.2679],[38321942150,9331.2,0.2],[38334310670,9330,1.069],[38334063096,9329.6,0.26796],[38334310700,9329.4,0.10680562],[38334310404,9329.3,1],[38334281630,9329.1,6.57150597],[38334036864,9327.7,0.26801],[38334310702,9326.6,0.10680562],[38334311799,9326.1,0.50220625],[38334164163,9326,0.219638],[38334309722,9326,1.5],[38333051682,9325.8,0.26807],[38334302027,9325.7,0.75],[38334203435,9325.366592,0.32397696],[38321967613,9325,0.05],[38334298787,9324.9,0.3],[38334301719,9324.8,3.6227592],[38331316716,9324.763454646,0.71442],[38334310698,9323.8,0.10680562],[38334035499,9323.7,0.23431017],[38334223472,9322.670551788,0.42150603],[38334163459,9322.560399006,0.143967],[38321825171,9320.8,2],[38334075805,9320.467496148,0.30772633],[38334075800,9319.916732238,0.61457592],[38333682302,9319.7,0.0011],[38331323088,9319.116771762,0.12913],[38333677480,9319,0.0199],[38334277797,9318.6,0.89],[38325235155,9318.041088,1.20249],[38334310910,9317.82382938,1.79],[38334311811,9317.2,0.61079138],[38334311812,9317.2,0.71937652],[38333298214,9317.1,50],[38334306359,9317,1.79],[38325531545,9316.382823951,0.21263],[38333727253,9316.3,0.02316372],[38333298213,9316.1,45],[38333836479,9316,2.135],[38324520465,9315.9,2.7681],[38334307411,9315.5,1],[38330313617,9315.3,0.84455],[38334077770,9315.294024,0.01248397],[38334286663,9315.294024,1],[38325533762,9315.290315394,2.40498],[38334310018,9315.2,3],[38333682617,9314.6,0.0011],[38334304794,9314.6,0.76364676],[38334304798,9314.3,0.69242113],[38332915733,9313.8,0.0199],[38334084411,9312.8,1],[38334311893,9350.1,-1.015],[38334302734,9350.3,-0.26737],[38334300732,9350.8,-5.2],[38333957619,9351,-0.90677089],[38334300521,9351,-1.6457],[38334301600,9351.012829557,-0.0523],[38334308878,9351.7,-2.5],[38334299570,9351.921544,-0.1015],[38334279367,9352.1,-0.26732],[38334299569,9352.411802928,-0.4036],[38334202773,9353.4,-0.02139404],[38333918472,9353.7,-1.96412776],[38334278782,9354,-0.26731],[38334278606,9355,-1.2785],[38334302105,9355.439221251,-0.79191542],[38313897370,9355.569409242,-0.43363],[38334292995,9355.584296,-0.0979],[38334216989,9355.8,-0.03686414],[38333894025,9355.9,-0.26721],[38334293798,9355.936691952,-0.4311],[38331159479,9356,-0.4204022],[38333918888,9356.1,-1.10885563],[38334298205,9356.4,-0.20124428],[38328427481,9356.5,-0.1],[38333343289,9356.6,-0.41034213],[38334297205,9356.6,-0.08835018],[38334277927,9356.741101161,-0.0737],[38334311645,9356.8,-0.5],[38334309002,9356.9,-5],[38334309736,9357,-0.10680107],[38334306448,9357.4,-0.18645275],[38333693302,9357.7,-0.2672],[38332815159,9357.8,-0.0011],[38331239824,9358.2,-0.02],[38334271608,9358.3,-2.999],[38334311971,9358.4,-0.55],[38333919260,9358.5,-1.9972841],[38334265365,9358.5,-1.7841],[38334277960,9359,-3],[38334274601,9359.020969848,-3],[38326848839,9359.1,-0.84],[38334291080,9359.247048,-0.16199869],[38326848844,9359.4,-1.84],[38333680200,9359.6,-0.26713],[38331326606,9359.8,-0.84454],[38334309738,9359.8,-0.10680107],[38331314707,9359.9,-0.2],[38333919803,9360.9,-1.41177599],[38323651149,9361.33417827,-0.71442],[38333656906,9361.5,-0.26705],[38334035500,9361.5,-0.40861586],[38334091886,9362.4,-6.85940815],[38334269617,9362.5,-4],[38323629409,9362.545858872,-2.40497],[38334309737,9362.7,-0.10680107],[38334312380,9362.7,-3],[38325280830,9362.8,-1.75123],[38326622800,9362.8,-1.05145],[38333175230,9363,-0.0011],[38326848745,9363.2,-0.79],[38334308960,9363.206775564,-0.12],[38333920234,9363.3,-1.25318113],[38326848843,9363.4,-1.29],[38331239823,9363.4,-0.02],[38333209613,9363.4,-0.26719],[38334299964,9364,-0.05583123],[38323470224,9364.161816648,-0.12912],[38334284711,9365,-0.21346019],[38334299594,9365,-2.6757062],[38323211816,9365.073132585,-0.21262],[38334312456,9365.1,-0.11167861],[38333209612,9365.2,-0.26719],[38327770474,9365.3,-0.0073],[38334298788,9365.3,-0.3],[38334075803,9365.409831204,-0.30772637],[38334309740,9365.5,-0.10680107],[38326608767,9365.7,-2.76809],[38333920657,9365.7,-1.25848083],[38329594226,9366.6,-0.02587],[38334311813,9366.7,-4.72290945],[38316386301,9367.39258128,-2.37581],[38334302026,9367.4,-4.5],[38334228915,9367.9,-0.81725458],[38333921381,9368.1,-1.72213641],[38333175678,9368.2,-0.0011],[38334301150,9368.2,-2.654604],[38334297208,9368.3,-0.78036466],[38334309739,9368.3,-0.10680107],[38331227515,9368.7,-0.02],[38331184470,9369,-0.003975],[38334203436,9369.319616,-0.32397695],[38334269964,9369.7,-0.5],[38328386732,9370,-4.11759935],[38332719555,9370,-0.025],[38333921935,9370.5,-1.2224398],[38334258511,9370.5,-0.35],[38326848842,9370.8,-0.34],[38333985038,9370.9,-0.8551502],[38334283018,9370.9,-1],[38326848744,9371,-1.34]],5]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1355,7 +1383,7 @@ func TestWsOrderBook(t *testing.T) { } func TestWsTradeResponse(t *testing.T) { - err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTrades, Key: 18788}) + err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.AllTradesChannel, Key: 18788}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1365,7 +1393,7 @@ func TestWsTradeResponse(t *testing.T) { } func TestWsTickerResponse(t *testing.T) { - err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTicker, Key: 11534}) + err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.TickerChannel, Key: 11534}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[11534,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1376,7 +1404,7 @@ func TestWsTickerResponse(t *testing.T) { if err != nil { t.Error(err) } - err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123412}) + err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: subscription.TickerChannel, Key: 123412}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON = `[123412,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1387,7 +1415,7 @@ func TestWsTickerResponse(t *testing.T) { if err != nil { t.Error(err) } - err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123413}) + err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: subscription.TickerChannel, Key: 123413}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON = `[123413,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1398,7 +1426,7 @@ func TestWsTickerResponse(t *testing.T) { if err != nil { t.Error(err) } - err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123414}) + err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: subscription.TickerChannel, Key: 123414}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON = `[123414,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]` err = b.wsHandleData([]byte(pressXToJSON)) @@ -1408,7 +1436,7 @@ func TestWsTickerResponse(t *testing.T) { } func TestWsCandleResponse(t *testing.T) { - err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsCandles, Key: 343351}) + err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.CandlesChannel, Key: 343351}) require.NoError(t, err, "AddSubscriptions must not error") pressXToJSON := `[343351,[[1574698260000,7379.785503,7383.8,7388.3,7379.785503,1.68829482]]]` err = b.wsHandleData([]byte(pressXToJSON)) diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index b4cb9904542..74c04afc2d3 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -11,8 +11,10 @@ import ( "strconv" "strings" "sync" + "text/template" "time" + "github.com/Masterminds/sprig/v3" "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" @@ -20,6 +22,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/request" @@ -30,6 +33,15 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All}, + {Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All}, + {Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.OneMin}, + {Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Margin, Interval: kline.OneMin}, + {Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.MarginFunding, Interval: kline.OneMin, Params: map[string]any{CandlesPeriodKey: "p30"}}, + {Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.All, Levels: 100, Params: map[string]any{"prec": "R0"}}, +} + var comms = make(chan stream.Response) type checksum struct { @@ -41,6 +53,13 @@ type checksum struct { var checksumStore = make(map[int]*checksum) var cMtx sync.Mutex +var subscriptionNames = map[string]string{ + subscription.TickerChannel: wsTicker, + subscription.OrderbookChannel: wsBook, + subscription.CandlesChannel: wsCandles, + subscription.AllTradesChannel: wsTrades, +} + // WsConnect starts a new websocket connection func (b *Bitfinex) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { @@ -525,35 +544,35 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { return nil } -func (b *Bitfinex) handleWSChannelUpdate(c *subscription.Subscription, eventType string, d []interface{}) error { - if c == nil { +func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error { + if s == nil { return fmt.Errorf("%w: Subscription param", common.ErrNilPointer) } if eventType == wsChecksum { - return b.handleWSChecksum(c, d) + return b.handleWSChecksum(s, d) } if eventType == wsHeartbeat { return nil } - if len(c.Pairs) != 1 { + if len(s.Pairs) != 1 { return subscription.ErrNotSinglePair } - switch c.Channel { - case wsBook: - return b.handleWSBookUpdate(c, d) - case wsCandles: - return b.handleWSCandleUpdate(c, d) - case wsTicker: - return b.handleWSTickerUpdate(c, d) - case wsTrades: - return b.handleWSTradesUpdate(c, eventType, d) + switch s.Channel { + case subscription.OrderbookChannel: + return b.handleWSBookUpdate(s, d) + case subscription.CandlesChannel: + return b.handleWSCandleUpdate(s, d) + case subscription.TickerChannel: + return b.handleWSTickerUpdate(s, d) + case subscription.AllTradesChannel: + return b.handleWSTradesUpdate(s, eventType, d) } - return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel) + return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel) } func (b *Bitfinex) handleWSChecksum(c *subscription.Subscription, d []interface{}) error { @@ -1669,44 +1688,20 @@ func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) error { return nil } -// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() -func (b *Bitfinex) GenerateDefaultSubscriptions() (subscription.List, error) { - var channels = []string{wsBook, wsTrades, wsTicker, wsCandles} - - var subscriptions subscription.List - assets := b.GetAssetTypes(true) - for i := range assets { - if !b.IsAssetWebsocketSupported(assets[i]) { - continue - } - enabledPairs, err := b.GetEnabledPairs(assets[i]) - if err != nil { - return nil, err - } - - for j := range channels { - for k := range enabledPairs { - params := make(map[string]interface{}) - if channels[j] == wsBook { - params["prec"] = "R0" - params["len"] = "100" - } - - if channels[j] == wsCandles && assets[i] == asset.MarginFunding { - params[CandlesPeriodKey] = "30" - } - - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: channels[j], - Pairs: currency.Pairs{enabledPairs[k]}, - Params: params, - Asset: assets[i], - }) - } - } - } +// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature +func (b *Bitfinex) generateSubscriptions() (subscription.List, error) { + return b.Features.Subscriptions.ExpandTemplates(b) +} - return subscriptions, nil +// GetSubscriptionTemplate returns a subscription channel template +func (b *Bitfinex) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(sprig.FuncMap()).Funcs(template.FuncMap{ + "subToMap": subToMap, + "removeSpotFromMargin": func(ap map[asset.Item]currency.Pairs) string { + spotPairs, _ := b.GetEnabledPairs(asset.Spot) + return removeSpotFromMargin(ap, spotPairs) + }, + }).Parse(subTplText) } // ConfigureWS to send checksums and sequence numbers @@ -1718,26 +1713,36 @@ func (b *Bitfinex) ConfigureWS() error { } // Subscribe sends a websocket message to receive data from channels -func (b *Bitfinex) Subscribe(channels subscription.List) error { - return b.ParallelChanOp(channels, b.subscribeToChan, 1) +func (b *Bitfinex) Subscribe(subs subscription.List) error { + var err error + if subs, err = subs.ExpandTemplates(b); err != nil { + return err + } + return b.ParallelChanOp(subs, b.subscribeToChan, 1) } // Unsubscribe sends a websocket message to stop receiving data from channels -func (b *Bitfinex) Unsubscribe(channels subscription.List) error { - return b.ParallelChanOp(channels, b.unsubscribeFromChan, 1) +func (b *Bitfinex) Unsubscribe(subs subscription.List) error { + var err error + if subs, err = subs.ExpandTemplates(b); err != nil { + return err + } + return b.ParallelChanOp(subs, b.unsubscribeFromChan, 1) } // subscribeToChan handles a single subscription and parses the result // on success it adds the subscription to the websocket -func (b *Bitfinex) subscribeToChan(chans subscription.List) error { - if len(chans) != 1 { - return errors.New("subscription batching limited to 1") +func (b *Bitfinex) subscribeToChan(subs subscription.List) error { + if len(subs) != 1 { + return subscription.ErrNotSinglePair } - c := chans[0] - req, err := subscribeReq(c) - if err != nil { - return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) + s := subs[0] + req := map[string]any{ + "event": "subscribe", + } + if err := json.Unmarshal([]byte(s.QualifiedChannel), &req); err != nil { + return err } // subId is a single round-trip identifier that provides linking sub requests to chanIDs @@ -1747,23 +1752,23 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error { // Add a temporary Key so we can find this Sub when we get the resp without delay or context switch // Otherwise we might drop the first messages after the subscribed resp - c.Key = subID // Note subID string type avoids conflicts with later chanID key - if err = b.Websocket.AddSubscriptions(b.Websocket.Conn, c); err != nil { - return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err) + s.Key = subID // Note subID string type avoids conflicts with later chanID key + if err := b.Websocket.AddSubscriptions(b.Websocket.Conn, s); err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s", err, s.Channel, s.Pairs) } // Always remove the temporary subscription keyed by subID defer func() { - _ = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c) + _ = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s) }() respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "subscribe:"+subID, req) if err != nil { - return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) + return fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs) } if err = b.getErrResp(respRaw); err != nil { - wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) + wErr := fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs) b.Websocket.DataHandler <- wErr return wErr } @@ -1771,78 +1776,15 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error { return nil } -// subscribeReq returns a map of request params for subscriptions -func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error) { - if c == nil { - return nil, fmt.Errorf("%w: Subscription param", common.ErrNilPointer) - } - if len(c.Pairs) != 1 { - return nil, subscription.ErrNotSinglePair - } - pair := c.Pairs[0] - req := map[string]interface{}{ - "event": "subscribe", - "channel": c.Channel, - } - - for k, v := range c.Params { - switch k { - case CandlesPeriodKey, CandlesTimeframeKey: - // Skip these internal Params - case "key", "symbol": - // Ensure user's Params aren't silently overwritten - return nil, fmt.Errorf("%s %w", k, errParamNotAllowed) - default: - req[k] = v - } - } - - prefix := "t" - if c.Asset == asset.MarginFunding { - prefix = "f" - } - - needsDelimiter := pair.Len() > 6 - - var formattedPair string - if needsDelimiter { - formattedPair = pair.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String() - } else { - formattedPair = currency.PairFormat{Uppercase: true}.Format(pair) - } - - if c.Channel == wsCandles { - timeframe := "1m" - if t, ok := c.Params[CandlesTimeframeKey]; ok { - if timeframe, ok = t.(string); !ok { - return nil, common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey") - } - } - fundingPeriod := "" - if p, ok := c.Params[CandlesPeriodKey]; ok { - s, cOk := p.(string) - if !cOk { - return nil, common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey") - } - fundingPeriod = ":p" + s - } - req["key"] = "trade:" + timeframe + ":" + prefix + formattedPair + fundingPeriod - } else { - req["symbol"] = prefix + formattedPair - } - - return req, nil -} - // unsubscribeFromChan sends a websocket message to stop receiving data from a channel -func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error { - if len(chans) != 1 { +func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error { + if len(subs) != 1 { return errors.New("subscription batching limited to 1") } - c := chans[0] - chanID, ok := c.Key.(int) + s := subs[0] + chanID, ok := s.Key.(int) if !ok { - return common.GetTypeAssertError("int", c.Key, "chanID") + return common.GetTypeAssertError("int", s.Key, "subscription.Key") } req := map[string]interface{}{ @@ -1856,12 +1798,12 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error { } if err := b.getErrResp(respRaw); err != nil { - wErr := fmt.Errorf("%w from ChanId: %v; %w", stream.ErrUnsubscribeFailure, chanID, err) + wErr := fmt.Errorf("%w: ChanId: %v", err, chanID) b.Websocket.DataHandler <- wErr return wErr } - return b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c) + return b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s) } // getErrResp takes a json response string and looks for an error event type @@ -2143,7 +2085,7 @@ func validateCRC32(book *orderbook.Base, token int) error { reOrderByID(book.Bids) reOrderByID(book.Asks) - // RO precision calculation is based on order ID's and amount values + // R0 precision calculation is based on order ID's and amount values var bids, asks []orderbook.Tranche for i := range 25 { if i < len(book.Bids) { @@ -2233,3 +2175,71 @@ subSort: break } } + +// subToMap returns a json object of request params for subscriptions +func subToMap(s *subscription.Subscription, a asset.Item, p currency.Pair) map[string]any { + c := s.Channel + if name, ok := subscriptionNames[s.Channel]; ok { + c = name + } + req := map[string]interface{}{ + "channel": c, + } + + var fundingPeriod string + for k, v := range s.Params { + switch k { + case CandlesPeriodKey: + if s, ok := v.(string); !ok { + panic(common.GetTypeAssertError("string", v, "subscription.CandlesPeriodKey")) + } else { + fundingPeriod = ":" + s + } + case "key", "symbol", "len": + panic(fmt.Errorf("%w: %s", errParamNotAllowed, k)) // Ensure user's Params aren't silently overwritten + default: + req[k] = v + } + } + + if s.Levels != 0 { + req["len"] = s.Levels + } + + prefix := "t" + if a == asset.MarginFunding { + prefix = "f" + } + + pairFmt := currency.PairFormat{Uppercase: true} + if needsDelimiter := p.Len() > 6; needsDelimiter { + pairFmt.Delimiter = ":" + } + symbol := p.Format(pairFmt).String() + if c == wsCandles { + req["key"] = "trade:" + s.Interval.Short() + ":" + prefix + symbol + fundingPeriod + } else { + req["symbol"] = prefix + symbol + } + + return req +} + +// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map to avoid duplicate subscriptions +func removeSpotFromMargin(ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string { + if p, ok := ap[asset.Margin]; ok { + ap[asset.Margin] = p.Remove(spotPairs...) + } + return "" +} + +const subTplText = ` +{{- removeSpotFromMargin $.AssetPairs -}} +{{ range $asset, $pairs := $.AssetPairs }} + {{- range $p := $pairs -}} + {{- subToMap $.S $asset $p | mustToJson }} + {{- $.PairSeparator }} + {{- end -}} + {{ $.AssetSeparator }} +{{- end -}} +` diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index cf52cb3acfe..97026e7fdfc 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -159,6 +159,7 @@ func (b *Bitfinex) SetDefaults() { GlobalResultLimit: 10000, }, }, + Subscriptions: defaultSubscriptions.Clone(), } b.Requester, err = request.New(b.Name, @@ -208,7 +209,7 @@ func (b *Bitfinex) Setup(exch *config.Exchange) error { Connector: b.WsConnect, Subscriber: b.Subscribe, Unsubscriber: b.Unsubscribe, - GenerateSubscriptions: b.GenerateDefaultSubscriptions, + GenerateSubscriptions: b.generateSubscriptions, Features: &b.Features.Supports.WebsocketCapabilities, OrderbookBufferConfig: buffer.Config{ UpdateEntriesByID: true, diff --git a/exchanges/subscription/README.md b/exchanges/subscription/README.md index bbc9492c41a..e7616b931e9 100644 --- a/exchanges/subscription/README.md +++ b/exchanges/subscription/README.md @@ -61,14 +61,15 @@ Example: Assets and pairs should be output in the sequence in AssetPairs since text/template range function uses an sorted order for map keys. -Template functions may modify AssetPairs to update the subscription's pairs, e.g. Filtering out margin pairs already in spot subscription +Template functions may modify AssetPairs to update the subscription's pairs, e.g. Filtering out margin pairs already in spot subscription. We use separators like this because it allows mono-templates to decide at runtime whether to fan out. -See exchanges/subscription/testdata/subscriptions.tmpl for an example mono-template showcasing various features +See exchanges/subscription/testdata/subscriptions.tmpl for an example mono-template showcasing various features. Templates do not need to worry about joining around separators; Trailing separators will be stripped automatically. +Template functions should panic to handle errors. They are caught by text/template and turned into errors for use in `subscription.expandTemplate`. ## Contribution diff --git a/internal/testing/subscriptions/subscriptions.go b/internal/testing/subscriptions/subscriptions.go index bd032e299ed..f32ee1acd42 100644 --- a/internal/testing/subscriptions/subscriptions.go +++ b/internal/testing/subscriptions/subscriptions.go @@ -1,4 +1,4 @@ -package subscriptionstest +package subscription import ( "maps" diff --git a/testdata/configtest.json b/testdata/configtest.json index 46a7096d550..30c83b610d6 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -579,19 +579,41 @@ "uppercase": true }, "useGlobalFormat": true, - "assetTypes": [ - "spot" - ], "pairs": { "spot": { + "assetEnabled": true, "enabled": "BTCUSD,LTCUSD,LTCBTC,ETHUSD,ETHBTC", "available": "BTCUSD,LTCUSD,LTCBTC,ETHUSD,ETHBTC,ETCBTC,ETCUSD,RRTUSD,RRTBTC,ZECUSD,ZECBTC,XMRUSD,XMRBTC,DSHUSD,DSHBTC,BTCEUR,BTCJPY,XRPUSD,XRPBTC,IOTUSD,IOTBTC,IOTETH,EOSUSD,EOSBTC,EOSETH,SANUSD,SANBTC,SANETH,OMGUSD,OMGBTC,OMGETH,NEOUSD,NEOBTC,NEOETH,ETPUSD,ETPBTC,ETPETH,QTMUSD,QTMBTC,QTMETH,AVTUSD,AVTBTC,AVTETH,EDOUSD,EDOBTC,EDOETH,BTGUSD,BTGBTC,DATUSD,DATBTC,DATETH,QSHUSD,QSHBTC,QSHETH,YYWUSD,YYWBTC,YYWETH,GNTUSD,GNTBTC,GNTETH,SNTUSD,SNTBTC,SNTETH,IOTEUR,BATUSD,BATBTC,BATETH,MNAUSD,MNABTC,MNAETH,FUNUSD,FUNBTC,FUNETH,ZRXUSD,ZRXBTC,ZRXETH,TNBUSD,TNBBTC,TNBETH,SPKUSD,SPKBTC,SPKETH,TRXUSD,TRXBTC,TRXETH,RCNUSD,RCNBTC,RCNETH,RLCUSD,RLCBTC,RLCETH,AIDUSD,AIDBTC,AIDETH,SNGUSD,SNGBTC,SNGETH,REPUSD,REPBTC,REPETH,ELFUSD,ELFBTC,ELFETH,NECUSD,NECBTC,NECETH,BTCGBP,ETHEUR,ETHJPY,ETHGBP,NEOEUR,NEOJPY,NEOGBP,EOSEUR,EOSJPY,EOSGBP,IOTJPY,IOTGBP,IOSUSD,IOSBTC,IOSETH,AIOUSD,AIOBTC,AIOETH,REQUSD,REQBTC,REQETH,RDNUSD,RDNBTC,RDNETH,LRCUSD,LRCBTC,LRCETH,WAXUSD,WAXBTC,WAXETH,DAIUSD,DAIBTC,DAIETH,AGIUSD,AGIBTC,AGIETH,BFTUSD,BFTBTC,BFTETH,MTNUSD,MTNBTC,MTNETH,ODEUSD,ODEBTC,ODEETH,ANTUSD,ANTBTC,ANTETH,DTHUSD,DTHBTC,DTHETH,MITUSD,MITBTC,MITETH,STJUSD,STJBTC,STJETH,XLMUSD,XLMEUR,XLMJPY,XLMGBP,XLMBTC,XLMETH,XVGUSD,XVGEUR,XVGJPY,XVGGBP,XVGBTC,XVGETH,BCIUSD,BCIBTC,MKRUSD,MKRBTC,MKRETH,KNCUSD,KNCBTC,KNCETH,POAUSD,POABTC,POAETH,EVTUSD,LYMUSD,LYMBTC,LYMETH,UTKUSD,UTKBTC,UTKETH,VEEUSD,VEEBTC,VEEETH,DADUSD,DADBTC,DADETH,ORSUSD,ORSBTC,ORSETH,AUCUSD,AUCBTC,AUCETH,POYUSD,POYBTC,POYETH,FSNUSD,FSNBTC,FSNETH,CBTUSD,CBTBTC,CBTETH,ZCNUSD,ZCNBTC,ZCNETH,SENUSD,SENBTC,SENETH,NCAUSD,NCABTC,NCAETH,CNDUSD,CNDBTC,CNDETH,CTXUSD,CTXBTC,CTXETH,PAIUSD,PAIBTC,SEEUSD,SEEBTC,SEEETH,ESSUSD,ESSBTC,ESSETH,ATMUSD,ATMBTC,ATMETH,HOTUSD,HOTBTC,HOTETH,DTAUSD,DTABTC,DTAETH,IQXUSD,IQXBTC,IQXEOS,WPRUSD,WPRBTC,WPRETH,ZILUSD,ZILBTC,ZILETH,BNTUSD,BNTBTC,BNTETH,ABSUSD,ABSETH,XRAUSD,XRAETH,MANUSD,MANETH,BBNUSD,BBNETH,NIOUSD,NIOETH,DGXUSD,DGXETH,VETUSD,VETBTC,VETETH,UTNUSD,UTNETH,TKNUSD,TKNETH,GOTUSD,GOTEUR,GOTETH,XTZUSD,XTZBTC,CNNUSD,CNNETH,BOXUSD,BOXETH,TRXEUR,TRXGBP,TRXJPY,MGOUSD,MGOETH,RTEUSD,RTEETH,YGGUSD,YGGETH,MLNUSD,MLNETH,WTCUSD,WTCETH,CSXUSD,CSXETH,OMNUSD,OMNBTC,INTUSD,INTETH,DRNUSD,DRNETH,PNKUSD,PNKETH,DGBUSD,DGBBTC,BSVUSD,BSVBTC,BABUSD,BABBTC,WLOUSD,WLOXLM,VLDUSD,VLDETH,ENJUSD,ENJETH,ONLUSD,ONLETH,RBTUSD,RBTBTC,USTUSD,EUTEUR,EUTUSD,GSDUSD,UDCUSD,TSDUSD,PAXUSD,RIFUSD,RIFBTC,PASUSD,PASETH,VSYUSD,VSYBTC,ZRXDAI,MKRDAI,OMGDAI,BTTUSD,BTTBTC,BTCUST,ETHUST,CLOUSD,CLOBTC,IMPUSD,IMPETH,LTCUST,EOSUST,BABUST,SCRUSD,SCRETH,GNOUSD,GNOETH,GENUSD,GENETH,ATOUSD,ATOBTC,ATOETH,WBTUSD,XCHUSD,EUSUSD,WBTETH,XCHETH,EUSETH,LEOUSD,LEOBTC,LEOUST,LEOEOS,LEOETH,ASTUSD,ASTETH,FOAUSD,FOAETH,UFRUSD,UFRETH,ZBTUSD,ZBTUST,OKBUSD,USKUSD,GTXUSD,KANUSD,OKBUST,OKBETH,OKBBTC,USKUST,USKETH,USKBTC,USKEOS,GTXUST,KANUST,AMPUSD,ALGUSD,ALGBTC,ALGUST,BTCXCH,SWMUSD,SWMETH,TRIUSD,TRIETH,LOOUSD,LOOETH,AMPUST,DUSK:USD,DUSK:BTC,UOSUSD,UOSBTC,RRBUSD,RRBUST,DTXUSD,DTXUST,AMPBTC,FTTUSD,FTTUST,PAXUST,UDCUST,TSDUST,BTC:CNHT,UST:CNHT,CNH:CNHT,CHZUSD,CHZUST,BTCF0:USTF0,ETHF0:USTF0" + }, + "margin": { + "assetEnabled": false, + "enabled": "ADA:BTC,FTM:USD", + "available": "ADA:BTC,ADA:USD,ADA:UST,ALG:USD,ALG:UST,APE:USD,APE:UST,APT:USD,APT:UST,ATO:USD,ATO:UST,AVAX:BTC,AVAX:USD,AVAX:UST,AXS:USD,AXS:UST,BCHN:USD,BTC:EUR,BTC:EUT,BTC:GBP,BTC:JPY,BTC:USD,BTC:UST,COMP:USD,COMP:UST,DAI:USD,DOGE:BTC,DOGE:USD,DOGE:UST,DOT:BTC,DOT:USD,DOT:UST,DSH:BTC,DSH:USD,EGLD:USD,EGLD:UST,EOS:BTC,EOS:ETH,EOS:USD,EOS:UST,ETC:BTC,ETC:USD,ETC:UST,ETH:BTC,ETH:EUR,ETH:EUT,ETH:GBP,ETH:JPY,ETH:USD,ETH:UST,ETHW:USD,ETHW:UST,FIL:USD,FIL:UST,FTM:USD,FTM:UST,IOT:BTC,IOT:USD,LEO:USD,LEO:UST,LINK:USD,LINK:UST,LTC:BTC,LTC:USD,LTC:UST,MATIC:USD,MATIC:UST,MKR:USD,NEO:BTC,NEO:USD,NEO:UST,SHIB:USD,SHIB:UST,SOL:BTC,SOL:USD,SOL:UST,SUSHI:USD,SUSHI:UST,TRX:USD,TRX:UST,UNI:USD,UNI:UST,UST:USD,XAUT:BTC,XAUT:USD,XAUT:UST,XLM:BTC,XLM:USD,XMR:BTC,XMR:USD,XMR:UST,XRP:BTC,XRP:USD,XRP:UST,XTZ:BTC,XTZ:USD,XTZ:UST,YFI:USD,YFI:UST,ZEC:BTC,ZEC:USD,ZRX:USD", + "requestFormat": { + "uppercase": true + }, + "configFormat": { + "uppercase": true, + "delimiter": ":" + } + }, + "marginfunding": { + "assetEnabled": false, + "enabled": "MKR-,AVAX-", + "available": "MKR-,DAI-,USD-,XMR-,DOT-,UNI-,DSH-,ZEC-,ZRX-,UST-,IOT-,SOL-,SHIB-,FTM-,MATIC-,LINK-,BCHN-,EUT-,ADA-,LTC-,APE-,NEO-,APT-,LEO-,YFI-,FIL-,DOGE-,ALG-,SUSHI-,ETC-,TRX-,XTZ-,ETHW-,XRP-,EOS-,XLM-,AVAX-,XAUT-,GBP-,ETH-,BTC-,ATO-,JPY-,EGLD-,EUR-,AXS-,COMP-", + "requestFormat": { + "uppercase": true + }, + "configFormat": { + "uppercase": true, + "delimiter": "-" + } } } }, "api": { - "authenticatedSupport": true, - "authenticatedWebsocketApiSupport": true, + "authenticatedSupport": false, + "authenticatedWebsocketApiSupport": false, "endpoints": { "url": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API", "urlSecondary": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API", @@ -648,7 +670,13 @@ "iban": "DE78660700240057016801", "supportedCurrencies": "JPY,GBP" } - ] + ], + "orderbook": { + "verificationBypass": false, + "websocketBufferLimit": 5, + "websocketBufferEnabled": false, + "publishPeriod": 10000000000 + } }, { "name": "Bitflyer", From bd2cc9d7bb11a097b7af0bb16fe9dbd6fd8c463d Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Mon, 21 Oct 2024 17:05:26 +1100 Subject: [PATCH 2/7] gateio: Small update on fields and subscriptions (#1658) * Cherry_pickable * gateio/websocket: use millisecond time for more accurate push time --------- Co-authored-by: Ryan O'Hara-Reid --- exchanges/gateio/gateio_types.go | 5 +++-- exchanges/gateio/gateio_ws_delivery_futures.go | 1 + exchanges/gateio/gateio_ws_futures.go | 3 ++- exchanges/gateio/gateio_ws_option.go | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index 2b3e558c29a..582498668e8 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -708,6 +708,7 @@ type FuturesCandlestick struct { HighestPrice types.Number `json:"h"` LowestPrice types.Number `json:"l"` OpenPrice types.Number `json:"o"` + Sum types.Number `json:"sum"` // Trading volume (unit: Quote currency) // Added for websocket push data Name string `json:"n,omitempty"` @@ -1822,8 +1823,8 @@ type OrderCreateParams struct { Iceberg int64 `json:"iceberg"` Price string `json:"price"` // NOTE: Market orders require string "0" TimeInForce string `json:"tif"` - Text string `json:"text"` - ClosePosition bool `json:"close,omitempty"` + Text string `json:"text,omitempty"` // Omitempty required as payload sent as `text:""` will return error message: Text content not starting with `t-`" + ClosePosition bool `json:"close,omitempty"` // Size needs to be zero if true ReduceOnly bool `json:"reduce_only,omitempty"` AutoSize string `json:"auto_size,omitempty"` Settle string `json:"-"` // Used in URL. diff --git a/exchanges/gateio/gateio_ws_delivery_futures.go b/exchanges/gateio/gateio_ws_delivery_futures.go index 085e0c2cd07..d4e1c8abb33 100644 --- a/exchanges/gateio/gateio_ws_delivery_futures.go +++ b/exchanges/gateio/gateio_ws_delivery_futures.go @@ -102,6 +102,7 @@ func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() (subscription.Lis Channel: channelsToSubscribe[i], Pairs: currency.Pairs{fPair.Upper()}, Params: params, + Asset: asset.DeliveryFutures, }) } } diff --git a/exchanges/gateio/gateio_ws_futures.go b/exchanges/gateio/gateio_ws_futures.go index 00c37ba09f3..6e54cbf3c01 100644 --- a/exchanges/gateio/gateio_ws_futures.go +++ b/exchanges/gateio/gateio_ws_futures.go @@ -145,6 +145,7 @@ func (g *Gateio) GenerateFuturesDefaultSubscriptions(settlement currency.Code) ( Channel: channelsToSubscribe[i], Pairs: currency.Pairs{fPair.Upper()}, Params: params, + Asset: asset.Futures, }) } } @@ -182,7 +183,7 @@ func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset. case futuresTradesChannel: return g.processFuturesTrades(respRaw, a) case futuresOrderbookChannel: - return g.processFuturesOrderbookSnapshot(push.Event, push.Result, a, push.Time.Time()) + return g.processFuturesOrderbookSnapshot(push.Event, push.Result, a, push.TimeMs.Time()) case futuresOrderbookTickerChannel: return g.processFuturesOrderbookTicker(push.Result) case futuresOrderbookUpdateChannel: diff --git a/exchanges/gateio/gateio_ws_option.go b/exchanges/gateio/gateio_ws_option.go index b91471d847b..40bc63fe336 100644 --- a/exchanges/gateio/gateio_ws_option.go +++ b/exchanges/gateio/gateio_ws_option.go @@ -163,6 +163,7 @@ getEnabledPairs: Channel: channelsToSubscribe[i], Pairs: currency.Pairs{fPair.Upper()}, Params: params, + Asset: asset.Options, }) } } From 2232340d499b108a93f02d4447167bebce976542 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Tue, 22 Oct 2024 08:21:35 +0200 Subject: [PATCH 3/7] Bitmex: Subscription templating (#1586) * Exchanges: Allow empty batches in ParallelChanOp In keeping with both common.Batch and "It's not my responsibility", ParallelChanOp should just do nothing when given an empty list (and implicitly an empty batch size. Whatever it's going to do, it'll delegate to common.Batch, and this allows us to just inline calls: ``` return common.AppendError( b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsSubscribeMethod, l, wsPublicStream) }, len(subs)), b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsSubscribeMethod, l, wsPrivateStream) }, len(subs)), ) ``` * Bitmex: Test config updates * Bitmex: Sub Templating * Bitmex: Enable websocket for tests * Bitmex: Handle subscription errors This switches to multiplexing so that we know which errors belong to which stream, particularly for the auth attempt * Bitmex: Fix ws order side err going to data stream Shouldn't fall into classification error if it's actually a parsing error --- exchanges/bitmex/bitmex_test.go | 148 ++-- exchanges/bitmex/bitmex_websocket.go | 741 ++++++++++----------- exchanges/bitmex/bitmex_websocket_types.go | 8 +- exchanges/bitmex/bitmex_wrapper.go | 15 +- exchanges/exchange.go | 4 - testdata/configtest.json | 42 +- 6 files changed, 459 insertions(+), 499 deletions(-) diff --git a/exchanges/bitmex/bitmex_test.go b/exchanges/bitmex/bitmex_test.go index 0471dcd019b..27ba9a2b346 100644 --- a/exchanges/bitmex/bitmex_test.go +++ b/exchanges/bitmex/bitmex_test.go @@ -25,7 +25,9 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -830,7 +832,7 @@ func TestUpdateTradablePairs(t *testing.T) { func TestWsPositionUpdate(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -838,7 +840,7 @@ func TestWsPositionUpdate(t *testing.T) { "riskValue":87960,"homeNotional":0.0008796,"posState":"Liquidation","maintMargin":263, "unrealisedGrossPnl":-677,"unrealisedPnl":-677,"unrealisedPnlPcnt":-0.0078,"unrealisedRoePcnt":-0.7756, "simpleQty":0.001,"liquidationPrice":1140.1, "timestamp":"2017-04-04T22:07:45.442Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -847,7 +849,7 @@ func TestWsPositionUpdate(t *testing.T) { func TestWsInsertExectuionUpdate(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"execution", + pressXToJSON := []byte(`[0, "public", "public", {"table":"execution", "action":"insert", "data":[{ "execID":"0193e879-cb6f-2891-d099-2c4eb40fee21", @@ -862,27 +864,7 @@ func TestWsInsertExectuionUpdate(t *testing.T) { "text":"Liquidation","trdMatchID":"7f4ab7f6-0006-3234-76f4-ae1385aad00f","execCost":88155,"execComm":66, "homeNotional":-0.00088155,"foreignNotional":1,"transactTime":"2017-04-04T22:07:46.035Z", "timestamp":"2017-04-04T22:07:46.035Z" - }]}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } -} - -func TestWSConnectionHandling(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"info":"Welcome to the BitMEX Realtime API.","version":"1.1.0", - "timestamp":"2015-01-18T10:14:06.802Z","docs":"https://www.bitmex.com/app/wsAPI","heartbeatEnabled":false}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } -} - -func TestWSSubscriptionHandling(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"success":true,"subscribe":"trade:ETHUSD", - "request":{"op":"subscribe","args":["trade:ETHUSD","instrument:ETHUSD"]}}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -891,18 +873,18 @@ func TestWSSubscriptionHandling(t *testing.T) { func TestWSPositionUpdateHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt","currentQty":1, "markPrice":1136.88,"posState":"Liquidated","simpleQty":0.001,"liquidationPrice":1140.1,"bankruptPrice":1134.37, "timestamp":"2017-04-04T22:07:46.019Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } - pressXToJSON = []byte(`{"table":"position", + pressXToJSON = []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -915,7 +897,7 @@ func TestWSPositionUpdateHandling(t *testing.T) { "unrealisedPnlPcnt":0,"unrealisedRoePcnt":0,"simpleQty":0,"simpleCost":0,"simpleValue":0,"avgCostPrice":null, "avgEntryPrice":null,"breakEvenPrice":null,"marginCallPrice":null,"liquidationPrice":null,"bankruptPrice":null, "timestamp":"2017-04-04T22:07:46.140Z" - }]}`) + }]}]`) err = b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -924,7 +906,7 @@ func TestWSPositionUpdateHandling(t *testing.T) { func TestWSOrderbookHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ + pressXToJSON := []byte(`[0, "public", "public", { "table":"orderBookL2_25", "keys":["symbol","id","side"], "types":{"id":"long","price":"float","side":"symbol","size":"long","symbol":"symbol"}, @@ -938,76 +920,60 @@ func TestWSOrderbookHandling(t *testing.T) { {"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":10,"price":50}, {"symbol":"ETHUSD","id":17999996000,"side":"Buy","size":20,"price":40}, {"symbol":"ETHUSD","id":17999997000,"side":"Buy","size":100,"price":30} - ] - }`) + ]}]`) err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"update", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":5,"timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"update", - "data":[ - ] - }`) + "data":[]}]`) err = b.wsHandleData(pressXToJSON) - if err == nil { - t.Error("Expected error") - } + require.ErrorContains(t, err, "empty orderbook") - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"delete", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"delete", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if !errors.Is(err, orderbook.ErrOrderbookInvalid) { - t.Error(err) - } + assert.ErrorIs(t, err, orderbook.ErrOrderbookInvalid) } func TestWSDeleveragePositionUpdateHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt","currentQty":2000, "markPrice":1160.72,"posState":"Deleverage","simpleQty":1.746,"liquidationPrice":1140.1, "timestamp":"2017-04-04T22:16:38.460Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } - pressXToJSON = []byte(`{"table":"position", + pressXToJSON = []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -1021,7 +987,7 @@ func TestWSDeleveragePositionUpdateHandling(t *testing.T) { "simpleQty":0,"simpleCost":0,"simpleValue":0,"simplePnl":0,"simplePnlPcnt":0,"avgCostPrice":null, "avgEntryPrice":null,"breakEvenPrice":null,"marginCallPrice":null,"liquidationPrice":null,"bankruptPrice":null, "timestamp":"2017-04-04T22:16:38.547Z" - }]}`) + }]}]`) err = b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1030,7 +996,7 @@ func TestWSDeleveragePositionUpdateHandling(t *testing.T) { func TestWSDeleverageExecutionInsertHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"execution", + pressXToJSON := []byte(`[0, "public", "public", {"table":"execution", "action":"insert", "data":[{ "execID":"20ad1ff4-c110-a4f2-dd31-f94eaa0701fd", @@ -1045,7 +1011,7 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { "trdMatchID":"1e849b8a-7e88-3c67-a93f-cc654d40e8ba","execCost":172306000,"execComm":-43077, "homeNotional":-1.72306,"foreignNotional":2000,"transactTime":"2017-04-04T22:16:38.472Z", "timestamp":"2017-04-04T22:16:38.472Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1054,7 +1020,7 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { func TestWsTrades(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}`) + pressXToJSON := []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1357,3 +1323,53 @@ func TestGetCurrencyTradeURL(t *testing.T) { assert.NotEmpty(t, resp) } } + +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + + b := new(Bitmex) + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + + p := currency.Pairs{ + currency.NewPair(currency.ETH, currency.USD), + currency.NewPair(currency.BCH, currency.NewCode("Z19")), + } + + exp := subscription.List{ + {QualifiedChannel: bitmexWSOrderbookL2 + ":" + p[1].String(), Channel: bitmexWSOrderbookL2, Asset: asset.Futures, Pairs: p[1:2]}, + {QualifiedChannel: bitmexWSOrderbookL2 + ":" + p[0].String(), Channel: bitmexWSOrderbookL2, Asset: asset.PerpetualContract, Pairs: p[:1]}, + {QualifiedChannel: bitmexWSTrade + ":" + p[1].String(), Channel: bitmexWSTrade, Asset: asset.Futures, Pairs: p[1:2]}, + {QualifiedChannel: bitmexWSTrade + ":" + p[0].String(), Channel: bitmexWSTrade, Asset: asset.PerpetualContract, Pairs: p[:1]}, + {QualifiedChannel: bitmexWSAffiliate, Channel: bitmexWSAffiliate, Authenticated: true}, + {QualifiedChannel: bitmexWSOrder, Channel: bitmexWSOrder, Authenticated: true}, + {QualifiedChannel: bitmexWSMargin, Channel: bitmexWSMargin, Authenticated: true}, + {QualifiedChannel: bitmexWSTransact, Channel: bitmexWSTransact, Authenticated: true}, + {QualifiedChannel: bitmexWSWallet, Channel: bitmexWSWallet, Authenticated: true}, + {QualifiedChannel: bitmexWSExecution + ":" + p[0].String(), Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract, Pairs: p[:1]}, + {QualifiedChannel: bitmexWSPosition + ":" + p[0].String(), Channel: bitmexWSPosition, Authenticated: true, Asset: asset.PerpetualContract, Pairs: p[:1]}, + } + + b.Websocket.SetCanUseAuthenticatedEndpoints(true) + subs, err := b.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + testsubs.EqualLists(t, exp, subs) +} + +func TestSubscribe(t *testing.T) { + t.Parallel() + b := new(Bitmex) + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + subs, err := b.generateSubscriptions() // Note: We grab this before it's overwritten by SetupWs + require.NoError(t, err, "generateSubscriptions must not error") + testexch.SetupWs(t, b) + err = b.Subscribe(subs) + require.NoError(t, err, "Subscribe should not error") + for _, s := range subs { + assert.Equalf(t, subscription.SubscribedState, s.State(), "%s state should be subscribed", s.QualifiedChannel) + } + err = b.Unsubscribe(subs) + require.NoError(t, err, "Unsubscribe should not error") + for _, s := range subs { + assert.Equalf(t, subscription.UnsubscribedState, s.State(), "%s state should be unsusbscribed", s.QualifiedChannel) + } +} diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index c2c5c71f57c..c6297404116 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -8,9 +8,12 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" @@ -24,7 +27,7 @@ import ( ) const ( - bitmexWSURL = "wss://www.bitmex.com/realtime" + bitmexWSURL = "wss://www.bitmex.com/realtimemd" // Public Subscription Channels bitmexWSAnnouncement = "announcement" @@ -66,9 +69,16 @@ const ( bitmexActionUpdateData = "update" ) -var subscriptionNames = map[string]string{ - subscription.OrderbookChannel: bitmexWSOrderbookL2, - subscription.AllTradesChannel: bitmexWSTrade, +var defaultSubscriptions = subscription.List{ + {Enabled: true, Channel: bitmexWSOrderbookL2, Asset: asset.All}, + {Enabled: true, Channel: bitmexWSTrade, Asset: asset.All}, + {Enabled: true, Channel: bitmexWSAffiliate, Authenticated: true}, + {Enabled: true, Channel: bitmexWSOrder, Authenticated: true}, + {Enabled: true, Channel: bitmexWSMargin, Authenticated: true}, + {Enabled: true, Channel: bitmexWSTransact, Authenticated: true}, + {Enabled: true, Channel: bitmexWSWallet, Authenticated: true}, + {Enabled: true, Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract}, + {Enabled: true, Channel: bitmexWSPosition, Authenticated: true, Asset: asset.PerpetualContract}, } // WsConnect initiates a new websocket connection @@ -77,35 +87,20 @@ func (b *Bitmex) WsConnect() error { return stream.ErrWebsocketNotEnabled } var dialer websocket.Dialer - err := b.Websocket.Conn.Dial(&dialer, http.Header{}) - if err != nil { - return err - } - - resp := b.Websocket.Conn.ReadMessage() - if resp.Raw == nil { - return errors.New("connection closed") - } - var welcomeResp WebsocketWelcome - err = json.Unmarshal(resp.Raw, &welcomeResp) - if err != nil { + if err := b.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil { return err } - if b.Verbose { - log.Debugf(log.ExchangeSys, - "Successfully connected to Bitmex %s at time: %s Limit: %d", - welcomeResp.Info, - welcomeResp.Timestamp, - welcomeResp.Limit.Remaining) - } - b.Websocket.Wg.Add(1) go b.wsReadData() + ctx := context.TODO() + if err := b.wsOpenStream(ctx, b.Websocket.Conn, wsPublicStream); err != nil { + return err + } + if b.Websocket.CanUseAuthenticatedEndpoints() { - err = b.websocketSendAuth(context.TODO()) - if err != nil { + if err := b.websocketSendAuth(ctx); err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", b.Name, err) } @@ -114,6 +109,31 @@ func (b *Bitmex) WsConnect() error { return nil } +const ( + wsPublicStream = "public" + wsPrivateStream = "private" + wsSubscribeOp = "subscribe" + wsUnsubscribeOp = "unsubscribe" + wsMsgPacket = 0 + wsOpenPacket = 1 + wsClosePacket = 2 +) + +func (b *Bitmex) wsOpenStream(ctx context.Context, c stream.Connection, name string) error { + resp, err := c.SendMessageReturnResponse(ctx, request.Unset, "open:"+name, []any{wsOpenPacket, name, name}) + if err != nil { + return err + } + var welcomeResp WebsocketWelcome + if err := json.Unmarshal(resp, &welcomeResp); err != nil { + return err + } + if b.Verbose { + log.Debugf(log.ExchangeSys, "Successfully connected to Bitmex %s websocket API at time: %s Limit: %d", name, welcomeResp.Timestamp, welcomeResp.Limit.Remaining) + } + return nil +} + // wsReadData receives and passes on websocket messages for processing func (b *Bitmex) wsReadData() { defer b.Websocket.Wg.Done() @@ -131,338 +151,292 @@ func (b *Bitmex) wsReadData() { } func (b *Bitmex) wsHandleData(respRaw []byte) error { - quickCapture := make(map[string]interface{}) - err := json.Unmarshal(respRaw, &quickCapture) + var err error + msg, _, _, err := jsonparser.Get(respRaw, "[3]") if err != nil { - return err + return fmt.Errorf("unknown message format: %s", respRaw) + } + // We don't need to know about errors, since we're looking optimistically into the json + op, _ := jsonparser.GetString(msg, "request", "op") + errMsg, _ := jsonparser.GetString(msg, "error") + success, _ := jsonparser.GetBoolean(msg, "success") + version, _ := jsonparser.GetString(msg, "version") + switch { + case version != "": + op = "open" + fallthrough + case errMsg != "", success: + streamID, e2 := jsonparser.GetString(respRaw, "[1]") + if e2 != nil { + return fmt.Errorf("%w parsing stream", e2) + } + if !b.Websocket.Match.IncomingWithData(op+":"+streamID, msg) { + return fmt.Errorf("%w: %s:%s", stream.ErrNoMessageListener, op, streamID) + } + return nil } - var respError WebsocketErrorResponse - if _, ok := quickCapture["status"]; ok { - err = json.Unmarshal(respRaw, &respError) - if err != nil { + tableName, err := jsonparser.GetString(msg, "table") + if err != nil { + // Anything that's not a table isn't expected + return fmt.Errorf("unknown message format: %s", msg) + } + + switch tableName { + case bitmexWSOrderbookL2, bitmexWSOrderbookL225, bitmexWSOrderbookL10: + var orderbooks OrderBookData + if err := json.Unmarshal(msg, &orderbooks); err != nil { return err } - } + if len(orderbooks.Data) == 0 { + return fmt.Errorf("empty orderbook data received: %s", msg) + } - if _, ok := quickCapture["success"]; ok { - var decodedResp WebsocketSubscribeResp - err = json.Unmarshal(respRaw, &decodedResp) + pair, a, err := b.GetPairAndAssetTypeRequestFormatted(orderbooks.Data[0].Symbol) if err != nil { return err } - if decodedResp.Success { - if len(quickCapture) == 3 { - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s websocket: Successfully subscribed to %s", - b.Name, decodedResp.Subscribe) - } - } else { - b.Websocket.SetCanUseAuthenticatedEndpoints(true) - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s websocket: Successfully authenticated websocket connection", - b.Name) - } - } + err = b.processOrderbook(orderbooks.Data, orderbooks.Action, pair, a) + if err != nil { + return err + } + case bitmexWSTrade: + if !b.IsSaveTradeDataEnabled() { return nil } - - b.Websocket.DataHandler <- fmt.Errorf("%s websocket error: Unable to subscribe %s", - b.Name, decodedResp.Subscribe) - } else if _, ok := quickCapture["table"]; ok { - var decodedResp WebsocketMainResponse - err = json.Unmarshal(respRaw, &decodedResp) - if err != nil { + var tradeHolder TradeData + if err := json.Unmarshal(msg, &tradeHolder); err != nil { return err } - switch decodedResp.Table { - case bitmexWSOrderbookL2, bitmexWSOrderbookL225, bitmexWSOrderbookL10: - var orderbooks OrderBookData - err = json.Unmarshal(respRaw, &orderbooks) + var trades []trade.Data + for i := range tradeHolder.Data { + if tradeHolder.Data[i].Price == 0 { + // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. + // These have a size of 0 and are used only to indicate a changing price. + continue + } + p, a, err := b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) if err != nil { return err } - if len(orderbooks.Data) == 0 { - return fmt.Errorf("%s - Empty orderbook data received: %s", b.Name, respRaw) - } - - var pair currency.Pair - var a asset.Item - pair, a, err = b.GetPairAndAssetTypeRequestFormatted(orderbooks.Data[0].Symbol) + oSide, err := order.StringToOrderSide(tradeHolder.Data[i].Side) if err != nil { return err } - err = b.processOrderbook(orderbooks.Data, orderbooks.Action, pair, a) + trades = append(trades, trade.Data{ + TID: tradeHolder.Data[i].TrdMatchID, + Exchange: b.Name, + CurrencyPair: p, + AssetType: a, + Side: oSide, + Price: tradeHolder.Data[i].Price, + Amount: float64(tradeHolder.Data[i].Size), + Timestamp: tradeHolder.Data[i].Timestamp, + }) + } + return b.AddTradesToBuffer(trades...) + case bitmexWSAnnouncement: + var announcement AnnouncementData + if err := json.Unmarshal(msg, &announcement); err != nil { + return err + } + + if announcement.Action == bitmexActionInitialData { + return nil + } + + b.Websocket.DataHandler <- announcement.Data + case bitmexWSAffiliate: + var response WsAffiliateResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSInstrument: + // ticker + case bitmexWSExecution: + // trades of an order + var response WsExecutionResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + + for i := range response.Data { + p, a, err := b.GetPairAndAssetTypeRequestFormatted(response.Data[i].Symbol) if err != nil { return err } - case bitmexWSTrade: - if !b.IsSaveTradeDataEnabled() { - return nil - } - var tradeHolder TradeData - err = json.Unmarshal(respRaw, &tradeHolder) + oStatus, err := order.StringToOrderStatus(response.Data[i].OrdStatus) if err != nil { - return err + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + Err: err, + } } - var trades []trade.Data - for i := range tradeHolder.Data { - if tradeHolder.Data[i].Price == 0 { - // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. - // These have a size of 0 and are used only to indicate a changing price. - continue + oSide, err := order.StringToOrderSide(response.Data[i].Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + Err: err, } - var p currency.Pair - var a asset.Item - p, a, err = b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) + } + b.Websocket.DataHandler <- &order.Detail{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + AccountID: strconv.FormatInt(response.Data[i].Account, 10), + AssetType: a, + Pair: p, + Status: oStatus, + Trades: []order.TradeHistory{ + { + Price: response.Data[i].Price, + Amount: response.Data[i].OrderQuantity, + Exchange: b.Name, + TID: response.Data[i].ExecID, + Side: oSide, + Timestamp: response.Data[i].Timestamp, + IsMaker: false, + }, + }, + } + } + case bitmexWSOrder: + var response WsOrderResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + switch response.Action { + case "update", "insert": + for x := range response.Data { + p, a, err := b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) if err != nil { return err } - var oSide order.Side - oSide, err = order.StringToOrderSide(tradeHolder.Data[i].Side) + oSide, err := order.StringToOrderSide(response.Data[x].Side) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, + OrderID: response.Data[x].OrderID, Err: err, } } - - trades = append(trades, trade.Data{ - TID: tradeHolder.Data[i].TrdMatchID, - Exchange: b.Name, - CurrencyPair: p, - AssetType: a, - Side: oSide, - Price: tradeHolder.Data[i].Price, - Amount: float64(tradeHolder.Data[i].Size), - Timestamp: tradeHolder.Data[i].Timestamp, - }) - } - return b.AddTradesToBuffer(trades...) - case bitmexWSAnnouncement: - var announcement AnnouncementData - err = json.Unmarshal(respRaw, &announcement) - if err != nil { - return err - } - - if announcement.Action == bitmexActionInitialData { - return nil - } - - b.Websocket.DataHandler <- announcement.Data - case bitmexWSAffiliate: - var response WsAffiliateResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSInstrument: - // ticker - case bitmexWSExecution: - // trades of an order - var response WsExecutionResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - - for i := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetPairAndAssetTypeRequestFormatted(response.Data[i].Symbol) - if err != nil { - return err - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[i].OrdStatus) + oType, err := order.StringToOrderType(response.Data[x].OrderType) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, - OrderID: response.Data[i].OrderID, + OrderID: response.Data[x].OrderID, Err: err, } } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[i].Side) + oStatus, err := order.StringToOrderStatus(response.Data[x].OrderStatus) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, - OrderID: response.Data[i].OrderID, + OrderID: response.Data[x].OrderID, Err: err, } } b.Websocket.DataHandler <- &order.Detail{ + Price: response.Data[x].Price, + Amount: response.Data[x].OrderQuantity, Exchange: b.Name, - OrderID: response.Data[i].OrderID, - AccountID: strconv.FormatInt(response.Data[i].Account, 10), + OrderID: response.Data[x].OrderID, + AccountID: strconv.FormatInt(response.Data[x].Account, 10), + Type: oType, + Side: oSide, + Status: oStatus, AssetType: a, + Date: response.Data[x].TransactTime, Pair: p, - Status: oStatus, - Trades: []order.TradeHistory{ - { - Price: response.Data[i].Price, - Amount: response.Data[i].OrderQuantity, - Exchange: b.Name, - TID: response.Data[i].ExecID, - Side: oSide, - Timestamp: response.Data[i].Timestamp, - IsMaker: false, - }, - }, } } - case bitmexWSOrder: - var response WsOrderResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - switch response.Action { - case "update", "insert": - for x := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) - if err != nil { - return err - } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[x].Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oType order.Type - oType, err = order.StringToOrderType(response.Data[x].OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - b.Websocket.DataHandler <- &order.Detail{ - Price: response.Data[x].Price, - Amount: response.Data[x].OrderQuantity, - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - AccountID: strconv.FormatInt(response.Data[x].Account, 10), - Type: oType, - Side: oSide, - Status: oStatus, - AssetType: a, - Date: response.Data[x].TransactTime, - Pair: p, - } + case "delete": + for x := range response.Data { + p, a, err := b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) + if err != nil { + return err } - case "delete": - for x := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) - if err != nil { - return err - } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[x].Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oType order.Type - oType, err = order.StringToOrderType(response.Data[x].OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } + var oSide order.Side + oSide, err = order.StringToOrderSide(response.Data[x].Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } + } + var oType order.Type + oType, err = order.StringToOrderType(response.Data[x].OrderType) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } - b.Websocket.DataHandler <- &order.Detail{ - Price: response.Data[x].Price, - Amount: response.Data[x].OrderQuantity, - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - AccountID: strconv.FormatInt(response.Data[x].Account, 10), - Type: oType, - Side: oSide, - Status: oStatus, - AssetType: a, - Date: response.Data[x].TransactTime, - Pair: p, + } + var oStatus order.Status + oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } } - default: - b.Websocket.DataHandler <- fmt.Errorf("%s - Unsupported order update %+v", b.Name, response) - } - case bitmexWSMargin: - var response WsMarginResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSPosition: - var response WsPositionResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - - case bitmexWSPrivateNotifications: - var response WsPrivateNotificationsResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSTransact: - var response WsTransactResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSWallet: - var response WsWalletResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err + b.Websocket.DataHandler <- &order.Detail{ + Price: response.Data[x].Price, + Amount: response.Data[x].OrderQuantity, + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + AccountID: strconv.FormatInt(response.Data[x].Account, 10), + Type: oType, + Side: oSide, + Status: oStatus, + AssetType: a, + Date: response.Data[x].TransactTime, + Pair: p, + } } - b.Websocket.DataHandler <- response default: - b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)} - return nil + b.Websocket.DataHandler <- fmt.Errorf("%s - Unsupported order update %+v", b.Name, response) + } + case bitmexWSMargin: + var response WsMarginResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSPosition: + var response WsPositionResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + case bitmexWSPrivateNotifications: + var response WsPrivateNotificationsResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSTransact: + var response WsTransactResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSWallet: + var response WsWalletResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err } + b.Websocket.DataHandler <- response + default: + b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(msg)} } + return nil } @@ -543,96 +517,66 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, p currency. return nil } -// generateSubscriptions returns Adds default subscriptions to websocket to be handled by ManageSubscriptions() +// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature func (b *Bitmex) generateSubscriptions() (subscription.List, error) { - authed := b.Websocket.CanUseAuthenticatedEndpoints() - - assetPairs := map[asset.Item]currency.Pairs{} - for _, a := range b.GetAssetTypes(true) { - p, err := b.GetEnabledPairs(a) - if err != nil { - return nil, err - } - f, err := b.GetPairFormat(a, true) - if err != nil { - return nil, err - } - assetPairs[a] = p.Format(f) - } - - subs := subscription.List{} - for _, baseSub := range b.Features.Subscriptions { - if !authed && baseSub.Authenticated { - continue - } - - if baseSub.Asset == asset.Empty { - // Skip pair handling for subs which don't have an asset - subs = append(subs, baseSub.Clone()) - continue - } - - for a, p := range assetPairs { - if baseSub.Channel == bitmexWSOrderbookL2 && a == asset.Index { - continue // There are no L2 orderbook for index assets - } - if baseSub.Asset != asset.All && baseSub.Asset != a { - continue - } - s := baseSub.Clone() - s.Asset = a - s.Pairs = p - subs = append(subs, s) - } - } + return b.Features.Subscriptions.ExpandTemplates(b) +} - return subs, nil +// GetSubscriptionTemplate returns a subscription channel template +func (b *Bitmex) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{ + "channelName": channelName, + }).Parse(subTplText) } // Subscribe subscribes to a websocket channel func (b *Bitmex) Subscribe(subs subscription.List) error { - req := WebsocketRequest{ - Command: "subscribe", - } - for _, s := range subs { - for _, p := range s.Pairs { - cName := channelName(s.Channel) - req.Arguments = append(req.Arguments, cName+":"+p.String()) - } - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, subs...) - } - return err + return common.AppendError( + b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsSubscribeOp, l, wsPublicStream) }, len(subs)), + b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsSubscribeOp, l, wsPrivateStream) }, len(subs)), + ) } // Unsubscribe sends a websocket message to stop receiving data from the channel func (b *Bitmex) Unsubscribe(subs subscription.List) error { + return common.AppendError( + b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsUnsubscribeOp, l, wsPublicStream) }, len(subs)), + b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsUnsubscribeOp, l, wsPrivateStream) }, len(subs)), + ) +} + +func (b *Bitmex) manageSubs(op string, subs subscription.List, stream string) error { req := WebsocketRequest{ - Command: "unsubscribe", + Command: op, } - + exp := map[string]*subscription.Subscription{} for _, s := range subs { - for _, p := range s.Pairs { - cName := channelName(s.Channel) - req.Arguments = append(req.Arguments, cName+":"+p.String()) - } - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...) + req.Arguments = append(req.Arguments, s.QualifiedChannel) + exp[s.QualifiedChannel] = s } - return err -} - -// channelName converts global channel Names used in config of channel input into bitmex channel names -// returns the name unchanged if no match is found -func channelName(name string) string { - if s, ok := subscriptionNames[name]; ok { - return s + packet := []any{wsMsgPacket, stream, stream, req} + resps, errs := b.Websocket.Conn.SendMessageReturnResponses(context.TODO(), request.Unset, op+":"+stream, packet, len(subs)) + for _, resp := range resps { + if errMsg, _ := jsonparser.GetString(resp, "error"); errMsg != "" { + errs = common.AppendError(errs, errors.New(errMsg)) + } else { + chanName, err := jsonparser.GetString(resp, op) + if err != nil { + errs = common.AppendError(errs, err) + } + s, ok := exp[chanName] + if !ok { + errs = common.AppendError(errs, fmt.Errorf("%w: %s", subscription.ErrNotFound, chanName)) + } else { + if op == wsSubscribeOp { + errs = common.AppendError(errs, b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)) + } else { + errs = common.AppendError(errs, b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)) + } + } + } } - return name + return errs } // WebsocketSendAuth sends an authenticated subscription @@ -641,26 +585,33 @@ func (b *Bitmex) websocketSendAuth(ctx context.Context) error { if err != nil { return err } - b.Websocket.SetCanUseAuthenticatedEndpoints(true) timestamp := time.Now().Add(time.Hour * 1).Unix() newTimestamp := strconv.FormatInt(timestamp, 10) - hmac, err := crypto.GetHMAC(crypto.HashSHA256, - []byte("GET/realtime"+newTimestamp), - []byte(creds.Secret)) + hmac, err := crypto.GetHMAC(crypto.HashSHA256, []byte("GET/realtime"+newTimestamp), []byte(creds.Secret)) if err != nil { return err } signature := crypto.HexEncodeToString(hmac) - var sendAuth WebsocketRequest - sendAuth.Command = "authKeyExpires" - sendAuth.Arguments = append(sendAuth.Arguments, creds.Key, timestamp, - signature) - err = b.Websocket.Conn.SendJSONMessage(ctx, request.Unset, sendAuth) + err = b.wsOpenStream(ctx, b.Websocket.Conn, wsPrivateStream) if err != nil { - b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err } + req := WebsocketRequest{ + Command: "authKeyExpires", + Arguments: []any{creds.Key, timestamp, signature}, + } + packet := []any{wsMsgPacket, wsPrivateStream, wsPrivateStream, req} + resp, err := b.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.Command+":"+wsPrivateStream, packet) + if err != nil { + return err + } + if errMsg, _ := jsonparser.GetString(resp, "error"); errMsg != "" { + return errors.New(errMsg) + } + if b.Verbose { + log.Debugf(log.ExchangeSys, "%s websocket: Successfully authenticated websocket connection", b.Name) + } return nil } @@ -678,3 +629,33 @@ func (b *Bitmex) GetActionFromString(s string) (orderbook.Action, error) { } return 0, fmt.Errorf("%s %w", s, orderbook.ErrInvalidAction) } + +// channelName returns the correct channel name for the asset +func channelName(s *subscription.Subscription, a asset.Item) string { + switch s.Channel { + case subscription.OrderbookChannel: + if a == asset.Index { + return "" // There are no L2 orderbook for index assets + } + return bitmexWSOrderbookL2 + case subscription.AllTradesChannel: + return bitmexWSTrade + } + return s.Channel +} + +const subTplText = ` +{{- if $.S.Asset }} + {{ range $asset, $pairs := $.AssetPairs }} + {{- with $name := channelName $.S $asset }} + {{- range $i, $p := $pairs -}} + {{- $name -}} : {{- $p -}} + {{ $.PairSeparator }} + {{- end }} + {{- end }} + {{ $.AssetSeparator }} + {{- end }} +{{- else -}} + {{ channelName $.S $.S.Asset }} +{{- end }} +` diff --git a/exchanges/bitmex/bitmex_websocket_types.go b/exchanges/bitmex/bitmex_websocket_types.go index 26a2a0325bc..5ae86e38c46 100644 --- a/exchanges/bitmex/bitmex_websocket_types.go +++ b/exchanges/bitmex/bitmex_websocket_types.go @@ -1,11 +1,13 @@ package bitmex -import "time" +import ( + "time" +) // WebsocketRequest is the main request type type WebsocketRequest struct { - Command string `json:"op"` - Arguments []interface{} `json:"args"` + Command string `json:"op"` + Arguments []any `json:"args"` } // WebsocketErrorResponse main error response diff --git a/exchanges/bitmex/bitmex_wrapper.go b/exchanges/bitmex/bitmex_wrapper.go index 78a13945fe3..eea24d81326 100644 --- a/exchanges/bitmex/bitmex_wrapper.go +++ b/exchanges/bitmex/bitmex_wrapper.go @@ -28,7 +28,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" - "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -137,19 +136,7 @@ func (b *Bitmex) SetDefaults() { Enabled: exchange.FeaturesEnabled{ AutoPairUpdates: true, }, - Subscriptions: subscription.List{ - {Enabled: true, Channel: bitmexWSAnnouncement}, - {Enabled: true, Channel: bitmexWSOrderbookL2, Asset: asset.All}, - {Enabled: true, Channel: bitmexWSTrade, Asset: asset.All}, - {Enabled: true, Channel: bitmexWSAffiliate, Authenticated: true}, - {Enabled: true, Channel: bitmexWSOrder, Authenticated: true}, - {Enabled: true, Channel: bitmexWSMargin, Authenticated: true}, - {Enabled: true, Channel: bitmexWSPrivateNotifications, Authenticated: true}, - {Enabled: true, Channel: bitmexWSTransact, Authenticated: true}, - {Enabled: true, Channel: bitmexWSWallet, Authenticated: true}, - {Enabled: true, Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract}, - {Enabled: true, Channel: bitmexWSPosition, Authenticated: true, Asset: asset.PerpetualContract}, - }, + Subscriptions: defaultSubscriptions.Clone(), } b.Requester, err = request.New(b.Name, diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 9676364c149..9b0ede6bd62 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -58,7 +58,6 @@ var ( errSymbolCannotBeMatched = errors.New("symbol cannot be matched") errSetDefaultsNotCalled = errors.New("set defaults not called") errExchangeIsNil = errors.New("exchange is nil") - errBatchSizeZero = errors.New("batch size cannot be 0") ) // SetRequester sets the instance of the requester @@ -1811,9 +1810,6 @@ func (b *Base) GetOpenInterest(context.Context, ...key.PairAsset) ([]futures.Ope func (b *Base) ParallelChanOp(channels subscription.List, m func(subscription.List) error, batchSize int) error { wg := sync.WaitGroup{} errC := make(chan error, len(channels)) - if batchSize == 0 { - return errBatchSizeZero - } for _, b := range common.Batch(channels, batchSize) { wg.Add(1) diff --git a/testdata/configtest.json b/testdata/configtest.json index 30c83b610d6..d9c80bf88ae 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -852,26 +852,9 @@ "websocketOrderbookBufferLimit": 5, "baseCurrencies": "USD", "currencyPairs": { - "assetTypes": [ - "perpetualcontract", - "futures", - "downsideprofitcontract", - "upsideprofitcontract" - ], "pairs": { - "downsideprofitcontract": { - "enabled": "XBT7D_D95", - "available": "XBT7D_D95", - "requestFormat": { - "uppercase": true, - "delimiter": "_" - }, - "configFormat": { - "uppercase": true, - "delimiter": "_" - } - }, "futures": { + "assetEnabled": true, "enabled": "BCHZ19", "available": "XRPZ19,BCHZ19,ADAZ19,EOSZ19,TRXZ19,XBTZ19,ETHZ19,LTCZ19", "requestFormat": { @@ -882,6 +865,7 @@ } }, "perpetualcontract": { + "assetEnabled": true, "enabled": "ETHUSD", "available": "XBTUSD,ETHUSD", "requestFormat": { @@ -890,18 +874,6 @@ "configFormat": { "uppercase": true } - }, - "upsideprofitcontract": { - "enabled": "XBT7D_U105", - "available": "XBT7D_U105", - "requestFormat": { - "uppercase": true, - "delimiter": "_" - }, - "configFormat": { - "uppercase": true, - "delimiter": "_" - } } } }, @@ -934,7 +906,7 @@ }, "enabled": { "autoPairUpdates": true, - "websocketAPI": false + "websocketAPI": true } }, "bankAccounts": [ @@ -951,7 +923,13 @@ "iban": "", "supportedCurrencies": "" } - ] + ], + "orderbook": { + "verificationBypass": false, + "websocketBufferLimit": 5, + "websocketBufferEnabled": false, + "publishPeriod": 10000000000 + } }, { "name": "Bitstamp", From 4fc1bf0ad15324b527be0da5a7d58abcb2f5963f Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Wed, 23 Oct 2024 03:05:47 +0200 Subject: [PATCH 4/7] Kucoin: Abstract a subscriptionNames for assets solution (#1669) --- exchanges/kucoin/kucoin_test.go | 20 +++++++++++++++++ exchanges/kucoin/kucoin_websocket.go | 33 +++++++++++++++++----------- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index e059e8fcca9..86c3ac38b33 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -4335,3 +4335,23 @@ func TestCancelBatchOrders(t *testing.T) { _, err := ku.CancelBatchOrders(context.Background(), nil) assert.ErrorIs(t, common.ErrFunctionNotSupported, err) } + +func TestChannelName(t *testing.T) { + t.Parallel() + for _, tt := range []struct { + a asset.Item + ch string + exp string + }{ + {asset.Futures, futuresOrderbookDepth50Channel, futuresOrderbookDepth50Channel}, + {asset.Futures, subscription.OrderbookChannel, futuresOrderbookDepth5Channel}, + {asset.Futures, subscription.CandlesChannel, marketCandlesChannel}, + {asset.Futures, subscription.TickerChannel, futuresTickerChannel}, + {asset.Spot, subscription.OrderbookChannel, marketOrderbookDepth5Channel}, + {asset.Spot, subscription.AllTradesChannel, marketMatchChannel}, + {asset.Spot, subscription.CandlesChannel, marketCandlesChannel}, + {asset.Spot, subscription.TickerChannel, marketTickerChannel}, + } { + assert.Equal(t, tt.exp, channelName(&subscription.Subscription{Channel: tt.ch}, tt.a)) + } +} diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index ed97c1ab1a9..8b4aa535e26 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -88,6 +88,19 @@ var ( maxWSOrderbookWorkers = 10 ) +var subscriptionNames = map[asset.Item]map[string]string{ + asset.Futures: { + subscription.TickerChannel: futuresTickerChannel, + subscription.OrderbookChannel: futuresOrderbookDepth5Channel, // This does not require a REST request to get the orderbook. + }, + asset.All: { + subscription.TickerChannel: marketTickerChannel, + subscription.OrderbookChannel: marketOrderbookDepth5Channel, // This does not require a REST request to get the orderbook. + subscription.CandlesChannel: marketCandlesChannel, + subscription.AllTradesChannel: marketMatchChannel, + }, +} + var defaultSubscriptions = subscription.List{ {Enabled: true, Asset: asset.All, Channel: subscription.TickerChannel}, {Enabled: true, Asset: asset.All, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}, @@ -1664,21 +1677,15 @@ func (ku *Kucoin) checkSubscriptions() { // channelName returns the correct channel name for the asset func channelName(s *subscription.Subscription, a asset.Item) string { - switch s.Channel { - case subscription.TickerChannel: - if a == asset.Futures { - return futuresTickerChannel + if byAsset, hasAsset := subscriptionNames[a]; hasAsset { + if name, ok := byAsset[s.Channel]; ok { + return name } - return marketTickerChannel - case subscription.OrderbookChannel: - if a == asset.Futures { - return futuresOrderbookDepth5Channel + } + if allAssets, hasAll := subscriptionNames[asset.All]; hasAll { + if name, ok := allAssets[s.Channel]; ok { + return name } - return marketOrderbookDepth5Channel // This does not require a REST request to get the orderbook. - case subscription.CandlesChannel: - return marketCandlesChannel // No support in GCT yet for Futures candles - case subscription.AllTradesChannel: - return marketMatchChannel // No support in GCT yet for Futures all trades } return s.Channel } From 4c7c0bc533cbd79fbbddf62dd28b626080a3fec3 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Wed, 23 Oct 2024 06:34:01 +0200 Subject: [PATCH 5/7] Subscriptions: Relax subscription validation for non-existent pairs (#1635) The subscription pairs do not need to be validated as enabled or available. The check was just belt-and-braces and didn't have a specific use-case in mind. Coinbase has a use-case for wanting to subscribe to BTC-USD when it's not enabled. Moreover, it shouldn't be our job to check this. You want a sub expanded with these pairs? Fine. Done. If it doesn't work, you can work out why --- exchanges/subscription/template.go | 16 +++++++--------- exchanges/subscription/template_test.go | 11 ++++++++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/exchanges/subscription/template.go b/exchanges/subscription/template.go index ab0a18b4690..ae994f7919d 100644 --- a/exchanges/subscription/template.go +++ b/exchanges/subscription/template.go @@ -104,6 +104,13 @@ func expandTemplate(e iExchange, s *Subscription, ap assetPairs, assets asset.It subs := List{} + if len(s.Pairs) != 0 { + // We deliberately do not check Availability of sub Pairs because users have edge cases to subscribe to non-existent pairs + for a := range ap { + ap[a] = s.Pairs + } + } + switch s.Asset { case asset.All: subCtx.AssetPairs = ap @@ -118,15 +125,6 @@ func expandTemplate(e iExchange, s *Subscription, ap assetPairs, assets asset.It } } - if len(s.Pairs) != 0 { - for a, pairs := range subCtx.AssetPairs { - if err := pairs.ContainsAll(s.Pairs, true); err != nil { //nolint:govet // Shadow, or gocritic will complain sloppyReassign - return nil, err - } - subCtx.AssetPairs[a] = s.Pairs - } - } - buf := &bytes.Buffer{} if err := t.Execute(buf, subCtx); err != nil { //nolint:govet // Shadow, or gocritic will complain sloppyReassign return nil, err diff --git a/exchanges/subscription/template_test.go b/exchanges/subscription/template_test.go index f14e851ea8f..5025b8df71c 100644 --- a/exchanges/subscription/template_test.go +++ b/exchanges/subscription/template_test.go @@ -84,13 +84,18 @@ func TestExpandTemplates(t *testing.T) { } equalLists(t, exp, got) + // Users can specify pairs which aren't available, even across diverse assets + // Use-case: Coinbasepro user sub for futures BTC-USD would return all BTC pairs and all USD pairs, even though BTC-USD might not be enabled or available + p := currency.Pairs{currency.NewPairWithDelimiter("BEAR", "PEAR", "🐻")} + got, err = List{{Channel: "expand-pairs", Asset: asset.All, Pairs: p}}.ExpandTemplates(e) + require.NoError(t, err, "Must not error with fictional pairs") + exp = List{{Channel: "expand-pairs", QualifiedChannel: "spot-PEARBEAR-expand-pairs@0", Asset: asset.Spot, Pairs: p}} + equalLists(t, exp, got) + // Error cases _, err = List{{Channel: "nil"}}.ExpandTemplates(e) assert.ErrorIs(t, err, errInvalidTemplate, "Should get correct error on nil template") - _, err = List{{Channel: "single-channel", Asset: asset.Spot, Pairs: currency.Pairs{currency.NewPairWithDelimiter("NOPE", "POPE", "🐰")}}}.ExpandTemplates(e) - assert.ErrorIs(t, err, currency.ErrPairNotContainedInAvailablePairs, "Should error correctly when pair not available") - e.tpl = "errors.tmpl" _, err = List{{Channel: "error1"}}.ExpandTemplates(e) From 4d36ea4943f1053e80d9b4e486e337fdf0b23fa8 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Fri, 25 Oct 2024 10:57:34 +0200 Subject: [PATCH 6/7] Bitmex: Fix handling index records in WS trade stream (#1685) Fixes handling for Size == 0 index records sent to trade stream fixes #1684 --- exchanges/bitmex/bitmex_test.go | 16 +++-- exchanges/bitmex/bitmex_websocket.go | 74 ++++++++++++---------- exchanges/bitmex/bitmex_websocket_types.go | 4 +- exchanges/exchange.go | 10 +-- exchanges/exchange_test.go | 8 +-- 5 files changed, 62 insertions(+), 50 deletions(-) diff --git a/exchanges/bitmex/bitmex_test.go b/exchanges/bitmex/bitmex_test.go index 27ba9a2b346..20704c50770 100644 --- a/exchanges/bitmex/bitmex_test.go +++ b/exchanges/bitmex/bitmex_test.go @@ -1020,11 +1020,17 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { func TestWsTrades(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}]`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + b := new(Bitmex) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + b.SetSaveTradeDataStatus(true) + msg := []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}]`) + require.NoError(t, b.wsHandleData(msg), "Must not error handling a standard stream of trades") + + msg = []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":".BGCT","size":14,"price":258.2,"side":"sell"}]}]`) + require.ErrorIs(t, b.wsHandleData(msg), exchange.ErrSymbolCannotBeMatched, "Must error correctly with an unknown symbol") + + msg = []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":".BGCT","size":0,"price":258.2,"side":"sell"}]}]`) + require.NoError(t, b.wsHandleData(msg), "Must not error that symbol is unknown when index trade is ignored due to zero size") } func TestGetRecentTrades(t *testing.T) { diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index c6297404116..a18fa161216 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -202,41 +202,7 @@ func (b *Bitmex) wsHandleData(respRaw []byte) error { return err } case bitmexWSTrade: - if !b.IsSaveTradeDataEnabled() { - return nil - } - var tradeHolder TradeData - if err := json.Unmarshal(msg, &tradeHolder); err != nil { - return err - } - var trades []trade.Data - for i := range tradeHolder.Data { - if tradeHolder.Data[i].Price == 0 { - // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. - // These have a size of 0 and are used only to indicate a changing price. - continue - } - p, a, err := b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) - if err != nil { - return err - } - oSide, err := order.StringToOrderSide(tradeHolder.Data[i].Side) - if err != nil { - return err - } - - trades = append(trades, trade.Data{ - TID: tradeHolder.Data[i].TrdMatchID, - Exchange: b.Name, - CurrencyPair: p, - AssetType: a, - Side: oSide, - Price: tradeHolder.Data[i].Price, - Amount: float64(tradeHolder.Data[i].Size), - Timestamp: tradeHolder.Data[i].Timestamp, - }) - } - return b.AddTradesToBuffer(trades...) + return b.handleWsTrades(msg) case bitmexWSAnnouncement: var announcement AnnouncementData if err := json.Unmarshal(msg, &announcement); err != nil { @@ -517,6 +483,44 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, p currency. return nil } +func (b *Bitmex) handleWsTrades(msg []byte) error { + if !b.IsSaveTradeDataEnabled() { + return nil + } + var tradeHolder TradeData + if err := json.Unmarshal(msg, &tradeHolder); err != nil { + return err + } + trades := make([]trade.Data, 0, len(tradeHolder.Data)) + for _, t := range tradeHolder.Data { + if t.Size == 0 { + // Indices (symbols starting with .) post trades at intervals to the trade feed + // These have a size of 0 and are used only to indicate a changing price + continue + } + p, a, err := b.GetPairAndAssetTypeRequestFormatted(t.Symbol) + if err != nil { + return err + } + oSide, err := order.StringToOrderSide(t.Side) + if err != nil { + return err + } + + trades = append(trades, trade.Data{ + TID: t.TrdMatchID, + Exchange: b.Name, + CurrencyPair: p, + AssetType: a, + Side: oSide, + Price: t.Price, + Amount: float64(t.Size), + Timestamp: t.Timestamp, + }) + } + return b.AddTradesToBuffer(trades...) +} + // generateSubscriptions returns a list of subscriptions from the configured subscriptions feature func (b *Bitmex) generateSubscriptions() (subscription.List, error) { return b.Features.Subscriptions.ExpandTemplates(b) diff --git a/exchanges/bitmex/bitmex_websocket_types.go b/exchanges/bitmex/bitmex_websocket_types.go index 5ae86e38c46..dc22099ae5e 100644 --- a/exchanges/bitmex/bitmex_websocket_types.go +++ b/exchanges/bitmex/bitmex_websocket_types.go @@ -66,8 +66,8 @@ type OrderBookData struct { // TradeData contains trade resp data with action to be taken type TradeData struct { - Data []Trade `json:"data"` - Action string `json:"action"` + Data []*Trade `json:"data"` + Action string `json:"action"` } // AnnouncementData contains announcement resp data with action to be taken diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 9b0ede6bd62..938923e2332 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -49,13 +49,15 @@ const ( DefaultWebsocketOrderbookBufferLimit = 5 ) +// Public Errors var ( - // ErrExchangeNameIsEmpty is returned when the exchange name is empty - ErrExchangeNameIsEmpty = errors.New("exchange name is empty") + ErrExchangeNameIsEmpty = errors.New("exchange name is empty") + ErrSymbolCannotBeMatched = errors.New("symbol cannot be matched") +) +var ( errEndpointStringNotFound = errors.New("endpoint string not found") errConfigPairFormatRequiresDelimiter = errors.New("config pair format requires delimiter") - errSymbolCannotBeMatched = errors.New("symbol cannot be matched") errSetDefaultsNotCalled = errors.New("set defaults not called") errExchangeIsNil = errors.New("exchange is nil") ) @@ -247,7 +249,7 @@ func (b *Base) GetPairAndAssetTypeRequestFormatted(symbol string) (currency.Pair } } } - return currency.EMPTYPAIR, asset.Empty, errSymbolCannotBeMatched + return currency.EMPTYPAIR, asset.Empty, ErrSymbolCannotBeMatched } // GetClientBankAccounts returns banking details associated with diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index fd4cf117373..a5db589d047 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -2145,13 +2145,13 @@ func TestGetPairAndAssetTypeRequestFormatted(t *testing.T) { } _, _, err = b.GetPairAndAssetTypeRequestFormatted("BTCAUD") - if !errors.Is(err, errSymbolCannotBeMatched) { - t.Fatalf("received: '%v' but expected: '%v'", err, errSymbolCannotBeMatched) + if !errors.Is(err, ErrSymbolCannotBeMatched) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrSymbolCannotBeMatched) } _, _, err = b.GetPairAndAssetTypeRequestFormatted("BTCUSDT") - if !errors.Is(err, errSymbolCannotBeMatched) { - t.Fatalf("received: '%v' but expected: '%v'", err, errSymbolCannotBeMatched) + if !errors.Is(err, ErrSymbolCannotBeMatched) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrSymbolCannotBeMatched) } p, a, err := b.GetPairAndAssetTypeRequestFormatted("BTC-USDT") From e1b24e22c1ff2d554184eb69eb79dfd252653e65 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 17:21:17 +1100 Subject: [PATCH 7/7] build(deps): Bump ikalnytskyi/action-setup-postgres from 6 to 7 (#1694) Bumps [ikalnytskyi/action-setup-postgres](https://github.com/ikalnytskyi/action-setup-postgres) from 6 to 7. - [Release notes](https://github.com/ikalnytskyi/action-setup-postgres/releases) - [Commits](https://github.com/ikalnytskyi/action-setup-postgres/compare/v6...v7) --- updated-dependencies: - dependency-name: ikalnytskyi/action-setup-postgres dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2143b7e2e04..8d9d7b7112b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -40,7 +40,7 @@ jobs: - name: Setup Postgres if: matrix.psql == true - uses: ikalnytskyi/action-setup-postgres@v6 + uses: ikalnytskyi/action-setup-postgres@v7 with: database: gct_dev_ci id: postgres