Skip to content

Commit

Permalink
Bybit: Subscription Pairs support
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Feb 26, 2024
1 parent e372bd0 commit 74df066
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 33 deletions.
22 changes: 13 additions & 9 deletions exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,23 +361,23 @@ func (b *BTSE) orderbookFilter(price, amount float64) bool {
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *BTSE) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
func (b *BTSE) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{"orderBookL2Api:%s_0", "tradeHistory:%s"}
pairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions []subscription.Subscription
var subscriptions subscription.List
if b.Websocket.CanUseAuthenticatedEndpoints() {
subscriptions = append(subscriptions, subscription.Subscription{
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: "notificationApi",
})
}
for i := range channels {
for j := range pairs {
subscriptions = append(subscriptions, subscription.Subscription{
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: fmt.Sprintf(channels[i], pairs[j]),
Pairs: pairs[j],
Pairs: currency.Pairs{pairs[j]},
Asset: asset.Spot,
})
}
Expand All @@ -386,7 +386,7 @@ func (b *BTSE) GenerateDefaultSubscriptions() ([]subscription.Subscription, erro
}

// Subscribe sends a websocket message to receive data from the channel
func (b *BTSE) Subscribe(channelsToSubscribe []subscription.Subscription) error {
func (b *BTSE) Subscribe(channelsToSubscribe subscription.List) error {
var sub wsSub
sub.Operation = "subscribe"
for i := range channelsToSubscribe {
Expand All @@ -396,12 +396,16 @@ func (b *BTSE) Subscribe(channelsToSubscribe []subscription.Subscription) error
if err != nil {
return err
}
b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...)
for _, sub := range channelsToSubscribe {
sub.SetState(subscription.SubscribedState)
b.Websocket.AddSubscription(sub)

}
return nil
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (b *BTSE) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error {
func (b *BTSE) Unsubscribe(channelsToUnsubscribe subscription.List) error {
var unSub wsSub
unSub.Operation = "unsubscribe"
for i := range channelsToUnsubscribe {
Expand All @@ -412,6 +416,6 @@ func (b *BTSE) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) er
if err != nil {
return err
}
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe)
return nil
}
9 changes: 5 additions & 4 deletions exchanges/bybit/bybit_inverse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (by *Bybit) GenerateInverseDefaultSubscriptions() ([]subscription.Subscript
subscriptions = append(subscriptions,
subscription.Subscription{
Channel: channels[x],
Pair: pairs[z],
Pairs: currency.Pairs{pairs[z]},
Asset: asset.CoinMarginedFutures,
})
}
Expand All @@ -53,16 +54,16 @@ func (by *Bybit) GenerateInverseDefaultSubscriptions() ([]subscription.Subscript
}

// InverseSubscribe sends a subscription message to linear public channels.
func (by *Bybit) InverseSubscribe(channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) InverseSubscribe(channelSubscriptions subscription.List) error {
return by.handleInversePayloadSubscription("subscribe", channelSubscriptions)
}

// InverseUnsubscribe sends an unsubscription messages through linear public channels.
func (by *Bybit) InverseUnsubscribe(channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) InverseUnsubscribe(channelSubscriptions subscription.List) error {
return by.handleInversePayloadSubscription("unsubscribe", channelSubscriptions)
}

func (by *Bybit) handleInversePayloadSubscription(operation string, channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) handleInversePayloadSubscription(operation string, channelSubscriptions subscription.List) error {
payloads, err := by.handleSubscriptions(asset.CoinMarginedFutures, operation, channelSubscriptions)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions exchanges/bybit/bybit_linear_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (by *Bybit) GenerateLinearDefaultSubscriptions() ([]subscription.Subscripti
subscriptions = append(subscriptions,
subscription.Subscription{
Channel: channels[x],
Pair: pairs[p],
Pairs: currency.Pairs{pairs[p]},
Asset: a,
})
}
Expand All @@ -73,16 +73,16 @@ func (by *Bybit) GenerateLinearDefaultSubscriptions() ([]subscription.Subscripti
}

// LinearSubscribe sends a subscription message to linear public channels.
func (by *Bybit) LinearSubscribe(channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) LinearSubscribe(channelSubscriptions subscription.List) error {
return by.handleLinearPayloadSubscription("subscribe", channelSubscriptions)
}

// LinearUnsubscribe sends an unsubscription messages through linear public channels.
func (by *Bybit) LinearUnsubscribe(channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) LinearUnsubscribe(channelSubscriptions subscription.List) error {
return by.handleLinearPayloadSubscription("unsubscribe", channelSubscriptions)
}

func (by *Bybit) handleLinearPayloadSubscription(operation string, channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) handleLinearPayloadSubscription(operation string, channelSubscriptions subscription.List) error {
payloads, err := by.handleSubscriptions(asset.USDTMarginedFutures, operation, channelSubscriptions)
if err != nil {
return err
Expand Down
9 changes: 5 additions & 4 deletions exchanges/bybit/bybit_options_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"

"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (by *Bybit) GenerateOptionsDefaultSubscriptions() ([]subscription.Subscript
subscriptions = append(subscriptions,
subscription.Subscription{
Channel: channels[x],
Pair: pairs[z],
Pairs: currency.Pairs{pairs[z]},
Asset: asset.Options,
})
}
Expand All @@ -60,16 +61,16 @@ func (by *Bybit) GenerateOptionsDefaultSubscriptions() ([]subscription.Subscript
}

// OptionSubscribe sends a subscription message to options public channels.
func (by *Bybit) OptionSubscribe(channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) OptionSubscribe(channelSubscriptions subscription.List) error {
return by.handleOptionsPayloadSubscription("subscribe", channelSubscriptions)
}

// OptionUnsubscribe sends an unsubscription messages through options public channels.
func (by *Bybit) OptionUnsubscribe(channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) OptionUnsubscribe(channelSubscriptions subscription.List) error {
return by.handleOptionsPayloadSubscription("unsubscribe", channelSubscriptions)
}

func (by *Bybit) handleOptionsPayloadSubscription(operation string, channelSubscriptions []subscription.Subscription) error {
func (by *Bybit) handleOptionsPayloadSubscription(operation string, channelSubscriptions subscription.List) error {
payloads, err := by.handleSubscriptions(asset.Options, operation, channelSubscriptions)
if err != nil {
return err
Expand Down
25 changes: 13 additions & 12 deletions exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ func (by *Bybit) WsAuth(ctx context.Context) error {
}

// Subscribe sends a websocket message to receive data from the channel
func (by *Bybit) Subscribe(channelsToSubscribe []subscription.Subscription) error {
func (by *Bybit) Subscribe(channelsToSubscribe subscription.List) error {
return by.handleSpotSubscription("subscribe", channelsToSubscribe)
}

func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, channelsToSubscribe []subscription.Subscription) ([]SubscriptionArgument, error) {
func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, channelsToSubscribe subscription.List) ([]SubscriptionArgument, error) {
var args []SubscriptionArgument
arg := SubscriptionArgument{
Operation: operation,
Expand Down Expand Up @@ -166,17 +166,18 @@ func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, cha
return nil, err
}
for i := range channelsToSubscribe {
pair := channelsToSubscribe[i].Pairs[0]
switch channelsToSubscribe[i].Channel {
case chanOrderbook:
arg.Arguments = append(arg.Arguments, fmt.Sprintf("%s.%d.%s", channelsToSubscribe[i].Channel, 50, channelsToSubscribe[i].Pair.Format(pairFormat).String()))
arg.Arguments = append(arg.Arguments, fmt.Sprintf("%s.%d.%s", channelsToSubscribe[i].Channel, 50, pair.Format(pairFormat).String()))
case chanPublicTrade, chanPublicTicker, chanLiquidation, chanLeverageTokenTicker, chanLeverageTokenNav:
arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+channelsToSubscribe[i].Pair.Format(pairFormat).String())
arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+pair.Format(pairFormat).String())
case chanKline, chanLeverageTokenKline:
interval, err := intervalToString(kline.FiveMin)
if err != nil {
return nil, err
}
arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+interval+"."+channelsToSubscribe[i].Pair.Format(pairFormat).String())
arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+interval+"."+pair.Format(pairFormat).String())
case chanPositions, chanExecution, chanOrder, chanWallet, chanGreeks, chanDCP:
if chanMap[channelsToSubscribe[i].Channel]&selectedChannels > 0 {
continue
Expand Down Expand Up @@ -204,11 +205,11 @@ func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, cha
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (by *Bybit) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error {
func (by *Bybit) Unsubscribe(channelsToUnsubscribe subscription.List) error {
return by.handleSpotSubscription("unsubscribe", channelsToUnsubscribe)
}

func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe []subscription.Subscription) error {
func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe subscription.List) error {
payloads, err := by.handleSubscriptions(asset.Spot, operation, channelsToSubscribe)
if err != nil {
return err
Expand Down Expand Up @@ -239,8 +240,8 @@ func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe []
}

// GenerateDefaultSubscriptions generates default subscription
func (by *Bybit) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
var subscriptions []subscription.Subscription
func (by *Bybit) GenerateDefaultSubscriptions() (subscription.List, error) {
var subscriptions subscription.List
var channels = []string{
chanPublicTicker,
chanOrderbook,
Expand All @@ -266,16 +267,16 @@ func (by *Bybit) GenerateDefaultSubscriptions() ([]subscription.Subscription, er
chanDCP,
chanWallet:
subscriptions = append(subscriptions,
subscription.Subscription{
&subscription.Subscription{
Channel: channels[x],
Asset: asset.Spot,
})
default:
for z := range pairs {
subscriptions = append(subscriptions,
subscription.Subscription{
&subscription.Subscription{
Channel: channels[x],
Pair: pairs[z],
Pairs: currency.Pairs{pairs[z]},
Asset: asset.Spot,
})
}
Expand Down

0 comments on commit 74df066

Please sign in to comment.