Skip to content

Commit

Permalink
Kraken: Sub/Unsub tests and fixes
Browse files Browse the repository at this point in the history
* Use Websocket subscriptionChannels instead of local slice
* Remove ChannelID - Deprecated in docs
* Simplify ping handlers and hardcodes message
* Add Depth as configurable orderbook channel param
* Simplify auth/non-auth channel updates
  • Loading branch information
gbjk committed Sep 28, 2023
1 parent aca6f95 commit 469d0be
Show file tree
Hide file tree
Showing 7 changed files with 384 additions and 385 deletions.
132 changes: 100 additions & 32 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var k = &Kraken{}
var wsSetupRan, wsAuthSetupRan bool
var comms = make(chan stream.Response)

// Please add your own APIkeys to do correct due diligence testing.
// Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing
const (
apiKey = ""
apiSecret = ""
Expand All @@ -52,9 +52,12 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatal(err)
}
krakenConfig.API.AuthenticatedSupport = true
krakenConfig.API.Credentials.Key = apiKey
krakenConfig.API.Credentials.Secret = apiSecret
if apiKey != "" {
krakenConfig.API.Credentials.Key = apiKey
}
if apiSecret != "" {
krakenConfig.API.Credentials.Secret = apiSecret
}
k.Websocket = sharedtestvalues.NewTestWebsocket()
err = k.Setup(krakenConfig)
if err != nil {
Expand Down Expand Up @@ -1207,19 +1210,19 @@ func TestWithdrawCancel(t *testing.T) {

// setupWsAuthTest will connect both websockets
// and should be called directly from a auth ws test
func setupWsAuthTest(t *testing.T) {
t.Helper()
setupWsTest(t)
setupAuthWs(t)
func setupWsAuthTest(tb testing.TB) {
tb.Helper()
setupWsTest(tb)
setupAuthWs(tb)
}

// setupWsTest will just connect the non-authenticated websocket
// and should be called directly from a non-auth ws test
func setupWsTest(t *testing.T) {
t.Helper()
func setupWsTest(tb testing.TB) {
tb.Helper()

if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled")
tb.Skip("Websocket not enabled")
}

if wsSetupRan {
Expand All @@ -1229,26 +1232,23 @@ func setupWsTest(t *testing.T) {

var dialer websocket.Dialer
if err := k.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil {
t.Fatalf("Dialing the websocket should not error: %s", err)
tb.Fatalf("Dialing the websocket should not error: %s", err)
}

go k.wsFunnelConnectionData(k.Websocket.Conn, comms)

go k.wsReadData(comms)
go func() {
err := k.wsPingHandler()
assert.NoError(t, err, "wsPingHandler should not error")
}()
go k.wsPingHandler(k.Websocket.Conn)
}

// setupAuthWs will just connect the authenticated websocket and should not be called directly
func setupAuthWs(t *testing.T) {
func setupAuthWs(tb testing.TB) {
tb.Helper()
if !k.API.AuthenticatedWebsocketSupport {
t.Skip("Authenticated Websocket not Supported")
tb.Skip("Authenticated Websocket not Supported")
}

if !sharedtestvalues.AreAPICredentialsSet(k) {
t.Skip("Authenticated Websocket credentials not set")
tb.Skip("Authenticated Websocket credentials not set")
}

if wsAuthSetupRan {
Expand All @@ -1259,28 +1259,96 @@ func setupAuthWs(t *testing.T) {
var err error
var dialer websocket.Dialer
if err = k.Websocket.AuthConn.Dial(&dialer, http.Header{}); err != nil {
t.Fatalf("Dialing the auth websocket should not error: %s", err)
tb.Fatalf("Dialing the auth websocket should not error: %s", err)
}

authToken, err = k.GetWebsocketToken(context.Background())
assert.NoError(t, err, "GetWebsocketToken should not error")
assert.NoError(tb, err, "GetWebsocketToken should not error")

go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms)
}

// TestWebsocketSubscribe tests returning a message with an id
// TestWebsocketSubscribe tests unauthenticated websocket subscriptions
// Specifically looking to ensure multiple errors are collected and returned and ws.Subscriptions Added/Removed in cases of:
// single pass, single fail, mixed fail, multiple pass, all fail
// No objection to this becoming a fixture test, so long as it integrates through Un/Subscribe roundtrip
func TestWebsocketSubscribe(t *testing.T) {
setupWsTest(t)
err := k.Subscribe([]stream.ChannelSubscription{
{
Channel: defaultSubscribedChannels[0],
Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"),
},

err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}})
assert.NoError(t, err, "Simple subscription should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")

err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}})
assert.NoError(t, err, "Resubscribing to the same channel shouldn't error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")

err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should yield the correct error")

err = k.Subscribe([]stream.ChannelSubscription{
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "USD", "/")},
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")},
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "ELF", "/")},
})
assert.NotNil(t, err, "Blah")
if err != nil {
t.Error(err)
}
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should yield the correct error")
assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")

err = k.Subscribe([]stream.ChannelSubscription{
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")},
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should yield the correct error")

err = k.Subscribe([]stream.ChannelSubscription{
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "XBT", "/")},
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("LTC", "ETH", "/")},
})
assert.NoError(t, err, "Multiple successful subscriptions should not error")

