Skip to content

Commit

Permalink
Bitfinex: Add Subscription configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Aug 13, 2024
1 parent 3e32a53 commit 282c384
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 190 deletions.
4 changes: 1 addition & 3 deletions exchanges/bitfinex/bitfinex.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ const (
bitfinexChecksumFlag = 131072
bitfinexWsSequenceFlag = 65536

// CandlesTimeframeKey configures the timeframe in subscription.Subscription.Params
CandlesTimeframeKey = "_timeframe"
// CandlesPeriodKey configures the aggregated period in subscription.Subscription.Params
// CandlesPeriodKey configures the Candles aggregated period for MarginFunding in subscription.Subscription.Params
CandlesPeriodKey = "_period"
)

Expand Down
127 changes: 78 additions & 49 deletions exchanges/bitfinex/bitfinex_test.go

Large diffs are not rendered by default.

268 changes: 131 additions & 137 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ import (
"strconv"
"strings"
"sync"
"text/template"
"time"

"github.com/Masterminds/sprig/v3"
"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/convert"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
Expand All @@ -29,6 +32,14 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.OneMin},
{Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.MarginFunding, Interval: kline.OneMin, Params: map[string]any{CandlesPeriodKey: "p30"}},
{Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.All, Levels: 100, Params: map[string]any{"prec": "R0"}},
}

var comms = make(chan stream.Response)

type checksum struct {
Expand All @@ -40,6 +51,13 @@ type checksum struct {
var checksumStore = make(map[int]*checksum)
var cMtx sync.Mutex

var subscriptionNames = map[string]string{
subscription.TickerChannel: wsTicker,
subscription.OrderbookChannel: wsBook,
subscription.CandlesChannel: wsCandles,
subscription.AllTradesChannel: wsTrades,
}

// WsConnect starts a new websocket connection
func (b *Bitfinex) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
Expand Down Expand Up @@ -524,35 +542,35 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}

func (b *Bitfinex) handleWSChannelUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if c == nil {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}

if eventType == wsChecksum {
return b.handleWSChecksum(c, d)
return b.handleWSChecksum(s, d)
}

if eventType == wsHeartbeat {
return nil
}

if len(c.Pairs) != 1 {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}

switch c.Channel {
case wsBook:
return b.handleWSBookUpdate(c, d)
case wsCandles:
return b.handleWSCandleUpdate(c, d)
case wsTicker:
return b.handleWSTickerUpdate(c, d)
case wsTrades:
return b.handleWSTradesUpdate(c, eventType, d)
switch s.Channel {
case subscription.OrderbookChannel:
return b.handleWSBookUpdate(s, d)
case subscription.CandlesChannel:
return b.handleWSCandleUpdate(s, d)
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
}

return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel)
return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
}

func (b *Bitfinex) handleWSChecksum(c *subscription.Subscription, d []interface{}) error {
Expand Down Expand Up @@ -1668,44 +1686,14 @@ func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) error {
return nil
}

// generateSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (b *Bitfinex) generateSubscriptions() (subscription.List, error) {
var channels = []string{wsBook, wsTrades, wsTicker, wsCandles}

var subscriptions subscription.List
assets := b.GetAssetTypes(true)
for i := range assets {
if !b.IsAssetWebsocketSupported(assets[i]) {
continue
}
enabledPairs, err := b.GetEnabledPairs(assets[i])
if err != nil {
return nil, err
}

for j := range channels {
for k := range enabledPairs {
params := make(map[string]interface{})
if channels[j] == wsBook {
params["prec"] = "R0"
params["len"] = "100"
}

if channels[j] == wsCandles && assets[i] == asset.MarginFunding {
params[CandlesPeriodKey] = "30"
}

subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[j],
Pairs: currency.Pairs{enabledPairs[k]},
Params: params,
Asset: assets[i],
})
}
}
}
return b.Features.Subscriptions.ExpandTemplates(b)
}

return subscriptions, nil
// GetSubscriptionTemplate returns a subscription channel template
func (b *Bitfinex) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(sprig.FuncMap()).Funcs(template.FuncMap{"subToMap": subToMap}).Parse(subTplText)
}

// ConfigureWS to send checksums and sequence numbers
Expand All @@ -1717,26 +1705,36 @@ func (b *Bitfinex) ConfigureWS() error {
}

// Subscribe sends a websocket message to receive data from channels
func (b *Bitfinex) Subscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, b.subscribeToChan, 1)
func (b *Bitfinex) Subscribe(subs subscription.List) error {
var err error
if subs, err = subs.ExpandTemplates(b); err != nil {
return err
}
return b.ParallelChanOp(subs, b.subscribeToChan, 1)
}

// Unsubscribe sends a websocket message to stop receiving data from channels
func (b *Bitfinex) Unsubscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, b.unsubscribeFromChan, 1)
func (b *Bitfinex) Unsubscribe(subs subscription.List) error {
var err error
if subs, err = subs.ExpandTemplates(b); err != nil {
return err
}
return b.ParallelChanOp(subs, b.unsubscribeFromChan, 1)
}

// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
if len(chans) != 1 {
return errors.New("subscription batching limited to 1")
func (b *Bitfinex) subscribeToChan(subs subscription.List) error {
if len(subs) != 1 {
return subscription.ErrNotSinglePair
}

c := chans[0]
req, err := subscribeReq(c)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
s := subs[0]
req := map[string]any{
"event": "subscribe",
}
if err := json.Unmarshal([]byte(s.QualifiedChannel), &req); err != nil {
return err
}

// subId is a single round-trip identifier that provides linking sub requests to chanIDs
Expand All @@ -1746,102 +1744,39 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error {

// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key
if err = b.Websocket.AddSubscriptions(c); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
s.Key = subID // Note subID string type avoids conflicts with later chanID key
if err := b.Websocket.AddSubscriptions(s); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}

// Always remove the temporary subscription keyed by subID
defer func() {
_ = b.Websocket.RemoveSubscriptions(c)
_ = b.Websocket.RemoveSubscriptions(s)
}()

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
return fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}

if err = b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
wErr := fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs)
b.Websocket.DataHandler <- wErr
return wErr
}

return nil
}

// subscribeReq returns a map of request params for subscriptions
func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error) {
if c == nil {
return nil, fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if len(c.Pairs) != 1 {
return nil, subscription.ErrNotSinglePair
}
pair := c.Pairs[0]
req := map[string]interface{}{
"event": "subscribe",
"channel": c.Channel,
}

for k, v := range c.Params {
switch k {
case CandlesPeriodKey, CandlesTimeframeKey:
// Skip these internal Params
case "key", "symbol":
// Ensure user's Params aren't silently overwritten
return nil, fmt.Errorf("%s %w", k, errParamNotAllowed)
default:
req[k] = v
}
}

prefix := "t"
if c.Asset == asset.MarginFunding {
prefix = "f"
}

needsDelimiter := pair.Len() > 6

var formattedPair string
if needsDelimiter {
formattedPair = pair.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
} else {
formattedPair = currency.PairFormat{Uppercase: true}.Format(pair)
}

if c.Channel == wsCandles {
timeframe := "1m"
if t, ok := c.Params[CandlesTimeframeKey]; ok {
if timeframe, ok = t.(string); !ok {
return nil, common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey")
}
}
fundingPeriod := ""
if p, ok := c.Params[CandlesPeriodKey]; ok {
s, cOk := p.(string)
if !cOk {
return nil, common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey")
}
fundingPeriod = ":p" + s
}
req["key"] = "trade:" + timeframe + ":" + prefix + formattedPair + fundingPeriod
} else {
req["symbol"] = prefix + formattedPair
}

return req, nil
}

// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
if len(chans) != 1 {
func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error {
if len(subs) != 1 {
return errors.New("subscription batching limited to 1")
}
c := chans[0]
chanID, ok := c.Key.(int)
s := subs[0]
chanID, ok := s.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "chanID")
return common.GetTypeAssertError("int", s.Key, "subscription.Key")
}

req := map[string]interface{}{
Expand All @@ -1855,12 +1790,12 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
}

if err := b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w from ChanId: %v; %w", stream.ErrUnsubscribeFailure, chanID, err)
wErr := fmt.Errorf("%w: ChanId: %v", err, chanID)
b.Websocket.DataHandler <- wErr
return wErr
}

return b.Websocket.RemoveSubscriptions(c)
return b.Websocket.RemoveSubscriptions(s)
}

// getErrResp takes a json response string and looks for an error event type
Expand Down Expand Up @@ -2233,3 +2168,62 @@ subSort:
break
}
}

// subToMap returns a json object of request params for subscriptions
func subToMap(s *subscription.Subscription, a asset.Item, p currency.Pair) map[string]any {
c := s.Channel
if name, ok := subscriptionNames[s.Channel]; ok {
c = name
}
req := map[string]interface{}{
"channel": c,
}

var fundingPeriod string
for k, v := range s.Params {
switch k {
case CandlesPeriodKey:
if s, ok := v.(string); !ok {
panic(common.GetTypeAssertError("string", v, "subscription.CandlesPeriodKey"))
} else {
fundingPeriod = ":" + s
}
case "key", "symbol", "len":
panic(fmt.Errorf("%w: %s", errParamNotAllowed, k)) // Ensure user's Params aren't silently overwritten
default:
req[k] = v
}
}

if s.Levels != 0 {
req["len"] = s.Levels
}

prefix := "t"
if a == asset.MarginFunding {
prefix = "f"
}

pairFmt := currency.PairFormat{Uppercase: true}
if needsDelimiter := p.Len() > 6; needsDelimiter {
pairFmt.Delimiter = ":"
}
symbol := p.Format(pairFmt).String()
if c == wsCandles {
req["key"] = "trade:" + s.Interval.Short() + ":" + prefix + symbol + fundingPeriod
} else {
req["symbol"] = prefix + symbol
}

return req
}

const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs -}}
{{- subToMap $.S $asset $p | mustToJson }}
{{- $.PairSeparator }}
{{- end -}}
{{ $.AssetSeparator }}
{{- end -}}
`
Loading

0 comments on commit 282c384

Please sign in to comment.