Skip to content

Commit

Permalink
Kucoin: Add Subscription batching
Browse files Browse the repository at this point in the history
Turns out that contary to the documentation, kucoin supports batching of
all symbols and currencies
  • Loading branch information
gbjk committed Jul 11, 2024
1 parent 39da101 commit 3ae0e28
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 86 deletions.
134 changes: 94 additions & 40 deletions exchanges/kucoin/kucoin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"strings"
Expand Down Expand Up @@ -2045,59 +2044,42 @@ func TestGenerateSubscriptions(t *testing.T) {

ku.Websocket.SetCanUseAuthenticatedEndpoints(true)

var loanPairs currency.Pairs
loanCurrs := subPairs[0:8].GetCurrencies()
for _, c := range loanCurrs {
loanPairs = append(loanPairs, currency.Pair{Base: c})
}

exp = append(exp, subscription.List{
{Asset: asset.Futures, Channel: futuresTradeOrderChannel, QualifiedChannel: "/contractMarket/tradeOrders", Pairs: subPairs[8:]},
{Asset: asset.Futures, Channel: futuresStopOrdersLifecycleEventChannel, QualifiedChannel: "/contractMarket/advancedOrders", Pairs: subPairs[8:]},
{Asset: asset.Futures, Channel: futuresAccountBalanceEventChannel, QualifiedChannel: "/contractAccount/wallet", Pairs: subPairs[8:]},
{Asset: asset.Margin, Channel: marginPositionChannel, QualifiedChannel: "/margin/position", Pairs: subPairs[4:8]},
{Asset: asset.Margin, Channel: marginLoanChannel, QualifiedChannel: "/margin/loan:" + loanCurrs.Join(), Pairs: loanPairs},
{Channel: accountBalanceChannel, QualifiedChannel: "/account/balance"},
}...)
for _, c := range subPairs[0:8].GetCurrencies() {
exp = append(exp, &subscription.Subscription{
Asset: asset.Margin, Channel: marginLoanChannel, QualifiedChannel: "/margin/loan:" + c.String(), Pairs: currency.Pairs{currency.Pair{Base: c}},
})
}

subs, err = ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions with Auth must not error")
testsubs.EqualLists(t, exp, subs)
}

// TestGenerateOtherSubscriptions exercises subscriptions with different requirements from the template
// Candles should fan out and include an interval
// Snapshot should fan out and not have an interval
// TestGenerateOtherSubscriptions exercises non-default subscriptions
func TestGenerateOtherSubscriptions(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes

expPairs := map[asset.Item]currency.Pairs{
asset.Spot: subPairs[0:4],
asset.Margin: subPairs[6:8], // Just the Margin pairs not in Spot
}

for a, pairs := range expPairs {
for _, tt := range []struct {
sub *subscription.Subscription
path string
}{
{&subscription.Subscription{Channel: subscription.CandlesChannel, Asset: a, Interval: kline.FourHour}, "/market/candles:%s_4hour"},
{&subscription.Subscription{Channel: marketSnapshotChannel, Asset: a}, "/market/snapshot:%s"},
} {
ku.Features.Subscriptions = subscription.List{tt.sub}

subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions with Candles should not error")

exp := subscription.List{}
for _, pair := range pairs {
s := tt.sub.Clone()
s.Asset = a
s.Pairs = currency.Pairs{pair}
s.QualifiedChannel = fmt.Sprintf(tt.path, pair)
exp = append(exp, s)
}
testsubs.EqualLists(t, exp, subs)
}
subs := subscription.List{
{Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FourHour},
{Channel: marketSnapshotChannel, Asset: asset.Spot},
}

for _, s := range subs {
ku.Features.Subscriptions = subscription.List{s}
got, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions should not error")
require.Len(t, got, 1, "Should generate just one sub")
}
}

Expand Down Expand Up @@ -2492,18 +2474,90 @@ func TestProcessMarketSnapshot(t *testing.T) {
}
}

