Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Centralized fill tracking, closes #116 #125

Merged
3 changes: 2 additions & 1 deletion api/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type TradeAPI interface {

GetTrades(pair *model.TradingPair, maybeCursor interface{}) (*TradesResult, error)

TradeFetcher
FillTrackable

GetOpenOrders(pairs []*model.TradingPair) (map[model.TradingPair][]model.OpenOrder, error)

Expand Down Expand Up @@ -213,4 +213,5 @@ type ExchangeShim interface {
LoadOffersHack() ([]horizon.Offer, error)
Constrainable
OrderbookFetcher
FillTrackable
}
8 changes: 1 addition & 7 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,6 @@ func init() {
// we want to delete all the offers and exit here since there is something wrong with our setup
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
}
if !isTradingSdex && botConfig.FillTrackerSleepMillis != 0 {
log.Println()
log.Println("cannot run on a non-SDEX exchange with FILL_TRACKER_SLEEP_MILLIS set to something other than 0")
// we want to delete all the offers and exit here since there is something wrong with our setup
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
}
timeController := plugins.MakeIntervalTimeController(
time.Duration(botConfig.TickIntervalSeconds)*time.Second,
botConfig.MaxTickDelayMillis,
Expand Down Expand Up @@ -304,7 +298,7 @@ func init() {
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
}
if botConfig.FillTrackerSleepMillis != 0 {
fillTracker := plugins.MakeFillTracker(tradingPair, threadTracker, sdex, botConfig.FillTrackerSleepMillis)
fillTracker := plugins.MakeFillTracker(tradingPair, threadTracker, exchangeShim, botConfig.FillTrackerSleepMillis)
fillLogger := plugins.MakeFillLogger()
fillTracker.RegisterHandler(fillLogger)
if strategyFillHandlers != nil {
Expand Down
25 changes: 25 additions & 0 deletions model/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,31 @@ type Trade struct {
Fee *Number
}

// TradesByTsID implements sort.Interface for []Trade based on Timestamp and TransactionID
type TradesByTsID []Trade

func (t TradesByTsID) Len() int {
return len(t)
}
func (t TradesByTsID) Swap(i int, j int) {
t[i], t[j] = t[j], t[i]
}
func (t TradesByTsID) Less(i int, j int) bool {
if t[i].Order.Timestamp.AsInt64() < t[j].Order.Timestamp.AsInt64() {
return true
} else if t[i].Order.Timestamp.AsInt64() > t[j].Order.Timestamp.AsInt64() {
return false
}

if t[i].TransactionID != nil && t[j].TransactionID != nil {
return t[i].TransactionID.String() < t[j].TransactionID.String()
}
if t[i].TransactionID != nil {
return false
}
return true
}

func (t Trade) String() string {
return fmt.Sprintf("Trade[txid: %s, ts: %s, pair: %s, action: %s, type: %s, counterPrice: %s, baseVolume: %s, counterCost: %s, fee: %s]",
utils.CheckedString(t.TransactionID),
Expand Down
10 changes: 10 additions & 0 deletions plugins/batchedExchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ func (b BatchedExchange) GetOrderBook(pair *model.TradingPair, maxCount int32) (
return b.inner.GetOrderBook(pair, maxCount)
}

// GetTradeHistory impl
func (b BatchedExchange) GetTradeHistory(pair model.TradingPair, maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) {
return b.inner.GetTradeHistory(pair, maybeCursorStart, maybeCursorEnd)
}

// GetLatestTradeCursor impl
func (b BatchedExchange) GetLatestTradeCursor() (interface{}, error) {
return b.inner.GetLatestTradeCursor()
}

// SubmitOps performs any finalization or submission step needed by the exchange
func (b BatchedExchange) SubmitOps(ops []build.TransactionMutator, asyncCallback func(hash string, e error)) error {
var e error
Expand Down
52 changes: 39 additions & 13 deletions plugins/ccxtExchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package plugins
import (
"fmt"
"log"
"sort"
"strconv"
"time"

"github.com/stellar/kelp/api"
"github.com/stellar/kelp/model"
Expand Down Expand Up @@ -208,20 +211,35 @@ func (c ccxtExchange) GetTradeHistory(pair model.TradingPair, maybeCursorStart i

trades := []model.Trade{}
for _, raw := range tradesRaw {
t, e := c.readTrade(&pair, pairString, raw)
var t *model.Trade
t, e = c.readTrade(&pair, pairString, raw)
if e != nil {
return nil, fmt.Errorf("error while reading trade: %s", e)
}
trades = append(trades, *t)
}

// TODO implement cursor logic
sort.Sort(model.TradesByTsID(trades))
cursor := maybeCursorStart
if len(trades) > 0 {
lastCursor := trades[len(trades)-1].Order.Timestamp.AsInt64()
// add 1 to lastCursor so we don't repeat the same cursor on the next run
cursor = strconv.FormatInt(lastCursor+1, 10)
}

return &api.TradeHistoryResult{
Cursor: nil,
Cursor: cursor,
Trades: trades,
}, nil
}

// GetLatestTradeCursor impl.
func (c ccxtExchange) GetLatestTradeCursor() (interface{}, error) {
timeNowMillis := time.Now().UnixNano() / int64(time.Millisecond)
latestTradeCursor := fmt.Sprintf("%d", timeNowMillis)
return latestTradeCursor, nil
}

// GetTrades impl
func (c ccxtExchange) GetTrades(pair *model.TradingPair, maybeCursor interface{}) (*api.TradesResult, error) {
pairString, e := pair.ToString(c.assetConverter, c.delimiter)
Expand All @@ -237,16 +255,24 @@ func (c ccxtExchange) GetTrades(pair *model.TradingPair, maybeCursor interface{}

trades := []model.Trade{}
for _, raw := range tradesRaw {
t, e := c.readTrade(pair, pairString, raw)
var t *model.Trade
t, e = c.readTrade(pair, pairString, raw)
if e != nil {
return nil, fmt.Errorf("error while reading trade: %s", e)
}
trades = append(trades, *t)
}

// TODO implement cursor logic
sort.Sort(model.TradesByTsID(trades))
cursor := maybeCursor
if len(trades) > 0 {
lastCursor := trades[len(trades)-1].Order.Timestamp.AsInt64()
// add 1 to lastCursor so we don't repeat the same cursor on the next run
cursor = strconv.FormatInt(lastCursor+1, 10)
}

return &api.TradesResult{
Cursor: nil,
Cursor: cursor,
Trades: trades,
}, nil
}
Expand All @@ -258,6 +284,11 @@ func (c ccxtExchange) readTrade(pair *model.TradingPair, pairString string, rawT

pricePrecision := c.GetOrderConstraints(pair).PricePrecision
volumePrecision := c.GetOrderConstraints(pair).VolumePrecision
// use bigger precision for fee and cost since they are logically derived from amount and price
feecCostPrecision := pricePrecision
if volumePrecision > pricePrecision {
feecCostPrecision = volumePrecision
}

trade := model.Trade{
Order: model.Order{
Expand All @@ -268,7 +299,7 @@ func (c ccxtExchange) readTrade(pair *model.TradingPair, pairString string, rawT
Timestamp: model.MakeTimestamp(rawTrade.Timestamp),
},
TransactionID: model.MakeTransactionID(rawTrade.ID),
Fee: nil,
Fee: model.NumberFromFloat(rawTrade.Fee.Cost, feecCostPrecision),
}

if rawTrade.Side == "sell" {
Expand All @@ -280,12 +311,7 @@ func (c ccxtExchange) readTrade(pair *model.TradingPair, pairString string, rawT
}

if rawTrade.Cost != 0.0 {
// use bigger precision for cost since it's logically derived from amount and price
costPrecision := pricePrecision
if volumePrecision > pricePrecision {
costPrecision = volumePrecision
}
trade.Cost = model.NumberFromFloat(rawTrade.Cost, costPrecision)
trade.Cost = model.NumberFromFloat(rawTrade.Cost, feecCostPrecision)
}

return &trade, nil
Expand Down
45 changes: 43 additions & 2 deletions plugins/ccxtExchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package plugins
import (
"fmt"
"log"
"strconv"
"testing"
"time"

"github.com/stellar/kelp/api"
"github.com/stellar/kelp/model"
Expand Down Expand Up @@ -124,7 +126,11 @@ func TestGetTradeHistory_Ccxt(t *testing.T) {
if !assert.NoError(t, e) {
return
}
assert.Equal(t, nil, tradeHistoryResult.Cursor)
if len(tradeHistoryResult.Trades) > 0 {
assert.NotNil(t, tradeHistoryResult.Cursor)
} else {
assert.Nil(t, tradeHistoryResult.Cursor)
}

validateTrades(t, pair, tradeHistoryResult.Trades)
})
Expand All @@ -151,7 +157,7 @@ func validateTrades(t *testing.T, pair model.TradingPair, trades []model.Trade)
if !assert.NotNil(t, trade.TransactionID) {
return
}
if !assert.Nil(t, trade.Fee) {
if !assert.NotNil(t, trade.Fee) {
return
}
if trade.OrderAction != model.OrderActionBuy && trade.OrderAction != model.OrderActionSell {
Expand All @@ -164,6 +170,41 @@ func validateTrades(t *testing.T, pair model.TradingPair, trades []model.Trade)
}
}

func TestGetLatestTradeCursor_Ccxt(t *testing.T) {
for exchangeName, apiKey := range supportedTradingExchanges {
t.Run(exchangeName, func(t *testing.T) {
testCcxtExchange, e := makeCcxtExchange(exchangeName, testOrderConstraints, []api.ExchangeAPIKey{apiKey}, false)
if !assert.NoError(t, e) {
return
}

startIntervalMillis := time.Now().UnixNano() / int64(time.Millisecond)
cursor, e := testCcxtExchange.GetLatestTradeCursor()
if !assert.NoError(t, e) {
return
}
endIntervalMillis := time.Now().UnixNano() / int64(time.Millisecond)

if !assert.IsType(t, "string", cursor) {
return
}

cursorString := cursor.(string)
cursorInt, e := strconv.ParseInt(cursorString, 10, 64)
if !assert.NoError(t, e) {
return
}

if !assert.True(t, startIntervalMillis <= cursorInt, fmt.Sprintf("returned cursor (%d) should gte the start time of the function call in milliseconds (%d)", cursorInt, startIntervalMillis)) {
return
}
if !assert.True(t, endIntervalMillis >= cursorInt, fmt.Sprintf("returned cursor (%d) should lte the end time of the function call in milliseconds (%d)", cursorInt, endIntervalMillis)) {
return
}
})
}
}

func TestGetAccountBalances_Ccxt(t *testing.T) {
if testing.Short() {
return
Expand Down
34 changes: 32 additions & 2 deletions plugins/krakenExchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"log"
"math"
"reflect"
"sort"
"strconv"
"strings"
"time"

"github.com/Beldur/kraken-go-api-client"
"github.com/stellar/kelp/api"
Expand Down Expand Up @@ -371,7 +374,8 @@ func (k *krakenExchange) getTradeHistory(tradingPair model.TradingPair, maybeCur
_cost := m["cost"].(string)
_fee := m["fee"].(string)
_pair := m["pair"].(string)
pair, e := model.TradingPairFromString(4, k.assetConverter, _pair)
var pair *model.TradingPair
pair, e = model.TradingPairFromString(4, k.assetConverter, _pair)
if e != nil {
return nil, e
}
Expand All @@ -397,11 +401,32 @@ func (k *krakenExchange) getTradeHistory(tradingPair model.TradingPair, maybeCur
Fee: model.MustNumberFromString(_fee, feeCostPrecision),
})
}
res.Cursor = _time
}

// sort to be in ascending order
sort.Sort(model.TradesByTsID(res.Trades))

// set correct value for cursor
if len(res.Trades) > 0 {
lastCursor := res.Trades[len(res.Trades)-1].Order.Timestamp.AsInt64()
// add 1 to lastCursor so we don't repeat the same cursor on the next run
res.Cursor = strconv.FormatInt(lastCursor+1, 10)
} else if maybeCursorStart != nil {
res.Cursor = *maybeCursorStart
} else {
res.Cursor = nil
}

return &res, nil
}

// GetLatestTradeCursor impl.
func (k *krakenExchange) GetLatestTradeCursor() (interface{}, error) {
timeNowSecs := time.Now().Unix()
latestTradeCursor := fmt.Sprintf("%d", timeNowSecs)
return latestTradeCursor, nil
}

// GetTrades impl.
func (k *krakenExchange) GetTrades(pair *model.TradingPair, maybeCursor interface{}) (*api.TradesResult, error) {
if maybeCursor != nil {
Expand Down Expand Up @@ -456,6 +481,11 @@ func (k *krakenExchange) getTrades(pair *model.TradingPair, maybeCursor *int64)
// Fee unavailable
})
}

// sort to be in ascending order
sort.Sort(model.TradesByTsID(tradesResult.Trades))
// cursor is already set using the result from the kraken go sdk, so no need to set again here

return tradesResult, nil
}

Expand Down
27 changes: 27 additions & 0 deletions plugins/krakenExchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"testing"
"time"

"github.com/stellar/kelp/api"

Expand Down Expand Up @@ -155,6 +156,32 @@ func TestGetTradeHistory(t *testing.T) {
assert.Fail(t, "force fail")
}

func TestGetLatestTradeCursor(t *testing.T) {
startIntervalSecs := time.Now().Unix()
cursor, e := testKrakenExchange.GetLatestTradeCursor()
if !assert.NoError(t, e) {
return
}
endIntervalSecs := time.Now().Unix()

if !assert.IsType(t, "string", cursor) {
return
}

cursorString := cursor.(string)
cursorInt, e := strconv.ParseInt(cursorString, 10, 64)
if !assert.NoError(t, e) {
return
}

if !assert.True(t, startIntervalSecs <= cursorInt, fmt.Sprintf("returned cursor (%d) should gte the start time of the function call in seconds (%d)", cursorInt, startIntervalSecs)) {
return
}
if !assert.True(t, endIntervalSecs >= cursorInt, fmt.Sprintf("returned cursor (%d) should lte the end time of the function call in seconds (%d)", cursorInt, endIntervalSecs)) {
return
}
}

func TestGetOpenOrders(t *testing.T) {
if testing.Short() {
return
Expand Down
4 changes: 4 additions & 0 deletions support/sdk/ccxt.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ type CcxtTrade struct {
Side string `json:"side"`
Symbol string `json:"symbol"`
Timestamp int64 `json:"timestamp"`
Fee struct {
Cost float64 `json:"cost"`
Currency string `json:"currency"`
} `json:"fee"`
}

// FetchTrades calls the /fetchTrades endpoint on CCXT, trading pair is the CCXT version of the trading pair
Expand Down
Loading