Skip to content

Commit

Permalink
Export Feeder in pkg (#12)
Browse files Browse the repository at this point in the history
* move feeder in pkg

* comments and cleaning

* remove unused deps
  • Loading branch information
louisinger authored Feb 26, 2021
1 parent 488cd2b commit 114d921
Show file tree
Hide file tree
Showing 22 changed files with 97 additions and 72 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@
build/

cmd/feederd/tdexd
cmd/feederd/config.json
cmd/feederd/config*.json

config.json
config.json
11 changes: 1 addition & 10 deletions cmd/feederd/config.test.json
Original file line number Diff line number Diff line change
@@ -1,10 +1 @@
{
"daemon_endpoint":"127.0.0.1:9000",
"kraken_ws_endpoint":"ws.kraken.com",
"markets": [
{
"base_asset":"5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225","quote_asset":"bffce3908a595436b6ab08f916fea2c9fc6a702f46b268ca354205d127f60c48","kraken_ticker":"LTC/USDT",
"interval":500
}
]
}
{"daemon_endpoint":"127.0.0.1:9000","kraken_ws_endpoint":"ws.kraken.com","markets":[{"base_asset":"5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225","quote_asset":"91b988204bfb8568d81d7c2962c5f37d6e413e5efaa36c417d245ba2237e8320","kraken_ticker":"LTC/USDT","interval":500}]}
20 changes: 10 additions & 10 deletions cmd/feederd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
daemonEndpoint = "127.0.0.1:9000"
krakenWsEndpoint = "ws.kraken.com"
// nigiriUrl = "https://nigiri.network/liquid/api"
nigiriUrl = "http://localhost:3001"
nigiriURL = "http://localhost:3001"
password = "vulpemsecret"
)

