Skip to content

Commit

Permalink
Kraken: Add subscription configuration support
Browse files Browse the repository at this point in the history
* Add subscription configuration support to Kraken
* Add subscription.List
* Add subscription.ListToMap
* Abstract GetChannelDifference to subscription.Map.Diff
* Add subscriptiontest.Equal() for comparing lists of subs
  • Loading branch information
Beadko authored and gbjk committed Feb 10, 2024
1 parent 24cacd9 commit 4f3916f
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 112 deletions.
29 changes: 28 additions & 1 deletion exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

var k = &Kraken{}
var wsConnected bool
var btcusdtPair = currency.NewPair(currency.BTC, currency.USDT)

// Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing
const (
Expand Down Expand Up @@ -1336,6 +1337,32 @@ func TestWsOwnTradesSub(t *testing.T) {
assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed channel")
}

// TestGenerateSubscriptions tests the subscriptions generated from configuration
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()

subs, err := k.GenerateSubscriptions()
require.NoError(t, err, "GenerateSubscriptions should not error")
expected := []subscription.Subscription{}
pairs, err := k.GetEnabledPairs(asset.Spot)
for i := range pairs {
pairs[i].Delimiter = "/"
}
require.NoError(t, err, "GetEnabledPairs must not error")
require.False(t, k.Websocket.CanUseAuthenticatedEndpoints(), "Websocket must not be authenticated by default")
for _, exp := range k.Features.Subscriptions {
if exp.Authenticated {
continue
}
s := *exp
s.Channel = channelName(s.Channel)
s.Asset = asset.Spot
s.Pairs = pairs
expected = append(expected, s)
}
testsubs.Equal(t, expected, subs)
}

