From 378d5b46b60c488213400b607909b081f24ec106 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Thu, 21 Sep 2023 18:24:37 +0700 Subject: [PATCH] Kraken: Use websocket sub management * Use Websocket subscriptionChannels instead of local slice * Remove ChannelID - Deprecated in docs * Simplify ping handlers and hardcodes message * Add Depth as configurable orderbook channel param * Simplify auth/non-auth channel updates --- exchanges/kraken/kraken_test.go | 542 ++++++++----------------- exchanges/kraken/kraken_types.go | 51 +-- exchanges/kraken/kraken_websocket.go | 571 ++++++++++++--------------- exchanges/stream/websocket.go | 2 + exchanges/stream/websocket_test.go | 2 +- go.mod | 1 + go.sum | 2 + testdata/configtest.json | 4 +- 8 files changed, 458 insertions(+), 717 deletions(-) diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 1f12e0e8fde..8423dc1e421 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -33,13 +33,15 @@ var k = &Kraken{} var wsSetupRan, wsAuthSetupRan bool var comms = make(chan stream.Response) -// Please add your own APIkeys to do correct due diligence testing. +// Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing const ( apiKey = "" apiSecret = "" canManipulateRealOrders = false ) +var btcusdPair = currency.NewPairWithDelimiter("XBT", "USD", "/") + // TestSetup setup func func TestMain(m *testing.M) { k.SetDefaults() @@ -52,9 +54,12 @@ func TestMain(m *testing.M) { if err != nil { log.Fatal(err) } - krakenConfig.API.AuthenticatedSupport = true - krakenConfig.API.Credentials.Key = apiKey - krakenConfig.API.Credentials.Secret = apiSecret + if apiKey != "" { + krakenConfig.API.Credentials.Key = apiKey + } + if apiSecret != "" { + krakenConfig.API.Credentials.Secret = apiSecret + } k.Websocket = sharedtestvalues.NewTestWebsocket() err = k.Setup(krakenConfig) if err != nil { @@ -1207,19 +1212,19 @@ func TestWithdrawCancel(t *testing.T) { // setupWsAuthTest will connect both websockets // and should be called directly from a auth ws test -func setupWsAuthTest(t *testing.T) { - t.Helper() - setupWsTest(t) - setupAuthWs(t) +func setupWsAuthTest(tb testing.TB) { + tb.Helper() + setupWsTest(tb) + setupAuthWs(tb) } // setupWsTest will just connect the non-authenticated websocket // and should be called directly from a non-auth ws test -func setupWsTest(t *testing.T) { - t.Helper() +func setupWsTest(tb testing.TB) { + tb.Helper() if !k.Websocket.IsEnabled() { - t.Skip("Websocket not enabled") + tb.Skip("Websocket not enabled") } if wsSetupRan { @@ -1229,26 +1234,23 @@ func setupWsTest(t *testing.T) { var dialer websocket.Dialer if err := k.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil { - t.Fatalf("Dialing the websocket should not error: %s", err) + tb.Fatalf("Dialing the websocket should not error: %s", err) } go k.wsFunnelConnectionData(k.Websocket.Conn, comms) - go k.wsReadData(comms) - go func() { - err := k.wsPingHandler() - assert.NoError(t, err, "wsPingHandler should not error") - }() + go k.wsPingHandler(k.Websocket.Conn) } // setupAuthWs will just connect the authenticated websocket and should not be called directly -func setupAuthWs(t *testing.T) { +func setupAuthWs(tb testing.TB) { + tb.Helper() if !k.API.AuthenticatedWebsocketSupport { - t.Skip("Authenticated Websocket not Supported") + tb.Skip("Authenticated Websocket not Supported") } if !sharedtestvalues.AreAPICredentialsSet(k) { - t.Skip("Authenticated Websocket credentials not set") + tb.Skip("Authenticated Websocket credentials not set") } if wsAuthSetupRan { @@ -1259,28 +1261,96 @@ func setupAuthWs(t *testing.T) { var err error var dialer websocket.Dialer if err = k.Websocket.AuthConn.Dial(&dialer, http.Header{}); err != nil { - t.Fatalf("Dialing the auth websocket should not error: %s", err) + tb.Fatalf("Dialing the auth websocket should not error: %s", err) } authToken, err = k.GetWebsocketToken(context.Background()) - assert.NoError(t, err, "GetWebsocketToken should not error") + assert.NoError(tb, err, "GetWebsocketToken should not error") go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) } -// TestWebsocketSubscribe tests returning a message with an id +// TestWebsocketSubscribe tests unauthenticated websocket subscriptions +// Specifically looking to ensure multiple errors are collected and returned and ws.Subscriptions Added/Removed in cases of: +// single pass, single fail, mixed fail, multiple pass, all fail +// No objection to this becoming a fixture test, so long as it integrates through Un/Subscribe roundtrip func TestWebsocketSubscribe(t *testing.T) { setupWsTest(t) - err := k.Subscribe([]stream.ChannelSubscription{ - { - Channel: defaultSubscribedChannels[0], - Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"), - }, + + err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) + assert.NoError(t, err, "Simple subscription should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription") + + err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) + assert.NoError(t, err, "Resubscribing to the same channel shouldn't error") + assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error") + + err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "USD", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "ELF", "/")}, }) - assert.NotNil(t, err, "Blah") - if err != nil { - t.Error(err) - } + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should yield the correct error") + assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}, + }) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "XBT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("LTC", "ETH", "/")}, + }) + assert.NoError(t, err, "Multiple successful subscriptions should not error") + + subs := k.Websocket.GetSubscriptions() + assert.Len(t, subs, 4, "Should have correct number of subscriptions") + + err = k.Unsubscribe(subs[:1]) + assert.NoError(t, err, "Simple Unsubscribe should succeed") + assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel") + + err = k.Unsubscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "WIZARD", "/"), Key: 1337}}) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/WIZARD", "Simple failing Unsubscribe should error") + assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have removed any channels") + + err = k.Unsubscribe([]stream.ChannelSubscription{ + subs[1], + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "EAGLE", "/"), Key: 1338}, + }) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/EAGLE", "Simple failing Unsubscribe should error") + + subs = k.Websocket.GetSubscriptions() + assert.Len(t, subs, 2, "Should have removed only 1 more channel") + + err = k.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribe multiple passing subscriptions should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels") +} + +// TestWebsocketSubscribeAuth tests Auth's subscriptions +func TestWebsocketSubscribeAuth(t *testing.T) { + setupWsAuthTest(t) + + err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsOwnTrades}}) + assert.NoError(t, err, "Subsrcibing to ownTrades should not error") + + subs := k.Websocket.GetSubscriptions() + assert.Len(t, subs, 1, "Should add 1 Subscription") + + err = k.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribing an auth channel should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed channel") } func TestGetWSToken(t *testing.T) { @@ -1351,6 +1421,7 @@ func TestWsSystemStatus(t *testing.T) { func TestWsSubscriptionStatus(t *testing.T) { t.Parallel() pressXToJSON := []byte(`{ + "reqID": 1007, "channelID": 10001, "channelName": "ticker", "event": "subscriptionStatus", @@ -1366,6 +1437,7 @@ func TestWsSubscriptionStatus(t *testing.T) { } pressXToJSON = []byte(`{ + "reqID": 1008, "channelID": 10001, "channelName": "ohlc-5", "event": "subscriptionStatus", @@ -1383,6 +1455,7 @@ func TestWsSubscriptionStatus(t *testing.T) { } pressXToJSON = []byte(`{ + "reqID": 1009, "channelName": "ownTrades", "event": "subscriptionStatus", "status": "subscribed", @@ -1395,6 +1468,7 @@ func TestWsSubscriptionStatus(t *testing.T) { t.Error(err) } pressXToJSON = []byte(`{ + "reqID": 1010, "errorMessage": "Subscription depth not supported", "event": "subscriptionStatus", "pair": "XBT/USD", @@ -1412,366 +1486,76 @@ func TestWsSubscriptionStatus(t *testing.T) { func TestWsTicker(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ - "channelID": 1337, - "channelName": "ticker", - "event": "subscriptionStatus", - "pair": "XBT/EUR", - "status": "subscribed", - "subscription": { - "name": "ticker" - } - }`) + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: krakenWsTicker}) + pressXToJSON := []byte(`[2,{"a":["5525.40000",1,"1.000"],"b":["5525.10000",1,"1.000"],"c":["5525.10000","0.00398963"],"h":["5783.00000","5783.00000"],"l":["5505.00000","5505.00000"],"o":["5760.70000","5763.40000"],"p":["5631.44067","5653.78939"],"t":[11493,16267],"v":["2634.11501494","3591.17907851"]},"ticker","XBT/USD"]`) err := k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 1337, - { - "a": [ - "5525.40000", - 1, - "1.000" - ], - "b": [ - "5525.10000", - 1, - "1.000" - ], - "c": [ - "5525.10000", - "0.00398963" - ], - "h": [ - "5783.00000", - "5783.00000" - ], - "l": [ - "5505.00000", - "5505.00000" - ], - "o": [ - "5760.70000", - "5763.40000" - ], - "p": [ - "5631.44067", - "5653.78939" - ], - "t": [ - 11493, - 16267 - ], - "v": [ - "2634.11501494", - "3591.17907851" - ] - }, - "ticker", - "XBT/USD" - ]`) - err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + assert.NoError(t, err, "handle WS Ticker should not error") } func TestWsOHLC(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ - "channelID": 13337, - "channelName": "ohlc", - "event": "subscriptionStatus", - "pair": "XBT/EUR", - "status": "subscribed", - "subscription": { - "name": "ohlc" - } - }`) + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{ + Key: stream.DefaultChannelKey{ + Channel: krakenWsOHLC + "-5", + Currency: btcusdPair, + Asset: asset.Spot, + }, + Channel: krakenWsOHLC, + Currency: btcusdPair, + Asset: asset.Spot, + }) + pressXToJSON := []byte(`[2,["1542057314.748456","1542057360.435743","3586.70000","3586.70000","3586.60000","3586.60000","3586.68894","0.03373000",2],"ohlc-5","XBT/USD"]`) err := k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 13337, - [ - "1542057314.748456", - "1542057360.435743", - "3586.70000", - "3586.70000", - "3586.60000", - "3586.60000", - "3586.68894", - "0.03373000", - 2 - ], - "ohlc-5", - "XBT/USD" - ]`) - err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + assert.NoError(t, err, "handle WS Candles should not error") } func TestWsTrade(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ - "channelID": 133337, - "channelName": "trade", - "event": "subscriptionStatus", - "pair": "XBT/EUR", - "status": "subscribed", - "subscription": { - "name": "trade" - } - }`) + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: krakenWsTrade}) + pressXToJSON := []byte(`[2,[["5541.20000","0.15850568","1534614057.321597","s","l",""],["6060.00000","0.02455000","1534614057.324998","b","l",""]],"trade","XBT/USD"]`) err := k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 133337, - [ - [ - "5541.20000", - "0.15850568", - "1534614057.321597", - "s", - "l", - "" - ], - [ - "6060.00000", - "0.02455000", - "1534614057.324998", - "b", - "l", - "" - ] - ], - "trade", - "XBT/USD" - ]`) - err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + assert.NoError(t, err, "handle WS Trades should not error") } func TestWsSpread(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ - "channelID": 1333337, - "channelName": "spread", - "event": "subscriptionStatus", - "pair": "XBT/EUR", - "status": "subscribed", - "subscription": { - "name": "spread" - } - }`) + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: krakenWsSpread}) + pressXToJSON := []byte(`[2,["5698.40000","5700.00000","1542057299.545897","1.01234567","0.98765432"],"spread","XBT/USD"]`) err := k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 1333337, - [ - "5698.40000", - "5700.00000", - "1542057299.545897", - "1.01234567", - "0.98765432" - ], - "spread", - "XBT/USD" - ]`) - err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + assert.NoError(t, err, "handle WS Spread should not error") } func TestWsOrdrbook(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ - "channelID": 13333337, - "channelName": "book", - "event": "subscriptionStatus", - "pair": "XBT/USD", - "status": "subscribed", - "subscription": { - "name": "book" - } - }`) + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{ + Key: stream.DefaultChannelKey{ + Channel: krakenWsOrderbook + "-100", + Currency: btcusdPair, + Asset: asset.Spot, + }, + Channel: krakenWsOrderbook, + Currency: btcusdPair, + Asset: asset.Spot, + Params: map[string]any{ + ChannelOrderbookDepthKey: 100, + }, + }) + pressXToJSON := []byte(`[2,{"as":[["5541.30000","2.50700000","1534614248.123678"],["5541.80000","0.33000000","1534614098.345543"],["5542.70000","0.64700000","1534614244.654432"],["5544.30000","2.50700000","1534614248.123678"],["5545.80000","0.33000000","1534614098.345543"],["5546.70000","0.64700000","1534614244.654432"],["5547.70000","0.64700000","1534614244.654432"],["5548.30000","2.50700000","1534614248.123678"],["5549.80000","0.33000000","1534614098.345543"],["5550.70000","0.64700000","1534614244.654432"]],"bs":[["5541.20000","1.52900000","1534614248.765567"],["5539.90000","0.30000000","1534614241.769870"],["5539.50000","5.00000000","1534613831.243486"],["5538.20000","1.52900000","1534614248.765567"],["5537.90000","0.30000000","1534614241.769870"],["5536.50000","5.00000000","1534613831.243486"],["5535.20000","1.52900000","1534614248.765567"],["5534.90000","0.30000000","1534614241.769870"],["5533.50000","5.00000000","1534613831.243486"],["5532.50000","5.00000000","1534613831.243486"]]},"book-100","XBT/USD"]`) err := k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 13333337, - { - "as": [ - [ - "5541.30000", - "2.50700000", - "1534614248.123678" - ], - [ - "5541.80000", - "0.33000000", - "1534614098.345543" - ], - [ - "5542.70000", - "0.64700000", - "1534614244.654432" - ], - [ - "5544.30000", - "2.50700000", - "1534614248.123678" - ], - [ - "5545.80000", - "0.33000000", - "1534614098.345543" - ], - [ - "5546.70000", - "0.64700000", - "1534614244.654432" - ], - [ - "5547.70000", - "0.64700000", - "1534614244.654432" - ], - [ - "5548.30000", - "2.50700000", - "1534614248.123678" - ], - [ - "5549.80000", - "0.33000000", - "1534614098.345543" - ], - [ - "5550.70000", - "0.64700000", - "1534614244.654432" - ] - ], - "bs": [ - [ - "5541.20000", - "1.52900000", - "1534614248.765567" - ], - [ - "5539.90000", - "0.30000000", - "1534614241.769870" - ], - [ - "5539.50000", - "5.00000000", - "1534613831.243486" - ], - [ - "5538.20000", - "1.52900000", - "1534614248.765567" - ], - [ - "5537.90000", - "0.30000000", - "1534614241.769870" - ], - [ - "5536.50000", - "5.00000000", - "1534613831.243486" - ], - [ - "5535.20000", - "1.52900000", - "1534614248.765567" - ], - [ - "5534.90000", - "0.30000000", - "1534614241.769870" - ], - [ - "5533.50000", - "5.00000000", - "1534613831.243486" - ], - [ - "5532.50000", - "5.00000000", - "1534613831.243486" - ] - ] - }, - "book-100", - "XBT/USD" - ]`) - err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 13333337, - { - "a": [ - [ - "5541.30000", - "2.50700000", - "1534614248.456738" - ], - [ - "5542.50000", - "0.40100000", - "1534614248.456738" - ] - ], - "c": "4187525586" - }, - "book-10", - "XBT/USD" - ]`) + assert.NoError(t, err, "handle WS Orderbook full snapshot should not error") + + pressXToJSON = []byte(`[2,{"a":[["5541.30000","2.50700000","1534614248.456738"],["5542.50000","0.40100000","1534614248.456738"]],"c":"4187525586"},"book-100","XBT/USD"]`) err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } - pressXToJSON = []byte(`[ - 13333337, - { - "b": [ - [ - "5541.30000", - "0.00000000", - "1534614335.345903" - ] - ], - "c": "4187525586" - }, - "book-10", - "XBT/USD" - ]`) + assert.NoError(t, err, "handle WS Orderbook partial update should not error") + + pressXToJSON = []byte(`[2,{"b":[["5541.30000","0.00000000","1534614335.345903"]],"c":"4187525586"},"book-100","XBT/USD"]`) err = k.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + assert.NoError(t, err, "handle WS Orderbook partial update should not error") } func TestWsOwnTrades(t *testing.T) { t.Parallel() + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Channel: krakenWsOwnTrades}) pressXToJSON := []byte(`[ [ { @@ -1835,7 +1619,10 @@ func TestWsOwnTrades(t *testing.T) { } } ], - "ownTrades" + "ownTrades", + { + "sequence": 4 + } ]`) err := k.wsHandleData(pressXToJSON) if err != nil { @@ -1873,6 +1660,8 @@ func TestWsOpenOrders(t *testing.T) { k.API.Endpoints = k.NewEndpoints() + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Channel: krakenWsOpenOrders}) + fixture, err := os.Open("testdata/wsOpenTrades.json") defer func() { assert.Nil(t, fixture.Close()) }() if err != nil { @@ -2194,7 +1983,6 @@ func TestGetFuturesTrades(t *testing.T) { } var websocketXDGUSDOrderbookUpdates = []string{ - `{"channelID":2304,"channelName":"book-10","event":"subscriptionStatus","pair":"XDG/USD","reqid":163845014,"status":"subscribed","subscription":{"depth":10,"name":"book"}}`, `[2304,{"as":[["0.074602700","278.39626342","1690246067.832139"],["0.074611000","555.65134028","1690246086.243668"],["0.074613300","524.87121572","1690245901.574881"],["0.074624600","77.57180740","1690246060.668500"],["0.074632500","620.64648404","1690246010.904883"],["0.074698400","409.57419037","1690246041.269821"],["0.074700000","61067.71115772","1690246089.485595"],["0.074723200","4394.01869240","1690246087.557913"],["0.074725200","4229.57885125","1690246082.911452"],["0.074738400","212.25501214","1690246089.421559"]],"bs":[["0.074597400","53591.43163675","1690246089.451762"],["0.074596700","33594.18269213","1690246089.514152"],["0.074596600","53598.60351469","1690246089.340781"],["0.074594800","5358.57247081","1690246089.347962"],["0.074594200","30168.21074680","1690246089.345112"],["0.074590900","7089.69894583","1690246088.212880"],["0.074586700","46925.20182082","1690246089.074618"],["0.074577200","5500.00000000","1690246087.568856"],["0.074569600","8132.49888631","1690246086.841219"],["0.074562900","8413.11098009","1690246087.024863"]]},"book-10","XDG/USD"]`, `[2304,{"a":[["0.074700000","0.00000000","1690246089.516119"],["0.074738500","125000.00000000","1690246063.352141","r"]],"c":"2219685759"},"book-10","XDG/USD"]`, `[2304,{"a":[["0.074678800","33476.70673703","1690246089.570183"]],"c":"1897176819"},"book-10","XDG/USD"]`, @@ -2213,19 +2001,35 @@ var websocketXDGUSDOrderbookUpdates = []string{ } var websocketLUNAEUROrderbookUpdates = []string{ - `{"channelID":9536,"channelName":"book-10","event":"subscriptionStatus","pair":"LUNA/EUR","reqid":106845459,"status":"subscribed","subscription":{"depth":10,"name":"book"}}`, `[9536,{"as":[["0.000074650000","147354.32016076","1690249755.076929"],["0.000074710000","5084881.40000000","1690250711.359411"],["0.000074760000","9700502.70476704","1690250743.279490"],["0.000074990000","2933380.23886300","1690249596.627969"],["0.000075000000","433333.33333333","1690245575.626780"],["0.000075020000","152914.84493416","1690243661.232520"],["0.000075070000","146529.90542161","1690249048.358424"],["0.000075250000","737072.85720004","1690211553.549248"],["0.000075400000","670061.64567140","1690250769.261196"],["0.000075460000","980226.63603417","1690250769.627523"]],"bs":[["0.000074590000","71029.87806720","1690250763.012724"],["0.000074580000","15935576.86404000","1690250763.012710"],["0.000074520000","33758611.79634000","1690250718.290955"],["0.000074350000","3156650.58590277","1690250766.499648"],["0.000074340000","301727260.79999999","1690250766.490238"],["0.000074320000","64611496.53837000","1690250742.680258"],["0.000074310000","104228596.60000000","1690250744.679121"],["0.000074300000","40366046.10582000","1690250762.685914"],["0.000074200000","3690216.57320475","1690250645.311465"],["0.000074060000","1337170.52532521","1690250742.012527"]]},"book-10","LUNA/EUR"]`, `[9536,{"b":[["0.000074060000","0.00000000","1690250770.616604"],["0.000074050000","16742421.17790510","1690250710.867730","r"]],"c":"418307145"},"book-10","LUNA/EUR"]`, } var websocketGSTEUROrderbookUpdates = []string{ - `{"channelID":8912,"channelName":"book-10","event":"subscriptionStatus","pair":"GST/EUR","reqid":157734759,"status":"subscribed","subscription":{"depth":10,"name":"book"}}`, `[8912,{"as":[["0.01300","850.00000000","1690230914.230506"],["0.01400","323483.99590510","1690256356.615823"],["0.01500","100287.34442717","1690219133.193345"],["0.01600","67995.78441017","1690118389.451216"],["0.01700","41776.38397740","1689676303.381189"],["0.01800","11785.76177777","1688631951.812452"],["0.01900","23700.00000000","1686935422.319042"],["0.02000","3941.17000000","1689415829.176481"],["0.02100","16598.69173066","1689420942.541943"],["0.02200","17572.51572836","1689851425.907427"]],"bs":[["0.01200","14220.66466572","1690256540.842831"],["0.01100","160223.61546438","1690256401.072463"],["0.01000","63083.48958963","1690256604.037673"],["0.00900","6750.00000000","1690252470.633938"],["0.00800","213059.49706376","1690256360.386301"],["0.00700","1000.00000000","1689869458.464975"],["0.00600","4000.00000000","1690221333.528698"],["0.00100","245000.00000000","1690051368.753455"]]},"book-10","GST/EUR"]`, `[8912,{"b":[["0.01000","60583.48958963","1690256620.206768"],["0.01000","63083.48958963","1690256620.206783"]],"c":"69619317"},"book-10","GST/EUR"]`, } func TestWsOrderbookMax10Depth(t *testing.T) { t.Parallel() + for _, c := range []string{"XDG/USD", "LUNA/EUR", "GST/EUR"} { + p, err := currency.NewPairFromString(c) + assert.NoErrorf(t, err, "NewPairFromString %s should not error", c) + k.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{ + Key: stream.DefaultChannelKey{ + Channel: krakenWsOrderbook + "-10", + Currency: p, + Asset: asset.Spot, + }, + Channel: krakenWsOrderbook, + Currency: p, + Asset: asset.Spot, + Params: map[string]any{ + ChannelOrderbookDepthKey: 10, + }, + }) + } + for x := range websocketXDGUSDOrderbookUpdates { err := k.wsHandleData([]byte(websocketXDGUSDOrderbookUpdates[x])) if err != nil { diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index 2f833479ff2..d9c806d168a 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -1,11 +1,11 @@ package kraken import ( + "errors" "time" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/order" - "github.com/thrasher-corp/gocryptotrader/exchanges/stream" ) const ( @@ -75,6 +75,10 @@ const ( var ( assetTranslator assetTranslatorStore + + errNoWebsocketOrderbookData = errors.New("no websocket orderbook data") + errNoRequestID = errors.New("no RequestID in response") + errMaxDepthMissing = errors.New("MaxDepth missing for subscription") ) // GenericResponse stores general response data for functions that only return success @@ -495,43 +499,37 @@ type WithdrawStatusResponse struct { Status string `json:"status"` } -// WebsocketSubscriptionEventRequest handles WS subscription events -type WebsocketSubscriptionEventRequest struct { - Event string `json:"event"` // subscribe - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. - Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). - Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` - Channels []stream.ChannelSubscription `json:"-"` // Keeps track of associated subscriptions in batched outgoings -} - -// WebsocketBaseEventRequest Just has an "event" property -type WebsocketBaseEventRequest struct { - Event string `json:"event"` // eg "unsubscribe" +// WebsocketSubscribeRequest contains request data for Subscribing to channels +type WebsocketSubscribeRequest struct { + Event string `json:"event"` // "subscribe" + RequestID int64 `json:"reqid,omitempty"` + Pairs []string `json:"pair,omitempty"` + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` } -// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events -type WebsocketUnsubscribeByChannelIDEventRequest struct { - WebsocketBaseEventRequest - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. - Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). - ChannelID int64 `json:"channelID,omitempty"` +// WebsocketUnsubscribeRequest contains request data for Unsubscribing from channels +type WebsocketUnsubscribeRequest struct { + Event string `json:"event"` // "unsubscribe" + RequestID int64 `json:"reqid,omitempty"` + Pairs []string `json:"pair,omitempty"` + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` } // WebsocketSubscriptionData contains details on WS channel type WebsocketSubscriptionData struct { Name string `json:"name,omitempty"` // ticker|ohlc|trade|book|spread|*, * for all (ohlc interval value is 1 if all channels subscribed) Interval int64 `json:"interval,omitempty"` // Optional - Time interval associated with ohlc subscription in minutes. Default 1. Valid Interval values: 1|5|15|30|60|240|1440|10080|21600 - Depth int64 `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000 + Depth int `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000 Token string `json:"token,omitempty"` // Optional used for authenticated requests } // WebsocketEventResponse holds all data response types type WebsocketEventResponse struct { - WebsocketBaseEventRequest + Event string `json:"event"` Status string `json:"status"` Pair currency.Pair `json:"pair,omitempty"` - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + RequestID int64 `json:"reqid,omitempty"` Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` ChannelName string `json:"channelName,omitempty"` WebsocketSubscriptionEventResponse @@ -556,15 +554,6 @@ type WebsocketErrorResponse struct { ErrorMessage string `json:"errorMessage"` } -// WebsocketChannelData Holds relevant data for channels to identify what we're -// doing -type WebsocketChannelData struct { - Subscription string - Pair currency.Pair - ChannelID *int64 - MaxDepth int -} - // WsTokenResponse holds the WS auth token type WsTokenResponse struct { Error []string `json:"error"` diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 59fb179a41c..9e3ea26744e 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" @@ -55,14 +56,8 @@ const ( krakenWsOrderbookDepth = 1000 ) -// orderbookMutex Ensures if two entries arrive at once, only one can be -// processed at a time var ( - subscriptionChannelPair []WebsocketChannelData - authToken string - pingRequest = WebsocketBaseEventRequest{Event: stream.Ping} - m sync.Mutex - errNoWebsocketOrderbookData = errors.New("no websocket orderbook data") + authToken string ) // Channels require a topic and a currency @@ -72,7 +67,8 @@ var defaultSubscribedChannels = []string{ krakenWsTrade, krakenWsOrderbook, krakenWsOHLC, - krakenWsSpread} + krakenWsSpread, +} var authenticatedChannels = []string{krakenWsOwnTrades, krakenWsOpenOrders} var cancelOrdersStatusMutex sync.Mutex @@ -120,24 +116,13 @@ func (k *Kraken) WsConnect() error { k.Websocket.SetCanUseAuthenticatedEndpoints(true) k.Websocket.Wg.Add(1) go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) - err = k.wsAuthPingHandler() - if err != nil { - log.Errorf(log.ExchangeSys, - "%v - failed setup ping handler for auth connection. Websocket may disconnect unexpectedly. %v\n", - k.Name, - err) - } + k.wsPingHandler(k.Websocket.AuthConn) } } } - err = k.wsPingHandler() - if err != nil { - log.Errorf(log.ExchangeSys, - "%v - failed setup ping handler. Websocket may disconnect unexpectedly. %v\n", - k.Name, - err) - } + k.wsPingHandler(k.Websocket.Conn) + return nil } @@ -210,22 +195,36 @@ func isAwaitingCancelOrderResponses(requestID int64, success bool) bool { func (k *Kraken) wsHandleData(respRaw []byte) error { if strings.HasPrefix(string(respRaw), "[") { var dataResponse WebsocketDataResponse - err := json.Unmarshal(respRaw, &dataResponse) - if err != nil { + if err := json.Unmarshal(respRaw, &dataResponse); err != nil { return err } - if _, ok := dataResponse[0].(float64); ok { - err = k.wsReadDataResponse(dataResponse) - if err != nil { - return err - } + if len(dataResponse) < 3 { + return fmt.Errorf("websocket data array too short: %s", respRaw) } - if _, ok := dataResponse[1].(string); ok { - err = k.wsHandleAuthDataResponse(dataResponse) - if err != nil { + + // For all types of channel second to last field is the channel Name + channelName, ok := dataResponse[len(dataResponse)-2].(string) + if !ok { + return common.GetTypeAssertError("string", dataResponse[len(dataResponse)-2], "channelName") + } + + // wsPair is just used for keying the Subs + wsPair := currency.EMPTYPAIR + if maybePair, ok2 := dataResponse[len(dataResponse)-1].(string); ok2 { + var err error + if wsPair, err = currency.NewPairFromString(maybePair); err != nil { return err } } + + c := k.Websocket.GetSubscription(stream.DefaultChannelKey{Channel: channelName, Currency: wsPair, Asset: asset.Spot}) + if c == nil { + return fmt.Errorf("could not find subscription channel: %s %s %s", asset.Spot, channelName, wsPair) + } + + if err := k.wsReadDataResponse(c, dataResponse); err != nil { + return err + } } else { var eventResponse map[string]interface{} err := json.Unmarshal(respRaw, &eventResponse) @@ -358,16 +357,17 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { err, respRaw) } + if sub.RequestID == 0 { + return fmt.Errorf("%v %w: %v", k.Name, errNoRequestID, respRaw) + } + k.Websocket.Match.IncomingWithData(sub.RequestID, respRaw) + if sub.Status != "subscribed" && sub.Status != "unsubscribed" { return fmt.Errorf("%v %v %v", k.Name, sub.RequestID, sub.ErrorMessage) } - k.addNewSubscriptionChannelData(&sub) - if sub.RequestID > 0 { - k.Websocket.Match.IncomingWithData(sub.RequestID, respRaw) - } default: k.Websocket.DataHandler <- stream.UnhandledMessageWarning{ Message: k.Name + stream.UnhandledMessage + string(respRaw), @@ -379,111 +379,71 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { return nil } -// wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket -func (k *Kraken) wsPingHandler() error { - message, err := json.Marshal(pingRequest) - if err != nil { - return err - } - k.Websocket.Conn.SetupPingHandler(stream.PingHandler{ - Message: message, - Delay: krakenWsPingDelay, - MessageType: websocket.TextMessage, - }) - return nil -} - -// wsAuthPingHandler sends a message "ping" every 27 to maintain the connection to the websocket -func (k *Kraken) wsAuthPingHandler() error { - message, err := json.Marshal(pingRequest) - if err != nil { - return err - } - k.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{ - Message: message, +// wsPingHandler starts a websocket ping handler every 27s +func (k *Kraken) wsPingHandler(conn stream.Connection) { + conn.SetupPingHandler(stream.PingHandler{ + Message: []byte(`{"event":"ping"}`), Delay: krakenWsPingDelay, MessageType: websocket.TextMessage, }) - return nil } // wsReadDataResponse classifies the WS response and sends to appropriate handler -func (k *Kraken) wsReadDataResponse(response WebsocketDataResponse) error { - if cID, ok := response[0].(float64); ok { - channelID := int64(cID) - channelData, err := getSubscriptionChannelData(channelID) - if err != nil { - return err +func (k *Kraken) wsReadDataResponse(c *stream.ChannelSubscription, response WebsocketDataResponse) error { + switch c.Channel { + case krakenWsTicker: + t, ok := response[1].(map[string]interface{}) + if !ok { + return errors.New("received invalid ticker data") } - switch channelData.Subscription { - case krakenWsTicker: - t, ok := response[1].(map[string]interface{}) - if !ok { - return errors.New("received invalid ticker data") - } - return k.wsProcessTickers(&channelData, t) - case krakenWsOHLC: - o, ok := response[1].([]interface{}) - if !ok { - return errors.New("received invalid OHLCV data") - } - return k.wsProcessCandles(&channelData, o) - case krakenWsOrderbook: - ob, ok := response[1].(map[string]interface{}) - if !ok { + return k.wsProcessTickers(c, t) + case krakenWsOHLC: + o, ok := response[1].([]interface{}) + if !ok { + return errors.New("received invalid OHLCV data") + } + return k.wsProcessCandles(c, o) + case krakenWsOrderbook: + ob, ok := response[1].(map[string]interface{}) + if !ok { + return errors.New("received invalid orderbook data") + } + + if len(response) == 5 { + ob2, okob2 := response[2].(map[string]interface{}) + if !okob2 { return errors.New("received invalid orderbook data") } - if len(response) == 5 { - ob2, okob2 := response[2].(map[string]interface{}) - if !okob2 { - return errors.New("received invalid orderbook data") - } - - // Squish both maps together to process - for k, v := range ob2 { - if _, ok := ob[k]; ok { - return errors.New("cannot merge maps, conflict is present") - } - ob[k] = v + // Squish both maps together to process + for k, v := range ob2 { + if _, ok := ob[k]; ok { + return errors.New("cannot merge maps, conflict is present") } + ob[k] = v } - return k.wsProcessOrderBook(&channelData, ob) - case krakenWsSpread: - s, ok := response[1].([]interface{}) - if !ok { - return errors.New("received invalid spread data") - } - k.wsProcessSpread(&channelData, s) - case krakenWsTrade: - t, ok := response[1].([]interface{}) - if !ok { - return errors.New("received invalid trade data") - } - return k.wsProcessTrades(&channelData, t) - default: - return fmt.Errorf("%s received unidentified data for subscription %s: %+v", - k.Name, - channelData.Subscription, - response) } - } - - return nil -} - -func (k *Kraken) wsHandleAuthDataResponse(response WebsocketDataResponse) error { - if chName, ok := response[1].(string); ok { - switch chName { - case krakenWsOwnTrades: - return k.wsProcessOwnTrades(response[0]) - case krakenWsOpenOrders: - return k.wsProcessOpenOrders(response[0]) - default: - return fmt.Errorf("%v Unidentified websocket data received: %+v", - k.Name, response) + return k.wsProcessOrderBook(c, ob) + case krakenWsSpread: + s, ok := response[1].([]interface{}) + if !ok { + return errors.New("received invalid spread data") + } + k.wsProcessSpread(c, s) + case krakenWsTrade: + t, ok := response[1].([]interface{}) + if !ok { + return errors.New("received invalid trade data") } + return k.wsProcessTrades(c, t) + case krakenWsOwnTrades: + return k.wsProcessOwnTrades(response[0]) + case krakenWsOpenOrders: + return k.wsProcessOpenOrders(response[0]) + default: + return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, c.Channel, response) } + return nil } @@ -635,60 +595,8 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error { return errors.New(k.Name + " - Invalid own trades data") } -// addNewSubscriptionChannelData stores channel ids, pairs and subscription types to an array -// allowing correlation between subscriptions and returned data -func (k *Kraken) addNewSubscriptionChannelData(response *wsSubscription) { - // We change the / to - to maintain compatibility with REST/config - var pair, fPair currency.Pair - var err error - if response.Pair != "" { - pair, err = currency.NewPairFromString(response.Pair) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - return - } - fPair, err = k.FormatExchangeCurrency(pair, asset.Spot) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - return - } - } - - maxDepth := 0 - if splits := strings.Split(response.ChannelName, "-"); len(splits) > 1 { - maxDepth, err = strconv.Atoi(splits[1]) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - } - } - m.Lock() - defer m.Unlock() - subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{ - Subscription: response.Subscription.Name, - Pair: fPair, - ChannelID: response.ChannelID, - MaxDepth: maxDepth, - }) -} - -// getSubscriptionChannelData retrieves WebsocketChannelData based on response ID -func getSubscriptionChannelData(id int64) (WebsocketChannelData, error) { - m.Lock() - defer m.Unlock() - for i := range subscriptionChannelPair { - if subscriptionChannelPair[i].ChannelID == nil { - continue - } - if id == *subscriptionChannelPair[i].ChannelID { - return subscriptionChannelPair[i], nil - } - } - return WebsocketChannelData{}, - fmt.Errorf("could not get subscription data for id %d", id) -} - // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[string]interface{}) error { +func (k *Kraken) wsProcessTickers(c *stream.ChannelSubscription, data map[string]interface{}) error { closePrice, err := strconv.ParseFloat(data["c"].([]interface{})[0].(string), 64) if err != nil { return err @@ -727,14 +635,14 @@ func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[st Low: lowPrice, Bid: bid, Ask: ask, - AssetType: asset.Spot, - Pair: channelData.Pair, + AssetType: c.Asset, + Pair: c.Currency, } return nil } // wsProcessSpread converts spread/orderbook data and sends it to the datahandler -func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []interface{}) { +func (k *Kraken) wsProcessSpread(c *stream.ChannelSubscription, data []interface{}) { if len(data) < 5 { k.Websocket.DataHandler <- fmt.Errorf("%s unexpected wsProcessSpread data length", k.Name) return @@ -771,7 +679,7 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []inter log.Debugf(log.ExchangeSys, "%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'", k.Name, - channelData.Pair, + c.Currency, bestBid, bestAsk, convert.TimeFromUnixTimestampDecimal(timeData), @@ -781,7 +689,7 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []inter } // wsProcessTrades converts trade data and sends it to the datahandler -func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []interface{}) error { +func (k *Kraken) wsProcessTrades(c *stream.ChannelSubscription, data []interface{}) error { if !k.IsSaveTradeDataEnabled() { return nil } @@ -815,8 +723,8 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter } trades[i] = trade.Data{ - AssetType: asset.Spot, - CurrencyPair: channelData.Pair, + AssetType: c.Asset, + CurrencyPair: c.Currency, Exchange: k.Name, Price: price, Amount: amount, @@ -829,7 +737,7 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter // wsProcessOrderBook determines if the orderbook data is partial or update // Then sends to appropriate fun -func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[string]interface{}) error { +func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[string]interface{}) error { // NOTE: Updates are a priority so check if it's an update first as we don't // need multiple map lookups to check for snapshot. askData, asksExist := data["a"].([]interface{}) @@ -842,9 +750,9 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ k.wsRequestMtx.Lock() defer k.wsRequestMtx.Unlock() - err := k.wsProcessOrderBookUpdate(channelData, askData, bidData, checksum) + err := k.wsProcessOrderBookUpdate(c, askData, bidData, checksum) if err != nil { - outbound := channelData.Pair // Format required "XBT/USD" + outbound := c.Currency // Format required "XBT/USD" outbound.Delimiter = "/" go func(resub *stream.ChannelSubscription) { // This was locking the main websocket reader routine and a @@ -856,11 +764,7 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ resub, errResub) } - }(&stream.ChannelSubscription{ - Channel: krakenWsOrderbook, - Currency: outbound, - Asset: asset.Spot, - }) + }(c) return err } return nil @@ -869,21 +773,21 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ askSnapshot, askSnapshotExists := data["as"].([]interface{}) bidSnapshot, bidSnapshotExists := data["bs"].([]interface{}) if !askSnapshotExists && !bidSnapshotExists { - return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, channelData.Pair, asset.Spot) + return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, c.Currency, c.Asset) } - return k.wsProcessOrderBookPartial(channelData, askSnapshot, bidSnapshot) + return k.wsProcessOrderBookPartial(c, askSnapshot, bidSnapshot) } // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, askData, bidData []interface{}) error { +func (k *Kraken) wsProcessOrderBookPartial(c *stream.ChannelSubscription, askData, bidData []interface{}) error { base := orderbook.Base{ - Pair: channelData.Pair, - Asset: asset.Spot, + Pair: c.Currency, + Asset: c.Asset, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Items, len(bidData)), Asks: make(orderbook.Items, len(askData)), - MaxDepth: channelData.MaxDepth, + MaxDepth: krakenWsOrderbookDepth, } // Kraken ob data is timestamped per price, GCT orderbook data is // timestamped per entry using the highest last update time, we can attempt @@ -956,10 +860,10 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, as } // wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, askData, bidData []interface{}, checksum string) error { +func (k *Kraken) wsProcessOrderBookUpdate(c *stream.ChannelSubscription, askData, bidData []interface{}, checksum string) error { update := orderbook.Update{ - Asset: asset.Spot, - Pair: channelData.Pair, + Asset: c.Asset, + Pair: c.Currency, Bids: make([]orderbook.Item, len(bidData)), Asks: make([]orderbook.Item, len(askData)), } @@ -1102,12 +1006,9 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, ask return err } - book, err := k.Websocket.Orderbook.GetOrderbook(channelData.Pair, asset.Spot) + book, err := k.Websocket.Orderbook.GetOrderbook(c.Currency, c.Asset) if err != nil { - return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", - channelData.Pair, - asset.Spot, - err) + return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", c.Currency, c.Asset, err) } token, err := strconv.ParseInt(checksum, 10, 64) @@ -1158,7 +1059,7 @@ func trim(s string) string { } // wsProcessCandles converts candle data and sends it to the data handler -func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []interface{}) error { +func (k *Kraken) wsProcessCandles(c *stream.ChannelSubscription, data []interface{}) error { startTime, err := strconv.ParseFloat(data[0].(string), 64) if err != nil { return err @@ -1195,8 +1096,8 @@ func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []inte } k.Websocket.DataHandler <- stream.KlineData{ - AssetType: asset.Spot, - Pair: channelData.Pair, + AssetType: c.Asset, + Pair: c.Currency, Timestamp: time.Now(), Exchange: k.Name, StartTime: convert.TimeFromUnixTimestampDecimal(startTime), @@ -1240,129 +1141,171 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e } // Subscribe sends a websocket message to receive data from the channel -func (k *Kraken) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error { - var subscriptions = make(map[string]*[]WebsocketSubscriptionEventRequest) -channels: - for i := range channelsToSubscribe { - s, ok := subscriptions[channelsToSubscribe[i].Channel] - if !ok { - s = &[]WebsocketSubscriptionEventRequest{} - subscriptions[channelsToSubscribe[i].Channel] = s - } +func (k *Kraken) Subscribe(channels []stream.ChannelSubscription) error { + return k.parallelChanOp(channels, k.subscribeToChan) +} - for j := range *s { - (*s)[j].Pairs = append((*s)[j].Pairs, channelsToSubscribe[i].Currency.String()) - (*s)[j].Channels = append((*s)[j].Channels, channelsToSubscribe[i]) - continue channels - } +// Unsubscribe sends a websocket message to stop receiving data from the channel +func (k *Kraken) Unsubscribe(channels []stream.ChannelSubscription) error { + return k.parallelChanOp(channels, k.unsubscribeFromChan) +} - id := k.Websocket.Conn.GenerateMessageID(false) - outbound := WebsocketSubscriptionEventRequest{ - Event: krakenWsSubscribe, - RequestID: id, - Subscription: WebsocketSubscriptionData{ - Name: channelsToSubscribe[i].Channel, - }, - } - if channelsToSubscribe[i].Channel == "book" { - outbound.Subscription.Depth = krakenWsOrderbookDepth - } - if !channelsToSubscribe[i].Currency.IsEmpty() { - outbound.Pairs = []string{channelsToSubscribe[i].Currency.String()} - } - if common.StringDataContains(authenticatedChannels, channelsToSubscribe[i].Channel) { - outbound.Subscription.Token = authToken - } +func (k *Kraken) parallelChanOp(channels []stream.ChannelSubscription, m func(*stream.ChannelSubscription) error) error { + wg := sync.WaitGroup{} + wg.Add(len(channels)) + errC := make(chan error, len(channels)) - outbound.Channels = append(outbound.Channels, channelsToSubscribe[i]) - *s = append(*s, outbound) + for i := range channels { + go func(c *stream.ChannelSubscription) { + defer wg.Done() + if err := m(c); err != nil { + errC <- err + } + }(&channels[i]) } + wg.Wait() + close(errC) + var errs error - for _, subs := range subscriptions { - for i := range *subs { - if common.StringDataContains(authenticatedChannels, (*subs)[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - continue - } - _, err := k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - } + for err := range errC { + errs = common.AppendError(errs, err) } + return errs } -// Unsubscribe sends a websocket message to stop receiving data from the channel -func (k *Kraken) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error { - var unsubs []WebsocketSubscriptionEventRequest -channels: - for x := range channelsToUnsubscribe { - for y := range unsubs { - if unsubs[y].Subscription.Name == channelsToUnsubscribe[x].Channel { - unsubs[y].Pairs = append(unsubs[y].Pairs, - channelsToUnsubscribe[x].Currency.String()) - unsubs[y].Channels = append(unsubs[y].Channels, - channelsToUnsubscribe[x]) - continue channels - } - } - var depth int64 - if channelsToUnsubscribe[x].Channel == "book" { - depth = krakenWsOrderbookDepth - } +// subscribeToChan sends a websocket message to receive data from the channel +func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { + conn := k.Websocket.Conn + r := WebsocketSubscribeRequest{ + Event: krakenWsSubscribe, + RequestID: conn.GenerateMessageID(false), + Subscription: WebsocketSubscriptionData{ + Name: c.Channel, + }, + } - var id int64 - if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { - id = k.Websocket.AuthConn.GenerateMessageID(false) + if !c.Currency.IsEmpty() { + r.Pairs = []string{c.Currency.String()} + } + + if !c.Asset.IsValid() { + c.Asset = asset.Spot + } + + key := stream.DefaultChannelKey{c.Channel, c.Currency, c.Asset} + + if c.Channel == krakenWsOrderbook { + if depth, err := depthFromChan(c); err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) } else { - id = k.Websocket.Conn.GenerateMessageID(false) + r.Subscription.Depth = depth } + } - unsub := WebsocketSubscriptionEventRequest{ - Event: krakenWsUnsubscribe, - Pairs: []string{channelsToUnsubscribe[x].Currency.String()}, - Subscription: WebsocketSubscriptionData{ - Name: channelsToUnsubscribe[x].Channel, - Depth: depth, - }, - RequestID: id, - } - if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { - unsub.Subscription.Token = authToken - } - unsub.Channels = append(unsub.Channels, channelsToUnsubscribe[x]) - unsubs = append(unsubs, unsub) + if common.StringDataContains(authenticatedChannels, r.Subscription.Name) { + r.Subscription.Token = authToken + conn = k.Websocket.AuthConn } - var errs error - for i := range unsubs { - if common.StringDataContains(authenticatedChannels, unsubs[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...) - continue - } + respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + } + + if err = k.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + k.Websocket.DataHandler <- wErr + return wErr + } + + k.Websocket.AddSuccessfulSubscriptions(*c) + if k.Verbose { + log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency) + } + + return nil +} + +// unsubscribeFromChan sends a websocket message to stop receiving data from a channel +func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { + conn := k.Websocket.Conn + r := WebsocketUnsubscribeRequest{ + Event: krakenWsUnsubscribe, + RequestID: conn.GenerateMessageID(false), + Subscription: WebsocketSubscriptionData{ + Name: c.Channel, + }, + } + + if !c.Currency.IsEmpty() { + r.Pairs = []string{c.Currency.String()} + } - _, err := k.Websocket.Conn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i]) + if c.Channel == krakenWsOrderbook { + depth, err := depthFromChan(c) if err != nil { - errs = common.AppendError(errs, err) - continue + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) } - k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...) + r.Subscription.Depth = depth } - return errs + + if common.StringDataContains(authenticatedChannels, c.Channel) { + conn = k.Websocket.AuthConn + r.Subscription.Token = authToken + } + + respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err != nil { + return err + } + + if err = k.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err) + k.Websocket.DataHandler <- wErr + return wErr + } + + k.Websocket.RemoveSuccessfulUnsubscriptions(*c) + + return nil +} + +func depthFromChan(c *stream.ChannelSubscription) (int, error) { + depthAny, ok := c.Params["depth"] + if !ok { + return 0, errMaxDepthMissing + } + depthInt, ok2 := depthAny.(int) + if !ok2 { + return 0, common.GetTypeAssertError("int", depthAny, "Subscription.Depth") + } + return depthInt, nil +} + +// getErrResp takes a json response string and looks for an error event type +// If found it returns the errorMessage +// It might log parsing errors about the nature of the error +// If the error message is not defined it will return a wrapped errUnknownError +func (k *Kraken) getErrResp(resp []byte) error { + event, err := jsonparser.GetUnsafeString(resp, "event") + switch { + case err != nil: + return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) + case event != "error": + status, _ := jsonparser.GetUnsafeString(resp, "status") // Error is really irrellevant here + if status != "error" { + return nil + } + } + + var msg string + if msg, err = jsonparser.GetString(resp, "errorMessage"); err != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS errorMessage: %s from message: %s", k.Name, err, resp) + return fmt.Errorf("error status did not contain errorMessage: %s", resp) + } + return errors.New(msg) } // wsAddOrder creates an order, returned order ID if success diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 3394c18368d..ecac59bf624 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -26,6 +26,8 @@ const ( var ( // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") + // ErrUnsubscribeFailure defines an error when a unsubscribe fails + ErrUnsubscribeFailure = errors.New("unsubscribe failure") // ErrAlreadyDisabled is returned when you double-disable the websocket ErrAlreadyDisabled = errors.New("websocket already disabled") // ErrNotConnected defines an error when websocket is not connected diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index e1a493ab163..87e268c671e 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -520,7 +520,7 @@ func TestSubscribeUnsubscribe(t *testing.T) { assert.Nil(t, ws.GetSubscription(42), "GetSubscription on empty internal map should return") assert.NoError(t, ws.SubscribeToChannels(subs), "Basic Subscribing should not error") assert.Len(t, ws.GetSubscriptions(), 4, "Should have 4 subscriptions") - byDefKey := ws.GetSubscription(defaultChannelKey{Channel: "TestSub"}) + byDefKey := ws.GetSubscription(DefaultChannelKey{Channel: "TestSub"}) if assert.NotNil(t, byDefKey, "GetSubscription by default key should find a channel") { assert.Equal(t, "TestSub", byDefKey.Channel, "GetSubscription by default key should return a pointer a copy of the right channel") assert.NotSame(t, byDefKey, ws.subscriptions["TestSub"], "GetSubscription returns a fresh pointer") diff --git a/go.mod b/go.mod index 9a466edf8b7..12060dccb8e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/thrasher-corp/gocryptotrader go 1.20 require ( + github.com/buger/jsonparser v1.1.1 github.com/d5/tengo/v2 v2.16.1 github.com/gofrs/uuid v4.4.0+incompatible github.com/gorilla/mux v1.8.0 diff --git a/go.sum b/go.sum index 0daeb87a62e..5c2e9e1d7bb 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1 h1:NDBbPmhS+EqABEs5Kg3n/5ZNjy73Pz7SIV+KCeqyXcs= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= diff --git a/testdata/configtest.json b/testdata/configtest.json index f20bdc24a96..50b4f042147 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -1929,8 +1929,8 @@ } }, "api": { - "authenticatedSupport": false, - "authenticatedWebsocketApiSupport": false, + "authenticatedSupport": true, + "authenticatedWebsocketApiSupport": true, "endpoints": { "url": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API", "urlSecondary": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API",