Expand All @@ -41,11 +41,11 @@ func TestFeeder(t *testing.T) {
func runDaemonAndInitConfigFile(t *testing.T) {
usdt := runDaemonAndCreateMarket(t)

configJson := adapters.ConfigJson{
configJSON := adapters.ConfigJSON{
DaemonEndpoint: daemonEndpoint,
KrakenWsEndpoint: krakenWsEndpoint,
Markets: []adapters.MarketJson{
adapters.MarketJson{
Markets: []adapters.MarketJSON{
adapters.MarketJSON{
KrakenTicker: "LTC/USDT",
BaseAsset: "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225",
QuoteAsset: usdt,
Expand All @@ -54,7 +54,7 @@ func runDaemonAndInitConfigFile(t *testing.T) {
},
}

bytes, err := json.Marshal(configJson)
bytes, err := json.Marshal(configJSON)
if err != nil {
t.Error(err)
}
Expand All @@ -72,7 +72,7 @@ func runDaemonAndCreateMarket(t *testing.T) string {
"-d",
"-v", "tdexd:/.tdex-daemon",
"-e", "TDEX_NETWORK=regtest",
"-e", "TDEX_EXPLORER_ENDPOINT="+nigiriUrl,
"-e", "TDEX_EXPLORER_ENDPOINT="+nigiriURL,
"-e", "TDEX_FEE_ACCOUNT_BALANCE_TRESHOLD=1000",
"-e", "TDEX_BASE_ASSET=5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225",
"-e", "TDEX_LOG_LEVEL=5",
Expand Down Expand Up @@ -107,14 +107,14 @@ func runDaemonAndCreateMarket(t *testing.T) string {
t.Error(err)
}

depositMarketJson, err := runCLICommand("depositmarket", "--base_asset", "", "--quote_asset", "")
depositMarketJSON, err := runCLICommand("depositmarket", "--base_asset", "", "--quote_asset", "")
if err != nil {
t.Error(err)
}

var depositMarketResult map[string]interface{}

err = json.Unmarshal([]byte(depositMarketJson), &depositMarketResult)
err = json.Unmarshal([]byte(depositMarketJSON), &depositMarketResult)
if err != nil {
t.Error(t, err)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func fundMarketAddress(t *testing.T, address string) string {
}

func mint(address string, amount int) (string, string, error) {
url := fmt.Sprintf("%s/mint", nigiriUrl)
url := fmt.Sprintf("%s/mint", nigiriURL)
payload := map[string]interface{}{"address": address, "quantity": amount}
body, _ := json.Marshal(payload)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
Expand Down Expand Up @@ -183,7 +183,7 @@ func mint(address string, amount int) (string, string, error) {
}

func faucet(address string) (string, error) {
url := fmt.Sprintf("%s/faucet", nigiriUrl)
url := fmt.Sprintf("%s/faucet", nigiriURL)
payload := map[string]string{"address": address}
body, _ := json.Marshal(payload)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func init() {
vip.SetDefault(ConfigFilePathKey, "./config.json")
}

// GetConfigPath return the path of the config.json file
func GetConfigPath() string {
return vip.GetString(ConfigFilePathKey)
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.15

require (
github.com/aopoltorzhicky/go_kraken/websocket v0.0.10
github.com/prometheus/common v0.4.0
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
Expand Down
35 changes: 20 additions & 15 deletions internal/adapters/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,30 @@ import (
"github.com/tdex-network/tdex-feeder/internal/domain"
)

type MarketJson struct {
// MarketJSON is the struct describing the shape of market specs in config JSON file
type MarketJSON struct {
BaseAsset string `json:"base_asset"`
QuoteAsset string `json:"quote_asset"`
KrakenTicker string `json:"kraken_ticker"`
Interval int `json:"interval"`
}

type ConfigJson struct {
// ConfigJSON is the struct describing the shape of config JSON file
type ConfigJSON struct {
DaemonEndpoint string `json:"daemon_endpoint"`
KrakenWsEndpoint string `json:"kraken_ws_endpoint"`
Markets []MarketJson `json:"markets"`
Markets []MarketJSON `json:"markets"`
}

// Config is the config of the application retreived from config JSON file
type Config struct {
daemonEndpoint string
krakenWSaddress string
markets map[string]domain.Market
marketIntervals map[domain.Market]time.Duration
}

// ToFeederService transforms a Config into FeederService
func (config *Config) ToFeederService() application.FeederService {
feederSvc := application.NewFeederService(application.NewFeederServiceArgs{
KrakenWSaddress: config.krakenWSaddress,
Expand All @@ -40,8 +44,9 @@ func (config *Config) ToFeederService() application.FeederService {
return feederSvc
}

// UnmarshalJSON ...
func (config *Config) UnmarshalJSON(data []byte) error {
jsonConfig := &ConfigJson{}
jsonConfig := &ConfigJSON{}
err := json.Unmarshal(data, jsonConfig)
if err != nil {
return err
Expand All @@ -58,14 +63,14 @@ func (config *Config) UnmarshalJSON(data []byte) error {
configTickerToMarketMap := make(map[string]domain.Market)
marketIntervalsMap := make(map[domain.Market]time.Duration)

for _, marketJson := range jsonConfig.Markets {
for _, marketJSON := range jsonConfig.Markets {
market := domain.Market{
BaseAsset: marketJson.BaseAsset,
QuoteAsset: marketJson.QuoteAsset,
BaseAsset: marketJSON.BaseAsset,
QuoteAsset: marketJSON.QuoteAsset,
}

configTickerToMarketMap[marketJson.KrakenTicker] = market
marketIntervalsMap[market] = time.Duration(marketJson.Interval) * time.Millisecond
configTickerToMarketMap[marketJSON.KrakenTicker] = market
marketIntervalsMap[market] = time.Duration(marketJSON.Interval) * time.Millisecond
}

config.markets = configTickerToMarketMap
Expand All @@ -74,7 +79,7 @@ func (config *Config) UnmarshalJSON(data []byte) error {
return nil
}

func (configJson ConfigJson) validate() error {
func (configJson ConfigJSON) validate() error {
if configJson.DaemonEndpoint == "" {
return ErrDaemonEndpointIsEmpty
}
Expand All @@ -87,22 +92,22 @@ func (configJson ConfigJson) validate() error {
return ErrNeedAtLeastOneMarketToFeed
}

for _, marketJson := range configJson.Markets {
if marketJson.KrakenTicker == "" {
for _, marketJSON := range configJson.Markets {
if marketJSON.KrakenTicker == "" {
return ErrKrakenTickerIsEmpty
}

err := validateAssetString(marketJson.BaseAsset)
err := validateAssetString(marketJSON.BaseAsset)
if err != nil {
return err
}

err = validateAssetString(marketJson.QuoteAsset)
err = validateAssetString(marketJSON.QuoteAsset)
if err != nil {
return err
}

if marketJson.Interval < 0 {
if marketJSON.Interval < 0 {
return ErrIntervalIsNotPositiveNumber
}
}
Expand Down
16 changes: 12 additions & 4 deletions internal/adapters/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,25 @@ package adapters
import "errors"

var (
ErrDaemonEndpointIsEmpty = errors.New("daemon endpoint is empty")
ErrKrakenEndpointIsEmpty = errors.New("kraken websocket endpoint is empty")
ErrNeedAtLeastOneMarketToFeed = errors.New("need at least 1 market to feed")
ErrKrakenTickerIsEmpty = errors.New("krakenTicker should not be an empty string")
// ErrDaemonEndpointIsEmpty is returned if the config contains an empty tdex-daemon endpoint
ErrDaemonEndpointIsEmpty = errors.New("daemon endpoint is empty")
// ErrKrakenEndpointIsEmpty is returned if the config contains an empty kraken WS endpoint
ErrKrakenEndpointIsEmpty = errors.New("kraken websocket endpoint is empty")
// ErrNeedAtLeastOneMarketToFeed is returned if the config does not contain market to feed
ErrNeedAtLeastOneMarketToFeed = errors.New("need at least 1 market to feed")
// ErrKrakenTickerIsEmpty is returned if the ticker specified in config is an empty string
ErrKrakenTickerIsEmpty = errors.New("krakenTicker should not be an empty string")
// ErrIntervalIsNotPositiveNumber is returned if the interval is < 0
ErrIntervalIsNotPositiveNumber = errors.New("interval must be greater (or equal) than 0")
)

// ErrInvalidAssetHash is returned if the given string `asset` is not a valid asset hash string
type ErrInvalidAssetHash struct {
asset string
}

var _ error = &ErrInvalidAssetHash{}

func (e ErrInvalidAssetHash) Error() string {
return "the string '" + e.asset + "' is an invalid asset string."
}
2 changes: 2 additions & 0 deletions internal/application/feed_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/tdex-network/tdex-feeder/internal/ports"
)

// FeedService is the interface wrapping krakenWS and transform it into a domain.Feed
type FeedService interface {
Start()
Stop()
Expand All @@ -20,6 +21,7 @@ type krakenFeedService struct {
tickersToMarketMap map[string]domain.Market
}

// NewKrakenFeedService is the factory function for FeedService
func NewKrakenFeedService(
address string,
tickersToMarketMap map[string]domain.Market,
Expand Down
8 changes: 5 additions & 3 deletions internal/application/feed_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/tdex-network/tdex-feeder/internal/domain"
"github.com/tdex-network/tdex-feeder/pkg/feeder"
"github.com/tdex-network/tdex-feeder/pkg/testutils"
)

const (
Expand All @@ -30,8 +32,8 @@ func TestKrakenFeedService(t *testing.T) {
defer svc.Stop()

feed := svc.GetFeed()
target := &mockTarget{marketPrices: []domain.MarketPrice{}}
feeder := NewTdexFeeder([]domain.Feed{feed}, []domain.Target{target})
target := &testutils.MockTarget{MarketPrices: []domain.MarketPrice{}}
feeder := feeder.NewFeeder([]domain.Feed{feed}, []domain.Target{target})
go func() {
err := feeder.Start()
if err != nil {
Expand All @@ -42,5 +44,5 @@ func TestKrakenFeedService(t *testing.T) {
time.Sleep(10 * time.Second)
feeder.Stop()

assert.Equal(t, true, len(target.marketPrices) > 0)
assert.Equal(t, true, len(target.MarketPrices) > 0)
}
8 changes: 6 additions & 2 deletions internal/application/feeder_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,30 @@ import (

log "github.com/sirupsen/logrus"
"github.com/tdex-network/tdex-feeder/internal/domain"
"github.com/tdex-network/tdex-feeder/pkg/feeder"
)

// FeederService is a tdex-configured feeder. It takes data from KrakenWS endpoint and update a tdex daemon
type FeederService interface {
Start() error
Stop() error
}

type feederService struct {
tdexFeeder TdexFeeder
tdexFeeder feeder.Service
krakenService FeedService
target *TdexDaemonTarget
}

// NewFeederServiceArgs is a wrapper for NewFeederService arguments
type NewFeederServiceArgs struct {
OperatorEndpoint string
MarketToInterval map[domain.Market]time.Duration
KrakenWSaddress string
TickerToMarket map[string]domain.Market
}

// NewFeederService is the factory function for the FeederService
func NewFeederService(args NewFeederServiceArgs) FeederService {
target := NewTdexDaemonTarget(args.OperatorEndpoint, args.MarketToInterval)

Expand All @@ -33,7 +37,7 @@ func NewFeederService(args NewFeederServiceArgs) FeederService {
log.Fatal(err)
}

feeder := NewTdexFeeder(
feeder := feeder.NewFeeder(
[]domain.Feed{krakenFeedService.GetFeed()},
[]domain.Target{target},
)
Expand Down
11 changes: 0 additions & 11 deletions internal/application/test_utils.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/application/updater_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/tdex-network/tdex-feeder/internal/ports"
)

// Implements the domain.Target interface and manage interval for each market
// TdexDaemonTarget implements the domain.Target interface and manage interval for each market
type TdexDaemonTarget struct {
Endpoint string
priceUpdater ports.TdexDaemonPriceUpdater
Expand Down
1 change: 1 addition & 0 deletions internal/domain/feed.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package domain

// MarketPrice represents a new price associated with a given market
type MarketPrice struct {
Market Market
Price Price
Expand Down
1 change: 1 addition & 0 deletions internal/domain/market.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package domain

// Market is represented by his base asset & his quote asset
type Market struct {
BaseAsset string
QuoteAsset string
Expand Down
1 change: 1 addition & 0 deletions internal/domain/price.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package domain

// Price represents a price for a market
type Price struct {
BasePrice float32
QuotePrice float32
Expand Down
1 change: 1 addition & 0 deletions internal/domain/target.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package domain

// Target interface is used to push new prices fetched from feeds
type Target interface {
Push(marketPrice MarketPrice)
}
2 changes: 2 additions & 0 deletions internal/ports/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
)

// KrakenWebSocket is the interface to manage kraken web socket streams
type KrakenWebSocket interface {
Connect(address string, tickersToSubscribe []string) error
StartListen() (chan TickerWithPrice, error)
Expand All @@ -18,6 +19,7 @@ type krakenWebSocket struct {
tickerWithPriceChan chan TickerWithPrice
}

// NewKrakenWebSocket is a factory function for KrakenWebSocket interface
func NewKrakenWebSocket() KrakenWebSocket {
return &krakenWebSocket{
krakenWS: ws.New(),
Expand Down
Loading

0 comments on commit 114d921

Please sign in to comment.