func TestGetWSToken(t *testing.T) {
t.Parallel()
sharedtestvalues.SkipTestIfCredentialsUnset(t, k)
Expand Down
138 changes: 67 additions & 71 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ const (
krakenAuthWSURL = "wss://ws-auth.kraken.com"
krakenWSSandboxURL = "wss://sandbox.kraken.com"
krakenWSSupportedVersion = "1.4.0"
// WS endpoints

// Websocket Channels
krakenWsHeartbeat = "heartbeat"
krakenWsSystemStatus = "systemStatus"
krakenWsSubscribe = "subscribe"
Expand All @@ -58,21 +59,20 @@ const (
krakenWsCandlesDefaultTimeframe = 1
)

var subscriptionNames = map[string]string{
subscription.TickerChannel: krakenWsTicker,
subscription.OrderbookChannel: krakenWsOrderbook,
subscription.CandlesChannel: krakenWsOHLC,
subscription.AllTradesChannel: krakenWsTrade,
subscription.MyTradesChannel: krakenWsOwnTrades,
subscription.MyOrdersChannel: krakenWsOpenOrders,
// No equivalents for: AllOrders
}

var (
authToken string
)

// Channels require a topic and a currency
// Format [[ticker,but-t4u],[orderbook,nce-btt]]
var defaultSubscribedChannels = []string{
krakenWsTicker,
krakenWsTrade,
krakenWsOrderbook,
krakenWsOHLC,
krakenWsSpread,
}
var authenticatedChannels = []string{krakenWsOwnTrades, krakenWsOpenOrders}

// WsConnect initiates a websocket connection
func (k *Kraken) WsConnect() error {
if !k.Websocket.IsEnabled() || !k.IsEnabled() {
Expand Down Expand Up @@ -999,41 +999,37 @@ func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any,
return nil
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (k *Kraken) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
// channelName converts global channel Names used in config of channel input into kucoin channel names
// returns the name unchanged if no match is found
func channelName(name string) string {
if s, ok := subscriptionNames[name]; ok {
return s
}
return name
}

// GenerateSubscriptions sets up the configured subscriptions for the websocket
func (k *Kraken) GenerateSubscriptions() ([]subscription.Subscription, error) {
subscriptions := []subscription.Subscription{}
enabledPairs, err := k.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions []subscription.Subscription
for i := range defaultSubscribedChannels {
/*
for j := range enabledPairs {
enabledPairs[j].Delimiter = "/"
}
*/
c := subscription.Subscription{
Channel: defaultSubscribedChannels[i],
Pairs: enabledPairs,
Asset: asset.Spot,
Params: map[string]any{},
}
switch defaultSubscribedChannels[i] {
case krakenWsOrderbook:
c.Params[ChannelOrderbookDepthKey] = krakenWsOrderbookDefaultDepth
case krakenWsOHLC:
c.Params[ChannelCandlesTimeframeKey] = krakenWsCandlesDefaultTimeframe
}
subscriptions = append(subscriptions, c)
for i := range enabledPairs {
enabledPairs[i].Delimiter = "/"
}
if k.Websocket.CanUseAuthenticatedEndpoints() {
for i := range authenticatedChannels {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: authenticatedChannels[i],
Asset: asset.Spot,
})
authed := k.Websocket.CanUseAuthenticatedEndpoints()
for _, baseSub := range k.Features.Subscriptions {
if !authed && baseSub.Authenticated {
continue
}
s := *baseSub
s.Channel = channelName(s.Channel)
s.Asset = asset.Spot
s.Pairs = enabledPairs
subscriptions = append(subscriptions, s)
}

return subscriptions, nil
}

Expand All @@ -1048,105 +1044,105 @@ func (k *Kraken) Unsubscribe(channels []subscription.Subscription) error {
}

// subscribeToChan sends a websocket message to receive data from the channel
func (k *Kraken) subscribeToChan(chans []subscription.Subscription) error {
if len(chans) != 1 {
func (k *Kraken) subscribeToChan(subs []subscription.Subscription) error {
if len(subs) != 1 {
return errors.New("Kraken subscription batching not yet implemented")
}
c := chans[0]
r, err := k.reqForSub(krakenWsSubscribe, &c)
s := subs[0]
r, err := k.reqForSub(krakenWsSubscribe, &s)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, s.Channel, s.Pairs, err)
}

if !c.Asset.IsValid() {
c.Asset = asset.Spot
if !s.Asset.IsValid() {
s.Asset = asset.Spot
}

err = ensureChannelKeyed(&c, r)
err = ensureChannelKeyed(&s, r)
if err != nil {
return err
}

c.State = subscription.SubscribingState
err = k.Websocket.AddSubscription(&c)
s.State = subscription.SubscribingState
err = k.Websocket.AddSubscription(&s)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, s.Channel, s.Pairs, err)
}

conn := k.Websocket.Conn
if common.StringDataContains(authenticatedChannels, r.Subscription.Name) {
if s.Authenticated {
r.Subscription.Token = authToken
conn = k.Websocket.AuthConn
}

respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r)
if err != nil {
k.Websocket.RemoveSubscriptions(c)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
k.Websocket.RemoveSubscriptions(s)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, s.Channel, s.Pairs, err)
}

if err = k.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, s.Channel, s.Pairs, err)
k.Websocket.DataHandler <- wErr
// Currently all or nothing on pairs; Alternatively parse response and remove failing pairs and retry
k.Websocket.RemoveSubscriptions(c)
k.Websocket.RemoveSubscriptions(s)
return wErr
}

if err = k.Websocket.SetSubscriptionState(&c, subscription.SubscribedState); err != nil {
if err = k.Websocket.SetSubscriptionState(&s, subscription.SubscribedState); err != nil {
log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, err)
}

if k.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Pairs)
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, s.Channel, s.Pairs)
}

return nil
}

// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (k *Kraken) unsubscribeFromChan(chans []subscription.Subscription) error {
if len(chans) != 1 {
func (k *Kraken) unsubscribeFromChan(subs []subscription.Subscription) error {
if len(subs) != 1 {
return errors.New("Kraken subscription batching not yet implemented")
}
c := chans[0]
r, err := k.reqForSub(krakenWsUnsubscribe, &c)
s := subs[0]
r, err := k.reqForSub(krakenWsUnsubscribe, &s)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, c.Channel, c.Pairs, err)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, s.Channel, s.Pairs, err)
}

