Skip to content

Commit

Permalink
Kraken: Add configurable Book depth
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Oct 1, 2023
1 parent a3ae4e1 commit 884ea94
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 25 deletions.
33 changes: 33 additions & 0 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,39 @@ func TestWebsocketSubscribe(t *testing.T) {
assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels")
}

// TestWsOrderbook tests orderbook subscriptions for MaxDepth params
func TestWsOrderbook(t *testing.T) {
setupWsTest(t)

err := k.Subscribe([]stream.ChannelSubscription{{
Channel: krakenWsOrderbook,
Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"),
Params: map[string]any{
ChannelOrderbookDepthKey: 25,
}}})
assert.NoError(t, err, "Simple subscription should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")

subs := k.Websocket.GetSubscriptions()
assert.Len(t, subs, 1, "Should have 1 subscription channel")
key, ok := subs[0].Key.(stream.DefaultChannelKey)
assert.True(t, ok, "Subscription key should be a DefaultChannelKey")
assert.Equal(t, "book-25", key.Channel, "Key Channel should be correct")

err = k.Unsubscribe(subs)
assert.NoError(t, err, "Unsubscribe should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels")

err = k.Subscribe([]stream.ChannelSubscription{{
Channel: krakenWsOrderbook,
Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"),
Params: map[string]any{
ChannelOrderbookDepthKey: 42,
}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error")
assert.ErrorContains(t, err, "Subscription depth not supported", "Bad subscription should error about depth")
}

// TestWebsocketSubscribeAuth tests Auth's subscriptions
func TestWebsocketSubscribeAuth(t *testing.T) {
setupWsAuthTest(t)
Expand Down
3 changes: 3 additions & 0 deletions exchanges/kraken/kraken_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
statusOpen = "open"

krakenFormat = "2006-01-02T15:04:05.000Z"

// ChannelOrderbookDepthKey configures the orderbook depth in stream.ChannelSubscription.Params
ChannelOrderbookDepthKey = "_depth"
)

var (
Expand Down
63 changes: 38 additions & 25 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,27 @@ const (
krakenWSSandboxURL = "wss://sandbox.kraken.com"
krakenWSSupportedVersion = "1.4.0"
// WS endpoints
krakenWsHeartbeat = "heartbeat"
krakenWsSystemStatus = "systemStatus"
krakenWsSubscribe = "subscribe"
krakenWsSubscriptionStatus = "subscriptionStatus"
krakenWsUnsubscribe = "unsubscribe"
krakenWsTicker = "ticker"
krakenWsOHLC = "ohlc"
krakenWsTrade = "trade"
krakenWsSpread = "spread"
krakenWsOrderbook = "book"
krakenWsOwnTrades = "ownTrades"
krakenWsOpenOrders = "openOrders"
krakenWsAddOrder = "addOrder"
krakenWsCancelOrder = "cancelOrder"
krakenWsCancelAll = "cancelAll"
krakenWsAddOrderStatus = "addOrderStatus"
krakenWsCancelOrderStatus = "cancelOrderStatus"
krakenWsCancelAllOrderStatus = "cancelAllStatus"
krakenWsRateLimit = 50
krakenWsPingDelay = time.Second * 27
krakenWsOrderbookDepth = 1000
krakenWsHeartbeat = "heartbeat"
krakenWsSystemStatus = "systemStatus"
krakenWsSubscribe = "subscribe"
krakenWsSubscriptionStatus = "subscriptionStatus"
krakenWsUnsubscribe = "unsubscribe"
krakenWsTicker = "ticker"
krakenWsOHLC = "ohlc"
krakenWsTrade = "trade"
krakenWsSpread = "spread"
krakenWsOrderbook = "book"
krakenWsOwnTrades = "ownTrades"
krakenWsOpenOrders = "openOrders"
krakenWsAddOrder = "addOrder"
krakenWsCancelOrder = "cancelOrder"
krakenWsCancelAll = "cancelAll"
krakenWsAddOrderStatus = "addOrderStatus"
krakenWsCancelOrderStatus = "cancelOrderStatus"
krakenWsCancelAllOrderStatus = "cancelAllStatus"
krakenWsRateLimit = 50
krakenWsPingDelay = time.Second * 27
krakenWsOrderbookDefaultDepth = 1000
)

var (
Expand Down Expand Up @@ -781,13 +781,17 @@ func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[stri

// wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookPartial(c *stream.ChannelSubscription, askData, bidData []interface{}) error {
depth, err := depthFromChan(c)
if err != nil {
return err
}
base := orderbook.Base{
Pair: c.Currency,
Asset: c.Asset,
VerifyOrderbook: k.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(bidData)),
Asks: make(orderbook.Items, len(askData)),
MaxDepth: krakenWsOrderbookDepth,
MaxDepth: depth,
}
// Kraken ob data is timestamped per price, GCT orderbook data is
// timestamped per entry using the highest last update time, we can attempt
Expand Down Expand Up @@ -1123,17 +1127,23 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e
for i := range defaultSubscribedChannels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = "/"
subscriptions = append(subscriptions, stream.ChannelSubscription{
c := stream.ChannelSubscription{
Channel: defaultSubscribedChannels[i],
Currency: enabledCurrencies[j],
Asset: asset.Spot,
})
}
if defaultSubscribedChannels[i] == krakenWsOrderbook {
c.Params[ChannelOrderbookDepthKey] = krakenWsOrderbookDefaultDepth
}

subscriptions = append(subscriptions, c)
}
}
if k.Websocket.CanUseAuthenticatedEndpoints() {
for i := range authenticatedChannels {
subscriptions = append(subscriptions, stream.ChannelSubscription{
Channel: authenticatedChannels[i],
Asset: asset.Spot,
})
}
}
Expand Down Expand Up @@ -1204,6 +1214,8 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
} else {

Check warning on line 1215 in exchanges/kraken/kraken_websocket.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary) (revive)
r.Subscription.Depth = depth
// All responses will have book-N as the channel name
key.Channel += "-" + strconv.Itoa(depth)
}
}

Expand All @@ -1223,6 +1235,7 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error {
return wErr
}

c.Key = key
k.Websocket.AddSuccessfulSubscriptions(*c)
if k.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency)
Expand Down Expand Up @@ -1276,7 +1289,7 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error {
}

func depthFromChan(c *stream.ChannelSubscription) (int, error) {
depthAny, ok := c.Params["depth"]
depthAny, ok := c.Params[ChannelOrderbookDepthKey]
if !ok {
return 0, errMaxDepthMissing
}
Expand Down

0 comments on commit 884ea94

Please sign in to comment.