Skip to content

Commit

Permalink
Subscriptions: More Pairs work
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Feb 8, 2024
1 parent d7483a2 commit f6e5882
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 49 deletions.
6 changes: 3 additions & 3 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,23 +246,23 @@ func (b *Bitstamp) generateDefaultSubscriptions() ([]subscription.Subscription,
subscriptions = append(subscriptions, subscription.Subscription{
Channel: defaultSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Pair: p,
Pairs: currency.Pairs{p},
})
}
if b.Websocket.CanUseAuthenticatedEndpoints() {
for j := range defaultAuthSubChannels {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: defaultAuthSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Pairsr: p,
Pairs: currency.Pairs{p},
Params: map[string]interface{}{
"auth": struct{}{},
},
})
}
}
}
retuPairsubscriptions, nil
return subscriptions, nil
}

// Subscribe sends a websocket message to receive data from the channel
Expand Down
71 changes: 36 additions & 35 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -1207,38 +1208,38 @@ func setupWs(tb testing.TB) {
func TestWsSubscribe(t *testing.T) {
testexch.SetupWs(t, k)

err := k.Subscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("XBT", "USD", "/")}})
err := k.Subscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")}}})
assert.NoError(t, err, "Simple subscription should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")

err = k.Subscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("XBT", "USD", "/")}})
err = k.Subscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Resubscribing to the same channel should error with SubFailure")
assert.ErrorIs(t, err, stream.ErrSubscribedAlready, "Resubscribing to the same channel should error with SubscribedAlready")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should not add a subscription on error")

err = k.Subscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}})
err = k.Subscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Simple error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/HOBBIT", "Subscribing to an invalid pair should yield the correct error")

err = k.Subscribe([]subscription.Subscription{
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("ETH", "USD", "/")},
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")},
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "ELF", "/")},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "USD", "/")}},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "ELF", "/")}},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Mixed error subscription should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/ELF", "Subscribing to an invalid pair should yield the correct error")
assert.Len(t, k.Websocket.GetSubscriptions(), 2, "Should have 2 subscriptions after mixed success/failures")

err = k.Subscribe([]subscription.Subscription{
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")},
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "HOBBIT", "/")}},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "GOBLIN", "/")}},
})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Only failing subscriptions should error")
assert.ErrorContains(t, err, "Currency pair not supported DWARF/GOBLIN", "Subscribing to an invalid pair should yield the correct error")

err = k.Subscribe([]subscription.Subscription{
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("ETH", "XBT", "/")},
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("LTC", "ETH", "/")},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("ETH", "XBT", "/")}},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("LTC", "ETH", "/")}},
})
assert.NoError(t, err, "Multiple successful subscriptions should not error")

Expand All @@ -1249,15 +1250,15 @@ func TestWsSubscribe(t *testing.T) {
assert.NoError(t, err, "Simple Unsubscribe should succeed")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should have removed 1 channel")

err = k.Unsubscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "WIZARD", "/"), Key: 1337}})
err = k.Unsubscribe([]subscription.Subscription{{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "WIZARD", "/")}, Key: 1337}})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Simple failing Unsubscribe should error UnsubFail")
assert.ErrorIs(t, err, stream.ErrSubscriptionNotFound, "Simple failing Unsubscribe should error SubNotFound")
assert.ErrorContains(t, err, "DWARF/WIZARD", "Simple failing Unsubscribe should error containing pair")
assert.Len(t, k.Websocket.GetSubscriptions(), 3, "Should not have removed any channels")

