Skip to content

Commit

Permalink
CoinbasePro: Add subscription templating
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Aug 21, 2024
1 parent e372aea commit 695e8d6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 101 deletions.
1 change: 0 additions & 1 deletion exchanges/coinbasepro/coinbasepro.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ var (
errPairEmpty = errors.New("pair cannot be empty")
errStringConvert = errors.New("unable to convert into string value")
errFloatConvert = errors.New("unable to convert into float64 value")
errNoCredsUser = errors.New("no credentials when attempting to subscribe to authenticated channel user")
errWrappedAssetEmpty = errors.New("wrapped asset cannot be empty")
errExpectedOneTickerReturned = errors.New("expected one ticker to be returned")
)
Expand Down
191 changes: 99 additions & 92 deletions exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/buger/jsonparser"
Expand All @@ -33,8 +34,39 @@ import (

const (
coinbaseproWebsocketURL = "wss://advanced-trade-ws.coinbase.com"

heartbeatChannel = "heartbeats"
statusChannel = "status"
tickerChannel = "ticker"
tickerBatchChannel = "ticker_batch"
candlesChannel = "candles"
tradesChannel = "market_trades"
orderbookChannel = "level2"
userChannel = "user"
)

var subscriptionNames = map[string]string{
subscription.HeartbeatChannel: heartbeatChannel,
subscription.TickerChannel: tickerChannel,
subscription.CandlesChannel: candlesChannel,
subscription.AllTradesChannel: tradesChannel,
subscription.OrderbookChannel: orderbookChannel,
subscription.MyAccountChannel: userChannel,
statusChannel: statusChannel,
tickerBatchChannel: tickerBatchChannel,
}

var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.HeartbeatChannel},
{Enabled: true, Channel: statusChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: tickerBatchChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}

// WsConnect initiates a websocket connection
func (c *CoinbasePro) WsConnect() error {
if !c.Websocket.IsEnabled() || !c.IsEnabled() {
Expand Down Expand Up @@ -300,62 +332,67 @@ func (c *CoinbasePro) ProcessUpdate(update *WebsocketOrderbookDataHolder, timest

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (c *CoinbasePro) generateSubscriptions() (subscription.List, error) {
var channels = []string{
"heartbeats",
"status",
"ticker",
"ticker_batch",
"candles",
"market_trades",
"level2",
}
enabledPairs, err := c.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions subscription.List
for i := range channels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[i],
Pairs: enabledPairs,
Asset: asset.Spot,
})
}
return subscriptions, nil
return c.Features.Subscriptions.ExpandTemplates(c)
}

// Subscribe sends a websocket message to receive data from the channel
func (c *CoinbasePro) Subscribe(channelsToSubscribe subscription.List) error {
chanKeys := make(map[string]currency.Pairs)
for i := range channelsToSubscribe {
chanKeys[channelsToSubscribe[i].Channel] =
chanKeys[channelsToSubscribe[i].Channel].Add(channelsToSubscribe[i].Pairs...)
}
for s := range chanKeys {
err := c.sendRequest("subscribe", s, chanKeys[s])
if err != nil {
return err
}
time.Sleep(time.Millisecond * 10)
}
return nil
// GetSubscriptionTemplate returns a subscription channel template
func (c *CoinbasePro) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from a list of channels
func (c *CoinbasePro) Subscribe(subs subscription.List) error {
return b.ParallelChanOp(subs, func(subs subscription.List) error { return b.manageSubs("subscribe", subs) }, 1)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (c *CoinbasePro) Unsubscribe(channelsToUnsubscribe subscription.List) error {
chanKeys := make(map[string]currency.Pairs)
for i := range channelsToUnsubscribe {
chanKeys[channelsToUnsubscribe[i].Channel] =
chanKeys[channelsToUnsubscribe[i].Channel].Add(channelsToUnsubscribe[i].Pairs...)
// Unsubscribe sends a websocket message to stop receiving data from a list of channels
func (c *CoinbasePro) Unsubscribe(subs subscription.List) error {
return b.ParallelChanOp(subs, func(subs subscription.List) error { return b.manageSubs("unsubscribe", subs) }, 1)
}

// manageSub subscribes or unsubscribes from a list of websocket channels
func (c *CoinbasePro) manageSubs(op string, subs subscription.List currency.Pairs) error {

Check failure on line 354 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

syntax error: unexpected currency in parameter list; possibly missing comma or )

Check failure on line 354 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

syntax error: unexpected currency in parameter list; possibly missing comma or )

Check failure on line 354 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

syntax error: unexpected currency in parameter list; possibly missing comma or )

Check failure on line 354 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

syntax error: unexpected currency in parameter list; possibly missing comma or )

Check failure on line 354 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

syntax error: unexpected currency in parameter list; possibly missing comma or )
var errs error
subs, errs = subs.ExpandTemplates(c)

