Skip to content

Commit

Permalink
Kraken: Fix WsOwnTrades err on no sub
Browse files Browse the repository at this point in the history
Move GetSubscription to the channels which actually need it.
OwnTrades, in particular, won't have an upfront single pair for the
message
  • Loading branch information
gbjk committed Feb 13, 2024
1 parent f5f2b93 commit a4a4deb
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 62 deletions.
48 changes: 22 additions & 26 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
)

var k = &Kraken{}
var btcusdtPair = currency.NewPair(currency.BTC, currency.USDT)

// Please add your own APIkeys here or in config/testdata.json to do correct due diligence testing
const (
Expand Down Expand Up @@ -1612,7 +1611,7 @@ func TestWsOrdrbook(t *testing.T) {

func TestWsOwnTrades(t *testing.T) {
t.Parallel()
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Channel: krakenWsOwnTrades})
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Channel: krakenWsOwnTrades, Pairs: currency.Pairs{btcusdPair}})
pressXToJSON := []byte(`[
[
{
Expand Down Expand Up @@ -2014,29 +2013,28 @@ var websocketGSTEUROrderbookUpdates = []string{

func TestWsOrderbookMax10Depth(t *testing.T) {
t.Parallel()
for _, c := range []string{"XDG/USD", "LUNA/EUR", "GST/EUR"} {
p, err := currency.NewPairFromString(c)
assert.NoErrorf(t, err, "NewPairFromString %s should not error", c)
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{
Key: subscription.Key{
Channel: krakenWsOrderbook + "-10",
Pairs: &currency.Pairs{p},
Asset: asset.Spot,
},
Channel: krakenWsOrderbook,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
Params: map[string]any{
ChannelOrderbookDepthKey: 10,
},
})
pairs := currency.Pairs{
currency.NewPairWithDelimiter("XDG", "USD", "/"),
currency.NewPairWithDelimiter("LUNA", "EUR", "/"),
currency.NewPairWithDelimiter("GST", "EUR", "/"),
}
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{
Key: subscription.Key{
Channel: krakenWsOrderbook + "-10",
Pairs: &pairs,
Asset: asset.Spot,
},
Channel: krakenWsOrderbook,
Pairs: pairs,
Asset: asset.Spot,
Params: map[string]any{
ChannelOrderbookDepthKey: 10,
},
})

for x := range websocketXDGUSDOrderbookUpdates {
err := k.wsHandleData([]byte(websocketXDGUSDOrderbookUpdates[x]))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err, "wsHandleData should not error")
}

for x := range websocketLUNAEUROrderbookUpdates {
Expand All @@ -2045,17 +2043,15 @@ func TestWsOrderbookMax10Depth(t *testing.T) {
// storage and checksum calc. Might need to store raw strings as fields
// in the orderbook.Item struct.
// Required checksum: 7465000014735432016076747100005084881400000007476000097005027047670474990000293338023886300750000004333333333333375020000152914844934167507000014652990542161752500007370728572000475400000670061645671407546000098022663603417745900007102987806720745800001593557686404000745200003375861179634000743500003156650585902777434000030172726079999999743200006461149653837000743100001042285966000000074300000403660461058200074200000369021657320475740500001674242117790510
if err != nil && x != len(websocketLUNAEUROrderbookUpdates)-1 {
t.Fatal(err)
if x != len(websocketLUNAEUROrderbookUpdates)-1 {
require.NoError(t, err, "wsHandleData should not error")
}
}

// This has less than 10 bids and still needs a checksum calc.
for x := range websocketGSTEUROrderbookUpdates {
err := k.wsHandleData([]byte(websocketGSTEUROrderbookUpdates[x]))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err, "wsHandleData should not error")
}
}

Expand Down
77 changes: 41 additions & 36 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,21 +177,14 @@ func (k *Kraken) wsHandleData(respRaw []byte) error {
return common.GetTypeAssertError("string", dataResponse[len(dataResponse)-2], "channelName")
}

// wsPair is just used for keying the Subs
wsPair := currency.EMPTYPAIR
pair := currency.EMPTYPAIR
if maybePair, ok2 := dataResponse[len(dataResponse)-1].(string); ok2 {
var err error
if wsPair, err = currency.NewPairFromString(maybePair); err != nil {
if pair, err = currency.NewPairFromString(maybePair); err != nil {
return err
}
}

c := k.Websocket.GetSubscription(subscription.Key{Channel: channelName, Pairs: &currency.Pairs{wsPair}, Asset: asset.Spot})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, wsPair)
}

return k.wsReadDataResponse(c, wsPair, dataResponse)
return k.wsReadDataResponse(channelName, pair, dataResponse)
}

var eventResponse map[string]interface{}
Expand Down Expand Up @@ -328,27 +321,29 @@ func (k *Kraken) wsPingHandler(conn stream.Connection) {
}

// wsReadDataResponse classifies the WS response and sends to appropriate handler
func (k *Kraken) wsReadDataResponse(c *subscription.Subscription, pair currency.Pair, response WebsocketDataResponse) error {
switch c.Channel {
func (k *Kraken) wsReadDataResponse(channelName string, pair currency.Pair, response WebsocketDataResponse) error {
switch channelName {
case krakenWsTicker:
return k.wsProcessTickers(c, response, pair)
case krakenWsOHLC:
return k.wsProcessCandles(c, response, pair)
case krakenWsOrderbook:
return k.wsProcessOrderBook(c, response, pair)
return k.wsProcessTickers(response, pair)
case krakenWsSpread:
return k.wsProcessSpread(c, response, pair)
return k.wsProcessSpread(response, pair)
case krakenWsTrade:
return k.wsProcessTrades(c, response, pair)
return k.wsProcessTrades(response, pair)
case krakenWsOwnTrades:
return k.wsProcessOwnTrades(response[0])
case krakenWsOpenOrders:
return k.wsProcessOpenOrders(response[0])
default:
return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, c.Channel, response)
}

return nil
channelType := strings.TrimRight(channelName, "-0123456789")
switch channelType {
case krakenWsOHLC:
return k.wsProcessCandles(channelName, response, pair)
case krakenWsOrderbook:
return k.wsProcessOrderBook(channelName, response, pair)
default:
return fmt.Errorf("%s received unidentified data for subscription %s: %+v", k.Name, channelName, response)
}
}

func (k *Kraken) wsProcessOwnTrades(ownOrders interface{}) error {
Expand Down Expand Up @@ -500,7 +495,7 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error {
}

// wsProcessTickers converts ticker data and sends it to the datahandler
func (k *Kraken) wsProcessTickers(c *subscription.Subscription, response []any, pair currency.Pair) error {
func (k *Kraken) wsProcessTickers(response []any, pair currency.Pair) error {
t, ok := response[1].(map[string]string)
if !ok {
return errors.New("received invalid ticker data")
Expand All @@ -523,14 +518,14 @@ func (k *Kraken) wsProcessTickers(c *subscription.Subscription, response []any,
Low: data[6],
High: data[7],
Open: data[8],
AssetType: c.Asset,
AssetType: asset.Spot,
Pair: pair,
}
return nil
}

// wsProcessSpread converts spread/orderbook data and sends it to the datahandler
func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error {
func (k *Kraken) wsProcessSpread(response WebsocketDataResponse, pair currency.Pair) error {
data, ok := response[1].([]any)
if !ok {
return errors.New("received invalid spread data")
Expand Down Expand Up @@ -563,7 +558,7 @@ func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response Websocke
log.Debugf(log.ExchangeSys,
"%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'",
k.Name,
c.Pairs,
pair,
bestBid,
bestAsk,
convert.TimeFromUnixTimestampDecimal(timeData),
Expand All @@ -575,7 +570,7 @@ func (k *Kraken) wsProcessSpread(c *subscription.Subscription, response Websocke
}

// wsProcessTrades converts trade data and sends it to the datahandler
func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error {
func (k *Kraken) wsProcessTrades(response WebsocketDataResponse, pair currency.Pair) error {
data, ok := response[1].([]any)
if !ok {
return errors.New("received invalid trade data")
Expand Down Expand Up @@ -610,7 +605,7 @@ func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response Websocke
}

trades[i] = trade.Data{
AssetType: c.Asset,
AssetType: asset.Spot,
CurrencyPair: pair,
Exchange: k.Name,
Price: price,
Expand All @@ -623,10 +618,15 @@ func (k *Kraken) wsProcessTrades(c *subscription.Subscription, response Websocke
}

// wsProcessOrderBook handles both partial and full orderbook updates
func (k *Kraken) wsProcessOrderBook(c *subscription.Subscription, response WebsocketDataResponse, pair currency.Pair) error {
func (k *Kraken) wsProcessOrderBook(channelName string, response WebsocketDataResponse, pair currency.Pair) error {
c := k.Websocket.GetSubscription(subscription.Key{Channel: channelName, Asset: asset.Spot, Pairs: &currency.Pairs{pair}})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, pair)
}
if c.State == subscription.UnsubscribingState {
return nil
}

ob, ok := response[1].(map[string]any)
if !ok {
return errors.New("received invalid orderbook data")
Expand Down Expand Up @@ -681,7 +681,7 @@ func (k *Kraken) wsProcessOrderBook(c *subscription.Subscription, response Webso
askSnapshot, askSnapshotExists := ob["as"].([]interface{})
bidSnapshot, bidSnapshotExists := ob["bs"].([]interface{})
if !askSnapshotExists && !bidSnapshotExists {
return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, c.Asset)
return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, pair, asset.Spot)
}

return k.wsProcessOrderBookPartial(c, pair, askSnapshot, bidSnapshot)
Expand All @@ -695,7 +695,7 @@ func (k *Kraken) wsProcessOrderBookPartial(c *subscription.Subscription, pair cu
}
base := orderbook.Base{
Pair: pair,
Asset: c.Asset,
Asset: asset.Spot,
VerifyOrderbook: k.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(bidData)),
Asks: make(orderbook.Items, len(askData)),
Expand Down Expand Up @@ -803,7 +803,7 @@ func (k *Kraken) wsProcessOrderBookPartial(c *subscription.Subscription, pair cu
// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookUpdate(c *subscription.Subscription, pair currency.Pair, askData, bidData []any, checksum string) error {
update := orderbook.Update{
Asset: c.Asset,
Asset: asset.Spot,
Pair: pair,
Bids: make([]orderbook.Item, len(bidData)),
Asks: make([]orderbook.Item, len(askData)),
Expand Down Expand Up @@ -920,9 +920,9 @@ func (k *Kraken) wsProcessOrderBookUpdate(c *subscription.Subscription, pair cur
return err
}

book, err := k.Websocket.Orderbook.GetOrderbook(pair, c.Asset)
book, err := k.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if err != nil {
return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", c.Pairs, c.Asset, err)
return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", pair, asset.Spot, err)
}

token, err := strconv.ParseInt(checksum, 10, 64)
Expand Down Expand Up @@ -967,7 +967,12 @@ func trim(s string) string {
}

// wsProcessCandles converts candle data and sends it to the data handler
func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any, pair currency.Pair) error {
func (k *Kraken) wsProcessCandles(channelName string, response []any, pair currency.Pair) error {
c := k.Websocket.GetSubscription(subscription.Key{Channel: channelName, Asset: asset.Spot, Pairs: &currency.Pairs{pair}})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, pair)
}

dataStr, ok := response[1].([]string)
// 8 string quoted floats followed by 1 integer for trade count
if !ok || len(dataStr) != 9 {
Expand All @@ -983,7 +988,7 @@ func (k *Kraken) wsProcessCandles(c *subscription.Subscription, response []any,
}

k.Websocket.DataHandler <- stream.KlineData{
AssetType: c.Asset,
AssetType: asset.Spot,
Pair: pair,
Timestamp: time.Now(),
Exchange: k.Name,
Expand Down

0 comments on commit a4a4deb

Please sign in to comment.