From 7dd681c32d0e043e6ea24cdc86cabaf73157c771 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Wed, 10 Jul 2024 18:56:40 +0700 Subject: [PATCH] Kucoin: WIP - Batch all symbols --- exchanges/kucoin/kucoin_test.go | 96 ++++++++++++++++----------- exchanges/kucoin/kucoin_websocket.go | 99 ++++++++++++++-------------- 2 files changed, 104 insertions(+), 91 deletions(-) diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index ee77caa2cb2..078954adaaf 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log" "os" "strings" @@ -2062,42 +2061,22 @@ func TestGenerateSubscriptions(t *testing.T) { testsubs.EqualLists(t, exp, subs) } -// TestGenerateOtherSubscriptions exercises subscriptions with different requirements from the template -// Candles should fan out and include an interval -// Snapshot should fan out and not have an interval +// TestGenerateOtherSubscriptions exercises non-default subscriptions func TestGenerateOtherSubscriptions(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - expPairs := map[asset.Item]currency.Pairs{ - asset.Spot: subPairs[0:4], - asset.Margin: subPairs[6:8], // Just the Margin pairs not in Spot - } - - for a, pairs := range expPairs { - for _, tt := range []struct { - sub *subscription.Subscription - path string - }{ - {&subscription.Subscription{Channel: subscription.CandlesChannel, Asset: a, Interval: kline.FourHour}, "/market/candles:%s_4hour"}, - {&subscription.Subscription{Channel: marketSnapshotChannel, Asset: a}, "/market/snapshot:%s"}, - } { - ku.Features.Subscriptions = subscription.List{tt.sub} - - subs, err := ku.generateSubscriptions() - require.NoError(t, err, "generateSubscriptions with Candles should not error") - - exp := subscription.List{} - for _, pair := range pairs { - s := tt.sub.Clone() - s.Asset = a - s.Pairs = currency.Pairs{pair} - s.QualifiedChannel = fmt.Sprintf(tt.path, pair) - exp = append(exp, s) - } - testsubs.EqualLists(t, exp, subs) - } + subs := subscription.List{ + {Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FourHour}, + {Channel: marketSnapshotChannel, Asset: asset.Spot}, + } + + for _, s := range subs { + ku.Features.Subscriptions = subscription.List{s} + got, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + require.Len(t, got, 1, "Should generate just one sub") } } @@ -2492,23 +2471,33 @@ func TestProcessMarketSnapshot(t *testing.T) { } } -func TestSubscribeMarketSnapshot(t *testing.T) { +// TestSubscribeBatches ensures that endpoints support batching, contary to kucoin api docs +func TestSubscribeBatches(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} testexch.SetupWs(t, ku) - ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: marketSnapshotChannel}} + ku.Features.Subscriptions = subscription.List{ + {Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin}, + {Asset: asset.Futures, Channel: subscription.TickerChannel}, + {Asset: asset.Spot, Channel: marketSnapshotChannel}, + } + subs, err := ku.generateSubscriptions() require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, len(ku.Features.Subscriptions), "Must generate batched subscriptions") err = ku.Subscribe(subs) - assert.NoError(t, err, "Subscribe to MarketSnapshot should not error") + require.NoError(t, err, "Subscribe to small batches should not error") } -// TestSubscribMatch exercises the Match subscription -// Ensures batching works and the connection symbol limit is still 300 at Kucoin's end -func TestSubscribeMatch(t *testing.T) { +// TestSubscribeTickerAll ensures that ticker subscriptions switch to using all and it works + +// TestSubscribeBatchLimit exercises the kucoin batch limits of 300 per connection +// Ensures batching of 100 pairs and the connection symbol limit is still 300 at Kucoin's end +func TestSubscribeBatchLimit(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes @@ -2527,9 +2516,12 @@ func TestSubscribeMatch(t *testing.T) { require.Len(t, subs, 3, "Must get 3 subs") err = ku.Subscribe(subs) - require.NoError(t, err, "Subscribe to MarketSnapshot should not error") + require.NoError(t, err, "Subscribe must not error") + + err = ku.Unsubscribe(subs) + require.NoError(t, err, "Unsubscribe must not error") - err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:301], true) + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:320], true) require.NoError(t, err, "StorePairs must not error") ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}} @@ -2541,6 +2533,30 @@ func TestSubscribeMatch(t *testing.T) { require.ErrorContains(t, err, "exceed max subscription count limitation of 300 per session", "Subscribe to MarketSnapshot must error above connection symbol limit") } +func TestSubscribeTickerAll(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, ku) + + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:500], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.TickerChannel}} + + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 1, "Must generate one subscriptions") + require.Equal(t, "/market/ticker:all", subs[0].QualifiedChannel, "QualifiedChannel must be correct") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe to must not error") +} + func TestSeedLocalCache(t *testing.T) { t.Parallel() pair, err := currency.NewPairFromString("ETH-USDT") diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 3a7602bbe15..b684ed667c0 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -55,20 +55,20 @@ const ( spotMarketAdvancedChannel = "/spotMarket/advancedOrders" // futures channels - futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol} - futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol} - futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol} - futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol} - futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol} - futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol} + futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol},... + futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol},... + futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol},... + futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol},... + futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol},... + futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol},... futuresSystemAnnouncementChannel = "/contract/announcement" - futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol} + futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol},... // futures private channels - futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol} + futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol},... + futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol},... futuresStopOrdersLifecycleEventChannel = "/contractMarket/advancedOrders" futuresAccountBalanceEventChannel = "/contractAccount/wallet" - futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol} ) var ( @@ -987,14 +987,16 @@ func (ku *Kucoin) GetSubscriptionTemplate(_ *subscription.Subscription) (*templa spotPairs, _ := ku.GetEnabledPairs(asset.Spot) subTemplate, err = template.New("master.tmpl"). Funcs(template.FuncMap{ - "channelName": channelName, - "removeSpotFromMargin": func(ap map[asset.Item]currency.Pairs) string { return removeSpotFromMargin(ap, spotPairs) }, - "isCurrencyChannel": isCurrencyChannel, - "isSymbolChannel": isSymbolChannel, - "isSymbolListChannel": isSymbolListChannel, - "channelInterval": channelInterval, - "assetCurrencies": assetCurrencies, - "batch": common.Batch[currency.Pairs], + "channelName": channelName, + "removeSpotFromMargin": func(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) string { + return removeSpotFromMargin(s, ap, spotPairs) + }, + "isCurrencyChannel": isCurrencyChannel, + "isSymbolChannel": isSymbolChannel, + "channelInterval": channelInterval, + "assetCurrencies": assetCurrencies, + "joinPairsWithInterval": joinPairsWithInterval, + "batch": common.Batch[currency.Pairs], }). Parse(subTplText) } @@ -1589,8 +1591,11 @@ func channelName(s *subscription.Subscription, a asset.Item) string { return s.Channel } -// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map -func removeSpotFromMargin(ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string { +// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map for subscriptions to non-margin endpoints +func removeSpotFromMargin(s *subscription.Subscription, ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string { + if strings.HasPrefix(s.Channel, "/margin") { + return "" + } if p, ok := ap[asset.Margin]; ok { ap[asset.Margin] = p.Remove(spotPairs...) } @@ -1612,16 +1617,6 @@ func isCurrencyChannel(s *subscription.Subscription) bool { return s.Channel == marginLoanChannel } -// isSymbolListChannel returns if the channel expects receive a list of symbol -func isSymbolListChannel(c string) bool { - switch c { - case marketTickerChannel, marketOrderbookChannel, marketOrderbookDepth5Channel, marketOrderbookDepth50Channel, - marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel, marginFundingbookChangeChannel: - return true - } - return false -} - // channelInterval returns the channel interval if it has one func channelInterval(s *subscription.Subscription) string { if channelName(s, s.Asset) == marketCandlesChannel { @@ -1644,34 +1639,36 @@ func assetCurrencies(s *subscription.Subscription, ap map[asset.Item]currency.Pa return cs } +// joinPairsWithInterval returns a list of currency pair symbols joined by comma +// If the subscription has a viable interval it's appended after each symbol +func joinPairsWithInterval(b currency.Pairs, s *subscription.Subscription) string { + out := make([]string, len(b)) + suffix, err := intervalToString(s.Interval) + if err == nil { + suffix = "_" + suffix + } + for i, p := range b { + out[i] = p.String() + suffix + } + return strings.Join(out, ",") +} + const subTplText = ` -{{- removeSpotFromMargin $.AssetPairs -}} +{{- removeSpotFromMargin $.S $.AssetPairs -}} {{- if isCurrencyChannel $.S }} - {{- range $currency := assetCurrencies $.S $.AssetPairs -}} - {{ channelName $.S $.S.Asset -}} : {{- $currency }} - {{- $.PairSeparator -}} - {{- end -}} + {{ channelName $.S $.S.Asset -}} : {{- (assetCurrencies $.S $.AssetPairs).Join -}} {{- else if isSymbolChannel $.S }} {{ range $asset, $pairs := $.AssetPairs }} {{- with $name := channelName $.S $asset }} - {{- if isSymbolListChannel $name }} - {{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}} - {{- $name -}} :all - {{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}} - {{- else -}} - {{- range $b := batch $pairs 100 -}} - {{- $name -}} : {{- $b.Join -}} - {{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}} - {{ $.PairSeparator }} - {{- end -}} - {{- $.BatchSize -}} 100 - {{- end }} - {{- else }} - {{- range $pair := $pairs -}} - {{ $name -}} : {{- $pair }} - {{- with $i := channelInterval $.S -}} _ {{- $i }}{{ end -}} + {{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}} + {{- $name -}} :all + {{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}} + {{- else -}} + {{- range $b := batch $pairs 100 -}} + {{- $name -}} : {{- joinPairsWithInterval $b $.S -}} {{ $.PairSeparator }} - {{- end }} + {{- end -}} + {{- $.BatchSize -}} 100 {{- end }} {{- end }} {{ $.AssetSeparator }}