diff --git a/currency/pair.go b/currency/pair.go index 1f0a4f8c5ee..212b6c501e1 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -19,8 +19,7 @@ func NewBTCUSD() Pair { return NewPair(BTC, USD) } -// NewPairDelimiter splits the desired currency string at delimiter, the returns -// a Pair struct +// NewPairDelimiter splits the desired currency string at delimiter, the returns a Pair struct func NewPairDelimiter(currencyPair, delimiter string) (Pair, error) { if !strings.Contains(currencyPair, delimiter) { return EMPTYPAIR, diff --git a/exchanges/kraken/kraken.go b/exchanges/kraken/kraken.go index da33fca884c..d0568f36813 100644 --- a/exchanges/kraken/kraken.go +++ b/exchanges/kraken/kraken.go @@ -9,7 +9,6 @@ import ( "net/url" "strconv" "strings" - "sync" "time" "github.com/thrasher-corp/gocryptotrader/common" @@ -37,7 +36,6 @@ const ( // Kraken is the overarching type across the kraken package type Kraken struct { exchange.Base - wsRequestMtx sync.Mutex } // GetCurrentServerTime returns current server time diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 419c8ade73a..6bab27f0b94 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -24,7 +24,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" - "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" @@ -930,45 +929,54 @@ func TestWithdrawCancel(t *testing.T) { // No objection to this becoming a fixture test, so long as it integrates through Un/Subscribe roundtrip func TestWsSubscribe(t *testing.T) { k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") testexch.SetupWs(t, k) err := k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{xbtusdPair}}}) assert.NoError(t, err, "Simple subscription should not error") - assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription") + subs := k.Websocket.GetSubscriptions() + require.Len(t, subs, 1, "Should add 1 Subscription") + assert.Equal(t, subscription.SubscribedState, subs[0].State(), "Subscription should be subscribed state") err = k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{xbtusdPair}}}) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Resubscribing to the same channel should error with SubFailure") assert.ErrorIs(t, err, subscription.ErrDuplicate, "Resubscribing to the same channel should error with SubscribedAlready") - assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error") + subs = k.Websocket.GetSubscriptions() + require.Len(t, subs, 1, "Should not add a subscription on error") + assert.Equal(t, subscription.SubscribedState, subs[0].State(), "Existing subscription state should not change") err = k.Subscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}}) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error") - assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly") + assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker Pairs: DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly") + require.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error") + // Mix success and failure err = k.Subscribe(subscription.List{ {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "USD", "/")}}, {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}, {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "ELF", "/")}}, }) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error") - assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should error correctly") - assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures") + assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker", "Subscribing to an invalid pair should error correctly") + assert.ErrorContains(t, err, "DWARF/ELF", "Subscribing to an invalid pair should error correctly") + assert.ErrorContains(t, err, "DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly") + require.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures") + // Just failures err = k.Subscribe(subscription.List{ {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}, {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}}, }) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error") - assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should error correctly") + assert.ErrorContains(t, err, "Currency pair not supported; Channel: ticker", "Subscribing to an invalid pair should error correctly") + assert.ErrorContains(t, err, "DWARF/GOBLIN", "Subscribing to an invalid pair should error correctly") + assert.ErrorContains(t, err, "DWARF/HOBBIT", "Subscribing to an invalid pair should error correctly") + require.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures") + // Just success err = k.Subscribe(subscription.List{ {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "XBT", "/")}}, {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("LTC", "ETH", "/")}}, }) assert.NoError(t, err, "Multiple successful subscriptions should not error") - subs := k.Websocket.GetSubscriptions() + subs = k.Websocket.GetSubscriptions() assert.Len(t, subs, 4, "Should have correct number of subscriptions") err = k.Unsubscribe(subs[:1]) @@ -976,16 +984,16 @@ func TestWsSubscribe(t *testing.T) { assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel") err = k.Unsubscribe(subscription.List{{Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "WIZARD", "/")}, Key: 1337}}) - assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error UnsubFail") - assert.ErrorContains(t, err, "Currency pair not supported DWARF/WIZARD", "Unsubscribing from an invalid pair should error correctly") + assert.ErrorIs(t, err, subscription.ErrNotFound, "Simple failing Unsubscribe should error NotFound") + assert.ErrorContains(t, err, "DWARF/WIZARD", "Unsubscribing from an invalid pair should error correctly") assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have removed any channels") err = k.Unsubscribe(subscription.List{ subs[1], {Channel: subscription.TickerChannel, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "EAGLE", "/")}, Key: 1338}, }) - assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error UnsubFail") - assert.ErrorContains(t, err, "Currency pair not supported DWARF/EAGLE", "Unsubscribing from an invalid pair should error correctly") + assert.ErrorIs(t, err, subscription.ErrNotFound, "Mixed failing Unsubscribe should error NotFound") + assert.ErrorContains(t, err, "Channel: ticker Pairs: DWARF/EAGLE", "Unsubscribing from an invalid pair should error correctly") subs = k.Websocket.GetSubscriptions() assert.Len(t, subs, 2, "Should have removed only 1 more channel") @@ -1009,10 +1017,11 @@ func TestWsOrderbookSub(t *testing.T) { t.Parallel() k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") testexch.SetupWs(t, k) err := k.Subscribe(subscription.List{{ + Asset: asset.Spot, Channel: subscription.OrderbookChannel, Pairs: currency.Pairs{xbtusdPair}, Levels: 25, @@ -1031,7 +1040,6 @@ func TestWsOrderbookSub(t *testing.T) { Pairs: currency.Pairs{xbtusdPair}, Levels: 42, }}) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error") assert.ErrorContains(t, err, "Subscription depth not supported", "Bad subscription should error about depth") } @@ -1040,10 +1048,11 @@ func TestWsCandlesSub(t *testing.T) { t.Parallel() k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") testexch.SetupWs(t, k) err := k.Subscribe(subscription.List{{ + Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: currency.Pairs{xbtusdPair}, Interval: kline.OneHour, @@ -1062,7 +1071,6 @@ func TestWsCandlesSub(t *testing.T) { Pairs: currency.Pairs{xbtusdPair}, Interval: kline.Interval(time.Minute * time.Duration(127)), }}) - assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error") assert.ErrorContains(t, err, "Subscription ohlc interval not supported", "Bad subscription should error about interval") } @@ -1073,7 +1081,7 @@ func TestWsOwnTradesSub(t *testing.T) { sharedtestvalues.SkipTestIfCredentialsUnset(t, k) k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") testexch.SetupWs(t, k) err := k.Subscribe(subscription.List{{Channel: subscription.MyTradesChannel, Authenticated: true}}) @@ -1091,30 +1099,40 @@ func TestWsOwnTradesSub(t *testing.T) { func TestGenerateSubscriptions(t *testing.T) { t.Parallel() - subs, err := k.generateSubscriptions() - require.NoError(t, err, "generateSubscriptions should not error") pairs, err := k.GetEnabledPairs(asset.Spot) require.NoError(t, err, "GetEnabledPairs must not error") - pairs = pairs.Format(currency.PairFormat{Uppercase: true, Delimiter: "/"}) require.False(t, k.Websocket.CanUseAuthenticatedEndpoints(), "Websocket must not be authenticated by default") - expected := subscription.List{} - for _, exp := range k.Features.Subscriptions { - if exp.Authenticated { - continue - } - s := exp.Clone() + exp := subscription.List{ + {Channel: subscription.TickerChannel}, + {Channel: subscription.AllTradesChannel}, + {Channel: subscription.CandlesChannel, Interval: kline.OneMin}, + {Channel: subscription.OrderbookChannel, Levels: 1000}, + } + for _, s := range exp { + s.QualifiedChannel = channelName(s) s.Asset = asset.Spot s.Pairs = pairs - expected = append(expected, s) } - testsubs.Equal(t, expected, subs) + subs, err := k.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + testsubs.EqualLists(t, exp, subs) + + k.Websocket.SetCanUseAuthenticatedEndpoints(true) + exp = append(exp, subscription.List{ + {Channel: subscription.MyOrdersChannel, QualifiedChannel: krakenWsOpenOrders}, + {Channel: subscription.MyTradesChannel, QualifiedChannel: krakenWsOwnTrades}, + }...) + subs, err = k.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + testsubs.EqualLists(t, exp, subs) } func TestGetWSToken(t *testing.T) { t.Parallel() sharedtestvalues.SkipTestIfCredentialsUnset(t, k) + k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") testexch.SetupWs(t, k) resp, err := k.GetWebsocketToken(context.Background()) @@ -1164,9 +1182,8 @@ func TestWsCancelAllOrders(t *testing.T) { func TestWsHandleData(t *testing.T) { t.Parallel() - base := k k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") for _, l := range []int{10, 100} { err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{ Channel: subscription.OrderbookChannel, @@ -1176,7 +1193,7 @@ func TestWsHandleData(t *testing.T) { }) require.NoError(t, err, "AddSuccessfulSubscriptions must not error") } - sharedtestvalues.TestFixtureToDataHandler(t, base, k, "testdata/wsHandleData.json", k.wsHandleData) + testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", k.wsHandleData) } func TestWsOpenOrders(t *testing.T) { @@ -1414,19 +1431,21 @@ var websocketGSTEUROrderbookUpdates = []string{ func TestWsOrderbookMax10Depth(t *testing.T) { t.Parallel() k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - require.NoError(t, testexch.TestInstance(k), "TestInstance must not error") + require.NoError(t, testexch.Setup(k), "TestInstance must not error") pairs := currency.Pairs{ currency.NewPairWithDelimiter("XDG", "USD", "/"), currency.NewPairWithDelimiter("LUNA", "EUR", "/"), currency.NewPairWithDelimiter("GST", "EUR", "/"), } - err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{ - Channel: subscription.OrderbookChannel, - Pairs: pairs, - Asset: asset.Spot, - Levels: 10, - }) - require.NoError(t, err, "AddSuccessfulSubscriptions must not error") + for _, p := range pairs { + err := k.Websocket.AddSuccessfulSubscriptions(&subscription.Subscription{ + Channel: subscription.OrderbookChannel, + Pairs: currency.Pairs{p}, + Asset: asset.Spot, + Levels: 10, + }) + require.NoError(t, err, "AddSuccessfulSubscriptions must not error") + } for x := range websocketXDGUSDOrderbookUpdates { err := k.wsHandleData([]byte(websocketXDGUSDOrderbookUpdates[x])) diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 504367dfd62..168a0fdfd23 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -10,6 +10,7 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" "github.com/buger/jsonparser" @@ -39,8 +40,10 @@ const ( krakenWsHeartbeat = "heartbeat" krakenWsSystemStatus = "systemStatus" krakenWsSubscribe = "subscribe" - krakenWsSubscriptionStatus = "subscriptionStatus" krakenWsUnsubscribe = "unsubscribe" + krakenWsSubscribed = "subscribed" + krakenWsUnsubscribed = "unsubscribed" + krakenWsSubscriptionStatus = "subscriptionStatus" krakenWsTicker = "ticker" krakenWsOHLC = "ohlc" krakenWsTrade = "trade" @@ -60,7 +63,7 @@ const ( krakenWsCandlesDefaultTimeframe = 1 ) -var standardChannelNames = map[string]string{ +var channelNames = map[string]string{ subscription.TickerChannel: krakenWsTicker, subscription.OrderbookChannel: krakenWsOrderbook, subscription.CandlesChannel: krakenWsOHLC, @@ -72,7 +75,7 @@ var standardChannelNames = map[string]string{ var reverseChannelNames = map[string]string{} func init() { - for k, v := range standardChannelNames { + for k, v := range channelNames { reverseChannelNames[v] = k } } @@ -82,8 +85,19 @@ var ( errParsingWSField = errors.New("error parsing WS field") errUnknownError = errors.New("unknown error") errCancellingOrder = errors.New("error cancelling order") + errSubPairMissing = errors.New("pair missing from subscription response") + errInvalidChecksum = errors.New("invalid checksum") ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 1000}, + {Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true}, + {Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true}, +} + // WsConnect initiates a websocket connection func (k *Kraken) WsConnect() error { if !k.Websocket.IsEnabled() || !k.IsEnabled() { @@ -166,7 +180,7 @@ func (k *Kraken) wsReadData(comms chan stream.Response) { case resp := <-comms: err := k.wsHandleData(resp.Raw) if err != nil { - k.Websocket.DataHandler <- fmt.Errorf("%s - unhandled websocket data: %v", k.Name, err) + k.Websocket.DataHandler <- err } } } @@ -179,7 +193,7 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { return err } if len(msg) < 3 { - return fmt.Errorf("websocket data array too short: %s", respRaw) + return fmt.Errorf("data array too short: %s", respRaw) } // For all types of channel second to last field is the channel Name @@ -198,16 +212,20 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { return k.wsReadDataResponse(channelName, pair, msg) } + event, err := jsonparser.GetString(respRaw, "event") + if err != nil { + return fmt.Errorf("%w parsing: %s", err, respRaw) + } + + if event == krakenWsSubscriptionStatus { // Must happen before IncomingWithData to avoid race + k.wsProcessSubStatus(respRaw) + } + reqID, err := jsonparser.GetInt(respRaw, "reqid") if err == nil && reqID != 0 && k.Websocket.Match.IncomingWithData(reqID, respRaw) { return nil } - event, err := jsonparser.GetString(respRaw, "event") - if err != nil { - return fmt.Errorf("%s - err %s could not parse websocket data: %s", k.Name, err, respRaw) - } - if event == "" { return nil } @@ -222,7 +240,7 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { return k.wsProcessSystemStatus(respRaw) default: k.Websocket.DataHandler <- stream.UnhandledMessageWarning{ - Message: fmt.Sprintf("%s %s: %s", k.Name, stream.UnhandledMessage, respRaw), + Message: fmt.Sprintf("%s: %s", stream.UnhandledMessage, respRaw), } } @@ -260,7 +278,7 @@ func (k *Kraken) wsReadDataResponse(channelName string, pair currency.Pair, resp case krakenWsOrderbook: return k.wsProcessOrderBook(channelName, response, pair) default: - return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, channelName, response) + return fmt.Errorf("received unidentified data for subscription %s: %+v", channelName, response) } } @@ -268,10 +286,10 @@ func (k *Kraken) wsProcessSystemStatus(respRaw []byte) error { var systemStatus wsSystemStatus err := json.Unmarshal(respRaw, &systemStatus) if err != nil { - return fmt.Errorf("%s - err %s unable to parse system status response: %s", k.Name, err, respRaw) + return fmt.Errorf("%s parsing system status: %s", err, respRaw) } if systemStatus.Status != "online" { - k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'", k.Name, systemStatus.Status) + k.Websocket.DataHandler <- fmt.Errorf("system status not online: %v", systemStatus.Status) } if systemStatus.Version > krakenWSSupportedVersion { log.Warnf(log.ExchangeSys, "%v New version of Websocket API released. Was %v Now %v", k.Name, krakenWSSupportedVersion, systemStatus.Version) @@ -424,7 +442,7 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error { } return nil } - return errors.New(k.Name + " - Invalid own trades data") + return errors.New("invalid own trades data") } // wsProcessTickers converts ticker data and sends it to the datahandler @@ -474,27 +492,27 @@ func (k *Kraken) wsProcessSpread(response []any, pair currency.Pair) error { return errors.New("received invalid spread data") } if len(data) < 5 { - return fmt.Errorf("%s unexpected wsProcessSpread data length", k.Name) + return errors.New("unexpected wsProcessSpread data length") } bestBid, ok := data[0].(string) if !ok { - return fmt.Errorf("%s wsProcessSpread: unable to type assert bestBid", k.Name) + return errors.New("wsProcessSpread: unable to type assert bestBid") } bestAsk, ok := data[1].(string) if !ok { - return fmt.Errorf("%s wsProcessSpread: unable to type assert bestAsk", k.Name) + return errors.New("wsProcessSpread: unable to type assert bestAsk") } timeData, err := strconv.ParseFloat(data[2].(string), 64) if err != nil { - return fmt.Errorf("%s wsProcessSpread: unable to parse timeData. Error: %s", k.Name, err) + return fmt.Errorf("wsProcessSpread: unable to parse timeData: %w", err) } bidVolume, ok := data[3].(string) if !ok { - return fmt.Errorf("%s wsProcessSpread: unable to type assert bidVolume", k.Name) + return errors.New("wsProcessSpread: unable to type assert bidVolume") } askVolume, ok := data[4].(string) if !ok { - return fmt.Errorf("%s wsProcessSpread: unable to type assert askVolume", k.Name) + return errors.New("wsProcessSpread: unable to type assert askVolume") } if k.Verbose { @@ -564,15 +582,20 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { // wsProcessOrderBook handles both partial and full orderbook updates func (k *Kraken) wsProcessOrderBook(channelName string, response []any, pair currency.Pair) error { - key := &subscription.Subscription{Channel: channelName, Asset: asset.Spot, Pairs: currency.Pairs{pair}} + key := &subscription.Subscription{ + Channel: channelName, + Asset: asset.Spot, + Pairs: currency.Pairs{pair}, + } if err := fqChannelNameSub(key); err != nil { return err } - c := k.Websocket.GetSubscription(key) - if c == nil { + s := k.Websocket.GetSubscription(key) + if s == nil { return fmt.Errorf("%w: %s %s %s", subscription.ErrNotFound, asset.Spot, channelName, pair) } - if c.State() == subscription.UnsubscribingState { + if s.State() == subscription.UnsubscribingState { + // We only care if it's currently unsubscribing return nil } @@ -605,23 +628,16 @@ func (k *Kraken) wsProcessOrderBook(channelName string, response []any, pair cur return errors.New("could not process orderbook update checksum not found") } - k.wsRequestMtx.Lock() - defer k.wsRequestMtx.Unlock() err := k.wsProcessOrderBookUpdate(pair, askData, bidData, checksum) - if err != nil { - outbound := pair - outbound.Delimiter = "/" - go func(resub *subscription.Subscription) { - // This was locking the main websocket reader routine and a - // backlog occurred. So put this into it's own go routine. - errResub := k.Websocket.ResubscribeToChannel(resub) - if errResub != nil && errResub != subscription.ErrInStateAlready { - log.Errorf(log.WebsocketMgr, "resubscription failure for %v: %v", resub, errResub) + if errors.Is(err, errInvalidChecksum) { + log.Debugf(log.Global, "Resubscribing to invalid %s orderbook", pair) + go func() { + if e2 := k.Websocket.ResubscribeToChannel(s); e2 != nil && !errors.Is(e2, subscription.ErrInStateAlready) { + log.Errorf(log.ExchangeSys, "%s resubscription failure for %v: %v", k.Name, pair, e2) } - }(c) - return err + }() } - return nil + return err } askSnapshot, askSnapshotExists := ob["as"].([]interface{}) @@ -630,18 +646,18 @@ func (k *Kraken) wsProcessOrderBook(channelName string, response []any, pair cur return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, asset.Spot) } - return k.wsProcessOrderBookPartial(c, pair, askSnapshot, bidSnapshot) + return k.wsProcessOrderBookPartial(pair, askSnapshot, bidSnapshot, key.Levels) } // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookPartial(s *subscription.Subscription, pair currency.Pair, askData, bidData []any) error { +func (k *Kraken) wsProcessOrderBookPartial(pair currency.Pair, askData, bidData []any, levels int) error { base := orderbook.Base{ Pair: pair, Asset: asset.Spot, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Tranches, len(bidData)), Asks: make(orderbook.Tranches, len(askData)), - MaxDepth: s.Levels, + MaxDepth: levels, ChecksumStringRequired: true, } // Kraken ob data is timestamped per price, GCT orderbook data is @@ -895,11 +911,7 @@ func validateCRC32(b *orderbook.Base, token uint32) error { } if check := crc32.ChecksumIEEE([]byte(checkStr.String())); check != token { - return fmt.Errorf("%s %s invalid checksum %d, expected %d", - b.Pair, - b.Asset, - check, - token) + return fmt.Errorf("%s %s %w %d, expected %d", b.Pair, b.Asset, errInvalidChecksum, check, token) } return nil } @@ -956,39 +968,82 @@ func (k *Kraken) wsProcessCandles(channelName string, response []any, pair curre return nil } -// generateSubscriptions sets up the configured subscriptions for the websocket +// GetSubscriptionTemplate returns a subscription channel template +func (k *Kraken) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText) +} + func (k *Kraken) generateSubscriptions() (subscription.List, error) { - subscriptions := subscription.List{} - pairs, err := k.GetEnabledPairs(asset.Spot) - if err != nil { - return nil, err + return k.Features.Subscriptions.ExpandTemplates(k) +} + +// Subscribe adds a channel subscription to the websocket +func (k *Kraken) Subscribe(in subscription.List) error { + var errs error + + if in, errs = in.ExpandTemplates(k); errs != nil { + return errs } - authed := k.Websocket.CanUseAuthenticatedEndpoints() - for _, baseSub := range k.Features.Subscriptions { - if !authed && baseSub.Authenticated { - continue + + // Collect valid subs to subscribe to; Note that we won't RemoveSub on any that err on SetState or AddSub + subs := subscription.List{} + for _, s := range in { + if s.State() != subscription.ResubscribingState { + if err := k.Websocket.AddSubscriptions(s); err != nil { + errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", err, s.Channel, s.Pairs.Join())) + continue + } } - s := baseSub.Clone() - s.Asset = asset.Spot - s.Pairs = pairs - subscriptions = append(subscriptions, s) + subs = append(subs, s) } - return subscriptions, nil -} + // Group subscriptions by pairs for request, but expect per-sub responses + groupedSubs := subs.GroupPairs() + + errs = common.AppendError(errs, + k.ParallelChanOp(groupedSubs, func(s subscription.List) error { return k.manageSubs(krakenWsSubscribe, s) }, 1), + ) -// Subscribe sends a websocket message to receive data from the channel -func (k *Kraken) Subscribe(subs subscription.List) error { - return k.ParallelChanOp(subs, k.subscribeToChan, 1) + for _, s := range subs { + if s.State() != subscription.SubscribedState { + _ = s.SetState(subscription.InactiveState) + if err := k.Websocket.RemoveSubscriptions(s); err != nil { + errs = common.AppendError(errs, fmt.Errorf("error removing failed subscription: %w; Channel: %s Pairs: %s", err, s.Channel, s.Pairs.Join())) + } + } + } + + return errs } -// Unsubscribe sends a websocket message to stop receiving data from the channel -func (k *Kraken) Unsubscribe(subs subscription.List) error { - return k.ParallelChanOp(subs, k.unsubscribeFromChan, 1) +// Unsubscribe removes a channel subscriptions from the websocket +func (k *Kraken) Unsubscribe(keys subscription.List) error { + var errs error + // Make sure we have the concrete subscriptions, since we will change the state + subs := make(subscription.List, 0, len(keys)) + for _, key := range keys { + if s := k.Websocket.GetSubscription(key); s == nil { + errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", subscription.ErrNotFound, key.Channel, key.Pairs.Join())) + } else { + if s.State() != subscription.ResubscribingState { + if err := s.SetState(subscription.UnsubscribingState); err != nil { + errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", err, s.Channel, s.Pairs.Join())) + continue + } + } + subs = append(subs, s) + } + } + + subs = subs.GroupPairs() + + return common.AppendError(errs, + k.ParallelChanOp(subs, func(s subscription.List) error { return k.manageSubs(krakenWsUnsubscribe, s) }, 1), + ) } -// subscribeToChan sends a websocket message to receive data from the channel -func (k *Kraken) subscribeToChan(subs subscription.List) error { +// manageSubs handles both websocket channel subscribe and unsubscribe +func (k *Kraken) manageSubs(op string, subs subscription.List) error { if len(subs) != 1 { return subscription.ErrBatchingNotSupported } @@ -996,17 +1051,18 @@ func (k *Kraken) subscribeToChan(subs subscription.List) error { s := subs[0] if err := enforceStandardChannelNames(s); err != nil { - return fmt.Errorf("%w %w", stream.ErrSubscriptionFailure, err) + return err } + reqFmt := currency.PairFormat{Uppercase: true, Delimiter: "/"} r := &WebsocketSubRequest{ - Event: krakenWsSubscribe, + Event: op, RequestID: k.Websocket.Conn.GenerateMessageID(false), Subscription: WebsocketSubscriptionData{ - Name: apiChannelName(s), + Name: s.QualifiedChannel, Depth: s.Levels, }, - Pairs: s.Pairs.Format(currency.PairFormat{Uppercase: true, Delimiter: "/"}).Strings(), + Pairs: s.Pairs.Format(reqFmt).Strings(), } if s.Interval != 0 { @@ -1014,123 +1070,171 @@ func (k *Kraken) subscribeToChan(subs subscription.List) error { r.Subscription.Interval = int(time.Duration(s.Interval).Minutes()) } - if !s.Asset.IsValid() { - s.Asset = asset.Spot + conn := k.Websocket.Conn + if s.Authenticated { + r.Subscription.Token = authToken + conn = k.Websocket.AuthConn } - if err := s.SetState(subscription.SubscribingState); err != nil { - log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, err) - } + resps, err := conn.SendMessageReturnResponses(r.RequestID, r, len(s.Pairs)) - if err := k.Websocket.AddSubscriptions(s); err != nil { - return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, s.Channel, s.Pairs, err) + if err != nil && errors.Is(err, stream.ErrSignatureTimeout) { // Ignore a timeout, because we'll track individual subscriptions + return fmt.Errorf("%w; Channel: %s Pair: %s", err, s.Channel, s.Pairs) } - conn := k.Websocket.Conn - if s.Authenticated { - r.Subscription.Token = authToken - conn = k.Websocket.AuthConn + return k.handleSubResps(s, resps, op) +} + +// handleSubResps takes a collection of subscription responses from Kraken +// We submit a subscription for N+ pairs, and we get N+ individual responses +// Returns an error collection of unique errors and its pairs +func (k *Kraken) handleSubResps(s *subscription.Subscription, resps [][]byte, op string) error { + reqFmt := currency.PairFormat{Uppercase: true, Delimiter: "/"} + + errMap := map[string]error{} + pairErrs := map[currency.Pair]error{} + for _, p := range s.Pairs { + pairErrs[p.Format(reqFmt)] = errSubPairMissing } - respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) - if err == nil { - err = k.getSubErrResp(respRaw, krakenWsSubscribe) + subPairs := currency.Pairs{} + for _, resp := range resps { + pName, err := jsonparser.GetUnsafeString(resp, "pair") + if err != nil { + return fmt.Errorf("%w parsing WS pair from message: %s", err, resp) + } + pair, err := currency.NewPairDelimiter(pName, "/") + if err != nil { + return fmt.Errorf("%w parsing WS pair; Channel: %s Pair: %s", err, s.Channel, pName) + } + if err := k.getSubRespErr(resp, op); err != nil { + // Remove the pair name from the error so we can group errors + errStr := strings.TrimSpace(strings.TrimSuffix(err.Error(), pName)) + if _, ok := errMap[errStr]; !ok { + errMap[errStr] = errors.New(errStr) + } + pairErrs[pair] = errMap[errStr] + } else { + delete(pairErrs, pair) + if k.Verbose && op == krakenWsSubscribe { + subPairs = subPairs.Add(pair) + } + } } - if err != nil { - err = fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, s.Channel, s.Pairs, err) - k.Websocket.DataHandler <- err - // Currently all or nothing on pairs; Alternatively parse response and remove failing pairs and retry - _ = k.Websocket.RemoveSubscriptions(s) - return err + // 2) Reverse the collection and report a list of pairs with each unique error, and re-add the missing and error pairs for unsubscribe + errPairs := map[error]currency.Pairs{} + for pair, err := range pairErrs { + errPairs[err] = errPairs[err].Add(pair) } - if err = s.SetState(subscription.SubscribedState); err != nil { - log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, err) + var errs error + for err, pairs := range errPairs { + errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", err, s.Channel, pairs.Join())) } - if k.Verbose { - log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, s.Channel, s.Pairs) + if k.Verbose && len(subPairs) > 0 { + log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pairs: %s", k.Name, s.Channel, subPairs.Join()) } - return nil + return errs } -// unsubscribeFromChan sends a websocket message to stop receiving data from a channel -func (k *Kraken) unsubscribeFromChan(subs subscription.List) error { - if len(subs) != 1 { - return subscription.ErrBatchingNotSupported +// getSubErrResp calls getRespErr and if there's no error from that ensures the status matches the sub operation +func (k *Kraken) getSubRespErr(resp []byte, op string) error { + if err := k.getRespErr(resp); err != nil { + return err + } + exp := op + "d" // subscribed or unsubscribed + if status, err := jsonparser.GetUnsafeString(resp, "status"); err != nil { + return fmt.Errorf("error parsing WS status: %w from message: %s", err, resp) + } else if status != exp { + return fmt.Errorf("wrong WS status: %s; expected: %s from message %s", exp, op, resp) } - s := subs[0] + return nil +} - if err := enforceStandardChannelNames(s); err != nil { - return fmt.Errorf("%w %w", stream.ErrUnsubscribeFailure, err) +// getRespErr takes a json response string and looks for an error event type +// If found it returns the errorMessage +// It might log parsing errors about the nature of the error +// If the error message is not defined it will return a wrapped errUnknownError +func (k *Kraken) getRespErr(resp []byte) error { + event, err := jsonparser.GetUnsafeString(resp, "event") + switch { + case err != nil: + return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) + case event != "error": + status, _ := jsonparser.GetUnsafeString(resp, "status") // Error is really irrellevant here + if status != "error" { + return nil + } } - r := &WebsocketSubRequest{ - Event: krakenWsUnsubscribe, - RequestID: k.Websocket.Conn.GenerateMessageID(false), - Subscription: WebsocketSubscriptionData{ - Name: apiChannelName(s), - Depth: s.Levels, - }, - Pairs: s.Pairs.Format(currency.PairFormat{Uppercase: true, Delimiter: "/"}).Strings(), + var msg string + if msg, err = jsonparser.GetString(resp, "errorMessage"); err != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS errorMessage: %s from message: %s", k.Name, err, resp) + return fmt.Errorf("error status did not contain errorMessage: %s", resp) } + return errors.New(msg) +} - if s.Interval != 0 { - // TODO: Can Interval type be a kraken specific type with a MarshalText so we don't have to duplicate this - r.Subscription.Interval = int(time.Duration(s.Interval).Minutes()) +// wsProcessSubStatus handles creating or removing Subscriptions as soon as we receive a message +// It's job is to ensure that subscription state is kept correct sequentially between WS messages +// If this responsibility was moved to Subscribe then we would have a race due to the channel connecting IncomingWithData +// All errors are ignored; We only care about success +func (k *Kraken) wsProcessSubStatus(resp []byte) { + pName, err := jsonparser.GetUnsafeString(resp, "pair") + if err != nil { + return } - - if err := s.SetState(subscription.UnsubscribingState); err != nil { - // err is probably ErrChannelInStateAlready, but we want to bubble it up to prevent an attempt to Subscribe again - // We can catch and ignore it in our call to resub - return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrUnsubscribeFailure, s.Channel, s.Pairs, err) + pair, err := currency.NewPairFromString(pName) + if err != nil { + return } - - conn := k.Websocket.Conn - if s.Authenticated { - conn = k.Websocket.AuthConn - r.Subscription.Token = authToken + channelName, err := jsonparser.GetUnsafeString(resp, "channelName") + if err != nil { + return } - - respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r) + if err = k.getRespErr(resp); err != nil { + return + } + status, err := jsonparser.GetUnsafeString(resp, "status") if err != nil { - if e2 := s.SetState(subscription.SubscribedState); e2 != nil { - log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2) - } - return err + return } - - if err := k.getSubErrResp(respRaw, krakenWsUnsubscribe); err != nil { - wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrUnsubscribeFailure, s.Channel, s.Pairs, err) - k.Websocket.DataHandler <- wErr - if e2 := s.SetState(subscription.SubscribedState); e2 != nil { - log.Errorf(log.ExchangeSys, "%s error setting channel to subscribed: %s", k.Name, e2) - } - return wErr + key := &subscription.Subscription{ + // We don't use asset because it's either Empty or Spot, but not both + Channel: channelName, + Pairs: currency.Pairs{pair}, } - return k.Websocket.RemoveSubscriptions(s) -} + if err = fqChannelNameSub(key); err != nil { + return + } + s := k.Websocket.GetSubscription(&subscription.IgnoringAssetKey{Subscription: key}) + if s == nil { + log.Errorf(log.ExchangeSys, "%s %s Channel: %s Pairs: %s", k.Name, subscription.ErrNotFound, key.Channel, key.Pairs.Join()) + return + } -func (k *Kraken) getSubErrResp(resp []byte, op string) error { - if err := k.getErrResp(resp); err != nil { - return err + if status == krakenWsSubscribed { + err = s.SetState(subscription.SubscribedState) + } else if s.State() != subscription.ResubscribingState { // Do not remove a resubscribing sub which just unsubbed + err = k.Websocket.RemoveSubscriptions(s) + if e2 := s.SetState(subscription.UnsubscribedState); e2 != nil { + err = common.AppendError(err, e2) + } } - exp := op + "d" - if status, err := jsonparser.GetUnsafeString(resp, "status"); err != nil { - return fmt.Errorf("error parsing WS status: %w from message: %s", err, resp) - } else if status != exp { - return fmt.Errorf("wrong WS status: %s; expected: %s from message %s", exp, op, resp) + + if err != nil { + log.Errorf(log.ExchangeSys, "%s %s Channel: %s Pairs: %s", k.Name, err, s.Channel, s.Pairs.Join()) } - return nil } -// apiChannelName converts a global channel name to kraken bespoke names -func apiChannelName(s *subscription.Subscription) string { - if n, ok := standardChannelNames[s.Channel]; ok { +// channelName converts a global channel name to kraken bespoke names +func channelName(s *subscription.Subscription) string { + if n, ok := channelNames[s.Channel]; ok { return n } return s.Channel @@ -1144,7 +1248,7 @@ func enforceStandardChannelNames(s *subscription.Subscription) error { return nil } -// fqChannelNameToSub converts an fqChannelName into standard name and subscription params +// fqChannelNameSub converts an fully qualified channel name into standard name and subscription params // e.g. book-5 => subscription.OrderbookChannel with Levels: 5 func fqChannelNameSub(s *subscription.Subscription) error { parts := strings.Split(s.Channel, "-") @@ -1174,30 +1278,6 @@ func fqChannelNameSub(s *subscription.Subscription) error { return nil } -// getErrResp takes a json response string and looks for an error event type -// If found it returns the errorMessage -// It might log parsing errors about the nature of the error -// If the error message is not defined it will return a wrapped errUnknownError -func (k *Kraken) getErrResp(resp []byte) error { - event, err := jsonparser.GetUnsafeString(resp, "event") - switch { - case err != nil: - return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) - case event != "error": - status, _ := jsonparser.GetUnsafeString(resp, "status") // Error is really irrellevant here - if status != "error" { - return nil - } - } - - var msg string - if msg, err = jsonparser.GetString(resp, "errorMessage"); err != nil { - log.Errorf(log.ExchangeSys, "%s error parsing WS errorMessage: %s from message: %s", k.Name, err, resp) - return fmt.Errorf("error status did not contain errorMessage: %s", resp) - } - return errors.New(msg) -} - // wsAddOrder creates an order, returned order ID if success func (k *Kraken) wsAddOrder(request *WsAddOrderRequest) (string, error) { if request == nil { @@ -1216,7 +1296,7 @@ func (k *Kraken) wsAddOrder(request *WsAddOrderRequest) (string, error) { return "", err } if resp.Status == "error" { - return "", errors.New(k.Name + "AddOrder error: " + resp.ErrorMessage) + return "", errors.New("AddOrder error: " + resp.ErrorMessage) } k.Websocket.DataHandler <- &order.Detail{ Exchange: k.Name, @@ -1290,7 +1370,27 @@ func (k *Kraken) wsCancelAllOrders() (*WsCancelOrderResponse, error) { return &WsCancelOrderResponse{}, err } if resp.ErrorMessage != "" { - return &WsCancelOrderResponse{}, errors.New(k.Name + " - " + resp.ErrorMessage) + return &WsCancelOrderResponse{}, errors.New(resp.ErrorMessage) } return &resp, nil } + +/* +One sub per-pair. We don't use one sub with many pairs because: + - Kraken will fan out in responses anyay + - resubscribe is messy when our subs don't match their respsonses + - FlushChannels and GetChannelDiff would incorrectly resub existing subs if we don't generate the same as we've stored +*/ +const subTplText = ` +{{- if $.S.Asset -}} + {{ range $asset, $pairs := $.AssetPairs }} + {{- range $p := $pairs -}} + {{- channelName $.S }} + {{- $.PairSeparator }} + {{- end -}} + {{ $.AssetSeparator }} + {{- end -}} +{{- else -}} + {{- channelName $.S }} +{{- end }} +` diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 62591d6c403..871ffd9281d 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -28,7 +28,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" - "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -169,14 +168,7 @@ func (k *Kraken) SetDefaults() { GlobalResultLimit: 720, }, }, - Subscriptions: []*subscription.Subscription{ - {Enabled: true, Channel: subscription.TickerChannel}, - {Enabled: true, Channel: subscription.AllTradesChannel}, - {Enabled: true, Channel: subscription.CandlesChannel, Interval: kline.OneMin}, - {Enabled: true, Channel: subscription.OrderbookChannel, Levels: 1000}, - {Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true}, - {Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true}, - }, + Subscriptions: defaultSubscriptions.Clone(), } k.Requester, err = request.New(k.Name, diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 21b00023aa3..b310ca5bb05 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -23,15 +23,14 @@ const ( // Public websocket errors var ( - ErrWebsocketNotEnabled = errors.New("websocket not enabled") - ErrSubscriptionFailure = errors.New("subscription failure") - ErrSubscriptionPartialFailure = errors.New("partial failure") - ErrSubscriptionNotSupported = errors.New("subscription channel not supported ") - ErrUnsubscribeFailure = errors.New("unsubscribe failure") - ErrAlreadyDisabled = errors.New("websocket already disabled") - ErrNotConnected = errors.New("websocket is not connected") - ErrNoMessageListener = errors.New("websocket listener not found for message") - ErrSignatureTimeout = errors.New("websocket timeout waiting for response with signature") + ErrWebsocketNotEnabled = errors.New("websocket not enabled") + ErrSubscriptionFailure = errors.New("subscription failure") + ErrSubscriptionNotSupported = errors.New("subscription channel not supported ") + ErrUnsubscribeFailure = errors.New("unsubscribe failure") + ErrAlreadyDisabled = errors.New("websocket already disabled") + ErrNotConnected = errors.New("websocket is not connected") + ErrNoMessageListener = errors.New("websocket listener not found for message") + ErrSignatureTimeout = errors.New("websocket timeout waiting for response with signature") ) // Private websocket errors