Skip to content

Commit

Permalink
Gemini: Add subscription configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Aug 21, 2024
1 parent 2986277 commit 40668a0
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 40 deletions.
34 changes: 34 additions & 0 deletions exchanges/gemini/gemini_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -1299,3 +1302,34 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
g := new(Gemini)
require.NoError(t, testexch.Setup(g), "Test instance Setup must not error")
p := currency.Pairs{currency.NewPairWithDelimiter("BTC", "USD", ""), currency.NewPairWithDelimiter("ETH", "BTC", "")}
require.NoError(t, g.CurrencyPairs.StorePairs(asset.Spot, p, false))
require.NoError(t, g.CurrencyPairs.StorePairs(asset.Spot, p, true))
subs, err := g.generateSubscriptions()
require.NoError(t, err)
exp := subscription.List{
{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, QualifiedChannel: "candles_1d", Interval: kline.OneDay},
{Asset: asset.Spot, Channel: subscription.OrderbookChannel, Pairs: p, QualifiedChannel: "l2"},
}
testsubs.EqualLists(t, exp, subs)

for _, i := range []kline.Interval{kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, kline.OneHour, kline.SixHour} {
subs, err = subscription.List{{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, Interval: i}}.ExpandTemplates(g)
assert.NoErrorf(t, err, "ExpandTemplates should not error on interval %s", i)
require.NotEmpty(t, subs)
assert.Equal(t, "candles_"+i.Short(), subs[0].QualifiedChannel)
}
_, err = subscription.List{{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, Interval: kline.FourHour}}.ExpandTemplates(g)
assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "ExpandTemplates should error on invalid interval")

assert.PanicsWithError(t,
"subscription channel not supported: wibble",
func() { channelName(&subscription.Subscription{Channel: "wibble"}) },
"should panic on invalid channel",
)
}
11 changes: 0 additions & 11 deletions exchanges/gemini/gemini_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/types"
)

const (
marketDataLevel2 = "l2"
candles1m = "candles_1m"
candles5m = "candles_5m"
candles15m = "candles_15m"
candles30m = "candles_30m"
candles1hr = "candles_1h"
candles6hr = "candles_6h"
candles1d = "candles_1d"
)

// Ticker holds returned ticker data from the exchange
type Ticker struct {
Ask float64 `json:"ask,string"`
Expand Down
84 changes: 56 additions & 28 deletions exchanges/gemini/gemini_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
Expand All @@ -32,6 +34,23 @@ const (
geminiWsOrderEvents = "order/events"
)

const (
marketDataLevel2 = "l2"
candlesChannel = "candles"
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneDay},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
// Authenticated connection is directly to the orders URI, so this is implicit
// {Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
}

var subscriptionNames = map[string]string{
subscription.CandlesChannel: candlesChannel,
subscription.OrderbookChannel: marketDataLevel2,
}

// Instantiates a communications channel between websocket connections
var comms = make(chan stream.Response)

Expand Down Expand Up @@ -61,28 +80,17 @@ func (g *Gemini) WsConnect() error {
return nil
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (g *Gemini) GenerateDefaultSubscriptions() (subscription.List, error) {
// See gemini_types.go for more subscription/candle vars
var channels = []string{
marketDataLevel2,
candles1d,
}

pairs, err := g.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (g *Gemini) generateSubscriptions() (subscription.List, error) {
return g.Features.Subscriptions.ExpandTemplates(g)
}

var subscriptions subscription.List
for x := range channels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[x],
Pairs: pairs,
Asset: asset.Spot,
})
}
return subscriptions, nil
// GetSubscriptionTemplate returns a subscription channel template
func (g *Gemini) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": channelInterval,
}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from the channel
Expand All @@ -96,19 +104,14 @@ func (g *Gemini) Unsubscribe(subs subscription.List) error {
}

func (g *Gemini) manageSubs(subs subscription.List, op wsSubOp) error {
format, err := g.GetPairFormat(asset.Spot, true)
if err != nil {
return err
}

req := wsSubscribeRequest{
Type: op,
Subscriptions: make([]wsSubscriptions, 0, len(subs)),
}
for _, s := range subs {
req.Subscriptions = append(req.Subscriptions, wsSubscriptions{
Name: s.Channel,
Symbols: s.Pairs.Format(format).Strings(),
Name: s.QualifiedChannel,
Symbols: s.Pairs.Strings(),
})
}

Expand Down Expand Up @@ -560,3 +563,28 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {

return trade.AddTradesToBuffer(g.Name, trades...)
}

func channelName(s *subscription.Subscription) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return n
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}

func channelInterval(i kline.Interval) string {
switch i {
case kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, kline.OneHour, kline.SixHour:
return i.Short()
case kline.OneDay:
return "1d"
}
panic(fmt.Errorf("%w: %s", kline.ErrUnsupportedInterval, i.Short()))
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- with $i := $.S.Interval -}} _ {{- interval $i }}{{ end -}}
{{- $.AssetSeparator }}
{{- end }}
`
3 changes: 2 additions & 1 deletion exchanges/gemini/gemini_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (g *Gemini) SetDefaults() {
Enabled: exchange.FeaturesEnabled{
AutoPairUpdates: true,
},
Subscriptions: defaultSubscriptions.Clone(),
}

g.Requester, err = request.New(g.Name,
Expand Down Expand Up @@ -145,7 +146,7 @@ func (g *Gemini) Setup(exch *config.Exchange) error {
Connector: g.WsConnect,
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
GenerateSubscriptions: g.generateSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
ErrInStateAlready = errors.New("subscription already in state")
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrNotSupported = errors.New("subscription channel not supported")
)

// State tracks the status of a subscription channel
Expand Down

0 comments on commit 40668a0

Please sign in to comment.