From 3d7cf2ab215165851322b27ccf220c381fbe4f31 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Mon, 26 Feb 2024 14:32:17 +0700 Subject: [PATCH] Bybit: Subscription Pairs support --- exchanges/btse/btse_websocket.go | 16 +++++++------- exchanges/bybit/bybit_inverse_websocket.go | 9 ++++---- exchanges/bybit/bybit_linear_websocket.go | 8 +++---- exchanges/bybit/bybit_options_websocket.go | 9 ++++---- exchanges/bybit/bybit_websocket.go | 25 +++++++++++----------- 5 files changed, 35 insertions(+), 32 deletions(-) diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 75b382f9b34..b6b7995e10f 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -361,23 +361,23 @@ func (b *BTSE) orderbookFilter(price, amount float64) bool { } // GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() -func (b *BTSE) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) { +func (b *BTSE) GenerateDefaultSubscriptions() (subscription.List, error) { var channels = []string{"orderBookL2Api:%s_0", "tradeHistory:%s"} pairs, err := b.GetEnabledPairs(asset.Spot) if err != nil { return nil, err } - var subscriptions []subscription.Subscription + var subscriptions subscription.List if b.Websocket.CanUseAuthenticatedEndpoints() { - subscriptions = append(subscriptions, subscription.Subscription{ + subscriptions = append(subscriptions, &subscription.Subscription{ Channel: "notificationApi", }) } for i := range channels { for j := range pairs { - subscriptions = append(subscriptions, subscription.Subscription{ + subscriptions = append(subscriptions, &subscription.Subscription{ Channel: fmt.Sprintf(channels[i], pairs[j]), - Pairs: pairs[j], + Pairs: currency.Pairs{pairs[j]}, Asset: asset.Spot, }) } @@ -386,7 +386,7 @@ func (b *BTSE) GenerateDefaultSubscriptions() ([]subscription.Subscription, erro } // Subscribe sends a websocket message to receive data from the channel -func (b *BTSE) Subscribe(channelsToSubscribe []subscription.Subscription) error { +func (b *BTSE) Subscribe(channelsToSubscribe subscription.List) error { var sub wsSub sub.Operation = "subscribe" for i := range channelsToSubscribe { @@ -401,7 +401,7 @@ func (b *BTSE) Subscribe(channelsToSubscribe []subscription.Subscription) error } // Unsubscribe sends a websocket message to stop receiving data from the channel -func (b *BTSE) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error { +func (b *BTSE) Unsubscribe(channelsToUnsubscribe subscription.List) error { var unSub wsSub unSub.Operation = "unsubscribe" for i := range channelsToUnsubscribe { @@ -412,6 +412,6 @@ func (b *BTSE) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) er if err != nil { return err } - b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...) + b.Websocket.RemoveSubscriptions(channelsToUnsubscribe) return nil } diff --git a/exchanges/bybit/bybit_inverse_websocket.go b/exchanges/bybit/bybit_inverse_websocket.go index d1387c277f8..7e6bec90527 100644 --- a/exchanges/bybit/bybit_inverse_websocket.go +++ b/exchanges/bybit/bybit_inverse_websocket.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" @@ -44,7 +45,7 @@ func (by *Bybit) GenerateInverseDefaultSubscriptions() ([]subscription.Subscript subscriptions = append(subscriptions, subscription.Subscription{ Channel: channels[x], - Pair: pairs[z], + Pairs: currency.Pairs{pairs[z]}, Asset: asset.CoinMarginedFutures, }) } @@ -53,16 +54,16 @@ func (by *Bybit) GenerateInverseDefaultSubscriptions() ([]subscription.Subscript } // InverseSubscribe sends a subscription message to linear public channels. -func (by *Bybit) InverseSubscribe(channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) InverseSubscribe(channelSubscriptions subscription.List) error { return by.handleInversePayloadSubscription("subscribe", channelSubscriptions) } // InverseUnsubscribe sends an unsubscription messages through linear public channels. -func (by *Bybit) InverseUnsubscribe(channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) InverseUnsubscribe(channelSubscriptions subscription.List) error { return by.handleInversePayloadSubscription("unsubscribe", channelSubscriptions) } -func (by *Bybit) handleInversePayloadSubscription(operation string, channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) handleInversePayloadSubscription(operation string, channelSubscriptions subscription.List) error { payloads, err := by.handleSubscriptions(asset.CoinMarginedFutures, operation, channelSubscriptions) if err != nil { return err diff --git a/exchanges/bybit/bybit_linear_websocket.go b/exchanges/bybit/bybit_linear_websocket.go index 9b3ed08426a..4d9815986d2 100644 --- a/exchanges/bybit/bybit_linear_websocket.go +++ b/exchanges/bybit/bybit_linear_websocket.go @@ -63,7 +63,7 @@ func (by *Bybit) GenerateLinearDefaultSubscriptions() ([]subscription.Subscripti subscriptions = append(subscriptions, subscription.Subscription{ Channel: channels[x], - Pair: pairs[p], + Pairs: currency.Pairs{pairs[p]}, Asset: a, }) } @@ -73,16 +73,16 @@ func (by *Bybit) GenerateLinearDefaultSubscriptions() ([]subscription.Subscripti } // LinearSubscribe sends a subscription message to linear public channels. -func (by *Bybit) LinearSubscribe(channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) LinearSubscribe(channelSubscriptions subscription.List) error { return by.handleLinearPayloadSubscription("subscribe", channelSubscriptions) } // LinearUnsubscribe sends an unsubscription messages through linear public channels. -func (by *Bybit) LinearUnsubscribe(channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) LinearUnsubscribe(channelSubscriptions subscription.List) error { return by.handleLinearPayloadSubscription("unsubscribe", channelSubscriptions) } -func (by *Bybit) handleLinearPayloadSubscription(operation string, channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) handleLinearPayloadSubscription(operation string, channelSubscriptions subscription.List) error { payloads, err := by.handleSubscriptions(asset.USDTMarginedFutures, operation, channelSubscriptions) if err != nil { return err diff --git a/exchanges/bybit/bybit_options_websocket.go b/exchanges/bybit/bybit_options_websocket.go index 4bb25cef2a9..92c4958c35b 100644 --- a/exchanges/bybit/bybit_options_websocket.go +++ b/exchanges/bybit/bybit_options_websocket.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" @@ -51,7 +52,7 @@ func (by *Bybit) GenerateOptionsDefaultSubscriptions() ([]subscription.Subscript subscriptions = append(subscriptions, subscription.Subscription{ Channel: channels[x], - Pair: pairs[z], + Pairs: currency.Pairs{pairs[z]}, Asset: asset.Options, }) } @@ -60,16 +61,16 @@ func (by *Bybit) GenerateOptionsDefaultSubscriptions() ([]subscription.Subscript } // OptionSubscribe sends a subscription message to options public channels. -func (by *Bybit) OptionSubscribe(channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) OptionSubscribe(channelSubscriptions subscription.List) error { return by.handleOptionsPayloadSubscription("subscribe", channelSubscriptions) } // OptionUnsubscribe sends an unsubscription messages through options public channels. -func (by *Bybit) OptionUnsubscribe(channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) OptionUnsubscribe(channelSubscriptions subscription.List) error { return by.handleOptionsPayloadSubscription("unsubscribe", channelSubscriptions) } -func (by *Bybit) handleOptionsPayloadSubscription(operation string, channelSubscriptions []subscription.Subscription) error { +func (by *Bybit) handleOptionsPayloadSubscription(operation string, channelSubscriptions subscription.List) error { payloads, err := by.handleSubscriptions(asset.Options, operation, channelSubscriptions) if err != nil { return err diff --git a/exchanges/bybit/bybit_websocket.go b/exchanges/bybit/bybit_websocket.go index ff2698b8667..397ecf699ff 100644 --- a/exchanges/bybit/bybit_websocket.go +++ b/exchanges/bybit/bybit_websocket.go @@ -134,11 +134,11 @@ func (by *Bybit) WsAuth(ctx context.Context) error { } // Subscribe sends a websocket message to receive data from the channel -func (by *Bybit) Subscribe(channelsToSubscribe []subscription.Subscription) error { +func (by *Bybit) Subscribe(channelsToSubscribe subscription.List) error { return by.handleSpotSubscription("subscribe", channelsToSubscribe) } -func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, channelsToSubscribe []subscription.Subscription) ([]SubscriptionArgument, error) { +func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, channelsToSubscribe subscription.List) ([]SubscriptionArgument, error) { var args []SubscriptionArgument arg := SubscriptionArgument{ Operation: operation, @@ -166,17 +166,18 @@ func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, cha return nil, err } for i := range channelsToSubscribe { + pair := channelsToSubscribe[i].Pairs[0] switch channelsToSubscribe[i].Channel { case chanOrderbook: - arg.Arguments = append(arg.Arguments, fmt.Sprintf("%s.%d.%s", channelsToSubscribe[i].Channel, 50, channelsToSubscribe[i].Pair.Format(pairFormat).String())) + arg.Arguments = append(arg.Arguments, fmt.Sprintf("%s.%d.%s", channelsToSubscribe[i].Channel, 50, pair.Format(pairFormat).String())) case chanPublicTrade, chanPublicTicker, chanLiquidation, chanLeverageTokenTicker, chanLeverageTokenNav: - arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+channelsToSubscribe[i].Pair.Format(pairFormat).String()) + arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+pair.Format(pairFormat).String()) case chanKline, chanLeverageTokenKline: interval, err := intervalToString(kline.FiveMin) if err != nil { return nil, err } - arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+interval+"."+channelsToSubscribe[i].Pair.Format(pairFormat).String()) + arg.Arguments = append(arg.Arguments, channelsToSubscribe[i].Channel+"."+interval+"."+pair.Format(pairFormat).String()) case chanPositions, chanExecution, chanOrder, chanWallet, chanGreeks, chanDCP: if chanMap[channelsToSubscribe[i].Channel]&selectedChannels > 0 { continue @@ -204,11 +205,11 @@ func (by *Bybit) handleSubscriptions(assetType asset.Item, operation string, cha } // Unsubscribe sends a websocket message to stop receiving data from the channel -func (by *Bybit) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error { +func (by *Bybit) Unsubscribe(channelsToUnsubscribe subscription.List) error { return by.handleSpotSubscription("unsubscribe", channelsToUnsubscribe) } -func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe []subscription.Subscription) error { +func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe subscription.List) error { payloads, err := by.handleSubscriptions(asset.Spot, operation, channelsToSubscribe) if err != nil { return err @@ -239,8 +240,8 @@ func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe [] } // GenerateDefaultSubscriptions generates default subscription -func (by *Bybit) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) { - var subscriptions []subscription.Subscription +func (by *Bybit) GenerateDefaultSubscriptions() (subscription.List, error) { + var subscriptions subscription.List var channels = []string{ chanPublicTicker, chanOrderbook, @@ -266,16 +267,16 @@ func (by *Bybit) GenerateDefaultSubscriptions() ([]subscription.Subscription, er chanDCP, chanWallet: subscriptions = append(subscriptions, - subscription.Subscription{ + &subscription.Subscription{ Channel: channels[x], Asset: asset.Spot, }) default: for z := range pairs { subscriptions = append(subscriptions, - subscription.Subscription{ + &subscription.Subscription{ Channel: channels[x], - Pair: pairs[z], + Pairs: currency.Pairs{pairs[z]}, Asset: asset.Spot, }) }