diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 1f12e0e8fde..e83805a5e3c 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -33,7 +33,7 @@ var k = &Kraken{} var wsSetupRan, wsAuthSetupRan bool var comms = make(chan stream.Response) -// Please add your own APIkeys to do correct due diligence testing. +// Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing const ( apiKey = "" apiSecret = "" @@ -52,9 +52,12 @@ func TestMain(m *testing.M) { if err != nil { log.Fatal(err) } - krakenConfig.API.AuthenticatedSupport = true - krakenConfig.API.Credentials.Key = apiKey - krakenConfig.API.Credentials.Secret = apiSecret + if apiKey != "" { + krakenConfig.API.Credentials.Key = apiKey + } + if apiSecret != "" { + krakenConfig.API.Credentials.Secret = apiSecret + } k.Websocket = sharedtestvalues.NewTestWebsocket() err = k.Setup(krakenConfig) if err != nil { @@ -1207,19 +1210,19 @@ func TestWithdrawCancel(t *testing.T) { // setupWsAuthTest will connect both websockets // and should be called directly from a auth ws test -func setupWsAuthTest(t *testing.T) { - t.Helper() - setupWsTest(t) - setupAuthWs(t) +func setupWsAuthTest(tb testing.TB) { + tb.Helper() + setupWsTest(tb) + setupAuthWs(tb) } // setupWsTest will just connect the non-authenticated websocket // and should be called directly from a non-auth ws test -func setupWsTest(t *testing.T) { - t.Helper() +func setupWsTest(tb testing.TB) { + tb.Helper() if !k.Websocket.IsEnabled() { - t.Skip("Websocket not enabled") + tb.Skip("Websocket not enabled") } if wsSetupRan { @@ -1229,26 +1232,23 @@ func setupWsTest(t *testing.T) { var dialer websocket.Dialer if err := k.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil { - t.Fatalf("Dialing the websocket should not error: %s", err) + tb.Fatalf("Dialing the websocket should not error: %s", err) } go k.wsFunnelConnectionData(k.Websocket.Conn, comms) - go k.wsReadData(comms) - go func() { - err := k.wsPingHandler() - assert.NoError(t, err, "wsPingHandler should not error") - }() + go k.wsPingHandler(k.Websocket.Conn) } // setupAuthWs will just connect the authenticated websocket and should not be called directly -func setupAuthWs(t *testing.T) { +func setupAuthWs(tb testing.TB) { + tb.Helper() if !k.API.AuthenticatedWebsocketSupport { - t.Skip("Authenticated Websocket not Supported") + tb.Skip("Authenticated Websocket not Supported") } if !sharedtestvalues.AreAPICredentialsSet(k) { - t.Skip("Authenticated Websocket credentials not set") + tb.Skip("Authenticated Websocket credentials not set") } if wsAuthSetupRan { @@ -1259,28 +1259,96 @@ func setupAuthWs(t *testing.T) { var err error var dialer websocket.Dialer if err = k.Websocket.AuthConn.Dial(&dialer, http.Header{}); err != nil { - t.Fatalf("Dialing the auth websocket should not error: %s", err) + tb.Fatalf("Dialing the auth websocket should not error: %s", err) } authToken, err = k.GetWebsocketToken(context.Background()) - assert.NoError(t, err, "GetWebsocketToken should not error") + assert.NoError(tb, err, "GetWebsocketToken should not error") go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) } -// TestWebsocketSubscribe tests returning a message with an id +// TestWebsocketSubscribe tests unauthenticated websocket subscriptions +// Specifically looking to ensure multiple errors are collected and returned and ws.Subscriptions Added/Removed in cases of: +// single pass, single fail, mixed fail, multiple pass, all fail +// No objection to this becoming a fixture test, so long as it integrates through Un/Subscribe roundtrip func TestWebsocketSubscribe(t *testing.T) { setupWsTest(t) - err := k.Subscribe([]stream.ChannelSubscription{ - { - Channel: defaultSubscribedChannels[0], - Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"), - }, + + err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) + assert.NoError(t, err, "Simple subscription should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription") + + err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) + assert.NoError(t, err, "Resubscribing to the same channel shouldn't error") + assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error") + + err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "USD", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "ELF", "/")}, }) - assert.NotNil(t, err, "Blah") - if err != nil { - t.Error(err) - } + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should yield the correct error") + assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}, + }) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "XBT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("LTC", "ETH", "/")}, + }) + assert.NoError(t, err, "Multiple successful subscriptions should not error") + + subs := k.Websocket.GetSubscriptions() + assert.Len(t, subs, 4, "Should have correct number of subscriptions") + + err = k.Unsubscribe(subs[:1]) + assert.NoError(t, err, "Simple Unsubscribe should succeed") + assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel") + + err = k.Unsubscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "WIZARD", "/"), Key: 1337}}) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/WIZARD", "Simple failing Unsubscribe should error") + assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have have removed any channels") + + err = k.Unsubscribe([]stream.ChannelSubscription{ + subs[1], + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "EAGLE", "/"), Key: 1338}, + }) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/EAGLE", "Simple failing Unsubscribe should error") + + subs = k.Websocket.GetSubscriptions() + assert.Len(t, subs, 2, "Should have removed only 1 more channel") + + err = k.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribe multiple passing subscriptions should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed all channels") +} + +// TestWebsocketSubscribeAuth tests Auth's subscriptions +func TestWebsocketSubscribeAuth(t *testing.T) { + setupWsAuthTest(t) + + err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsOwnTrades}}) + assert.NoError(t, err, "Subsrcibing to ownTrades should not error") + + subs := k.Websocket.GetSubscriptions() + assert.Len(t, subs, 1, "Should add 1 Subscription") + + err = k.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribing an auth channel should not error") + assert.Len(t, k.Websocket.GetSubscriptions(), 0, "Should have successfully removed channel") } func TestGetWSToken(t *testing.T) { diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index 2f833479ff2..32f99075e86 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -1,11 +1,11 @@ package kraken import ( + "errors" "time" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/order" - "github.com/thrasher-corp/gocryptotrader/exchanges/stream" ) const ( @@ -75,6 +75,10 @@ const ( var ( assetTranslator assetTranslatorStore + + errNoWebsocketOrderbookData = errors.New("no websocket orderbook data") + errNoRequestID = errors.New("no RequestID in response") + errMaxDepthMissing = errors.New("MaxDepth missing for subscription") ) // GenericResponse stores general response data for functions that only return success @@ -495,43 +499,37 @@ type WithdrawStatusResponse struct { Status string `json:"status"` } -// WebsocketSubscriptionEventRequest handles WS subscription events -type WebsocketSubscriptionEventRequest struct { - Event string `json:"event"` // subscribe - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. - Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). - Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` - Channels []stream.ChannelSubscription `json:"-"` // Keeps track of associated subscriptions in batched outgoings -} - -// WebsocketBaseEventRequest Just has an "event" property -type WebsocketBaseEventRequest struct { - Event string `json:"event"` // eg "unsubscribe" +// WebsocketSubscribeReq +type WebsocketSubscribeRequest struct { + Event string `json:"event"` // "subscribe" + RequestID int64 `json:"reqid,omitempty"` + Pairs []string `json:"pair,omitempty"` + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` } -// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events -type WebsocketUnsubscribeByChannelIDEventRequest struct { - WebsocketBaseEventRequest - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. - Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). - ChannelID int64 `json:"channelID,omitempty"` +// WebsocketUnsubscribeRequest handles WS unsubscribe events +type WebsocketUnsubscribeRequest struct { + Event string `json:"event"` // "unsubscribe" + RequestID int64 `json:"reqid,omitempty"` + Pairs []string `json:"pair,omitempty"` + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` } // WebsocketSubscriptionData contains details on WS channel type WebsocketSubscriptionData struct { Name string `json:"name,omitempty"` // ticker|ohlc|trade|book|spread|*, * for all (ohlc interval value is 1 if all channels subscribed) Interval int64 `json:"interval,omitempty"` // Optional - Time interval associated with ohlc subscription in minutes. Default 1. Valid Interval values: 1|5|15|30|60|240|1440|10080|21600 - Depth int64 `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000 + Depth int `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000 Token string `json:"token,omitempty"` // Optional used for authenticated requests } // WebsocketEventResponse holds all data response types type WebsocketEventResponse struct { - WebsocketBaseEventRequest + Event string `json:"event"` Status string `json:"status"` Pair currency.Pair `json:"pair,omitempty"` - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + RequestID int64 `json:"reqid,omitempty"` Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` ChannelName string `json:"channelName,omitempty"` WebsocketSubscriptionEventResponse @@ -556,15 +554,6 @@ type WebsocketErrorResponse struct { ErrorMessage string `json:"errorMessage"` } -// WebsocketChannelData Holds relevant data for channels to identify what we're -// doing -type WebsocketChannelData struct { - Subscription string - Pair currency.Pair - ChannelID *int64 - MaxDepth int -} - // WsTokenResponse holds the WS auth token type WsTokenResponse struct { Error []string `json:"error"` diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 59fb179a41c..97864a077a1 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" @@ -55,24 +56,21 @@ const ( krakenWsOrderbookDepth = 1000 ) -// orderbookMutex Ensures if two entries arrive at once, only one can be -// processed at a time var ( - subscriptionChannelPair []WebsocketChannelData - authToken string - pingRequest = WebsocketBaseEventRequest{Event: stream.Ping} - m sync.Mutex - errNoWebsocketOrderbookData = errors.New("no websocket orderbook data") + authToken string ) // Channels require a topic and a currency // Format [[ticker,but-t4u],[orderbook,nce-btt]] var defaultSubscribedChannels = []string{ krakenWsTicker, - krakenWsTrade, - krakenWsOrderbook, - krakenWsOHLC, - krakenWsSpread} + /* + krakenWsTrade, + krakenWsOrderbook, + krakenWsOHLC, + krakenWsSpread + */ +} var authenticatedChannels = []string{krakenWsOwnTrades, krakenWsOpenOrders} var cancelOrdersStatusMutex sync.Mutex @@ -120,24 +118,13 @@ func (k *Kraken) WsConnect() error { k.Websocket.SetCanUseAuthenticatedEndpoints(true) k.Websocket.Wg.Add(1) go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) - err = k.wsAuthPingHandler() - if err != nil { - log.Errorf(log.ExchangeSys, - "%v - failed setup ping handler for auth connection. Websocket may disconnect unexpectedly. %v\n", - k.Name, - err) - } + k.wsPingHandler(k.Websocket.AuthConn) } } } - err = k.wsPingHandler() - if err != nil { - log.Errorf(log.ExchangeSys, - "%v - failed setup ping handler. Websocket may disconnect unexpectedly. %v\n", - k.Name, - err) - } + k.wsPingHandler(k.Websocket.Conn) + return nil } @@ -210,22 +197,36 @@ func isAwaitingCancelOrderResponses(requestID int64, success bool) bool { func (k *Kraken) wsHandleData(respRaw []byte) error { if strings.HasPrefix(string(respRaw), "[") { var dataResponse WebsocketDataResponse - err := json.Unmarshal(respRaw, &dataResponse) - if err != nil { + if err := json.Unmarshal(respRaw, &dataResponse); err != nil { return err } - if _, ok := dataResponse[0].(float64); ok { - err = k.wsReadDataResponse(dataResponse) - if err != nil { - return err - } + if len(dataResponse) < 3 { + return fmt.Errorf("websocket data array too short: %s", respRaw) } - if _, ok := dataResponse[1].(string); ok { - err = k.wsHandleAuthDataResponse(dataResponse) - if err != nil { + + // For all types of channel second to last field is the channel Name + channelName, ok := dataResponse[len(dataResponse)-2].(string) + if !ok { + return common.GetTypeAssertError("string", dataResponse[len(dataResponse)-2], "channelName") + } + + // wsPair is just used for keying the Subs + wsPair := currency.EMPTYPAIR + if maybePair, ok2 := dataResponse[len(dataResponse)-1].(string); ok2 { + var err error + if wsPair, err = currency.NewPairFromString(maybePair); err != nil { return err } } + + c := k.Websocket.GetSubscription(stream.DefaultChannelKey{channelName, wsPair, asset.Spot}) + if c == nil { + return fmt.Errorf("Could not find subscription channel: %s %s %s", asset.Spot, channelName, wsPair) + } + + if err := k.wsReadDataResponse(c, dataResponse); err != nil { + return err + } } else { var eventResponse map[string]interface{} err := json.Unmarshal(respRaw, &eventResponse) @@ -358,16 +359,17 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { err, respRaw) } + if sub.RequestID == 0 { + return fmt.Errorf("%v %w: %v", k.Name, errNoRequestID, respRaw) + } + k.Websocket.Match.IncomingWithData(sub.RequestID, respRaw) + if sub.Status != "subscribed" && sub.Status != "unsubscribed" { return fmt.Errorf("%v %v %v", k.Name, sub.RequestID, sub.ErrorMessage) } - k.addNewSubscriptionChannelData(&sub) - if sub.RequestID > 0 { - k.Websocket.Match.IncomingWithData(sub.RequestID, respRaw) - } default: k.Websocket.DataHandler <- stream.UnhandledMessageWarning{ Message: k.Name + stream.UnhandledMessage + string(respRaw), @@ -379,111 +381,71 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { return nil } -// wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket -func (k *Kraken) wsPingHandler() error { - message, err := json.Marshal(pingRequest) - if err != nil { - return err - } - k.Websocket.Conn.SetupPingHandler(stream.PingHandler{ - Message: message, - Delay: krakenWsPingDelay, - MessageType: websocket.TextMessage, - }) - return nil -} - -// wsAuthPingHandler sends a message "ping" every 27 to maintain the connection to the websocket -func (k *Kraken) wsAuthPingHandler() error { - message, err := json.Marshal(pingRequest) - if err != nil { - return err - } - k.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{ - Message: message, +// wsPingHandler starts a websocket ping handler every 27s +func (k *Kraken) wsPingHandler(conn stream.Connection) { + conn.SetupPingHandler(stream.PingHandler{ + Message: []byte(`{"event":"ping"}`), Delay: krakenWsPingDelay, MessageType: websocket.TextMessage, }) - return nil } // wsReadDataResponse classifies the WS response and sends to appropriate handler -func (k *Kraken) wsReadDataResponse(response WebsocketDataResponse) error { - if cID, ok := response[0].(float64); ok { - channelID := int64(cID) - channelData, err := getSubscriptionChannelData(channelID) - if err != nil { - return err +func (k *Kraken) wsReadDataResponse(c *stream.ChannelSubscription, response WebsocketDataResponse) error { + switch c.Channel { + case krakenWsTicker: + t, ok := response[1].(map[string]interface{}) + if !ok { + return errors.New("received invalid ticker data") } - switch channelData.Subscription { - case krakenWsTicker: - t, ok := response[1].(map[string]interface{}) - if !ok { - return errors.New("received invalid ticker data") - } - return k.wsProcessTickers(&channelData, t) - case krakenWsOHLC: - o, ok := response[1].([]interface{}) - if !ok { - return errors.New("received invalid OHLCV data") - } - return k.wsProcessCandles(&channelData, o) - case krakenWsOrderbook: - ob, ok := response[1].(map[string]interface{}) - if !ok { + return k.wsProcessTickers(c, t) + case krakenWsOHLC: + o, ok := response[1].([]interface{}) + if !ok { + return errors.New("received invalid OHLCV data") + } + return k.wsProcessCandles(c, o) + case krakenWsOrderbook: + ob, ok := response[1].(map[string]interface{}) + if !ok { + return errors.New("received invalid orderbook data") + } + + if len(response) == 5 { + ob2, okob2 := response[2].(map[string]interface{}) + if !okob2 { return errors.New("received invalid orderbook data") } - if len(response) == 5 { - ob2, okob2 := response[2].(map[string]interface{}) - if !okob2 { - return errors.New("received invalid orderbook data") - } - - // Squish both maps together to process - for k, v := range ob2 { - if _, ok := ob[k]; ok { - return errors.New("cannot merge maps, conflict is present") - } - ob[k] = v + // Squish both maps together to process + for k, v := range ob2 { + if _, ok := ob[k]; ok { + return errors.New("cannot merge maps, conflict is present") } + ob[k] = v } - return k.wsProcessOrderBook(&channelData, ob) - case krakenWsSpread: - s, ok := response[1].([]interface{}) - if !ok { - return errors.New("received invalid spread data") - } - k.wsProcessSpread(&channelData, s) - case krakenWsTrade: - t, ok := response[1].([]interface{}) - if !ok { - return errors.New("received invalid trade data") - } - return k.wsProcessTrades(&channelData, t) - default: - return fmt.Errorf("%s received unidentified data for subscription %s: %+v", - k.Name, - channelData.Subscription, - response) } - } - - return nil -} - -func (k *Kraken) wsHandleAuthDataResponse(response WebsocketDataResponse) error { - if chName, ok := response[1].(string); ok { - switch chName { - case krakenWsOwnTrades: - return k.wsProcessOwnTrades(response[0]) - case krakenWsOpenOrders: - return k.wsProcessOpenOrders(response[0]) - default: - return fmt.Errorf("%v Unidentified websocket data received: %+v", - k.Name, response) + return k.wsProcessOrderBook(c, ob) + case krakenWsSpread: + s, ok := response[1].([]interface{}) + if !ok { + return errors.New("received invalid spread data") + } + k.wsProcessSpread(c, s) + case krakenWsTrade: + t, ok := response[1].([]interface{}) + if !ok { + return errors.New("received invalid trade data") } + return k.wsProcessTrades(c, t) + case krakenWsOwnTrades: + return k.wsProcessOwnTrades(response[0]) + case krakenWsOpenOrders: + return k.wsProcessOpenOrders(response[0]) + default: + return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, c.Channel, response) } + return nil } @@ -635,60 +597,8 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error { return errors.New(k.Name + " - Invalid own trades data") } -// addNewSubscriptionChannelData stores channel ids, pairs and subscription types to an array -// allowing correlation between subscriptions and returned data -func (k *Kraken) addNewSubscriptionChannelData(response *wsSubscription) { - // We change the / to - to maintain compatibility with REST/config - var pair, fPair currency.Pair - var err error - if response.Pair != "" { - pair, err = currency.NewPairFromString(response.Pair) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - return - } - fPair, err = k.FormatExchangeCurrency(pair, asset.Spot) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - return - } - } - - maxDepth := 0 - if splits := strings.Split(response.ChannelName, "-"); len(splits) > 1 { - maxDepth, err = strconv.Atoi(splits[1]) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - } - } - m.Lock() - defer m.Unlock() - subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{ - Subscription: response.Subscription.Name, - Pair: fPair, - ChannelID: response.ChannelID, - MaxDepth: maxDepth, - }) -} - -// getSubscriptionChannelData retrieves WebsocketChannelData based on response ID -func getSubscriptionChannelData(id int64) (WebsocketChannelData, error) { - m.Lock() - defer m.Unlock() - for i := range subscriptionChannelPair { - if subscriptionChannelPair[i].ChannelID == nil { - continue - } - if id == *subscriptionChannelPair[i].ChannelID { - return subscriptionChannelPair[i], nil - } - } - return WebsocketChannelData{}, - fmt.Errorf("could not get subscription data for id %d", id) -} - // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[string]interface{}) error { +func (k *Kraken) wsProcessTickers(c *stream.ChannelSubscription, data map[string]interface{}) error { closePrice, err := strconv.ParseFloat(data["c"].([]interface{})[0].(string), 64) if err != nil { return err @@ -727,14 +637,14 @@ func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[st Low: lowPrice, Bid: bid, Ask: ask, - AssetType: asset.Spot, - Pair: channelData.Pair, + AssetType: c.Asset, + Pair: c.Currency, } return nil } // wsProcessSpread converts spread/orderbook data and sends it to the datahandler -func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []interface{}) { +func (k *Kraken) wsProcessSpread(c *stream.ChannelSubscription, data []interface{}) { if len(data) < 5 { k.Websocket.DataHandler <- fmt.Errorf("%s unexpected wsProcessSpread data length", k.Name) return @@ -771,7 +681,7 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []inter log.Debugf(log.ExchangeSys, "%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'", k.Name, - channelData.Pair, + c.Currency, bestBid, bestAsk, convert.TimeFromUnixTimestampDecimal(timeData), @@ -781,7 +691,7 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []inter } // wsProcessTrades converts trade data and sends it to the datahandler -func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []interface{}) error { +func (k *Kraken) wsProcessTrades(c *stream.ChannelSubscription, data []interface{}) error { if !k.IsSaveTradeDataEnabled() { return nil } @@ -815,8 +725,8 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter } trades[i] = trade.Data{ - AssetType: asset.Spot, - CurrencyPair: channelData.Pair, + AssetType: c.Asset, + CurrencyPair: c.Currency, Exchange: k.Name, Price: price, Amount: amount, @@ -829,7 +739,7 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter // wsProcessOrderBook determines if the orderbook data is partial or update // Then sends to appropriate fun -func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[string]interface{}) error { +func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[string]interface{}) error { // NOTE: Updates are a priority so check if it's an update first as we don't // need multiple map lookups to check for snapshot. askData, asksExist := data["a"].([]interface{}) @@ -842,9 +752,9 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ k.wsRequestMtx.Lock() defer k.wsRequestMtx.Unlock() - err := k.wsProcessOrderBookUpdate(channelData, askData, bidData, checksum) + err := k.wsProcessOrderBookUpdate(c, askData, bidData, checksum) if err != nil { - outbound := channelData.Pair // Format required "XBT/USD" + outbound := c.Currency // Format required "XBT/USD" outbound.Delimiter = "/" go func(resub *stream.ChannelSubscription) { // This was locking the main websocket reader routine and a @@ -856,11 +766,7 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ resub, errResub) } - }(&stream.ChannelSubscription{ - Channel: krakenWsOrderbook, - Currency: outbound, - Asset: asset.Spot, - }) + }(c) return err } return nil @@ -869,21 +775,21 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ askSnapshot, askSnapshotExists := data["as"].([]interface{}) bidSnapshot, bidSnapshotExists := data["bs"].([]interface{}) if !askSnapshotExists && !bidSnapshotExists { - return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, channelData.Pair, asset.Spot) + return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, c.Currency, c.Asset) } - return k.wsProcessOrderBookPartial(channelData, askSnapshot, bidSnapshot) + return k.wsProcessOrderBookPartial(c, askSnapshot, bidSnapshot) } // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, askData, bidData []interface{}) error { +func (k *Kraken) wsProcessOrderBookPartial(c *stream.ChannelSubscription, askData, bidData []interface{}) error { base := orderbook.Base{ - Pair: channelData.Pair, - Asset: asset.Spot, + Pair: c.Currency, + Asset: c.Asset, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Items, len(bidData)), Asks: make(orderbook.Items, len(askData)), - MaxDepth: channelData.MaxDepth, + MaxDepth: krakenWsOrderbookDepth, } // Kraken ob data is timestamped per price, GCT orderbook data is // timestamped per entry using the highest last update time, we can attempt @@ -956,10 +862,10 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, as } // wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, askData, bidData []interface{}, checksum string) error { +func (k *Kraken) wsProcessOrderBookUpdate(c *stream.ChannelSubscription, askData, bidData []interface{}, checksum string) error { update := orderbook.Update{ - Asset: asset.Spot, - Pair: channelData.Pair, + Asset: c.Asset, + Pair: c.Currency, Bids: make([]orderbook.Item, len(bidData)), Asks: make([]orderbook.Item, len(askData)), } @@ -1102,12 +1008,9 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, ask return err } - book, err := k.Websocket.Orderbook.GetOrderbook(channelData.Pair, asset.Spot) + book, err := k.Websocket.Orderbook.GetOrderbook(c.Currency, c.Asset) if err != nil { - return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", - channelData.Pair, - asset.Spot, - err) + return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", c.Currency, c.Asset, err) } token, err := strconv.ParseInt(checksum, 10, 64) @@ -1158,7 +1061,7 @@ func trim(s string) string { } // wsProcessCandles converts candle data and sends it to the data handler -func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []interface{}) error { +func (k *Kraken) wsProcessCandles(c *stream.ChannelSubscription, data []interface{}) error { startTime, err := strconv.ParseFloat(data[0].(string), 64) if err != nil { return err @@ -1195,8 +1098,8 @@ func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []inte } k.Websocket.DataHandler <- stream.KlineData{ - AssetType: asset.Spot, - Pair: channelData.Pair, + AssetType: c.Asset, + Pair: c.Currency, Timestamp: time.Now(), Exchange: k.Name, StartTime: convert.TimeFromUnixTimestampDecimal(startTime), @@ -1240,129 +1143,165 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e } // Subscribe sends a websocket message to receive data from the channel -func (k *Kraken) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error { - var subscriptions = make(map[string]*[]WebsocketSubscriptionEventRequest) -channels: - for i := range channelsToSubscribe { - s, ok := subscriptions[channelsToSubscribe[i].Channel] - if !ok { - s = &[]WebsocketSubscriptionEventRequest{} - subscriptions[channelsToSubscribe[i].Channel] = s - } +func (k *Kraken) Subscribe(channels []stream.ChannelSubscription) error { + return k.parallelChanOp(channels, k.subscribeToChan) +} - for j := range *s { - (*s)[j].Pairs = append((*s)[j].Pairs, channelsToSubscribe[i].Currency.String()) - (*s)[j].Channels = append((*s)[j].Channels, channelsToSubscribe[i]) - continue channels - } +// Unsubscribe sends a websocket message to stop receiving data from the channel +func (k *Kraken) Unsubscribe(channels []stream.ChannelSubscription) error { + return k.parallelChanOp(channels, k.unsubscribeFromChan) +} - id := k.Websocket.Conn.GenerateMessageID(false) - outbound := WebsocketSubscriptionEventRequest{ - Event: krakenWsSubscribe, - RequestID: id, - Subscription: WebsocketSubscriptionData{ - Name: channelsToSubscribe[i].Channel, - }, - } - if channelsToSubscribe[i].Channel == "book" { - outbound.Subscription.Depth = krakenWsOrderbookDepth - } - if !channelsToSubscribe[i].Currency.IsEmpty() { - outbound.Pairs = []string{channelsToSubscribe[i].Currency.String()} - } - if common.StringDataContains(authenticatedChannels, channelsToSubscribe[i].Channel) { - outbound.Subscription.Token = authToken - } +func (k *Kraken) parallelChanOp(channels []stream.ChannelSubscription, m func(*stream.ChannelSubscription) error) error { + wg := sync.WaitGroup{} + wg.Add(len(channels)) + errC := make(chan error, len(channels)) - outbound.Channels = append(outbound.Channels, channelsToSubscribe[i]) - *s = append(*s, outbound) + for i := range channels { + go func(c *stream.ChannelSubscription) { + defer wg.Done() + if err := m(c); err != nil { + errC <- err + } + }(&channels[i]) } + wg.Wait() + close(errC) + var errs error - for _, subs := range subscriptions { - for i := range *subs { - if common.StringDataContains(authenticatedChannels, (*subs)[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - continue - } - _, err := k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - } + for err := range errC { + errs = common.AppendError(errs, err) } + return errs } -// Unsubscribe sends a websocket message to stop receiving data from the channel -func (k *Kraken) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error { - var unsubs []WebsocketSubscriptionEventRequest -channels: - for x := range channelsToUnsubscribe { - for y := range unsubs { - if unsubs[y].Subscription.Name == channelsToUnsubscribe[x].Channel { - unsubs[y].Pairs = append(unsubs[y].Pairs, - channelsToUnsubscribe[x].Currency.String()) - unsubs[y].Channels = append(unsubs[y].Channels, - channelsToUnsubscribe[x]) - continue channels - } - } - var depth int64 - if channelsToUnsubscribe[x].Channel == "book" { - depth = krakenWsOrderbookDepth - } +// subscribeToChan sends a websocket message to receive data from the channel +func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { + conn := k.Websocket.Conn + r := WebsocketSubscribeRequest{ + Event: krakenWsSubscribe, + RequestID: conn.GenerateMessageID(false), + Subscription: WebsocketSubscriptionData{ + Name: c.Channel, + }, + } + + if !c.Currency.IsEmpty() { + r.Pairs = []string{c.Currency.String()} + } - var id int64 - if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { - id = k.Websocket.AuthConn.GenerateMessageID(false) + if c.Channel == krakenWsOrderbook { + if depth, err := depthFromChan(c); err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) } else { - id = k.Websocket.Conn.GenerateMessageID(false) + r.Subscription.Depth = depth } + } - unsub := WebsocketSubscriptionEventRequest{ - Event: krakenWsUnsubscribe, - Pairs: []string{channelsToUnsubscribe[x].Currency.String()}, - Subscription: WebsocketSubscriptionData{ - Name: channelsToUnsubscribe[x].Channel, - Depth: depth, - }, - RequestID: id, - } - if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { - unsub.Subscription.Token = authToken - } - unsub.Channels = append(unsub.Channels, channelsToUnsubscribe[x]) - unsubs = append(unsubs, unsub) + if common.StringDataContains(authenticatedChannels, r.Subscription.Name) { + r.Subscription.Token = authToken + conn = k.Websocket.AuthConn } - var errs error - for i := range unsubs { - if common.StringDataContains(authenticatedChannels, unsubs[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...) - continue + respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + } + + if err = k.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + k.Websocket.DataHandler <- wErr + return wErr + } + + k.Websocket.AddSuccessfulSubscriptions(*c) + if k.Verbose { + log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency) + } + + return nil +} + +// unsubscribeFromChan sends a websocket message to stop receiving data from a channel +func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { + conn := k.Websocket.Conn + r := WebsocketUnsubscribeRequest{ + Event: krakenWsUnsubscribe, + RequestID: conn.GenerateMessageID(false), + Subscription: WebsocketSubscriptionData{ + Name: c.Channel, + }, + } + + if !c.Currency.IsEmpty() { + r.Pairs = []string{c.Currency.String()} + } + + if c.Channel == krakenWsOrderbook { + if depth, err := depthFromChan(c); err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + } else { + r.Subscription.Depth = depth } + } - _, err := k.Websocket.Conn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue + if common.StringDataContains(authenticatedChannels, c.Channel) { + conn = k.Websocket.AuthConn + r.Subscription.Token = authToken + } + + respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err != nil { + return err + } + + if err = k.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err) + k.Websocket.DataHandler <- wErr + return wErr + } + + k.Websocket.RemoveSuccessfulUnsubscriptions(*c) + + return nil +} + +func depthFromChan(c *stream.ChannelSubscription) (int, error) { + depthAny, ok := c.Params["depth"] + if !ok { + return 0, errMaxDepthMissing + } + depthInt, ok2 := depthAny.(int) + if !ok2 { + return 0, common.GetTypeAssertError("int", depthAny, "Subscription.Depth") + } + return depthInt, nil +} + +// getErrResp takes a json response string and looks for an error event type +// If found it returns the errorMessage +// It might log parsing errors about the nature of the error +// If the error message is not defined it will return a wrapped errUnknownError +func (k *Kraken) getErrResp(resp []byte) error { + event, err := jsonparser.GetUnsafeString(resp, "event") + switch { + case err != nil: + return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) + case event != "error": + status, _ := jsonparser.GetUnsafeString(resp, "status") // Error is really irrellevant here + if status != "error" { + return nil } - k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...) } - return errs + + var msg string + if msg, err = jsonparser.GetString(resp, "errorMessage"); err != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS errorMessage: %s from message: %s", k.Name, err, resp) + return fmt.Errorf("error status did not contain errorMessage: %s", resp) + } + return errors.New(msg) } // wsAddOrder creates an order, returned order ID if success diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index bc17df6dc0b..5315f0cffa2 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -31,7 +31,8 @@ type Response struct { Raw []byte } -type defaultChannelKey struct { +// DefaultChannelKey is the fallback key for AddSuccessfulSubscriptions +type DefaultChannelKey struct { Channel string Currency currency.Pair Asset asset.Item diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index ac29a98a3e5..47f29f755fb 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -26,6 +26,8 @@ const ( var ( // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") + // ErrUnsubscribeFailure defines an error when a unsubscribe fails + ErrUnsubscribeFailure = errors.New("unsubscribe failure") // ErrAlreadyDisabled is returned when you double-disable the websocket ErrAlreadyDisabled = errors.New("websocket already disabled") // ErrNotConnected defines an error when websocket is not connected @@ -941,7 +943,7 @@ func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription) for i := range channels { key := channels[i].Key if key == nil { - key = defaultChannelKey{ + key = DefaultChannelKey{ Channel: channels[i].Channel, Asset: channels[i].Asset, Currency: channels[i].Currency, diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index e1a493ab163..87e268c671e 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -520,7 +520,7 @@ func TestSubscribeUnsubscribe(t *testing.T) { assert.Nil(t, ws.GetSubscription(42), "GetSubscription on empty internal map should return") assert.NoError(t, ws.SubscribeToChannels(subs), "Basic Subscribing should not error") assert.Len(t, ws.GetSubscriptions(), 4, "Should have 4 subscriptions") - byDefKey := ws.GetSubscription(defaultChannelKey{Channel: "TestSub"}) + byDefKey := ws.GetSubscription(DefaultChannelKey{Channel: "TestSub"}) if assert.NotNil(t, byDefKey, "GetSubscription by default key should find a channel") { assert.Equal(t, "TestSub", byDefKey.Channel, "GetSubscription by default key should return a pointer a copy of the right channel") assert.NotSame(t, byDefKey, ws.subscriptions["TestSub"], "GetSubscription returns a fresh pointer") diff --git a/testdata/configtest.json b/testdata/configtest.json index f20bdc24a96..50b4f042147 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -1929,8 +1929,8 @@ } }, "api": { - "authenticatedSupport": false, - "authenticatedWebsocketApiSupport": false, + "authenticatedSupport": true, + "authenticatedWebsocketApiSupport": true, "endpoints": { "url": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API", "urlSecondary": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API",