From 4cc26dc7d04b9ca6d0c4b59fae18d2ec3df264ab Mon Sep 17 00:00:00 2001 From: Pietralberto Mazza Date: Mon, 8 Nov 2021 15:41:54 +0100 Subject: [PATCH] Coinbase price feeder (#41) * Add coinbase price feeder * Expictly support coinbase feeder * Check for successful subscription --- cmd/feederd/main.go | 2 + .../infrastructure/feeder/coinbase/service.go | 270 ++++++++++++++++++ .../feeder/coinbase/service_test.go | 101 +++++++ .../infrastructure/feeder/coinbase/types.go | 49 ++++ 4 files changed, 422 insertions(+) create mode 100644 internal/core/infrastructure/feeder/coinbase/service.go create mode 100644 internal/core/infrastructure/feeder/coinbase/service_test.go create mode 100644 internal/core/infrastructure/feeder/coinbase/types.go diff --git a/cmd/feederd/main.go b/cmd/feederd/main.go index 5307532..936e956 100644 --- a/cmd/feederd/main.go +++ b/cmd/feederd/main.go @@ -11,6 +11,7 @@ import ( "github.com/tdex-network/tdex-feeder/internal/core/application" grpcclient "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/client/grpc" bitfinexfeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/bitfinex" + coinbasefeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/coinbase" krakenfeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/kraken" "github.com/tdex-network/tdex-feeder/internal/core/ports" ) @@ -28,6 +29,7 @@ func (i indexedPriceFeeders) supported() []string { var ( priceFeeders = indexedPriceFeeders{ "kraken": krakenfeeder.NewKrakenPriceFeeder, + "coinbase": coinbasefeeder.NewCoinbasePriceFeeder, "bitfinex": bitfinexfeeder.NewBitfinexPriceFeeder, } ) diff --git a/internal/core/infrastructure/feeder/coinbase/service.go b/internal/core/infrastructure/feeder/coinbase/service.go new file mode 100644 index 0000000..da90e5c --- /dev/null +++ b/internal/core/infrastructure/feeder/coinbase/service.go @@ -0,0 +1,270 @@ +package coinbasefeeder + +import ( + "fmt" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/shopspring/decimal" + log "github.com/sirupsen/logrus" + "github.com/tdex-network/tdex-feeder/internal/core/ports" +) + +const ( + // CoinbaseWebSocketURL is the base url to open a WebSocket connection with + // Coinbase. + CoinbaseWebSocketURL = "ws-feed.exchange.coinbase.com" +) + +var ( + wellKnownMarkets = []ports.Market{ + market{ + baseAsset: "6f0279e9ed041c3d710a9f57d0c02928416460c4b722ae3457a11eec381c526d", + quoteAsset: "ce091c998b83c78bb71a632313ba3760f1763d9cfcffae02258ffa9865a37bd2", + ticker: "BTC-USDT", + }, + } +) + +type service struct { + conn *websocket.Conn + writeTicker *time.Ticker + lock *sync.RWMutex + chLock *sync.Mutex + + marketByTicker map[string]ports.Market + latestFeedsByTicker map[string]ports.PriceFeed + feedChan chan ports.PriceFeed + quitChan chan struct{} +} + +func NewCoinbasePriceFeeder(args ...interface{}) (ports.PriceFeeder, error) { + if len(args) != 1 { + return nil, fmt.Errorf("invalid number of args") + } + + interval, ok := args[0].(int) + if !ok { + return nil, fmt.Errorf("unknown interval arg type") + } + writeTicker := time.NewTicker(time.Duration(interval) * time.Millisecond) + + return &service{ + writeTicker: writeTicker, + lock: &sync.RWMutex{}, + chLock: &sync.Mutex{}, + latestFeedsByTicker: make(map[string]ports.PriceFeed), + feedChan: make(chan ports.PriceFeed), + quitChan: make(chan struct{}, 1), + }, nil +} + +func (s *service) WellKnownMarkets() []ports.Market { + return wellKnownMarkets +} + +func (s *service) SubscribeMarkets(markets []ports.Market) error { + mktTickers := make([]string, 0, len(markets)) + mktByTicker := make(map[string]ports.Market) + for _, mkt := range markets { + mktTickers = append(mktTickers, mkt.Ticker()) + mktByTicker[mkt.Ticker()] = mkt + } + + conn, err := connectAndSubscribe(mktTickers) + if err != nil { + return err + } + + s.conn = conn + s.marketByTicker = mktByTicker + return nil +} + +func (s *service) Start() error { + mustReconnect, err := s.start() + for mustReconnect { + log.WithError(err).Warn("connection dropped unexpectedly. Trying to reconnect...") + + tickers := make([]string, 0, len(s.marketByTicker)) + for ticker := range s.marketByTicker { + tickers = append(tickers, ticker) + } + + var conn *websocket.Conn + conn, err = connectAndSubscribe(tickers) + if err != nil { + return err + } + s.conn = conn + + log.Debug("connection and subscriptions re-established. Restarting...") + mustReconnect, err = s.start() + } + + return err +} + +func (s *service) Stop() { + s.quitChan <- struct{}{} +} + +func (s *service) FeedChan() chan ports.PriceFeed { + return s.feedChan +} + +func (s *service) start() (mustReconnect bool, err error) { + defer func() { + if rec := recover(); rec != nil { + mustReconnect = true + } + }() + + go func() { + for range s.writeTicker.C { + s.writeToFeedChan() + } + }() + + for { + select { + case <-s.quitChan: + s.writeTicker.Stop() + s.closeChannels() + err = s.conn.Close() + return false, err + default: + // If for any reason, reading a message from the socket panics, we make + // sure to recover and flag that a reconnection is required. + msg := make(map[string]interface{}) + if err := s.conn.ReadJSON(&msg); err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + panic(err) + } + log.WithError(err).Warn("could not read message from socket") + continue + } + + priceFeed := s.parseFeed(msg) + if priceFeed == nil { + continue + } + + s.writePriceFeed(priceFeed.GetMarket().Ticker(), priceFeed) + } + } +} + +func (s *service) readPriceFeeds() []ports.PriceFeed { + s.lock.RLock() + defer s.lock.RUnlock() + + feeds := make([]ports.PriceFeed, 0, len(s.latestFeedsByTicker)) + for _, priceFeed := range s.latestFeedsByTicker { + feeds = append(feeds, priceFeed) + } + return feeds +} + +func (s *service) writePriceFeed(mktTicker string, priceFeed ports.PriceFeed) { + s.lock.Lock() + defer s.lock.Unlock() + s.latestFeedsByTicker[mktTicker] = priceFeed +} + +func (s *service) writeToFeedChan() { + s.chLock.Lock() + defer s.chLock.Unlock() + + priceFeeds := s.readPriceFeeds() + for _, priceFeed := range priceFeeds { + s.feedChan <- priceFeed + } +} + +func (s *service) closeChannels() { + s.chLock.Lock() + defer s.chLock.Unlock() + + close(s.feedChan) + close(s.quitChan) +} + +func (s *service) parseFeed(msg map[string]interface{}) ports.PriceFeed { + if _, ok := msg["type"]; !ok { + return nil + } + if e, ok := msg["type"].(string); !ok || e != "ticker" { + return nil + } + if _, ok := msg["product_id"]; !ok { + return nil + } + ticker, ok := msg["product_id"].(string) + if !ok { + return nil + } + if _, ok := msg["price"]; !ok { + return nil + } + priceStr, ok := msg["price"].(string) + if !ok { + return nil + } + + quotePrice, err := decimal.NewFromString(priceStr) + if err != nil { + return nil + } + basePrice := decimal.NewFromInt(1).Div(quotePrice) + mkt := s.marketByTicker[ticker] + + return &priceFeed{ + market: mkt, + price: &price{ + basePrice: basePrice.StringFixed(8), + quotePrice: quotePrice.String(), + }, + } +} + +func connectAndSubscribe(mktTickers []string) (*websocket.Conn, error) { + url := fmt.Sprintf("wss://%s", CoinbaseWebSocketURL) + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + msg := map[string]interface{}{ + "type": "subscribe", + "product_ids": mktTickers, + "channels": []string{ + "heartbeat", "ticker", + }, + } + + if err := conn.WriteJSON(msg); err != nil { + return nil, fmt.Errorf("cannot subscribe to given markets: %s", err) + } + + for { + msg := make(map[string]interface{}) + if err := conn.ReadJSON(&msg); err != nil { + return nil, fmt.Errorf( + "cannot read response of subscription to markets: %s", err, + ) + } + + msgType := msg["type"].(string) + if msgType == "error" { + return nil, fmt.Errorf(msg["reason"].(string)) + } + + if msgType == "subscriptions" { + break + } + } + + return conn, nil +} diff --git a/internal/core/infrastructure/feeder/coinbase/service_test.go b/internal/core/infrastructure/feeder/coinbase/service_test.go new file mode 100644 index 0000000..c11079f --- /dev/null +++ b/internal/core/infrastructure/feeder/coinbase/service_test.go @@ -0,0 +1,101 @@ +package coinbasefeeder_test + +import ( + "crypto/rand" + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/require" + coinbasefeeder "github.com/tdex-network/tdex-feeder/internal/core/infrastructure/feeder/coinbase" + "github.com/tdex-network/tdex-feeder/internal/core/ports" +) + +var ( + interval = 1000 // 1s interval + tickers = []string{"BTC-USD", "BTC-EUR"} +) + +func TestService(t *testing.T) { + feederSvc, err := newTestService() + require.NoError(t, err) + + go func() { + err := feederSvc.Start() + require.NoError(t, err) + }() + + go func() { + time.Sleep(5 * time.Second) + feederSvc.Stop() + }() + + count := 0 + for priceFeed := range feederSvc.FeedChan() { + count++ + require.NotNil(t, priceFeed.GetMarket()) + require.NotNil(t, priceFeed.GetPrice()) + require.NotEmpty(t, priceFeed.GetMarket().BaseAsset()) + require.NotEmpty(t, priceFeed.GetMarket().QuoteAsset()) + require.NotEmpty(t, priceFeed.GetMarket().Ticker()) + require.NotEmpty(t, priceFeed.GetPrice().BasePrice()) + require.NotEmpty(t, priceFeed.GetPrice().QuotePrice()) + } + require.Greater(t, count, 0) +} + +func newTestService() (ports.PriceFeeder, error) { + markets := mockedMarkets(tickers) + svc, err := coinbasefeeder.NewCoinbasePriceFeeder(interval) + if err != nil { + return nil, err + } + if err := svc.SubscribeMarkets(markets); err != nil { + return nil, err + } + return svc, nil +} + +func mockedMarkets(tickers []string) []ports.Market { + markets := make([]ports.Market, 0, len(tickers)) + for _, ticker := range tickers { + markets = append(markets, newMockedMarket(ticker)) + } + return markets +} + +type mockMarket struct { + baseAsset string + quoteAsset string + ticker string +} + +func newMockedMarket(ticker string) ports.Market { + return &mockMarket{ + baseAsset: randomHex(32), + quoteAsset: randomHex(32), + ticker: ticker, + } +} + +func (m *mockMarket) BaseAsset() string { + return m.baseAsset +} + +func (m *mockMarket) QuoteAsset() string { + return m.quoteAsset +} + +func (m *mockMarket) Ticker() string { + return m.ticker +} + +func randomHex(len int) string { + return hex.EncodeToString(randomBytes(len)) +} + +func randomBytes(len int) []byte { + b := make([]byte, len) + rand.Read(b) + return b +} diff --git a/internal/core/infrastructure/feeder/coinbase/types.go b/internal/core/infrastructure/feeder/coinbase/types.go new file mode 100644 index 0000000..e05a8af --- /dev/null +++ b/internal/core/infrastructure/feeder/coinbase/types.go @@ -0,0 +1,49 @@ +package coinbasefeeder + +import ( + "github.com/tdex-network/tdex-feeder/internal/core/ports" +) + +type price struct { + basePrice string + quotePrice string +} + +func (p *price) BasePrice() string { + return p.basePrice +} + +func (p *price) QuotePrice() string { + return p.quotePrice +} + +type priceFeed struct { + market ports.Market + price *price +} + +func (p *priceFeed) GetMarket() ports.Market { + return p.market +} + +func (p *priceFeed) GetPrice() ports.Price { + return p.price +} + +type market struct { + baseAsset string + quoteAsset string + ticker string +} + +func (m market) BaseAsset() string { + return m.baseAsset +} + +func (m market) QuoteAsset() string { + return m.quoteAsset +} + +func (m market) Ticker() string { + return m.ticker +}