diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 085573218e7..d6a0a1ae230 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -37,7 +37,6 @@ import ( ) var k = &Kraken{} -var btcusdtPair = currency.NewPair(currency.BTC, currency.USDT) // Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing const ( @@ -1612,7 +1611,7 @@ func TestWsOrdrbook(t *testing.T) { func TestWsOwnTrades(t *testing.T) { t.Parallel() - k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Channel: krakenWsOwnTrades}) + k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Channel: krakenWsOwnTrades, Pairs: currency.Pairs{btcusdPair}}) pressXToJSON := []byte(`[ [ { diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 33cf068378f..3d0e6297d51 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -177,21 +177,14 @@ func (k *Kraken) wsHandleData(respRaw []byte) error { return common.GetTypeAssertError("string", dataResponse[len(dataResponse)-2], "channelName") } - // wsPair is just used for keying the Subs - wsPair := currency.EMPTYPAIR + pair := currency.EMPTYPAIR if maybePair, ok2 := dataResponse[len(dataResponse)-1].(string); ok2 { var err error - if wsPair, err = currency.NewPairFromString(maybePair); err != nil { + if pair, err = currency.NewPairFromString(maybePair); err != nil { return err } } - - c := k.Websocket.GetSubscription(subscription.Key{Channel: channelName, Pairs: ¤cy.Pairs{wsPair}, Asset: asset.Spot}) - if c == nil { - return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, wsPair) - } - - return k.wsReadDataResponse(c, wsPair, dataResponse) + return k.wsReadDataResponse(channelName, pair, dataResponse) } var eventResponse map[string]interface{} @@ -328,24 +321,24 @@ func (k *Kraken) wsPingHandler(conn stream.Connection) { } // wsReadDataResponse classifies the WS response and sends to appropriate handler -func (k *Kraken) wsReadDataResponse(c *subscription.Subscription, pair currency.Pair, response WebsocketDataResponse) error { - switch c.Channel { +func (k *Kraken) wsReadDataResponse(channelName string, pair currency.Pair, response WebsocketDataResponse) error { + switch channelName { case krakenWsTicker: - return k.wsProcessTickers(c, response, pair) + return k.wsProcessTickers(response, pair) case krakenWsOHLC: - return k.wsProcessCandles(c, response, pair) + return k.wsProcessCandles(response, pair) case krakenWsOrderbook: - return k.wsProcessOrderBook(c, response, pair) + return k.wsProcessOrderBook(response, pair) case krakenWsSpread: - return k.wsProcessSpread(c, response, pair) + return k.wsProcessSpread(response, pair) case krakenWsTrade: - return k.wsProcessTrades(c, response, pair) + return k.wsProcessTrades(response, pair) case krakenWsOwnTrades: return k.wsProcessOwnTrades(response[0]) case krakenWsOpenOrders: return k.wsProcessOpenOrders(response[0]) default: - return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, c.Channel, response) + return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, channelName, response) } return nil @@ -500,7 +493,7 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error { } // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessTickers(c *subscription.Subscription, response []any, pair currency.Pair) error { +func (k *Kraken) wsProcessTickers(response []any, pair currency.Pair) error { t, ok := response[1].(map[string]string) if !ok { return errors.New("received invalid ticker data") @@ -523,14 +516,14 @@ func (k *Kraken) wsProcessTickers(c *subscription.Subscription, response []any, Low: data[6], High: data[7], Open: data[8], - AssetType: c.Asset, + AssetType: asset.Spot, Pair: pair, } return nil } // wsProcessSpread converts spread/orderbook data and sends it to the datahandler -func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error { +func (k *Kraken) wsProcessSpread(response WebsocketDataResponse, pair currency.Pair) error { data, ok := response[1].([]any) if !ok { return errors.New("received invalid spread data") @@ -563,7 +556,7 @@ func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response Websocke log.Debugf(log.ExchangeSys, "%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'", k.Name, - c.Pairs, + pair, bestBid, bestAsk, convert.TimeFromUnixTimestampDecimal(timeData), @@ -575,7 +568,7 @@ func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response Websocke } // wsProcessTrades converts trade data and sends it to the datahandler -func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error { +func (k *Kraken) wsProcessTrades(response WebsocketDataResponse, pair currency.Pair) error { data, ok := response[1].([]any) if !ok { return errors.New("received invalid trade data") @@ -610,7 +603,7 @@ func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response Websocke } trades[i] = trade.Data{ - AssetType: c.Asset, + AssetType: asset.Spot, CurrencyPair: pair, Exchange: k.Name, Price: price, @@ -623,10 +616,15 @@ func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response Websocke } // wsProcessOrderBook handles both partial and full orderbook updates -func (k *Kraken) wsProcessOrderBook(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error { +func (k *Kraken) wsProcessOrderBook(response WebsocketDataResponse, pair currency.Pair) error { + c := k.Websocket.GetSubscription(subscription.Key{Channel: krakenWsOrderbook, Asset: asset.Spot, Pairs: ¤cy.Pairs{pair}}) + if c == nil { + return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, krakenWsOrderbook, pair) + } if c.State == subscription.UnsubscribingState { return nil } + ob, ok := response[1].(map[string]any) if !ok { return errors.New("received invalid orderbook data") @@ -681,7 +679,7 @@ func (k *Kraken) wsProcessOrderBook(c *subscription.Subscription, response Webso askSnapshot, askSnapshotExists := ob["as"].([]interface{}) bidSnapshot, bidSnapshotExists := ob["bs"].([]interface{}) if !askSnapshotExists && !bidSnapshotExists { - return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, c.Asset) + return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, asset.Spot) } return k.wsProcessOrderBookPartial(c, pair, askSnapshot, bidSnapshot) @@ -695,7 +693,7 @@ func (k *Kraken) wsProcessOrderBookPartial(c *subscription.Subscription, pair cu } base := orderbook.Base{ Pair: pair, - Asset: c.Asset, + Asset: asset.Spot, VerifyOrderbook: k.CanVerifyOrderbook, Bids: make(orderbook.Items, len(bidData)), Asks: make(orderbook.Items, len(askData)), @@ -803,7 +801,7 @@ func (k *Kraken) wsProcessOrderBookPartial(c *subscription.Subscription, pair cu // wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair func (k *Kraken) wsProcessOrderBookUpdate(c *subscription.Subscription, pair currency.Pair, askData, bidData []any, checksum string) error { update := orderbook.Update{ - Asset: c.Asset, + Asset: asset.Spot, Pair: pair, Bids: make([]orderbook.Item, len(bidData)), Asks: make([]orderbook.Item, len(askData)), @@ -920,9 +918,9 @@ func (k *Kraken) wsProcessOrderBookUpdate(c *subscription.Subscription, pair cur return err } - book, err := k.Websocket.Orderbook.GetOrderbook(pair, c.Asset) + book, err := k.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) if err != nil { - return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", c.Pairs, c.Asset, err) + return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", pair, asset.Spot, err) } token, err := strconv.ParseInt(checksum, 10, 64) @@ -967,7 +965,12 @@ func trim(s string) string { } // wsProcessCandles converts candle data and sends it to the data handler -func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any, pair currency.Pair) error { +func (k *Kraken) wsProcessCandles(response []any, pair currency.Pair) error { + c := k.Websocket.GetSubscription(subscription.Key{Channel: krakenWsOHLC, Asset: asset.Spot, Pairs: ¤cy.Pairs{pair}}) + if c == nil { + return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, krakenWsOHLC, pair) + } + dataStr, ok := response[1].([]string) // 8 string quoted floats followed by 1 integer for trade count if !ok || len(dataStr) != 9 { @@ -983,7 +986,7 @@ func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any, } k.Websocket.DataHandler <- stream.KlineData{ - AssetType: c.Asset, + AssetType: asset.Spot, Pair: pair, Timestamp: time.Now(), Exchange: k.Name, diff --git a/exchanges/subscription/subscription.go b/exchanges/subscription/subscription.go index 8abe3dfa03b..eeeaf25e9be 100644 --- a/exchanges/subscription/subscription.go +++ b/exchanges/subscription/subscription.go @@ -6,6 +6,7 @@ import ( "maps" "slices" + "github.com/davecgh/go-spew/spew" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" @@ -92,8 +93,12 @@ func (s *Subscription) EnsureKeyed() any { // 1) Empty pairs then only Subscriptions without pairs will be considered // 2) >=1 pairs then Subscriptions which contain all the pairs will be considered func (k Key) Match(m Map) *Subscription { + fmt.Println("Key") + spew.Dump(k) for anyKey, s := range m { candidate, ok := anyKey.(Key) + fmt.Println("Candidate") + spew.Dump(candidate) if !ok { continue }