Skip to content

Commit

Permalink
Bitfinex: Subscribe and Unsubscribe atomicly
Browse files Browse the repository at this point in the history
* Fix Auth failures ignored
* This change makes it so that Subscribe and Unsubscribe wait for success
** Tells the DataHandler about errors
** Errors are returned to consumers
* Subscribes concurrently to the channels
* It also simplifies the chanId to stream mapping
* Removes unable to locate chanID: %d errors which are just noise
* Paves the way for unified channelSubscription id handling
* Adds support for subId for Book subscriptions, which is more robust

* Vastly simplifies what we need to test TestWsSubscribedResponse
This test was working to ensure that the various fancy key parsing
mechanisms all worked. Now that we use subId, we just need a thorough
test of that
* Expose Match.Set in order to capture websocket incoming data
Can't see another way of doing this. Doesn't seem too bad
  • Loading branch information
gbjk committed Sep 18, 2023
1 parent ade2d9c commit 2ebc56b
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 186 deletions.
77 changes: 36 additions & 41 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/thrasher-corp/gocryptotrader/common"
Expand Down Expand Up @@ -54,9 +55,12 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatal("Bitfinex setup error", err)
}
b.Websocket.Enable()
if err = b.WsConnect(); err != nil {
log.Fatal("Bitfinex setup error", err)
}
b.SetCredentials(apiKey, apiSecret, "", "", "", "")
if !b.Enabled || b.API.AuthenticatedSupport ||
b.Verbose || b.Websocket.IsEnabled() || len(b.BaseCurrencies) < 1 {
if !b.Enabled || b.API.AuthenticatedSupport || len(b.BaseCurrencies) < 1 {
log.Fatal("Bitfinex Setup values not set correctly")
}

Expand Down Expand Up @@ -88,6 +92,25 @@ func TestStart(t *testing.T) {
testWg.Wait()
}

// TestWebsocketSubscribe tests returning a message with an id
// TODO: This test is really just an integration test for development
// We need a better way to test this overall
func TestWebsocketSubscribe(t *testing.T) {
s := stream.ChannelSubscription{
Channel: wsTrades,
Currency: currency.NewPairWithDelimiter("BTC", "USDT", "-"),
Params: map[string]interface{}{
"symbol": "tBTCUST",
},
}
if err := b.Subscribe([]stream.ChannelSubscription{s}); err != nil {
t.Error(err)
}
ss := b.Websocket.GetSubscriptions()
if err := b.Websocket.ResubscribeToChannel(&ss[0]); err != nil {
t.Error(err)
}
}
func TestGetV2MarginFunding(t *testing.T) {
t.Parallel()
sharedtestvalues.SkipTestIfCredentialsUnset(t, b)
Expand Down Expand Up @@ -1206,45 +1229,17 @@ func TestWsCancelOffer(t *testing.T) {
}

func TestWsSubscribedResponse(t *testing.T) {
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker, Params: map[string]interface{}{"chanId": 224555}})
if err := b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"symbol":"tBTCUSD","pair":"BTCUSD"}`)); err != nil {
t.Error(err)
}

// Spot Candles
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsCandles, Params: map[string]interface{}{"chanId": 224556}})
if err := b.wsHandleData([]byte(`{"event":"subscribed","channel":"candles","chanId":224556,"key":"trade:1m:tBTCUSD"}`)); err != nil {
t.Error(err)
}

pair, err := currency.NewPairFromString("BTC:CNHT")
if err != nil {
t.Error(err)
}
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: pair, Channel: wsCandles, Params: map[string]interface{}{"chanId": 224557}})
pressXToJSON := `{"event":"subscribed","channel":"candles","chanId":224557,"key":"trade:1m:tBTC:CNHT"}`
if err = b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
}

// Margin Candles
pair, err = currency.NewPairFromString("BTC")
if err != nil {
t.Error(err)
}
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.MarginFunding, Currency: pair, Channel: wsCandles, Params: map[string]interface{}{"chanId": 224558}})
if e2 := b.wsHandleData([]byte(`{"event":"subscribed","channel":"candles","chanId":224558,"key":"trade:1m:fBTC:a30:p2:p30"}`)); e2 != nil {
t.Error(e2)
}

pair, err = currency.NewPairFromString("USD")
if err != nil {
t.Error(err)
}
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.MarginFunding, Currency: pair, Channel: wsCandles, Params: map[string]interface{}{"chanId": 224559}})
if e2 := b.wsHandleData([]byte(`{"event":"subscribed","channel":"candles","chanId":224559,"key":"trade:1m:fUSD:p30"}`)); e2 != nil {
t.Error(e2)
}
m, err := b.Websocket.Match.Set("subscribe:waiter1")
assert.NoError(t, err, "Setting a matcher should not error")
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
assert.NoError(t, err, "wsHandleData should not error")
if assert.NotEmpty(t, m.C, "Matcher should have received a sub notification") {
msg := <-m.C
cId, err := jsonparser.GetInt(msg, "chanId")
assert.NoError(t, err, "Should get chanId from sub notification without error")
assert.EqualValues(t, 224555, cId, "Should get the correct chanId through the matcher notification")
}
m.Cleanup()
}

func TestWsTradingPairSnapshot(t *testing.T) {
Expand Down
17 changes: 17 additions & 0 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bitfinex

import (
"encoding/json"
"errors"
"sync"
"time"
Expand All @@ -14,6 +15,7 @@ var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errSubNotFound = errors.New("could not find matching subscription")
errTypeAssert = errors.New("type assertion failed")
errUnknownError = errors.New("unknown error")
)

// AccountV2Data stores account v2 data
Expand Down Expand Up @@ -658,6 +660,10 @@ const (
wsTicker = "ticker"
wsTrades = "trades"
wsError = "error"
wsEventSubscribed = "subscribed"
wsEventUnsubscribed = "unsubscribed"
wsEventAuth = "auth"
wsEventError = "error"
)

// WsAuthRequest container for WS auth request
Expand All @@ -670,6 +676,17 @@ type WsAuthRequest struct {
DeadManSwitch int64 `json:"dms,omitempty"`
}

// WsEvent contains response structure for WS sub/unsub/auth
// This type probably isn't used, but is here for completeness
type WsEvent struct {
Event string `json:"event"`
ChanId json.Number `json:"chanId"`
SubId string `json:"subId"`
Status string `json:"status"`
Code json.Number `json:"code"`
Msg string `json:"msg"`
}

// WsFundingOffer funding offer received via websocket
type WsFundingOffer struct {
ID int64
Expand Down
Loading

0 comments on commit 2ebc56b

Please sign in to comment.