From 191c5c17d2d96c4de72299131f876371a7ffe413 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Thu, 28 Sep 2023 15:17:41 +0700 Subject: [PATCH] Kraken: Add configurable Book depth --- exchanges/kraken/kraken_test.go | 22 ++++++++++ exchanges/kraken/kraken_types.go | 2 + exchanges/kraken/kraken_websocket.go | 66 +++++++++++++++++----------- 3 files changed, 65 insertions(+), 25 deletions(-) diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index e83805a5e3c..687f386e4a9 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1336,6 +1336,28 @@ func TestWebsocketSubscribe(t *testing.T) { assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels") } +// TestWsOrderbook tests orderbook subscriptions for MaxDepth params +func TestWsOrderbook(t *testing.T) { + setupWsTest(t) + + c := []stream.ChannelSubscription{{ + Channel: krakenWsOrderbook, + Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"), + Params: map[string]any{ + ChannelOrderbookDepthKey: 25, + }}} + + err := k.Subscribe(c) + assert.NoError(t, err, "Simple subscription should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription") + + time.Sleep(2 * time.Second) + + err = k.Unsubscribe(c) + assert.NoError(t, err, "Unsubscribe should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels") +} + // TestWebsocketSubscribeAuth tests Auth's subscriptions func TestWebsocketSubscribeAuth(t *testing.T) { setupWsAuthTest(t) diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index 32f99075e86..09fab166bda 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -71,6 +71,8 @@ const ( statusOpen = "open" krakenFormat = "2006-01-02T15:04:05.000Z" + + ChannelOrderbookDepthKey = "_depth" ) var ( diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 97864a077a1..e5979773305 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -33,27 +33,27 @@ const ( krakenWSSandboxURL = "wss://sandbox.kraken.com" krakenWSSupportedVersion = "1.4.0" // WS endpoints - krakenWsHeartbeat = "heartbeat" - krakenWsSystemStatus = "systemStatus" - krakenWsSubscribe = "subscribe" - krakenWsSubscriptionStatus = "subscriptionStatus" - krakenWsUnsubscribe = "unsubscribe" - krakenWsTicker = "ticker" - krakenWsOHLC = "ohlc" - krakenWsTrade = "trade" - krakenWsSpread = "spread" - krakenWsOrderbook = "book" - krakenWsOwnTrades = "ownTrades" - krakenWsOpenOrders = "openOrders" - krakenWsAddOrder = "addOrder" - krakenWsCancelOrder = "cancelOrder" - krakenWsCancelAll = "cancelAll" - krakenWsAddOrderStatus = "addOrderStatus" - krakenWsCancelOrderStatus = "cancelOrderStatus" - krakenWsCancelAllOrderStatus = "cancelAllStatus" - krakenWsRateLimit = 50 - krakenWsPingDelay = time.Second * 27 - krakenWsOrderbookDepth = 1000 + krakenWsHeartbeat = "heartbeat" + krakenWsSystemStatus = "systemStatus" + krakenWsSubscribe = "subscribe" + krakenWsSubscriptionStatus = "subscriptionStatus" + krakenWsUnsubscribe = "unsubscribe" + krakenWsTicker = "ticker" + krakenWsOHLC = "ohlc" + krakenWsTrade = "trade" + krakenWsSpread = "spread" + krakenWsOrderbook = "book" + krakenWsOwnTrades = "ownTrades" + krakenWsOpenOrders = "openOrders" + krakenWsAddOrder = "addOrder" + krakenWsCancelOrder = "cancelOrder" + krakenWsCancelAll = "cancelAll" + krakenWsAddOrderStatus = "addOrderStatus" + krakenWsCancelOrderStatus = "cancelOrderStatus" + krakenWsCancelAllOrderStatus = "cancelAllStatus" + krakenWsRateLimit = 50 + krakenWsPingDelay = time.Second * 27 + krakenWsOrderbookDefaultDepth = 1000 ) var ( @@ -783,13 +783,18 @@ func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[stri // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair func (k *Kraken) wsProcessOrderBookPartial(c *stream.ChannelSubscription, askData, bidData []interface{}) error { + depth, err := depthFromChan(c) + if err != nil { + log.Errorln(log.Global, "From here") + return err + } base := orderbook.Base{ Pair: c.Currency, Asset: c.Asset, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Items, len(bidData)), Asks: make(orderbook.Items, len(askData)), - MaxDepth: krakenWsOrderbookDepth, + MaxDepth: depth, } // Kraken ob data is timestamped per price, GCT orderbook data is // timestamped per entry using the highest last update time, we can attempt @@ -1125,17 +1130,23 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e for i := range defaultSubscribedChannels { for j := range enabledCurrencies { enabledCurrencies[j].Delimiter = "/" - subscriptions = append(subscriptions, stream.ChannelSubscription{ + c := stream.ChannelSubscription{ Channel: defaultSubscribedChannels[i], Currency: enabledCurrencies[j], Asset: asset.Spot, - }) + } + if defaultSubscribedChannels[i] == krakenWsOrderbook { + c.Params[ChannelOrderbookDepthKey] = krakenWsOrderbookDefaultDepth + } + + subscriptions = append(subscriptions, c) } } if k.Websocket.CanUseAuthenticatedEndpoints() { for i := range authenticatedChannels { subscriptions = append(subscriptions, stream.ChannelSubscription{ Channel: authenticatedChannels[i], + Asset: asset.Spot, }) } } @@ -1192,11 +1203,15 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { r.Pairs = []string{c.Currency.String()} } + key := stream.DefaultChannelKey{c.Channel, c.Currency, c.Asset} + if c.Channel == krakenWsOrderbook { if depth, err := depthFromChan(c); err != nil { return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) } else { r.Subscription.Depth = depth + // All responses will have book-N as the channel name + key.Channel += "-" + strconv.Itoa(depth) } } @@ -1216,6 +1231,7 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { return wErr } + c.Key = key k.Websocket.AddSuccessfulSubscriptions(*c) if k.Verbose { log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency) @@ -1269,7 +1285,7 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { } func depthFromChan(c *stream.ChannelSubscription) (int, error) { - depthAny, ok := c.Params["depth"] + depthAny, ok := c.Params[ChannelOrderbookDepthKey] if !ok { return 0, errMaxDepthMissing }