From d35dd50187634e85125b16215491c088394602ec Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Tue, 17 Sep 2024 13:24:45 +0700 Subject: [PATCH] Bitmex: Handle subscription errors This switches to multiplexing so that we know which errors belong to which stream, particularly for the auth attempt --- exchanges/bitmex/bitmex_test.go | 117 ++--- exchanges/bitmex/bitmex_websocket.go | 653 ++++++++++++++------------- 2 files changed, 384 insertions(+), 386 deletions(-) diff --git a/exchanges/bitmex/bitmex_test.go b/exchanges/bitmex/bitmex_test.go index e2214dfd8a8..27ba9a2b346 100644 --- a/exchanges/bitmex/bitmex_test.go +++ b/exchanges/bitmex/bitmex_test.go @@ -832,7 +832,7 @@ func TestUpdateTradablePairs(t *testing.T) { func TestWsPositionUpdate(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -840,7 +840,7 @@ func TestWsPositionUpdate(t *testing.T) { "riskValue":87960,"homeNotional":0.0008796,"posState":"Liquidation","maintMargin":263, "unrealisedGrossPnl":-677,"unrealisedPnl":-677,"unrealisedPnlPcnt":-0.0078,"unrealisedRoePcnt":-0.7756, "simpleQty":0.001,"liquidationPrice":1140.1, "timestamp":"2017-04-04T22:07:45.442Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -849,7 +849,7 @@ func TestWsPositionUpdate(t *testing.T) { func TestWsInsertExectuionUpdate(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"execution", + pressXToJSON := []byte(`[0, "public", "public", {"table":"execution", "action":"insert", "data":[{ "execID":"0193e879-cb6f-2891-d099-2c4eb40fee21", @@ -864,27 +864,7 @@ func TestWsInsertExectuionUpdate(t *testing.T) { "text":"Liquidation","trdMatchID":"7f4ab7f6-0006-3234-76f4-ae1385aad00f","execCost":88155,"execComm":66, "homeNotional":-0.00088155,"foreignNotional":1,"transactTime":"2017-04-04T22:07:46.035Z", "timestamp":"2017-04-04T22:07:46.035Z" - }]}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } -} - -func TestWSConnectionHandling(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"info":"Welcome to the BitMEX Realtime API.","version":"1.1.0", - "timestamp":"2015-01-18T10:14:06.802Z","docs":"https://www.bitmex.com/app/wsAPI","heartbeatEnabled":false}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } -} - -func TestWSSubscriptionHandling(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"success":true,"subscribe":"trade:ETHUSD", - "request":{"op":"subscribe","args":["trade:ETHUSD","instrument:ETHUSD"]}}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -893,18 +873,18 @@ func TestWSSubscriptionHandling(t *testing.T) { func TestWSPositionUpdateHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt","currentQty":1, "markPrice":1136.88,"posState":"Liquidated","simpleQty":0.001,"liquidationPrice":1140.1,"bankruptPrice":1134.37, "timestamp":"2017-04-04T22:07:46.019Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } - pressXToJSON = []byte(`{"table":"position", + pressXToJSON = []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -917,7 +897,7 @@ func TestWSPositionUpdateHandling(t *testing.T) { "unrealisedPnlPcnt":0,"unrealisedRoePcnt":0,"simpleQty":0,"simpleCost":0,"simpleValue":0,"avgCostPrice":null, "avgEntryPrice":null,"breakEvenPrice":null,"marginCallPrice":null,"liquidationPrice":null,"bankruptPrice":null, "timestamp":"2017-04-04T22:07:46.140Z" - }]}`) + }]}]`) err = b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -926,7 +906,7 @@ func TestWSPositionUpdateHandling(t *testing.T) { func TestWSOrderbookHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ + pressXToJSON := []byte(`[0, "public", "public", { "table":"orderBookL2_25", "keys":["symbol","id","side"], "types":{"id":"long","price":"float","side":"symbol","size":"long","symbol":"symbol"}, @@ -940,76 +920,60 @@ func TestWSOrderbookHandling(t *testing.T) { {"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":10,"price":50}, {"symbol":"ETHUSD","id":17999996000,"side":"Buy","size":20,"price":40}, {"symbol":"ETHUSD","id":17999997000,"side":"Buy","size":100,"price":30} - ] - }`) + ]}]`) err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"update", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":5,"timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"update", - "data":[ - ] - }`) + "data":[]}]`) err = b.wsHandleData(pressXToJSON) - if err == nil { - t.Error("Expected error") - } + require.ErrorContains(t, err, "empty orderbook") - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"delete", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"delete", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if !errors.Is(err, orderbook.ErrOrderbookInvalid) { - t.Error(err) - } + assert.ErrorIs(t, err, orderbook.ErrOrderbookInvalid) } func TestWSDeleveragePositionUpdateHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt","currentQty":2000, "markPrice":1160.72,"posState":"Deleverage","simpleQty":1.746,"liquidationPrice":1140.1, "timestamp":"2017-04-04T22:16:38.460Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } - pressXToJSON = []byte(`{"table":"position", + pressXToJSON = []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -1023,7 +987,7 @@ func TestWSDeleveragePositionUpdateHandling(t *testing.T) { "simpleQty":0,"simpleCost":0,"simpleValue":0,"simplePnl":0,"simplePnlPcnt":0,"avgCostPrice":null, "avgEntryPrice":null,"breakEvenPrice":null,"marginCallPrice":null,"liquidationPrice":null,"bankruptPrice":null, "timestamp":"2017-04-04T22:16:38.547Z" - }]}`) + }]}]`) err = b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1032,7 +996,7 @@ func TestWSDeleveragePositionUpdateHandling(t *testing.T) { func TestWSDeleverageExecutionInsertHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"execution", + pressXToJSON := []byte(`[0, "public", "public", {"table":"execution", "action":"insert", "data":[{ "execID":"20ad1ff4-c110-a4f2-dd31-f94eaa0701fd", @@ -1047,7 +1011,7 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { "trdMatchID":"1e849b8a-7e88-3c67-a93f-cc654d40e8ba","execCost":172306000,"execComm":-43077, "homeNotional":-1.72306,"foreignNotional":2000,"transactTime":"2017-04-04T22:16:38.472Z", "timestamp":"2017-04-04T22:16:38.472Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1056,7 +1020,7 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { func TestWsTrades(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}`) + pressXToJSON := []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1372,7 +1336,6 @@ func TestGenerateSubscriptions(t *testing.T) { } exp := subscription.List{ - {QualifiedChannel: bitmexWSAnnouncement, Channel: bitmexWSAnnouncement}, {QualifiedChannel: bitmexWSOrderbookL2 + ":" + p[1].String(), Channel: bitmexWSOrderbookL2, Asset: asset.Futures, Pairs: p[1:2]}, {QualifiedChannel: bitmexWSOrderbookL2 + ":" + p[0].String(), Channel: bitmexWSOrderbookL2, Asset: asset.PerpetualContract, Pairs: p[:1]}, {QualifiedChannel: bitmexWSTrade + ":" + p[1].String(), Channel: bitmexWSTrade, Asset: asset.Futures, Pairs: p[1:2]}, @@ -1380,7 +1343,6 @@ func TestGenerateSubscriptions(t *testing.T) { {QualifiedChannel: bitmexWSAffiliate, Channel: bitmexWSAffiliate, Authenticated: true}, {QualifiedChannel: bitmexWSOrder, Channel: bitmexWSOrder, Authenticated: true}, {QualifiedChannel: bitmexWSMargin, Channel: bitmexWSMargin, Authenticated: true}, - {QualifiedChannel: bitmexWSPrivateNotifications, Channel: bitmexWSPrivateNotifications, Authenticated: true}, {QualifiedChannel: bitmexWSTransact, Channel: bitmexWSTransact, Authenticated: true}, {QualifiedChannel: bitmexWSWallet, Channel: bitmexWSWallet, Authenticated: true}, {QualifiedChannel: bitmexWSExecution + ":" + p[0].String(), Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract, Pairs: p[:1]}, @@ -1392,3 +1354,22 @@ func TestGenerateSubscriptions(t *testing.T) { require.NoError(t, err, "generateSubscriptions must not error") testsubs.EqualLists(t, exp, subs) } + +func TestSubscribe(t *testing.T) { + t.Parallel() + b := new(Bitmex) + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + subs, err := b.generateSubscriptions() // Note: We grab this before it's overwritten by SetupWs + require.NoError(t, err, "generateSubscriptions must not error") + testexch.SetupWs(t, b) + err = b.Subscribe(subs) + require.NoError(t, err, "Subscribe should not error") + for _, s := range subs { + assert.Equalf(t, subscription.SubscribedState, s.State(), "%s state should be subscribed", s.QualifiedChannel) + } + err = b.Unsubscribe(subs) + require.NoError(t, err, "Unsubscribe should not error") + for _, s := range subs { + assert.Equalf(t, subscription.UnsubscribedState, s.State(), "%s state should be unsusbscribed", s.QualifiedChannel) + } +} diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index 9794f23181b..240a37c9638 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -11,7 +11,9 @@ import ( "text/template" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" @@ -25,7 +27,7 @@ import ( ) const ( - bitmexWSURL = "wss://www.bitmex.com/realtime" + bitmexWSURL = "wss://www.bitmex.com/realtimemd" // Public Subscription Channels bitmexWSAnnouncement = "announcement" @@ -68,13 +70,11 @@ const ( ) var defaultSubscriptions = subscription.List{ - {Enabled: true, Channel: bitmexWSAnnouncement}, {Enabled: true, Channel: bitmexWSOrderbookL2, Asset: asset.All}, {Enabled: true, Channel: bitmexWSTrade, Asset: asset.All}, {Enabled: true, Channel: bitmexWSAffiliate, Authenticated: true}, {Enabled: true, Channel: bitmexWSOrder, Authenticated: true}, {Enabled: true, Channel: bitmexWSMargin, Authenticated: true}, - {Enabled: true, Channel: bitmexWSPrivateNotifications, Authenticated: true}, {Enabled: true, Channel: bitmexWSTransact, Authenticated: true}, {Enabled: true, Channel: bitmexWSWallet, Authenticated: true}, {Enabled: true, Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract}, @@ -83,39 +83,25 @@ var defaultSubscriptions = subscription.List{ // WsConnect initiates a new websocket connection func (b *Bitmex) WsConnect() error { + ctx := context.TODO() + if !b.Websocket.IsEnabled() || !b.IsEnabled() { return stream.ErrWebsocketNotEnabled } var dialer websocket.Dialer - err := b.Websocket.Conn.Dial(&dialer, http.Header{}) - if err != nil { + if err := b.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil { return err } - resp := b.Websocket.Conn.ReadMessage() - if resp.Raw == nil { - return errors.New("connection closed") - } - var welcomeResp WebsocketWelcome - err = json.Unmarshal(resp.Raw, &welcomeResp) - if err != nil { - return err - } - - if b.Verbose { - log.Debugf(log.ExchangeSys, - "Successfully connected to Bitmex %s at time: %s Limit: %d", - welcomeResp.Info, - welcomeResp.Timestamp, - welcomeResp.Limit.Remaining) - } - b.Websocket.Wg.Add(1) go b.wsReadData() + if err := b.wsOpenStream(ctx, b.Websocket.Conn, wsPublicStream); err != nil { + return err + } + if b.Websocket.CanUseAuthenticatedEndpoints() { - err = b.websocketSendAuth(context.TODO()) - if err != nil { + if err := b.websocketSendAuth(ctx); err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", b.Name, err) } @@ -124,6 +110,31 @@ func (b *Bitmex) WsConnect() error { return nil } +const ( + wsPublicStream = "public" + wsPrivateStream = "private" + wsSubscribeOp = "subscribe" + wsUnsubscribeOp = "unsubscribe" + wsMsgPacket = 0 + wsOpenPacket = 1 + wsClosePacket = 2 +) + +func (b *Bitmex) wsOpenStream(ctx context.Context, c stream.Connection, name string) error { + resp, err := c.SendMessageReturnResponse(ctx, request.Unset, "open:"+name, []any{wsOpenPacket, name, name}) + if err != nil { + return err + } + var welcomeResp WebsocketWelcome + if err := json.Unmarshal(resp, &welcomeResp); err != nil { + return err + } + if b.Verbose { + log.Debugf(log.ExchangeSys, "Successfully connected to Bitmex %s websocket API at time: %s Limit: %d", name, welcomeResp.Timestamp, welcomeResp.Limit.Remaining) + } + return nil +} + // wsReadData receives and passes on websocket messages for processing func (b *Bitmex) wsReadData() { defer b.Websocket.Wg.Done() @@ -141,338 +152,321 @@ func (b *Bitmex) wsReadData() { } func (b *Bitmex) wsHandleData(respRaw []byte) error { - quickCapture := make(map[string]interface{}) - err := json.Unmarshal(respRaw, &quickCapture) + msg, _, _, err := jsonparser.Get(respRaw, "[3]") if err != nil { - return err + return fmt.Errorf("unknown message format: %s", respRaw) + } + // We don't need to know about errors, since we're looking optimistically into the json + op, _ := jsonparser.GetString(msg, "request", "op") + errMsg, _ := jsonparser.GetString(msg, "error") + success, _ := jsonparser.GetBoolean(msg, "success") + version, _ := jsonparser.GetString(msg, "version") + switch { + case version != "": + op = "open" + fallthrough + case errMsg != "", success: + streamID, e2 := jsonparser.GetString(respRaw, "[1]") + if e2 != nil { + return fmt.Errorf("%w parsing stream", e2) + } + if !b.Websocket.Match.IncomingWithData(op+":"+streamID, msg) { + return fmt.Errorf("%w: %s:%s", stream.ErrNoMessageListener, op, streamID) + } + return nil } - var respError WebsocketErrorResponse - if _, ok := quickCapture["status"]; ok { - err = json.Unmarshal(respRaw, &respError) + tableName, err := jsonparser.GetString(msg, "table") + if err != nil { + // Anything that's not a table isn't expected + return fmt.Errorf("unknown message format: %s", msg) + } + + switch tableName { + case bitmexWSOrderbookL2, bitmexWSOrderbookL225, bitmexWSOrderbookL10: + var orderbooks OrderBookData + err = json.Unmarshal(msg, &orderbooks) if err != nil { return err } - } + if len(orderbooks.Data) == 0 { + return fmt.Errorf("empty orderbook data received: %s", msg) + } - if _, ok := quickCapture["success"]; ok { - var decodedResp WebsocketSubscribeResp - err = json.Unmarshal(respRaw, &decodedResp) + var pair currency.Pair + var a asset.Item + pair, a, err = b.GetPairAndAssetTypeRequestFormatted(orderbooks.Data[0].Symbol) if err != nil { return err } - if decodedResp.Success { - if len(quickCapture) == 3 { - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s websocket: Successfully subscribed to %s", - b.Name, decodedResp.Subscribe) - } - } else { - b.Websocket.SetCanUseAuthenticatedEndpoints(true) - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s websocket: Successfully authenticated websocket connection", - b.Name) - } - } + err = b.processOrderbook(orderbooks.Data, orderbooks.Action, pair, a) + if err != nil { + return err + } + case bitmexWSTrade: + if !b.IsSaveTradeDataEnabled() { return nil } - - b.Websocket.DataHandler <- fmt.Errorf("%s websocket error: Unable to subscribe %s", - b.Name, decodedResp.Subscribe) - } else if _, ok := quickCapture["table"]; ok { - var decodedResp WebsocketMainResponse - err = json.Unmarshal(respRaw, &decodedResp) + var tradeHolder TradeData + err = json.Unmarshal(msg, &tradeHolder) if err != nil { return err } - switch decodedResp.Table { - case bitmexWSOrderbookL2, bitmexWSOrderbookL225, bitmexWSOrderbookL10: - var orderbooks OrderBookData - err = json.Unmarshal(respRaw, &orderbooks) + var trades []trade.Data + for i := range tradeHolder.Data { + if tradeHolder.Data[i].Price == 0 { + // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. + // These have a size of 0 and are used only to indicate a changing price. + continue + } + var p currency.Pair + var a asset.Item + p, a, err = b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) if err != nil { return err } - if len(orderbooks.Data) == 0 { - return fmt.Errorf("%s - Empty orderbook data received: %s", b.Name, respRaw) + var oSide order.Side + oSide, err = order.StringToOrderSide(tradeHolder.Data[i].Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + Err: err, + } } - var pair currency.Pair + trades = append(trades, trade.Data{ + TID: tradeHolder.Data[i].TrdMatchID, + Exchange: b.Name, + CurrencyPair: p, + AssetType: a, + Side: oSide, + Price: tradeHolder.Data[i].Price, + Amount: float64(tradeHolder.Data[i].Size), + Timestamp: tradeHolder.Data[i].Timestamp, + }) + } + return b.AddTradesToBuffer(trades...) + case bitmexWSAnnouncement: + var announcement AnnouncementData + err = json.Unmarshal(msg, &announcement) + if err != nil { + return err + } + + if announcement.Action == bitmexActionInitialData { + return nil + } + + b.Websocket.DataHandler <- announcement.Data + case bitmexWSAffiliate: + var response WsAffiliateResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSInstrument: + // ticker + case bitmexWSExecution: + // trades of an order + var response WsExecutionResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + + for i := range response.Data { + var p currency.Pair var a asset.Item - pair, a, err = b.GetPairAndAssetTypeRequestFormatted(orderbooks.Data[0].Symbol) + p, a, err = b.GetPairAndAssetTypeRequestFormatted(response.Data[i].Symbol) if err != nil { return err } - - err = b.processOrderbook(orderbooks.Data, orderbooks.Action, pair, a) + var oStatus order.Status + oStatus, err = order.StringToOrderStatus(response.Data[i].OrdStatus) if err != nil { - return err - } - case bitmexWSTrade: - if !b.IsSaveTradeDataEnabled() { - return nil + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + Err: err, + } } - var tradeHolder TradeData - err = json.Unmarshal(respRaw, &tradeHolder) + var oSide order.Side + oSide, err = order.StringToOrderSide(response.Data[i].Side) if err != nil { - return err - } - var trades []trade.Data - for i := range tradeHolder.Data { - if tradeHolder.Data[i].Price == 0 { - // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. - // These have a size of 0 and are used only to indicate a changing price. - continue + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + Err: err, } + } + b.Websocket.DataHandler <- &order.Detail{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + AccountID: strconv.FormatInt(response.Data[i].Account, 10), + AssetType: a, + Pair: p, + Status: oStatus, + Trades: []order.TradeHistory{ + { + Price: response.Data[i].Price, + Amount: response.Data[i].OrderQuantity, + Exchange: b.Name, + TID: response.Data[i].ExecID, + Side: oSide, + Timestamp: response.Data[i].Timestamp, + IsMaker: false, + }, + }, + } + } + case bitmexWSOrder: + var response WsOrderResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + switch response.Action { + case "update", "insert": + for x := range response.Data { var p currency.Pair var a asset.Item - p, a, err = b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) + p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) if err != nil { return err } var oSide order.Side - oSide, err = order.StringToOrderSide(tradeHolder.Data[i].Side) + oSide, err = order.StringToOrderSide(response.Data[x].Side) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, + OrderID: response.Data[x].OrderID, Err: err, } } - - trades = append(trades, trade.Data{ - TID: tradeHolder.Data[i].TrdMatchID, - Exchange: b.Name, - CurrencyPair: p, - AssetType: a, - Side: oSide, - Price: tradeHolder.Data[i].Price, - Amount: float64(tradeHolder.Data[i].Size), - Timestamp: tradeHolder.Data[i].Timestamp, - }) - } - return b.AddTradesToBuffer(trades...) - case bitmexWSAnnouncement: - var announcement AnnouncementData - err = json.Unmarshal(respRaw, &announcement) - if err != nil { - return err - } - - if announcement.Action == bitmexActionInitialData { - return nil - } - - b.Websocket.DataHandler <- announcement.Data - case bitmexWSAffiliate: - var response WsAffiliateResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSInstrument: - // ticker - case bitmexWSExecution: - // trades of an order - var response WsExecutionResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - - for i := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetPairAndAssetTypeRequestFormatted(response.Data[i].Symbol) - if err != nil { - return err - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[i].OrdStatus) + var oType order.Type + oType, err = order.StringToOrderType(response.Data[x].OrderType) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, - OrderID: response.Data[i].OrderID, + OrderID: response.Data[x].OrderID, Err: err, } } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[i].Side) + var oStatus order.Status + oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, - OrderID: response.Data[i].OrderID, + OrderID: response.Data[x].OrderID, Err: err, } } b.Websocket.DataHandler <- &order.Detail{ + Price: response.Data[x].Price, + Amount: response.Data[x].OrderQuantity, Exchange: b.Name, - OrderID: response.Data[i].OrderID, - AccountID: strconv.FormatInt(response.Data[i].Account, 10), + OrderID: response.Data[x].OrderID, + AccountID: strconv.FormatInt(response.Data[x].Account, 10), + Type: oType, + Side: oSide, + Status: oStatus, AssetType: a, + Date: response.Data[x].TransactTime, Pair: p, - Status: oStatus, - Trades: []order.TradeHistory{ - { - Price: response.Data[i].Price, - Amount: response.Data[i].OrderQuantity, - Exchange: b.Name, - TID: response.Data[i].ExecID, - Side: oSide, - Timestamp: response.Data[i].Timestamp, - IsMaker: false, - }, - }, } } - case bitmexWSOrder: - var response WsOrderResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - switch response.Action { - case "update", "insert": - for x := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) - if err != nil { - return err - } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[x].Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oType order.Type - oType, err = order.StringToOrderType(response.Data[x].OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - b.Websocket.DataHandler <- &order.Detail{ - Price: response.Data[x].Price, - Amount: response.Data[x].OrderQuantity, - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - AccountID: strconv.FormatInt(response.Data[x].Account, 10), - Type: oType, - Side: oSide, - Status: oStatus, - AssetType: a, - Date: response.Data[x].TransactTime, - Pair: p, - } + case "delete": + for x := range response.Data { + var p currency.Pair + var a asset.Item + p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) + if err != nil { + return err } - case "delete": - for x := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) - if err != nil { - return err - } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[x].Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oType order.Type - oType, err = order.StringToOrderType(response.Data[x].OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } + var oSide order.Side + oSide, err = order.StringToOrderSide(response.Data[x].Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } + } + var oType order.Type + oType, err = order.StringToOrderType(response.Data[x].OrderType) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } - b.Websocket.DataHandler <- &order.Detail{ - Price: response.Data[x].Price, - Amount: response.Data[x].OrderQuantity, - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - AccountID: strconv.FormatInt(response.Data[x].Account, 10), - Type: oType, - Side: oSide, - Status: oStatus, - AssetType: a, - Date: response.Data[x].TransactTime, - Pair: p, + } + var oStatus order.Status + oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } } - default: - b.Websocket.DataHandler <- fmt.Errorf("%s - Unsupported order update %+v", b.Name, response) - } - case bitmexWSMargin: - var response WsMarginResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSPosition: - var response WsPositionResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - - case bitmexWSPrivateNotifications: - var response WsPrivateNotificationsResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSTransact: - var response WsTransactResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSWallet: - var response WsWalletResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err + b.Websocket.DataHandler <- &order.Detail{ + Price: response.Data[x].Price, + Amount: response.Data[x].OrderQuantity, + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + AccountID: strconv.FormatInt(response.Data[x].Account, 10), + Type: oType, + Side: oSide, + Status: oStatus, + AssetType: a, + Date: response.Data[x].TransactTime, + Pair: p, + } } - b.Websocket.DataHandler <- response default: - b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)} - return nil + b.Websocket.DataHandler <- fmt.Errorf("%s - Unsupported order update %+v", b.Name, response) + } + case bitmexWSMargin: + var response WsMarginResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err } + b.Websocket.DataHandler <- response + case bitmexWSPosition: + var response WsPositionResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + case bitmexWSPrivateNotifications: + var response WsPrivateNotificationsResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSTransact: + var response WsTransactResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSWallet: + var response WsWalletResponse + err = json.Unmarshal(msg, &response) + if err != nil { + return err + } + b.Websocket.DataHandler <- response + default: + b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(msg)} } + return nil } @@ -567,36 +561,52 @@ func (b *Bitmex) GetSubscriptionTemplate(_ *subscription.Subscription) (*templat // Subscribe subscribes to a websocket channel func (b *Bitmex) Subscribe(subs subscription.List) error { - req := WebsocketRequest{ - Command: "subscribe", - } - for _, s := range subs { - for _, a := range strings.Split(s.QualifiedChannel, ",") { - req.Arguments = append(req.Arguments, a) - } - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, subs...) - } - return err + return common.AppendError( + b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsSubscribeOp, l, wsPublicStream) }, len(subs)), + b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsSubscribeOp, l, wsPrivateStream) }, len(subs)), + ) } // Unsubscribe sends a websocket message to stop receiving data from the channel func (b *Bitmex) Unsubscribe(subs subscription.List) error { + return common.AppendError( + b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsUnsubscribeOp, l, wsPublicStream) }, len(subs)), + b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsUnsubscribeOp, l, wsPrivateStream) }, len(subs)), + ) +} + +func (b *Bitmex) manageSubs(op string, subs subscription.List, stream string) error { req := WebsocketRequest{ - Command: "unsubscribe", + Command: op, } + exp := map[string]*subscription.Subscription{} for _, s := range subs { - for _, a := range strings.Split(s.QualifiedChannel, ",") { - req.Arguments = append(req.Arguments, a) - } + req.Arguments = append(req.Arguments, s.QualifiedChannel) + exp[s.QualifiedChannel] = s } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...) + packet := []any{wsMsgPacket, stream, stream, req} + resps, errs := b.Websocket.Conn.SendMessageReturnResponses(context.TODO(), request.Unset, op+":"+stream, packet, len(subs)) + for _, resp := range resps { + if errMsg, _ := jsonparser.GetString(resp, "error"); errMsg != "" { + errs = common.AppendError(errs, errors.New(errMsg)) + } else { + chanName, err := jsonparser.GetString(resp, op) + if err != nil { + errs = common.AppendError(errs, err) + } + s, ok := exp[chanName] + if !ok { + errs = common.AppendError(errs, fmt.Errorf("%w: %s", subscription.ErrNotFound, chanName)) + } else { + if op == wsSubscribeOp { + errs = common.AppendError(errs, b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)) + } else { + errs = common.AppendError(errs, b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)) + } + } + } } - return err + return errs } // WebsocketSendAuth sends an authenticated subscription @@ -605,26 +615,33 @@ func (b *Bitmex) websocketSendAuth(ctx context.Context) error { if err != nil { return err } - b.Websocket.SetCanUseAuthenticatedEndpoints(true) timestamp := time.Now().Add(time.Hour * 1).Unix() newTimestamp := strconv.FormatInt(timestamp, 10) - hmac, err := crypto.GetHMAC(crypto.HashSHA256, - []byte("GET/realtime"+newTimestamp), - []byte(creds.Secret)) + hmac, err := crypto.GetHMAC(crypto.HashSHA256, []byte("GET/realtime"+newTimestamp), []byte(creds.Secret)) if err != nil { return err } signature := crypto.HexEncodeToString(hmac) - sendAuth := WebsocketRequest{ + err = b.wsOpenStream(ctx, b.Websocket.Conn, wsPrivateStream) + if err != nil { + return err + } + req := WebsocketRequest{ Command: "authKeyExpires", Arguments: []any{creds.Key, timestamp, signature}, } - err = b.Websocket.Conn.SendJSONMessage(ctx, request.Unset, sendAuth) + packet := []any{wsMsgPacket, wsPrivateStream, wsPrivateStream, req} + resp, err := b.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.Command+":"+wsPrivateStream, packet) if err != nil { - b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err } + if errMsg, _ := jsonparser.GetString(resp, "error"); errMsg != "" { + return errors.New(errMsg) + } + if b.Verbose { + log.Debugf(log.ExchangeSys, "%s websocket: Successfully authenticated websocket connection", b.Name) + } return nil } @@ -662,8 +679,8 @@ const subTplText = ` {{ range $asset, $pairs := $.AssetPairs }} {{- with $name := channelName $.S $asset }} {{- range $i, $p := $pairs -}} - {{- if $i -}} , {{- end -}} {{- $name -}} : {{- $p -}} + {{ $.PairSeparator }} {{- end }} {{- end }} {{ $.AssetSeparator }}