Skip to content

Commit

Permalink
Websockets: Rename RemoveUnsuccessfulSubs
Browse files Browse the repository at this point in the history
Implementation doesn't imply Unsuccessful or need to.
This change supports the registering of Pending subs
  • Loading branch information
gbjk committed Oct 26, 2023
1 parent f0b9faa commit e96c82d
Show file tree
Hide file tree
Showing 26 changed files with 37 additions and 45 deletions.
2 changes: 1 addition & 1 deletion docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ channels:
continue
}
// When we have a successful unsubscription, we can alert our internal management system of the success.
f.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
f.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func (b *Binance) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription
return err
}
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/binanceus/binanceus_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscript
return err
}
}
bi.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
bi.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
}

// Always remove the temporary subscription keyed by subID
defer b.Websocket.RemoveSubscription(c)
defer b.Websocket.RemoveSubscriptions(*c)

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
Expand Down Expand Up @@ -1818,7 +1818,7 @@ func (b *Bitfinex) unsubscribeFromChan(c *stream.ChannelSubscription) error {
return wErr
}

b.Websocket.RemoveSuccessfulUnsubscriptions(*c)
b.Websocket.RemoveSubscriptions(*c)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bitmex/bitmex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (b *Bitmex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
if err != nil {
return err
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscriptio
errs = common.AppendError(errs, err)
continue
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bittrex/bittrex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (b *Bittrex) unsubscribeSlice(channelsToUnsubscribe []stream.ChannelSubscri
errs = common.AppendError(errs, errors.New("unable to unsubscribe from "+channels[i]+" - error code "+response.Response[i].ErrorCode))
continue
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/btcmarkets/btcmarkets_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (b *BTCMarkets) Unsubscribe(subs []stream.ChannelSubscription) error {
if err != nil {
return err
}
b.Websocket.RemoveSuccessfulUnsubscriptions(subs...)
b.Websocket.RemoveSubscriptions(subs...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,6 @@ func (b *BTSE) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) e
if err != nil {
return err
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (by *Bybit) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_ws_cfutures.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (by *Bybit) UnsubscribeCoin(channelsToUnsubscribe []stream.ChannelSubscript
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_ws_futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (by *Bybit) UnsubscribeFutures(channelsToUnsubscribe []stream.ChannelSubscr
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_ws_ufutures.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (by *Bybit) UnsubscribeUSDT(channelsToUnsubscribe []stream.ChannelSubscript
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs
Expand Down
2 changes: 1 addition & 1 deletion exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,6 @@ unsubscriptions:
if err != nil {
return err
}
c.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
c.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}
2 changes: 1 addition & 1 deletion exchanges/coinut/coinut_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (c *COINUT) Unsubscribe(channelToUnsubscribe []stream.ChannelSubscription)
channelToUnsubscribe[i].Channel))
continue
}
c.Websocket.RemoveSuccessfulUnsubscriptions(channelToUnsubscribe[i])
c.Websocket.RemoveSubscriptions(channelToUnsubscribe[i])
}
return errs
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/gateio/gateio_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func (g *Gateio) handleSubscription(event string, channelsToSubscribe []stream.C
if payloads[k].Event == "subscribe" {
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
} else {
g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToSubscribe[k])
g.Websocket.RemoveSubscriptions(channelsToSubscribe[k])
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/gateio/gateio_ws_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (g *Gateio) handleOptionsSubscription(event string, channelsToSubscribe []s
if payloads[k].Event == "subscribe" {
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
} else {
g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToSubscribe[k])
g.Websocket.RemoveSubscriptions(channelsToSubscribe[k])
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/gemini/gemini_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (g *Gemini) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
return err
}

g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
g.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/hitbtc/hitbtc_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func (h *HitBTC) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs
Expand Down
4 changes: 2 additions & 2 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
continue
}
err := h.Websocket.Conn.SendJSONMessage(WsRequest{
Expand All @@ -616,7 +616,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs
Expand Down
4 changes: 2 additions & 2 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ channels:
errs = common.AppendError(errs, err)
continue
}
k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...)
k.Websocket.RemoveSubscriptions(unsubs[i].Channels...)
continue
}

Expand All @@ -1356,7 +1356,7 @@ channels:
errs = common.AppendError(errs, err)
continue
}
k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...)
k.Websocket.RemoveSubscriptions(unsubs[i].Channels...)
}
return errs
}
Expand Down
6 changes: 3 additions & 3 deletions exchanges/okcoin/okcoin_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,9 +926,9 @@ func (o *Okcoin) handleSubscriptions(operation string, subs []stream.ChannelSubs

if operation == "unsubscribe" {
if authenticatedChannelSubscription {
o.Websocket.RemoveSuccessfulUnsubscriptions(authChannels...)
o.Websocket.RemoveSubscriptions(authChannels...)
} else {
o.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
o.Websocket.RemoveSubscriptions(channels...)
}
} else {
if authenticatedChannelSubscription {
Expand Down Expand Up @@ -968,7 +968,7 @@ func (o *Okcoin) handleSubscriptions(operation string, subs []stream.ChannelSubs
}
}
if operation == "unsubscribe" {
o.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
o.Websocket.RemoveSubscriptions(channels...)
} else {
o.Websocket.AddSuccessfulSubscriptions(channels...)
}
Expand Down
6 changes: 3 additions & 3 deletions exchanges/okx/okx_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann
return err
}
if operation == operationUnsubscribe {
ok.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
ok.Websocket.RemoveSubscriptions(channels...)
} else {
ok.Websocket.AddSuccessfulSubscriptions(channels...)
}
Expand All @@ -500,7 +500,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann
return err
}
if operation == operationUnsubscribe {
ok.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
ok.Websocket.RemoveSubscriptions(channels...)
} else {
ok.Websocket.AddSuccessfulSubscriptions(channels...)
}
Expand Down Expand Up @@ -529,7 +529,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann

