diff --git a/exchanges/huobi/huobi_test.go b/exchanges/huobi/huobi_test.go index 3d83a23da8d..56bcbeddf11 100644 --- a/exchanges/huobi/huobi_test.go +++ b/exchanges/huobi/huobi_test.go @@ -26,8 +26,10 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -3010,3 +3012,52 @@ func TestGetCurrencyTradeURL(t *testing.T) { } } } + +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + + h := new(HUOBI) + require.NoError(t, testexch.Setup(h), "Test instance Setup must not error") + + h.Websocket.SetCanUseAuthenticatedEndpoints(true) + subs, err := h.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + exp := subscription.List{} + for _, s := range h.Features.Subscriptions { + for _, a := range h.GetAssetTypes(true) { + if s.Asset != asset.All && s.Asset != a { + continue + } + pairs, err := h.GetEnabledPairs(a) + require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a) + pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: false, Delimiter: ""}) + s := s.Clone() //nolint:govet // Intentional lexical scope shadow + s.Asset = a + for i, p := range pairs { + s := s.Clone() //nolint:govet // Intentional lexical scope shadow + s.QualifiedChannel = channelName(s, p) + switch s.Channel { + case subscription.OrderbookChannel: + s.QualifiedChannel += ".step0" + case subscription.CandlesChannel: + s.QualifiedChannel += ".1min" + } + s.Pairs = pairs[i : i+1] + exp = append(exp, s) + } + } + } + testsubs.EqualLists(t, exp, subs) +} + +func TestSubscribe(t *testing.T) { + t.Parallel() + h := new(HUOBI) + require.NoError(t, testexch.Setup(h), "Test instance Setup must not error") + subs, err := h.Features.Subscriptions.ExpandTemplates(h) + require.NoError(t, err, "ExpandTemplates must not error") + h.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, h) + err = h.Subscribe(subs) + require.NoError(t, err, "Subscribe must not error") +} diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 4a4e4d7adf7..1bfa4d03786 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "text/template" "time" "github.com/gorilla/websocket" @@ -17,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" @@ -30,11 +32,13 @@ const ( baseWSURL = "wss://api.huobi.pro" futuresWSURL = "wss://api.hbdm.com/" - wsMarketURL = baseWSURL + "/ws" - wsMarketKline = "market.%s.kline.1min" - wsMarketDepth = "market.%s.depth.step0" - wsMarketTrade = "market.%s.trade.detail" - wsMarketTicker = "market.%s.detail" + wsMarketURL = baseWSURL + "/ws" + wsCandlesChannel = "market.%s.kline" + wsOrderbookChannel = "market.%s.depth" + wsTradesChannel = "market.%s.trade.detail" + wsMarketDetailChannel = "market.%s.detail" + wsMyOrdersChannel = "orders.%s" + wsMyTradesChannel = "orders.%s.update" wsAccountsOrdersEndPoint = "/ws/v1" wsAccountsList = "accounts.list" @@ -56,6 +60,25 @@ const ( rateLimit = 20 ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 0}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true}, + {Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true}, +} + +var subscriptionNames = map[string]string{ + subscription.TickerChannel: wsMarketDetailChannel, + subscription.CandlesChannel: wsCandlesChannel, + subscription.OrderbookChannel: wsOrderbookChannel, + subscription.AllTradesChannel: wsTradesChannel, + subscription.MyTradesChannel: wsMyOrdersChannel, + subscription.MyOrdersChannel: wsMyTradesChannel, +} + // Instantiates a communications channel between websocket connections var comms = make(chan WsMessage) @@ -514,101 +537,63 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error { return h.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } -// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() -func (h *HUOBI) GenerateDefaultSubscriptions() (subscription.List, error) { - var channels = []string{wsMarketKline, - wsMarketDepth, - wsMarketTrade, - wsMarketTicker} - var subscriptions subscription.List - if h.Websocket.CanUseAuthenticatedEndpoints() { - channels = append(channels, "orders.%v", "orders.%v.update") - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: "accounts", - }) - } - enabledCurrencies, err := h.GetEnabledPairs(asset.Spot) - if err != nil { - return nil, err - } - for i := range channels { - for j := range enabledCurrencies { - enabledCurrencies[j].Delimiter = "" - channel := fmt.Sprintf(channels[i], - enabledCurrencies[j].Lower().String()) - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: channel, - Pairs: currency.Pairs{enabledCurrencies[j]}, - }) - } - } - return subscriptions, nil +// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature +func (h *HUOBI) generateSubscriptions() (subscription.List, error) { + return h.Features.Subscriptions.ExpandTemplates(h) +} + +// GetSubscriptionTemplate returns a subscription channel template +func (h *HUOBI) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{ + "channelName": channelName, + "interval": h.FormatExchangeKlineInterval, + }).Parse(subTplText) } // Subscribe sends a websocket message to receive data from the channel -func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error { +func (h *HUOBI) Subscribe(subs subscription.List) error { + var errs error var creds *account.Credentials - if h.Websocket.CanUseAuthenticatedEndpoints() { - var err error - creds, err = h.GetCredentials(context.TODO()) - if err != nil { - return err + if subs.Authenticated() { + if creds, errs = h.GetCredentials(context.TODO()); errs != nil { + return errs } } - var errs error - for i := range channelsToSubscribe { + for _, s := range subs { var err error - if (strings.Contains(channelsToSubscribe[i].Channel, "orders.") || - strings.Contains(channelsToSubscribe[i].Channel, "accounts")) && creds != nil { - err = h.wsAuthenticatedSubscribe(creds, - "sub", - wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel, - channelsToSubscribe[i].Channel) + if s.Authenticated { + err = h.wsAuthenticatedSubscribe(creds, "sub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel) } else { - err = h.Websocket.Conn.SendJSONMessage(WsRequest{ - Subscribe: channelsToSubscribe[i].Channel, - }) + err = h.Websocket.Conn.SendJSONMessage(WsRequest{Subscribe: s.QualifiedChannel}) } if err == nil { - err = h.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i]) - } - if err != nil { - errs = common.AppendError(errs, err) + err = h.Websocket.AddSuccessfulSubscriptions(s) } + errs = common.AppendError(errs, err) } return nil } // Unsubscribe sends a websocket message to stop receiving data from the channel -func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error { +func (h *HUOBI) Unsubscribe(subs subscription.List) error { + var errs error var creds *account.Credentials - if h.Websocket.CanUseAuthenticatedEndpoints() { - var err error - creds, err = h.GetCredentials(context.TODO()) - if err != nil { - return err + if subs.Authenticated() { + if creds, errs = h.GetCredentials(context.TODO()); errs != nil { + return errs } } - var errs error - for i := range channelsToUnsubscribe { + for _, s := range subs { var err error - if (strings.Contains(channelsToUnsubscribe[i].Channel, "orders.") || - strings.Contains(channelsToUnsubscribe[i].Channel, "accounts")) && creds != nil { - err = h.wsAuthenticatedSubscribe(creds, - "unsub", - wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel, - channelsToUnsubscribe[i].Channel) + if s.Authenticated { + err = h.wsAuthenticatedSubscribe(creds, "unsub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel) } else { - err = h.Websocket.Conn.SendJSONMessage(WsRequest{ - Unsubscribe: channelsToUnsubscribe[i].Channel, - }) + err = h.Websocket.Conn.SendJSONMessage(WsRequest{Unsubscribe: s.QualifiedChannel}) } if err == nil { - err = h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i]) - } - if err != nil { - errs = common.AppendError(errs, err) + err = h.Websocket.RemoveSubscriptions(s) } + errs = common.AppendError(errs, err) } return errs } @@ -810,3 +795,25 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe } return &response, nil } + +// channelName converts global channel Names used in config of channel input into bitmex channel names +// returns the name unchanged if no match is found +func channelName(s *subscription.Subscription, p currency.Pair) string { + name := s.Channel + if n, ok := subscriptionNames[name]; ok { + name = n + } + return fmt.Sprintf(name, p) +} + +const subTplText = ` +{{ range $asset, $pairs := $.AssetPairs }} + {{- range $p := $pairs -}} + {{- channelName $.S $p -}} + {{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }} + {{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }} + {{ $.PairSeparator }} + {{- end }} + {{ $.AssetSeparator }} +{{- end }} +` diff --git a/exchanges/huobi/huobi_wrapper.go b/exchanges/huobi/huobi_wrapper.go index 3c1002300c1..068ce61ba0b 100644 --- a/exchanges/huobi/huobi_wrapper.go +++ b/exchanges/huobi/huobi_wrapper.go @@ -161,6 +161,7 @@ func (h *HUOBI) SetDefaults() { GlobalResultLimit: 2000, }, }, + Subscriptions: defaultSubscriptions.Clone(), } h.Requester, err = request.New(h.Name, @@ -212,7 +213,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error { Connector: h.WsConnect, Subscriber: h.Subscribe, Unsubscriber: h.Unsubscribe, - GenerateSubscriptions: h.GenerateDefaultSubscriptions, + GenerateSubscriptions: h.generateSubscriptions, Features: &h.Features.Supports.WebsocketCapabilities, }) if err != nil { diff --git a/exchanges/subscription/subscription.go b/exchanges/subscription/subscription.go index 516466363eb..7cbf3f25d7e 100644 --- a/exchanges/subscription/subscription.go +++ b/exchanges/subscription/subscription.go @@ -31,6 +31,7 @@ const ( AllTradesChannel = "allTrades" MyTradesChannel = "myTrades" MyOrdersChannel = "myOrders" + MyAccountChannel = "account" ) // Public errors