Skip to content

Commit

Permalink
fixup! Huobi: Add V2 websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Nov 2, 2024
1 parent 95efb32 commit 97fb2fc
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 172 deletions.
66 changes: 42 additions & 24 deletions exchanges/huobi/huobi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,24 +1390,25 @@ func TestWsTicker(t *testing.T) {
}

func TestWsAccountUpdate(t *testing.T) {
pressXToJSON := []byte(`{
"op": "notify",
"ts": 1522856623232,
"topic": "accounts",
"data": {
"event": "order.place",
"list": [
{
"account-id": 419013,
"currency": "usdt",
"type": "trade",
"balance": "500009195917.4362872650"
}
]
}
}`)
err := h.wsHandleData(pressXToJSON)
require.NoError(t, err)
h := new(HUOBI) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(h), "Setup Instance must not error")
err := h.Websocket.AddSubscriptions(h.Websocket.Conn, &subscription.Subscription{Key: "accounts.update#2", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}, Channel: subscription.MyAccountChannel})
require.NoError(t, err, "AddSubscriptions must not error")
h.SetSaveTradeDataStatus(true)
testexch.FixtureToDataHandler(t, "testdata/wsMyAccount.json", h.wsHandleData)
close(h.Websocket.DataHandler)
require.Len(t, h.Websocket.DataHandler, 2, "Should see correct number of records")
exp := []WsAccountUpdate{
{Currency: "btc", AccountID: 123456, Balance: 23.111, ChangeType: "transfer", AccountType: "trade", ChangeTime: 1568601800000, SeqNum: "1"},
{Currency: "btc", AccountID: 33385, Available: 2028.69, ChangeType: "order.match", AccountType: "trade", ChangeTime: 1574393385167, SeqNum: "2"},
}
for _, e := range exp {
uAny := <-h.Websocket.DataHandler
u, ok := uAny.(WsAccountUpdate)
require.True(t, ok, "Must get the correct type from DataHandler")
require.NotNil(t, u)
assert.Equal(t, e, u)
}
}

func TestWsOrderUpdate(t *testing.T) {
Expand Down Expand Up @@ -1882,7 +1883,7 @@ func TestGenerateSubscriptions(t *testing.T) {
s.Asset = a
for i, p := range pairs {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.QualifiedChannel = channelName(s, p)
s.QualifiedChannel = channelName(s, &p)
switch s.Channel {
case subscription.OrderbookChannel:
s.QualifiedChannel += ".step0"
Expand Down Expand Up @@ -1915,8 +1916,25 @@ func wsFixture(tb testing.TB, msg []byte, w *websocket.Conn) error {
return fmt.Errorf("%w: %s", errors.New("Unhandled mock websocket message"), msg)
}

// TestSubscribe exercises mock subscriptions
// TestSubscribe exercises live public subscriptions
func TestSubscribe(t *testing.T) {
t.Parallel()
h := new(HUOBI) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.Features.Subscriptions.ExpandTemplates(h)
require.NoError(t, err, "ExpandTemplates must not error")
testexch.SetupWs(t, h)
err = h.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
got := h.Websocket.GetSubscriptions()
require.Equal(t, 4, len(got), "Should get correct number of subscriptions")
for _, s := range got {
assert.Equal(t, subscription.SubscribedState, s.State())
}
}

// TestAuthSubscribe exercises mock subscriptions including private
func TestAuthSubscribe(t *testing.T) {
t.Parallel()
subCfg := h.Features.Subscriptions
h := testexch.MockWsInstance[HUOBI](t, mockws.CurryWsMockUpgrader(t, wsFixture)) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
Expand All @@ -1926,16 +1944,16 @@ func TestSubscribe(t *testing.T) {
err = h.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
got := h.Websocket.GetSubscriptions()
require.Len(t, got, len(subs))
require.Equal(t, 7, len(got), "Should get correct number of subscriptions")
for _, s := range got {
assert.Equal(t, subscription.SubscribedState, s.State())
}
}

func TestChannelName(t *testing.T) {
assert.Equal(t, "market.BTC-USD.kline", channelName(&subscription.Subscription{Channel: subscription.CandlesChannel}, btcusdPair))
assert.Equal(t, "trade.clearing#BTC-USD#1", channelName(&subscription.Subscription{Channel: subscription.MyOrdersChannel}, btcusdPair))
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: wsOrderbookChannel}, btcusdPair) })
assert.Equal(t, "market.BTC-USD.kline", channelName(&subscription.Subscription{Channel: subscription.CandlesChannel}, &btcusdPair))
assert.Equal(t, "trade.clearing#BTC-USD#1", channelName(&subscription.Subscription{Channel: subscription.MyOrdersChannel}, &btcusdPair))
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: wsOrderbookChannel}, &btcusdPair) })
}

