Skip to content

Commit

Permalink
Kraken: Subscription templating
Browse files Browse the repository at this point in the history
* Generate N+ subs for pairs
If we generate one sub for all pairs, but then fan it out in the
responses, we end up with a mis-match between the sub store and
GenerateSubs, and when we do FlushChannels it will try to resub
everything again.
  • Loading branch information
gbjk committed Jul 27, 2024
1 parent 789cc45 commit ebd06dc
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 249 deletions.
3 changes: 1 addition & 2 deletions currency/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ func NewBTCUSD() Pair {
return NewPair(BTC, USD)
}

// NewPairDelimiter splits the desired currency string at delimiter, the returns
// a Pair struct
// NewPairDelimiter splits the desired currency string at delimiter, the returns a Pair struct
func NewPairDelimiter(currencyPair, delimiter string) (Pair, error) {
if !strings.Contains(currencyPair, delimiter) {
return EMPTYPAIR,
Expand Down
2 changes: 0 additions & 2 deletions exchanges/kraken/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/common"
Expand Down Expand Up @@ -37,7 +36,6 @@ const (
// Kraken is the overarching type across the kraken package
type Kraken struct {
exchange.Base
wsRequestMtx sync.Mutex
}

// GetCurrentServerTime returns current server time
Expand Down
109 changes: 64 additions & 45 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
Expand Down Expand Up @@ -930,62 +929,71 @@ func TestWithdrawCancel(t *testing.T) {
// No objection to this becoming a fixture test, so long as it integrates through Un/Subscribe roundtrip
func TestWsSubscribe(t *testing.T) {
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
testexch.SetupWs(t, k)

err := k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{xbtusdPair}}})
assert.NoError(t, err, "Simple subscription should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")
subs := k.Websocket.GetSubscriptions()
require.Len(t, subs, 1, "Should add 1 Subscription")
assert.Equal(t, subscription.SubscribedState, subs[0].State(), "Subscription should be subscribed state")

err = k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{xbtusdPair}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Resubscribing to the same channel should error with SubFailure")
assert.ErrorIs(t, err, subscription.ErrDuplicate, "Resubscribing to the same channel should error with SubscribedAlready")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")
subs = k.Websocket.GetSubscriptions()
require.Len(t, subs, 1, "Should not add a subscription on error")
assert.Equal(t, subscription.SubscribedState, subs[0].State(), "Existing subscription state should not change")

err = k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker Pairs: DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
require.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")

// Mix success and failure
err = k.Subscribe(subscription.List{
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "USD", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "ELF", "/")}},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should error correctly")
assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")
assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/ELF", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
require.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")

// Just failures
err = k.Subscribe(subscription.List{
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/GOBLIN", "Subscribing to an invalid pair should error correctly")
assert.ErrorContains(t, err, "DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly")
require.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")

// Just success
err = k.Subscribe(subscription.List{
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "XBT", "/")}},
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("LTC", "ETH", "/")}},
})
assert.NoError(t, err, "Multiple successful subscriptions should not error")

subs := k.Websocket.GetSubscriptions()
subs = k.Websocket.GetSubscriptions()
assert.Len(t, subs, 4, "Should have correct number of subscriptions")

err = k.Unsubscribe(subs[:1])
assert.NoError(t, err, "Simple Unsubscribe should succeed")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel")

err = k.Unsubscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "WIZARD", "/")}, Key: 1337}})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error UnsubFail")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/WIZARD", "Unsubscribing from an invalid pair should error correctly")
assert.ErrorIs(t, err, subscription.ErrNotFound, "Simple failing Unsubscribe should error NotFound")
assert.ErrorContains(t, err, "DWARF/WIZARD", "Unsubscribing from an invalid pair should error correctly")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have removed any channels")

err = k.Unsubscribe(subscription.List{
subs[1],
{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "EAGLE", "/")}, Key: 1338},
})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error UnsubFail")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/EAGLE", "Unsubscribing from an invalid pair should error correctly")
assert.ErrorIs(t, err, subscription.ErrNotFound, "Mixed failing Unsubscribe should error NotFound")
assert.ErrorContains(t, err, "Channel: ticker Pairs: DWARF/EAGLE", "Unsubscribing from an invalid pair should error correctly")

