Skip to content

Commit

Permalink
Kucoin: WIP - Batch all symbols
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Jul 11, 2024
1 parent 6bad66d commit 7dd681c
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 91 deletions.
96 changes: 56 additions & 40 deletions exchanges/kucoin/kucoin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"strings"
Expand Down Expand Up @@ -2062,42 +2061,22 @@ func TestGenerateSubscriptions(t *testing.T) {
testsubs.EqualLists(t, exp, subs)
}

// TestGenerateOtherSubscriptions exercises subscriptions with different requirements from the template
// Candles should fan out and include an interval
// Snapshot should fan out and not have an interval
// TestGenerateOtherSubscriptions exercises non-default subscriptions
func TestGenerateOtherSubscriptions(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes

expPairs := map[asset.Item]currency.Pairs{
asset.Spot: subPairs[0:4],
asset.Margin: subPairs[6:8], // Just the Margin pairs not in Spot
}

for a, pairs := range expPairs {
for _, tt := range []struct {
sub *subscription.Subscription
path string
}{
{&subscription.Subscription{Channel: subscription.CandlesChannel, Asset: a, Interval: kline.FourHour}, "/market/candles:%s_4hour"},
{&subscription.Subscription{Channel: marketSnapshotChannel, Asset: a}, "/market/snapshot:%s"},
} {
ku.Features.Subscriptions = subscription.List{tt.sub}

subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions with Candles should not error")

exp := subscription.List{}
for _, pair := range pairs {
s := tt.sub.Clone()
s.Asset = a
s.Pairs = currency.Pairs{pair}
s.QualifiedChannel = fmt.Sprintf(tt.path, pair)
exp = append(exp, s)
}
testsubs.EqualLists(t, exp, subs)
}
subs := subscription.List{
{Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FourHour},
{Channel: marketSnapshotChannel, Asset: asset.Spot},
}

for _, s := range subs {
ku.Features.Subscriptions = subscription.List{s}
got, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions should not error")
require.Len(t, got, 1, "Should generate just one sub")
}
}

Expand Down Expand Up @@ -2492,23 +2471,33 @@ func TestProcessMarketSnapshot(t *testing.T) {
}
}

func TestSubscribeMarketSnapshot(t *testing.T) {
// TestSubscribeBatches ensures that endpoints support batching, contary to kucoin api docs

Check failure on line 2474 in exchanges/kucoin/kucoin_test.go

View workflow job for this annotation

GitHub Actions / Spell checker

contary ==> contrary
func TestSubscribeBatches(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
ku.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, ku)

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: marketSnapshotChannel}}
ku.Features.Subscriptions = subscription.List{
{Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Asset: asset.Futures, Channel: subscription.TickerChannel},
{Asset: asset.Spot, Channel: marketSnapshotChannel},
}

subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
require.Len(t, subs, len(ku.Features.Subscriptions), "Must generate batched subscriptions")

err = ku.Subscribe(subs)
assert.NoError(t, err, "Subscribe to MarketSnapshot should not error")
require.NoError(t, err, "Subscribe to small batches should not error")
}

// TestSubscribMatch exercises the Match subscription
// Ensures batching works and the connection symbol limit is still 300 at Kucoin's end
func TestSubscribeMatch(t *testing.T) {
// TestSubscribeTickerAll ensures that ticker subscriptions switch to using all and it works

// TestSubscribeBatchLimit exercises the kucoin batch limits of 300 per connection
// Ensures batching of 100 pairs and the connection symbol limit is still 300 at Kucoin's end
func TestSubscribeBatchLimit(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
Expand All @@ -2527,9 +2516,12 @@ func TestSubscribeMatch(t *testing.T) {
require.Len(t, subs, 3, "Must get 3 subs")

err = ku.Subscribe(subs)
require.NoError(t, err, "Subscribe to MarketSnapshot should not error")
require.NoError(t, err, "Subscribe must not error")

err = ku.Unsubscribe(subs)
require.NoError(t, err, "Unsubscribe must not error")

err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:301], true)
err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:320], true)
require.NoError(t, err, "StorePairs must not error")

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}}
Expand All @@ -2541,6 +2533,30 @@ func TestSubscribeMatch(t *testing.T) {
require.ErrorContains(t, err, "exceed max subscription count limitation of 300 per session", "Subscribe to MarketSnapshot must error above connection symbol limit")
}

func TestSubscribeTickerAll(t *testing.T) {
t.Parallel()

ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
ku.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, ku)

avail, err := ku.GetAvailablePairs(asset.Spot)
require.NoError(t, err, "GetAvailablePairs must not error")

err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:500], true)
require.NoError(t, err, "StorePairs must not error")

ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.TickerChannel}}

subs, err := ku.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
require.Len(t, subs, 1, "Must generate one subscriptions")
require.Equal(t, "/market/ticker:all", subs[0].QualifiedChannel, "QualifiedChannel must be correct")

err = ku.Subscribe(subs)
require.NoError(t, err, "Subscribe to must not error")
}

