diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 8423dc1e421..14cad60fd3f 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1338,6 +1338,39 @@ func TestWebsocketSubscribe(t *testing.T) { assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels") } +// TestWsOrderbook tests orderbook subscriptions for MaxDepth params +func TestWsOrderbook(t *testing.T) { + setupWsTest(t) + + err := k.Subscribe([]stream.ChannelSubscription{{ + Channel: krakenWsOrderbook, + Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"), + Params: map[string]any{ + ChannelOrderbookDepthKey: 25, + }}}) + assert.NoError(t, err, "Simple subscription should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription") + + subs := k.Websocket.GetSubscriptions() + assert.Len(t, subs, 1, "Should have 1 subscription channel") + key, ok := subs[0].Key.(stream.DefaultChannelKey) + assert.True(t, ok, "Subscription key should be a DefaultChannelKey") + assert.Equal(t, "book-25", key.Channel, "Key Channel should be correct") + + err = k.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribe should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels") + + err = k.Subscribe([]stream.ChannelSubscription{{ + Channel: krakenWsOrderbook, + Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"), + Params: map[string]any{ + ChannelOrderbookDepthKey: 42, + }}}) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error") + assert.ErrorContains(t, err, "Subscription depth not supported", "Bad subscription should error about depth") +} + // TestWebsocketSubscribeAuth tests Auth's subscriptions func TestWebsocketSubscribeAuth(t *testing.T) { setupWsAuthTest(t) diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index d9c806d168a..838a67461cb 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -71,6 +71,9 @@ const ( statusOpen = "open" krakenFormat = "2006-01-02T15:04:05.000Z" + + // ChannelOrderbookDepthKey configures the orderbook depth in stream.ChannelSubscription.Params + ChannelOrderbookDepthKey = "_depth" ) var ( diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 6b0bfad8148..3ac89a8add4 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -33,27 +33,27 @@ const ( krakenWSSandboxURL = "wss://sandbox.kraken.com" krakenWSSupportedVersion = "1.4.0" // WS endpoints - krakenWsHeartbeat = "heartbeat" - krakenWsSystemStatus = "systemStatus" - krakenWsSubscribe = "subscribe" - krakenWsSubscriptionStatus = "subscriptionStatus" - krakenWsUnsubscribe = "unsubscribe" - krakenWsTicker = "ticker" - krakenWsOHLC = "ohlc" - krakenWsTrade = "trade" - krakenWsSpread = "spread" - krakenWsOrderbook = "book" - krakenWsOwnTrades = "ownTrades" - krakenWsOpenOrders = "openOrders" - krakenWsAddOrder = "addOrder" - krakenWsCancelOrder = "cancelOrder" - krakenWsCancelAll = "cancelAll" - krakenWsAddOrderStatus = "addOrderStatus" - krakenWsCancelOrderStatus = "cancelOrderStatus" - krakenWsCancelAllOrderStatus = "cancelAllStatus" - krakenWsRateLimit = 50 - krakenWsPingDelay = time.Second * 27 - krakenWsOrderbookDepth = 1000 + krakenWsHeartbeat = "heartbeat" + krakenWsSystemStatus = "systemStatus" + krakenWsSubscribe = "subscribe" + krakenWsSubscriptionStatus = "subscriptionStatus" + krakenWsUnsubscribe = "unsubscribe" + krakenWsTicker = "ticker" + krakenWsOHLC = "ohlc" + krakenWsTrade = "trade" + krakenWsSpread = "spread" + krakenWsOrderbook = "book" + krakenWsOwnTrades = "ownTrades" + krakenWsOpenOrders = "openOrders" + krakenWsAddOrder = "addOrder" + krakenWsCancelOrder = "cancelOrder" + krakenWsCancelAll = "cancelAll" + krakenWsAddOrderStatus = "addOrderStatus" + krakenWsCancelOrderStatus = "cancelOrderStatus" + krakenWsCancelAllOrderStatus = "cancelAllStatus" + krakenWsRateLimit = 50 + krakenWsPingDelay = time.Second * 27 + krakenWsOrderbookDefaultDepth = 1000 ) var ( @@ -781,13 +781,17 @@ func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[stri // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair func (k *Kraken) wsProcessOrderBookPartial(c *stream.ChannelSubscription, askData, bidData []interface{}) error { + depth, err := depthFromChan(c) + if err != nil { + return err + } base := orderbook.Base{ Pair: c.Currency, Asset: c.Asset, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Items, len(bidData)), Asks: make(orderbook.Items, len(askData)), - MaxDepth: krakenWsOrderbookDepth, + MaxDepth: depth, } // Kraken ob data is timestamped per price, GCT orderbook data is // timestamped per entry using the highest last update time, we can attempt @@ -1123,17 +1127,23 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e for i := range defaultSubscribedChannels { for j := range enabledCurrencies { enabledCurrencies[j].Delimiter = "/" - subscriptions = append(subscriptions, stream.ChannelSubscription{ + c := stream.ChannelSubscription{ Channel: defaultSubscribedChannels[i], Currency: enabledCurrencies[j], Asset: asset.Spot, - }) + } + if defaultSubscribedChannels[i] == krakenWsOrderbook { + c.Params[ChannelOrderbookDepthKey] = krakenWsOrderbookDefaultDepth + } + + subscriptions = append(subscriptions, c) } } if k.Websocket.CanUseAuthenticatedEndpoints() { for i := range authenticatedChannels { subscriptions = append(subscriptions, stream.ChannelSubscription{ Channel: authenticatedChannels[i], + Asset: asset.Spot, }) } } @@ -1204,6 +1214,8 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) } else { r.Subscription.Depth = depth + // All responses will have book-N as the channel name + key.Channel += "-" + strconv.Itoa(depth) } } @@ -1223,6 +1235,7 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { return wErr } + c.Key = key k.Websocket.AddSuccessfulSubscriptions(*c) if k.Verbose { log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency) @@ -1276,7 +1289,7 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { } func depthFromChan(c *stream.ChannelSubscription) (int, error) { - depthAny, ok := c.Params["depth"] + depthAny, ok := c.Params[ChannelOrderbookDepthKey] if !ok { return 0, errMaxDepthMissing }