Skip to content

Commit

Permalink
Websockets: Add Subscription configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Oct 22, 2023
1 parent ceef7a1 commit 6af3dba
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 13 deletions.
17 changes: 12 additions & 5 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,18 @@ type FeaturesSupportedConfig struct {

// FeaturesEnabledConfig stores the exchanges enabled features
type FeaturesEnabledConfig struct {
AutoPairUpdates bool `json:"autoPairUpdates"`
Websocket bool `json:"websocketAPI"`
SaveTradeData bool `json:"saveTradeData"`
TradeFeed bool `json:"tradeFeed"`
FillsFeed bool `json:"fillsFeed"`
AutoPairUpdates bool `json:"autoPairUpdates"`
Websocket bool `json:"websocketAPI"`
SaveTradeData bool `json:"saveTradeData"`
TradeFeed bool `json:"tradeFeed"`
FillsFeed bool `json:"fillsFeed"`
Subscriptions []EnabledSubscriptionConfig `json:"subscriptions,omitempty"`
}

type EnabledSubscriptionConfig struct {
Name string `json:"name"`
Interval string `json:"interval,omitempty"`
Levels int `json:"levels,omitempty"`
}

// FeaturesConfig stores the exchanges supported and enabled features
Expand Down
7 changes: 7 additions & 0 deletions exchanges/binance/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ var (
errEitherLoanOrCollateralAmountsMustBeSet = errors.New("either loan or collateral amounts must be set")
)

var subscriptionNames = map[exchange.SubscriptionName]string{
exchange.TickerSubscription: "ticker",
exchange.OrderbookSubscription: "depth",
exchange.CandlesSubscription: "kline",
exchange.AllTradesSubscription: "trade",
}

// GetUndocumentedInterestHistory gets interest history for currency/currencies provided
func (b *Binance) GetUndocumentedInterestHistory(ctx context.Context) (MarginInfoData, error) {
var resp MarginInfoData
Expand Down
9 changes: 3 additions & 6 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/core"
"github.com/thrasher-corp/gocryptotrader/currency"
Expand Down Expand Up @@ -2426,12 +2427,8 @@ func TestSeedLocalCache(t *testing.T) {
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
subs, err := b.GenerateSubscriptions()
if err != nil {
t.Fatal(err)
}
if len(subs) == 0 {
t.Fatal("unexpected subscription length")
}
assert.NoError(t, err, "GenerateSubscriptions should not error")
assert.Len(t, subs, 4, "Should get 4 subscriptions")
}

var websocketDepthUpdate = []byte(`{"E":1608001030784,"U":7145637266,"a":[["19455.19000000","0.59490200"],["19455.37000000","0.00000000"],["19456.11000000","0.00000000"],["19456.16000000","0.00000000"],["19458.67000000","0.06400000"],["19460.73000000","0.05139800"],["19461.43000000","0.00000000"],["19464.59000000","0.00000000"],["19466.03000000","0.45000000"],["19466.36000000","0.00000000"],["19508.67000000","0.00000000"],["19572.96000000","0.00217200"],["24386.00000000","0.00256600"]],"b":[["19455.18000000","2.94649200"],["19453.15000000","0.01233600"],["19451.18000000","0.00000000"],["19446.85000000","0.11427900"],["19446.74000000","0.00000000"],["19446.73000000","0.00000000"],["19444.45000000","0.14937800"],["19426.75000000","0.00000000"],["19416.36000000","0.36052100"]],"e":"depthUpdate","s":"BTCUSDT","u":7145637297}`)
Expand Down
40 changes: 38 additions & 2 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
Expand Down Expand Up @@ -549,7 +550,14 @@ func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) {

// GenerateSubscriptions generates the default subscription set
func (b *Binance) GenerateSubscriptions() ([]stream.ChannelSubscription, error) {
var channels = []string{"@ticker", "@trade", "@kline_1m", "@depth@100ms"}
var channels = make([]string, 0, len(b.Features.Enabled.Subscriptions))
for _, s := range b.Features.Enabled.Subscriptions {
name, err := channelName(s)
if err != nil {
return nil, err
}
channels = append(channels, name)
}
var subscriptions []stream.ChannelSubscription
assets := b.GetAssetTypes(true)
for x := range assets {
Expand All @@ -564,7 +572,7 @@ func (b *Binance) GenerateSubscriptions() ([]stream.ChannelSubscription, error)
lp := pairs[y].Lower()
lp.Delimiter = ""
subscriptions = append(subscriptions, stream.ChannelSubscription{
Channel: lp.String() + channels[z],
Channel: lp.String() + "@" + channels[z],
Currency: pairs[y],
Asset: assets[x],
})
Expand All @@ -575,6 +583,34 @@ func (b *Binance) GenerateSubscriptions() ([]stream.ChannelSubscription, error)
return subscriptions, nil
}

// channelName converts a Subscription Config into binance format channel suffix
func channelName(s exchange.Subscription) (string, error) {
name, ok := subscriptionNames[s.Name]
if !ok {
// TODO: Name error
return name, fmt.Errorf("%w: %s", errors.New("Unsupported subscription channel"), s.Name)
}

switch s.Name {
case exchange.OrderbookSubscription:
if s.Levels != 0 {
name += "@" + strconv.Itoa(s.Levels)
}
interval := s.Interval
if interval == "" {
interval = "100ms"
}
name += "@" + interval
case exchange.CandlesSubscription:
interval := s.Interval
if interval == "" {
interval = "1m"
}
name += "_" + interval
}
return name, nil
}

// Subscribe subscribes to a set of channels
func (b *Binance) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
payload := WsPayload{
Expand Down
6 changes: 6 additions & 0 deletions exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func (b *Binance) SetDefaults() {
),
GlobalResultLimit: 1000,
},
Subscriptions: []exchange.Subscription{
{Name: exchange.TickerSubscription},
{Name: exchange.AllTradesSubscription},
{Name: exchange.CandlesSubscription, Interval: "1m"},
{Name: exchange.OrderbookSubscription, Interval: "100ms"},
},
},
}

Expand Down
40 changes: 40 additions & 0 deletions exchanges/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,50 @@ func (b *Base) SetFeatureDefaults() {
b.SetFillsFeedStatus(b.Config.Features.Enabled.FillsFeed)
}

b.SetEnabledSubscriptions()

b.Features.Enabled.AutoPairUpdates = b.Config.Features.Enabled.AutoPairUpdates
}
}

