Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
move around db package and extract kelpdb specific parts into /kelpdb
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilsaraf committed Apr 12, 2020
1 parent 75a6900 commit dcd0663
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 94 deletions.
18 changes: 16 additions & 2 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/support/config"
"github.com/stellar/kelp/api"
"github.com/stellar/kelp/database"
"github.com/stellar/kelp/kelpdb"
"github.com/stellar/kelp/model"
"github.com/stellar/kelp/plugins"
"github.com/stellar/kelp/support/database"
"github.com/stellar/kelp/support/logger"
"github.com/stellar/kelp/support/monitoring"
"github.com/stellar/kelp/support/networking"
Expand All @@ -29,6 +30,19 @@ import (
"github.com/stellar/kelp/trader"
)

var upgradeScripts = []*database.UpgradeScript{
database.MakeUpgradeScript(1, database.SqlDbVersionTableCreate),
database.MakeUpgradeScript(2,
kelpdb.SqlMarketsTableCreate,
kelpdb.SqlTradesTableCreate,
kelpdb.SqlTradesIndexCreate,
),
database.MakeUpgradeScript(3,
kelpdb.SqlTradesIndexDrop,
kelpdb.SqlTradesIndexCreate2,
),
}

const tradeExamples = ` kelp trade --botConf ./path/trader.cfg --strategy buysell --stratConf ./path/buysell.cfg
kelp trade --botConf ./path/trader.cfg --strategy buysell --stratConf ./path/buysell.cfg --sim`

Expand Down Expand Up @@ -489,7 +503,7 @@ func runTradeCmd(options inputs) {
}

var e error
db, e = database.ConnectInitializedDatabase(botConfig.PostgresDbConfig)
db, e = database.ConnectInitializedDatabase(botConfig.PostgresDbConfig, upgradeScripts)
if e != nil {
logger.Fatal(l, fmt.Errorf("problem encountered while initializing the db: %s", e))
}
Expand Down
70 changes: 0 additions & 70 deletions database/schema.go

This file was deleted.

32 changes: 32 additions & 0 deletions kelpdb/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kelpdb

/*
tables
*/
const SqlMarketsTableCreate = "CREATE TABLE IF NOT EXISTS markets (market_id TEXT PRIMARY KEY, exchange_name TEXT NOT NULL, base TEXT NOT NULL, quote TEXT NOT NULL)"
const SqlTradesTableCreate = "CREATE TABLE IF NOT EXISTS trades (market_id TEXT NOT NULL, txid TEXT NOT NULL, date_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, action TEXT NOT NULL, type TEXT NOT NULL, counter_price DOUBLE PRECISION NOT NULL, base_volume DOUBLE PRECISION NOT NULL, counter_cost DOUBLE PRECISION NOT NULL, fee DOUBLE PRECISION NOT NULL, PRIMARY KEY (market_id, txid))"

/*
indexes
*/
const SqlTradesIndexCreate = "CREATE INDEX IF NOT EXISTS date ON trades (market_id, date_utc)"
const SqlTradesIndexDrop = "DROP INDEX IF EXISTS date"
const SqlTradesIndexCreate2 = "CREATE INDEX IF NOT EXISTS trades_mdd ON trades (market_id, DATE(date_utc), date_utc)"

/*
insert statements
*/
// SqlMarketsInsertTemplate inserts into the markets table
const SqlMarketsInsertTemplate = "INSERT INTO markets (market_id, exchange_name, base, quote) VALUES ('%s', '%s', '%s', '%s')"

// SqlTradesInsertTemplate inserts into the trades table
const SqlTradesInsertTemplate = "INSERT INTO trades (market_id, txid, date_utc, action, type, counter_price, base_volume, counter_cost, fee) VALUES ('%s', '%s', '%s', '%s', '%s', %.15f, %.15f, %.15f, %.15f)"

/*
queries
*/
// SqlQueryMarketsById queries the markets table
const SqlQueryMarketsById = "SELECT market_id, exchange_name, base, quote FROM markets WHERE market_id = $1 LIMIT 1"

// SqlQueryDailyValuesTemplate queries the trades table to get the values for a given day
const SqlQueryDailyValuesTemplate = "SELECT SUM(base_volume) as total_base_volume, SUM(counter_cost) as total_counter_volume FROM trades WHERE market_id IN (%s) AND DATE(date_utc) = $1 and action = $2 group by DATE(date_utc)"
10 changes: 5 additions & 5 deletions plugins/fillDBWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

_ "github.com/lib/pq"
"github.com/stellar/kelp/api"
"github.com/stellar/kelp/database"
"github.com/stellar/kelp/kelpdb"
"github.com/stellar/kelp/model"
"github.com/stellar/kelp/support/postgresdb"
"github.com/stellar/kelp/support/utils"
Expand Down Expand Up @@ -117,9 +117,9 @@ func (f *FillDBWriter) fetchOrRegisterMarket(trade model.Trade) (*tradingMarket,
}

