Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gemini: Add subscription configuration #1625

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions exchanges/gemini/gemini_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -1299,3 +1302,34 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
g := new(Gemini)
require.NoError(t, testexch.Setup(g), "Test instance Setup must not error")
p := currency.Pairs{currency.NewPairWithDelimiter("BTC", "USD", ""), currency.NewPairWithDelimiter("ETH", "BTC", "")}
require.NoError(t, g.CurrencyPairs.StorePairs(asset.Spot, p, false))
require.NoError(t, g.CurrencyPairs.StorePairs(asset.Spot, p, true))
subs, err := g.generateSubscriptions()
require.NoError(t, err)
exp := subscription.List{
{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, QualifiedChannel: "candles_1d", Interval: kline.OneDay},
{Asset: asset.Spot, Channel: subscription.OrderbookChannel, Pairs: p, QualifiedChannel: "l2"},
}
testsubs.EqualLists(t, exp, subs)

for _, i := range []kline.Interval{kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, kline.OneHour, kline.SixHour} {
subs, err = subscription.List{{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, Interval: i}}.ExpandTemplates(g)
assert.NoErrorf(t, err, "ExpandTemplates should not error on interval %s", i)
require.NotEmpty(t, subs)
assert.Equal(t, "candles_"+i.Short(), subs[0].QualifiedChannel)
}
_, err = subscription.List{{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, Interval: kline.FourHour}}.ExpandTemplates(g)
assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "ExpandTemplates should error on invalid interval")

assert.PanicsWithError(t,
"subscription channel not supported: wibble",
func() { channelName(&subscription.Subscription{Channel: "wibble"}) },
"should panic on invalid channel",
)
}
11 changes: 0 additions & 11 deletions exchanges/gemini/gemini_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/types"
)

const (
marketDataLevel2 = "l2"
candles1m = "candles_1m"
candles5m = "candles_5m"
candles15m = "candles_15m"
candles30m = "candles_30m"
candles1hr = "candles_1h"
candles6hr = "candles_6h"
candles1d = "candles_1d"
)

// Ticker holds returned ticker data from the exchange
type Ticker struct {
Ask float64 `json:"ask,string"`
Expand Down
85 changes: 57 additions & 28 deletions exchanges/gemini/gemini_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
Expand All @@ -33,6 +35,23 @@ const (
geminiWsOrderEvents = "order/events"
)

const (
marketDataLevel2 = "l2"
candlesChannel = "candles"
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneDay},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
// Authenticated connection is directly to the orders URI, so this is implicit
// {Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
}

var subscriptionNames = map[string]string{
subscription.CandlesChannel: candlesChannel,
subscription.OrderbookChannel: marketDataLevel2,
}

// Instantiates a communications channel between websocket connections
var comms = make(chan stream.Response)

Expand Down Expand Up @@ -62,28 +81,17 @@ func (g *Gemini) WsConnect() error {
return nil
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (g *Gemini) GenerateDefaultSubscriptions() (subscription.List, error) {
// See gemini_types.go for more subscription/candle vars
var channels = []string{
marketDataLevel2,
candles1d,
}

pairs, err := g.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (g *Gemini) generateSubscriptions() (subscription.List, error) {
return g.Features.Subscriptions.ExpandTemplates(g)
}

var subscriptions subscription.List
for x := range channels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[x],
Pairs: pairs,
Asset: asset.Spot,
})
}
return subscriptions, nil
// GetSubscriptionTemplate returns a subscription channel template
func (g *Gemini) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": channelInterval,
}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from the channel
Expand All @@ -97,19 +105,14 @@ func (g *Gemini) Unsubscribe(subs subscription.List) error {
}

func (g *Gemini) manageSubs(subs subscription.List, op wsSubOp) error {
format, err := g.GetPairFormat(asset.Spot, true)
if err != nil {
return err
}

req := wsSubscribeRequest{
Type: op,
Subscriptions: make([]wsSubscriptions, 0, len(subs)),
}
for _, s := range subs {
req.Subscriptions = append(req.Subscriptions, wsSubscriptions{
Name: s.Channel,
Symbols: s.Pairs.Format(format).Strings(),
Name: s.QualifiedChannel,
Symbols: s.Pairs.Strings(),
})
}

Expand Down Expand Up @@ -166,6 +169,7 @@ func (g *Gemini) WsAuth(ctx context.Context, dialer *websocket.Dialer) error {
if err != nil {
return fmt.Errorf("%v Websocket connection %v error. Error %v", g.Name, endpoint, err)
}
g.Websocket.Wg.Add(1)
go g.wsFunnelConnectionData(g.Websocket.AuthConn)
return nil
}
Expand Down Expand Up @@ -561,3 +565,28 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {

return trade.AddTradesToBuffer(g.Name, trades...)
}

func channelName(s *subscription.Subscription) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return n
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}

func channelInterval(i kline.Interval) string {
switch i {
case kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, kline.OneHour, kline.SixHour:
return i.Short()
case kline.OneDay:
return "1d"
gbjk marked this conversation as resolved.
Show resolved Hide resolved
}
panic(fmt.Errorf("%w: %s", kline.ErrUnsupportedInterval, i.Short()))
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- with $i := $.S.Interval -}} _ {{- interval $i }}{{ end -}}
{{- $.AssetSeparator }}
{{- end }}
`
3 changes: 2 additions & 1 deletion exchanges/gemini/gemini_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (g *Gemini) SetDefaults() {
Enabled: exchange.FeaturesEnabled{
AutoPairUpdates: true,
},
Subscriptions: defaultSubscriptions.Clone(),
}

g.Requester, err = request.New(g.Name,
Expand Down Expand Up @@ -145,7 +146,7 @@ func (g *Gemini) Setup(exch *config.Exchange) error {
Connector: g.WsConnect,
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
GenerateSubscriptions: g.generateSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrUseConstChannelName = errors.New("must use standard channel name constants")
ErrNotSupported = errors.New("subscription channel not supported")
)

// State tracks the status of a subscription channel
Expand Down
13 changes: 8 additions & 5 deletions testdata/configtest.json
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,6 @@
"websocketResponseCheckTimeout": 30000000,
"websocketResponseMaxLimit": 7000000000,
"websocketTrafficTimeout": 30000000000,
"websocketOrderbookBufferLimit": 5,
"baseCurrencies": "USD",
"currencyPairs": {
"requestFormat": {
Expand All @@ -1645,11 +1644,9 @@
"uppercase": true
},
"useGlobalFormat": true,
"assetTypes": [
"spot"
],
"pairs": {
"spot": {
"assetEnabled": true,
"enabled": "BTCUSD",
"available": "BTCUSD,ETHBTC,ETHUSD,BCHUSD,BCHBTC,BCHETH,LTCUSD,LTCBTC,LTCETH,LTCBCH,ZECUSD,ZECBTC,ZECETH,ZECBCH,ZECLTC"
}
Expand Down Expand Up @@ -1700,7 +1697,13 @@
"iban": "",
"supportedCurrencies": ""
}
]
],
"orderbook": {
"verificationBypass": false,
"websocketBufferLimit": 5,
"websocketBufferEnabled": false,
"publishPeriod": 10000000000
}
},
{
"name": "HitBTC",
Expand Down
Loading