err = k.Unsubscribe([]subscription.Subscription{
subs[1],
{Channel: krakenWsTicker, Pair: currency.NewPairWithDelimiter("DWARF", "EAGLE", "/"), Key: 1338},
{Channel: krakenWsTicker, Pairs: currency.Pairs{currency.NewPairWithDelimiter("DWARF", "EAGLE", "/")}, Key: 1338},
})
assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Mixed failing Unsubscribe should error UnsubFail")
assert.ErrorIs(t, err, stream.ErrSubscriptionNotFound, "Simple failing Unsubscribe should error SubNotFound")
Expand All @@ -1277,7 +1278,7 @@ func TestWsOrderbookSub(t *testing.T) {

err := k.Subscribe([]subscription.Subscription{{
Channel: krakenWsOrderbook,
Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"),
Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")},
Params: map[string]any{
ChannelOrderbookDepthKey: 25,
}}})
Expand All @@ -1296,7 +1297,7 @@ func TestWsOrderbookSub(t *testing.T) {

err = k.Subscribe([]subscription.Subscription{{
Channel: krakenWsOrderbook,
Pair: currency.NewPairWPairselimiter("XBT", "USD", "/"),
Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")},
Params: map[string]any{
ChannelOrderbookDepthKey: 42,
}}})
Expand All @@ -1310,9 +1311,9 @@ func TestWsCandlesSub(t *testing.T) {

err := k.Subscribe([]subscription.Subscription{{
Channel: krakenWsOHLC,
Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"),
Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")},
Params: map[string]any{
ChannelCandlesTimeframeKey0,
ChannelCandlesTimeframeKey: 60,
}}})
assert.NoError(t, err, "Simple subscription should not error")
assert.Len(t, k.Websocket.GetSubscriptions(), 1, "Should add 1 Subscription")
Expand All @@ -1329,9 +1330,9 @@ func TestWsCandlesSub(t *testing.T) {

err = k.Subscribe([]subscription.Subscription{{
Channel: krakenWsOHLC,
Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"),
Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")},
Params: map[string]any{
ChannelCandlesTimeframeKePairs27,
ChannelCandlesTimeframeKey: 127,
}}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Bad subscription should error")
assert.ErrorContains(t, err, "Subscription ohlc interval not supported", "Bad subscription should error about interval")
Expand Down Expand Up @@ -1370,7 +1371,7 @@ func TestWsAddOrder(t *testing.T) {
t.Parallel()
sharedtestvalues.SkipTestIfCredentialsUnset(t, k, canManipulateRealOrders)
testexch.SetupWs(t, k)
_Pairsr := k.wsAddOrder(&WsAddOrderRequest{
_, err := k.wsAddOrder(&WsAddOrderRequest{
OrderType: order.Limit.Lower(),
OrderSide: order.Buy.Lower(),
Pair: "XBT/USD",
Expand All @@ -1384,13 +1385,13 @@ func TestWsAddOrder(t *testing.T) {
func TestWsCancelOrders(t *testing.T) {
t.Parallel()
if sharedtestvalues.AreAPICredentialsSet(k) && canManipulateRealOrders { // Live test
PairspWs(t)
testexch.SetupWs(t, k)
if err := k.wsCancelOrders([]string{"1337"}); err != nil {
t.Error(err)
}
} else {
k := k
k = testexch.MockWsInstance[Kraken](t, func(msg []byte, w *websocket.Conn) error {
k = testexch.MockWSInstance[Kraken](t, func(msg []byte, w *websocket.Conn) error {
var req WsCancelOrderRequest
if err := json.Unmarshal(msg, &req); err != nil {
return err
Expand All @@ -1415,13 +1416,13 @@ func TestWsCancelOrders(t *testing.T) {
return w.WriteMessage(websocket.TextMessage, respJSON)
})

err := m.wsCancelOrders([]string{"RABBIT", "BATFISH", "SQUIRREL", "CATFISH", "MOUSE"})
err := k.wsCancelOrders([]string{"RABBIT", "BATFISH", "SQUIRREL", "CATFISH", "MOUSE"})
assert.ErrorIs(t, err, errCancellingOrder, "Should error cancelling order")
assert.ErrorContains(t, err, "BATFISH", "Should error containing txn id")
assert.ErrorContains(t, err, "CATFISH", "Should error containing txn id")
assert.ErrorContains(t, err, "[EOrder:Unknown order]", "Should error containing server error")

err = m.wsCancelOrders([]string{"RABBIT", "SQUIRREL", "MOUSE"})
err = k.wsCancelOrders([]string{"RABBIT", "SQUIRREL", "MOUSE"})
assert.NoError(t, err, "Should not error without bad txn id")
}
}
Expand Down Expand Up @@ -1528,7 +1529,7 @@ func TestWsSubscriptionStatus(t *testing.T) {

func TestWsTicker(t *testing.T) {
t.Parallel()
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Pair: btcusdPair, Channel: krakenWsTicker})
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: krakenWsTicker})
pressXToJSON := []byte(`[2,{"a":["5525.40000",1,"1.000"],"b":["5525.10000",1,"1.000"],"c":["5525.10000","0.00398963"],"h":["5783.00000","5783.00000"],"l":["5505.00000","5505.00000"],"o":["5760.70000","5763.40000"],"p":["5631.44067","5653.78939"],"t":[11493,16267],"v":["2634.11501494","3591.17907851"]},"ticker","XBT/USD"]`)
err := k.wsHandleData(pressXToJSON)
assert.NoError(t, err, "handle WS Ticker should not error")
Expand All @@ -1539,11 +1540,11 @@ func TestWsOHLC(t *testing.T) {
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{
Key: subscription.Key{
Channel: krakenWsOHLC + "-5",
Pairs: btcusdPair,
Pairs: currency.Pairs{btcusdPair},
Asset: asset.Spot,
},
Channel: krakenWsOHLC,
Pair: btcusdPair,
Pairs: currency.Pairs{btcusdPair},
Asset: asset.Spot,
})
pressXToJSON := []byte(`[2,["1542057314.748456","1542057360.435743","3586.70000","3586.70000","3586.60000","3586.60000","3586.68894","0.03373000",2],"ohlc-5","XBT/USD"]`)
Expand All @@ -1553,15 +1554,15 @@ func TestWsOHLC(t *testing.T) {

func TestWsTrade(t *testing.T) {
t.Parallel()
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Pair: btcusdPair, Channel: krakenWsTrade})
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: krakenWsTrade})
pressXToJSON := []byte(`[2,[["5541.20000","0.15850568","1534614057.321597","s","l",""],["6060.00000","0.02455000","1534614057.324998","b","l",""]],"trade","XBT/USD"]`)
err := k.wsHandleData(pressXToJSON)
assert.NoError(t, err, "handle WS Trades should not error")
}

func TestWsSpread(t *testing.T) {
t.Parallel()
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Pair: btcusdPair, Channel: krakenWsSpread})
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: krakenWsSpread})
pressXToJSON := []byte(`[2,["5698.40000","5700.00000","1542057299.545897","1.01234567","0.98765432"],"spread","XBT/USD"]`)
err := k.wsHandleData(pressXToJSON)
assert.NoError(t, err, "handle WS Spread should not error")
Expand All @@ -1572,12 +1573,12 @@ func TestWsOrdrbook(t *testing.T) {
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{
Key: subscription.Key{
Channel: krakenWsOrderbook + "-100",
Pair: btcusdPair,
Pairs: currency.Pairs{btcusdPair},
Asset: asset.Spot,
},
Pairsnel: krakenWsOrderbook,
Pair: btcusdPair,
Asset: asset.Spot,
Channel: krakenWsOrderbook,
Pairs: currency.Pairs{btcusdPair},
Asset: asset.Spot,
Params: map[string]any{
ChannelOrderbookDepthKey: 100,
},
Expand Down Expand Up @@ -1608,7 +1609,7 @@ func TestWsOwnTrades(t *testing.T) {
"ordertxid": "TDLH43-DVQXD-2KHVYY",
"ordertype": "limit",
"pair": "XBT/USD",
Pairsstxid": "OGTT3Y-C6I3P-XRI6HX",
"postxid": "OGTT3Y-C6I3P-XRI6HX",
"price": "100000.00000",
"time": "1560516023.070651",
"type": "sell",
Expand Down Expand Up @@ -2005,11 +2006,11 @@ func TestWsOrderbookMax10Depth(t *testing.T) {
k.Websocket.AddSuccessfulSubscriptions(subscription.Subscription{
Key: subscription.Key{
Channel: krakenWsOrderbook + "-10",
Pairs: p,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
},
Channel: krakenWsOrderbook,
Pair: p,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
Params: map[string]any{
ChannelOrderbookDepthKey: 10,
Expand Down
14 changes: 7 additions & 7 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func TestGetSubscription(t *testing.T) {
assert.Nil(t, (*Websocket).GetSubscription(nil, "imaginary"), "GetSubscription on a nil Websocket should return nil")
assert.Nil(t, (&Websocket{}).GetSubscription("empty"), "GetSubscription on a Websocket with no sub map should return nil")
w := Websocket{
subscriptions: subscriptionMap{
subscriptions: subscription.Map{
42: {
Channel: "hello3",
},
Expand All @@ -659,7 +659,7 @@ func TestGetSubscription(t *testing.T) {
func TestGetSubscriptions(t *testing.T) {
t.Parallel()
w := Websocket{
subscriptions: subscriptionMap{
subscriptions: subscription.Map{
42: {
Channel: "hello3",
},
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (g *GenSubs) generateSubs() ([]subscription.Subscription, error) {
for i := range g.EnabledPairs {
superduperchannelsubs[i] = subscription.Subscription{
Channel: "TEST:" + strconv.FormatInt(int64(i), 10),
Pair: g.EnabledPairs[i],
Pairs: currency.Pairs{g.EnabledPairs[i]},
}
}
return superduperchannelsubs, nil
Expand Down Expand Up @@ -1225,16 +1225,16 @@ func TestFlushChannels(t *testing.T) {
t.Fatal(err)
}
web.subscriptionMutex.Lock()
web.subscriptions = subscriptionMap{
web.subscriptions = subscription.Map{
41: {
Key: 41,
Channel: "match channel",
Pair: currency.NewPair(currency.BTC, currency.AUD),
Pairs: currency.Pairs{currency.NewPair(currency.BTC, currency.AUD)},
},
42: {
Key: 42,
Channel: "unsub channel",
Pair: currency.NewPair(currency.THETA, currency.USDT),
Pairs: currency.Pairs{currency.NewPair(currency.THETA, currency.USDT)},
},
}
web.subscriptionMutex.Unlock()
Expand Down Expand Up @@ -1440,7 +1440,7 @@ func TestCheckSubscriptions(t *testing.T) {

ws.MaxSubscriptionsPerConnection = 2

ws.subscriptions = subscriptionMap{42: {Key: 42, Channel: "test"}}
ws.subscriptions = subscription.Map{42: {Key: 42, Channel: "test"}}
err = ws.checkSubscriptions([]subscription.Subscription{{Key: 42, Channel: "test"}})
assert.ErrorIs(t, err, ErrSubscribedAlready, "checkSubscriptions should error correctly")

Expand Down
8 changes: 4 additions & 4 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type MatchableKey interface {
// It provides for matching on one or more keys
type Key struct {
Channel string
Pairs currency.Pairs
Pairs *currency.Pairs
Asset asset.Item
}

Expand Down Expand Up @@ -76,7 +76,7 @@ func (s *Subscription) EnsureKeyed() any {
s.Key = Key{
Channel: s.Channel,
Asset: s.Asset,
Pairs: s.Pairs,
Pairs: &s.Pairs,
}
}
return s.Key
Expand All @@ -98,10 +98,10 @@ func (k *Key) Match(m Map) *Subscription {
if k.Asset != candidate.Asset {
continue
}
if len(k.Pairs) == 0 && len(candidate.Pairs) == 0 {
if len(*k.Pairs) == 0 && len(*candidate.Pairs) == 0 {
return v
}
if err := candidate.Pairs.ContainsAll(k.Pairs, true); err == nil {
if err := candidate.Pairs.ContainsAll(*k.Pairs, true); err == nil {
return v
}
}
Expand Down

0 comments on commit f6e5882

Please sign in to comment.