func (f *FillDBWriter) fetchMarketFromDb(marketId string) (*tradingMarket, error) {
rows, e := f.db.Query(database.SqlQueryMarketsById, marketId)
rows, e := f.db.Query(kelpdb.SqlQueryMarketsById, marketId)
if e != nil {
return nil, fmt.Errorf("could not execute sql select query (%s) for marketId (%s): %s", database.SqlQueryMarketsById, marketId, e)
return nil, fmt.Errorf("could not execute sql select query (%s) for marketId (%s): %s", kelpdb.SqlQueryMarketsById, marketId, e)
}
defer rows.Close()

Expand All @@ -138,7 +138,7 @@ func (f *FillDBWriter) fetchMarketFromDb(marketId string) (*tradingMarket, error
}

func (f *FillDBWriter) registerMarket(market *tradingMarket) error {
sqlInsert := fmt.Sprintf(database.SqlMarketsInsertTemplate,
sqlInsert := fmt.Sprintf(kelpdb.SqlMarketsInsertTemplate,
market.ID,
market.ExchangeName,
market.BaseAsset,
Expand Down Expand Up @@ -166,7 +166,7 @@ func (f *FillDBWriter) HandleFill(trade model.Trade) error {
return fmt.Errorf("cannot fetch or register market for trade (txid=%s): %s", txid, e)
}

sqlInsert := fmt.Sprintf(database.SqlTradesInsertTemplate,
sqlInsert := fmt.Sprintf(kelpdb.SqlTradesInsertTemplate,
market.ID,
txid,
dateString,
Expand Down
4 changes: 2 additions & 2 deletions plugins/volumeFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/txnbuild"
"github.com/stellar/kelp/database"
"github.com/stellar/kelp/kelpdb"
"github.com/stellar/kelp/model"
"github.com/stellar/kelp/support/postgresdb"
"github.com/stellar/kelp/support/utils"
Expand Down Expand Up @@ -101,7 +101,7 @@ func makeSqlQueryDailyValues(marketIDs []string) string {
}
inClause := strings.Join(inClauseParts, ", ")

return fmt.Sprintf(database.SqlQueryDailyValuesTemplate, inClause)
return fmt.Sprintf(kelpdb.SqlQueryDailyValuesTemplate, inClause)
}

var _ SubmitFilter = &volumeFilter{}
Expand Down
43 changes: 43 additions & 0 deletions support/database/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package database

import (
"database/sql"
"fmt"
"log"
)

/*
tables
*/
const SqlDbVersionTableCreate = "CREATE TABLE IF NOT EXISTS db_version (version INTEGER NOT NULL, date_completed_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, num_scripts INTEGER NOT NULL, time_elapsed_millis BIGINT NOT NULL, PRIMARY KEY (version))"

/*
queries
*/
// sqlQueryDbVersion queries the db_version table
const sqlQueryDbVersion = "SELECT version FROM db_version ORDER BY version desc LIMIT 1"

/*
query helper functions
*/
// QueryDbVersion queries for the version of the database
func QueryDbVersion(db *sql.DB) (uint32, error) {
rows, e := db.Query(sqlQueryDbVersion)
if e != nil {
return 0, fmt.Errorf("could not execute sql select query (%s): %s", sqlQueryDbVersion, e)
}
defer rows.Close()

for rows.Next() {
var dbVersion uint32
e = rows.Scan(&dbVersion)
if e != nil {
return 0, fmt.Errorf("could not scan row to get the db version: %s", e)
}

log.Printf("fetched dbVersion from db: %d", dbVersion)
return dbVersion, nil
}

return 0, nil
}
20 changes: 5 additions & 15 deletions database/upgrade.go → support/database/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,17 @@ import (
"github.com/stellar/kelp/support/utils"
)

var upgradeScripts = []*UpgradeScript{
makeUpgradeScript(1, sqlDbVersionTableCreate),
makeUpgradeScript(2,
sqlMarketsTableCreate,
sqlTradesTableCreate,
sqlTradesIndexCreate,
),
makeUpgradeScript(3,
sqlTradesIndexDrop,
sqlTradesIndexCreate2,
),
}
// sqlDbVersionTableInsertTemplate inserts into the db_version table
const sqlDbVersionTableInsertTemplate = "INSERT INTO db_version (version, date_completed_utc, num_scripts, time_elapsed_millis) VALUES (%d, '%s', %d, %d)"

// UpgradeScript encapsulates a script to be run to upgrade the database from one version to the next
type UpgradeScript struct {
version uint32
commands []string
}

// makeUpgradeScript encapsulates a script to be run to upgrade the database from one version to the next
func makeUpgradeScript(version uint32, command string, moreCommands ...string) *UpgradeScript {
// MakeUpgradeScript encapsulates a script to be run to upgrade the database from one version to the next
func MakeUpgradeScript(version uint32, command string, moreCommands ...string) *UpgradeScript {
allCommands := []string{command}
allCommands = append(allCommands, moreCommands...)

Expand All @@ -42,7 +32,7 @@ func makeUpgradeScript(version uint32, command string, moreCommands ...string) *
}

// ConnectInitializedDatabase creates a database with the required metadata tables
func ConnectInitializedDatabase(postgresDbConfig *postgresdb.Config) (*sql.DB, error) {
func ConnectInitializedDatabase(postgresDbConfig *postgresdb.Config, upgradeScripts []*UpgradeScript) (*sql.DB, error) {
dbCreated, e := postgresdb.CreateDatabaseIfNotExists(postgresDbConfig)
if e != nil {
if strings.Contains(e.Error(), "connect: connection refused") {
Expand Down

0 comments on commit dcd0663

Please sign in to comment.