Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor #28

Merged
merged 9 commits into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ NOTE: All entries of the JSON configuration file are mandatory if not otherwise

```
price_feeder: service where to source prices (only "kraken" available for now).
interval: the period in millisecond with which the feeder updates its target(s).
markets: list with necessary markets info.
base_asset: hex string of the hash of the market base asset.
quote_asset: hex string of the hash of the market quote asset.
Expand Down
2 changes: 1 addition & 1 deletion cmd/feederd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
)
}

priceFeeder, err := priceFeederFactory(cfg.PortableMarkets())
priceFeeder, err := priceFeederFactory(cfg.Interval, cfg.PortableMarkets())
if err != nil {
log.WithError(err).Fatal("error while initializing price feeder")
}
Expand Down
1 change: 1 addition & 0 deletions config.example.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"price_feeder": "kraken",
"interval": 1000,
"markets": [
{
"base_asset": "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225",
Expand Down
4 changes: 4 additions & 0 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ func (m Market) validate() error {

type Config struct {
PriceFeeder string `mapstructure:"price_feeder"`
Interval int `mapstructure:"interval"`
Markets []Market `mapstructure:"markets"`
}

func (c Config) Validate() error {
if c.PriceFeeder == "" {
return fmt.Errorf("price_feeder must not be nil")
}
if c.Interval <= 0 {
return fmt.Errorf("interval must be a positive value")
}
if len(c.Markets) <= 0 {
return fmt.Errorf("markets must not be empty")
}
Expand Down
3 changes: 2 additions & 1 deletion internal/core/application/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ func TestService(t *testing.T) {
}

func newTestService() (application.Service, error) {
interval := 1000 // 1s interval
tickers := []string{"XBT/USDT", "XBT/EUR"}
markets := mockedMarkets(tickers)

priceFeeder, err := krakenfeeder.NewKrakenPriceFeeder(markets)
priceFeeder, err := krakenfeeder.NewKrakenPriceFeeder(interval, markets)
if err != nil {
return nil, err
}
Expand Down
104 changes: 76 additions & 28 deletions internal/core/infrastructure/feeder/kraken/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package krakenfeeder
import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/shopspring/decimal"
Expand All @@ -17,23 +19,33 @@ const (
)

type service struct {
conn *websocket.Conn
conn *websocket.Conn
writeTicker *time.Ticker
lock *sync.RWMutex
chLock *sync.Mutex

marketByTicker map[string]ports.Market
lastPriceFeed ports.PriceFeed
feedChan chan ports.PriceFeed
quitChan chan struct{}
}

func NewKrakenPriceFeeder(args ...interface{}) (ports.PriceFeeder, error) {
if len(args) != 1 {
if len(args) != 2 {
return nil, fmt.Errorf("invalid number of args")
}

markets, ok := args[0].([]ports.Market)
interval, ok := args[0].(int)
if !ok {
return nil, fmt.Errorf("unknown args type")
return nil, fmt.Errorf("unknown interval arg type")
}

markets, ok := args[1].([]ports.Market)
if !ok {
return nil, fmt.Errorf("unknown marktes arg type")
}

writeTicker := time.NewTicker(time.Duration(interval) * time.Millisecond)
mktTickers := make([]string, 0, len(markets))
mktByTicker := make(map[string]ports.Market)
for _, mkt := range markets {
Expand All @@ -49,24 +61,22 @@ func NewKrakenPriceFeeder(args ...interface{}) (ports.PriceFeeder, error) {

return &service{
conn: conn,
writeTicker: writeTicker,
lock: &sync.RWMutex{},
chLock: &sync.Mutex{},
marketByTicker: mktByTicker,
feedChan: make(chan ports.PriceFeed),
quitChan: make(chan struct{}, 1),
}, nil
}

func (k *service) Start() error {
defer func() {
close(k.feedChan)
close(k.quitChan)
}()

mustReconnect, err := k.start()
func (s *service) Start() error {
mustReconnect, err := s.start()
for mustReconnect {
log.WithError(err).Warn("connection dropped unexpectedly. Trying to reconnect...")

tickers := make([]string, 0, len(k.marketByTicker))
for ticker := range k.marketByTicker {
tickers := make([]string, 0, len(s.marketByTicker))
for ticker := range s.marketByTicker {
tickers = append(tickers, ticker)
}

Expand All @@ -75,54 +85,92 @@ func (k *service) Start() error {
if err != nil {
return err
}
k.conn = conn
s.conn = conn

log.Debug("connection and subscriptions re-established. Restarting...")
mustReconnect, err = k.start()
mustReconnect, err = s.start()
}

return err
}

func (k *service) Stop() {
k.quitChan <- struct{}{}
func (s *service) Stop() {
s.quitChan <- struct{}{}
}

func (k *service) FeedChan() chan ports.PriceFeed {
return k.feedChan
func (s *service) FeedChan() chan ports.PriceFeed {
return s.feedChan
}

func (k *service) start() (mustReconnect bool, err error) {
func (s *service) start() (mustReconnect bool, err error) {
defer func() {
if rec := recover(); rec != nil {
mustReconnect = true
}
}()

go func() {
for range s.writeTicker.C {
s.writeToFeedChan()
}
}()

for {
select {
case <-k.quitChan:
err = k.conn.Close()
case <-s.quitChan:
s.writeTicker.Stop()
s.closeChannels()
err = s.conn.Close()
return false, err
default:
_, message, err := k.conn.ReadMessage()
_, message, err := s.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
panic(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not great to panic here deep in code, better to lift up eventually and handle in cmd maybe?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the fix for #24 ? shoudnt we connect by ourself again via code?

Copy link
Contributor Author

@altafan altafan Sep 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is specifically intended. After debugging and testing how this issue affects our service, basically what happens when Cloudflare closes the connection is that k.conn.ReadMessage() panics instead of returning the IsUnexpectedCloseError.
For this reason, we MUST recover the paniced code from the websocket pkg and signal that a reconnection attempt must be performed. In case the error is detected by this line (but I never experienced this so far) , we panic like websocket would be to preserve the behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to add some commented line to explain why things are done this way, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes then comment

}
}

priceFeed := k.parseFeed(message)
priceFeed := s.parseFeed(message)
if priceFeed == nil {
continue
}

k.feedChan <- priceFeed
s.writePriceFeed(priceFeed)
}
}
}

func (k *service) parseFeed(msg []byte) ports.PriceFeed {
func (s *service) readPriceFeed() ports.PriceFeed {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastPriceFeed
}

func (s *service) writePriceFeed(priceFeed ports.PriceFeed) {
s.lock.Lock()
defer s.lock.Unlock()
s.lastPriceFeed = priceFeed
}

func (s *service) writeToFeedChan() {
s.chLock.Lock()
defer s.chLock.Unlock()

priceFeed := s.readPriceFeed()
if priceFeed != nil {
s.feedChan <- priceFeed
}
}

func (s *service) closeChannels() {
s.chLock.Lock()
defer s.chLock.Unlock()

close(s.feedChan)
close(s.quitChan)
}

func (s *service) parseFeed(msg []byte) ports.PriceFeed {
var i []interface{}
if err := json.Unmarshal(msg, &i); err != nil {
return nil
Expand All @@ -136,7 +184,7 @@ func (k *service) parseFeed(msg []byte) ports.PriceFeed {
return nil
}

mkt, ok := k.marketByTicker[ticker]
mkt, ok := s.marketByTicker[ticker]
if !ok {
return nil
}
Expand Down Expand Up @@ -168,7 +216,7 @@ func (k *service) parseFeed(msg []byte) ports.PriceFeed {
return &priceFeed{
market: mkt,
price: &price{
basePrice: basePrice.String(),
basePrice: basePrice.StringFixed(8),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to have a float with 0.xxx here? there fore the stirng fixed shoudl be 10?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only 3 decimals seems not enough to me. For BTC/USD ,for example, the base price (hence 1/(BTCUSD price) expresses a BTC unit, therefore 8 decimal precision.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 decimals plus the dot and the base unit, therefore StringFixed(10) ?

quotePrice: quotePrice.String(),
},
}
Expand Down
13 changes: 8 additions & 5 deletions internal/core/infrastructure/feeder/kraken/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import (
"github.com/tdex-network/tdex-feeder/internal/core/ports"
)

func TestService(t *testing.T) {
tickers := []string{"XBT/USDT", "XBT/EUR"}
var (
interval = 1000 // 1s interval
tickers = []string{"XBT/USDT", "XBT/EUR"}
)

feederSvc, err := newTestService(tickers)
func TestService(t *testing.T) {
feederSvc, err := newTestService()
require.NoError(t, err)

go func() {
Expand All @@ -41,9 +44,9 @@ func TestService(t *testing.T) {
require.Greater(t, count, 0)
}

func newTestService(tickers []string) (ports.PriceFeeder, error) {
func newTestService() (ports.PriceFeeder, error) {
markets := mockedMarkets(tickers)
return krakenfeeder.NewKrakenPriceFeeder(markets)
return krakenfeeder.NewKrakenPriceFeeder(interval, markets)
}

func mockedMarkets(tickers []string) []ports.Market {
Expand Down