Skip to content

Commit

Permalink
Binance: Add subscription configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Nov 22, 2023
1 parent 8222de9 commit 84d960d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 26 deletions.
8 changes: 8 additions & 0 deletions exchanges/binance/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
)

// Binance is the overarching type across the Binance package
Expand Down Expand Up @@ -118,6 +119,13 @@ var (
errEitherLoanOrCollateralAmountsMustBeSet = errors.New("either loan or collateral amounts must be set")
)

var subscriptionNames = map[string]string{
subscription.TickerChannel: "ticker",
subscription.OrderbookChannel: "depth",
subscription.CandlesChannel: "kline",
subscription.AllTradesChannel: "trade",
}

// GetUndocumentedInterestHistory gets interest history for currency/currencies provided
func (b *Binance) GetUndocumentedInterestHistory(ctx context.Context) (MarginInfoData, error) {
var resp MarginInfoData
Expand Down
66 changes: 60 additions & 6 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/margin"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -2029,7 +2031,7 @@ func TestWsTickerUpdate(t *testing.T) {
func TestWsKlineUpdate(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{"stream":"btcusdt@kline_1m","data":{
"e": "kline",
"e": "kline",
"E": 123456789,
"s": "BNBBTC",
"k": {
Expand Down Expand Up @@ -2426,15 +2428,67 @@ func TestSeedLocalCache(t *testing.T) {

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
subs, err := b.GenerateSubscriptions()
if err != nil {
t.Fatal(err)
expected := []subscription.Subscription{}
pairs, err := b.GetEnabledPairs(asset.Spot)
assert.NoError(t, err, "GetEnabledPairs should not error")
for _, p := range pairs {
for _, c := range []string{"kline_1m", "depth@100ms", "ticker", "trade"} {
expected = append(expected, subscription.Subscription{
Channel: p.Format(currency.PairFormat{Delimiter: "", Uppercase: false}).String() + "@" + c,
Pair: p,
Asset: asset.Spot,
})
}
}
if len(subs) == 0 {
t.Fatal("unexpected subscription length")
subs, err := b.GenerateSubscriptions()
assert.NoError(t, err, "GenerateSubscriptions should not error")
if assert.Len(t, subs, len(expected), "Should have the correct number of subs") {
assert.ElementsMatch(t, subs, expected, "Should get the correct subscriptions")
} else {
for _, s := range subs {
t.Log(s.String())
}
}
}

func TestChannelName(t *testing.T) {
_, err := channelName(&subscription.Subscription{Channel: "Wobbegongs"})
assert.ErrorIs(t, err, stream.ErrSubscriptionNotSupported, "Invalid channel name should return ErrSubNotSupported")
assert.ErrorContains(t, err, "Wobbegong", "Invalid channel name error should contain at least one shark")

n, err := channelName(&subscription.Subscription{Channel: subscription.TickerChannel})
assert.NoError(t, err, "Ticker channel should not error")
assert.Equal(t, "ticker", n, "Ticker channel name should be correct")

n, err = channelName(&subscription.Subscription{Channel: subscription.AllTradesChannel})
assert.NoError(t, err, "AllTrades channel should not error")
assert.Equal(t, "trade", n, "Trades channel name should be correct")

n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@0s", n, "Orderbook with no update rate should return 0s") // It's not channelName's job to supply defaults

n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.Interval(time.Second)})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@1000ms", n, "Orderbook with 1s update rate should 1000ms")

n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@100ms", n, "Orderbook with update rate should return it in the depth channel name")

n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds, Levels: 5})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@5@100ms", n, "Orderbook with Level should return it in the depth channel name")

n, err = channelName(&subscription.Subscription{Channel: subscription.CandlesChannel, Interval: kline.FifteenMin})
assert.NoError(t, err, "Candles channel should not error")
assert.Equal(t, "kline_15m", n, "Candles with interval should return it in the depth channel name")

n, err = channelName(&subscription.Subscription{Channel: subscription.CandlesChannel})
assert.NoError(t, err, "Candles channel should not error")
assert.Equal(t, "kline_0s", n, "Candles with no interval should return 0s") // It's not channelName's job to supply defaults
}

var websocketDepthUpdate = []byte(`{"E":1608001030784,"U":7145637266,"a":[["19455.19000000","0.59490200"],["19455.37000000","0.00000000"],["19456.11000000","0.00000000"],["19456.16000000","0.00000000"],["19458.67000000","0.06400000"],["19460.73000000","0.05139800"],["19461.43000000","0.00000000"],["19464.59000000","0.00000000"],["19466.03000000","0.45000000"],["19466.36000000","0.00000000"],["19508.67000000","0.00000000"],["19572.96000000","0.00217200"],["24386.00000000","0.00256600"]],"b":[["19455.18000000","2.94649200"],["19453.15000000","0.01233600"],["19451.18000000","0.00000000"],["19446.85000000","0.11427900"],["19446.74000000","0.00000000"],["19446.73000000","0.00000000"],["19444.45000000","0.14937800"],["19426.75000000","0.00000000"],["19416.36000000","0.36052100"]],"e":"depthUpdate","s":"BTCUSDT","u":7145637297}`)

func TestProcessUpdate(t *testing.T) {
Expand Down
64 changes: 44 additions & 20 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,32 +550,56 @@ func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) {

// GenerateSubscriptions generates the default subscription set
func (b *Binance) GenerateSubscriptions() ([]subscription.Subscription, error) {
var channels = []string{"@ticker", "@trade", "@kline_1m", "@depth@100ms"}
var channels = make([]string, 0, len(b.Features.Subscriptions))
for i := range b.Features.Subscriptions {
name, err := channelName(b.Features.Subscriptions[i])
if err != nil {
return nil, err
}
channels = append(channels, name)
}
var subscriptions []subscription.Subscription
assets := b.GetAssetTypes(true)
for x := range assets {
if assets[x] == asset.Spot {
pairs, err := b.GetEnabledPairs(assets[x])
if err != nil {
return nil, err
}

for y := range pairs {
for z := range channels {
lp := pairs[y].Lower()
lp.Delimiter = ""
subscriptions = append(subscriptions, subscription.Subscription{
Channel: lp.String() + channels[z],
Pair: pairs[y],
Asset: assets[x],
})
}
}
pairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
for y := range pairs {
for z := range channels {
lp := pairs[y].Lower()
lp.Delimiter = ""
subscriptions = append(subscriptions, subscription.Subscription{
Channel: lp.String() + "@" + channels[z],
Pair: pairs[y],
Asset: asset.Spot,
})
}
}
return subscriptions, nil
}

// channelName converts a Subscription Config into binance format channel suffix
func channelName(s *subscription.Subscription) (string, error) {
name, ok := subscriptionNames[s.Channel]
if !ok {
return name, fmt.Errorf("%w: %s", stream.ErrSubscriptionNotSupported, s.Channel)
}

switch s.Channel {
case subscription.OrderbookChannel:
if s.Levels != 0 {
name += "@" + strconv.Itoa(s.Levels)
}
if s.Interval.Duration() == time.Second {
name += "@1000ms"
} else {
name += "@" + s.Interval.Short()
}
case subscription.CandlesChannel:
name += "_" + s.Interval.Short()
}
return name, nil
}

// Subscribe subscribes to a set of channels
func (b *Binance) Subscribe(channelsToSubscribe []subscription.Subscription) error {
payload := WsPayload{
Expand Down
7 changes: 7 additions & 0 deletions exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -207,6 +208,12 @@ func (b *Binance) SetDefaults() {
GlobalResultLimit: 1000,
},
},
Subscriptions: []*subscription.Subscription{
{Enabled: true, Channel: subscription.TickerChannel},
{Enabled: true, Channel: subscription.AllTradesChannel},
{Enabled: true, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds},
},
}

b.Requester, err = request.New(b.Name,
Expand Down
2 changes: 2 additions & 0 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
ErrSubscribedAlready = errors.New("duplicate subscription")
// ErrSubscriptionFailure defines an error when a subscription fails
ErrSubscriptionFailure = errors.New("subscription failure")
// ErrSubscriptionNotSupported defines an error when a subscription channel is not supported by an exchange
ErrSubscriptionNotSupported = errors.New("subscription channel not supported ")
// ErrUnsubscribeFailure defines an error when a unsubscribe fails
ErrUnsubscribeFailure = errors.New("unsubscribe failure")
// ErrChannelInStateAlready defines an error when a subscription channel is already in a new state
Expand Down

0 comments on commit 84d960d

Please sign in to comment.