diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..71f6082 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM golang:1.15.5-buster AS builder + +WORKDIR /tdex-feeder + +COPY go.mod . +COPY go.sum . +RUN go mod download + +COPY . . + +RUN go build -o feederd-linux cmd/feederd/main.go + +WORKDIR /build + +RUN cp /tdex-feeder/feederd-linux . + +FROM debian:buster + +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates + +COPY --from=builder /build/ / + +CMD ["/feederd-linux","-debug","-conf=./data/config.json"] \ No newline at end of file diff --git a/Makefile b/Makefile index b33da9d..2099678 100644 --- a/Makefile +++ b/Makefile @@ -41,11 +41,11 @@ help: ## run-linux: Run locally with default configuration run-linux: clean build-linux - ./build/tdexd-linux-amd64 + ./build/feederd-linux-amd64 ## run-mac: Run locally with default configuration run-mac: clean build-mac - ./build/tdexd-darwin-amd64 + ./build/feederd-darwin-amd64 ## vet: code analysis vet: diff --git a/README.md b/README.md index 6be80c4..26bec14 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,96 @@ # tdex-feeder + Feeder allows to connect an external price feed to the TDex Daemon to determine the current market price -## ⬇️ Install +## Overview + +tdex-feeder connects to exchanges and retrieves market prices in order to consume the gRPC +interface exposed from tdex-deamon `UpdateMarketPrice`. + +## ⬇️ Run Standalone + +### Install + +1. [Download the latest release for MacOS or Linux](https://github.com/tdex-network/tdex-feeder/releases) + +2. Move the feeder into a folder in your PATH (eg. `/usr/local/bin`) and rename the feeder as `feederd` -TBD +3. Give executable permissions. (eg. `chmod a+x /usr/local/bin/feederd`) -## 📄 Usage +4. Create [config.json](#config-file) file. -In-depth documentation for using the tdex-feeder is available at [docs.tdex.network](https://docs.tdex.network/tdex-feeder.html) +### Run +```sh +# Run with default config and default flags. +$ feederd +# Run with debug mode on. +$ feederd -debug +# Run with debug mode and different config path. +$ feederd -debug -conf=./config.json +``` ## 🖥 Local Development Below is a list of commands you will probably find useful. -TBD \ No newline at end of file +### Build and Run with docker + +Build and use `feederd` with docker. + +#### Build feederd docker image + +At the root of the repository +``` +docker build -t tdex-feederd . +``` + +#### Run the daemon + +Create a [config.json](#config-file) file +and run the following command in the same folder: +``` +docker run -it -d --net=host -v $PWD/config.json:/data/config.json tdex-feederd +``` +`--net=host` in case you're running tdex-deamon locally + +### Build it yourself + +Builds feeder as static binary and runs the project with default configuration. + +#### Linux + +`make build-linux` + +#### Mac + +`make build-mac` + +#### Run Linux + +`make run-linux` + +##### Flags + +``` +-conf: Configuration File Path. Default: "./config.json" +-debug: Log Debug Informations Default: false +``` + +##### Config file + +Rename the file `./config.example.json` into `./config.json` +and adapt if for your specific purpose. The default example +connects to kraken socket and to a local instance of tdex-deamon. + +``` +daemon_endpoint: String with the address and port of gRPC host. Required. +daemon_macaroon: String with the daemon_macaroon necessary for authentication. +kraken_ws_endpoint: String with the address and port of kraken socket. Required. +markets: Json List with necessary markets informations. Required. +base_asset: String of the Hash of the base asset for gRPC request. Required. +quote_asset: String of the Hash of the quote asset for gRPC request. Required. +kraken_ticker: String with the ticker we want kraken to provide informations on. Required. +interval: Int with the time in secods between gRPC requests. Required. +``` \ No newline at end of file diff --git a/cmd/feederd/main.go b/cmd/feederd/main.go index 7905807..f1f0789 100644 --- a/cmd/feederd/main.go +++ b/cmd/feederd/main.go @@ -1,5 +1,114 @@ +// Copyright (c) 2020 The VulpemVentures developers + +// Feeder allows to connect an external price feed to the TDex Daemon to determine the current market price. package main +import ( + "flag" + "os" + "os/signal" + "time" + + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + + "github.com/gorilla/websocket" + "github.com/tdex-network/tdex-feeder/config" + "github.com/tdex-network/tdex-feeder/pkg/conn" + "github.com/tdex-network/tdex-feeder/pkg/marketinfo" + + pboperator "github.com/tdex-network/tdex-protobuf/generated/go/operator" +) + +const ( + defaultConfigPath = "./config.json" +) + func main() { + interrupt, cSocket, marketsInfos, conngRPC := setup() + infiniteLoops(interrupt, cSocket, marketsInfos, conngRPC) +} + +func setup() (chan os.Signal, *websocket.Conn, []marketinfo.MarketInfo, *grpc.ClientConn) { + conf := checkFlags() + + // Interrupt Notification. + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + // Dials the connection the the Socket. + cSocket, err := conn.ConnectToSocket(conf.KrakenWsEndpoint) + if err != nil { + log.Fatal("Socket Connection Error: ", err) + } + marketsInfos := loadMarkets(conf, cSocket) + + // Set up the connection to the gRPC server. + conngRPC, err := conn.ConnectTogRPC(conf.DaemonEndpoint) + if err != nil { + log.Fatal("gRPC Connection Error: ", err) + } + return interrupt, cSocket, marketsInfos, conngRPC +} + +// Checks for command line flags for Config Path and Debug mode. +// Loads flags as required. +func checkFlags() config.Config { + confFlag := flag.String("conf", defaultConfigPath, "Configuration File Path") + debugFlag := flag.Bool("debug", false, "Log Debug Informations") + flag.Parse() + if *debugFlag == true { + log.SetLevel(log.DebugLevel) + } + // Loads Config File. + conf, err := config.LoadConfig(*confFlag) + if err != nil { + log.Fatal(err) + } + return conf +} + +// Loads Config Markets infos into Data Structure and Subscribes to +// Messages from this Markets. +func loadMarkets(conf config.Config, cSocket *websocket.Conn) []marketinfo.MarketInfo { + numberOfMarkets := len(conf.Markets) + marketsInfos := make([]marketinfo.MarketInfo, numberOfMarkets) + for i, marketConfig := range conf.Markets { + marketsInfos[i] = marketinfo.InitialMarketInfo(marketConfig) + m := conn.CreateSubscribeToMarketMessage(marketConfig.KrakenTicker) + err := conn.SendRequestMessage(cSocket, m) + if err != nil { + log.Fatal("Couldn't send request message: ", err) + } + } + return marketsInfos +} + +func infiniteLoops(interrupt chan os.Signal, cSocket *websocket.Conn, marketsInfos []marketinfo.MarketInfo, conngRPC *grpc.ClientConn) { + defer cSocket.Close() + defer conngRPC.Close() + clientgRPC := pboperator.NewOperatorClient(conngRPC) + done := make(chan string) + // Handles Messages from subscriptions. Will periodically call the + // gRPC UpdateMarketPrice with the price info from the messages. + go conn.HandleMessages(done, cSocket, marketsInfos, clientgRPC) + checkInterrupt(interrupt, cSocket, done) +} +// Loop to keep cycle alive. Waits Interrupt to close the connection. +func checkInterrupt(interrupt chan os.Signal, cSocket *websocket.Conn, done chan string) { + for { + for range interrupt { + log.Println("Shutting down Feeder") + err := cSocket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Fatal("write close:", err) + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } } diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..ddb7d33 --- /dev/null +++ b/config.example.json @@ -0,0 +1,19 @@ +{ + "daemon_endpoint": "localhost:9000", + "daemon_macaroon": "string", + "kraken_ws_endpoint": "ws.kraken.com", + "markets": [ + { + "base_asset": "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225", + "quote_asset":"d73f5cd0954c1bf325f85d7a7ff43a6eb3ea3b516fd57064b85306d43bc1c9ff", + "kraken_ticker": "XBT/USD", + "interval": 10 + }, + { + "base_asset": "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225", + "quote_asset":"d090c403610fe8a9e31967355929833bc8a8fe08429e630162d1ecbf29fdf28b", + "kraken_ticker": "XBT/EUR", + "interval": 15 + } + ] +} \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..01adcf7 --- /dev/null +++ b/config/config.go @@ -0,0 +1,106 @@ +package config + +import ( + "encoding/json" + "errors" + "io/ioutil" + "os" + "reflect" + "strings" + + log "github.com/sirupsen/logrus" +) + +const ( + defaultDaemonEndpoint = "localhost:9000" + defaultKrakenWsEndpoint = "ws.kraken.com" + defaultBaseAsset = "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225" + defaultQuoteAsset = "d73f5cd0954c1bf325f85d7a7ff43a6eb3ea3b516fd57064b85306d43bc1c9ff" + defaultKrakenTicker = "XBT/USD" + defaultInterval = 10 +) + +type Config struct { + DaemonEndpoint string `json:"daemon_endpoint,required"` + DaemonMacaroon string `json:"daemon_macaroon"` + KrakenWsEndpoint string `json:"kraken_ws_endpoint,required"` + Markets []Market `json:"markets,required"` +} + +// DefaultConfig returns the datastructure needed +// for a default connection. +func defaultConfig() Config { + return Config{ + DaemonEndpoint: defaultDaemonEndpoint, + KrakenWsEndpoint: defaultKrakenWsEndpoint, + Markets: []Market{ + { + BaseAsset: defaultBaseAsset, + QuoteAsset: defaultQuoteAsset, + KrakenTicker: defaultKrakenTicker, + Interval: defaultInterval, + }, + }, + } +} + +// LoadConfigFromFile reads a file with the intended running behaviour +// and returns a Config struct with the respective configurations. +func loadConfigFromFile(filePath string) (Config, error) { + jsonFile, err := os.Open(filePath) + if err != nil { + return Config{}, err + } + defer jsonFile.Close() + + var config Config + + byteValue, err := ioutil.ReadAll(jsonFile) + if err != nil { + return Config{}, err + } + err = json.Unmarshal(byteValue, &config) + if err != nil { + return Config{}, err + } + err = checkConfigParsing(config) + if err != nil { + return Config{}, err + } + + return config, nil +} + +// checkConfigParsing checks if all the required fields +// were correctly loaded into the Config struct. +func checkConfigParsing(config Config) error { + fields := reflect.ValueOf(config) + for i := 0; i < fields.NumField(); i++ { + tags := fields.Type().Field(i).Tag + if strings.Contains(string(tags), "required") && fields.Field(i).IsZero() { + return errors.New("Config required field is missing: " + string(tags)) + } + } + for _, market := range config.Markets { + fields := reflect.ValueOf(market) + for i := 0; i < fields.NumField(); i++ { + tags := fields.Type().Field(i).Tag + if strings.Contains(string(tags), "required") && fields.Field(i).IsZero() { + return errors.New("Config required field is missing: " + string(tags)) + } + } + } + return nil +} + +// LoadConfig handles the default behaviour for loading +// config.json files. In case the file is not found, +// it loads the default config. +func LoadConfig(filePath string) (Config, error) { + _, err := os.Stat(filePath) + if os.IsNotExist(err) { + log.Printf("File not found: %s. Loading default config.\n", filePath) + return defaultConfig(), nil + } + return loadConfigFromFile(filePath) +} diff --git a/config/types.go b/config/types.go new file mode 100644 index 0000000..246fee2 --- /dev/null +++ b/config/types.go @@ -0,0 +1,8 @@ +package config + +type Market struct { + BaseAsset string `json:"base_asset,required"` + QuoteAsset string `json:"quote_asset,required"` + KrakenTicker string `json:"kraken_ticker,required"` + Interval int `json:"interval,required"` +} diff --git a/go.mod b/go.mod index b49a282..0fcc774 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,11 @@ module github.com/tdex-network/tdex-feeder go 1.15 + +require ( + github.com/gorilla/websocket v1.4.2 + github.com/sirupsen/logrus v1.7.0 + github.com/stretchr/testify v1.2.2 + github.com/tdex-network/tdex-protobuf v0.0.0-20201029153650-f5164a4b6a77 + google.golang.org/grpc v1.33.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6009be7 --- /dev/null +++ b/go.sum @@ -0,0 +1,90 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/tdex-network/tdex-protobuf v0.0.0-20201029153650-f5164a4b6a77 h1:njOphJsTU5qCJROyzw4ReARDbucTCbfj5t7Mgnnt9u0= +github.com/tdex-network/tdex-protobuf v0.0.0-20201029153650-f5164a4b6a77/go.mod h1:tVWv01BSMH/neJOsixdDFU5T0ll0OZbPliLXBsYQjA8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2 h1:EQyQC3sa8M+p6Ulc8yy9SWSS2GVwyRc83gAbG8lrl4o= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/conn/grpc.go b/pkg/conn/grpc.go new file mode 100644 index 0000000..4e2e15f --- /dev/null +++ b/pkg/conn/grpc.go @@ -0,0 +1,46 @@ +package conn + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/tdex-network/tdex-feeder/pkg/marketinfo" + pboperator "github.com/tdex-network/tdex-protobuf/generated/go/operator" + pbtypes "github.com/tdex-network/tdex-protobuf/generated/go/types" + "google.golang.org/grpc" +) + +const ( + timeout = 3 +) + +// ConnectTogRPC dials and returns a new client connection to a remote host +func ConnectTogRPC(daemonEndpoint string) (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*timeout) + defer cancel() + conn, err := grpc.DialContext(ctx, daemonEndpoint, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return conn, err + } + log.Println("Connected to gRPC:", daemonEndpoint) + return conn, nil +} + +func UpdateMarketPricegRPC(marketInfo marketinfo.MarketInfo, clientgRPC pboperator.OperatorClient) { + if marketInfo.Price == 0.00 { + log.Println("Can't send gRPC request with no price") + return + } + log.Println("Sending gRPC request:", marketInfo.Config.KrakenTicker, marketInfo.Price) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := clientgRPC.UpdateMarketPrice(ctx, &pboperator.UpdateMarketPriceRequest{ + Market: &pbtypes.Market{BaseAsset: marketInfo.Config.BaseAsset, QuoteAsset: marketInfo.Config.QuoteAsset}, + Price: &pbtypes.Price{BasePrice: 1 / float32(marketInfo.Price), QuotePrice: float32(marketInfo.Price)}}) + if err != nil { + log.Println(err) + return + } +} diff --git a/pkg/conn/messages.go b/pkg/conn/messages.go new file mode 100644 index 0000000..308701b --- /dev/null +++ b/pkg/conn/messages.go @@ -0,0 +1,115 @@ +package conn + +import ( + "encoding/json" + "strconv" + "time" + + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" + + "github.com/tdex-network/tdex-feeder/pkg/marketinfo" + "github.com/tdex-network/tdex-protobuf/generated/go/operator" +) + +const ( + nanoToSeconds = 1000000000 +) + +// RequestMessage is the data structure used to create +// jsons in order subscribe to market updates on Kraken +type RequestMessage struct { + Event string `json:"event"` + Pair []string `json:"pair,omitempty"` + Subscription *Subscription `json:"subscription,omitempty"` + Reqid int `json:"reqid,omitempty"` +} + +type Subscription struct { + Name string `json:"name"` + Interval int `json:"interval,omitempty"` + Token string `json:"token,omitempty"` + Depth int `json:"depth,omitempty"` + Snapshop bool `json:"snapshot,omitempty"` +} + +// CreatePingMessage returns a RequestMessage struct +// with a ping Event. +func CreatePingMessage() RequestMessage { + return RequestMessage{Event: "ping"} +} + +// CreateSubscribeToMarketMessage gets a string with a market pair and returns +// a RequestMessage struct with instructions to subscrive to that market pair ticker. +func CreateSubscribeToMarketMessage(marketpair string) RequestMessage { + s := Subscription{Name: "ticker"} + return RequestMessage{"subscribe", []string{marketpair}, &s, 0} +} + +// SendRequestMessage gets a socket connection and a RequestMessage struct, +// marshalls the struct and sends the message using the socket. +func SendRequestMessage(c *websocket.Conn, m RequestMessage) error { + b, err := json.Marshal(m) + if err != nil { + return err + } + err = c.WriteMessage(websocket.TextMessage, []byte(b)) + if err != nil { + return err + } + return nil +} + +// HandleMessages is responsible for the perpetual loop of receiving messages +// from subscriptions, retrieving the price from them and send the gRPC request +// to update the market price in the predeterminated interval. +func HandleMessages(done chan string, cSocket *websocket.Conn, marketsInfos []marketinfo.MarketInfo, clientgRPC operator.OperatorClient) { + defer close(done) + for { + _, message, err := cSocket.ReadMessage() + if err != nil { + log.Debug("Message Error:", err) + return + } + log.Debug(string(message)) + marketsInfos = retrievePriceFromMessage(message, marketsInfos) + marketsInfos = checkInterval(marketsInfos, clientgRPC) + } +} + +// checkInterval handles the gRPC calls for UpdateMarketPrice +// at a predeterminated inteval for each market. +func checkInterval(marketsInfos []marketinfo.MarketInfo, clientgRPC operator.OperatorClient) []marketinfo.MarketInfo { + for i, marketInfo := range marketsInfos { + elapsedSeconds := time.Since(marketInfo.LastSent).Round(time.Second) + marketInterval := time.Duration(marketInfo.Config.Interval * int(nanoToSeconds)) + if elapsedSeconds == marketInterval { + UpdateMarketPricegRPC(marketInfo, clientgRPC) + marketInfo.LastSent = time.Now() + marketsInfos[i] = marketInfo + } + } + return marketsInfos +} + +// retrievePriceFromMessage gets a message from a subscription and retrieves the +// price information, updating the price of the specific market. +func retrievePriceFromMessage(message []byte, marketsInfos []marketinfo.MarketInfo) []marketinfo.MarketInfo { + var result []interface{} + err := json.Unmarshal([]byte(message), &result) + if err != nil { + return marketsInfos + } + if len(result) == 4 { + pricesJson := result[1].(map[string]interface{}) + priceAsk := pricesJson["c"].([]interface{}) + price, _ := strconv.ParseFloat(priceAsk[0].(string), 64) + for i, marketInfo := range marketsInfos { + if marketInfo.Config.KrakenTicker == result[3] { + marketInfo.Price = price + marketsInfos[i] = marketInfo + } + } + } + return marketsInfos +} diff --git a/pkg/conn/socket.go b/pkg/conn/socket.go new file mode 100644 index 0000000..54151f5 --- /dev/null +++ b/pkg/conn/socket.go @@ -0,0 +1,21 @@ +package conn + +import ( + "net/url" + + log "github.com/sirupsen/logrus" + + "github.com/gorilla/websocket" +) + +// ConnectToSocket dials and returns a new client connection to a remote host +func ConnectToSocket(address string) (*websocket.Conn, error) { + u := url.URL{Scheme: "wss", Host: address, Path: "/"} + + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + return c, err + } + log.Println("Connected to socket:", u.String()) + return c, nil +} diff --git a/pkg/conn/socket_test.go b/pkg/conn/socket_test.go new file mode 100644 index 0000000..b2f0710 --- /dev/null +++ b/pkg/conn/socket_test.go @@ -0,0 +1,16 @@ +package conn + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + kraken = "ws.kraken.com" +) + +func TestConnectToSocket(t *testing.T) { + _, err := ConnectToSocket(kraken) + assert.Nil(t, err) +} diff --git a/pkg/marketinfo/marketinfo.go b/pkg/marketinfo/marketinfo.go new file mode 100644 index 0000000..52086b4 --- /dev/null +++ b/pkg/marketinfo/marketinfo.go @@ -0,0 +1,25 @@ +package marketinfo + +import ( + "time" + + "github.com/tdex-network/tdex-feeder/config" +) + +// MarketInfo stores the informations necessary for +// handling different market pair prices in real-time. +type MarketInfo struct { + Config config.Market + LastSent time.Time + Price float64 +} + +// InitialMarketInfo returns a pointer to a MarketInfo struct +// with the default configurations. +func InitialMarketInfo(market config.Market) MarketInfo { + return MarketInfo{ + Config: market, + LastSent: time.Now(), + Price: 0, + } +} diff --git a/scripts/build b/scripts/build index c9636f8..f479f24 100755 --- a/scripts/build +++ b/scripts/build @@ -10,5 +10,4 @@ PARENT_PATH=$(dirname $( pushd $PARENT_PATH mkdir -p build GOOS=$1 GOARCH=$2 go build -ldflags="-s -w" -o build/feederd-$1-$2 cmd/feederd/main.go -GOOS=$1 GOARCH=$2 go build -ldflags="-s -w" -o build/feeder-$1-$2 cmd/feeder/main.go popd