Skip to content

Commit

Permalink
Kucoin: Add support for MarketSnapshot
Browse files Browse the repository at this point in the history
MarketSnapshot is for the entire currency market; The naming is
confusing because they call all spot endpoints market as well.

Adds test coverage and breaks out the GenDefSubs tests
  • Loading branch information
gbjk committed Nov 3, 2023
1 parent 8b9e8ca commit c861e74
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 55 deletions.
8 changes: 8 additions & 0 deletions currency/currencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ func NewCurrenciesFromStringArray(currencies []string) Currencies {
// Currencies define a range of supported currency codes
type Currencies []Code

// Add adds a currency to the list if it doesn't exist
func (c Currencies) Add(a Code) Currencies {
if !c.Contains(a) {
c = append(c, a)
}
return c
}

// Strings returns an array of currency strings
func (c Currencies) Strings() []string {
list := make([]string, len(c))
Expand Down
12 changes: 12 additions & 0 deletions currency/currencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package currency
import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCurrenciesUnmarshalJSON(t *testing.T) {
Expand Down Expand Up @@ -62,3 +64,13 @@ func TestMatch(t *testing.T) {
t.Fatal("should not match")
}
}

func TestCurrenciesAdd(t *testing.T) {
c := Currencies{}
c = c.Add(BTC)
assert.Len(t, c, 1, "Should have one currency")
c = c.Add(ETH)
assert.Len(t, c, 2, "Should have one currency")
c = c.Add(BTC)
assert.Len(t, c, 2, "Adding a duplicate should not change anything")
}
130 changes: 85 additions & 45 deletions exchanges/kucoin/kucoin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,47 +1973,52 @@ func TestPushData(t *testing.T) {
}
}

func verifySubs(tb testing.TB, subs []stream.ChannelSubscription, a asset.Item, prefix string, expected ...string) {
tb.Helper()
var sub *stream.ChannelSubscription
for i, s := range subs {
if s.Asset == a && strings.HasPrefix(s.Channel, prefix) {
if len(expected) == 1 && !strings.Contains(s.Channel, expected[0]) {
continue
}
if sub != nil {
assert.Failf(tb, "Too many subs for asset %s with prefix %s", a.String(), prefix)
return
}
sub = &subs[i]
}
}
if assert.NotNil(tb, sub, "Should find a sub for asset %s with prefix %s", a.String(), prefix) {
suffix := strings.TrimPrefix(sub.Channel, prefix)
if len(expected) == 0 {
assert.Empty(tb, suffix, "Sub for asset %s with prefix %s should have no symbol suffix", a.String(), prefix)
} else {
currs := strings.Split(suffix, ",")
assert.ElementsMatch(tb, currs, expected, "Currencies should match in sub for asset %s with prefix %s", a.String(), prefix)
}
}
}

func TestGenerateDefaultSubscriptions(t *testing.T) {
t.Parallel()

subs, err := ku.GenerateDefaultSubscriptions()
assert.NoError(t, err, "GenerateDefaultSubscriptions should not error")

check := func(a asset.Item, prefix string, expected ...string) {
var sub *stream.ChannelSubscription
for i, s := range subs {
if s.Asset == a && strings.HasPrefix(s.Channel, prefix) {
if len(expected) == 1 && !strings.Contains(s.Channel, expected[0]) {
continue
}
if sub != nil {
assert.Failf(t, "Too many subs for asset %s with prefix %s", a.String(), prefix)
return
}
sub = &subs[i]
}
}
if assert.NotNil(t, sub, "Should find a sub for asset %s with prefix %s", a.String(), prefix) {
suffix := strings.TrimPrefix(sub.Channel, prefix)
if len(expected) == 0 {
assert.Empty(t, suffix, "Sub for asset %s with prefix %s should have no symbol suffix", a.String(), prefix)
} else {
currs := strings.Split(suffix, ",")
assert.ElementsMatch(t, currs, expected, "Currencies should match in sub for asset %s with prefix %s", a.String(), prefix)
}
}
}

if assert.Len(t, subs, 12, "Should generate the correct number of subs when not logged in") {
for _, p := range []string{"ticker", "match", "level2"} {
check(asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT")
check(asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC")
verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT")
verifySubs(t, subs, asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC")
}
for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} {
check(asset.Futures, "/contractMarket/tickerV2:", c)
check(asset.Futures, "/contractMarket/level2Depth50:", c)
verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c)
verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c)
}
}
}

func TestGenerateAuthSubscriptions(t *testing.T) {
t.Parallel()

// Create a parallel safe Kucoin to mess with
nu := new(Kucoin)
Expand All @@ -2022,43 +2027,78 @@ func TestGenerateDefaultSubscriptions(t *testing.T) {
nu.Websocket = sharedtestvalues.NewTestWebsocket()
nu.Websocket.SetCanUseAuthenticatedEndpoints(true)

subs, err = nu.GenerateDefaultSubscriptions()
subs, err := nu.GenerateDefaultSubscriptions()
assert.NoError(t, err, "GenerateDefaultSubscriptions with Auth should not error")

if assert.Len(t, subs, 25, "Should generate the correct number of subs when logged in") {
for _, p := range []string{"ticker", "match", "level2"} {
check(asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT")
check(asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC")
verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT")
verifySubs(t, subs, asset.Margin, "/market/"+p+":", "FET-BTC", "FET-ETH", "ANKR-BTC")
}
for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} {
check(asset.Futures, "/contractMarket/tickerV2:", c)
check(asset.Futures, "/contractMarket/level2Depth50:", c)
verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c)
verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c)
}
for _, c := range []string{"AVA", "FET", "BTC", "ETH", "ANKR", "LTC", "USDT"} {
check(asset.Margin, "/margin/loan:", c)
verifySubs(t, subs, asset.Margin, "/margin/loan:", c)
}
check(asset.Spot, "/account/balance")
check(asset.Margin, "/margin/position")
check(asset.Margin, "/margin/fundingBook:", "AVA", "FET", "BTC", "ETH", "ANKR", "LTC", "USDT")
check(asset.Futures, "/contractAccount/wallet")
check(asset.Futures, "/contractMarket/advancedOrders")
check(asset.Futures, "/contractMarket/tradeOrders")
verifySubs(t, subs, asset.Spot, "/account/balance")
verifySubs(t, subs, asset.Margin, "/margin/position")
verifySubs(t, subs, asset.Margin, "/margin/fundingBook:", "AVA", "FET", "BTC", "ETH", "ANKR", "LTC", "USDT")
verifySubs(t, subs, asset.Futures, "/contractAccount/wallet")
verifySubs(t, subs, asset.Futures, "/contractMarket/advancedOrders")
verifySubs(t, subs, asset.Futures, "/contractMarket/tradeOrders")
}
}

func TestGenerateCandleSubscription(t *testing.T) {
t.Parallel()

// Create a parallel safe Kucoin to mess with
nu := new(Kucoin)
nu.Base.Features = ku.Base.Features
nu.Websocket = sharedtestvalues.NewTestWebsocket()
assert.NoError(t, nu.CurrencyPairs.Load(&ku.CurrencyPairs), "Loading Pairs should not error")

// Test candle intervals
nu.Features.Enabled.Subscriptions = []stream.ChannelSubscription{
{Channel: stream.CandlesSubscription, Interval: kline.FourHour},
}

subs, err = nu.GenerateDefaultSubscriptions()
subs, err := nu.GenerateDefaultSubscriptions()
assert.NoError(t, err, "GenerateDefaultSubscriptions with Candles should not error")

if assert.Len(t, subs, 7, "Should generate the correct number of subs for candles") {
for _, c := range []string{"BTC-USDT", "ETH-USDT", "LTC-USDT", "AVA-USDT"} {
check(asset.Spot, "/market/candles:", c+"_4hour")
verifySubs(t, subs, asset.Spot, "/market/candles:", c+"_4hour")
}
for _, c := range []string{"FET-BTC", "FET-ETH", "ANKR-BTC"} {
check(asset.Margin, "/market/candles:", c+"_4hour")
verifySubs(t, subs, asset.Margin, "/market/candles:", c+"_4hour")
}
}
}

func TestGenerateMarketSubscription(t *testing.T) {
t.Parallel()

// Create a parallel safe Kucoin to mess with
nu := new(Kucoin)
nu.Base.Features = ku.Base.Features
nu.Websocket = sharedtestvalues.NewTestWebsocket()
assert.NoError(t, nu.CurrencyPairs.Load(&ku.CurrencyPairs), "Loading Pairs should not error")

nu.Features.Enabled.Subscriptions = []stream.ChannelSubscription{
{Channel: marketSnapshotChannel},
}

subs, err := nu.GenerateDefaultSubscriptions()
assert.NoError(t, err, "GenerateDefaultSubscriptions with MarketSnapshot should not error")

if assert.Len(t, subs, 7, "Should generate the correct number of subs for snapshot") {
for _, c := range []string{"AVA", "BTC", "ETH", "LTC", "USDT"} {
verifySubs(t, subs, asset.Spot, "/market/snapshot:", c)
}
for _, c := range []string{"FET", "ANKR"} {
verifySubs(t, subs, asset.Margin, "/market/snapshot:", c)
}
}
}
Expand Down
53 changes: 43 additions & 10 deletions exchanges/kucoin/kucoin_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ const (
privateBullets = "/v1/bullet-private"

// spot channels
marketTickerChannel = "/market/ticker:%s" // /market/ticker:{symbol},{symbol}...
marketAllTickersChannel = "/market/ticker:all"
marketTickerSnapshotChannel = "/market/snapshot:%s" // /market/snapshot:{symbol}
marketOrderbookLevel2Channels = "/market/level2:%s" // /market/level2:{symbol},{symbol}...
marketTickerChannel = "/market/ticker:%s" // /market/ticker:{symbol},{symbol}...
marketSymbolSnapshotChannel = "/market/snapshot:%s" // /market/snapshot:{symbol}
marketSnapshotChannel = "/market/snapshot:%v" // /market/snapshot:{market} <--- market represents a currency
marketOrderbookLevel2Channels = "/market/level2:%s" // /market/level2:{pair},{pair}...
marketOrderbookLevel2to5Channel = "/spotMarket/level2Depth5:%s" // /spotMarket/level2Depth5:{symbol},{symbol}...
marketOrderbokLevel2To50Channel = "/spotMarket/level2Depth50:%s" // /spotMarket/level2Depth50:{symbol},{symbol}...
marketCandlesChannel = "/market/candles:%s_%s" // /market/candles:{symbol}_{interval}
Expand Down Expand Up @@ -234,7 +235,7 @@ func (ku *Kucoin) wsHandleData(respData []byte) error {
instruments = topicInfo[1]
}
return ku.processTicker(resp.Data, instruments)
case strings.HasPrefix(marketTickerSnapshotChannel, topicInfo[0]):
case strings.HasPrefix(marketSymbolSnapshotChannel, topicInfo[0]):
return ku.processMarketSnapshot(resp.Data, topicInfo[1])
case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]):
return ku.processOrderbookWithDepth(respData, topicInfo[1])
Expand Down Expand Up @@ -1017,7 +1018,7 @@ func getChannelsAssetType(channelName string) asset.Item {
case futuresTickerV2Channel, futuresTickerChannel, futuresOrderbookLevel2Channel, futuresExecutionDataChannel, futuresOrderbookLevel2Depth5Channel, futuresOrderbookLevel2Depth50Channel, futuresContractMarketDataChannel, futuresSystemAnnouncementChannel, futuresTrasactionStatisticsTimerEventChannel, futuresTradeOrdersBySymbolChannel, futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel, futuresPositionChangeEventChannel:
return asset.Futures
case marketTickerChannel, marketAllTickersChannel,
marketTickerSnapshotChannel,
marketSnapshotChannel, marketSymbolSnapshotChannel,
marketOrderbookLevel2Channels, marketOrderbookLevel2to5Channel,
marketOrderbokLevel2To50Channel, marketCandlesChannel,
marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel,
Expand Down Expand Up @@ -1064,11 +1065,14 @@ func (ku *Kucoin) GenerateDefaultSubscriptions() ([]subscription.Subscription, e
if err != nil {
return nil, err
}
subs := spotOrMarginSubs(assetPairs, s, false, interval)
subs := spotOrMarginPairSubs(assetPairs, s, false, interval)
subscriptions = append(subscriptions, subs...)
case s.Channel == marginFundingbookChangeChannel:
s.Channel = fmt.Sprintf(s.Channel, assetPairs[asset.Margin].GetCurrencies().Join())
subscriptions = append(subscriptions, s)
case s.Channel == marketSnapshotChannel:
subs := spotOrMarginCurrencySubs(assetPairs, s)
subscriptions = append(subscriptions, subs...)
case getChannelsAssetType(s.Channel) == asset.Futures && isSymbolChannel(s.Channel):
for _, p := range assetPairs[asset.Futures] {
c, err := ku.FormatExchangeCurrency(p, asset.Futures)
Expand All @@ -1081,7 +1085,7 @@ func (ku *Kucoin) GenerateDefaultSubscriptions() ([]subscription.Subscription, e
}
case isSymbolChannel(s.Channel):
// Subscriptions which can use a single comma-separated sub per asset
subs := spotOrMarginSubs(assetPairs, s, true)
subs := spotOrMarginPairSubs(assetPairs, s, true)
subscriptions = append(subscriptions, subs...)
default:
subscriptions = append(subscriptions, s)
Expand All @@ -1092,7 +1096,7 @@ func (ku *Kucoin) GenerateDefaultSubscriptions() ([]subscription.Subscription, e

// isSymbolChannel returns true it this channel path ends in a formatting %s to accept a Symbol
func isSymbolChannel(c string) bool {
return strings.HasSuffix(c, "%s")
return strings.HasSuffix(c, "%s") || strings.HasSuffix(c, "%v")
}

// channelName converts global channel Names used in config of channel input into kucoin channel names
Expand All @@ -1104,10 +1108,10 @@ func channelName(name string) string {
return name
}

// spotOrMarginSubs accepts a map of pairs and a template subscription and returns a list of subscriptions for Spot and Margin pairs
// spotOrMarginPairSubs accepts a map of pairs and a template subscription and returns a list of subscriptions for Spot and Margin pairs
// If there's a Spot subscription, it won't be added again as a Margin subscription
// If joined param is true then one subscription per asset type with the currencies comma delimited
func spotOrMarginSubs(assetPairs map[asset.Item]currency.Pairs, base subscription.Subscription, join bool, fmtArgs ...any) []subscription.Subscription { //nolint:gocritic // hugeParam
func spotOrMarginPairSubs(assetPairs map[asset.Item]currency.Pairs, base subscription.Subscription, join bool, fmtArgs ...any) []subscription.Subscription { //nolint:gocritic // hugeParam
subs := []subscription.Subscription{}
add := func(a asset.Item, pairs currency.Pairs) {
if len(pairs) == 0 {
Expand Down Expand Up @@ -1141,6 +1145,35 @@ func spotOrMarginSubs(assetPairs map[asset.Item]currency.Pairs, base subscriptio
return subs
}

// spotOrMarginCurrencySubs accepts a map of pairs and a template subscription and returns a list of subscriptions for every currency in Spot and Margin pairs
// If there's a Spot subscription, it won't be added again as a Margin subscription
func spotOrMarginCurrencySubs(assetPairs map[asset.Item]currency.Pairs, base subscription.Subscription) []subscription.Subscription { //nolint:gocritic // hugeParam
subs := []subscription.Subscription{}
add := func(a asset.Item, currs currency.Currencies) {
if len(currs) == 0 {
return
}
s := base
s.Asset = a
for _, c := range currs {
s.Channel = fmt.Sprintf(base.Channel, c)
subs = append(subs, s)
}
}

add(asset.Spot, assetPairs[asset.Spot].GetCurrencies())

marginCurrencies := currency.Currencies{}
for _, c := range assetPairs[asset.Margin].GetCurrencies() {
if !assetPairs[asset.Spot].ContainsCurrency(c) {
marginCurrencies = marginCurrencies.Add(c)
}
}
add(asset.Margin, marginCurrencies)

return subs
}

// orderbookManager defines a way of managing and maintaining synchronisation
// across connections and assets.
type orderbookManager struct {
Expand Down

0 comments on commit c861e74

Please sign in to comment.