From e403481e31a6a0128091156a39fce0502aed610a Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Thu, 21 Sep 2023 18:24:37 +0700 Subject: [PATCH] Kraken: Sub/Unsub tests and fixes --- exchanges/kraken/kraken_test.go | 106 +++++-- exchanges/kraken/kraken_types.go | 24 +- exchanges/kraken/kraken_websocket.go | 400 ++++++++++++--------------- exchanges/stream/websocket.go | 2 + 4 files changed, 272 insertions(+), 260 deletions(-) diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 1f12e0e8fde..3a8d6018bbd 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1207,19 +1207,19 @@ func TestWithdrawCancel(t *testing.T) { // setupWsAuthTest will connect both websockets // and should be called directly from a auth ws test -func setupWsAuthTest(t *testing.T) { - t.Helper() - setupWsTest(t) - setupAuthWs(t) +func setupWsAuthTest(tb testing.TB) { + tb.Helper() + setupWsTest(tb) + setupAuthWs(tb) } // setupWsTest will just connect the non-authenticated websocket // and should be called directly from a non-auth ws test -func setupWsTest(t *testing.T) { - t.Helper() +func setupWsTest(tb testing.TB) { + tb.Helper() if !k.Websocket.IsEnabled() { - t.Skip("Websocket not enabled") + tb.Skip("Websocket not enabled") } if wsSetupRan { @@ -1229,7 +1229,7 @@ func setupWsTest(t *testing.T) { var dialer websocket.Dialer if err := k.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil { - t.Fatalf("Dialing the websocket should not error: %s", err) + tb.Fatalf("Dialing the websocket should not error: %s", err) } go k.wsFunnelConnectionData(k.Websocket.Conn, comms) @@ -1237,18 +1237,19 @@ func setupWsTest(t *testing.T) { go k.wsReadData(comms) go func() { err := k.wsPingHandler() - assert.NoError(t, err, "wsPingHandler should not error") + assert.NoError(tb, err, "wsPingHandler should not error") }() } // setupAuthWs will just connect the authenticated websocket and should not be called directly -func setupAuthWs(t *testing.T) { +func setupAuthWs(tb testing.TB) { + tb.Helper() if !k.API.AuthenticatedWebsocketSupport { - t.Skip("Authenticated Websocket not Supported") + tb.Skip("Authenticated Websocket not Supported") } if !sharedtestvalues.AreAPICredentialsSet(k) { - t.Skip("Authenticated Websocket credentials not set") + tb.Skip("Authenticated Websocket credentials not set") } if wsAuthSetupRan { @@ -1259,28 +1260,85 @@ func setupAuthWs(t *testing.T) { var err error var dialer websocket.Dialer if err = k.Websocket.AuthConn.Dial(&dialer, http.Header{}); err != nil { - t.Fatalf("Dialing the auth websocket should not error: %s", err) + tb.Fatalf("Dialing the auth websocket should not error: %s", err) } authToken, err = k.GetWebsocketToken(context.Background()) - assert.NoError(t, err, "GetWebsocketToken should not error") + assert.NoError(tb, err, "GetWebsocketToken should not error") go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) } -// TestWebsocketSubscribe tests returning a message with an id +// TestWebsocketSubscribe tests unauthenticated websocket subscriptions +// Specifically looking to ensure multiple errors are collected and returned and ws.Subscriptions Added/Removed in cases of: +// single pass, single fail, mixed fail, multiple pass, all fail +// No objection to this becoming a fixture test, so long as it integrates through Un/Subscribe roundtrip func TestWebsocketSubscribe(t *testing.T) { setupWsTest(t) - err := k.Subscribe([]stream.ChannelSubscription{ - { - Channel: defaultSubscribedChannels[0], - Currency: currency.NewPairWithDelimiter("XBT", "USD", "/"), - }, + + err := k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) + assert.NoError(t, err, "Simple subscription should not error") + + err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("XBT", "USD", "/")}}) + assert.NoError(t, err, "Resubscribing to the same channel shouldn't error") + + subs := k.Websocket.GetSubscriptions() + assert.Len(t, subs, 1, "Should have correct number of subscriptions") + + err = k.Subscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "USD", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "ELF", "/")}, }) - assert.NotNil(t, err, "Blah") - if err != nil { - t.Error(err) - } + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}, + }) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error") + assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should yield the correct error") + + err = k.Subscribe([]stream.ChannelSubscription{ + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("ETH", "XBT", "/")}, + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("LTC", "ETH", "/")}, + }) + assert.NoError(t, err, "Multiple successful subscriptions should not error") + + subs = k.Websocket.GetSubscriptions() + assert.Len(t, subs, 4, "Should have correct number of subscriptions") + + err = k.Unsubscribe(subs[:1]) + assert.NoError(t, err, "Simple Unsubscribe should succeed") + + err = k.Unsubscribe([]stream.ChannelSubscription{{Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "WIZARD", "/"), Key: 1337}}) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error") + assert.ErrorContains(t, err, "DWARF/WIZARD; Not subscribed to the requested channelID", "Simple failing Unsubscribe should error") + + err = k.Unsubscribe([]stream.ChannelSubscription{ + subs[1], + {Channel: krakenWsTicker, Currency: currency.NewPairWithDelimiter("DWARF", "EAGLE", "/"), Key: 1338}, + }) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error") + assert.ErrorContains(t, err, "DWARF/EAGLE; Not subscribed to the requested channelID", "Simple failing Unsubscribe should error") + + subs = k.Websocket.GetSubscriptions() + assert.Len(t, subs, 2, "Should have successfully Unsubscribed 1 channel in a mixed error") + + err = k.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribe multiple passing subscriptions should not error") +} + +// TestWebsocketSubscribeAuth tests returning a message with an id +func TestWebsocketSubscribeAuth(t *testing.T) { + setupWsAuthTest(t) + + t.Error("Test WsAuth") } func TestGetWSToken(t *testing.T) { diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index 2f833479ff2..15e3f1be791 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -1,6 +1,7 @@ package kraken import ( + "errors" "time" "github.com/thrasher-corp/gocryptotrader/currency" @@ -75,6 +76,10 @@ const ( var ( assetTranslator assetTranslatorStore + + errNoWebsocketOrderbookData = errors.New("no websocket orderbook data") + errNoRequestID = errors.New("no RequestID in response") + errMaxDepthMissing = errors.New("MaxDepth missing for subscription") ) // GenericResponse stores general response data for functions that only return success @@ -509,12 +514,12 @@ type WebsocketBaseEventRequest struct { Event string `json:"event"` // eg "unsubscribe" } -// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events -type WebsocketUnsubscribeByChannelIDEventRequest struct { +// WebsocketUnsubscribeRequest handles WS unsubscribe events +type WebsocketUnsubscribeRequest struct { WebsocketBaseEventRequest - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. - Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). - ChannelID int64 `json:"channelID,omitempty"` + Event string `json:"event"` // unsubscribe + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + ChannelID int `json:"channelID,omitempty"` } // WebsocketSubscriptionData contains details on WS channel @@ -556,15 +561,6 @@ type WebsocketErrorResponse struct { ErrorMessage string `json:"errorMessage"` } -// WebsocketChannelData Holds relevant data for channels to identify what we're -// doing -type WebsocketChannelData struct { - Subscription string - Pair currency.Pair - ChannelID *int64 - MaxDepth int -} - // WsTokenResponse holds the WS auth token type WsTokenResponse struct { Error []string `json:"error"` diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 59fb179a41c..f19959da38a 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" @@ -55,24 +56,22 @@ const ( krakenWsOrderbookDepth = 1000 ) -// orderbookMutex Ensures if two entries arrive at once, only one can be -// processed at a time var ( - subscriptionChannelPair []WebsocketChannelData - authToken string - pingRequest = WebsocketBaseEventRequest{Event: stream.Ping} - m sync.Mutex - errNoWebsocketOrderbookData = errors.New("no websocket orderbook data") + authToken string + pingRequest = WebsocketBaseEventRequest{Event: stream.Ping} ) // Channels require a topic and a currency // Format [[ticker,but-t4u],[orderbook,nce-btt]] var defaultSubscribedChannels = []string{ krakenWsTicker, - krakenWsTrade, - krakenWsOrderbook, - krakenWsOHLC, - krakenWsSpread} + /* + krakenWsTrade, + krakenWsOrderbook, + krakenWsOHLC, + krakenWsSpread + */ +} var authenticatedChannels = []string{krakenWsOwnTrades, krakenWsOpenOrders} var cancelOrdersStatusMutex sync.Mutex @@ -358,16 +357,17 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { err, respRaw) } + if sub.RequestID == 0 { + return fmt.Errorf("%v %w: %v", k.Name, errNoRequestID, respRaw) + } + k.Websocket.Match.IncomingWithData(sub.RequestID, respRaw) + if sub.Status != "subscribed" && sub.Status != "unsubscribed" { return fmt.Errorf("%v %v %v", k.Name, sub.RequestID, sub.ErrorMessage) } - k.addNewSubscriptionChannelData(&sub) - if sub.RequestID > 0 { - k.Websocket.Match.IncomingWithData(sub.RequestID, respRaw) - } default: k.Websocket.DataHandler <- stream.UnhandledMessageWarning{ Message: k.Name + stream.UnhandledMessage + string(respRaw), @@ -409,25 +409,25 @@ func (k *Kraken) wsAuthPingHandler() error { // wsReadDataResponse classifies the WS response and sends to appropriate handler func (k *Kraken) wsReadDataResponse(response WebsocketDataResponse) error { - if cID, ok := response[0].(float64); ok { - channelID := int64(cID) - channelData, err := getSubscriptionChannelData(channelID) - if err != nil { - return err - } - switch channelData.Subscription { + cID, ok := response[0].(float64) + if !ok { + return nil + } + + if c := k.Websocket.GetSubscription(int64(cID)); c != nil { + switch c.Channel { case krakenWsTicker: t, ok := response[1].(map[string]interface{}) if !ok { return errors.New("received invalid ticker data") } - return k.wsProcessTickers(&channelData, t) + return k.wsProcessTickers(c, t) case krakenWsOHLC: o, ok := response[1].([]interface{}) if !ok { return errors.New("received invalid OHLCV data") } - return k.wsProcessCandles(&channelData, o) + return k.wsProcessCandles(c, o) case krakenWsOrderbook: ob, ok := response[1].(map[string]interface{}) if !ok { @@ -448,24 +448,21 @@ func (k *Kraken) wsReadDataResponse(response WebsocketDataResponse) error { ob[k] = v } } - return k.wsProcessOrderBook(&channelData, ob) + return k.wsProcessOrderBook(c, ob) case krakenWsSpread: s, ok := response[1].([]interface{}) if !ok { return errors.New("received invalid spread data") } - k.wsProcessSpread(&channelData, s) + k.wsProcessSpread(c, s) case krakenWsTrade: t, ok := response[1].([]interface{}) if !ok { return errors.New("received invalid trade data") } - return k.wsProcessTrades(&channelData, t) + return k.wsProcessTrades(c, t) default: - return fmt.Errorf("%s received unidentified data for subscription %s: %+v", - k.Name, - channelData.Subscription, - response) + return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, c.Channel, response) } } @@ -635,60 +632,8 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error { return errors.New(k.Name + " - Invalid own trades data") } -// addNewSubscriptionChannelData stores channel ids, pairs and subscription types to an array -// allowing correlation between subscriptions and returned data -func (k *Kraken) addNewSubscriptionChannelData(response *wsSubscription) { - // We change the / to - to maintain compatibility with REST/config - var pair, fPair currency.Pair - var err error - if response.Pair != "" { - pair, err = currency.NewPairFromString(response.Pair) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - return - } - fPair, err = k.FormatExchangeCurrency(pair, asset.Spot) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - return - } - } - - maxDepth := 0 - if splits := strings.Split(response.ChannelName, "-"); len(splits) > 1 { - maxDepth, err = strconv.Atoi(splits[1]) - if err != nil { - log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err) - } - } - m.Lock() - defer m.Unlock() - subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{ - Subscription: response.Subscription.Name, - Pair: fPair, - ChannelID: response.ChannelID, - MaxDepth: maxDepth, - }) -} - -// getSubscriptionChannelData retrieves WebsocketChannelData based on response ID -func getSubscriptionChannelData(id int64) (WebsocketChannelData, error) { - m.Lock() - defer m.Unlock() - for i := range subscriptionChannelPair { - if subscriptionChannelPair[i].ChannelID == nil { - continue - } - if id == *subscriptionChannelPair[i].ChannelID { - return subscriptionChannelPair[i], nil - } - } - return WebsocketChannelData{}, - fmt.Errorf("could not get subscription data for id %d", id) -} - // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[string]interface{}) error { +func (k *Kraken) wsProcessTickers(c *stream.ChannelSubscription, data map[string]interface{}) error { closePrice, err := strconv.ParseFloat(data["c"].([]interface{})[0].(string), 64) if err != nil { return err @@ -727,14 +672,14 @@ func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[st Low: lowPrice, Bid: bid, Ask: ask, - AssetType: asset.Spot, - Pair: channelData.Pair, + AssetType: c.Asset, + Pair: c.Currency, } return nil } // wsProcessSpread converts spread/orderbook data and sends it to the datahandler -func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []interface{}) { +func (k *Kraken) wsProcessSpread(c *stream.ChannelSubscription, data []interface{}) { if len(data) < 5 { k.Websocket.DataHandler <- fmt.Errorf("%s unexpected wsProcessSpread data length", k.Name) return @@ -771,7 +716,7 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []inter log.Debugf(log.ExchangeSys, "%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'", k.Name, - channelData.Pair, + c.Currency, bestBid, bestAsk, convert.TimeFromUnixTimestampDecimal(timeData), @@ -781,7 +726,7 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data []inter } // wsProcessTrades converts trade data and sends it to the datahandler -func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []interface{}) error { +func (k *Kraken) wsProcessTrades(c *stream.ChannelSubscription, data []interface{}) error { if !k.IsSaveTradeDataEnabled() { return nil } @@ -815,8 +760,8 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter } trades[i] = trade.Data{ - AssetType: asset.Spot, - CurrencyPair: channelData.Pair, + AssetType: c.Asset, + CurrencyPair: c.Currency, Exchange: k.Name, Price: price, Amount: amount, @@ -829,7 +774,7 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter // wsProcessOrderBook determines if the orderbook data is partial or update // Then sends to appropriate fun -func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[string]interface{}) error { +func (k *Kraken) wsProcessOrderBook(c *stream.ChannelSubscription, data map[string]interface{}) error { // NOTE: Updates are a priority so check if it's an update first as we don't // need multiple map lookups to check for snapshot. askData, asksExist := data["a"].([]interface{}) @@ -842,9 +787,9 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ k.wsRequestMtx.Lock() defer k.wsRequestMtx.Unlock() - err := k.wsProcessOrderBookUpdate(channelData, askData, bidData, checksum) + err := k.wsProcessOrderBookUpdate(c, askData, bidData, checksum) if err != nil { - outbound := channelData.Pair // Format required "XBT/USD" + outbound := c.Currency // Format required "XBT/USD" outbound.Delimiter = "/" go func(resub *stream.ChannelSubscription) { // This was locking the main websocket reader routine and a @@ -856,11 +801,7 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ resub, errResub) } - }(&stream.ChannelSubscription{ - Channel: krakenWsOrderbook, - Currency: outbound, - Asset: asset.Spot, - }) + }(c) return err } return nil @@ -869,21 +810,21 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ askSnapshot, askSnapshotExists := data["as"].([]interface{}) bidSnapshot, bidSnapshotExists := data["bs"].([]interface{}) if !askSnapshotExists && !bidSnapshotExists { - return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, channelData.Pair, asset.Spot) + return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, c.Currency, c.Asset) } - return k.wsProcessOrderBookPartial(channelData, askSnapshot, bidSnapshot) + return k.wsProcessOrderBookPartial(c, askSnapshot, bidSnapshot) } // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, askData, bidData []interface{}) error { +func (k *Kraken) wsProcessOrderBookPartial(c *stream.ChannelSubscription, askData, bidData []interface{}) error { base := orderbook.Base{ - Pair: channelData.Pair, - Asset: asset.Spot, + Pair: c.Currency, + Asset: c.Asset, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Items, len(bidData)), Asks: make(orderbook.Items, len(askData)), - MaxDepth: channelData.MaxDepth, + MaxDepth: krakenWsOrderbookDepth, } // Kraken ob data is timestamped per price, GCT orderbook data is // timestamped per entry using the highest last update time, we can attempt @@ -956,10 +897,10 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, as } // wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, askData, bidData []interface{}, checksum string) error { +func (k *Kraken) wsProcessOrderBookUpdate(c *stream.ChannelSubscription, askData, bidData []interface{}, checksum string) error { update := orderbook.Update{ - Asset: asset.Spot, - Pair: channelData.Pair, + Asset: c.Asset, + Pair: c.Currency, Bids: make([]orderbook.Item, len(bidData)), Asks: make([]orderbook.Item, len(askData)), } @@ -1102,12 +1043,9 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, ask return err } - book, err := k.Websocket.Orderbook.GetOrderbook(channelData.Pair, asset.Spot) + book, err := k.Websocket.Orderbook.GetOrderbook(c.Currency, c.Asset) if err != nil { - return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", - channelData.Pair, - asset.Spot, - err) + return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", c.Currency, c.Asset, err) } token, err := strconv.ParseInt(checksum, 10, 64) @@ -1158,7 +1096,7 @@ func trim(s string) string { } // wsProcessCandles converts candle data and sends it to the data handler -func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []interface{}) error { +func (k *Kraken) wsProcessCandles(c *stream.ChannelSubscription, data []interface{}) error { startTime, err := strconv.ParseFloat(data[0].(string), 64) if err != nil { return err @@ -1195,8 +1133,8 @@ func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []inte } k.Websocket.DataHandler <- stream.KlineData{ - AssetType: asset.Spot, - Pair: channelData.Pair, + AssetType: c.Asset, + Pair: c.Currency, Timestamp: time.Now(), Exchange: k.Name, StartTime: convert.TimeFromUnixTimestampDecimal(startTime), @@ -1240,129 +1178,147 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e } // Subscribe sends a websocket message to receive data from the channel -func (k *Kraken) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error { - var subscriptions = make(map[string]*[]WebsocketSubscriptionEventRequest) -channels: - for i := range channelsToSubscribe { - s, ok := subscriptions[channelsToSubscribe[i].Channel] - if !ok { - s = &[]WebsocketSubscriptionEventRequest{} - subscriptions[channelsToSubscribe[i].Channel] = s - } +func (k *Kraken) Subscribe(channels []stream.ChannelSubscription) error { + return k.parallelChanOp(channels, k.subscribeToChan) +} - for j := range *s { - (*s)[j].Pairs = append((*s)[j].Pairs, channelsToSubscribe[i].Currency.String()) - (*s)[j].Channels = append((*s)[j].Channels, channelsToSubscribe[i]) - continue channels - } +// Unsubscribe sends a websocket message to stop receiving data from the channel +func (k *Kraken) Unsubscribe(channels []stream.ChannelSubscription) error { + return k.parallelChanOp(channels, k.unsubscribeFromChan) +} - id := k.Websocket.Conn.GenerateMessageID(false) - outbound := WebsocketSubscriptionEventRequest{ - Event: krakenWsSubscribe, - RequestID: id, - Subscription: WebsocketSubscriptionData{ - Name: channelsToSubscribe[i].Channel, - }, - } - if channelsToSubscribe[i].Channel == "book" { - outbound.Subscription.Depth = krakenWsOrderbookDepth - } - if !channelsToSubscribe[i].Currency.IsEmpty() { - outbound.Pairs = []string{channelsToSubscribe[i].Currency.String()} - } - if common.StringDataContains(authenticatedChannels, channelsToSubscribe[i].Channel) { - outbound.Subscription.Token = authToken - } +func (k *Kraken) parallelChanOp(channels []stream.ChannelSubscription, m func(*stream.ChannelSubscription) error) error { + wg := sync.WaitGroup{} + wg.Add(len(channels)) + errC := make(chan error, len(channels)) - outbound.Channels = append(outbound.Channels, channelsToSubscribe[i]) - *s = append(*s, outbound) + for i := range channels { + go func(c *stream.ChannelSubscription) { + defer wg.Done() + if err := m(c); err != nil { + errC <- err + } + }(&channels[i]) } + wg.Wait() + close(errC) + var errs error - for _, subs := range subscriptions { - for i := range *subs { - if common.StringDataContains(authenticatedChannels, (*subs)[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - continue - } - _, err := k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - } + for err := range errC { + errs = common.AppendError(errs, err) } + return errs } -// Unsubscribe sends a websocket message to stop receiving data from the channel -func (k *Kraken) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error { - var unsubs []WebsocketSubscriptionEventRequest -channels: - for x := range channelsToUnsubscribe { - for y := range unsubs { - if unsubs[y].Subscription.Name == channelsToUnsubscribe[x].Channel { - unsubs[y].Pairs = append(unsubs[y].Pairs, - channelsToUnsubscribe[x].Currency.String()) - unsubs[y].Channels = append(unsubs[y].Channels, - channelsToUnsubscribe[x]) - continue channels - } - } - var depth int64 - if channelsToUnsubscribe[x].Channel == "book" { - depth = krakenWsOrderbookDepth - } +// subscribeToChan sends a websocket message to receive data from the channel +func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error { + conn := k.Websocket.Conn + r := WebsocketSubscriptionEventRequest{ + Event: krakenWsSubscribe, + RequestID: conn.GenerateMessageID(false), + Subscription: WebsocketSubscriptionData{ + Name: c.Channel, + }, + } - var id int64 - if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { - id = k.Websocket.AuthConn.GenerateMessageID(false) - } else { - id = k.Websocket.Conn.GenerateMessageID(false) - } + if !c.Currency.IsEmpty() { + r.Pairs = []string{c.Currency.String()} + } - unsub := WebsocketSubscriptionEventRequest{ - Event: krakenWsUnsubscribe, - Pairs: []string{channelsToUnsubscribe[x].Currency.String()}, - Subscription: WebsocketSubscriptionData{ - Name: channelsToUnsubscribe[x].Channel, - Depth: depth, - }, - RequestID: id, - } - if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { - unsub.Subscription.Token = authToken - } - unsub.Channels = append(unsub.Channels, channelsToUnsubscribe[x]) - unsubs = append(unsubs, unsub) + if c.Channel == krakenWsOrderbook { + r.Subscription.Depth = krakenWsOrderbookDepth } - var errs error - for i := range unsubs { - if common.StringDataContains(authenticatedChannels, unsubs[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...) - continue - } + if common.StringDataContains(authenticatedChannels, r.Subscription.Name) { + r.Subscription.Token = authToken + conn = k.Websocket.AuthConn + } - _, err := k.Websocket.Conn.SendMessageReturnResponse(unsubs[i].RequestID, unsubs[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue + respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + } + + if err = k.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err) + k.Websocket.DataHandler <- wErr + return wErr + } + + chanID, err := jsonparser.GetInt(respRaw, "channelID") + if err != nil { + return fmt.Errorf("%w error parsing channelID in WS subscribe response: %w", stream.ErrSubscriptionFailure, err) + } + + c.Key = int(chanID) + k.Websocket.AddSuccessfulSubscriptions(*c) + if k.Verbose { + log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", k.Name, c.Channel, c.Currency, chanID) + } + + return nil +} + +// unsubscribeFromChan sends a websocket message to stop receiving data from a channel +func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error { + conn := k.Websocket.Conn + chanID, ok := c.Key.(int) + if !ok || chanID == 0 { + return fmt.Errorf("%w Channel: %s Pair: %s; No channelID set", stream.ErrUnsubscribeFailure, c.Channel, c.Currency) + } + + r := WebsocketUnsubscribeRequest{ + Event: krakenWsUnsubscribe, + RequestID: conn.GenerateMessageID(false), + ChannelID: chanID, + } + + if common.StringDataContains(authenticatedChannels, c.Channel) { + conn = k.Websocket.AuthConn + // TODO - Can we survive without this + // r.Subscription.Token = authToken + } + + respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err != nil { + return err + } + + if err = k.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, c.Channel, c.Currency, err) + k.Websocket.DataHandler <- wErr + return wErr + } + + k.Websocket.RemoveSuccessfulUnsubscriptions(*c) + + return nil +} + +// getErrResp takes a json response string and looks for an error event type +// If found it returns the errorMessage +// It might log parsing errors about the nature of the error +// If the error message is not defined it will return a wrapped errUnknownError +func (k *Kraken) getErrResp(resp []byte) error { + event, err := jsonparser.GetUnsafeString(resp, "event") + switch { + case err != nil: + return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) + case event != "error": + status, _ := jsonparser.GetUnsafeString(resp, "status") // Error is really irrellevant here + if status != "error" { + return nil } - k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...) } - return errs + + var msg string + if msg, err = jsonparser.GetString(resp, "errorMessage"); err != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS errorMessage: %s from message: %s", k.Name, err, resp) + return fmt.Errorf("error status did not contain errorMessage: %s", resp) + } + return errors.New(msg) } // wsAddOrder creates an order, returned order ID if success diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index ac29a98a3e5..e5daee51868 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -26,6 +26,8 @@ const ( var ( // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") + // ErrUnsubscribeFailure defines an error when a unsubscribe fails + ErrUnsubscribeFailure = errors.New("unsubscribe failure") // ErrAlreadyDisabled is returned when you double-disable the websocket ErrAlreadyDisabled = errors.New("websocket already disabled") // ErrNotConnected defines an error when websocket is not connected