func TestSeedLocalCache(t *testing.T) {
t.Parallel()
pair, err := currency.NewPairFromString("ETH-USDT")
Expand Down
99 changes: 48 additions & 51 deletions exchanges/kucoin/kucoin_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ const (
spotMarketAdvancedChannel = "/spotMarket/advancedOrders"

// futures channels
futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol}
futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol}
futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol}
futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol}
futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol}
futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol}
futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol},...
futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol},...
futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol},...
futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol},...
futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol},...
futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol},...
futuresSystemAnnouncementChannel = "/contract/announcement"
futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol}
futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol},...

// futures private channels
futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol}
futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol},...
futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol},...
futuresStopOrdersLifecycleEventChannel = "/contractMarket/advancedOrders"
futuresAccountBalanceEventChannel = "/contractAccount/wallet"
futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol}
)

var (
Expand Down Expand Up @@ -987,14 +987,16 @@ func (ku *Kucoin) GetSubscriptionTemplate(_ *subscription.Subscription) (*templa
spotPairs, _ := ku.GetEnabledPairs(asset.Spot)
subTemplate, err = template.New("master.tmpl").
Funcs(template.FuncMap{
"channelName": channelName,
"removeSpotFromMargin": func(ap map[asset.Item]currency.Pairs) string { return removeSpotFromMargin(ap, spotPairs) },
"isCurrencyChannel": isCurrencyChannel,
"isSymbolChannel": isSymbolChannel,
"isSymbolListChannel": isSymbolListChannel,
"channelInterval": channelInterval,
"assetCurrencies": assetCurrencies,
"batch": common.Batch[currency.Pairs],
"channelName": channelName,
"removeSpotFromMargin": func(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) string {
return removeSpotFromMargin(s, ap, spotPairs)
},
"isCurrencyChannel": isCurrencyChannel,
"isSymbolChannel": isSymbolChannel,
"channelInterval": channelInterval,
"assetCurrencies": assetCurrencies,
"joinPairsWithInterval": joinPairsWithInterval,
"batch": common.Batch[currency.Pairs],
}).
Parse(subTplText)
}
Expand Down Expand Up @@ -1589,8 +1591,11 @@ func channelName(s *subscription.Subscription, a asset.Item) string {
return s.Channel
}

// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map
func removeSpotFromMargin(ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string {
// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map for subscriptions to non-margin endpoints
func removeSpotFromMargin(s *subscription.Subscription, ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string {
if strings.HasPrefix(s.Channel, "/margin") {
return ""
}
if p, ok := ap[asset.Margin]; ok {
ap[asset.Margin] = p.Remove(spotPairs...)
}
Expand All @@ -1612,16 +1617,6 @@ func isCurrencyChannel(s *subscription.Subscription) bool {
return s.Channel == marginLoanChannel
}

// isSymbolListChannel returns if the channel expects receive a list of symbol
func isSymbolListChannel(c string) bool {
switch c {
case marketTickerChannel, marketOrderbookChannel, marketOrderbookDepth5Channel, marketOrderbookDepth50Channel,
marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel, marginFundingbookChangeChannel:
return true
}
return false
}

// channelInterval returns the channel interval if it has one
func channelInterval(s *subscription.Subscription) string {
if channelName(s, s.Asset) == marketCandlesChannel {
Expand All @@ -1644,34 +1639,36 @@ func assetCurrencies(s *subscription.Subscription, ap map[asset.Item]currency.Pa
return cs
}

// joinPairsWithInterval returns a list of currency pair symbols joined by comma
// If the subscription has a viable interval it's appended after each symbol
func joinPairsWithInterval(b currency.Pairs, s *subscription.Subscription) string {
out := make([]string, len(b))
suffix, err := intervalToString(s.Interval)
if err == nil {
suffix = "_" + suffix
}
for i, p := range b {
out[i] = p.String() + suffix
}
return strings.Join(out, ",")
}

const subTplText = `
{{- removeSpotFromMargin $.AssetPairs -}}
{{- removeSpotFromMargin $.S $.AssetPairs -}}
{{- if isCurrencyChannel $.S }}
{{- range $currency := assetCurrencies $.S $.AssetPairs -}}
{{ channelName $.S $.S.Asset -}} : {{- $currency }}
{{- $.PairSeparator -}}
{{- end -}}
{{ channelName $.S $.S.Asset -}} : {{- (assetCurrencies $.S $.AssetPairs).Join -}}
{{- else if isSymbolChannel $.S }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- with $name := channelName $.S $asset }}
{{- if isSymbolListChannel $name }}
{{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}}
{{- $name -}} :all
{{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}}
{{- else -}}
{{- range $b := batch $pairs 100 -}}
{{- $name -}} : {{- $b.Join -}}
{{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}}
{{ $.PairSeparator }}
{{- end -}}
{{- $.BatchSize -}} 100
{{- end }}
{{- else }}
{{- range $pair := $pairs -}}
{{ $name -}} : {{- $pair }}
{{- with $i := channelInterval $.S -}} _ {{- $i }}{{ end -}}
{{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}}
{{- $name -}} :all
{{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}}
{{- else -}}
{{- range $b := batch $pairs 100 -}}
{{- $name -}} : {{- joinPairsWithInterval $b $.S -}}
{{ $.PairSeparator }}
{{- end }}
{{- end -}}
{{- $.BatchSize -}} 100
{{- end }}
{{- end }}
{{ $.AssetSeparator }}
Expand Down

0 comments on commit 7dd681c

Please sign in to comment.