From c861e743e8c75dc99ff3fb1fa2bd3a93229b8cc1 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Tue, 31 Oct 2023 11:45:06 +0700 Subject: [PATCH] Kucoin: Add support for MarketSnapshot MarketSnapshot is for the entire currency market; The naming is confusing because they call all spot endpoints market as well. Adds test coverage and breaks out the GenDefSubs tests --- currency/currencies.go | 8 ++ currency/currencies_test.go | 12 +++ exchanges/kucoin/kucoin_test.go | 130 +++++++++++++++++---------- exchanges/kucoin/kucoin_websocket.go | 53 ++++++++--- 4 files changed, 148 insertions(+), 55 deletions(-) diff --git a/currency/currencies.go b/currency/currencies.go index 013b7223924..7bf0d315a60 100644 --- a/currency/currencies.go +++ b/currency/currencies.go @@ -20,6 +20,14 @@ func NewCurrenciesFromStringArray(currencies []string) Currencies { // Currencies define a range of supported currency codes type Currencies []Code +// Add adds a currency to the list if it doesn't exist +func (c Currencies) Add(a Code) Currencies { + if !c.Contains(a) { + c = append(c, a) + } + return c +} + // Strings returns an array of currency strings func (c Currencies) Strings() []string { list := make([]string, len(c)) diff --git a/currency/currencies_test.go b/currency/currencies_test.go index f6d950c9b92..b095045b541 100644 --- a/currency/currencies_test.go +++ b/currency/currencies_test.go @@ -3,6 +3,8 @@ package currency import ( "encoding/json" "testing" + + "github.com/stretchr/testify/assert" ) func TestCurrenciesUnmarshalJSON(t *testing.T) { @@ -62,3 +64,13 @@ func TestMatch(t *testing.T) { t.Fatal("should not match") } } + +func TestCurrenciesAdd(t *testing.T) { + c := Currencies{} + c = c.Add(BTC) + assert.Len(t, c, 1, "Should have one currency") + c = c.Add(ETH) + assert.Len(t, c, 2, "Should have one currency") + c = c.Add(BTC) + assert.Len(t, c, 2, "Adding a duplicate should not change anything") +} diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index f0bfe1c1c43..f2400bf22d8 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -1973,47 +1973,52 @@ func TestPushData(t *testing.T) { } } +func verifySubs(tb testing.TB, subs []stream.ChannelSubscription, a asset.Item, prefix string, expected ...string) { + tb.Helper() + var sub *stream.ChannelSubscription + for i, s := range subs { + if s.Asset == a && strings.HasPrefix(s.Channel, prefix) { + if len(expected) == 1 && !strings.Contains(s.Channel, expected[0]) { + continue + } + if sub != nil { + assert.Failf(tb, "Too many subs for asset %s with prefix %s", a.String(), prefix) + return + } + sub = &subs[i] + } + } + if assert.NotNil(tb, sub, "Should find a sub for asset %s with prefix %s", a.String(), prefix) { + suffix := strings.TrimPrefix(sub.Channel, prefix) + if len(expected) == 0 { + assert.Empty(tb, suffix, "Sub for asset %s with prefix %s should have no symbol suffix", a.String(), prefix) + } else { + currs := strings.Split(suffix, ",") + assert.ElementsMatch(tb, currs, expected, "Currencies should match in sub for asset %s with prefix %s", a.String(), prefix) + } + } +} + func TestGenerateDefaultSubscriptions(t *testing.T) { t.Parallel() subs, err := ku.GenerateDefaultSubscriptions() assert.NoError(t, err, "GenerateDefaultSubscriptions should not error") - check := func(a asset.Item, prefix string, expected ...string) { - var sub *stream.ChannelSubscription - for i, s := range subs { - if s.Asset == a && strings.HasPrefix(s.Channel, prefix) { - if len(expected) == 1 && !strings.Contains(s.Channel, expected[0]) { - continue - } - if sub != nil { - assert.Failf(t, "Too many subs for asset %s with prefix %s", a.String(), prefix) - return - } - sub = &subs[i] - } - } - if assert.NotNil(t, sub, "Should find a sub for asset %s with prefix %s", a.String(), prefix) { - suffix := strings.TrimPrefix(sub.Channel, prefix) - if len(expected) == 0 { - assert.Empty(t, suffix, "Sub for asset %s with prefix %s should have no symbol suffix", a.String(), prefix) - } else { - currs := strings.Split(suffix, ",") - assert.ElementsMatch(t, currs, expected, "Currencies should match in sub for asset %s with prefix %s", a.String(), prefix) - } - } - } - if assert.Len(t, subs, 12, "Should generate the correct number of subs when not logged in") { for _, p := range []string{"ticker", "match", "level2"} { - check(asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT") - check(asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC") + verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT") + verifySubs(t, subs, asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC") } for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { - check(asset.Futures, "/contractMarket/tickerV2:", c) - check(asset.Futures, "/contractMarket/level2Depth50:", c) + verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) + verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) } } +} + +func TestGenerateAuthSubscriptions(t *testing.T) { + t.Parallel() // Create a parallel safe Kucoin to mess with nu := new(Kucoin) @@ -2022,43 +2027,78 @@ func TestGenerateDefaultSubscriptions(t *testing.T) { nu.Websocket = sharedtestvalues.NewTestWebsocket() nu.Websocket.SetCanUseAuthenticatedEndpoints(true) - subs, err = nu.GenerateDefaultSubscriptions() + subs, err := nu.GenerateDefaultSubscriptions() assert.NoError(t, err, "GenerateDefaultSubscriptions with Auth should not error") if assert.Len(t, subs, 25, "Should generate the correct number of subs when logged in") { for _, p := range []string{"ticker", "match", "level2"} { - check(asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT") - check(asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC") + verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT") + verifySubs(t, subs, asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC") } for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { - check(asset.Futures, "/contractMarket/tickerV2:", c) - check(asset.Futures, "/contractMarket/level2Depth50:", c) + verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) + verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) } for _, c := range []string{"AVA", "FET", "BTC", "ETH", "ANKR", "LTC", "USDT"} { - check(asset.Margin, "/margin/loan:", c) + verifySubs(t, subs, asset.Margin, "/margin/loan:", c) } - check(asset.Spot, "/account/balance") - check(asset.Margin, "/margin/position") - check(asset.Margin, "/margin/fundingBook:", "AVA", "FET", "BTC", "ETH", "ANKR", "LTC", "USDT") - check(asset.Futures, "/contractAccount/wallet") - check(asset.Futures, "/contractMarket/advancedOrders") - check(asset.Futures, "/contractMarket/tradeOrders") + verifySubs(t, subs, asset.Spot, "/account/balance") + verifySubs(t, subs, asset.Margin, "/margin/position") + verifySubs(t, subs, asset.Margin, "/margin/fundingBook:", "AVA", "FET", "BTC", "ETH", "ANKR", "LTC", "USDT") + verifySubs(t, subs, asset.Futures, "/contractAccount/wallet") + verifySubs(t, subs, asset.Futures, "/contractMarket/advancedOrders") + verifySubs(t, subs, asset.Futures, "/contractMarket/tradeOrders") } +} + +func TestGenerateCandleSubscription(t *testing.T) { + t.Parallel() + + // Create a parallel safe Kucoin to mess with + nu := new(Kucoin) + nu.Base.Features = ku.Base.Features + nu.Websocket = sharedtestvalues.NewTestWebsocket() + assert.NoError(t, nu.CurrencyPairs.Load(&ku.CurrencyPairs), "Loading Pairs should not error") - // Test candle intervals nu.Features.Enabled.Subscriptions = []stream.ChannelSubscription{ {Channel: stream.CandlesSubscription, Interval: kline.FourHour}, } - subs, err = nu.GenerateDefaultSubscriptions() + subs, err := nu.GenerateDefaultSubscriptions() assert.NoError(t, err, "GenerateDefaultSubscriptions with Candles should not error") if assert.Len(t, subs, 7, "Should generate the correct number of subs for candles") { for _, c := range []string{"BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT"} { - check(asset.Spot, "/market/candles:", c+"_4hour") + verifySubs(t, subs, asset.Spot, "/market/candles:", c+"_4hour") } for _, c := range []string{"FET-BTC", "FET-ETH", "ANKR-BTC"} { - check(asset.Margin, "/market/candles:", c+"_4hour") + verifySubs(t, subs, asset.Margin, "/market/candles:", c+"_4hour") + } + } +} + +func TestGenerateMarketSubscription(t *testing.T) { + t.Parallel() + + // Create a parallel safe Kucoin to mess with + nu := new(Kucoin) + nu.Base.Features = ku.Base.Features + nu.Websocket = sharedtestvalues.NewTestWebsocket() + assert.NoError(t, nu.CurrencyPairs.Load(&ku.CurrencyPairs), "Loading Pairs should not error") + + nu.Features.Enabled.Subscriptions = []stream.ChannelSubscription{ + {Channel: marketSnapshotChannel}, + } + + subs, err := nu.GenerateDefaultSubscriptions() + assert.NoError(t, err, "GenerateDefaultSubscriptions with MarketSnapshot should not error") + + if assert.Len(t, subs, 7, "Should generate the correct number of subs for snapshot") { + for _, c := range []string{"AVA", "BTC", "ETH", "LTC", "USDT"} { + verifySubs(t, subs, asset.Spot, "/market/snapshot:", c) + } + for _, c := range []string{"FET", "ANKR"} { + verifySubs(t, subs, asset.Margin, "/market/snapshot:", c) } } } diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 4a706c440bc..108d83ad53d 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -34,10 +34,11 @@ const ( privateBullets = "/v1/bullet-private" // spot channels - marketTickerChannel = "/market/ticker:%s" // /market/ticker:{symbol},{symbol}... marketAllTickersChannel = "/market/ticker:all" - marketTickerSnapshotChannel = "/market/snapshot:%s" // /market/snapshot:{symbol} - marketOrderbookLevel2Channels = "/market/level2:%s" // /market/level2:{symbol},{symbol}... + marketTickerChannel = "/market/ticker:%s" // /market/ticker:{symbol},{symbol}... + marketSymbolSnapshotChannel = "/market/snapshot:%s" // /market/snapshot:{symbol} + marketSnapshotChannel = "/market/snapshot:%v" // /market/snapshot:{market} <--- market represents a currency + marketOrderbookLevel2Channels = "/market/level2:%s" // /market/level2:{pair},{pair}... marketOrderbookLevel2to5Channel = "/spotMarket/level2Depth5:%s" // /spotMarket/level2Depth5:{symbol},{symbol}... marketOrderbokLevel2To50Channel = "/spotMarket/level2Depth50:%s" // /spotMarket/level2Depth50:{symbol},{symbol}... marketCandlesChannel = "/market/candles:%s_%s" // /market/candles:{symbol}_{interval} @@ -234,7 +235,7 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { instruments = topicInfo[1] } return ku.processTicker(resp.Data, instruments) - case strings.HasPrefix(marketTickerSnapshotChannel, topicInfo[0]): + case strings.HasPrefix(marketSymbolSnapshotChannel, topicInfo[0]): return ku.processMarketSnapshot(resp.Data, topicInfo[1]) case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]): return ku.processOrderbookWithDepth(respData, topicInfo[1]) @@ -1017,7 +1018,7 @@ func getChannelsAssetType(channelName string) asset.Item { case futuresTickerV2Channel, futuresTickerChannel, futuresOrderbookLevel2Channel, futuresExecutionDataChannel, futuresOrderbookLevel2Depth5Channel, futuresOrderbookLevel2Depth50Channel, futuresContractMarketDataChannel, futuresSystemAnnouncementChannel, futuresTrasactionStatisticsTimerEventChannel, futuresTradeOrdersBySymbolChannel, futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel, futuresPositionChangeEventChannel: return asset.Futures case marketTickerChannel, marketAllTickersChannel, - marketTickerSnapshotChannel, + marketSnapshotChannel, marketSymbolSnapshotChannel, marketOrderbookLevel2Channels, marketOrderbookLevel2to5Channel, marketOrderbokLevel2To50Channel, marketCandlesChannel, marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel, @@ -1064,11 +1065,14 @@ func (ku *Kucoin) GenerateDefaultSubscriptions() ([]subscription.Subscription, e if err != nil { return nil, err } - subs := spotOrMarginSubs(assetPairs, s, false, interval) + subs := spotOrMarginPairSubs(assetPairs, s, false, interval) subscriptions = append(subscriptions, subs...) case s.Channel == marginFundingbookChangeChannel: s.Channel = fmt.Sprintf(s.Channel, assetPairs[asset.Margin].GetCurrencies().Join()) subscriptions = append(subscriptions, s) + case s.Channel == marketSnapshotChannel: + subs := spotOrMarginCurrencySubs(assetPairs, s) + subscriptions = append(subscriptions, subs...) case getChannelsAssetType(s.Channel) == asset.Futures && isSymbolChannel(s.Channel): for _, p := range assetPairs[asset.Futures] { c, err := ku.FormatExchangeCurrency(p, asset.Futures) @@ -1081,7 +1085,7 @@ func (ku *Kucoin) GenerateDefaultSubscriptions() ([]subscription.Subscription, e } case isSymbolChannel(s.Channel): // Subscriptions which can use a single comma-separated sub per asset - subs := spotOrMarginSubs(assetPairs, s, true) + subs := spotOrMarginPairSubs(assetPairs, s, true) subscriptions = append(subscriptions, subs...) default: subscriptions = append(subscriptions, s) @@ -1092,7 +1096,7 @@ func (ku *Kucoin) GenerateDefaultSubscriptions() ([]subscription.Subscription, e // isSymbolChannel returns true it this channel path ends in a formatting %s to accept a Symbol func isSymbolChannel(c string) bool { - return strings.HasSuffix(c, "%s") + return strings.HasSuffix(c, "%s") || strings.HasSuffix(c, "%v") } // channelName converts global channel Names used in config of channel input into kucoin channel names @@ -1104,10 +1108,10 @@ func channelName(name string) string { return name } -// spotOrMarginSubs accepts a map of pairs and a template subscription and returns a list of subscriptions for Spot and Margin pairs +// spotOrMarginPairSubs accepts a map of pairs and a template subscription and returns a list of subscriptions for Spot and Margin pairs // If there's a Spot subscription, it won't be added again as a Margin subscription // If joined param is true then one subscription per asset type with the currencies comma delimited -func spotOrMarginSubs(assetPairs map[asset.Item]currency.Pairs, base subscription.Subscription, join bool, fmtArgs ...any) []subscription.Subscription { //nolint:gocritic // hugeParam +func spotOrMarginPairSubs(assetPairs map[asset.Item]currency.Pairs, base subscription.Subscription, join bool, fmtArgs ...any) []subscription.Subscription { //nolint:gocritic // hugeParam subs := []subscription.Subscription{} add := func(a asset.Item, pairs currency.Pairs) { if len(pairs) == 0 { @@ -1141,6 +1145,35 @@ func spotOrMarginSubs(assetPairs map[asset.Item]currency.Pairs, base subscriptio return subs } +// spotOrMarginCurrencySubs accepts a map of pairs and a template subscription and returns a list of subscriptions for every currency in Spot and Margin pairs +// If there's a Spot subscription, it won't be added again as a Margin subscription +func spotOrMarginCurrencySubs(assetPairs map[asset.Item]currency.Pairs, base subscription.Subscription) []subscription.Subscription { //nolint:gocritic // hugeParam + subs := []subscription.Subscription{} + add := func(a asset.Item, currs currency.Currencies) { + if len(currs) == 0 { + return + } + s := base + s.Asset = a + for _, c := range currs { + s.Channel = fmt.Sprintf(base.Channel, c) + subs = append(subs, s) + } + } + + add(asset.Spot, assetPairs[asset.Spot].GetCurrencies()) + + marginCurrencies := currency.Currencies{} + for _, c := range assetPairs[asset.Margin].GetCurrencies() { + if !assetPairs[asset.Spot].ContainsCurrency(c) { + marginCurrencies = marginCurrencies.Add(c) + } + } + add(asset.Margin, marginCurrencies) + + return subs +} + // orderbookManager defines a way of managing and maintaining synchronisation // across connections and assets. type orderbookManager struct {