diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index a2c7520ac68..3062ccef2f0 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -53,9 +53,12 @@ func TestMain(m *testing.M) { if err != nil { log.Fatal("Bitfinex setup error", err) } + b.Websocket.Enable() + if err = b.WsConnect(); err != nil { + log.Fatal("Bitfinex setup error", err) + } b.SetCredentials(apiKey, apiSecret, "", "", "", "") - if !b.Enabled || b.API.AuthenticatedSupport || - b.Verbose || b.Websocket.IsEnabled() || len(b.BaseCurrencies) < 1 { + if !b.Enabled || b.API.AuthenticatedSupport || len(b.BaseCurrencies) < 1 { log.Fatal("Bitfinex Setup values not set correctly") } @@ -87,6 +90,25 @@ func TestStart(t *testing.T) { testWg.Wait() } +// TestWebsocketSubscribe tests returning a message with an id +// TODO: This test is really just an integration test for development +// We need a better way to test this overall +func TestWebsocketSubscribe(t *testing.T) { + s := stream.ChannelSubscription{ + Channel: wsTrades, + Currency: currency.NewPairWithDelimiter("BTC", "USDT", "-"), + Params: map[string]interface{}{ + "symbol": "tBTCUST", + }, + } + if err := b.Subscribe([]stream.ChannelSubscription{s}); err != nil { + t.Error(err) + } + ss := b.Websocket.GetSubscriptions() + if err := b.Websocket.ResubscribeToChannel(&ss[0]); err != nil { + t.Error(err) + } +} func TestGetV2MarginFunding(t *testing.T) { t.Parallel() sharedtestvalues.SkipTestIfCredentialsUnset(t, b) diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index e35521afe71..0c8e4236878 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -1,6 +1,7 @@ package bitfinex import ( + "encoding/json" "errors" "sync" "time" @@ -13,6 +14,7 @@ import ( var ( errTypeAssert = errors.New("type assertion failed") errSetCannotBeEmpty = errors.New("set cannot be empty") + errUnknownError = errors.New("unknown error") ) // AccountV2Data stores account v2 data @@ -657,6 +659,10 @@ const ( wsTicker = "ticker" wsTrades = "trades" wsError = "error" + wsEventSubscribed = "subscribed" + wsEventUnsubscribed = "unsubscribed" + wsEventAuth = "auth" + wsEventError = "error" ) // WsAuthRequest container for WS auth request @@ -669,6 +675,17 @@ type WsAuthRequest struct { DeadManSwitch int64 `json:"dms,omitempty"` } +// WsEvent contains response structure for WS sub/unsub/auth +// This type probably isn't used, but is here for completeness +type WsEvent struct { + Event string `json:"event"` + ChanId json.Number `json:"chanId"` + SubId string `json:"subId"` + Status string `json:"status"` + Code json.Number `json:"code"` + Msg string `json:"msg"` +} + // WsFundingOffer funding offer received via websocket type WsFundingOffer struct { ID int64 diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 3a5c617436f..421a7e66917 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" @@ -134,57 +135,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { } switch d := result.(type) { case map[string]interface{}: - event := d["event"] - switch event { - case "subscribed": - chanID, ok := d["chanId"].(float64) - if !ok { - return errors.New("unable to type assert chanId") - } - channel, ok := d["channel"].(string) - if !ok { - return errors.New("unable to type assert channel") - } - symbol, ok := d["symbol"].(string) - if !ok { - key, ok := d["key"].(string) - if !ok { - return fmt.Errorf("subscribed to channel but no symbol or key: %v", channel) - } - if channel != wsCandles { - // status channel not implemented at all yet. - return fmt.Errorf("%v channel subscription keys: %w", channel, common.ErrNotYetImplemented) - } - var err error - symbol, err = symbolFromCandleKey(key) - if err != nil { - return err - } - } - if err := b.WsAddSubscriptionChannel(int(chanID), channel, symbol); err != nil { - return err - } - case "unsubscribed": - chanID, ok := d["chanId"].(float64) - if !ok { - return errors.New("unable to type assert chanId") - } - delete(b.WebsocketSubdChannels, int(chanID)) - case "auth": - status, ok := d["status"].(string) - if !ok { - return errors.New("unable to type assert status") - } - if status == "OK" { - b.Websocket.DataHandler <- d - } else if status == "fail" { - if code, ok := d["code"].(string); ok { - return fmt.Errorf("websocket unable to AUTH. Error code: %s", - code) - } - return errors.New("websocket unable to auth") - } - } + return b.handleWSEvent(respRaw) case []interface{}: var chanID int if f, ok := d[0].(float64); !ok { @@ -199,7 +150,10 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { if c, ok := b.WebsocketSubdChannels[chanID]; ok { return b.handleWSChannelUpdate(c, chanID, eventType, d) } - return fmt.Errorf("unable to locate chanID: %d", chanID) + // We didn't have a mapping for this chanID; This probably means we have unsubscribed OR + // received our first message before processing the sub chanID + // In either case it's okay. No point in erroring because there's nothing we can do about it, and it happens often + return nil } if !hasEventType { @@ -471,6 +425,68 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { return nil } +func (b *Bitfinex) handleWSEvent(respRaw []byte) error { + event, err := jsonparser.GetUnsafeString(respRaw, "event") + if err != nil { + return fmt.Errorf("error parsing WS event name: %w from message: %s", err, respRaw) + } + switch event { + case wsEventSubscribed: + subId, err := jsonparser.GetUnsafeString(respRaw, "subId") + if err != nil { + return fmt.Errorf("error parsing WS subscribed event subId: %w from message: %s", err, respRaw) + } + if !b.Websocket.Match.IncomingWithData("subscribe:"+subId, respRaw) { + return fmt.Errorf("%v channel subscribe listener not found", subId) + } + case wsEventUnsubscribed: + chanId, err := jsonparser.GetInt(respRaw, "chanId") + if err != nil { + return fmt.Errorf("error parsing WS unsubscribed event chanId: %w from message: %s", err, respRaw) + } + if !b.Websocket.Match.IncomingWithData("unsubscribe:"+strconv.Itoa(int(chanId)), respRaw) { + return fmt.Errorf("%v channel unsubscribe listener not found", chanId) + } + case wsEventError: + if subId, err := jsonparser.GetUnsafeString(respRaw, "subId"); err == nil { + if !b.Websocket.Match.IncomingWithData("subscribe:"+subId, respRaw) { + return fmt.Errorf("%v channel subscribe listener not found", subId) + } + } else if chanId, err := jsonparser.GetInt(respRaw, "chanId"); err == nil { + if !b.Websocket.Match.IncomingWithData("unsubscribe:"+strconv.Itoa(int(chanId)), respRaw) { + return fmt.Errorf("%v channel unsubscribe listener not found", chanId) + } + } else { + return fmt.Errorf("unknown channel error; Message: %s", respRaw) + } + case wsEventAuth: + status, err := jsonparser.GetUnsafeString(respRaw, "status") + if err != nil { + return fmt.Errorf("error parsing WS auth event status: %w from message: %s", err, respRaw) + } + if status == "OK" { + var glob map[string]interface{} + if err := json.Unmarshal(respRaw, &glob); err != nil { + return fmt.Errorf("unable to Unmarshal auth resp; Error: %w Msg: %v", err, respRaw) + } + // TODO - Send a better value down the channel + b.Websocket.DataHandler <- glob + } else { + errCode, err := jsonparser.GetString(respRaw, "code") + if err != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS auth event error code: %s", b.Name, err) + } + err = fmt.Errorf("WS auth subscription error; Status: %s Error Code: %s", status, errCode) + b.Websocket.DataHandler <- err + return err + } + default: + return fmt.Errorf("unknown WS event msg: %s", respRaw) + } + + return nil +} + func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, chanID int, eventType string, d []interface{}) error { if eventType == wsChecksum { return b.handleWSChecksum(chanID, d) @@ -1404,8 +1420,7 @@ func (b *Bitfinex) wsHandleOrder(data []interface{}) { b.Websocket.DataHandler <- &od } -// WsInsertSnapshot add the initial orderbook snapshot when subscribed to a -// channel +// WsInsertSnapshot add the initial orderbook snapshot when subscribed to a channel func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books []WebsocketBook, fundingRate bool) error { if len(books) == 0 { return errors.New("no orderbooks submitted") @@ -1572,6 +1587,7 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, formattedPair = wsPairFormat.Format(enabledPairs[k]) } + // TODO - This should not be here in DefSubs if channels[j] == wsCandles { // TODO: Add ability to select timescale && funding period fundingPeriod := "" @@ -1606,29 +1622,74 @@ func (b *Bitfinex) Subscribe(channelsToSubscribe []stream.ChannelSubscription) e return err } - var errs error + wg := sync.WaitGroup{} + wg.Add(len(channelsToSubscribe)) + errC := make(chan error, len(channelsToSubscribe)) + for i := range channelsToSubscribe { - req := make(map[string]interface{}) - req["event"] = "subscribe" - req["channel"] = channelsToSubscribe[i].Channel - - for k, v := range channelsToSubscribe[i].Params { - // Resubscribing channels might already have this set - if k != "chanId" { - req[k] = v + go func(c *stream.ChannelSubscription) { + defer wg.Done() + if err := b.subscribeToChan(c); err != nil { + errC <- err } - } + }(&channelsToSubscribe[i]) + } - err := b.Websocket.Conn.SendJSONMessage(req) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i]) + wg.Wait() + close(errC) + + var errs error + for err := range errC { + errs = common.AppendError(errs, err) } + return errs } +// subscribeToChan handles a single subscription and parses the result +// on success it adds the subscription to the websocket +func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error { + req := make(map[string]interface{}) + req["event"] = "subscribe" + req["channel"] = c.Channel + + for k, v := range c.Params { + // Resubscribing channels might already have this set + if k != "chanId" { + req[k] = v + } + } + + // Although docs only mention this for wsBook, it works for all chans + subId := strconv.Itoa(int(b.Websocket.Conn.GenerateMessageID(false))) + req["subId"] = subId + + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subId, req) + if err != nil { + return fmt.Errorf("error subscribing to Channel: %s Pair: %s Error: %w", c.Channel, c.Currency, err) + } + + if err := b.getErrResp(respRaw); err != nil { + err = fmt.Errorf("error subscribing to Channel: %s Pair: %s; %w", c.Channel, c.Currency, err) + b.Websocket.DataHandler <- err + return err + } + + chanId, err := jsonparser.GetInt(respRaw, "chanId") + if err != nil { + return fmt.Errorf("error parsing chanId in WS subscribe response: %w", err) + } + + c.Params["chanId"] = int(chanId) + b.Websocket.AddSuccessfulSubscriptions(*c) + b.WebsocketSubdChannels[int(chanId)] = c + if b.Verbose { + log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanId) + } + + return nil +} + // Unsubscribe sends a websocket message to stop receiving data from the channel func (b *Bitfinex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error { var errs error @@ -1638,7 +1699,7 @@ func (b *Bitfinex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscriptio errs = common.AppendError(errs, fmt.Errorf("cannot unsubscribe from a channel without an id")) continue } - chanID, ok := idAny.(int) + chanId, ok := idAny.(int) if !ok { errs = common.AppendError(errs, fmt.Errorf("chanId is not an int")) continue @@ -1646,20 +1707,55 @@ func (b *Bitfinex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscriptio req := map[string]interface{}{ "event": "unsubscribe", - "chanId": chanID, + "chanId": chanId, } - err := b.Websocket.Conn.SendJSONMessage(req) + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("unsubscribe:"+strconv.Itoa(chanId), req) if err != nil { errs = common.AppendError(errs, err) continue } - // We do this before the unsubscribed event comes back so we can subscribe again when called from ResubcribeToChannel + + if err := b.getErrResp(respRaw); err != nil { + err = fmt.Errorf("error unsubscribing from ChanId: %v; %w", chanId, err) + b.Websocket.DataHandler <- err + return err + } + + delete(b.WebsocketSubdChannels, chanId) b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i]) } return errs } +// getErrResp takes a json response string and looks for an error event type +// If found it parses the error code and message as a wrapped error and returns it +// It might log parsing errors about the nature of the error +// If the error message is not defined it will return a wrapped errUnknownError +func (b *Bitfinex) getErrResp(resp []byte) error { + event, err := jsonparser.GetUnsafeString(resp, "event") + if err != nil { + return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp) + } + + if event != "error" { + return nil + } + errCode, err := jsonparser.GetInt(resp, "code") + if err != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS error code: %s from message: %s", b.Name, err, resp) + } + + var apiErr error + if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil { + log.Errorf(log.ExchangeSys, "%s error parsing WS error msg: %s from message: %s", b.Name, err, resp) + apiErr = errUnknownError + } else { + apiErr = errors.New(msg) + } + return fmt.Errorf("%w (code: %d)", apiErr, errCode) +} + // WsSendAuth sends a authenticated event payload func (b *Bitfinex) WsSendAuth(ctx context.Context) error { creds, err := b.GetCredentials(ctx) @@ -1692,56 +1788,6 @@ func (b *Bitfinex) WsSendAuth(ctx context.Context) error { return nil } -// WsAddSubscriptionChannel adds a confirmed channel subscription mapping from id to original params -func (b *Bitfinex) WsAddSubscriptionChannel(chanID int, channel, symbol string) error { - assetType, pair, err := assetPairFromSymbol(symbol) - if err != nil { - return err - } - - var c *stream.ChannelSubscription - s := b.Websocket.GetSubscriptions() - for i := range s { - if strings.EqualFold(s[i].Channel, channel) && s[i].Currency.Equal(pair) && s[i].Asset == assetType { - c = &s[i] - break - } - } - - if c == nil { - log.Errorf(log.ExchangeSys, - "%s Could not find an existing channel subscription: %s Pair: %s ChannelID: %d Asset: %s\n", - b.Name, - channel, - pair, - chanID, - assetType) - c = &stream.ChannelSubscription{ - Channel: channel, - Currency: pair, - Asset: assetType, - } - } - - if c.Params == nil { - c.Params = map[string]interface{}{} - } - - c.Params["chanId"] = chanID - - b.WebsocketSubdChannels[chanID] = c - - if b.Verbose { - log.Debugf(log.ExchangeSys, - "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", - b.Name, - channel, - pair, - chanID) - } - return nil -} - // WsNewOrder authenticated new order request func (b *Bitfinex) WsNewOrder(data *WsNewOrderRequest) (string, error) { data.CustomID = b.Websocket.AuthConn.GenerateMessageID(false) @@ -2074,21 +2120,3 @@ func assetPairFromSymbol(symbol string) (asset.Item, currency.Pair, error) { return assetType, pair, err } - -// symbolFromCandleKey extracts the symbol or pair from a subscribed channel key -// e.g. trade:1h:tBTC, trade:1h:tBTC:CNHT, trade:1m:fBTC:p30 and trade:1m:fBTC:a30:p2:p30 -func symbolFromCandleKey(key string) (string, error) { - parts := strings.Split(key, ":") - if len(parts) < 3 { - return "", fmt.Errorf("subscription key has too few parts, need 3: %v", key) - } - parts = parts[2:] - if parts[0][0] == 'f' { - // Margin Funding subscription has one currency, and suffixes - return parts[0], nil - } - if len(parts) > 2 { - return "", fmt.Errorf("subscription key has too many parts for trade types: %v", key) - } - return strings.Join(parts, ":"), nil -} diff --git a/exchanges/stream/stream_match.go b/exchanges/stream/stream_match.go index 431bdba9384..712411ab8ba 100644 --- a/exchanges/stream/stream_match.go +++ b/exchanges/stream/stream_match.go @@ -27,7 +27,7 @@ func (m *Match) Incoming(signature interface{}) bool { } // IncomingWithData matches with requests and takes in the returned payload, to -// be processed outside of a stream processing routine +// be processed outside of a stream processing routine and returns true if a handler was found func (m *Match) IncomingWithData(signature interface{}, data []byte) bool { m.mu.Lock() defer m.mu.Unlock() diff --git a/go.mod b/go.mod index 0e74a5ea1f8..194a64f3da4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/thrasher-corp/gocryptotrader go 1.20 require ( + github.com/buger/jsonparser v1.1.1 github.com/d5/tengo/v2 v2.16.1 github.com/gofrs/uuid v4.4.0+incompatible github.com/gorilla/mux v1.8.0 diff --git a/go.sum b/go.sum index ca64ec19460..353fa934d5f 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1 h1:NDBbPmhS+EqABEs5Kg3n/5ZNjy73Pz7SIV+KCeqyXcs= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=