Skip to content

Commit

Permalink
subscriptions: Encapsulate, replace Pair with Pairs and refactor; imp…
Browse files Browse the repository at this point in the history
…rove exchange support

* Websocket: Use ErrSubscribedAlready

instead of errChannelAlreadySubscribed

* Subscriptions: Replace Pair with Pairs

Given that some subscriptions have multiple pairs, support that as the
standard.

* Docs: Update subscriptions in add new exch

* RPC: Update Subscription Pairs

* Linter: Disable testifylint.Len

We deliberately use Equal over Len to avoid spamming the contents of large Slices

* Websocket: Add suffix to state consts

* Binance: Subscription Pairs support

* Bitfinex: Subscription Pairs support

* Bithumb: Subscription Pairs support

* Bitmex: Subscription Pairs support

* Bitstamp: Subscription Pairs support

* BTCMarkets: Subscription Pairs support

* BTSE: Subscription Pairs support

* Coinbase: Subscription Pairs support

* Coinut: Subscription Pairs support

* GateIO: Subscription Pairs support

* Gemini: Subscription Pairs support and improvement

* Hitbtc: Subscription Pairs support

* Huboi: Subscription Pairs support

* Kucoin: Subscription Pairs support

* Okcoin: Subscription Pairs support

* Poloniex: Subscription Pairs support

* Kraken: Add subscription Pairs support

Note: This is a naieve implementation because we want to rebase the
kraken websocket rewrite on top of this

* Bybit: Subscription Pairs support

* Okx: Subscription Pairs support

* Bitmex: Subsription configuration

* Fixes unauthenticated websocket left as CanUseAuth
* Fixes auth subs happening privately

* CoinbasePro: Subscription Configuration

* Consolidate ProductIDs when all subscriptions are for the same list

* Websocket: Log actual sent message when Verbose

* Subscriptions: Improve clarity of which key is which in Match

* Subscriptions: Lint fix for HugeParam

* Subscriptions: Add AddPairs and move keys from test

* Subscriptions: Simplify subscription keys and add key types

* Subscriptions: Add List.GroupPairs Rename sub.AddPairs

* Subscription: Fix ExactKey not matching 0 pairs

* Subscriptions: Remove unused IdentityKey and HasPairKey

* Subscriptions: Fix GetKey test

* Subscriptions: Test coverage improvements

* Websocket: Change State on Add/Remove

* Subscriptions: Improve error context

* Subscriptions: Fix Enable: false subs not ignored

* Bitfinex: Fix WsAuth test failing on DataHandler

DataHandler is eaten by dataMonitor now, so we need to use ToRoutine

* Deribit: Subscription Pairs support

* Websocket: Accept nil lists for checkSubscriptions

If the user passes in a nil (implicitly empty) list, we would not panic.
Therefore the burden of correctness about that data lies with them.
The list of subscriptions is empty, and that's okay, and possibly
convenient

* Websocket: Add context to NilPointer errors

* Subscriptions: Add context to nil errors

* Exchange: Fix error expectations in UnsubToWSChans
  • Loading branch information
gbjk authored Jun 7, 2024
1 parent afb6f75 commit 1199f38
Show file tree
Hide file tree
Showing 75 changed files with 4,516 additions and 3,856 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ linters-settings:
disable:
- require-error
- float-compare
# We deliberately use Equal over Len to avoid spamming the contents of large Slices
- len

