Skip to content

Commit

Permalink
Bitfinex: Fix first sub message maybe lost
Browse files Browse the repository at this point in the history
The only link we have between a sub req and the sub resp is the subID.
And the only link we have between a sub message and the sub is the chanID.
We can't derive a link using Pair or anything else.

This meant that by sending the resp and its chanID down the IncomingData
channel, we allowed the channel reader to maybe process the next
message, the first message on the channel, before the runtime executed
the switch back to subscribeToChan waiting on the chan.

To fix this, we key initially on subId.(string), and then replace it
with chanId.(int64) when we have it *inside* the wsHandleData so we
know we've procedurally handled it before the next message.

subscribeToChan is then free to remove the subId keyed Sub regardless of
error or not

If there's an error, we don't need to inline handling because there
won't be any second update.

Expands test coverage to make sure those subId keyed subscriptions are
removed.
  • Loading branch information
gbjk committed Oct 25, 2023
1 parent 1984e46 commit 29954a8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 26 deletions.
18 changes: 16 additions & 2 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,10 @@ func TestWsSubscribe(t *testing.T) {
}
assert.Eventually(t, catcher, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "Ticker response should arrive")

subs, err := b.GetSubscriptions()
assert.NoError(t, err, "GetSubscriptions should not error")
assert.Len(t, subs, 1, "We should only have 1 subscription; subID subscription should have been Removed by subscribeToChan")