for _, s := range subs {
var creds int
if s.Authenticated { // Only expecting one authenticated channel, so no need to abstract this
var err error
creds, err = c.GetCredentials(context.Background())
if err != nil {
errs = common.AppendError(errs, errNoCredsUser)
continue
}
n := strconv.FormatInt(time.Now().Unix(), 10)
req := WebsocketRequest{
Type: msgType,
ProductIDs: productIDs.Strings(),
Channel: channel,
Timestamp: n,
}
for s := range chanKeys {
err := c.sendRequest("unsubscribe", s, chanKeys[s])
if authenticated {
message := n + channel + productIDs.Join()
var hmac []byte
hmac, err = crypto.GetHMAC(crypto.HashSHA256,
[]byte(message),
[]byte(creds.Secret))
if err != nil {
return err
}
time.Sleep(time.Millisecond * 10)
// TODO: Implement JWT authentication once our REST implementation moves to it, or if there's
// an exchange-wide reform to enable multiple sets of authentication credentials
req.Key = creds.Key
req.Signature = hex.EncodeToString(hmac)
err = c.InitiateRateLimit(context.Background(), WSAuthRate)
} else {
err = c.InitiateRateLimit(context.Background(), WSUnauthRate)
}
if err != nil {
return fmt.Errorf("failed to rate limit websocket request: %w", err)
}
return c.Websocket.Conn.SendJSONMessage(req)
}
return nil
}

// GetJWT checks if the current JWT is valid, returns it if it is, generates a new one if it isn't
Expand Down Expand Up @@ -421,51 +458,6 @@ func getTimestamp(rawData []byte) (time.Time, error) {
return timestamp, nil
}

// sendRequest is a helper function which sends a websocket message to the Coinbase server
func (c *CoinbasePro) sendRequest(msgType, channel string, productIDs currency.Pairs) error {
authenticated := true
creds, err := c.GetCredentials(context.Background())
if err != nil {
if errors.Is(err, exchange.ErrCredentialsAreEmpty) ||
errors.Is(err, exchange.ErrAuthenticationSupportNotEnabled) {
authenticated = false
if channel == "user" {
return errNoCredsUser
}
} else {
return err
}
}
n := strconv.FormatInt(time.Now().Unix(), 10)
req := WebsocketRequest{
Type: msgType,
ProductIDs: productIDs.Strings(),
Channel: channel,
Timestamp: n,
}
if authenticated {
message := n + channel + productIDs.Join()
var hmac []byte
hmac, err = crypto.GetHMAC(crypto.HashSHA256,
[]byte(message),
[]byte(creds.Secret))
if err != nil {
return err
}
// TODO: Implement JWT authentication once our REST implementation moves to it, or if there's
// an exchange-wide reform to enable multiple sets of authentication credentials
req.Key = creds.Key
req.Signature = hex.EncodeToString(hmac)
err = c.InitiateRateLimit(context.Background(), WSAuthRate)
} else {
err = c.InitiateRateLimit(context.Background(), WSUnauthRate)
}
if err != nil {
return fmt.Errorf("failed to rate limit websocket request: %w", err)
}
return c.Websocket.Conn.SendJSONMessage(req)
}

// processBidAskArray is a helper function that turns WebsocketOrderbookDataHolder into arrays
// of bids and asks
func processBidAskArray(data *WebsocketOrderbookDataHolder) (bids, asks orderbook.Tranches, err error) {
Expand Down Expand Up @@ -515,3 +507,18 @@ func base64URLEncode(b []byte) string {
s = strings.ReplaceAll(s, "/", "_")
return s
}

func channelName(s *subscription.Subscription) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return n
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- with $i := $.S.Interval -}} _ {{- interval $i }}{{ end -}}
{{- $.AssetSeparator }}
{{- end }}
`
9 changes: 1 addition & 8 deletions exchanges/coinbasepro/coinbasepro_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -106,13 +105,7 @@ func (c *CoinbasePro) SetDefaults() {
GlobalResultLimit: 300,
},
},
Subscriptions: subscription.List{
{Enabled: true, Channel: "heartbeat"},
{Enabled: true, Channel: "level2_batch"}, // Other orderbook feeds require authentication; This is batched in 50ms lots
{Enabled: true, Channel: "ticker"},
{Enabled: true, Channel: "user", Authenticated: true},
{Enabled: true, Channel: "matches"},
},
Subscriptions: defaultSubscriptions.Clone(),
}
c.Requester, err = request.New(c.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
Expand Down
3 changes: 3 additions & 0 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
AllTradesChannel = "allTrades"
MyTradesChannel = "myTrades"
MyOrdersChannel = "myOrders"
MyAccountChannel = "account"
HeartbeatChannel = "heartbeat"
)

// Public errors
Expand All @@ -40,6 +42,7 @@ var (
ErrInStateAlready = errors.New("subscription already in state")
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrNotSupported = errors.New("subscription channel not supported")
)

// State tracks the status of a subscription channel
Expand Down

0 comments on commit 695e8d6

Please sign in to comment.