Skip to content

Commit

Permalink
Kraken: Fix WsOwnTrades err on no sub
Browse files Browse the repository at this point in the history
Move GetSubscription to the channels which actually need it.
OwnTrades, in particular, won't have an upfront single pair for the
message
  • Loading branch information
gbjk committed Feb 13, 2024
1 parent f5f2b93 commit becb0ce
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
3 changes: 1 addition & 2 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
)

var k = &Kraken{}
var btcusdtPair = currency.NewPair(currency.BTC, currency.USDT)

// Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing
const (
Expand Down Expand Up @@ -1612,7 +1611,7 @@ func TestWsOrdrbook(t *testing.T) {

func TestWsOwnTrades(t *testing.T) {
t.Parallel()
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Channel: krakenWsOwnTrades})
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Channel: krakenWsOwnTrades, Pairs: currency.Pairs{btcusdPair}})
pressXToJSON := []byte(`[
[
{
Expand Down
67 changes: 35 additions & 32 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,21 +177,14 @@ func (k *Kraken) wsHandleData(respRaw []byte) error {
return common.GetTypeAssertError("string", dataResponse[len(dataResponse)-2], "channelName")
}

// wsPair is just used for keying the Subs
wsPair := currency.EMPTYPAIR
pair := currency.EMPTYPAIR
if maybePair, ok2 := dataResponse[len(dataResponse)-1].(string); ok2 {
var err error
if wsPair, err = currency.NewPairFromString(maybePair); err != nil {
if pair, err = currency.NewPairFromString(maybePair); err != nil {
return err
}
}

c := k.Websocket.GetSubscription(subscription.Key{Channel: channelName, Pairs: &currency.Pairs{wsPair}, Asset: asset.Spot})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, wsPair)
}

return k.wsReadDataResponse(c, wsPair, dataResponse)
return k.wsReadDataResponse(channelName, pair, dataResponse)
}

var eventResponse map[string]interface{}
Expand Down Expand Up @@ -328,24 +321,24 @@ func (k *Kraken) wsPingHandler(conn stream.Connection) {
}

// wsReadDataResponse classifies the WS response and sends to appropriate handler
func (k *Kraken) wsReadDataResponse(c *subscription.Subscription, pair currency.Pair, response WebsocketDataResponse) error {
switch c.Channel {
func (k *Kraken) wsReadDataResponse(channelName string, pair currency.Pair, response WebsocketDataResponse) error {
switch channelName {
case krakenWsTicker:
return k.wsProcessTickers(c, response, pair)
return k.wsProcessTickers(response, pair)
case krakenWsOHLC:
return k.wsProcessCandles(c, response, pair)
return k.wsProcessCandles(response, pair)
case krakenWsOrderbook:
return k.wsProcessOrderBook(c, response, pair)
return k.wsProcessOrderBook(response, pair)
case krakenWsSpread:
return k.wsProcessSpread(c, response, pair)
return k.wsProcessSpread(response, pair)
case krakenWsTrade:
return k.wsProcessTrades(c, response, pair)
return k.wsProcessTrades(response, pair)
case krakenWsOwnTrades:
return k.wsProcessOwnTrades(response[0])
case krakenWsOpenOrders:
return k.wsProcessOpenOrders(response[0])
default:
return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, c.Channel, response)
return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, channelName, response)
}

return nil
Expand Down Expand Up @@ -500,7 +493,7 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error {
}

// wsProcessTickers converts ticker data and sends it to the datahandler
func (k *Kraken) wsProcessTickers(c *subscription.Subscription, response []any, pair currency.Pair) error {
func (k *Kraken) wsProcessTickers(response []any, pair currency.Pair) error {
t, ok := response[1].(map[string]string)
if !ok {
return errors.New("received invalid ticker data")
Expand All @@ -523,14 +516,14 @@ func (k *Kraken) wsProcessTickers(c *subscription.Subscription, response []any,
Low: data[6],
High: data[7],
Open: data[8],
AssetType: c.Asset,
AssetType: asset.Spot,
Pair: pair,
}
return nil
}

// wsProcessSpread converts spread/orderbook data and sends it to the datahandler
func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error {
func (k *Kraken) wsProcessSpread(response WebsocketDataResponse, pair currency.Pair) error {
data, ok := response[1].([]any)
if !ok {
return errors.New("received invalid spread data")
Expand Down Expand Up @@ -563,7 +556,7 @@ func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response Websocke
log.Debugf(log.ExchangeSys,
"%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'",
k.Name,
c.Pairs,
pair,
bestBid,
bestAsk,
convert.TimeFromUnixTimestampDecimal(timeData),
Expand All @@ -575,7 +568,7 @@ func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response Websocke
}

// wsProcessTrades converts trade data and sends it to the datahandler
func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error {
func (k *Kraken) wsProcessTrades(response WebsocketDataResponse, pair currency.Pair) error {
data, ok := response[1].([]any)
if !ok {
return errors.New("received invalid trade data")
Expand Down Expand Up @@ -610,7 +603,7 @@ func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response Websocke
}

trades[i] = trade.Data{
AssetType: c.Asset,
AssetType: asset.Spot,
CurrencyPair: pair,
Exchange: k.Name,
Price: price,
Expand All @@ -623,10 +616,15 @@ func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response Websocke
}

// wsProcessOrderBook handles both partial and full orderbook updates
func (k *Kraken) wsProcessOrderBook(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error {
func (k *Kraken) wsProcessOrderBook(response WebsocketDataResponse, pair currency.Pair) error {
c := k.Websocket.GetSubscription(subscription.Key{Channel: krakenWsOrderbook, Asset: asset.Spot, Pairs: &currency.Pairs{pair}})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, krakenWsOrderbook, pair)
}
if c.State == subscription.UnsubscribingState {
return nil
}

ob, ok := response[1].(map[string]any)
if !ok {
return errors.New("received invalid orderbook data")
Expand Down Expand Up @@ -681,7 +679,7 @@ func (k *Kraken) wsProcessOrderBook(c *subscription.Subscription, response Webso
askSnapshot, askSnapshotExists := ob["as"].([]interface{})
bidSnapshot, bidSnapshotExists := ob["bs"].([]interface{})
if !askSnapshotExists && !bidSnapshotExists {
return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, c.Asset)
return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, asset.Spot)
}

return k.wsProcessOrderBookPartial(c, pair, askSnapshot, bidSnapshot)
Expand All @@ -695,7 +693,7 @@ func (k *Kraken) wsProcessOrderBookPartial(c *subscription.Subscription, pair cu
}
base := orderbook.Base{
Pair: pair,
Asset: c.Asset,
Asset: asset.Spot,
VerifyOrderbook: k.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(bidData)),
Asks: make(orderbook.Items, len(askData)),
Expand Down Expand Up @@ -803,7 +801,7 @@ func (k *Kraken) wsProcessOrderBookPartial(c *subscription.Subscription, pair cu
// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookUpdate(c *subscription.Subscription, pair currency.Pair, askData, bidData []any, checksum string) error {
update := orderbook.Update{
Asset: c.Asset,
Asset: asset.Spot,
Pair: pair,
Bids: make([]orderbook.Item, len(bidData)),
Asks: make([]orderbook.Item, len(askData)),
Expand Down Expand Up @@ -920,9 +918,9 @@ func (k *Kraken) wsProcessOrderBookUpdate(c *subscription.Subscription, pair cur
return err
}

book, err := k.Websocket.Orderbook.GetOrderbook(pair, c.Asset)
book, err := k.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if err != nil {
return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", c.Pairs, c.Asset, err)
return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", pair, asset.Spot, err)
}

token, err := strconv.ParseInt(checksum, 10, 64)
Expand Down Expand Up @@ -967,7 +965,12 @@ func trim(s string) string {
}

// wsProcessCandles converts candle data and sends it to the data handler
func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any, pair currency.Pair) error {
func (k *Kraken) wsProcessCandles(response []any, pair currency.Pair) error {
c := k.Websocket.GetSubscription(subscription.Key{Channel: krakenWsOHLC, Asset: asset.Spot, Pairs: &currency.Pairs{pair}})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, krakenWsOHLC, pair)
}

dataStr, ok := response[1].([]string)
// 8 string quoted floats followed by 1 integer for trade count
if !ok || len(dataStr) != 9 {
Expand All @@ -983,7 +986,7 @@ func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any,
}

k.Websocket.DataHandler <- stream.KlineData{
AssetType: c.Asset,
AssetType: asset.Spot,
Pair: pair,
Timestamp: time.Now(),
Exchange: k.Name,
Expand Down
5 changes: 5 additions & 0 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"maps"
"slices"

"github.com/davecgh/go-spew/spew"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
Expand Down Expand Up @@ -92,8 +93,12 @@ func (s *Subscription) EnsureKeyed() any {
// 1) Empty pairs then only Subscriptions without pairs will be considered
// 2) >=1 pairs then Subscriptions which contain all the pairs will be considered
func (k Key) Match(m Map) *Subscription {
fmt.Println("Key")
spew.Dump(k)
for anyKey, s := range m {
candidate, ok := anyKey.(Key)
fmt.Println("Candidate")
spew.Dump(candidate)
if !ok {
continue
}
Expand Down

0 comments on commit becb0ce

Please sign in to comment.