err = b.Subscribe([]stream.ChannelSubscription{{Channel: wsTicker, Currency: currency.NewPair(currency.BTC, currency.USD), Asset: asset.Spot}})
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Duplicate subscription should error correctly")
catcher = func() bool {
Expand All @@ -1102,8 +1106,10 @@ func TestWsSubscribe(t *testing.T) {
}
assert.Eventually(t, catcher, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "error response should arrive")

subs, err := b.GetSubscriptions()
subs, err = b.GetSubscriptions()
assert.NoError(t, err, "GetSubscriptions should not error")
assert.Len(t, subs, 1, "We should only have one subscription after an error attempt")

err = b.Unsubscribe(subs)
assert.NoError(t, err, "Unsubscribing should not error")

Expand Down Expand Up @@ -1249,6 +1255,14 @@ func TestWsSubscribedResponse(t *testing.T) {
m, err := b.Websocket.Match.Set("subscribe:waiter1")
assert.NoError(t, err, "Setting a matcher should not error")
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
if assert.Error(t, err, "Should error if sub is not registered yet") {
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Should error SubFailure if sub isn't registered yet")
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Should error SubNotFound if sub isn't registered yet")
assert.ErrorContains(t, err, "waiter1", "Should error containing subID if")
}

b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Key: "waiter1"})
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
assert.NoError(t, err, "wsHandleData should not error")
if assert.NotEmpty(t, m.C, "Matcher should have received a sub notification") {
msg := <-m.C
Expand All @@ -1260,7 +1274,7 @@ func TestWsSubscribedResponse(t *testing.T) {
}

func TestWsOrderBook(t *testing.T) {
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsBook, Params: map[string]interface{}{"chanId": 23405}, Key: 23405})
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Key: 23405, Asset: asset.Spot, Currency: btcusdPair, Channel: wsBook})
pressXToJSON := `[23405,[[38334303613,9348.8,0.53],[38334308111,9348.8,5.98979404],[38331335157,9344.1,1.28965787],[38334302803,9343.8,0.08230094],[38334279092,9343,0.8],[38334307036,9342.938663676,0.8],[38332749107,9342.9,0.2],[38332277330,9342.8,0.85],[38329406786,9342,0.1432012],[38332841570,9341.947288638,0.3],[38332163238,9341.7,0.3],[38334303384,9341.6,0.324],[38332464840,9341.4,0.5],[38331935870,9341.2,0.5],[38334312082,9340.9,0.02126899],[38334261292,9340.8,0.26763],[38334138680,9340.625455254,0.12],[38333896802,9339.8,0.85],[38331627527,9338.9,1.57863959],[38334186713,9338.9,0.26769],[38334305819,9338.8,2.999],[38334211180,9338.75285796,3.999],[38334310699,9337.8,0.10679883],[38334307414,9337.5,1],[38334179822,9337.1,0.26773],[38334306600,9336.659955102,1.79],[38334299667,9336.6,1.1],[38334306452,9336.6,0.13979771],[38325672859,9336.3,1.25],[38334311646,9336.2,1],[38334258509,9336.1,0.37],[38334310592,9336,1.79],[38334310378,9335.6,1.43],[38334132444,9335.2,0.26777],[38331367325,9335,0.07],[38334310703,9335,0.10680562],[38334298209,9334.7,0.08757301],[38334304857,9334.456899462,0.291],[38334309940,9334.088390727,0.0725],[38334310377,9333.7,1.2868],[38334297615,9333.607784,0.1108],[38334095188,9333.3,0.26785],[38334228913,9332.7,0.40861186],[38334300526,9332.363996604,0.3884],[38334310701,9332.2,0.10680562],[38334303548,9332.005382871,0.07],[38334311798,9331.8,0.41285228],[38334301012,9331.7,1.7952],[38334089877,9331.4,0.2679],[38321942150,9331.2,0.2],[38334310670,9330,1.069],[38334063096,9329.6,0.26796],[38334310700,9329.4,0.10680562],[38334310404,9329.3,1],[38334281630,9329.1,6.57150597],[38334036864,9327.7,0.26801],[38334310702,9326.6,0.10680562],[38334311799,9326.1,0.50220625],[38334164163,9326,0.219638],[38334309722,9326,1.5],[38333051682,9325.8,0.26807],[38334302027,9325.7,0.75],[38334203435,9325.366592,0.32397696],[38321967613,9325,0.05],[38334298787,9324.9,0.3],[38334301719,9324.8,3.6227592],[38331316716,9324.763454646,0.71442],[38334310698,9323.8,0.10680562],[38334035499,9323.7,0.23431017],[38334223472,9322.670551788,0.42150603],[38334163459,9322.560399006,0.143967],[38321825171,9320.8,2],[38334075805,9320.467496148,0.30772633],[38334075800,9319.916732238,0.61457592],[38333682302,9319.7,0.0011],[38331323088,9319.116771762,0.12913],[38333677480,9319,0.0199],[38334277797,9318.6,0.89],[38325235155,9318.041088,1.20249],[38334310910,9317.82382938,1.79],[38334311811,9317.2,0.61079138],[38334311812,9317.2,0.71937652],[38333298214,9317.1,50],[38334306359,9317,1.79],[38325531545,9316.382823951,0.21263],[38333727253,9316.3,0.02316372],[38333298213,9316.1,45],[38333836479,9316,2.135],[38324520465,9315.9,2.7681],[38334307411,9315.5,1],[38330313617,9315.3,0.84455],[38334077770,9315.294024,0.01248397],[38334286663,9315.294024,1],[38325533762,9315.290315394,2.40498],[38334310018,9315.2,3],[38333682617,9314.6,0.0011],[38334304794,9314.6,0.76364676],[38334304798,9314.3,0.69242113],[38332915733,9313.8,0.0199],[38334084411,9312.8,1],[38334311893,9350.1,-1.015],[38334302734,9350.3,-0.26737],[38334300732,9350.8,-5.2],[38333957619,9351,-0.90677089],[38334300521,9351,-1.6457],[38334301600,9351.012829557,-0.0523],[38334308878,9351.7,-2.5],[38334299570,9351.921544,-0.1015],[38334279367,9352.1,-0.26732],[38334299569,9352.411802928,-0.4036],[38334202773,9353.4,-0.02139404],[38333918472,9353.7,-1.96412776],[38334278782,9354,-0.26731],[38334278606,9355,-1.2785],[38334302105,9355.439221251,-0.79191542],[38313897370,9355.569409242,-0.43363],[38334292995,9355.584296,-0.0979],[38334216989,9355.8,-0.03686414],[38333894025,9355.9,-0.26721],[38334293798,9355.936691952,-0.4311],[38331159479,9356,-0.4204022],[38333918888,9356.1,-1.10885563],[38334298205,9356.4,-0.20124428],[38328427481,9356.5,-0.1],[38333343289,9356.6,-0.41034213],[38334297205,9356.6,-0.08835018],[38334277927,9356.741101161,-0.0737],[38334311645,9356.8,-0.5],[38334309002,9356.9,-5],[38334309736,9357,-0.10680107],[38334306448,9357.4,-0.18645275],[38333693302,9357.7,-0.2672],[38332815159,9357.8,-0.0011],[38331239824,9358.2,-0.02],[38334271608,9358.3,-2.999],[38334311971,9358.4,-0.55],[38333919260,9358.5,-1.9972841],[38334265365,9358.5,-1.7841],[38334277960,9359,-3],[38334274601,9359.020969848,-3],[38326848839,9359.1,-0.84],[38334291080,9359.247048,-0.16199869],[38326848844,9359.4,-1.84],[38333680200,9359.6,-0.26713],[38331326606,9359.8,-0.84454],[38334309738,9359.8,-0.10680107],[38331314707,9359.9,-0.2],[38333919803,9360.9,-1.41177599],[38323651149,9361.33417827,-0.71442],[38333656906,9361.5,-0.26705],[38334035500,9361.5,-0.40861586],[38334091886,9362.4,-6.85940815],[38334269617,9362.5,-4],[38323629409,9362.545858872,-2.40497],[38334309737,9362.7,-0.10680107],[38334312380,9362.7,-3],[38325280830,9362.8,-1.75123],[38326622800,9362.8,-1.05145],[38333175230,9363,-0.0011],[38326848745,9363.2,-0.79],[38334308960,9363.206775564,-0.12],[38333920234,9363.3,-1.25318113],[38326848843,9363.4,-1.29],[38331239823,9363.4,-0.02],[38333209613,9363.4,-0.26719],[38334299964,9364,-0.05583123],[38323470224,9364.161816648,-0.12912],[38334284711,9365,-0.21346019],[38334299594,9365,-2.6757062],[38323211816,9365.073132585,-0.21262],[38334312456,9365.1,-0.11167861],[38333209612,9365.2,-0.26719],[38327770474,9365.3,-0.0073],[38334298788,9365.3,-0.3],[38334075803,9365.409831204,-0.30772637],[38334309740,9365.5,-0.10680107],[38326608767,9365.7,-2.76809],[38333920657,9365.7,-1.25848083],[38329594226,9366.6,-0.02587],[38334311813,9366.7,-4.72290945],[38316386301,9367.39258128,-2.37581],[38334302026,9367.4,-4.5],[38334228915,9367.9,-0.81725458],[38333921381,9368.1,-1.72213641],[38333175678,9368.2,-0.0011],[38334301150,9368.2,-2.654604],[38334297208,9368.3,-0.78036466],[38334309739,9368.3,-0.10680107],[38331227515,9368.7,-0.02],[38331184470,9369,-0.003975],[38334203436,9369.319616,-0.32397695],[38334269964,9369.7,-0.5],[38328386732,9370,-4.11759935],[38332719555,9370,-0.025],[38333921935,9370.5,-1.2224398],[38334258511,9370.5,-0.35],[38326848842,9370.8,-0.34],[38333985038,9370.9,-0.8551502],[38334283018,9370.9,-1],[38326848744,9371,-1.34]],5]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
Expand Down
77 changes: 53 additions & 24 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,11 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
case map[string]interface{}:
return b.handleWSEvent(respRaw)
case []interface{}:
var chanID int
if f, ok := d[0].(float64); !ok {
chanIDFloat, ok := d[0].(float64)
if !ok {
return common.GetTypeAssertError("float64", d[0], "chanID")
} else { //nolint:revive // using lexical variable requires else statement
chanID = int(f)
}
chanID := int(chanIDFloat)

eventType, hasEventType := d[1].(string)

Expand All @@ -158,7 +157,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
}

if !hasEventType {
return errors.New("WS message without eventType or chanID")
return errors.New("WS message without eventType")
}

switch eventType {
Expand Down Expand Up @@ -433,13 +432,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
}
switch event {
case wsEventSubscribed:
subID, err := jsonparser.GetUnsafeString(respRaw, "subId")
if err != nil {
return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw)
}
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%v channel subscribe listener not found", subID)
}
return b.handleWSSubscribed(respRaw)
case wsEventUnsubscribed:
chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId")
if err != nil {
Expand Down Expand Up @@ -497,6 +490,39 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
return nil
}

// handleWSSubscribed parses a subscription response and registers the chanID key immediately, before updating subscribeToChan via IncomingWithData chan
// handleWS* happens sequentially, so by rekeying on chanID immediately we ensure the first message is not dropped
func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
subID, err := jsonparser.GetUnsafeString(respRaw, "subId")
if err != nil {
return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw)
}

c := b.Websocket.GetSubscription(subID)
if c == nil {
return fmt.Errorf("%w: %w subID: %s", stream.ErrSubscriptionFailure, stream.ErrSubscriptionNotFound, subID)
}

chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Currency)
}

// Note chanID int type avoids conflicts with subID key
c.Key = int(chanID)

// subscribeToChan removes the old subID keyed Subscription
b.Websocket.AddSuccessfulSubscriptions(*c)

if b.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID)
}
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%v channel subscribe listener not found", subID)
}
return nil
}

func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, eventType string, d []interface{}) error {
if eventType == wsChecksum {
return b.handleWSChecksum(c, d)
Expand Down Expand Up @@ -1681,10 +1707,24 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
}

// Although docs only mention this for wsBook, it works for all chans
// subId is a single round-trip identifier that provides linking sub requests to chanIDs
// Although docs only mention subId for wsBook, it works for all chans
subID := strconv.FormatInt(b.Websocket.Conn.GenerateMessageID(false), 10)
req["subId"] = subID

// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key

c.State = stream.ChannelSubscribing
err = b.Websocket.AddSubscription(c)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
}

// Always remove the temporary subscription keyed by subID
defer b.Websocket.RemoveSubscription(c)

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
Expand All @@ -1696,17 +1736,6 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
return wErr
}

chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Currency)
}

c.Key = int(chanID)
b.Websocket.AddSuccessfulSubscriptions(*c)
if b.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID)
}

return nil
}

Expand Down

0 comments on commit 29954a8

Please sign in to comment.