Skip to content

Commit

Permalink
Subs: Renamed Currency to Pair
Browse files Browse the repository at this point in the history
This was being mis-used through much of the code, and since we're
already touching everything, we might as well fix it
  • Loading branch information
gbjk committed Nov 7, 2023
1 parent a3f3c7f commit 336697c
Show file tree
Hide file tree
Showing 39 changed files with 2,208 additions and 2,204 deletions.
8 changes: 4 additions & 4 deletions engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3102,10 +3102,10 @@ func (s *RPCServer) WebsocketGetSubscriptions(_ context.Context, r *gctrpc.Webso
}
payload.Subscriptions = append(payload.Subscriptions,
&gctrpc.WebsocketSubscription{
Channel: subs[i].Channel,
Currency: subs[i].Currency.String(),
Asset: subs[i].Asset.String(),
Params: string(params),
Channel: subs[i].Channel,
Pair: subs[i].Pair.String(),
Asset: subs[i].Asset.String(),
Params: string(params),
})
}
return payload, nil
Expand Down
6 changes: 3 additions & 3 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,9 @@ func (b *Binance) GenerateSubscriptions() ([]subscription.Subscription, error) {
lp := pairs[y].Lower()
lp.Delimiter = ""
subscriptions = append(subscriptions, subscription.Subscription{
Channel: lp.String() + channels[z],
Currency: pairs[y],
Asset: assets[x],
Channel: lp.String() + channels[z],
Pair: pairs[y],
Asset: assets[x],
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions exchanges/binanceus/binanceus_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,9 @@ subs:
break subs
}
subscriptions = append(subscriptions, subscription.Subscription{
Channel: lp.String() + channels[z],
Currency: pairs[y],
Asset: asset.Spot,
Channel: lp.String() + channels[z],
Pair: pairs[y],
Asset: asset.Spot,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func TestWsAuth(t *testing.T) {
// See also TestSubscribeReq which covers key and symbol conversion
func TestWsSubscribe(t *testing.T) {
setupWs(t)
err := b.Subscribe([]subscription.Subscription{{Channel: wsTicker, Currency: currency.NewPair(currency.BTC, currency.USD), Asset: asset.Spot}})
err := b.Subscribe([]subscription.Subscription{{Channel: wsTicker, Pair: currency.NewPair(currency.BTC, currency.USD), Asset: asset.Spot}})
assert.NoError(t, err, "Subrcribe should not error")
catcher := func() (ok bool) {
i := <-b.Websocket.DataHandler
Expand Down
40 changes: 20 additions & 20 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {

chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Currency)
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Pair)
}

// Note: chanID's int type avoids conflicts with the string type subID key because of the type difference
Expand All @@ -516,7 +516,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
b.Websocket.AddSuccessfulSubscriptions(*c)

if b.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID)
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Pair, chanID)
}
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%v channel subscribe listener not found", subID)
Expand Down Expand Up @@ -632,7 +632,7 @@ func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interfac
Amount: rateAmount})
}
}
if err := b.WsInsertSnapshot(c.Currency, c.Asset, newOrderbook, fundingRate); err != nil {
if err := b.WsInsertSnapshot(c.Pair, c.Asset, newOrderbook, fundingRate); err != nil {
return fmt.Errorf("inserting snapshot error: %s",
err)
}
Expand Down Expand Up @@ -664,7 +664,7 @@ func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interfac
Amount: amountRate})
}

if err := b.WsUpdateOrderbook(c, c.Currency, c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
if err := b.WsUpdateOrderbook(c, c.Pair, c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
return fmt.Errorf("updating orderbook error: %s",
err)
}
Expand Down Expand Up @@ -712,7 +712,7 @@ func (b *Bitfinex) handleWSCandleUpdate(c *subscription.Subscription, d []interf
}
klineData.Exchange = b.Name
klineData.AssetType = c.Asset
klineData.Pair = c.Currency
klineData.Pair = c.Pair
b.Websocket.DataHandler <- klineData
}
case float64:
Expand Down Expand Up @@ -741,7 +741,7 @@ func (b *Bitfinex) handleWSCandleUpdate(c *subscription.Subscription, d []interf
}
klineData.Exchange = b.Name
klineData.AssetType = c.Asset
klineData.Pair = c.Currency
klineData.Pair = c.Pair
b.Websocket.DataHandler <- klineData
}
return nil
Expand All @@ -755,7 +755,7 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf

t := &ticker.Price{
AssetType: c.Asset,
Pair: c.Currency,
Pair: c.Pair,
ExchangeName: b.Name,
}

