Skip to content

Commit

Permalink
Bitfinex: Subscribe and Unsubscribe atomicly
Browse files Browse the repository at this point in the history
* This change makes it so that Subscribe and Unsubscribe wait for success
** Tells the DataHandler about errors
** Errors are returned to consumers
* Subscribes concurrently to the channels
* It also simplifies the chanId to stream mapping
* Removes unable to locate chanID: %d errors which are just noise
* Paves the way for unified channelSubscription id handling
* Adds support for subId for Book subscriptions, which is more robust
  • Loading branch information
gbjk committed Sep 14, 2023
1 parent 3218982 commit 1f25932
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 145 deletions.
26 changes: 24 additions & 2 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatal("Bitfinex setup error", err)
}
b.Websocket.Enable()
if err = b.WsConnect(); err != nil {
log.Fatal("Bitfinex setup error", err)
}
b.SetCredentials(apiKey, apiSecret, "", "", "", "")
if !b.Enabled || b.API.AuthenticatedSupport ||
b.Verbose || b.Websocket.IsEnabled() || len(b.BaseCurrencies) < 1 {
if !b.Enabled || b.API.AuthenticatedSupport || len(b.BaseCurrencies) < 1 {
log.Fatal("Bitfinex Setup values not set correctly")
}

Expand Down Expand Up @@ -87,6 +90,25 @@ func TestStart(t *testing.T) {
testWg.Wait()
}

// TestWebsocketSubscribe tests returning a message with an id
// TODO: This test is really just an integration test for development
// We need a better way to test this overall
func TestWebsocketSubscribe(t *testing.T) {
s := stream.ChannelSubscription{
Channel: wsTrades,
Currency: currency.NewPairWithDelimiter("BTC", "USDT", "-"),
Params: map[string]interface{}{
"symbol": "tBTCUST",
},
}
if err := b.Subscribe([]stream.ChannelSubscription{s}); err != nil {
t.Error(err)
}
ss := b.Websocket.GetSubscriptions()
if err := b.Websocket.ResubscribeToChannel(&ss[0]); err != nil {
t.Error(err)
}
}
func TestGetV2MarginFunding(t *testing.T) {
t.Parallel()
sharedtestvalues.SkipTestIfCredentialsUnset(t, b)
Expand Down
17 changes: 17 additions & 0 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bitfinex

import (
"encoding/json"
"errors"
"sync"
"time"
Expand All @@ -13,6 +14,7 @@ import (
var (
errTypeAssert = errors.New("type assertion failed")
errSetCannotBeEmpty = errors.New("set cannot be empty")
errUnknownError = errors.New("unknown error")
)

// AccountV2Data stores account v2 data
Expand Down Expand Up @@ -657,6 +659,10 @@ const (
wsTicker = "ticker"
wsTrades = "trades"
wsError = "error"
wsEventSubscribed = "subscribed"
wsEventUnsubscribed = "unsubscribed"
wsEventAuth = "auth"
wsEventError = "error"
)

// WsAuthRequest container for WS auth request
Expand All @@ -669,6 +675,17 @@ type WsAuthRequest struct {
DeadManSwitch int64 `json:"dms,omitempty"`
}

// WsEvent contains response structure for WS sub/unsub/auth
// This type probably isn't used, but is here for completeness
type WsEvent struct {
Event string `json:"event"`
ChanId json.Number `json:"chanId"`

Check warning on line 682 in exchanges/bitfinex/bitfinex_types.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: struct field ChanId should be ChanID (revive)
SubId string `json:"subId"`

Check warning on line 683 in exchanges/bitfinex/bitfinex_types.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: struct field SubId should be SubID (revive)
Status string `json:"status"`
Code json.Number `json:"code"`
Msg string `json:"msg"`
}

// WsFundingOffer funding offer received via websocket
type WsFundingOffer struct {
ID int64
Expand Down
Loading

0 comments on commit 1f25932

Please sign in to comment.