Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
write trades into a sql database, closes #194 (#319)
Browse files Browse the repository at this point in the history
* 1 - write trades to a sqlite database

* 2 - switch over to postgres db and use lib/pq library

* 3 - log failed queries when interacting with the db

* 4 - dbExec directly instead of using a prepared statement

* 5 - save date_utc as a date type in db as UTC (no time zone so db doesn't convert automatically)

* 6 - Kraken timestamp for trades is in seconds, convert to millis

* 8 - Kraken should request trade cursor in milliseconds, not seconds

* 9 - log when lastCursor is updated

* 10 - Kraken use transaction IDs for updates to cursor

* 7 - FillDBWriter should ignore duplicate insert values

* 11 - fillDbWriter should register full asset string in db table

* 12 - threading to move markets data to it's own table

* 13 - write markets data to it's on table and fix schemas across db

* 14 - set precision for floats to 15 decimals (max allowed by db for double precision types)

* 15 - composite primary key for trades table

* 16 - graceful failure when postgres database not running, remove pointer indirection to port in postgres config

* 17 - db_version table and refactoring of db helper functions

* 18 - update README to add postgres as a dependency

* 19 - use better method to calculate elapsed time in millis
  • Loading branch information
nikhilsaraf authored Dec 16, 2019
1 parent 335d191 commit 493b4b0
Show file tree
Hide file tree
Showing 16 changed files with 629 additions and 40 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ To compile Kelp from source:
8. Set up CCXT to use an expanded set of priceFeeds and orderbooks (see the [Using CCXT](#using-ccxt) section for details)
* `sudo docker run -p 3000:3000 -d franzsee/ccxt-rest:v0.0.4`

Optional dependencies based on features:

1. [Postgres][postgres] must be installed for Kelp to automatically write trades to a sql database along with updating the trader config file.

## Running Kelp

Kelp places orders on the [Stellar marketplace][stellarx] based on the selected strategy. Configuration files specify the Stellar account and strategy details.
Expand Down Expand Up @@ -314,6 +318,7 @@ See the [Code of Conduct](CODE_OF_CONDUCT.md).
[yarn-install]: https://yarnpkg.com/lang/en/docs/install/
[nodejs-install]: https://nodejs.org/en/download/
[astilectron-bundler]: https://github.com/asticode/go-astilectron-bundler
[postgres]: https://www.postgresql.org/
[spread]: https://en.wikipedia.org/wiki/Bid%E2%80%93ask_spread
[hedge]: https://en.wikipedia.org/wiki/Hedge_(finance)
[cmc]: https://coinmarketcap.com/
Expand Down
29 changes: 25 additions & 4 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/support/config"
"github.com/stellar/kelp/api"
"github.com/stellar/kelp/database"
"github.com/stellar/kelp/model"
"github.com/stellar/kelp/plugins"
"github.com/stellar/kelp/query"
Expand Down Expand Up @@ -203,6 +204,7 @@ func makeExchangeShimSdex(
network string,
threadTracker *multithreading.ThreadTracker,
tradingPair *model.TradingPair,
sdexAssetMap map[model.Asset]hProtocol.Asset,
) (api.ExchangeShim, *plugins.SDEX) {
var e error
var exchangeShim api.ExchangeShim
Expand Down Expand Up @@ -261,10 +263,6 @@ func makeExchangeShimSdex(
}
}

sdexAssetMap := map[model.Asset]hProtocol.Asset{
tradingPair.Base: botConfig.AssetBase(),
tradingPair.Quote: botConfig.AssetQuote(),
}
feeFn := makeFeeFn(l, botConfig, client)
sdex := plugins.MakeSDEX(
client,
Expand Down Expand Up @@ -432,6 +430,10 @@ func runTradeCmd(options inputs) {

ieif := plugins.MakeIEIF(botConfig.IsTradingSdex())
network := utils.ParseNetwork(botConfig.HorizonURL)
sdexAssetMap := map[model.Asset]hProtocol.Asset{
tradingPair.Base: botConfig.AssetBase(),
tradingPair.Quote: botConfig.AssetQuote(),
}
exchangeShim, sdex := makeExchangeShimSdex(
l,
botConfig,
Expand All @@ -441,6 +443,7 @@ func runTradeCmd(options inputs) {
network,
threadTracker,
tradingPair,
sdexAssetMap,
)
strategy := makeStrategy(
l,
Expand Down Expand Up @@ -493,6 +496,7 @@ func runTradeCmd(options inputs) {
sdex,
exchangeShim,
tradingPair,
sdexAssetMap,
threadTracker,
)
startQueryServer(
Expand Down Expand Up @@ -561,6 +565,7 @@ func startFillTracking(
sdex *plugins.SDEX,
exchangeShim api.ExchangeShim,
tradingPair *model.TradingPair,
sdexAssetMap map[model.Asset]hProtocol.Asset,
threadTracker *multithreading.ThreadTracker,
) {
strategyFillHandlers, e := strategy.GetFillHandlers()
Expand All @@ -575,6 +580,22 @@ func startFillTracking(
fillTracker := plugins.MakeFillTracker(tradingPair, threadTracker, exchangeShim, botConfig.FillTrackerSleepMillis, botConfig.FillTrackerDeleteCyclesThreshold)
fillLogger := plugins.MakeFillLogger()
fillTracker.RegisterHandler(fillLogger)
if botConfig.PostgresDbConfig != nil {
db, e := database.ConnectInitializedDatabase(botConfig.PostgresDbConfig)
if e != nil {
l.Info("")
l.Errorf("problem encountered while initializing the db: %s", e)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
log.Printf("made db instance with config: %s\n", botConfig.PostgresDbConfig.MakeConnectString())

assetDisplayFn := model.MakePassthroughAssetDisplayFn()
if botConfig.IsTradingSdex() {
assetDisplayFn = model.MakeSdexMappedAssetDisplayFn(sdexAssetMap)
}
fillDBWriter := plugins.MakeFillDBWriter(db, assetDisplayFn, botConfig.TradingExchangeName())
fillTracker.RegisterHandler(fillDBWriter)
}
if strategyFillHandlers != nil {
for _, h := range strategyFillHandlers {
fillTracker.RegisterHandler(h)
Expand Down
65 changes: 65 additions & 0 deletions database/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package database

import (
"database/sql"
"fmt"
"log"
)

/*
tables
*/
const sqlDbVersionTableCreate = "CREATE TABLE IF NOT EXISTS db_version (version INTEGER NOT NULL, date_completed_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, num_scripts INTEGER NOT NULL, time_elapsed_millis BIGINT NOT NULL, PRIMARY KEY (version))"
const sqlMarketsTableCreate = "CREATE TABLE IF NOT EXISTS markets (market_id TEXT PRIMARY KEY, exchange_name TEXT NOT NULL, base TEXT NOT NULL, quote TEXT NOT NULL)"
const sqlTradesTableCreate = "CREATE TABLE IF NOT EXISTS trades (market_id TEXT NOT NULL, txid TEXT NOT NULL, date_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, action TEXT NOT NULL, type TEXT NOT NULL, counter_price DOUBLE PRECISION NOT NULL, base_volume DOUBLE PRECISION NOT NULL, counter_cost DOUBLE PRECISION NOT NULL, fee DOUBLE PRECISION NOT NULL, PRIMARY KEY (market_id, txid))"

/*
indexes
*/
const sqlTradesIndexCreate = "CREATE INDEX IF NOT EXISTS date ON trades (market_id, date_utc)"

/*
insert statements
*/
// sqlDbVersionTableInsertTemplate inserts into the db_version table
const sqlDbVersionTableInsertTemplate = "INSERT INTO db_version (version, date_completed_utc, num_scripts, time_elapsed_millis) VALUES (%d, '%s', %d, %d)"

// SqlMarketsInsertTemplate inserts into the markets table
const SqlMarketsInsertTemplate = "INSERT INTO markets (market_id, exchange_name, base, quote) VALUES ('%s', '%s', '%s', '%s')"

// SqlTradesInsertTemplate inserts into the trades table
const SqlTradesInsertTemplate = "INSERT INTO trades (market_id, txid, date_utc, action, type, counter_price, base_volume, counter_cost, fee) VALUES ('%s', '%s', '%s', '%s', '%s', %.15f, %.15f, %.15f, %.15f)"

/*
queries
*/
// SqlQueryMarketsById queries the markets table
const SqlQueryMarketsById = "SELECT market_id, exchange_name, base, quote FROM markets WHERE market_id = $1 LIMIT 1"

// sqlQueryDbVersion queries the db_version table
const sqlQueryDbVersion = "SELECT version FROM db_version ORDER BY version desc LIMIT 1"

/*
query helper functions
*/
// QueryDbVersion queries for the version of the database
func QueryDbVersion(db *sql.DB) (uint32, error) {
rows, e := db.Query(sqlQueryDbVersion)
if e != nil {
return 0, fmt.Errorf("could not execute sql select query (%s): %s", sqlQueryDbVersion, e)
}
defer rows.Close()

for rows.Next() {
var dbVersion uint32
e = rows.Scan(&dbVersion)
if e != nil {
return 0, fmt.Errorf("could not scan row to get the db version: %s", e)
}

log.Printf("fetched dbVersion from db: %d", dbVersion)
return dbVersion, nil
}

return 0, nil
}
125 changes: 125 additions & 0 deletions database/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package database

import (
"database/sql"
"fmt"
"log"
"strings"
"time"

"github.com/stellar/kelp/support/postgresdb"
"github.com/stellar/kelp/support/utils"
)

var upgradeScripts = []*UpgradeScript{
makeUpgradeScript(1, sqlDbVersionTableCreate),
makeUpgradeScript(2,
sqlMarketsTableCreate,
sqlTradesTableCreate,
sqlTradesIndexCreate,
),
}

// UpgradeScript encapsulates a script to be run to upgrade the database from one version to the next
type UpgradeScript struct {
version uint32
commands []string
}

// makeUpgradeScript encapsulates a script to be run to upgrade the database from one version to the next
func makeUpgradeScript(version uint32, command string, moreCommands ...string) *UpgradeScript {
allCommands := []string{command}
allCommands = append(allCommands, moreCommands...)

return &UpgradeScript{
version: version,
commands: allCommands,
}
}

// ConnectInitializedDatabase creates a database with the required metadata tables
func ConnectInitializedDatabase(postgresDbConfig *postgresdb.Config) (*sql.DB, error) {
dbCreated, e := postgresdb.CreateDatabaseIfNotExists(postgresDbConfig)
if e != nil {
if strings.Contains(e.Error(), "connect: connection refused") {
utils.PrintErrorHintf("ensure your postgres database is available on %s:%d, or remove the 'POSTGRES_DB' config from your trader config file\n", postgresDbConfig.GetHost(), postgresDbConfig.GetPort())
}
return nil, fmt.Errorf("error when creating database from config (%+v), created=%v: %s", *postgresDbConfig, dbCreated, e)
}
if dbCreated {
log.Printf("created database '%s'", postgresDbConfig.GetDbName())
} else {
log.Printf("did not create db '%s' because it already exists", postgresDbConfig.GetDbName())
}

db, e := sql.Open("postgres", postgresDbConfig.MakeConnectString())
if e != nil {
return nil, fmt.Errorf("could not open database: %s", e)
}
// don't defer db.Close() here becuase we want it open for the life of the application for now

log.Printf("creating db schema and running upgrade scripts ...\n")
e = runUpgradeScripts(db, upgradeScripts)
if e != nil {
return nil, fmt.Errorf("could not run upgrade scripts: %s", e)
}
log.Printf("... finished creating db schema and running upgrade scripts\n")

return db, nil
}

func runUpgradeScripts(db *sql.DB, scripts []*UpgradeScript) error {
currentDbVersion, e := QueryDbVersion(db)
if e != nil {
if !strings.Contains(e.Error(), "relation \"db_version\" does not exist") {
return fmt.Errorf("could not fetch current db version: %s", e)
}
currentDbVersion = 0
}

for _, script := range scripts {
if script.version <= currentDbVersion {
log.Printf(" skipping upgrade script for version %d because current db version (%d) is equal or ahead\n", script.version, currentDbVersion)
continue
}

// start transaction
e = postgresdb.ExecuteStatement(db, "BEGIN")
if e != nil {
return fmt.Errorf("could not start transaction before upgrading db to version %d: %s", script.version, e)
}

startTime := time.Now()
startTimeMillis := startTime.UnixNano() / int64(time.Millisecond)
for ci, command := range script.commands {
e = postgresdb.ExecuteStatement(db, command)
if e != nil {
return fmt.Errorf("could not execute sql statement at index %d for db version %d (%s): %s", ci, script.version, command, e)
}
log.Printf(" executed sql statement at index %d for db version %d", ci, script.version)
}
endTimeMillis := time.Now().UnixNano() / int64(time.Millisecond)
elapsedMillis := endTimeMillis - startTimeMillis

// add entry to db_version table
sqlInsertDbVersion := fmt.Sprintf(sqlDbVersionTableInsertTemplate,
script.version,
startTime.Format(postgresdb.DateFormatString),
len(script.commands),
elapsedMillis,
)
_, e = db.Exec(sqlInsertDbVersion)
if e != nil {
// duplicate insert should return an error
return fmt.Errorf("could not execute sql insert values statement in db_version table for db version %d (%s): %s", script.version, sqlInsertDbVersion, e)
}

// commit transaction
e = postgresdb.ExecuteStatement(db, "COMMIT")
if e != nil {
return fmt.Errorf("could not commit transaction before upgrading db to version %d: %s", script.version, e)
}
log.Printf(" successfully ran %d upgrade commands and upgraded to version %d of the database in %d milliseconds\n", len(script.commands), script.version, elapsedMillis)
}
return nil
}
9 changes: 9 additions & 0 deletions examples/configs/trader/sample_trader.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ MAX_OP_FEE_STROOPS=5000
# (optional) minimum volume of quote units needed to place an order on the non-sdex (centralized) exchange
#CENTRALIZED_MIN_QUOTE_VOLUME_OVERRIDE=10.0

# uncomment if you want to track fills in a postgres db
#[POSTGRES_DB]
#HOST="localhost"
#PORT=5432
#DB_NAME="kelp"
#USER=""
#PASSWORD=""
#SSL_ENABLE=false

# uncomment lines below to use kraken. Can use "sdex" or leave out to trade on the Stellar Decentralized Exchange.
# can alternatively use any of the ccxt-exchanges marked as "Trading" (run `kelp exchanges` for full list)
#TRADING_EXCHANGE="kraken"
Expand Down
Loading

0 comments on commit 493b4b0

Please sign in to comment.