if operation == operationUnsubscribe {
channels = append(channels, authChannels...)
ok.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
ok.Websocket.RemoveSubscriptions(channels...)
} else {
channels = append(channels, authChannels...)
ok.Websocket.AddSuccessfulSubscriptions(channels...)
Expand Down
4 changes: 2 additions & 2 deletions exchanges/poloniex/poloniex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ channels:
errs = common.AppendError(errs, err)
continue channels
}
p.Websocket.RemoveSuccessfulUnsubscriptions(unsub[i])
p.Websocket.RemoveSubscriptions(unsub[i])
continue channels
case strings.EqualFold(strconv.FormatInt(wsTickerDataID, 10),
unsub[i].Channel):
Expand All @@ -652,7 +652,7 @@ channels:
errs = common.AppendError(errs, err)
continue
}
p.Websocket.RemoveSuccessfulUnsubscriptions(unsub[i])
p.Websocket.RemoveSubscriptions(unsub[i])
}
if errs != nil {
return errs
Expand Down
12 changes: 2 additions & 10 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,13 +973,6 @@ func (w *Websocket) SetSubscriptionState(c *ChannelSubscription, state ChannelSt
return nil
}

// RemoveSubscription removes subscriptions from the subscription list
// It is a duplicate of RemoveSuccessfulUnsubscriptions for shorterm naming clarity
// We should harmonise Add* and Remove* to not imply sub state in these method names
func (w *Websocket) RemoveSubscription(c *ChannelSubscription) {
w.RemoveSuccessfulUnsubscriptions(*c)
}

// AddSuccessfulSubscriptions adds subscriptions to the subscription lists that
// has been successfully subscribed
func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription) {
Expand All @@ -996,9 +989,8 @@ func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription)
}
}

// RemoveSuccessfulUnsubscriptions removes subscriptions from the subscription
// list that has been successfulling unsubscribed
func (w *Websocket) RemoveSuccessfulUnsubscriptions(channels ...ChannelSubscription) {
// RemoveSubscriptions removes subscriptions from the subscription list
func (w *Websocket) RemoveSubscriptions(channels ...ChannelSubscription) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
Expand Down
6 changes: 3 additions & 3 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func TestSubscribeUnsubscribe(t *testing.T) {
return nil
}
fnUnsub := func(unsubs []ChannelSubscription) error {
ws.RemoveSuccessfulUnsubscriptions(unsubs...)
ws.RemoveSubscriptions(unsubs...)
return nil
}
ws.Subscriber = fnSub
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestResubscribe(t *testing.T) {
return nil
}
fnUnsub := func(unsubs []ChannelSubscription) error {
ws.RemoveSuccessfulUnsubscriptions(unsubs...)
ws.RemoveSubscriptions(unsubs...)
return nil
}
ws.Subscriber = fnSub
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestRemoveSubscription(t *testing.T) {
assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error")
assert.NotNil(t, ws.GetSubscription(42), "Added subscription should be findable")

ws.RemoveSubscription(c)
ws.RemoveSubscriptions(*c)
assert.Nil(t, ws.GetSubscription(42), "Remove should have removed the sub")
}

Expand Down

0 comments on commit e96c82d

Please sign in to comment.