Expand Down Expand Up @@ -936,7 +936,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Currency,
CurrencyPair: c.Pair,
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
Expand Down Expand Up @@ -1604,7 +1604,7 @@ func (b *Bitfinex) WsUpdateOrderbook(c *subscription.Subscription, p currency.Pa
// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting.
// Flushing the orderbook happens immediately, but the ReSub itself is a go routine to avoid blocking the WS data channel
func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) {
if err := b.Websocket.Orderbook.FlushOrderbook(c.Currency, c.Asset); err != nil {
if err := b.Websocket.Orderbook.FlushOrderbook(c.Pair, c.Asset); err != nil {
log.Errorf(log.ExchangeSys, "%s error flushing orderbook: %v", b.Name, err)
}

Expand Down Expand Up @@ -1644,10 +1644,10 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]subscription.Subscription,
}

subscriptions = append(subscriptions, subscription.Subscription{
Channel: channels[j],
Currency: enabledPairs[k],
Params: params,
Asset: assets[i],
Channel: channels[j],
Pair: enabledPairs[k],
Params: params,
Asset: assets[i],
})
}
}
Expand Down Expand Up @@ -1705,7 +1705,7 @@ func (b *Bitfinex) parallelChanOp(channels []subscription.Subscription, m func(*
func (b *Bitfinex) subscribeToChan(c *subscription.Subscription) error {
req, err := subscribeReq(c)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
}

// subId is a single round-trip identifier that provides linking sub requests to chanIDs
Expand All @@ -1720,19 +1720,19 @@ func (b *Bitfinex) subscribeToChan(c *subscription.Subscription) error {
c.State = subscription.SubscribingState
err = b.Websocket.AddSubscription(c)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pair, err)
}

// Always remove the temporary subscription keyed by subID
defer b.Websocket.RemoveSubscriptions(*c)

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
}

if err = b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
b.Websocket.DataHandler <- wErr
return wErr
}
Expand Down Expand Up @@ -1764,13 +1764,13 @@ func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error)
prefix = "f"
}

needsDelimiter := c.Currency.Len() > 6
needsDelimiter := c.Pair.Len() > 6

var formattedPair string
if needsDelimiter {
formattedPair = c.Currency.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
formattedPair = c.Pair.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
} else {
formattedPair = currency.PairFormat{Uppercase: true}.Format(c.Currency)
formattedPair = currency.PairFormat{Uppercase: true}.Format(c.Pair)
}

if c.Channel == wsCandles {
Expand Down
8 changes: 4 additions & 4 deletions exchanges/bithumb/bithumb_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ func (b *Bithumb) GenerateSubscriptions() ([]subscription.Subscription, error) {
for x := range pairs {
for y := range channels {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: channels[y],
Currency: pairs[x].Format(pFmt),
Asset: asset.Spot,
Channel: channels[y],
Pair: pairs[x].Format(pFmt),
Asset: asset.Spot,
})
}
}
Expand All @@ -204,7 +204,7 @@ func (b *Bithumb) Subscribe(channelsToSubscribe []subscription.Subscription) err
}
subs[channelsToSubscribe[i].Channel] = s
}
s.Symbols = append(s.Symbols, channelsToSubscribe[i].Currency)
s.Symbols = append(s.Symbols, channelsToSubscribe[i].Pair)
}