subs := k.Websocket.GetSubscriptions()
assert.Len(t, subs, 4, "Should have correct number of subscriptions")

err = k.Unsubscribe(subs[:1])
assert.NoError(t, err, "Simple Unsubscribe should succeed")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel")

err = k.Unsubscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "WIZARD", "/"), Key: 1337}})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/WIZARD", "Simple failing Unsubscribe should error")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have have removed any channels")

err = k.Unsubscribe([]stream.ChannelSubscription{
subs[1],
{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "EAGLE", "/"), Key: 1338},
})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/EAGLE", "Simple failing Unsubscribe should error")

subs = k.Websocket.GetSubscriptions()
assert.Len(t, subs, 2, "Should have removed only 1 more channel")

err = k.Unsubscribe(subs)
assert.NoError(t, err, "Unsubscribe multiple passing subscriptions should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels")
}

// TestWebsocketSubscribeAuth tests Auth's subscriptions
func TestWebsocketSubscribeAuth(t *testing.T) {
setupWsAuthTest(t)

err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsOwnTrades}})
assert.NoError(t, err, "Subsrcibing to ownTrades should not error")

subs := k.Websocket.GetSubscriptions()
assert.Len(t, subs, 1, "Should add 1 Subscription")

err = k.Unsubscribe(subs)
assert.NoError(t, err, "Unsubscribing an auth channel should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed channel")
}

func TestGetWSToken(t *testing.T) {
Expand Down
51 changes: 20 additions & 31 deletions exchanges/kraken/kraken_types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package kraken

import (
"errors"
"time"

"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
)

const (
Expand Down Expand Up @@ -75,6 +75,10 @@ const (

var (
assetTranslator assetTranslatorStore

errNoWebsocketOrderbookData = errors.New("no websocket orderbook data")
errNoRequestID = errors.New("no RequestID in response")
errMaxDepthMissing = errors.New("MaxDepth missing for subscription")
)

// GenericResponse stores general response data for functions that only return success
Expand Down Expand Up @@ -495,43 +499,37 @@ type WithdrawStatusResponse struct {
Status string `json:"status"`
}

// WebsocketSubscriptionEventRequest handles WS subscription events
type WebsocketSubscriptionEventRequest struct {
Event string `json:"event"` // subscribe
RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message.
Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3).
Subscription WebsocketSubscriptionData `json:"subscription,omitempty"`
Channels []stream.ChannelSubscription `json:"-"` // Keeps track of associated subscriptions in batched outgoings
}

// WebsocketBaseEventRequest Just has an "event" property
type WebsocketBaseEventRequest struct {
Event string `json:"event"` // eg "unsubscribe"
// WebsocketSubscribeReq
type WebsocketSubscribeRequest struct {
Event string `json:"event"` // "subscribe"
RequestID int64 `json:"reqid,omitempty"`
Pairs []string `json:"pair,omitempty"`
Subscription WebsocketSubscriptionData `json:"subscription,omitempty"`
}

// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events
type WebsocketUnsubscribeByChannelIDEventRequest struct {
WebsocketBaseEventRequest
RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message.
Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3).
ChannelID int64 `json:"channelID,omitempty"`
// WebsocketUnsubscribeRequest handles WS unsubscribe events
type WebsocketUnsubscribeRequest struct {
Event string `json:"event"` // "unsubscribe"
RequestID int64 `json:"reqid,omitempty"`
Pairs []string `json:"pair,omitempty"`
Subscription WebsocketSubscriptionData `json:"subscription,omitempty"`
}

// WebsocketSubscriptionData contains details on WS channel
type WebsocketSubscriptionData struct {
Name string `json:"name,omitempty"` // ticker|ohlc|trade|book|spread|*, * for all (ohlc interval value is 1 if all channels subscribed)
Interval int64 `json:"interval,omitempty"` // Optional - Time interval associated with ohlc subscription in minutes. Default 1. Valid Interval values: 1|5|15|30|60|240|1440|10080|21600
Depth int64 `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000
Depth int `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000
Token string `json:"token,omitempty"` // Optional used for authenticated requests

}

// WebsocketEventResponse holds all data response types
type WebsocketEventResponse struct {
WebsocketBaseEventRequest
Event string `json:"event"`
Status string `json:"status"`
Pair currency.Pair `json:"pair,omitempty"`
RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message.
RequestID int64 `json:"reqid,omitempty"`
Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"`
ChannelName string `json:"channelName,omitempty"`
WebsocketSubscriptionEventResponse
Expand All @@ -556,15 +554,6 @@ type WebsocketErrorResponse struct {
ErrorMessage string `json:"errorMessage"`
}

// WebsocketChannelData Holds relevant data for channels to identify what we're
// doing
type WebsocketChannelData struct {
Subscription string
Pair currency.Pair
ChannelID *int64
MaxDepth int
}

// WsTokenResponse holds the WS auth token
type WsTokenResponse struct {
Error []string `json:"error"`
Expand Down
Loading

0 comments on commit 469d0be

Please sign in to comment.