issues:
max-issues-per-linter: 0
Expand Down
6 changes: 3 additions & 3 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ type FeaturesEnabledConfig struct {

// FeaturesConfig stores the exchanges supported and enabled features
type FeaturesConfig struct {
Supports FeaturesSupportedConfig `json:"supports"`
Enabled FeaturesEnabledConfig `json:"enabled"`
Subscriptions []*subscription.Subscription `json:"subscriptions,omitempty"`
Supports FeaturesSupportedConfig `json:"supports"`
Enabled FeaturesEnabledConfig `json:"enabled"`
Subscriptions subscription.List `json:"subscriptions,omitempty"`
}

// APIEndpointsConfig stores the API endpoint addresses
Expand Down
39 changes: 31 additions & 8 deletions currency/pairs.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func (p Pairs) Lower() Pairs {
return newSlice
}

// Contains checks to see if a specified pair exists inside a currency pair
// array
// Contains checks to see if a specified pair exists inside a currency pair array
func (p Pairs) Contains(check Pair, exact bool) bool {
for i := range p {
if (exact && p[i].Equal(check)) ||
Expand All @@ -121,15 +120,13 @@ func (p Pairs) Contains(check Pair, exact bool) bool {
return false
}

// ContainsAll checks to see if all pairs supplied are contained within the
// original pairs list.
// ContainsAll checks to see if all pairs supplied are contained within the original pairs list
func (p Pairs) ContainsAll(check Pairs, exact bool) error {
if len(check) == 0 {
return ErrCurrencyPairsEmpty
}

comparative := make(Pairs, len(p))
copy(comparative, p)
comparative := slices.Clone(p)
list:
for x := range check {
for y := range comparative {
Expand Down Expand Up @@ -204,8 +201,7 @@ func (p Pairs) GetPairsByCurrencies(currencies Currencies) Pairs {

// Remove removes the specified pair from the list of pairs if it exists
func (p Pairs) Remove(pair Pair) (Pairs, error) {
pairs := make(Pairs, len(p))
copy(pairs, p)
pairs := slices.Clone(p)
for x := range p {
if p[x].Equal(pair) {
return append(pairs[:x], pairs[x+1:]...), nil
Expand Down Expand Up @@ -481,3 +477,30 @@ func (p Pairs) GetPairsByBase(baseTerm Code) (Pairs, error) {
}
return pairs, nil
}

// equalKey is a small key for testing pair equality without delimiter
type equalKey struct {
Base *Item
Quote *Item
}

// Equal checks to see if two lists of pairs contain only the same pairs, ignoring delimiter and case
// Does not check for inverted/reciprocal pairs
func (p Pairs) Equal(b Pairs) bool {
if len(p) != len(b) {
return false
}
if len(p) == 0 {
return true
}
m := map[equalKey]struct{}{}
for i := range p {
m[equalKey{Base: p[i].Base.Item, Quote: p[i].Quote.Item}] = struct{}{}
}
for i := range b {
if _, ok := m[equalKey{Base: b[i].Base.Item, Quote: b[i].Quote.Item}]; !ok {
return false
}
}
return true
}
9 changes: 9 additions & 0 deletions currency/pairs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,12 @@ func TestGetPairsByBase(t *testing.T) {
t.Fatalf("received: '%v' but expected '%v'", len(got), 3)
}
}

// TestPairsEqual exercises Pairs.Equal
func TestPairsEqual(t *testing.T) {
t.Parallel()
orig := Pairs{NewPairWithDelimiter("USDT", "BTC", "-"), NewPair(DAI, XRP), NewPair(DAI, BTC)}
assert.True(t, orig.Equal(Pairs{NewPair(DAI, XRP), NewPair(DAI, BTC), NewPair(USDT, BTC)}), "Equal Pairs should return true")
assert.Equal(t, "USDT-BTC", orig[0].String(), "Equal Pairs should not effect original order or format")
assert.False(t, orig.Equal(Pairs{NewPair(DAI, XRP), NewPair(DAI, BTC), NewPair(USD, LTC)}), "UnEqual Pairs should return false")
}
46 changes: 15 additions & 31 deletions docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (f *FTX) WsConnect() error {
}
}
// Generates the default subscription set, based off enabled pairs.
subs, err := f.GenerateDefaultSubscriptions()
subs, err := f.generateSubscriptions()
if err != nil {
return err
}
Expand All @@ -733,10 +733,10 @@ func (f *FTX) WsConnect() error {
- Create function to generate default subscriptions:
```go
// GenerateDefaultSubscriptions generates default subscription
func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
var subscriptions []subscription.Subscription
subscriptions = append(subscriptions, subscription.Subscription{
// generateSubscriptions generates default subscription
func (f *FTX) generateSubscriptions() (subscription.List, error) {
var subscriptions subscription.List
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: wsMarkets,
})
// Ranges over available channels, pairs and asset types to produce a full
Expand All @@ -754,9 +754,9 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error
"-")
for x := range channels {
subscriptions = append(subscriptions,
subscription.Subscription{
&subscription.Subscription{
Channel: channels[x],
Pair: newPair,
Pair: currency.Pairs{newPair},
Asset: assets[a],
})
}
Expand All @@ -766,9 +766,7 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]subscription.Subscription, error
if f.IsWebsocketAuthenticationSupported() {
var authchan = []string{wsOrders, wsFills}
for x := range authchan {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: authchan[x],
})
subscriptions = append(subscriptions, &subscription.Subscription{Channel: authchan[x]})
}
}
return subscriptions, nil
Expand Down Expand Up @@ -809,7 +807,7 @@ type WsSub struct {
```go
// Subscribe sends a websocket message to receive data from the channel
func (f *FTX) Subscribe(channelsToSubscribe []subscription.Subscription) error {
func (f *FTX) Subscribe(channelsToSubscribe subscription.List) error {
// For subscriptions we try to batch as much as possible to limit the amount
// of connection usage but sometimes this is not supported on the exchange
// API.
Expand All @@ -825,13 +823,8 @@ channels:
case wsFills, wsOrders, wsMarkets:
// Authenticated wsFills && wsOrders or wsMarkets which is a channel subscription for the full set of tradable markets do not need a currency pair association.
default:
a, err := f.GetPairAssetType(channelsToSubscribe[i].Pair)
if err != nil {
errs = append(errs, err)
continue channels
}
// Ensures our outbound currency pair is formatted correctly, sometimes our configuration format is different from what our request format needs to be.
formattedPair, err := f.FormatExchangeCurrency(channelsToSubscribe[i].Pair, a)
formattedPair, err := f.FormatExchangeCurrency(channelsToSubscribe[i].Pair, channelsToSubscribe[i].Asset)
if err != nil {
errs = append(errs, err)
continue channels
Expand All @@ -846,10 +839,7 @@ channels:
// When we have a successful subscription, we can alert our internal management system of the success.
f.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
}
if errs != nil {
return errs
}
return nil
return errs
}
```
Expand Down Expand Up @@ -1063,7 +1053,7 @@ func (f *FTX) WsAuth(ctx context.Context) error {
```go
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (f *FTX) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error {
func (f *FTX) Unsubscribe(channelsToUnsubscribe subscription.List) error {
// As with subscribing we want to batch as much as possible, but sometimes this cannot be achieved due to API shortfalls.
var errs common.Errors
channels:
Expand All @@ -1074,13 +1064,7 @@ channels:
switch channelsToUnsubscribe[i].Channel {
case wsFills, wsOrders, wsMarkets:
default:
a, err := f.GetPairAssetType(channelsToUnsubscribe[i].Pair)
if err != nil {
errs = append(errs, err)
continue channels
}

formattedPair, err := f.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, a)
formattedPair, err := f.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, channelsToUnsubscribe[i].Asset)
if err != nil {
errs = append(errs, err)
continue channels
Expand Down Expand Up @@ -1135,8 +1119,8 @@ func (f *FTX) Setup(exch *config.Exchange) error {
Subscriber: f.Subscribe,
// Unsubscriber function outlined above.
UnSubscriber: f.Unsubscribe,
// GenerateDefaultSubscriptions function outlined above.
GenerateSubscriptions: f.GenerateDefaultSubscriptions,
// GenerateSubscriptions function outlined above.
GenerateSubscriptions: f.generateSubscriptions,
// Defines the capabilities of the websocket outlined in supported
// features struct. This allows the websocket connection to be flushed
// appropriately if we have a pair/asset enable/disable change. This is
Expand Down
2 changes: 1 addition & 1 deletion engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3121,7 +3121,7 @@ func (s *RPCServer) WebsocketGetSubscriptions(_ context.Context, r *gctrpc.Webso
payload.Subscriptions = append(payload.Subscriptions,
&gctrpc.WebsocketSubscription{
Channel: subs[i].Channel,
Pair: subs[i].Pair.String(),
Pairs: subs[i].Pairs.Join(),
Asset: subs[i].Asset.String(),
Params: string(params),
})
Expand Down
15 changes: 8 additions & 7 deletions exchanges/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ const (
Options
OptionCombo
FutureCombo

// Added to represent a USDT and USDC based linear derivatives(futures/perpetual) assets in Bybit V5.
LinearContract
LinearContract // Added to represent a USDT and USDC based linear derivatives(futures/perpetual) assets in Bybit V5
All

optionsFlag = OptionCombo | Options
futuresFlag = PerpetualContract | PerpetualSwap | Futures | DeliveryFutures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures | LinearContract | FutureCombo
Expand All @@ -70,6 +69,7 @@ const (
options = "options"
optionCombo = "option_combo"
futureCombo = "future_combo"
all = "all"
)

var (
Expand Down Expand Up @@ -120,6 +120,8 @@ func (a Item) String() string {
return optionCombo
case FutureCombo:
return futureCombo
case All:
return all
default:
return ""
}
Expand Down Expand Up @@ -225,11 +227,10 @@ func New(input string) (Item, error) {
return OptionCombo, nil
case futureCombo:
return FutureCombo, nil
case all:
return All, nil
default:
return 0, fmt.Errorf("%w '%v', only supports %s",
ErrNotSupported,
input,
supportedList)
return 0, fmt.Errorf("%w '%v', only supports %s", ErrNotSupported, input, supportedList)
}
}

Expand Down
14 changes: 7 additions & 7 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,7 @@ func TestGetHistoricTrades(t *testing.T) {
if mockTests {
expected = 1002
}
assert.Equal(t, expected, len(result), "GetHistoricTrades should return correct number of entries") //nolint:testifylint // assert.Len doesn't produce clear messages on result
assert.Equal(t, expected, len(result), "GetHistoricTrades should return correct number of entries")
for _, r := range result {
if !assert.WithinRange(t, r.Timestamp, start, end, "All trades should be within time range") {
break
Expand Down Expand Up @@ -1982,7 +1982,7 @@ func BenchmarkWsHandleData(bb *testing.B) {
func TestSubscribe(t *testing.T) {
t.Parallel()
b := b
channels := []subscription.Subscription{
channels := subscription.List{
{Channel: "btcusdt@ticker"},
{Channel: "btcusdt@trade"},
}
Expand All @@ -2008,7 +2008,7 @@ func TestSubscribe(t *testing.T) {

func TestSubscribeBadResp(t *testing.T) {
t.Parallel()
channels := []subscription.Subscription{
channels := subscription.List{
{Channel: "moons@ticker"},
}
mock := func(msg []byte, w *websocket.Conn) error {
Expand Down Expand Up @@ -2434,19 +2434,19 @@ func TestSeedLocalCache(t *testing.T) {

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
expected := []subscription.Subscription{}
expected := subscription.List{}
pairs, err := b.GetEnabledPairs(asset.Spot)
assert.NoError(t, err, "GetEnabledPairs should not error")
for _, p := range pairs {
for _, c := range []string{"kline_1m", "depth@100ms", "ticker", "trade"} {
expected = append(expected, subscription.Subscription{
expected = append(expected, &subscription.Subscription{
Channel: p.Format(currency.PairFormat{Delimiter: "", Uppercase: false}).String() + "@" + c,
Pair: p,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
})
}
}
subs, err := b.GenerateSubscriptions()
subs, err := b.generateSubscriptions()
assert.NoError(t, err, "GenerateSubscriptions should not error")
if assert.Len(t, subs, len(expected), "Should have the correct number of subs") {
assert.ElementsMatch(t, subs, expected, "Should get the correct subscriptions")
Expand Down
Loading

0 comments on commit 1199f38

Please sign in to comment.