tSub, ok := subs["ticker"]
Expand Down
12 changes: 6 additions & 6 deletions exchanges/bitmex/bitmex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,9 @@ func (b *Bitmex) GenerateDefaultSubscriptions() ([]subscription.Subscription, er
continue
}
subscriptions = append(subscriptions, subscription.Subscription{
Channel: channels[z] + ":" + pFmt.Format(contracts[y]),
Currency: contracts[y],
Asset: assets[x],
Channel: channels[z] + ":" + pFmt.Format(contracts[y]),
Pair: contracts[y],
Asset: assets[x],
})
}
}
Expand Down Expand Up @@ -620,9 +620,9 @@ func (b *Bitmex) GenerateAuthenticatedSubscriptions() ([]subscription.Subscripti
for i := range channels {
for j := range contracts {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: channels[i] + ":" + pFmt.Format(contracts[j]),
Currency: contracts[j],
Asset: asset.PerpetualContract,
Channel: channels[i] + ":" + pFmt.Format(contracts[j]),
Pair: contracts[j],
Asset: asset.PerpetualContract,
})
}
}
Expand Down
12 changes: 6 additions & 6 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,17 @@ func (b *Bitstamp) generateDefaultSubscriptions() ([]subscription.Subscription,
}
for j := range defaultSubChannels {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: defaultSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Currency: p,
Channel: defaultSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Pair: p,
})
}
if b.Websocket.CanUseAuthenticatedEndpoints() {
for j := range defaultAuthSubChannels {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: defaultAuthSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Currency: p,
Channel: defaultAuthSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Pair: p,
Params: map[string]interface{}{
"auth": struct{}{},
},
Expand Down
6 changes: 3 additions & 3 deletions exchanges/bittrex/bittrex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ func (b *Bittrex) GenerateDefaultSubscriptions() ([]subscription.Subscription, e
}
subscriptions = append(subscriptions,
subscription.Subscription{
Channel: channel,
Currency: pair,
Asset: asset.Spot,
Channel: channel,
Pair: pair,
Asset: asset.Spot,
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions exchanges/btcmarkets/btcmarkets_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func (b *BTCMarkets) generateDefaultSubscriptions() ([]subscription.Subscription
for i := range channels {
for j := range enabledCurrencies {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: channels[i],
Currency: enabledCurrencies[j],
Asset: asset.Spot,
Channel: channels[i],
Pair: enabledCurrencies[j],
Asset: asset.Spot,
})
}
}
Expand Down Expand Up @@ -370,10 +370,10 @@ func (b *BTCMarkets) Subscribe(subs []subscription.Subscription) error {
authenticate = true
}
payload.Channels = append(payload.Channels, subs[i].Channel)
if subs[i].Currency.IsEmpty() {
if subs[i].Pair.IsEmpty() {
continue
}
pair := subs[i].Currency.String()
pair := subs[i].Pair.String()
if common.StringDataCompare(payload.MarketIDs, pair) {
continue
}
Expand Down Expand Up @@ -415,11 +415,11 @@ func (b *BTCMarkets) Unsubscribe(subs []subscription.Subscription) error {
}
for i := range subs {
payload.Channels = append(payload.Channels, subs[i].Channel)
if subs[i].Currency.IsEmpty() {
if subs[i].Pair.IsEmpty() {
continue
}

pair := subs[i].Currency.String()
pair := subs[i].Pair.String()
if common.StringDataCompare(payload.MarketIDs, pair) {
continue
}
Expand All @@ -438,9 +438,9 @@ func (b *BTCMarkets) Unsubscribe(subs []subscription.Subscription) error {
// again to fetch a new snapshot in the event of a de-sync event.
func (b *BTCMarkets) ReSubscribeSpecificOrderbook(pair currency.Pair) error {
sub := []subscription.Subscription{{
Channel: wsOB,
Currency: pair,
Asset: asset.Spot,
Channel: wsOB,
Pair: pair,
Asset: asset.Spot,
}}
if err := b.Unsubscribe(sub); err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ func (b *BTSE) GenerateDefaultSubscriptions() ([]subscription.Subscription, erro
for i := range channels {
for j := range pairs {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: fmt.Sprintf(channels[i], pairs[j]),
Currency: pairs[j],
Asset: asset.Spot,
Channel: fmt.Sprintf(channels[i], pairs[j]),
Pair: pairs[j],
Asset: asset.Spot,
})
}
}
Expand Down
10 changes: 5 additions & 5 deletions exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (by *Bybit) Subscribe(channelsToSubscribe []subscription.Subscription) erro
subReq.Topic = channelsToSubscribe[i].Channel
subReq.Event = sub

formattedPair, err := by.FormatExchangeCurrency(channelsToSubscribe[i].Currency, asset.Spot)
formattedPair, err := by.FormatExchangeCurrency(channelsToSubscribe[i].Pair, asset.Spot)
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down Expand Up @@ -158,7 +158,7 @@ func (by *Bybit) Unsubscribe(channelsToUnsubscribe []subscription.Subscription)
unSub.Event = cancel
unSub.Topic = channelsToUnsubscribe[i].Channel

formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.Spot)
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, asset.Spot)
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down Expand Up @@ -200,9 +200,9 @@ func (by *Bybit) GenerateDefaultSubscriptions() ([]subscription.Subscription, er
for x := range channels {
subscriptions = append(subscriptions,
subscription.Subscription{
Channel: channels[x],
Currency: pairs[z],
Asset: asset.Spot,
Channel: channels[x],
Pair: pairs[z],
Asset: asset.Spot,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_ws_cfutures.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (by *Bybit) UnsubscribeCoin(channelsToUnsubscribe []subscription.Subscripti
var unSub WsFuturesReq
unSub.Topic = unsubscribe

formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.CoinMarginedFutures)
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, asset.CoinMarginedFutures)
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_ws_futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (by *Bybit) UnsubscribeFutures(channelsToUnsubscribe []subscription.Subscri
var unSub WsFuturesReq
unSub.Topic = unsubscribe

formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Currency, asset.Futures)
formattedPair, err := by.FormatExchangeCurrency(channelsToUnsubscribe[i].Pair, asset.Futures)
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down
Loading

0 comments on commit 336697c

Please sign in to comment.