Skip to content

Commit

Permalink
Huobi: Add subscription configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Aug 13, 2024
1 parent b636ba8 commit 315c634
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 77 deletions.
51 changes: 51 additions & 0 deletions exchanges/huobi/huobi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -3010,3 +3012,52 @@ func TestGetCurrencyTradeURL(t *testing.T) {
}
}
}

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()

h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")

h.Websocket.SetCanUseAuthenticatedEndpoints(true)
subs, err := h.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{}
for _, s := range h.Features.Subscriptions {
for _, a := range h.GetAssetTypes(true) {
if s.Asset != asset.All && s.Asset != a {
continue
}
pairs, err := h.GetEnabledPairs(a)
require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a)
pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: false, Delimiter: ""})
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.Asset = a
for i, p := range pairs {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.QualifiedChannel = channelName(s, p)
switch s.Channel {
case subscription.OrderbookChannel:
s.QualifiedChannel += ".step0"
case subscription.CandlesChannel:
s.QualifiedChannel += ".1min"
}
s.Pairs = pairs[i : i+1]
exp = append(exp, s)
}
}
}
testsubs.EqualLists(t, exp, subs)
}

func TestSubscribe(t *testing.T) {
t.Parallel()
h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.Features.Subscriptions.ExpandTemplates(h)
require.NoError(t, err, "ExpandTemplates must not error")
h.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, h)
err = h.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
}
159 changes: 83 additions & 76 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"text/template"
"time"

"github.com/gorilla/websocket"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
Expand All @@ -30,11 +32,13 @@ const (
baseWSURL = "wss://api.huobi.pro"
futuresWSURL = "wss://api.hbdm.com/"

wsMarketURL = baseWSURL + "/ws"
wsMarketKline = "market.%s.kline.1min"
wsMarketDepth = "market.%s.depth.step0"
wsMarketTrade = "market.%s.trade.detail"
wsMarketTicker = "market.%s.detail"
wsMarketURL = baseWSURL + "/ws"
wsCandlesChannel = "market.%s.kline"
wsOrderbookChannel = "market.%s.depth"
wsTradesChannel = "market.%s.trade.detail"
wsMarketDetailChannel = "market.%s.detail"
wsMyOrdersChannel = "orders.%s"
wsMyTradesChannel = "orders.%s.update"

wsAccountsOrdersEndPoint = "/ws/v1"
wsAccountsList = "accounts.list"
Expand All @@ -56,6 +60,25 @@ const (
rateLimit = 20
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 0},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}

var subscriptionNames = map[string]string{
subscription.TickerChannel: wsMarketDetailChannel,
subscription.CandlesChannel: wsCandlesChannel,
subscription.OrderbookChannel: wsOrderbookChannel,
subscription.AllTradesChannel: wsTradesChannel,
subscription.MyTradesChannel: wsMyOrdersChannel,
subscription.MyOrdersChannel: wsMyTradesChannel,
}

// Instantiates a communications channel between websocket connections
var comms = make(chan WsMessage)

Expand Down Expand Up @@ -514,101 +537,63 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error {
return h.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (h *HUOBI) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{wsMarketKline,
wsMarketDepth,
wsMarketTrade,
wsMarketTicker}
var subscriptions subscription.List
if h.Websocket.CanUseAuthenticatedEndpoints() {
channels = append(channels, "orders.%v", "orders.%v.update")
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: "accounts",
})
}
enabledCurrencies, err := h.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
for i := range channels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = ""
channel := fmt.Sprintf(channels[i],
enabledCurrencies[j].Lower().String())
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channel,
Pairs: currency.Pairs{enabledCurrencies[j]},
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (h *HUOBI) generateSubscriptions() (subscription.List, error) {
return h.Features.Subscriptions.ExpandTemplates(h)
}

// GetSubscriptionTemplate returns a subscription channel template
func (h *HUOBI) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": h.FormatExchangeKlineInterval,
}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from the channel
func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error {
func (h *HUOBI) Subscribe(subs subscription.List) error {
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if subs.Authenticated() {
if creds, errs = h.GetCredentials(context.TODO()); errs != nil {
return errs
}
}
var errs error
for i := range channelsToSubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToSubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToSubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"sub",
wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel,
channelsToSubscribe[i].Channel)
if s.Authenticated {
err = h.wsAuthenticatedSubscribe(creds, "sub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel)
} else {
err = h.Websocket.Conn.SendJSONMessage(WsRequest{
Subscribe: channelsToSubscribe[i].Channel,
})
err = h.Websocket.Conn.SendJSONMessage(WsRequest{Subscribe: s.QualifiedChannel})
}
if err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
err = h.Websocket.AddSuccessfulSubscriptions(s)
}
errs = common.AppendError(errs, err)
}
return nil
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error {
func (h *HUOBI) Unsubscribe(subs subscription.List) error {
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if subs.Authenticated() {
if creds, errs = h.GetCredentials(context.TODO()); errs != nil {
return errs
}
}
var errs error
for i := range channelsToUnsubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToUnsubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToUnsubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"unsub",
wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel,
channelsToUnsubscribe[i].Channel)
if s.Authenticated {
err = h.wsAuthenticatedSubscribe(creds, "unsub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel)
} else {
err = h.Websocket.Conn.SendJSONMessage(WsRequest{
Unsubscribe: channelsToUnsubscribe[i].Channel,
})
err = h.Websocket.Conn.SendJSONMessage(WsRequest{Unsubscribe: s.QualifiedChannel})
}
if err == nil {
err = h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
err = h.Websocket.RemoveSubscriptions(s)
}
errs = common.AppendError(errs, err)
}
return errs
}
Expand Down Expand Up @@ -810,3 +795,25 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe
}
return &response, nil
}

// channelName converts global channel Names used in config of channel input into bitmex channel names
// returns the name unchanged if no match is found
func channelName(s *subscription.Subscription, p currency.Pair) string {
name := s.Channel
if n, ok := subscriptionNames[name]; ok {
name = n
}
return fmt.Sprintf(name, p)
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs -}}
{{- channelName $.S $p -}}
{{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }}
{{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }}
{{ $.PairSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
`
3 changes: 2 additions & 1 deletion exchanges/huobi/huobi_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (h *HUOBI) SetDefaults() {
GlobalResultLimit: 2000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}

h.Requester, err = request.New(h.Name,
Expand Down Expand Up @@ -212,7 +213,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error {
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
GenerateSubscriptions: h.generateSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
AllTradesChannel = "allTrades"
MyTradesChannel = "myTrades"
MyOrdersChannel = "myOrders"
MyAccountChannel = "account"
)

// Public errors
Expand Down

0 comments on commit 315c634

Please sign in to comment.