Skip to content

Commit

Permalink
CoinbasePro: Add subscription templating
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Aug 22, 2024
1 parent e372aea commit 01be93d
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 112 deletions.
1 change: 0 additions & 1 deletion exchanges/coinbasepro/coinbasepro.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ var (
errPairEmpty = errors.New("pair cannot be empty")
errStringConvert = errors.New("unable to convert into string value")
errFloatConvert = errors.New("unable to convert into float64 value")
errNoCredsUser = errors.New("no credentials when attempting to subscribe to authenticated channel user")
errWrappedAssetEmpty = errors.New("wrapped asset cannot be empty")
errExpectedOneTickerReturned = errors.New("expected one ticker to be returned")
)
Expand Down
52 changes: 43 additions & 9 deletions exchanges/coinbasepro/coinbasepro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
gctlog "github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
Expand Down Expand Up @@ -1659,17 +1660,29 @@ func TestProcessSnapshotUpdate(t *testing.T) {
assert.NoError(t, err)
}

func TestGenerateDefaultSubscriptions(t *testing.T) {
comparison := subscription.List{{Channel: "heartbeats"}, {Channel: "status"}, {Channel: "ticker"},
{Channel: "ticker_batch"}, {Channel: "candles"}, {Channel: "market_trades"}, {Channel: "level2"}}
for i := range comparison {
comparison[i].Pairs = currency.Pairs{
currency.NewPairWithDelimiter(testCrypto.String(), testFiat.String(), "-")}
comparison[i].Asset = asset.Spot
func TestGenerateSubscriptions(t *testing.T) {
c := new(CoinbasePro)

Check failure on line 1664 in exchanges/coinbasepro/coinbasepro_test.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "c" shadows declaration at line 46 (govet)
if err := testexch.Setup(c); err != nil {
log.Fatal(err)
}
c.Websocket.SetCanUseAuthenticatedEndpoints(true)
p, err := c.GetEnabledPairs(asset.Spot)
require.NoError(t, err)
exp := subscription.List{}
for _, baseSub := range defaultSubscriptions {
s := baseSub.Clone()
s.QualifiedChannel = subscriptionNames[s.Channel]
if s.Asset != asset.Empty {
s.Pairs = p
}
exp = append(exp, s)
}
resp, err := c.generateSubscriptions()
subs, err := c.generateSubscriptions()
require.NoError(t, err)
assert.ElementsMatch(t, comparison, resp)
testsubs.EqualLists(t, exp, subs)

_, err = subscription.List{{Channel: "wibble"}}.ExpandTemplates(c)
assert.ErrorContains(t, err, "subscription channel not supported: wibble")
}

func TestSubscribeUnsubscribe(t *testing.T) {
Expand Down Expand Up @@ -1920,3 +1933,24 @@ func testGetOneArg[G getOneArgResp](t *testing.T, f getOneArgAssertNotEmpty[G],
assert.NoError(t, err)
assert.NotEmpty(t, resp, errExpectedNonEmpty)
}

func TestCheckSubscriptions(t *testing.T) {
t.Parallel()

c := &CoinbasePro{ //nolint:govet // Intentional shadow to avoid future copy/paste mistakes

Check failure on line 1940 in exchanges/coinbasepro/coinbasepro_test.go

View workflow job for this annotation

GitHub Actions / lint

directive `//nolint:govet // Intentional shadow to avoid future copy/paste mistakes` is unused for linter "govet" (nolintlint)
Base: exchange.Base{
Config: &config.Exchange{
Features: &config.FeaturesConfig{
Subscriptions: subscription.List{
{Enabled: true, Channel: "matches"},
},
},
},
Features: exchange.Features{},
},
}

c.checkSubscriptions()
testsubs.EqualLists(t, defaultSubscriptions, c.Features.Subscriptions)
testsubs.EqualLists(t, defaultSubscriptions, c.Config.Features.Subscriptions)
}
213 changes: 119 additions & 94 deletions exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
Expand All @@ -33,8 +32,39 @@ import (

const (
coinbaseproWebsocketURL = "wss://advanced-trade-ws.coinbase.com"

heartbeatChannel = "heartbeats"
statusChannel = "status"
tickerChannel = "ticker"
tickerBatchChannel = "ticker_batch"
candlesChannel = "candles"
tradesChannel = "market_trades"
orderbookChannel = "level2"
userChannel = "user"
)

var subscriptionNames = map[string]string{
subscription.HeartbeatChannel: heartbeatChannel,
subscription.TickerChannel: tickerChannel,
subscription.CandlesChannel: candlesChannel,
subscription.AllTradesChannel: tradesChannel,
subscription.OrderbookChannel: orderbookChannel,
subscription.MyAccountChannel: userChannel,
statusChannel: statusChannel,
tickerBatchChannel: tickerBatchChannel,
}

var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.HeartbeatChannel},
{Enabled: true, Channel: statusChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: tickerBatchChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}

// WsConnect initiates a websocket connection
func (c *CoinbasePro) WsConnect() error {
if !c.Websocket.IsEnabled() || !c.IsEnabled() {
Expand Down Expand Up @@ -300,61 +330,65 @@ func (c *CoinbasePro) ProcessUpdate(update *WebsocketOrderbookDataHolder, timest

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (c *CoinbasePro) generateSubscriptions() (subscription.List, error) {
var channels = []string{
"heartbeats",
"status",
"ticker",
"ticker_batch",
"candles",
"market_trades",
"level2",
}
enabledPairs, err := c.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions subscription.List
for i := range channels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[i],
Pairs: enabledPairs,
Asset: asset.Spot,
})
}
return subscriptions, nil
return c.Features.Subscriptions.ExpandTemplates(c)
}

// Subscribe sends a websocket message to receive data from the channel
func (c *CoinbasePro) Subscribe(channelsToSubscribe subscription.List) error {
chanKeys := make(map[string]currency.Pairs)
for i := range channelsToSubscribe {
chanKeys[channelsToSubscribe[i].Channel] =
chanKeys[channelsToSubscribe[i].Channel].Add(channelsToSubscribe[i].Pairs...)
}
for s := range chanKeys {
err := c.sendRequest("subscribe", s, chanKeys[s])
if err != nil {
return err
// GetSubscriptionTemplate returns a subscription channel template
func (c *CoinbasePro) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from a list of channels
func (c *CoinbasePro) Subscribe(subs subscription.List) error {
return c.ParallelChanOp(subs, func(subs subscription.List) error { return c.manageSubs("subscribe", subs) }, 1)
}

// Unsubscribe sends a websocket message to stop receiving data from a list of channels
func (c *CoinbasePro) Unsubscribe(subs subscription.List) error {
return c.ParallelChanOp(subs, func(subs subscription.List) error { return c.manageSubs("unsubscribe", subs) }, 1)
}

// manageSub subscribes or unsubscribes from a list of websocket channels
func (c *CoinbasePro) manageSubs(op string, subs subscription.List) error {
var errs error
subs, errs = subs.ExpandTemplates(c)
for _, s := range subs {
r := &WebsocketRequest{
Type: op,
ProductIDs: s.Pairs.Strings(),
Channel: s.QualifiedChannel,
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
}
time.Sleep(time.Millisecond * 10)
var err error
if s.Authenticated {
err = c.signWsRequest(r)
if err == nil {
err = c.InitiateRateLimit(context.Background(), WSAuthRate)
}
} else {
err = c.InitiateRateLimit(context.Background(), WSUnauthRate)
}
if err == nil {
c.Websocket.Conn.SendJSONMessage(r)

Check failure on line 372 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `c.Websocket.Conn.SendJSONMessage` is not checked (errcheck)
}
errs = common.AppendError(errs, err)
}
return nil
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (c *CoinbasePro) Unsubscribe(channelsToUnsubscribe subscription.List) error {
chanKeys := make(map[string]currency.Pairs)
for i := range channelsToUnsubscribe {
chanKeys[channelsToUnsubscribe[i].Channel] =
chanKeys[channelsToUnsubscribe[i].Channel].Add(channelsToUnsubscribe[i].Pairs...)
}
for s := range chanKeys {
err := c.sendRequest("unsubscribe", s, chanKeys[s])
if err != nil {
return err
}
time.Sleep(time.Millisecond * 10)
func (c *CoinbasePro) signWsRequest(r *WebsocketRequest) error {
creds, err := c.GetCredentials(context.Background())
if err != nil {
return err
}
hmac, err := crypto.GetHMAC(crypto.HashSHA256, []byte(r.Timestamp+r.Channel+strings.Join(r.ProductIDs, ",")), []byte(creds.Secret))
if err != nil {
return err
}
// TODO: Implement JWT authentication once our REST implementation moves to it, or if there's
// an exchange-wide reform to enable multiple sets of authentication credentials
r.Key = creds.Key
r.Signature = hex.EncodeToString(hmac)
return nil
}

Expand Down Expand Up @@ -421,51 +455,6 @@ func getTimestamp(rawData []byte) (time.Time, error) {
return timestamp, nil
}

// sendRequest is a helper function which sends a websocket message to the Coinbase server
func (c *CoinbasePro) sendRequest(msgType, channel string, productIDs currency.Pairs) error {
authenticated := true
creds, err := c.GetCredentials(context.Background())
if err != nil {
if errors.Is(err, exchange.ErrCredentialsAreEmpty) ||
errors.Is(err, exchange.ErrAuthenticationSupportNotEnabled) {
authenticated = false
if channel == "user" {
return errNoCredsUser
}
} else {
return err
}
}
n := strconv.FormatInt(time.Now().Unix(), 10)
req := WebsocketRequest{
Type: msgType,
ProductIDs: productIDs.Strings(),
Channel: channel,
Timestamp: n,
}
if authenticated {
message := n + channel + productIDs.Join()
var hmac []byte
hmac, err = crypto.GetHMAC(crypto.HashSHA256,
[]byte(message),
[]byte(creds.Secret))
if err != nil {
return err
}
// TODO: Implement JWT authentication once our REST implementation moves to it, or if there's
// an exchange-wide reform to enable multiple sets of authentication credentials
req.Key = creds.Key
req.Signature = hex.EncodeToString(hmac)
err = c.InitiateRateLimit(context.Background(), WSAuthRate)
} else {
err = c.InitiateRateLimit(context.Background(), WSUnauthRate)
}
if err != nil {
return fmt.Errorf("failed to rate limit websocket request: %w", err)
}
return c.Websocket.Conn.SendJSONMessage(req)
}

// processBidAskArray is a helper function that turns WebsocketOrderbookDataHolder into arrays
// of bids and asks
func processBidAskArray(data *WebsocketOrderbookDataHolder) (bids, asks orderbook.Tranches, err error) {
Expand Down Expand Up @@ -515,3 +504,39 @@ func base64URLEncode(b []byte) string {
s = strings.ReplaceAll(s, "/", "_")
return s
}

// checkSubscriptions looks for incompatible subscriptions and if found replaces all with defaults
// This should be unnecessary and removable by mid-2025
func (c *CoinbasePro) checkSubscriptions() {
replace := false
for _, s := range c.Config.Features.Subscriptions {
switch s.Channel {
case "heartbeat", "level2_batch", "matches":
replace = true
break

Check failure on line 516 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / lint

S1023: redundant break statement (gosimple)
}
}
if replace {
c.Config.Features.Subscriptions = defaultSubscriptions.Clone()
c.Features.Subscriptions = subscription.List{}
for _, s := range c.Config.Features.Subscriptions {
if s.Enabled {
c.Features.Subscriptions = append(c.Features.Subscriptions, s)
}
}
}
}

func channelName(s *subscription.Subscription) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return n
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- $.AssetSeparator }}
{{- end }}
`
12 changes: 4 additions & 8 deletions exchanges/coinbasepro/coinbasepro_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -106,13 +105,7 @@ func (c *CoinbasePro) SetDefaults() {
GlobalResultLimit: 300,
},
},
Subscriptions: subscription.List{
{Enabled: true, Channel: "heartbeat"},
{Enabled: true, Channel: "level2_batch"}, // Other orderbook feeds require authentication; This is batched in 50ms lots
{Enabled: true, Channel: "ticker"},
{Enabled: true, Channel: "user", Authenticated: true},
{Enabled: true, Channel: "matches"},
},
Subscriptions: defaultSubscriptions.Clone(),
}
c.Requester, err = request.New(c.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
Expand Down Expand Up @@ -150,6 +143,9 @@ func (c *CoinbasePro) Setup(exch *config.Exchange) error {
if err != nil {
return err
}

c.checkSubscriptions()

wsRunningURL, err := c.API.Endpoints.GetURL(exchange.WebsocketSpot)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
AllTradesChannel = "allTrades"
MyTradesChannel = "myTrades"
MyOrdersChannel = "myOrders"
MyAccountChannel = "account"
HeartbeatChannel = "heartbeat"
)

// Public errors
Expand All @@ -40,6 +42,7 @@ var (
ErrInStateAlready = errors.New("subscription already in state")
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrNotSupported = errors.New("subscription channel not supported")
)

// State tracks the status of a subscription channel
Expand Down

0 comments on commit 01be93d

Please sign in to comment.