diff --git a/exchanges/coinbasepro/coinbasepro.go b/exchanges/coinbasepro/coinbasepro.go index ecf31ac6cfc..fcf9e5bc626 100644 --- a/exchanges/coinbasepro/coinbasepro.go +++ b/exchanges/coinbasepro/coinbasepro.go @@ -160,7 +160,6 @@ var ( errPairEmpty = errors.New("pair cannot be empty") errStringConvert = errors.New("unable to convert into string value") errFloatConvert = errors.New("unable to convert into float64 value") - errNoCredsUser = errors.New("no credentials when attempting to subscribe to authenticated channel user") errWrappedAssetEmpty = errors.New("wrapped asset cannot be empty") errExpectedOneTickerReturned = errors.New("expected one ticker to be returned") ) diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index f6b82f57099..2c66ff1c96b 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -13,6 +13,7 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" "github.com/buger/jsonparser" @@ -33,8 +34,39 @@ import ( const ( coinbaseproWebsocketURL = "wss://advanced-trade-ws.coinbase.com" + + heartbeatChannel = "heartbeats" + statusChannel = "status" + tickerChannel = "ticker" + tickerBatchChannel = "ticker_batch" + candlesChannel = "candles" + tradesChannel = "market_trades" + orderbookChannel = "level2" + userChannel = "user" ) +var subscriptionNames = map[string]string{ + subscription.HeartbeatChannel: heartbeatChannel, + subscription.TickerChannel: tickerChannel, + subscription.CandlesChannel: candlesChannel, + subscription.AllTradesChannel: tradesChannel, + subscription.OrderbookChannel: orderbookChannel, + subscription.MyAccountChannel: userChannel, + statusChannel: statusChannel, + tickerBatchChannel: tickerBatchChannel, +} + +var defaultSubscriptions = subscription.List{ + {Enabled: true, Channel: subscription.HeartbeatChannel}, + {Enabled: true, Channel: statusChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel}, + {Enabled: true, Asset: asset.Spot, Channel: tickerBatchChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel}, + {Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true}, +} + // WsConnect initiates a websocket connection func (c *CoinbasePro) WsConnect() error { if !c.Websocket.IsEnabled() || !c.IsEnabled() { @@ -300,62 +332,67 @@ func (c *CoinbasePro) ProcessUpdate(update *WebsocketOrderbookDataHolder, timest // GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() func (c *CoinbasePro) generateSubscriptions() (subscription.List, error) { - var channels = []string{ - "heartbeats", - "status", - "ticker", - "ticker_batch", - "candles", - "market_trades", - "level2", - } - enabledPairs, err := c.GetEnabledPairs(asset.Spot) - if err != nil { - return nil, err - } - var subscriptions subscription.List - for i := range channels { - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: channels[i], - Pairs: enabledPairs, - Asset: asset.Spot, - }) - } - return subscriptions, nil + return c.Features.Subscriptions.ExpandTemplates(c) } -// Subscribe sends a websocket message to receive data from the channel -func (c *CoinbasePro) Subscribe(channelsToSubscribe subscription.List) error { - chanKeys := make(map[string]currency.Pairs) - for i := range channelsToSubscribe { - chanKeys[channelsToSubscribe[i].Channel] = - chanKeys[channelsToSubscribe[i].Channel].Add(channelsToSubscribe[i].Pairs...) - } - for s := range chanKeys { - err := c.sendRequest("subscribe", s, chanKeys[s]) - if err != nil { - return err - } - time.Sleep(time.Millisecond * 10) - } - return nil +// GetSubscriptionTemplate returns a subscription channel template +func (c *CoinbasePro) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText) +} + +// Subscribe sends a websocket message to receive data from a list of channels +func (c *CoinbasePro) Subscribe(subs subscription.List) error { + return b.ParallelChanOp(subs, func(subs subscription.List) error { return b.manageSubs("subscribe", subs) }, 1) } -// Unsubscribe sends a websocket message to stop receiving data from the channel -func (c *CoinbasePro) Unsubscribe(channelsToUnsubscribe subscription.List) error { - chanKeys := make(map[string]currency.Pairs) - for i := range channelsToUnsubscribe { - chanKeys[channelsToUnsubscribe[i].Channel] = - chanKeys[channelsToUnsubscribe[i].Channel].Add(channelsToUnsubscribe[i].Pairs...) +// Unsubscribe sends a websocket message to stop receiving data from a list of channels +func (c *CoinbasePro) Unsubscribe(subs subscription.List) error { + return b.ParallelChanOp(subs, func(subs subscription.List) error { return b.manageSubs("unsubscribe", subs) }, 1) +} + +// manageSub subscribes or unsubscribes from a list of websocket channels +func (c *CoinbasePro) manageSubs(op string, subs subscription.List currency.Pairs) error { + var errs error + subs, errs = subs.ExpandTemplates(c) + + for _, s := range subs { + var creds int + if s.Authenticated { // Only expecting one authenticated channel, so no need to abstract this + var err error + creds, err = c.GetCredentials(context.Background()) + if err != nil { + errs = common.AppendError(errs, errNoCredsUser) + continue + } + n := strconv.FormatInt(time.Now().Unix(), 10) + req := WebsocketRequest{ + Type: msgType, + ProductIDs: productIDs.Strings(), + Channel: channel, + Timestamp: n, } - for s := range chanKeys { - err := c.sendRequest("unsubscribe", s, chanKeys[s]) + if authenticated { + message := n + channel + productIDs.Join() + var hmac []byte + hmac, err = crypto.GetHMAC(crypto.HashSHA256, + []byte(message), + []byte(creds.Secret)) if err != nil { return err } - time.Sleep(time.Millisecond * 10) + // TODO: Implement JWT authentication once our REST implementation moves to it, or if there's + // an exchange-wide reform to enable multiple sets of authentication credentials + req.Key = creds.Key + req.Signature = hex.EncodeToString(hmac) + err = c.InitiateRateLimit(context.Background(), WSAuthRate) + } else { + err = c.InitiateRateLimit(context.Background(), WSUnauthRate) + } + if err != nil { + return fmt.Errorf("failed to rate limit websocket request: %w", err) + } + return c.Websocket.Conn.SendJSONMessage(req) } - return nil } // GetJWT checks if the current JWT is valid, returns it if it is, generates a new one if it isn't @@ -421,51 +458,6 @@ func getTimestamp(rawData []byte) (time.Time, error) { return timestamp, nil } -// sendRequest is a helper function which sends a websocket message to the Coinbase server -func (c *CoinbasePro) sendRequest(msgType, channel string, productIDs currency.Pairs) error { - authenticated := true - creds, err := c.GetCredentials(context.Background()) - if err != nil { - if errors.Is(err, exchange.ErrCredentialsAreEmpty) || - errors.Is(err, exchange.ErrAuthenticationSupportNotEnabled) { - authenticated = false - if channel == "user" { - return errNoCredsUser - } - } else { - return err - } - } - n := strconv.FormatInt(time.Now().Unix(), 10) - req := WebsocketRequest{ - Type: msgType, - ProductIDs: productIDs.Strings(), - Channel: channel, - Timestamp: n, - } - if authenticated { - message := n + channel + productIDs.Join() - var hmac []byte - hmac, err = crypto.GetHMAC(crypto.HashSHA256, - []byte(message), - []byte(creds.Secret)) - if err != nil { - return err - } - // TODO: Implement JWT authentication once our REST implementation moves to it, or if there's - // an exchange-wide reform to enable multiple sets of authentication credentials - req.Key = creds.Key - req.Signature = hex.EncodeToString(hmac) - err = c.InitiateRateLimit(context.Background(), WSAuthRate) - } else { - err = c.InitiateRateLimit(context.Background(), WSUnauthRate) - } - if err != nil { - return fmt.Errorf("failed to rate limit websocket request: %w", err) - } - return c.Websocket.Conn.SendJSONMessage(req) -} - // processBidAskArray is a helper function that turns WebsocketOrderbookDataHolder into arrays // of bids and asks func processBidAskArray(data *WebsocketOrderbookDataHolder) (bids, asks orderbook.Tranches, err error) { @@ -515,3 +507,18 @@ func base64URLEncode(b []byte) string { s = strings.ReplaceAll(s, "/", "_") return s } + +func channelName(s *subscription.Subscription) string { + if n, ok := subscriptionNames[s.Channel]; ok { + return n + } + panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel)) +} + +const subTplText = ` +{{ range $asset, $pairs := $.AssetPairs }} + {{- channelName $.S -}} + {{- with $i := $.S.Interval -}} _ {{- interval $i }}{{ end -}} + {{- $.AssetSeparator }} +{{- end }} +` diff --git a/exchanges/coinbasepro/coinbasepro_wrapper.go b/exchanges/coinbasepro/coinbasepro_wrapper.go index 37e3eb7a208..1f01102dcb1 100644 --- a/exchanges/coinbasepro/coinbasepro_wrapper.go +++ b/exchanges/coinbasepro/coinbasepro_wrapper.go @@ -25,7 +25,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" - "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -106,13 +105,7 @@ func (c *CoinbasePro) SetDefaults() { GlobalResultLimit: 300, }, }, - Subscriptions: subscription.List{ - {Enabled: true, Channel: "heartbeat"}, - {Enabled: true, Channel: "level2_batch"}, // Other orderbook feeds require authentication; This is batched in 50ms lots - {Enabled: true, Channel: "ticker"}, - {Enabled: true, Channel: "user", Authenticated: true}, - {Enabled: true, Channel: "matches"}, - }, + Subscriptions: defaultSubscriptions.Clone(), } c.Requester, err = request.New(c.Name, common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout), diff --git a/exchanges/subscription/subscription.go b/exchanges/subscription/subscription.go index 516466363eb..64db2c3e415 100644 --- a/exchanges/subscription/subscription.go +++ b/exchanges/subscription/subscription.go @@ -31,6 +31,8 @@ const ( AllTradesChannel = "allTrades" MyTradesChannel = "myTrades" MyOrdersChannel = "myOrders" + MyAccountChannel = "account" + HeartbeatChannel = "heartbeat" ) // Public errors @@ -40,6 +42,7 @@ var ( ErrInStateAlready = errors.New("subscription already in state") ErrInvalidState = errors.New("invalid subscription state") ErrDuplicate = errors.New("duplicate subscription") + ErrNotSupported = errors.New("subscription channel not supported") ) // State tracks the status of a subscription channel