func TestGetErrResp(t *testing.T) {
Expand Down
134 changes: 14 additions & 120 deletions exchanges/huobi/huobi_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,19 +814,6 @@ type wsSubReq struct {
Unsub string `json:"unsub,omitempty"`
}

// wsSubResp is a response to a subscribe/unsubcribe request
type wsSubResp struct {
ID int64 `json:"id"`
Op string `json:"op"`
Channel string `json:"ch"`
Timestamp int64 `json:"ts"`
Status string `json:"status"`
ErrorCode any `json:"err-code"` // ErrorCode returns either an integer or a string
ErrorMessage string `json:"err-msg"`
Subscribed string `json:"subbed"`
UnSubscribed string `json:"unsubbed"`
}

// WsHeartBeat defines a heartbeat request
type WsHeartBeat struct {
ClientNonce int64 `json:"ping"`
Expand Down Expand Up @@ -912,78 +899,24 @@ type wsAuthReq struct {
Signature string `json:"signature"`
}

// WsAuthenticatedSubscriptionRequest request for subscription on authenticated connection
type WsAuthenticatedSubscriptionRequest struct {
Op string `json:"op"`
AccessKeyID string `json:"AccessKeyId"`
SignatureMethod string `json:"SignatureMethod"`
SignatureVersion string `json:"SignatureVersion"`
Timestamp string `json:"Timestamp"`
Signature string `json:"Signature"`
Topic string `json:"topic"`
}

// WsAuthenticatedAccountsListRequest request for account list authenticated connection
type WsAuthenticatedAccountsListRequest struct {
Op string `json:"op"`
AccessKeyID string `json:"AccessKeyId"`
SignatureMethod string `json:"SignatureMethod"`
SignatureVersion string `json:"SignatureVersion"`
Timestamp string `json:"Timestamp"`
Signature string `json:"Signature"`
Topic string `json:"topic"`
Symbol string `json:"symbol"`
}

// WsAuthenticatedOrderDetailsRequest request for order details authenticated connection
type WsAuthenticatedOrderDetailsRequest struct {
Op string `json:"op"`
AccessKeyID string `json:"AccessKeyId"`
SignatureMethod string `json:"SignatureMethod"`
SignatureVersion string `json:"SignatureVersion"`
Timestamp string `json:"Timestamp"`
Signature string `json:"Signature"`
Topic string `json:"topic"`
OrderID string `json:"order-id"`
}

// WsAuthenticatedOrdersListRequest request for orderslist authenticated connection
type WsAuthenticatedOrdersListRequest struct {
Op string `json:"op"`
AccessKeyID string `json:"AccessKeyId"`
SignatureMethod string `json:"SignatureMethod"`
SignatureVersion string `json:"SignatureVersion"`
Timestamp string `json:"Timestamp"`
Signature string `json:"Signature"`
Topic string `json:"topic"`
States string `json:"states"`
AccountID int64 `json:"account-id"`
Symbol string `json:"symbol"`
}

// WsAuthenticatedAccountsResponse response from Accounts authenticated subscription
type WsAuthenticatedAccountsResponse struct {
wsSubResp
Data WsAuthenticatedAccountsResponseData `json:"data"`
}

// WsAuthenticatedAccountsResponseData account data
type WsAuthenticatedAccountsResponseData struct {
Event string `json:"event"`
List []WsAuthenticatedAccountsResponseDataList `json:"list"`
}

// WsAuthenticatedAccountsResponseDataList detailed account data
type WsAuthenticatedAccountsResponseDataList struct {
AccountID int64 `json:"account-id"`
Currency string `json:"currency"`
Type string `json:"type"`
Balance float64 `json:"balance,string"`
// wsAccountUpdateMsg contains account updates to balances
type wsAccountUpdateMsg struct {
Data WsAccountUpdate `json:"data"`
}

type WsAccountUpdate struct {

Check failure on line 907 in exchanges/huobi/huobi_types.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type WsAccountUpdate should have comment or be unexported (revive)
Currency string `json:"currency"`
AccountID int64 `json:"accountId"`
Balance float64 `json:"balance,string"`
Available float64 `json:"available,string"`
ChangeType string `json:"changeType"`
AccountType string `json:"accountType"`
ChangeTime int64 `json:"changeTime"`
SeqNum string `json:"seqNum"`
}

// WsAuthenticatedOrdersUpdateResponse response from OrdersUpdate authenticated subscription
type WsAuthenticatedOrdersUpdateResponse struct {
wsSubResp
Data WsAuthenticatedOrdersUpdateResponseData `json:"data"`
}

Expand All @@ -1003,7 +936,6 @@ type WsAuthenticatedOrdersUpdateResponseData struct {

// WsAuthenticatedOrdersResponse response from Orders authenticated subscription
type WsAuthenticatedOrdersResponse struct {
wsSubResp
Data []WsAuthenticatedOrdersResponseData `json:"data"`
}

Expand All @@ -1027,44 +959,6 @@ type WsAuthenticatedOrdersResponseData struct {
FilledFees float64 `json:"filled-fees,string"`
}

// WsAuthenticatedAccountsListResponse response from AccountsList authenticated endpoint
type WsAuthenticatedAccountsListResponse struct {
wsSubResp
Data []WsAuthenticatedAccountsListResponseData `json:"data"`
}

// WsAuthenticatedAccountsListResponseData account data
type WsAuthenticatedAccountsListResponseData struct {
ID int64 `json:"id"`
Type string `json:"type"`
State string `json:"state"`
List []WsAuthenticatedAccountsListResponseDataList `json:"list"`
}

// WsAuthenticatedAccountsListResponseDataList detailed account data
type WsAuthenticatedAccountsListResponseDataList struct {
Currency string `json:"currency"`
Type string `json:"type"`
Balance float64 `json:"balance,string"`
}

// WsAuthenticatedOrdersListResponse response from OrdersList authenticated endpoint
type WsAuthenticatedOrdersListResponse struct {
wsSubResp
Data []OrderInfo `json:"data"`
}

// WsAuthenticatedOrderDetailResponse response from OrderDetail authenticated endpoint
type WsAuthenticatedOrderDetailResponse struct {
wsSubResp
Data OrderInfo `json:"data"`
}

type authenticationPing struct {
OP string `json:"op"`
TS int64 `json:"ts"`
}

// OrderVars stores side, status and type for any order/trade
type OrderVars struct {
Side order.Side
Expand Down
68 changes: 40 additions & 28 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
wsMarketDetailChannel = "market.%s.detail"
wsMyOrdersChannel = "orders#%s"
wsMyTradesChannel = "trade.clearing#%s#1" // 0=Only trade events, 1=Trade and Cancellation events
wsMyAccountChannel = "accounts.update#2" // 0=Only balance, 1=Balance or Available, 2=Balance and Available when either change
wsAuthChannel = "auth"

wsDateTimeFormatting = "2006-01-02T15:04:05"
Expand Down Expand Up @@ -75,6 +76,7 @@ var subscriptionNames = map[string]string{
subscription.AllTradesChannel: wsTradesChannel,
subscription.MyTradesChannel: wsMyOrdersChannel,
subscription.MyOrdersChannel: wsMyTradesChannel,
subscription.MyAccountChannel: wsMyAccountChannel,
}

// WsConnect initiates a new websocket connection
Expand Down Expand Up @@ -154,7 +156,12 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error {
}

if action, err := jsonparser.GetString(respRaw, "action"); err == nil {
return h.wsHandleActionMsgs(action, respRaw)
switch action {
case "ping":
return h.wsHandleV2ping(action, respRaw)
case wsSubOp, wsUnsubOp:
return h.wsHandleV2subResp(action, respRaw)
}
}

if err := getErrResp(respRaw); err != nil {
Expand All @@ -176,21 +183,21 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error {
return nil
}

func (h *HUOBI) wsHandleActionMsgs(action string, respRaw []byte) error {
switch action {
case "ping":
ts, err := jsonparser.GetInt(respRaw, "data", "ts")
if err != nil {
return fmt.Errorf("error getting ts from auth ping: %w", err)
}
if err := h.Websocket.AuthConn.SendJSONMessage(context.Background(), request.Unset, json.RawMessage(`{"action":"pong","data":{"ts":`+strconv.Itoa(int(ts))+`}}`)); err != nil {
return fmt.Errorf("error sending auth pong response: %w", err)
}
case wsSubOp, wsUnsubOp:
if ch, err := jsonparser.GetString(respRaw, "ch"); err == nil {
if h.Websocket.Match.IncomingWithData(action+":"+ch, respRaw) {
return nil
}
func (h *HUOBI) wsHandleV2ping(action string, respRaw []byte) error {

Check failure on line 186 in exchanges/huobi/huobi_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'action' seems to be unused, consider removing or renaming it as _ (revive)
ts, err := jsonparser.GetInt(respRaw, "data", "ts")
if err != nil {
return fmt.Errorf("error getting ts from auth ping: %w", err)
}
if err := h.Websocket.AuthConn.SendJSONMessage(context.Background(), request.Unset, json.RawMessage(`{"action":"pong","data":{"ts":`+strconv.Itoa(int(ts))+`}}`)); err != nil {
return fmt.Errorf("error sending auth pong response: %w", err)
}
return nil
}

func (h *HUOBI) wsHandleV2subResp(action string, respRaw []byte) error {
if ch, err := jsonparser.GetString(respRaw, "ch"); err == nil {
if !h.Websocket.Match.IncomingWithData(action+":"+ch, respRaw) {
return fmt.Errorf("%w: %s:%s", stream.ErrNoMessageListener, action, ch)
}
}
return nil
Expand Down Expand Up @@ -379,17 +386,18 @@ func (h *HUOBI) wsHandleMyOrdersMsg(s *subscription.Subscription, respRaw []byte
Status: oStatus,
AssetType: s.Asset,
Pair: s.Pairs[0],
LastUpdated: time.Unix(response.Timestamp*1000, 0),
// TODO

Check failure on line 389 in exchanges/huobi/huobi_websocket.go

View workflow job for this annotation

GitHub Actions / lint

todoCommentWithoutDetail: may want to add detail/assignee to this TODO/FIXME/BUG comment (gocritic)
//LastUpdated: time.Unix(response.Timestamp*1000, 0),
}
return nil
}

func (h *HUOBI) wsHandleMyAccountMsg(respRaw []byte) error {
var response WsAuthenticatedAccountsResponse
if err := json.Unmarshal(respRaw, &response); err != nil {
u := &wsAccountUpdateMsg{}
if err := json.Unmarshal(respRaw, u); err != nil {
return err
}
h.Websocket.DataHandler <- response
h.Websocket.DataHandler <- u.Data
return nil
}

Expand Down Expand Up @@ -594,7 +602,7 @@ func getErrResp(msg []byte) error {

// channelName converts global channel Names used in config of channel input into bitmex channel names
// returns the name unchanged if no match is found
func channelName(s *subscription.Subscription, p currency.Pair) string {
func channelName(s *subscription.Subscription, p *currency.Pair) string {
name := s.Channel
if n, ok := subscriptionNames[name]; ok {
return fmt.Sprintf(n, p)
Expand All @@ -603,13 +611,17 @@ func channelName(s *subscription.Subscription, p currency.Pair) string {
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs }}
{{- channelName $.S $p -}}
{{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }}
{{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }}
{{ $.PairSeparator }}
{{- if $.S.Asset }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs }}
{{- channelName $.S $p -}}
{{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }}
{{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }}
{{ $.PairSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- else -}}
{{ channelName $.S nil }}
{{- end }}
`
2 changes: 2 additions & 0 deletions exchanges/huobi/testdata/wsMyAccount.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"action":"push","ch":"accounts.update#2","data":{"currency":"btc","accountId":123456,"balance":"23.111","changeType":"transfer","accountType":"trade","seqNum":"1","changeTime":1568601800000}}
{"action":"push","ch":"accounts.update#2","data":{"currency":"btc","accountId":33385,"available":"2028.69","changeType":"order.match","accountType":"trade","seqNum":"2","changeTime":1574393385167}}

0 comments on commit 97fb2fc

Please sign in to comment.