func TestSubscribeMarketSnapshot(t *testing.T) {
// TestSubscribeBatches ensures that endpoints support batching, contary to kucoin api docs
func TestSubscribeBatches(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
ku.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, ku)

ku.Features.Subscriptions = subscription.List{
{Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Asset: asset.Futures, Channel: subscription.TickerChannel},
{Asset: asset.Spot, Channel: marketSnapshotChannel},
}

subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
require.Len(t, subs, len(ku.Features.Subscriptions), "Must generate batched subscriptions")

err = ku.Subscribe(subs)
require.NoError(t, err, "Subscribe to small batches should not error")
}

// TestSubscribeTickerAll ensures that ticker subscriptions switch to using all and it works

// TestSubscribeBatchLimit exercises the kucoin batch limits of 300 per connection
// Ensures batching of 100 pairs and the connection symbol limit is still 300 at Kucoin's end
func TestSubscribeBatchLimit(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
ku.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, ku)

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: marketSnapshotChannel}}
avail, err := ku.GetAvailablePairs(asset.Spot)
require.NoError(t, err, "GetAvailablePairs must not error")

err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:299], true)
require.NoError(t, err, "StorePairs must not error")

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}}
subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
require.Len(t, subs, 3, "Must get 3 subs")

err = ku.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")

err = ku.Unsubscribe(subs)
require.NoError(t, err, "Unsubscribe must not error")

err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:320], true)
require.NoError(t, err, "StorePairs must not error")

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}}
subs, err = ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
require.Len(t, subs, 4, "Must get 4 subs")

err = ku.Subscribe(subs)
require.ErrorContains(t, err, "exceed max subscription count limitation of 300 per session", "Subscribe to MarketSnapshot must error above connection symbol limit")
}

func TestSubscribeTickerAll(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
ku.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, ku)

avail, err := ku.GetAvailablePairs(asset.Spot)
require.NoError(t, err, "GetAvailablePairs must not error")

err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:500], true)
require.NoError(t, err, "StorePairs must not error")

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.TickerChannel}}

subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
require.Len(t, subs, 1, "Must generate one subscriptions")
require.Equal(t, "/market/ticker:all", subs[0].QualifiedChannel, "QualifiedChannel must be correct")

err = ku.Subscribe(subs)
assert.NoError(t, err, "Subscribe to MarketSnapshot should not error")
require.NoError(t, err, "Subscribe to must not error")
}

func TestSeedLocalCache(t *testing.T) {
Expand Down
94 changes: 48 additions & 46 deletions exchanges/kucoin/kucoin_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ const (
spotMarketAdvancedChannel = "/spotMarket/advancedOrders"

// futures channels
futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol}
futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol}
futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol}
futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol}
futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol}
futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol}
futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol},...
futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol},...
futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol},...
futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol},...
futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol},...
futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol},...
futuresSystemAnnouncementChannel = "/contract/announcement"
futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol}
futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol},...

// futures private channels
futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol}
futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol},...
futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol},...
futuresStopOrdersLifecycleEventChannel = "/contractMarket/advancedOrders"
futuresAccountBalanceEventChannel = "/contractAccount/wallet"
futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol}
)

