diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 7d2c111e7e5..86c5e42f159 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -1,10 +1,12 @@ package bitfinex import ( + "bufio" "context" "errors" "log" "os" + "strconv" "sync" "testing" "time" @@ -1074,16 +1076,89 @@ func TestWsAuth(t *testing.T) { } // TestWsSubscribe tests Subscribe and Unsubscribe functionality +// See also TestSubscribeReq which covers key and symbol conversion func TestWsSubscribe(t *testing.T) { setupWs(t) - defSubs, err := b.GenerateDefaultSubscriptions() - assert.NoError(t, err) - err = b.Subscribe([]stream.ChannelSubscription{defSubs[0]}) - assert.NoError(t, err) - s, err := b.GetSubscriptions() - assert.NoError(t, err) - err = b.Unsubscribe(s) - assert.NoError(t, err) + err := b.Subscribe([]stream.ChannelSubscription{{Channel: wsTicker, Currency: currency.NewPair(currency.BTC, currency.USD), Asset: asset.Spot}}) + assert.NoError(t, err, "Subrcribe should not error") + catcher := func() (ok bool) { + i := <-b.Websocket.DataHandler + _, ok = i.(*ticker.Price) + return + } + assert.Eventually(t, catcher, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "Ticker response should arrive") + + err = b.Subscribe([]stream.ChannelSubscription{{Channel: wsTicker, Currency: currency.NewPair(currency.BTC, currency.USD), Asset: asset.Spot}}) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Duplicate subscription should error correctly") + catcher = func() bool { + i := <-b.Websocket.DataHandler + if e, ok := i.(error); ok { + if assert.ErrorIs(t, e, stream.ErrSubscriptionFailure, "Error should go to DataHandler") { + assert.ErrorContains(t, e, "subscribe: dup (code: 10301)", "Error should contain message and code") + return true + } + } + return false + } + assert.Eventually(t, catcher, sharedtestvalues.WebsocketResponseDefaultTimeout, time.Millisecond*10, "error response should arrive") + + subs, err := b.GetSubscriptions() + assert.NoError(t, err, "GetSubscriptions should not error") + err = b.Unsubscribe(subs) + assert.NoError(t, err, "Unsubscribing should not error") + + chanID, ok := subs[0].Key.(int) + assert.True(t, ok, "sub.Key should be an int") + + err = b.Unsubscribe(subs) + assert.ErrorIs(t, err, stream.ErrUnsubscribeFailure, "Unsubscribe should error") + assert.ErrorContains(t, err, strconv.Itoa(chanID), "Unsubscribe should contain correct chanId") + assert.ErrorContains(t, err, "unsubscribe: invalid (code: 10400)", "Unsubscribe should contain correct upstream error") + + err = b.Subscribe([]stream.ChannelSubscription{{ + Channel: wsTicker, + Currency: currency.NewPair(currency.BTC, currency.USD), + Asset: asset.Spot, + Params: map[string]interface{}{"key": "tBTCUSD"}, + }}) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Trying to use a 'key' param should error ErrSubscriptionFailure") + assert.ErrorIs(t, err, errParamNotAllowed, "Trying to use a 'key' param should error errParamNotAllowed") +} + +// TestSubscribeReq tests the channel to request map marshalling +func TestSubscribeReq(t *testing.T) { + c := &stream.ChannelSubscription{ + Channel: wsCandles, + Currency: currency.NewPair(currency.BTC, currency.USD), + Asset: asset.MarginFunding, + Params: map[string]interface{}{ + CandlesPeriodKey: "30", + }, + } + r, err := subscribeReq(c) + assert.NoError(t, err, "subscribeReq should not error") + assert.Equal(t, "trade:1m:fBTCUSD:p30", r["key"], "key contain period and default timeframe") + + c.Params = map[string]interface{}{ + CandlesTimeframeKey: "15m", + } + r, err = subscribeReq(c) + assert.NoError(t, err, "subscribeReq should not error") + assert.Equal(t, "trade:15m:fBTCUSD", r["key"], "key should be contain specific timeframe and no period") + + c = &stream.ChannelSubscription{ + Channel: wsBook, + Currency: currency.NewPair(currency.BTC, currency.DOGE), + Asset: asset.Spot, + } + r, err = subscribeReq(c) + assert.NoError(t, err, "subscribeReq should not error") + assert.Equal(t, "tBTC:DOGE", r["symbol"], "symbol should use colon delimiter if a currency is > 3 chars") + + c.Currency = currency.NewPair(currency.BTC, currency.LTC) + r, err = subscribeReq(c) + assert.NoError(t, err, "subscribeReq should not error") + assert.Equal(t, "tBTCLTC", r["symbol"], "symbol should not use colon delimiter if both currencies < 3 chars") } // TestWsPlaceOrder dials websocket, sends order request. @@ -1796,6 +1871,75 @@ func TestCancelMultipleOrdersV2(t *testing.T) { } } +// TestGetErrResp unit tests the helper func getErrResp +func TestGetErrResp(t *testing.T) { + t.Parallel() + fixture, err := os.Open("testdata/getErrResp.json") + if !assert.NoError(t, err, "Opening fixture should not error") { + t.FailNow() + } + s := bufio.NewScanner(fixture) + seen := 0 + for s.Scan() { + testErr := b.getErrResp(s.Bytes()) + seen++ + switch seen { + case 1: // no event + assert.ErrorIs(t, testErr, errParsingWSField, "Message with no event Should get correct error type") + assert.ErrorContains(t, testErr, "'event'", "Message with no event error should contain missing field name") + assert.ErrorContains(t, testErr, "nightjar", "Message with no event error should contain the message") + case 2: // with {} for event + assert.NoError(t, testErr, "Message with '{}' for event field should not error") + case 3: // event != 'error' + assert.NoError(t, testErr, "Message with non-'error' event field should not error") + case 4: // event="error" + assert.ErrorIs(t, testErr, errUnknownError, "error without a message should throw unknown error") + assert.ErrorContains(t, testErr, "code: 0", "error without a code should throw code 0") + case 5: // Fully formatted + assert.ErrorContains(t, testErr, "redcoats", "message field should be in the error") + assert.ErrorContains(t, testErr, "code: 42", "code field should be in the error") + } + } + assert.NoError(t, s.Err(), "Fixture Scanner should not error") + assert.NoError(t, fixture.Close(), "Closing the fixture file should not error") +} + +// TestParallelChanOp unit tests the helper func parallelChanOp +func TestParallelChanOp(t *testing.T) { + t.Parallel() + c := []stream.ChannelSubscription{ + {Channel: "red"}, + {Channel: "blue"}, + {Channel: "violent"}, + {Channel: "spin"}, + {Channel: "charm"}, + } + var testErr error + run := make(chan struct{}, 5) + go func() { + testErr = b.parallelChanOp(c, func(c *stream.ChannelSubscription) error { + time.Sleep(300 * time.Millisecond) + run <- struct{}{} + switch c.Channel { + case "spin", "violent": + return errors.New(c.Channel) + } + return nil + }) + close(run) + }() + f := func(c *assert.CollectT) { + assert.ErrorContains(c, testErr, "violent", "Should get a violent error") + assert.ErrorContains(c, testErr, "spin", "Should get a spin error") + } + assert.EventuallyWithT(t, f, 500*time.Millisecond, 50*time.Millisecond, "ParallelChanOp should complete within 500ms not 5*300ms") + got := 0 + for range run { + got++ + } + assert.Equal(t, got, 5, "Every channel was run to completion") +} + // setupWs is a helper function to connect both auth and normal websockets // It will skip the test if websockets are not enabled // It's up to the test to skip if it requires creds, though @@ -1810,8 +1954,11 @@ func setupWs(tb testing.TB) { if wsConnected { return } - wsConnected = true // We don't use b.websocket.Connect() because it'd subscribe to channels err := b.WsConnect() - assert.NoError(tb, err, "WsConnect should not error") + if !assert.NoError(tb, err, "WsConnect should not error") { + tb.FailNow() + } + + wsConnected = true } diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index 0ab1bb50a79..e99a1171042 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -15,6 +15,8 @@ var ( errTypeAssert = errors.New("type assertion failed") errNoSeqNo = errors.New("no sequence number") errUnknownError = errors.New("unknown error") + errParamNotAllowed = errors.New("param not allowed") + errParsingWSField = errors.New("error parsing WS field") ) // AccountV2Data stores account v2 data diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 1f18e0f2544..14e60c78fba 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -426,13 +426,13 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { func (b *Bitfinex) handleWSEvent(respRaw []byte) error { event, err := jsonparser.GetUnsafeString(respRaw, "event") if err != nil { - return fmt.Errorf("error parsing WS event name: %w from message: %s", err, respRaw) + return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, respRaw) } switch event { case wsEventSubscribed: subID, err := jsonparser.GetUnsafeString(respRaw, "subId") if err != nil { - return fmt.Errorf("error parsing WS subscribed event subId: %w from message: %s", err, respRaw) + return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw) } if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) { return fmt.Errorf("%v channel subscribe listener not found", subID) @@ -440,7 +440,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventUnsubscribed: chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId") if err != nil { - return fmt.Errorf("error parsing WS unsubscribed event chanId: %w from message: %s", err, respRaw) + return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw) } if !b.Websocket.Match.IncomingWithData("unsubscribe:"+chanID, respRaw) { return fmt.Errorf("%v channel unsubscribe listener not found", chanID) @@ -460,7 +460,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventAuth: status, err := jsonparser.GetUnsafeString(respRaw, "status") if err != nil { - return fmt.Errorf("error parsing WS auth event status: %w from message: %s", err, respRaw) + return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw) } if status == "OK" { var glob map[string]interface{} @@ -472,7 +472,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { } else { errCode, err := jsonparser.GetInt(respRaw, "code") if err != nil { - log.Errorf(log.ExchangeSys, "%s error parsing WS auth event error code: %s", b.Name, err) + log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, respRaw) } return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode) } @@ -482,7 +482,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventConf: status, err := jsonparser.GetUnsafeString(respRaw, "status") if err != nil { - return fmt.Errorf("error parsing WS configure channel event status: %w from message: %s", err, respRaw) + return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw) } if status != "OK" { return fmt.Errorf("WS configure channel error; Status: %s", status) @@ -1673,9 +1673,46 @@ func (b *Bitfinex) parallelChanOp(channels []stream.ChannelSubscription, m func( // subscribeToChan handles a single subscription and parses the result // on success it adds the subscription to the websocket func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error { - req := make(map[string]interface{}) - req["event"] = "subscribe" - req["channel"] = c.Channel + req, err := subscribeReq(c) + if err != nil { + return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency) + } + + // Although docs only mention this for wsBook, it works for all chans + subID := strconv.FormatInt(b.Websocket.Conn.GenerateMessageID(false), 10) + req["subId"] = subID + + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req) + if err != nil { + return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency) + } + + if err = b.getErrResp(respRaw); err != nil { + wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency) + b.Websocket.DataHandler <- wErr + return wErr + } + + chanID, err := jsonparser.GetInt(respRaw, "chanId") + if err != nil { + return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Currency) + } + + c.Key = int(chanID) + b.Websocket.AddSuccessfulSubscriptions(*c) + if b.Verbose { + log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID) + } + + return nil +} + +// subscribeReq returns a map of request params for subscriptions +func subscribeReq(c *stream.ChannelSubscription) (map[string]interface{}, error) { + req := map[string]interface{}{ + "event": "subscribe", + "channel": c.Channel, + } for k, v := range c.Params { switch k { @@ -1683,7 +1720,7 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error { // Skip these internal Params case "key", "symbol": // Ensure user's Params aren't silently overwritten - return fmt.Errorf("subscribe to channel with key or symbol Param is not supported") + return nil, fmt.Errorf("%s %w", k, errParamNotAllowed) default: req[k] = v } @@ -1707,14 +1744,14 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error { timeframe := "1m" if t, ok := c.Params[CandlesTimeframeKey]; ok { if timeframe, ok = t.(string); !ok { - return common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey") + return nil, common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey") } } fundingPeriod := "" if p, ok := c.Params[CandlesPeriodKey]; ok { s, cOk := p.(string) if !cOk { - return common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey") + return nil, common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey") } fundingPeriod = ":p" + s } @@ -1723,33 +1760,7 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error { req["symbol"] = prefix + formattedPair } - // Although docs only mention this for wsBook, it works for all chans - subID := strconv.FormatInt(b.Websocket.Conn.GenerateMessageID(false), 10) - req["subId"] = subID - - respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req) - if err != nil { - return fmt.Errorf("error subscribing to Channel: %s Pair: %s Error: %w", c.Channel, c.Currency, err) - } - - if err = b.getErrResp(respRaw); err != nil { - wErr := fmt.Errorf("error subscribing to Channel: %s Pair: %s; %w", c.Channel, c.Currency, err) - b.Websocket.DataHandler <- wErr - return wErr - } - - chanID, err := jsonparser.GetInt(respRaw, "chanId") - if err != nil { - return fmt.Errorf("error parsing chanId in WS subscribe response: %w", err) - } - - c.Key = int(chanID) - b.Websocket.AddSuccessfulSubscriptions(*c) - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID) - } - - return nil + return req, nil } // unsubscribeFromChan sends a websocket message to stop receiving data from a channel @@ -1770,7 +1781,7 @@ func (b *Bitfinex) unsubscribeFromChan(c *stream.ChannelSubscription) error { } if err := b.getErrResp(respRaw); err != nil { - wErr := fmt.Errorf("error unsubscribing from ChanId: %v; %w", chanID, err) + wErr := fmt.Errorf("%w from ChanId: %v; %w", stream.ErrUnsubscribeFailure, chanID, err) b.Websocket.DataHandler <- wErr return wErr } @@ -1787,20 +1798,19 @@ func (b *Bitfinex) unsubscribeFromChan(c *stream.ChannelSubscription) error { func (b *Bitfinex) getErrResp(resp []byte) error { event, err := jsonparser.GetUnsafeString(resp, "event") if err != nil { - return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) + return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, resp) } - if event != "error" { return nil } errCode, err := jsonparser.GetInt(resp, "code") if err != nil { - log.Errorf(log.ExchangeSys, "%s error parsing WS error code: %s from message: %s", b.Name, err, resp) + log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, resp) } var apiErr error if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil { - log.Errorf(log.ExchangeSys, "%s error parsing WS error msg: %s from message: %s", b.Name, err, resp) + log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, errParsingWSField, e2, resp) apiErr = errUnknownError } else { apiErr = errors.New(msg) diff --git a/exchanges/bitfinex/testdata/getErrResp.json b/exchanges/bitfinex/testdata/getErrResp.json new file mode 100644 index 00000000000..795d9f565b8 --- /dev/null +++ b/exchanges/bitfinex/testdata/getErrResp.json @@ -0,0 +1,5 @@ +{"bird": "great eared nightjar", "you_are_welcome":true} +{"event": {}} +{"event": "sneezegasm"} +{"event": "error"} +{"event": "error", "msg":"redcoats", "code":42} diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 9738a4f5254..48548e66adb 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -26,6 +26,8 @@ const ( var ( // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") + // ErrUnsubscribeFailure defines an error when a unsubscribe fails + ErrUnsubscribeFailure = errors.New("unsubscribe failure") // ErrAlreadyDisabled is returned when you double-disable the websocket ErrAlreadyDisabled = errors.New("websocket already disabled") // ErrNotConnected defines an error when websocket is not connected