// SetEnabledSubscriptions sets the enabled subscriptions from config
// If the subscriptions config is empty then Config will be updated from the exchange subs,
// allowing e.SetDefaults to set default subscriptions for an exchange to update user's config
// Note: It's not possible to configure no subscriptions for a websocket
// TODO: Abstract the Marshal/Unmarshal Config to methods on the Subscriptions types
func (b *Base) SetEnabledSubscriptions() {
b.settingsMutex.Lock()
defer b.settingsMutex.Unlock()
subConfig := b.Config.Features.Enabled.Subscriptions
if len(subConfig) == 0 {

for _, s := range b.Features.Enabled.Subscriptions {
b.Config.Features.Enabled.Subscriptions = append(b.Config.Features.Enabled.Subscriptions, config.EnabledSubscriptionConfig{
Name: string(s.Name),
Interval: s.Interval,
Levels: s.Levels,
})
}
return
}
subs := make([]Subscription, 0, len(subConfig))
for _, s := range subConfig {
subs = append(subs, Subscription{
Name: SubscriptionName(s.Name),
Interval: s.Interval,
Levels: s.Levels,
})
}
b.Features.Enabled.Subscriptions = subs
if b.Verbose {
names := make([]string, 0, len(subs))
for _, s := range subs {
names = append(names, string(s.Name))
}
log.Debugf(log.ExchangeSys, "Set %v 'Subscriptions' to %v", b.Name, strings.Join(names, ", "))
}
}

// SupportsRESTTickerBatchUpdates returns whether or not the
// exchange supports REST batch ticker fetching
func (b *Base) SupportsRESTTickerBatchUpdates() bool {
Expand Down
20 changes: 20 additions & 0 deletions exchanges/exchange_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,26 @@ type FeaturesEnabled struct {
SaveTradeData bool
TradeFeed bool
FillsFeed bool
Subscriptions []Subscription
}

// TODO: Should this live in stream.* ?

Check warning on line 166 in exchanges/exchange_types.go

View workflow job for this annotation

GitHub Actions / lint

exported: comment on exported type SubscriptionName should be of the form "SubscriptionName ..." (with optional leading article) (revive)
type SubscriptionName string

const (
TickerSubscription SubscriptionName = "ticker"

Check warning on line 170 in exchanges/exchange_types.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported const TickerSubscription should have comment (or a comment on this block) or be unexported (revive)
OrderbookSubscription = "orderbook"
CandlesSubscription = "candles"
AllOrdersSubscription = "allOrders"
AllTradesSubscription = "allTrades"
MyTradesSubscription = "myTrades"
MyOrdersSubscription = "myOrders"
)

type Subscription struct {

Check warning on line 179 in exchanges/exchange_types.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type Subscription should have comment or be unexported (revive)
Name SubscriptionName
Interval string // Candle interval or Orderbook update interval
Levels int // Orderbook depth levels
}

// FeaturesSupported stores the exchanges supported features
Expand Down

0 comments on commit 6af3dba

Please sign in to comment.