diff --git a/cmd/trade.go b/cmd/trade.go index e54684ddd..569298f94 100644 --- a/cmd/trade.go +++ b/cmd/trade.go @@ -17,9 +17,10 @@ import ( hProtocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/support/config" "github.com/stellar/kelp/api" - "github.com/stellar/kelp/database" + "github.com/stellar/kelp/kelpdb" "github.com/stellar/kelp/model" "github.com/stellar/kelp/plugins" + "github.com/stellar/kelp/support/database" "github.com/stellar/kelp/support/logger" "github.com/stellar/kelp/support/monitoring" "github.com/stellar/kelp/support/networking" @@ -29,6 +30,19 @@ import ( "github.com/stellar/kelp/trader" ) +var upgradeScripts = []*database.UpgradeScript{ + database.MakeUpgradeScript(1, database.SqlDbVersionTableCreate), + database.MakeUpgradeScript(2, + kelpdb.SqlMarketsTableCreate, + kelpdb.SqlTradesTableCreate, + kelpdb.SqlTradesIndexCreate, + ), + database.MakeUpgradeScript(3, + kelpdb.SqlTradesIndexDrop, + kelpdb.SqlTradesIndexCreate2, + ), +} + const tradeExamples = ` kelp trade --botConf ./path/trader.cfg --strategy buysell --stratConf ./path/buysell.cfg kelp trade --botConf ./path/trader.cfg --strategy buysell --stratConf ./path/buysell.cfg --sim` @@ -489,7 +503,7 @@ func runTradeCmd(options inputs) { } var e error - db, e = database.ConnectInitializedDatabase(botConfig.PostgresDbConfig) + db, e = database.ConnectInitializedDatabase(botConfig.PostgresDbConfig, upgradeScripts) if e != nil { logger.Fatal(l, fmt.Errorf("problem encountered while initializing the db: %s", e)) } diff --git a/database/schema.go b/database/schema.go deleted file mode 100644 index 2b7510bd8..000000000 --- a/database/schema.go +++ /dev/null @@ -1,70 +0,0 @@ -package database - -import ( - "database/sql" - "fmt" - "log" -) - -/* - tables -*/ -const sqlDbVersionTableCreate = "CREATE TABLE IF NOT EXISTS db_version (version INTEGER NOT NULL, date_completed_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, num_scripts INTEGER NOT NULL, time_elapsed_millis BIGINT NOT NULL, PRIMARY KEY (version))" -const sqlMarketsTableCreate = "CREATE TABLE IF NOT EXISTS markets (market_id TEXT PRIMARY KEY, exchange_name TEXT NOT NULL, base TEXT NOT NULL, quote TEXT NOT NULL)" -const sqlTradesTableCreate = "CREATE TABLE IF NOT EXISTS trades (market_id TEXT NOT NULL, txid TEXT NOT NULL, date_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, action TEXT NOT NULL, type TEXT NOT NULL, counter_price DOUBLE PRECISION NOT NULL, base_volume DOUBLE PRECISION NOT NULL, counter_cost DOUBLE PRECISION NOT NULL, fee DOUBLE PRECISION NOT NULL, PRIMARY KEY (market_id, txid))" - -/* - indexes -*/ -const sqlTradesIndexCreate = "CREATE INDEX IF NOT EXISTS date ON trades (market_id, date_utc)" -const sqlTradesIndexDrop = "DROP INDEX IF EXISTS date" -const sqlTradesIndexCreate2 = "CREATE INDEX IF NOT EXISTS trades_mdd ON trades (market_id, DATE(date_utc), date_utc)" - -/* - insert statements -*/ -// sqlDbVersionTableInsertTemplate inserts into the db_version table -const sqlDbVersionTableInsertTemplate = "INSERT INTO db_version (version, date_completed_utc, num_scripts, time_elapsed_millis) VALUES (%d, '%s', %d, %d)" - -// SqlMarketsInsertTemplate inserts into the markets table -const SqlMarketsInsertTemplate = "INSERT INTO markets (market_id, exchange_name, base, quote) VALUES ('%s', '%s', '%s', '%s')" - -// SqlTradesInsertTemplate inserts into the trades table -const SqlTradesInsertTemplate = "INSERT INTO trades (market_id, txid, date_utc, action, type, counter_price, base_volume, counter_cost, fee) VALUES ('%s', '%s', '%s', '%s', '%s', %.15f, %.15f, %.15f, %.15f)" - -/* - queries -*/ -// SqlQueryMarketsById queries the markets table -const SqlQueryMarketsById = "SELECT market_id, exchange_name, base, quote FROM markets WHERE market_id = $1 LIMIT 1" - -// sqlQueryDbVersion queries the db_version table -const sqlQueryDbVersion = "SELECT version FROM db_version ORDER BY version desc LIMIT 1" - -// SqlQueryDailyValuesTemplate queries the trades table to get the values for a given day -const SqlQueryDailyValuesTemplate = "SELECT SUM(base_volume) as total_base_volume, SUM(counter_cost) as total_counter_volume FROM trades WHERE market_id IN (%s) AND DATE(date_utc) = $1 and action = $2 group by DATE(date_utc)" - -/* - query helper functions -*/ -// QueryDbVersion queries for the version of the database -func QueryDbVersion(db *sql.DB) (uint32, error) { - rows, e := db.Query(sqlQueryDbVersion) - if e != nil { - return 0, fmt.Errorf("could not execute sql select query (%s): %s", sqlQueryDbVersion, e) - } - defer rows.Close() - - for rows.Next() { - var dbVersion uint32 - e = rows.Scan(&dbVersion) - if e != nil { - return 0, fmt.Errorf("could not scan row to get the db version: %s", e) - } - - log.Printf("fetched dbVersion from db: %d", dbVersion) - return dbVersion, nil - } - - return 0, nil -} diff --git a/kelpdb/schema.go b/kelpdb/schema.go new file mode 100644 index 000000000..be50cd756 --- /dev/null +++ b/kelpdb/schema.go @@ -0,0 +1,32 @@ +package kelpdb + +/* + tables +*/ +const SqlMarketsTableCreate = "CREATE TABLE IF NOT EXISTS markets (market_id TEXT PRIMARY KEY, exchange_name TEXT NOT NULL, base TEXT NOT NULL, quote TEXT NOT NULL)" +const SqlTradesTableCreate = "CREATE TABLE IF NOT EXISTS trades (market_id TEXT NOT NULL, txid TEXT NOT NULL, date_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, action TEXT NOT NULL, type TEXT NOT NULL, counter_price DOUBLE PRECISION NOT NULL, base_volume DOUBLE PRECISION NOT NULL, counter_cost DOUBLE PRECISION NOT NULL, fee DOUBLE PRECISION NOT NULL, PRIMARY KEY (market_id, txid))" + +/* + indexes +*/ +const SqlTradesIndexCreate = "CREATE INDEX IF NOT EXISTS date ON trades (market_id, date_utc)" +const SqlTradesIndexDrop = "DROP INDEX IF EXISTS date" +const SqlTradesIndexCreate2 = "CREATE INDEX IF NOT EXISTS trades_mdd ON trades (market_id, DATE(date_utc), date_utc)" + +/* + insert statements +*/ +// SqlMarketsInsertTemplate inserts into the markets table +const SqlMarketsInsertTemplate = "INSERT INTO markets (market_id, exchange_name, base, quote) VALUES ('%s', '%s', '%s', '%s')" + +// SqlTradesInsertTemplate inserts into the trades table +const SqlTradesInsertTemplate = "INSERT INTO trades (market_id, txid, date_utc, action, type, counter_price, base_volume, counter_cost, fee) VALUES ('%s', '%s', '%s', '%s', '%s', %.15f, %.15f, %.15f, %.15f)" + +/* + queries +*/ +// SqlQueryMarketsById queries the markets table +const SqlQueryMarketsById = "SELECT market_id, exchange_name, base, quote FROM markets WHERE market_id = $1 LIMIT 1" + +// SqlQueryDailyValuesTemplate queries the trades table to get the values for a given day +const SqlQueryDailyValuesTemplate = "SELECT SUM(base_volume) as total_base_volume, SUM(counter_cost) as total_counter_volume FROM trades WHERE market_id IN (%s) AND DATE(date_utc) = $1 and action = $2 group by DATE(date_utc)" diff --git a/plugins/fillDBWriter.go b/plugins/fillDBWriter.go index 1e27e2170..4d937ee2c 100644 --- a/plugins/fillDBWriter.go +++ b/plugins/fillDBWriter.go @@ -10,7 +10,7 @@ import ( _ "github.com/lib/pq" "github.com/stellar/kelp/api" - "github.com/stellar/kelp/database" + "github.com/stellar/kelp/kelpdb" "github.com/stellar/kelp/model" "github.com/stellar/kelp/support/postgresdb" "github.com/stellar/kelp/support/utils" @@ -117,9 +117,9 @@ func (f *FillDBWriter) fetchOrRegisterMarket(trade model.Trade) (*tradingMarket, } func (f *FillDBWriter) fetchMarketFromDb(marketId string) (*tradingMarket, error) { - rows, e := f.db.Query(database.SqlQueryMarketsById, marketId) + rows, e := f.db.Query(kelpdb.SqlQueryMarketsById, marketId) if e != nil { - return nil, fmt.Errorf("could not execute sql select query (%s) for marketId (%s): %s", database.SqlQueryMarketsById, marketId, e) + return nil, fmt.Errorf("could not execute sql select query (%s) for marketId (%s): %s", kelpdb.SqlQueryMarketsById, marketId, e) } defer rows.Close() @@ -138,7 +138,7 @@ func (f *FillDBWriter) fetchMarketFromDb(marketId string) (*tradingMarket, error } func (f *FillDBWriter) registerMarket(market *tradingMarket) error { - sqlInsert := fmt.Sprintf(database.SqlMarketsInsertTemplate, + sqlInsert := fmt.Sprintf(kelpdb.SqlMarketsInsertTemplate, market.ID, market.ExchangeName, market.BaseAsset, @@ -166,7 +166,7 @@ func (f *FillDBWriter) HandleFill(trade model.Trade) error { return fmt.Errorf("cannot fetch or register market for trade (txid=%s): %s", txid, e) } - sqlInsert := fmt.Sprintf(database.SqlTradesInsertTemplate, + sqlInsert := fmt.Sprintf(kelpdb.SqlTradesInsertTemplate, market.ID, txid, dateString, diff --git a/plugins/volumeFilter.go b/plugins/volumeFilter.go index 5c943f668..074598a68 100644 --- a/plugins/volumeFilter.go +++ b/plugins/volumeFilter.go @@ -10,7 +10,7 @@ import ( hProtocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/txnbuild" - "github.com/stellar/kelp/database" + "github.com/stellar/kelp/kelpdb" "github.com/stellar/kelp/model" "github.com/stellar/kelp/support/postgresdb" "github.com/stellar/kelp/support/utils" @@ -101,7 +101,7 @@ func makeSqlQueryDailyValues(marketIDs []string) string { } inClause := strings.Join(inClauseParts, ", ") - return fmt.Sprintf(database.SqlQueryDailyValuesTemplate, inClause) + return fmt.Sprintf(kelpdb.SqlQueryDailyValuesTemplate, inClause) } var _ SubmitFilter = &volumeFilter{} diff --git a/support/database/schema.go b/support/database/schema.go new file mode 100644 index 000000000..479608149 --- /dev/null +++ b/support/database/schema.go @@ -0,0 +1,43 @@ +package database + +import ( + "database/sql" + "fmt" + "log" +) + +/* + tables +*/ +const SqlDbVersionTableCreate = "CREATE TABLE IF NOT EXISTS db_version (version INTEGER NOT NULL, date_completed_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, num_scripts INTEGER NOT NULL, time_elapsed_millis BIGINT NOT NULL, PRIMARY KEY (version))" + +/* + queries +*/ +// sqlQueryDbVersion queries the db_version table +const sqlQueryDbVersion = "SELECT version FROM db_version ORDER BY version desc LIMIT 1" + +/* + query helper functions +*/ +// QueryDbVersion queries for the version of the database +func QueryDbVersion(db *sql.DB) (uint32, error) { + rows, e := db.Query(sqlQueryDbVersion) + if e != nil { + return 0, fmt.Errorf("could not execute sql select query (%s): %s", sqlQueryDbVersion, e) + } + defer rows.Close() + + for rows.Next() { + var dbVersion uint32 + e = rows.Scan(&dbVersion) + if e != nil { + return 0, fmt.Errorf("could not scan row to get the db version: %s", e) + } + + log.Printf("fetched dbVersion from db: %d", dbVersion) + return dbVersion, nil + } + + return 0, nil +} diff --git a/database/upgrade.go b/support/database/upgrade.go similarity index 90% rename from database/upgrade.go rename to support/database/upgrade.go index 9cb5c6eb5..1ccc28824 100644 --- a/database/upgrade.go +++ b/support/database/upgrade.go @@ -11,18 +11,8 @@ import ( "github.com/stellar/kelp/support/utils" ) -var upgradeScripts = []*UpgradeScript{ - makeUpgradeScript(1, sqlDbVersionTableCreate), - makeUpgradeScript(2, - sqlMarketsTableCreate, - sqlTradesTableCreate, - sqlTradesIndexCreate, - ), - makeUpgradeScript(3, - sqlTradesIndexDrop, - sqlTradesIndexCreate2, - ), -} +// sqlDbVersionTableInsertTemplate inserts into the db_version table +const sqlDbVersionTableInsertTemplate = "INSERT INTO db_version (version, date_completed_utc, num_scripts, time_elapsed_millis) VALUES (%d, '%s', %d, %d)" // UpgradeScript encapsulates a script to be run to upgrade the database from one version to the next type UpgradeScript struct { @@ -30,8 +20,8 @@ type UpgradeScript struct { commands []string } -// makeUpgradeScript encapsulates a script to be run to upgrade the database from one version to the next -func makeUpgradeScript(version uint32, command string, moreCommands ...string) *UpgradeScript { +// MakeUpgradeScript encapsulates a script to be run to upgrade the database from one version to the next +func MakeUpgradeScript(version uint32, command string, moreCommands ...string) *UpgradeScript { allCommands := []string{command} allCommands = append(allCommands, moreCommands...) @@ -42,7 +32,7 @@ func makeUpgradeScript(version uint32, command string, moreCommands ...string) * } // ConnectInitializedDatabase creates a database with the required metadata tables -func ConnectInitializedDatabase(postgresDbConfig *postgresdb.Config) (*sql.DB, error) { +func ConnectInitializedDatabase(postgresDbConfig *postgresdb.Config, upgradeScripts []*UpgradeScript) (*sql.DB, error) { dbCreated, e := postgresdb.CreateDatabaseIfNotExists(postgresDbConfig) if e != nil { if strings.Contains(e.Error(), "connect: connection refused") {