subs = k.Websocket.GetSubscriptions()
assert.Len(t, subs, 2, "Should have removed only 1 more channel")
Expand All @@ -1009,10 +1017,11 @@ func TestWsOrderbookSub(t *testing.T) {
t.Parallel()

k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
testexch.SetupWs(t, k)

err := k.Subscribe(subscription.List{{
Asset: asset.Spot,
Channel: subscription.OrderbookChannel,
Pairs: currency.Pairs{xbtusdPair},
Levels: 25,
Expand All @@ -1031,7 +1040,6 @@ func TestWsOrderbookSub(t *testing.T) {
Pairs: currency.Pairs{xbtusdPair},
Levels: 42,
}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error")
assert.ErrorContains(t, err, "Subscription depth not supported", "Bad subscription should error about depth")
}

Expand All @@ -1040,10 +1048,11 @@ func TestWsCandlesSub(t *testing.T) {
t.Parallel()

k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
testexch.SetupWs(t, k)

err := k.Subscribe(subscription.List{{
Asset: asset.Spot,
Channel: subscription.CandlesChannel,
Pairs: currency.Pairs{xbtusdPair},
Interval: kline.OneHour,
Expand All @@ -1062,7 +1071,6 @@ func TestWsCandlesSub(t *testing.T) {
Pairs: currency.Pairs{xbtusdPair},
Interval: kline.Interval(time.Minute * time.Duration(127)),
}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error")
assert.ErrorContains(t, err, "Subscription ohlc interval not supported", "Bad subscription should error about interval")
}

Expand All @@ -1073,7 +1081,7 @@ func TestWsOwnTradesSub(t *testing.T) {
sharedtestvalues.SkipTestIfCredentialsUnset(t, k)

k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
testexch.SetupWs(t, k)

err := k.Subscribe(subscription.List{{Channel: subscription.MyTradesChannel, Authenticated: true}})
Expand All @@ -1091,30 +1099,40 @@ func TestWsOwnTradesSub(t *testing.T) {
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()

subs, err := k.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions should not error")
pairs, err := k.GetEnabledPairs(asset.Spot)
require.NoError(t, err, "GetEnabledPairs must not error")
pairs = pairs.Format(currency.PairFormat{Uppercase: true, Delimiter: "/"})
require.False(t, k.Websocket.CanUseAuthenticatedEndpoints(), "Websocket must not be authenticated by default")
expected := subscription.List{}
for _, exp := range k.Features.Subscriptions {
if exp.Authenticated {
continue
}
s := exp.Clone()
exp := subscription.List{
{Channel: subscription.TickerChannel},
{Channel: subscription.AllTradesChannel},
{Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Channel: subscription.OrderbookChannel, Levels: 1000},
}
for _, s := range exp {
s.QualifiedChannel = channelName(s)
s.Asset = asset.Spot
s.Pairs = pairs
expected = append(expected, s)
}
testsubs.Equal(t, expected, subs)
subs, err := k.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions should not error")
testsubs.EqualLists(t, exp, subs)

k.Websocket.SetCanUseAuthenticatedEndpoints(true)
exp = append(exp, subscription.List{
{Channel: subscription.MyOrdersChannel, QualifiedChannel: krakenWsOpenOrders},
{Channel: subscription.MyTradesChannel, QualifiedChannel: krakenWsOwnTrades},
}...)
subs, err = k.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions should not error")
testsubs.EqualLists(t, exp, subs)
}

func TestGetWSToken(t *testing.T) {
t.Parallel()
sharedtestvalues.SkipTestIfCredentialsUnset(t, k)

k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
testexch.SetupWs(t, k)

resp, err := k.GetWebsocketToken(context.Background())
Expand Down Expand Up @@ -1164,9 +1182,8 @@ func TestWsCancelAllOrders(t *testing.T) {

func TestWsHandleData(t *testing.T) {
t.Parallel()
base := k
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
for _, l := range []int{10, 100} {
err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{
Channel: subscription.OrderbookChannel,
Expand All @@ -1176,7 +1193,7 @@ func TestWsHandleData(t *testing.T) {
})
require.NoError(t, err, "AddSuccessfulSubscriptions must not error")
}
sharedtestvalues.TestFixtureToDataHandler(t, base, k, "testdata/wsHandleData.json", k.wsHandleData)
testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", k.wsHandleData)
}

func TestWsOpenOrders(t *testing.T) {
Expand Down Expand Up @@ -1414,19 +1431,21 @@ var websocketGSTEUROrderbookUpdates = []string{
func TestWsOrderbookMax10Depth(t *testing.T) {
t.Parallel()
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.TestInstance(k), "TestInstance must not error")
require.NoError(t, testexch.Setup(k), "TestInstance must not error")
pairs := currency.Pairs{
currency.NewPairWithDelimiter("XDG", "USD", "/"),
currency.NewPairWithDelimiter("LUNA", "EUR", "/"),
currency.NewPairWithDelimiter("GST", "EUR", "/"),
}
err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{
Channel: subscription.OrderbookChannel,
Pairs: pairs,
Asset: asset.Spot,
Levels: 10,
})
require.NoError(t, err, "AddSuccessfulSubscriptions must not error")
for _, p := range pairs {
err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{
Channel: subscription.OrderbookChannel,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
Levels: 10,
})
require.NoError(t, err, "AddSuccessfulSubscriptions must not error")
}

for x := range websocketXDGUSDOrderbookUpdates {
err := k.wsHandleData([]byte(websocketXDGUSDOrderbookUpdates[x]))
Expand Down
Loading

0 comments on commit ebd06dc

Please sign in to comment.