c.EnsureKeyed()
s.EnsureKeyed()

if err = k.Websocket.SetSubscriptionState(&c, subscription.UnsubscribingState); err != nil {
if err = k.Websocket.SetSubscriptionState(&s, subscription.UnsubscribingState); err != nil {
// err is probably ErrChannelInStateAlready, but we want to bubble it up to prevent an attempt to Subscribe again
// We can catch and ignore it in our call to resub
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, c.Channel, c.Pairs, err)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, s.Channel, s.Pairs, err)
}

conn := k.Websocket.Conn
if common.StringDataContains(authenticatedChannels, c.Channel) {
if s.Authenticated {
conn = k.Websocket.AuthConn
r.Subscription.Token = authToken
}

respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r)
if err != nil {
if e2 := k.Websocket.SetSubscriptionState(&c, subscription.SubscribedState); e2 != nil {
if e2 := k.Websocket.SetSubscriptionState(&s, subscription.SubscribedState); e2 != nil {
log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2)
}
return err
}

if err = k.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, c.Channel, c.Pairs, err)
wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, s.Channel, s.Pairs, err)
k.Websocket.DataHandler <- wErr
if e2 := k.Websocket.SetSubscriptionState(&c, subscription.SubscribedState); e2 != nil {
if e2 := k.Websocket.SetSubscriptionState(&s, subscription.SubscribedState); e2 != nil {
log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2)
}
return wErr
}

k.Websocket.RemoveSubscriptions(c)
k.Websocket.RemoveSubscriptions(s)

return nil
}
Expand Down
11 changes: 10 additions & 1 deletion exchanges/kraken/kraken_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -191,6 +192,14 @@ func (k *Kraken) SetDefaults() {
GlobalResultLimit: 720,
},
},
Subscriptions: []*subscription.Subscription{
{Enabled: true, Channel: subscription.TickerChannel},
{Enabled: true, Channel: subscription.AllTradesChannel},
{Enabled: true, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Channel: subscription.OrderbookChannel, Levels: 1000},
{Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true},
},
}

k.Requester, err = request.New(k.Name,
Expand Down Expand Up @@ -242,7 +251,7 @@ func (k *Kraken) Setup(exch *config.Exchange) error {
Connector: k.WsConnect,
Subscriber: k.Subscribe,
Unsubscriber: k.Unsubscribe,
GenerateSubscriptions: k.GenerateDefaultSubscriptions,
GenerateSubscriptions: k.GenerateSubscriptions,
Features: &k.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{SortBuffer: true},
})
Expand Down
32 changes: 3 additions & 29 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,36 +875,10 @@ func (w *Websocket) GetName() string {

// GetChannelDifference finds the difference between the subscribed channels
// and the new subscription list when pairs are disabled or enabled.
func (w *Websocket) GetChannelDifference(genSubs []subscription.Subscription) (sub, unsub []subscription.Subscription) {
func (w *Websocket) GetChannelDifference(newSubs []subscription.Subscription) (sub, unsub []subscription.Subscription) {
w.subscriptionMutex.RLock()
unsubMap := subscription.Map{}
for k, c := range w.subscriptions {
unsubMap[k] = c
}
w.subscriptionMutex.RUnlock()

for i := range genSubs {
key := genSubs[i].EnsureKeyed()

var found *subscription.Subscription
if m, ok := key.(subscription.MatchableKey); ok {
found = m.Match(unsubMap)
} else {
found = unsubMap[key]
}

if found != nil {
delete(unsubMap, found.Key) // If it's in both then we remove it from the unsubscribe list
} else {
sub = append(sub, genSubs[i]) // If it's in genSubs but not existing subs we want to subscribe
}
}

for _, c := range unsubMap {
unsub = append(unsub, *c)
}

return
defer w.subscriptionMutex.RUnlock()
return w.subscriptions.Diff(subscription.ListToMap(newSubs))
}

// UnsubscribeChannels unsubscribes from a websocket channel
Expand Down
Loading

0 comments on commit 4f3916f

Please sign in to comment.