var (
Expand Down Expand Up @@ -991,13 +991,16 @@ func (ku *Kucoin) GetSubscriptionTemplate(_ *subscription.Subscription) (*templa
spotPairs, _ := ku.GetEnabledPairs(asset.Spot)
subTemplate, err = template.New("master.tmpl").
Funcs(template.FuncMap{
"channelName": channelName,
"removeSpotFromMargin": func(ap map[asset.Item]currency.Pairs) string { return removeSpotFromMargin(ap, spotPairs) },
"isCurrencyChannel": isCurrencyChannel,
"isSymbolChannel": isSymbolChannel,
"isSymbolListChannel": isSymbolListChannel,
"channelInterval": channelInterval,
"assetCurrencies": assetCurrencies,
"channelName": channelName,
"removeSpotFromMargin": func(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) string {
return removeSpotFromMargin(s, ap, spotPairs)
},
"isCurrencyChannel": isCurrencyChannel,
"isSymbolChannel": isSymbolChannel,
"channelInterval": channelInterval,
"assetCurrencies": assetCurrencies,
"joinPairsWithInterval": joinPairsWithInterval,
"batch": common.Batch[currency.Pairs],
}).
Parse(subTplText)
}
Expand Down Expand Up @@ -1592,8 +1595,11 @@ func channelName(s *subscription.Subscription, a asset.Item) string {
return s.Channel
}

// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map
func removeSpotFromMargin(ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string {
// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map for subscriptions to non-margin endpoints
func removeSpotFromMargin(s *subscription.Subscription, ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string {
if strings.HasPrefix(s.Channel, "/margin") {
return ""
}
if p, ok := ap[asset.Margin]; ok {
ap[asset.Margin] = p.Remove(spotPairs...)
}
Expand All @@ -1602,7 +1608,7 @@ func removeSpotFromMargin(ap map[asset.Item]currency.Pairs, spotPairs currency.P

// isSymbolChannel returns if the channel expects receive a symbol
func isSymbolChannel(s *subscription.Subscription) bool {
switch s.Channel {
switch channelName(s, s.Asset) {
case privateSpotTradeOrders, accountBalanceChannel, marginPositionChannel, spotMarketAdvancedChannel, futuresSystemAnnouncementChannel,
futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel:
return false
Expand All @@ -1615,16 +1621,6 @@ func isCurrencyChannel(s *subscription.Subscription) bool {
return s.Channel == marginLoanChannel
}

// isSymbolListChannel returns if the channel expects receive a list of symbol
func isSymbolListChannel(s *subscription.Subscription) bool {
switch channelName(s, s.Asset) {
case marketTickerChannel, marketOrderbookChannel, marketOrderbookDepth5Channel, marketOrderbookDepth50Channel,
marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel, marginFundingbookChangeChannel:
return true
}
return false
}

// channelInterval returns the channel interval if it has one
func channelInterval(s *subscription.Subscription) string {
if channelName(s, s.Asset) == marketCandlesChannel {
Expand All @@ -1647,30 +1643,36 @@ func assetCurrencies(s *subscription.Subscription, ap map[asset.Item]currency.Pa
return cs
}

// joinPairsWithInterval returns a list of currency pair symbols joined by comma
// If the subscription has a viable interval it's appended after each symbol
func joinPairsWithInterval(b currency.Pairs, s *subscription.Subscription) string {
out := make([]string, len(b))
suffix, err := intervalToString(s.Interval)
if err == nil {
suffix = "_" + suffix
}
for i, p := range b {
out[i] = p.String() + suffix
}
return strings.Join(out, ",")
}

const subTplText = `
{{- removeSpotFromMargin $.AssetPairs -}}
{{- removeSpotFromMargin $.S $.AssetPairs -}}
{{- if isCurrencyChannel $.S }}
{{- range $currency := assetCurrencies $.S $.AssetPairs -}}
{{ channelName $.S $.S.Asset -}} : {{- $currency }}
{{- $.PairSeparator -}}
{{- end -}}
{{ channelName $.S $.S.Asset -}} : {{- (assetCurrencies $.S $.AssetPairs).Join -}}
{{- else if isSymbolChannel $.S }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- with $name := channelName $.S $asset }}
{{- if isSymbolListChannel $.S }}
{{- $name -}} :
{{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}}
all
{{- else -}}
{{ $pairs.Join }}
{{- end }}
{{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}}
{{- $name -}} :all
{{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}}
{{- else }}
{{- range $pair := $pairs -}}
{{ $name -}} : {{- $pair }}
{{- with $i := channelInterval $.S -}} _ {{- $i }}{{ end -}}
{{- else -}}
{{- range $b := batch $pairs 100 -}}
{{- $name -}} : {{- joinPairsWithInterval $b $.S -}}
{{ $.PairSeparator }}
{{- end }}
{{- end -}}
{{- $.BatchSize -}} 100
{{- end }}
{{- end }}
{{ $.AssetSeparator }}
Expand Down

0 comments on commit 3ae0e28

Please sign in to comment.