From edfd0d0d7651395ad2116e3bf2cc36054a49b6bb Mon Sep 17 00:00:00 2001 From: altafan Date: Fri, 5 Nov 2021 14:42:13 +0100 Subject: [PATCH 1/4] Add bitfinex price feeder --- .../infrastructure/feeder/bitfinex/service.go | 300 ++++++++++++++++++ .../feeder/bitfinex/service_test.go | 101 ++++++ .../infrastructure/feeder/bitfinex/types.go | 49 +++ 3 files changed, 450 insertions(+) create mode 100644 internal/core/infrastructure/feeder/bitfinex/service.go create mode 100644 internal/core/infrastructure/feeder/bitfinex/service_test.go create mode 100644 internal/core/infrastructure/feeder/bitfinex/types.go diff --git a/internal/core/infrastructure/feeder/bitfinex/service.go b/internal/core/infrastructure/feeder/bitfinex/service.go new file mode 100644 index 0000000..f8cbab2 --- /dev/null +++ b/internal/core/infrastructure/feeder/bitfinex/service.go @@ -0,0 +1,300 @@ +package bitfinexfeeder + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/shopspring/decimal" + log "github.com/sirupsen/logrus" + "github.com/tdex-network/tdex-feeder/internal/core/ports" +) + +const ( + // BitfinexWebSocketURL is the base url to open a WebSocket connection with + // Bitfinex. + BitfinexWebSocketURL = "api-pub.bitfinex.com/ws/2" +) + +var ( + wellKnownMarkets = []ports.Market{ + market{ + baseAsset: "6f0279e9ed041c3d710a9f57d0c02928416460c4b722ae3457a11eec381c526d", + quoteAsset: "ce091c998b83c78bb71a632313ba3760f1763d9cfcffae02258ffa9865a37bd2", + ticker: "BTCUST", + }, + } +) + +type service struct { + conn *websocket.Conn + writeTicker *time.Ticker + lock *sync.RWMutex + chLock *sync.Mutex + + marketByTicker map[string]ports.Market + latestFeedsByTicker map[string]ports.PriceFeed + tickersByChanId map[int]string + feedChan chan ports.PriceFeed + quitChan chan struct{} +} + +func NewBitfinexPriceFeeder(args ...interface{}) (ports.PriceFeeder, error) { + if len(args) != 1 { + return nil, fmt.Errorf("invalid number of args") + } + + interval, ok := args[0].(int) + if !ok { + return nil, fmt.Errorf("unknown interval arg type") + } + writeTicker := time.NewTicker(time.Duration(interval) * time.Millisecond) + + return &service{ + writeTicker: writeTicker, + lock: &sync.RWMutex{}, + chLock: &sync.Mutex{}, + latestFeedsByTicker: make(map[string]ports.PriceFeed), + feedChan: make(chan ports.PriceFeed), + quitChan: make(chan struct{}, 1), + }, nil +} + +func (s *service) WellKnownMarkets() []ports.Market { + return wellKnownMarkets +} + +func (s *service) SubscribeMarkets(markets []ports.Market) error { + mktTickers := make([]string, 0, len(markets)) + mktByTicker := make(map[string]ports.Market) + for _, mkt := range markets { + mktTickers = append(mktTickers, mkt.Ticker()) + mktByTicker[mkt.Ticker()] = mkt + } + + conn, tickersByChanId, err := connectAndSubscribe(mktTickers) + if err != nil { + return err + } + + s.conn = conn + s.tickersByChanId = tickersByChanId + s.marketByTicker = mktByTicker + return nil +} + +func (s *service) Start() error { + mustReconnect, err := s.start() + for mustReconnect { + log.WithError(err).Warn("connection dropped unexpectedly. Trying to reconnect...") + + tickers := make([]string, 0, len(s.marketByTicker)) + for ticker := range s.marketByTicker { + tickers = append(tickers, ticker) + } + + conn, tickersByChanId, err := connectAndSubscribe(tickers) + if err != nil { + return err + } + s.conn = conn + s.tickersByChanId = tickersByChanId + + log.Debug("connection and subscriptions re-established. Restarting...") + mustReconnect, err = s.start() + } + + return err +} + +func (s *service) Stop() { + s.quitChan <- struct{}{} +} + +func (s *service) FeedChan() chan ports.PriceFeed { + return s.feedChan +} + +func (s *service) start() (mustReconnect bool, err error) { + defer func() { + if rec := recover(); rec != nil { + mustReconnect = true + } + }() + + go func() { + for range s.writeTicker.C { + s.writeToFeedChan() + } + }() + + for { + select { + case <-s.quitChan: + s.writeTicker.Stop() + s.closeChannels() + err = s.conn.Close() + return false, err + default: + // if for any reason, reading a message from the socket panics, we make + // sure to recover and flag that a reconnection is required. + _, message, err := s.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + panic(err) + } + } + + priceFeed := s.parseFeed(message) + if priceFeed == nil { + continue + } + + s.writePriceFeed(priceFeed.GetMarket().Ticker(), priceFeed) + } + } +} + +func (s *service) readPriceFeeds() []ports.PriceFeed { + s.lock.RLock() + defer s.lock.RUnlock() + + feeds := make([]ports.PriceFeed, 0, len(s.latestFeedsByTicker)) + for _, priceFeed := range s.latestFeedsByTicker { + feeds = append(feeds, priceFeed) + } + return feeds +} + +func (s *service) writePriceFeed(mktTicker string, priceFeed ports.PriceFeed) { + s.lock.Lock() + defer s.lock.Unlock() + s.latestFeedsByTicker[mktTicker] = priceFeed +} + +func (s *service) writeToFeedChan() { + s.chLock.Lock() + defer s.chLock.Unlock() + + priceFeeds := s.readPriceFeeds() + for _, priceFeed := range priceFeeds { + s.feedChan <- priceFeed + } +} + +func (s *service) closeChannels() { + s.chLock.Lock() + defer s.chLock.Unlock() + + close(s.feedChan) + close(s.quitChan) +} + +func (s *service) parseFeed(msg []byte) ports.PriceFeed { + var i []interface{} + if err := json.Unmarshal(msg, &i); err != nil { + return nil + } + if len(i) != 2 { + return nil + } + + c, ok := i[0].(float64) + if !ok { + return nil + } + chanId := int(c) + + ticker, ok := s.tickersByChanId[chanId] + if !ok { + return nil + } + mkt, ok := s.marketByTicker[ticker] + if !ok { + return nil + } + + ii, ok := i[1].([]interface{}) + if !ok { + return nil + } + if len(ii) < 10 { + return nil + } + + p, ok := ii[6].(float64) + if !ok { + return nil + } + + quotePrice := decimal.NewFromFloat(p) + basePrice := decimal.NewFromInt(1).Div(quotePrice) + + return &priceFeed{ + market: mkt, + price: &price{ + basePrice: basePrice.StringFixed(8), + quotePrice: quotePrice.String(), + }, + } +} + +func connectAndSubscribe( + mktTickers []string, +) (*websocket.Conn, map[int]string, error) { + url := fmt.Sprintf("wss://%s", BitfinexWebSocketURL) + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, nil, err + } + + tickersByChanID := make(map[int]string) + for _, ticker := range mktTickers { + msg := map[string]interface{}{ + "event": "subscribe", + "channel": "ticker", + "symbol": fmt.Sprintf("t%s", ticker), + } + + if err := conn.WriteJSON(msg); err != nil { + return nil, nil, fmt.Errorf("cannot subscribe to market %s: %s", ticker, err) + } + + for { + _, msg, err := conn.ReadMessage() + if err != nil { + return nil, nil, fmt.Errorf( + "cannot read response of subscribtion for market %s: %s", ticker, err, + ) + } + + chanId := parseSubscriptionResponse(msg, ticker) + if chanId == -1 { + continue + } + + tickersByChanID[chanId] = ticker + break + } + } + return conn, tickersByChanID, nil +} + +func parseSubscriptionResponse(msg []byte, ticker string) int { + m := make(map[string]interface{}) + if err := json.Unmarshal(msg, &m); err != nil { + return -1 + } + if e, ok := m["event"].(string); !ok || e != "subscribed" { + return -1 + } + if c, ok := m["channel"].(string); !ok || c != "ticker" { + return -1 + } + if t, ok := m["pair"].(string); !ok || t != ticker { + return -1 + } + return int(m["chanId"].(float64)) +} diff --git a/internal/core/infrastructure/feeder/bitfinex/service_test.go b/internal/core/infrastructure/feeder/bitfinex/service_test.go new file mode 100644 index 0000000..e8d8242 --- /dev/null +++ b/internal/core/infrastructure/feeder/bitfinex/service_test.go @@ -0,0 +1,101 @@ +package bitfinexfeeder_test + +import ( + "crypto/rand" + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/require" + bitfinexfeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/bitfinex" + "github.com/tdex-network/tdex-feeder/internal/core/ports" +) + +var ( + interval = 1000 // 1s interval + tickers = []string{"BTCUST", "BTCEUT"} +) + +func TestService(t *testing.T) { + feederSvc, err := newTestService() + require.NoError(t, err) + + go func() { + err := feederSvc.Start() + require.NoError(t, err) + }() + + go func() { + time.Sleep(5 * time.Second) + feederSvc.Stop() + }() + + count := 0 + for priceFeed := range feederSvc.FeedChan() { + count++ + require.NotNil(t, priceFeed.GetMarket()) + require.NotNil(t, priceFeed.GetPrice()) + require.NotEmpty(t, priceFeed.GetMarket().BaseAsset()) + require.NotEmpty(t, priceFeed.GetMarket().QuoteAsset()) + require.NotEmpty(t, priceFeed.GetMarket().Ticker()) + require.NotEmpty(t, priceFeed.GetPrice().BasePrice()) + require.NotEmpty(t, priceFeed.GetPrice().QuotePrice()) + } + require.Greater(t, count, 0) +} + +func newTestService() (ports.PriceFeeder, error) { + markets := mockedMarkets(tickers) + svc, err := bitfinexfeeder.NewBitfinexPriceFeeder(interval) + if err != nil { + return nil, err + } + if err := svc.SubscribeMarkets(markets); err != nil { + return nil, err + } + return svc, nil +} + +func mockedMarkets(tickers []string) []ports.Market { + markets := make([]ports.Market, 0, len(tickers)) + for _, ticker := range tickers { + markets = append(markets, newMockedMarket(ticker)) + } + return markets +} + +type mockMarket struct { + baseAsset string + quoteAsset string + ticker string +} + +func newMockedMarket(ticker string) ports.Market { + return &mockMarket{ + baseAsset: randomHex(32), + quoteAsset: randomHex(32), + ticker: ticker, + } +} + +func (m *mockMarket) BaseAsset() string { + return m.baseAsset +} + +func (m *mockMarket) QuoteAsset() string { + return m.quoteAsset +} + +func (m *mockMarket) Ticker() string { + return m.ticker +} + +func randomHex(len int) string { + return hex.EncodeToString(randomBytes(len)) +} + +func randomBytes(len int) []byte { + b := make([]byte, len) + rand.Read(b) + return b +} diff --git a/internal/core/infrastructure/feeder/bitfinex/types.go b/internal/core/infrastructure/feeder/bitfinex/types.go new file mode 100644 index 0000000..f055df5 --- /dev/null +++ b/internal/core/infrastructure/feeder/bitfinex/types.go @@ -0,0 +1,49 @@ +package bitfinexfeeder + +import ( + "github.com/tdex-network/tdex-feeder/internal/core/ports" +) + +type price struct { + basePrice string + quotePrice string +} + +func (p *price) BasePrice() string { + return p.basePrice +} + +func (p *price) QuotePrice() string { + return p.quotePrice +} + +type priceFeed struct { + market ports.Market + price *price +} + +func (p *priceFeed) GetMarket() ports.Market { + return p.market +} + +func (p *priceFeed) GetPrice() ports.Price { + return p.price +} + +type market struct { + baseAsset string + quoteAsset string + ticker string +} + +func (m market) BaseAsset() string { + return m.baseAsset +} + +func (m market) QuoteAsset() string { + return m.quoteAsset +} + +func (m market) Ticker() string { + return m.ticker +} From 9c5dfff7d6e2091777b14f487ee03c89197cb393 Mon Sep 17 00:00:00 2001 From: altafan Date: Fri, 5 Nov 2021 14:49:01 +0100 Subject: [PATCH 2/4] Explicitly support bitfinex feeder --- cmd/feederd/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/feederd/main.go b/cmd/feederd/main.go index dac6c02..5307532 100644 --- a/cmd/feederd/main.go +++ b/cmd/feederd/main.go @@ -10,6 +10,7 @@ import ( "github.com/tdex-network/tdex-feeder/internal/config" "github.com/tdex-network/tdex-feeder/internal/core/application" grpcclient "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/client/grpc" + bitfinexfeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/bitfinex" krakenfeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/kraken" "github.com/tdex-network/tdex-feeder/internal/core/ports" ) @@ -26,7 +27,8 @@ func (i indexedPriceFeeders) supported() []string { var ( priceFeeders = indexedPriceFeeders{ - "kraken": krakenfeeder.NewKrakenPriceFeeder, + "kraken": krakenfeeder.NewKrakenPriceFeeder, + "bitfinex": bitfinexfeeder.NewBitfinexPriceFeeder, } ) From 36f7378b0a1f87aabcf6b01265a3a9e83acd5bcb Mon Sep 17 00:00:00 2001 From: altafan Date: Fri, 5 Nov 2021 16:55:29 +0100 Subject: [PATCH 3/4] Check for successful subscription --- .../infrastructure/feeder/bitfinex/service.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/core/infrastructure/feeder/bitfinex/service.go b/internal/core/infrastructure/feeder/bitfinex/service.go index f8cbab2..0b712c6 100644 --- a/internal/core/infrastructure/feeder/bitfinex/service.go +++ b/internal/core/infrastructure/feeder/bitfinex/service.go @@ -263,13 +263,17 @@ func connectAndSubscribe( } for { - _, msg, err := conn.ReadMessage() - if err != nil { + msg := make(map[string]interface{}) + if err := conn.ReadJSON(&msg); err != nil { return nil, nil, fmt.Errorf( "cannot read response of subscribtion for market %s: %s", ticker, err, ) } - + if msg["event"].(string) == "error" { + return nil, nil, fmt.Errorf( + "%s %s", msg["pair"].(string), msg["msg"].(string), + ) + } chanId := parseSubscriptionResponse(msg, ticker) if chanId == -1 { continue @@ -282,11 +286,7 @@ func connectAndSubscribe( return conn, tickersByChanID, nil } -func parseSubscriptionResponse(msg []byte, ticker string) int { - m := make(map[string]interface{}) - if err := json.Unmarshal(msg, &m); err != nil { - return -1 - } +func parseSubscriptionResponse(m map[string]interface{}, ticker string) int { if e, ok := m["event"].(string); !ok || e != "subscribed" { return -1 } From 19e06b400500f37aec03d500f5ebfb878dad00ed Mon Sep 17 00:00:00 2001 From: altafan Date: Mon, 8 Nov 2021 13:47:33 +0100 Subject: [PATCH 4/4] Fix parsing messages from bitfinex --- .../infrastructure/feeder/bitfinex/service.go | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/internal/core/infrastructure/feeder/bitfinex/service.go b/internal/core/infrastructure/feeder/bitfinex/service.go index 0b712c6..f227711 100644 --- a/internal/core/infrastructure/feeder/bitfinex/service.go +++ b/internal/core/infrastructure/feeder/bitfinex/service.go @@ -263,18 +263,17 @@ func connectAndSubscribe( } for { - msg := make(map[string]interface{}) - if err := conn.ReadJSON(&msg); err != nil { + _, msg, err := conn.ReadMessage() + if err != nil { return nil, nil, fmt.Errorf( "cannot read response of subscribtion for market %s: %s", ticker, err, ) } - if msg["event"].(string) == "error" { - return nil, nil, fmt.Errorf( - "%s %s", msg["pair"].(string), msg["msg"].(string), - ) + + chanId, err := parseSubscriptionResponse(msg, ticker) + if err != nil { + return nil, nil, err } - chanId := parseSubscriptionResponse(msg, ticker) if chanId == -1 { continue } @@ -286,15 +285,26 @@ func connectAndSubscribe( return conn, tickersByChanID, nil } -func parseSubscriptionResponse(m map[string]interface{}, ticker string) int { - if e, ok := m["event"].(string); !ok || e != "subscribed" { - return -1 +func parseSubscriptionResponse(msg []byte, ticker string) (int, error) { + m := make(map[string]interface{}) + if err := json.Unmarshal(msg, &m); err != nil { + return -1, nil + } + e, ok := m["event"].(string) + if !ok { + return -1, nil + } + if e == "error" { + return -1, fmt.Errorf("%s %s", m["pair"].(string), m["msg"].(string)) + } + if e != "subscribed" { + return -1, nil } if c, ok := m["channel"].(string); !ok || c != "ticker" { - return -1 + return -1, nil } if t, ok := m["pair"].(string); !ok || t != ticker { - return -1 + return -1, nil } - return int(m["chanId"].(float64)) + return int(m["chanId"].(float64)), nil }