Skip to content

Commit

Permalink
Kraken: Subscription Templates
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Jul 23, 2024
1 parent 4654a67 commit f9f27f3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
2 changes: 1 addition & 1 deletion exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ func TestGenerateSubscriptions(t *testing.T) {
{Channel: subscription.OrderbookChannel, Levels: 1000},
}
for _, s := range exp {
s.QualifiedChannel = apiChannelName(s)
s.QualifiedChannel = channelName(s)
s.Asset = asset.Spot
s.Pairs = pairs
}
Expand Down
66 changes: 44 additions & 22 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"errors"
"fmt"
"hash/crc32"
"html/template"
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/buger/jsonparser"
Expand Down Expand Up @@ -62,7 +62,7 @@ const (
krakenWsCandlesDefaultTimeframe = 1
)

var standardChannelNames = map[string]string{
var channelNames = map[string]string{
subscription.TickerChannel: krakenWsTicker,
subscription.OrderbookChannel: krakenWsOrderbook,
subscription.CandlesChannel: krakenWsOHLC,
Expand All @@ -74,7 +74,7 @@ var standardChannelNames = map[string]string{
var reverseChannelNames = map[string]string{}

func init() {
for k, v := range standardChannelNames {
for k, v := range channelNames {
reverseChannelNames[v] = k
}
}
Expand All @@ -88,6 +88,15 @@ var (
errInvalidChecksum = errors.New("invalid checksum")
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 1000},
{Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true},
}

// WsConnect initiates a websocket connection
func (k *Kraken) WsConnect() error {
if !k.Websocket.IsEnabled() || !k.IsEnabled() {
Expand Down Expand Up @@ -958,15 +967,11 @@ func (k *Kraken) wsProcessCandles(channelName string, response []any, pair curre
return nil
}

/*
generateSubscriptions sets up the configured subscriptions for the websocket
We don't use one sub with many pairs because:
- Kraken will fan out in responses anyay
- resubscribe is messy when our subs don't match their respsonses
- FlushChannels and GetChannelDiff would incorrectly resub existing subs if we don't generate the same as we've stored
// GetSubscriptionTemplate returns a subscription channel template
func (k *Kraken) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}

We use $pair in a comment in the Templates to fan out without using it in QualifiedChannel
*/
func (k *Kraken) generateSubscriptions() (subscription.List, error) {
return k.Features.Subscriptions.ExpandTemplates(k)
}
Expand All @@ -975,6 +980,10 @@ func (k *Kraken) generateSubscriptions() (subscription.List, error) {
func (k *Kraken) Subscribe(in subscription.List) error {
var errs error

if in, errs = in.ExpandTemplates(k); errs != nil {
return errs
}

// Collect valid subs to subscribe to; Note that we won't RemoveSub on any that err on SetState or AddSub
subs := subscription.List{}
for _, s := range in {
Expand All @@ -987,7 +996,7 @@ func (k *Kraken) Subscribe(in subscription.List) error {
subs = append(subs, s)
}

// Group subscriptions by pairs for request, but expect per-sub responses; See generateSubscriptions() for explanation
// Group subscriptions by pairs for request, but expect per-sub responses
groupedSubs := subs.GroupPairs()

errs = common.AppendError(errs,
Expand Down Expand Up @@ -1221,16 +1230,9 @@ func (k *Kraken) wsProcessSubStatus(resp []byte) {
}
}

// GetSubscriptionTemplateFuncs returns functions available for subscription channel templating
func (b *Kraken) GetSubscriptionTemplateFuncs() template.FuncMap {
return template.FuncMap{
"channel": apiChannelName,
}
}

// apiChannelName converts a global channel name to kraken bespoke names
func apiChannelName(s *subscription.Subscription) string {
if n, ok := standardChannelNames[s.Channel]; ok {
// channelName converts a global channel name to kraken bespoke names
func channelName(s *subscription.Subscription) string {
if n, ok := channelNames[s.Channel]; ok {
return n
}
return s.Channel
Expand Down Expand Up @@ -1362,3 +1364,23 @@ func (k *Kraken) wsCancelAllOrders() (*WsCancelOrderResponse, error) {
}
return &resp, nil
}

/*
One sub per-pair. We don't use one sub with many pairs because:
- Kraken will fan out in responses anyay
- resubscribe is messy when our subs don't match their respsonses
- FlushChannels and GetChannelDiff would incorrectly resub existing subs if we don't generate the same as we've stored
*/
const subTplText = `
{{- if $.S.Asset -}}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs -}}
{{- channelName $.S }}
{{- $.PairSeparator }}
{{- end -}}
{{ $.AssetSeparator }}
{{- end -}}
{{- else -}}
{{- channelName $.S }}
{{- end }}
`
10 changes: 1 addition & 9 deletions exchanges/kraken/kraken_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -169,14 +168,7 @@ func (k *Kraken) SetDefaults() {
GlobalResultLimit: 720,
},
},
Subscriptions: []*subscription.Subscription{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel, Template: "{{channel $s}}{{/* fan out $pair */}}"},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel, Template: "{{channel $s}}{{/* fan out $pair */}}"},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Template: "{{channel $s}}{{/* fan out $pair */}}", Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Template: "{{channel $s}}{{/* fan out $pair */}}", Levels: 1000},
{Enabled: true, Channel: subscription.MyOrdersChannel, Template: "{{channel $s}}", Authenticated: true},
{Enabled: true, Channel: subscription.MyTradesChannel, Template: "{{channel $s}}", Authenticated: true},
},
Subscriptions: defaultSubscriptions.Clone(),
}

k.Requester, err = request.New(k.Name,
Expand Down

0 comments on commit f9f27f3

Please sign in to comment.