Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Fill Tracker should accommodate N errors before causing bot to exit (#…
Browse files Browse the repository at this point in the history
…137), closes #96
  • Loading branch information
nikhilsaraf authored Mar 26, 2019
1 parent ec331c6 commit de12bfe
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 67 deletions.
1 change: 1 addition & 0 deletions api/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Balance struct {
// ExchangeShim is the interface we use as a generic API for all crypto exchanges
type ExchangeShim interface {
SubmitOps(ops []build.TransactionMutator, asyncCallback func(hash string, e error)) error
SubmitOpsSynch(ops []build.TransactionMutator, asyncCallback func(hash string, e error)) error // forced synchronous version of SubmitOps
GetBalanceHack(asset horizon.Asset) (*Balance, error)
LoadOffersHack() ([]horizon.Offer, error)
Constrainable
Expand Down
40 changes: 27 additions & 13 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,22 +234,23 @@ func makeStrategy(
ieif *plugins.IEIF,
tradingPair *model.TradingPair,
options inputs,
threadTracker *multithreading.ThreadTracker,
) api.Strategy {
// setting the temp hack variables for the sdex price feeds
e := plugins.SetPrivateSdexHack(client, plugins.MakeIEIF(true), network)
if e != nil {
l.Info("")
l.Errorf("%s", e)
// we want to delete all the offers and exit here since there is something wrong with our setup
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}

strategy, e := plugins.MakeStrategy(sdex, ieif, tradingPair, &assetBase, &assetQuote, *options.strategy, *options.stratConfigPath, *options.simMode)
if e != nil {
l.Info("")
l.Errorf("%s", e)
// we want to delete all the offers and exit here since there is something wrong with our setup
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
return strategy
}
Expand All @@ -275,7 +276,7 @@ func makeBot(
log.Println()
log.Println(e)
// we want to delete all the offers and exit here since there is something wrong with our setup
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
dataKey := model.MakeSortedBotKey(botConfig.AssetBase(), botConfig.AssetQuote())
alert, e := monitoring.MakeAlert(botConfig.AlertType, botConfig.AlertAPIKey)
Expand Down Expand Up @@ -345,6 +346,7 @@ func runTradeCmd(options inputs) {
ieif,
tradingPair,
options,
threadTracker,
)
bot := makeBot(
l,
Expand All @@ -371,7 +373,7 @@ func runTradeCmd(options inputs) {
// we want to delete all the offers and exit here because we don't want the bot to run if monitoring isn't working
// if monitoring is desired but not working properly, we want the bot to be shut down and guarantee that there
// aren't outstanding offers.
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
}()
}
Expand Down Expand Up @@ -444,11 +446,11 @@ func startFillTracking(
l.Info("")
l.Info("problem encountered while instantiating the fill tracker:")
l.Errorf("%s", e)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}

if botConfig.FillTrackerSleepMillis != 0 {
fillTracker := plugins.MakeFillTracker(tradingPair, threadTracker, exchangeShim, botConfig.FillTrackerSleepMillis)
fillTracker := plugins.MakeFillTracker(tradingPair, threadTracker, exchangeShim, botConfig.FillTrackerSleepMillis, botConfig.FillTrackerDeleteCyclesThreshold)
fillLogger := plugins.MakeFillLogger()
fillTracker.RegisterHandler(fillLogger)
if strategyFillHandlers != nil {
Expand All @@ -462,17 +464,16 @@ func startFillTracking(
e := fillTracker.TrackFills()
if e != nil {
l.Info("")
l.Info("problem encountered while running the fill tracker:")
l.Errorf("%s", e)
l.Errorf("problem encountered while running the fill tracker: %s", e)
// we want to delete all the offers and exit here because we don't want the bot to run if fill tracking isn't working
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
}()
} else if strategyFillHandlers != nil && len(strategyFillHandlers) > 0 {
l.Info("")
l.Error("error: strategy has FillHandlers but fill tracking was disabled (set FILL_TRACKER_SLEEP_MILLIS to a non-zero value)")
// we want to delete all the offers and exit here because we don't want the bot to run if fill tracking isn't working
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim)
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
}

Expand Down Expand Up @@ -509,7 +510,20 @@ func validateTrustlines(l logger.Logger, client *horizon.Client, botConfig *trad
l.Info("trustlines valid")
}

func deleteAllOffersAndExit(l logger.Logger, botConfig trader.BotConfig, client *horizon.Client, sdex *plugins.SDEX, exchangeShim api.ExchangeShim) {
func deleteAllOffersAndExit(
l logger.Logger,
botConfig trader.BotConfig,
client *horizon.Client,
sdex *plugins.SDEX,
exchangeShim api.ExchangeShim,
threadTracker *multithreading.ThreadTracker,
) {
l.Info("")
l.Infof("waiting for all outstanding threads (%d) to finish before loading offers to be deleted...", threadTracker.NumActiveThreads())
threadTracker.Stop(multithreading.StopModeError)
threadTracker.Wait()
l.Info("...all outstanding threads finished")

l.Info("")
l.Info("deleting all offers and then exiting...")

Expand All @@ -525,7 +539,7 @@ func deleteAllOffersAndExit(l logger.Logger, botConfig trader.BotConfig, client
l.Infof("created %d operations to delete offers\n", len(dOps))

if len(dOps) > 0 {
e := exchangeShim.SubmitOps(dOps, func(hash string, e error) {
e := exchangeShim.SubmitOpsSynch(dOps, func(hash string, e error) {
if e != nil {
logger.Fatal(l, e)
return
Expand All @@ -539,7 +553,7 @@ func deleteAllOffersAndExit(l logger.Logger, botConfig trader.BotConfig, client

for {
sleepSeconds := 10
l.Infof("sleeping for %d seconds until our deletion is confirmed and we exit...\n", sleepSeconds)
l.Infof("sleeping for %d seconds until our deletion is confirmed and we exit...(should never reach this line since we submit delete ops synchronously)\n", sleepSeconds)
time.Sleep(time.Duration(sleepSeconds) * time.Second)
}
} else {
Expand Down
11 changes: 10 additions & 1 deletion examples/configs/trader/sample_trader.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ SUBMIT_MODE="both"
# the typical use case for this config value is to keep the orders on your orderbook intact if your price feed is unreachable for a small amount of time.
# example: use -1 if you never want to delete all offers (this is not recommended).
# example: use 0 if you want to delete all offers on any error.
# example: use 2 if you want to tolerate 2 continuous update cycle with errors, i.e. three continuous update cycles with errors will delete all offers.
# example: use 2 if you want to tolerate 2 continuous update cycles with errors, i.e. 3 continuous update cycles with errors will delete all offers.
DELETE_CYCLES_THRESHOLD=0
# how many milliseconds to sleep before checking for fills again, a value of 0 disables fill tracking
# fill tracking is not supported when trading on a non-SDEX exchange (i.e. set it to 0)
FILL_TRACKER_SLEEP_MILLIS=0
# how many continuous errors in each fill-tracking cycle can the bot accept before it will delete all offers to protect its exposure.
# this number has to be exceeded for all the offers to be deleted and any error will be counted only once per cycle.
# any time the bot completes a full run successfully this counter will be reset.
# the bot will continue running even if it hits an error or deletes all offers.
# the typical use case for this config value is to keep the orders on your orderbook intact if you the endpoint to read fills is unreachable for a small amount of time.
# example: use -1 if you never want to delete all offers (this is not recommended).
# example: use 0 if you want to delete all offers on any error.
# example: use 2 if you want to tolerate 2 continuous cycles with errors, i.e. 3 continuous cycles with errors will delete all offers.
FILL_TRACKER_DELETE_CYCLES_THRESHOLD=0
# the url for your horizon instance. If this url contains the string "test" then the bot assumes it is using the test network.
HORIZON_URL="https://horizon-testnet.stellar.org"

Expand Down
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ import:
- package: github.com/lechengfan/googleauth
version: 7595ba02fbce171759c10d69d96e4cd898d1fa93
- package: github.com/nikhilsaraf/go-tools
version: 19004f22be08c82a22e679726ca22853c65919ae
version: a26df67722de7fcf1a8e22cd934e63e553dd3875
- package: github.com/mitchellh/mapstructure
version: v1.1.2
5 changes: 5 additions & 0 deletions plugins/batchedExchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ func (b BatchedExchange) GetLatestTradeCursor() (interface{}, error) {
return b.inner.GetLatestTradeCursor()
}

// SubmitOpsSynch is the forced synchronous version of SubmitOps below (same for batchedExchange)
func (b BatchedExchange) SubmitOpsSynch(ops []build.TransactionMutator, asyncCallback func(hash string, e error)) error {
return b.SubmitOps(ops, asyncCallback)
}

// SubmitOps performs any finalization or submission step needed by the exchange
func (b BatchedExchange) SubmitOps(ops []build.TransactionMutator, asyncCallback func(hash string, e error)) error {
var e error
Expand Down
68 changes: 57 additions & 11 deletions plugins/fillTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ import (

// FillTracker tracks fills
type FillTracker struct {
pair *model.TradingPair
threadTracker *multithreading.ThreadTracker
fillTrackable api.FillTrackable
fillTrackerSleepMillis uint32
pair *model.TradingPair
threadTracker *multithreading.ThreadTracker
fillTrackable api.FillTrackable
fillTrackerSleepMillis uint32
fillTrackerDeleteCyclesThreshold int64

// initialized runtime vars
fillTrackerDeleteCycles int64

// uninitialized
handlers []api.FillHandler
Expand All @@ -31,12 +35,16 @@ func MakeFillTracker(
threadTracker *multithreading.ThreadTracker,
fillTrackable api.FillTrackable,
fillTrackerSleepMillis uint32,
fillTrackerDeleteCyclesThreshold int64,
) api.FillTracker {
return &FillTracker{
pair: pair,
threadTracker: threadTracker,
fillTrackable: fillTrackable,
fillTrackerSleepMillis: fillTrackerSleepMillis,
pair: pair,
threadTracker: threadTracker,
fillTrackable: fillTrackable,
fillTrackerSleepMillis: fillTrackerSleepMillis,
fillTrackerDeleteCyclesThreshold: fillTrackerDeleteCyclesThreshold,
// initialized runtime vars
fillTrackerDeleteCycles: 0,
}
}

Expand All @@ -45,6 +53,23 @@ func (f *FillTracker) GetPair() (pair *model.TradingPair) {
return f.pair
}

// countError updates the error count and returns true if the error limit has been exceeded
func (f *FillTracker) countError() bool {
if f.fillTrackerDeleteCyclesThreshold < 0 {
log.Printf("not deleting any offers because fillTrackerDeleteCyclesThreshold is negative\n")
return false
}

f.fillTrackerDeleteCycles++
if f.fillTrackerDeleteCycles <= f.fillTrackerDeleteCyclesThreshold {
log.Printf("not deleting any offers, fillTrackerDeleteCycles (=%d) needs to exceed fillTrackerDeleteCyclesThreshold (=%d)\n", f.fillTrackerDeleteCycles, f.fillTrackerDeleteCyclesThreshold)
return false
}

log.Printf("deleting all offers, num. continuous fill tracking cycles with errors (including this one): %d; (fillTrackerDeleteCyclesThreshold to be exceeded=%d)\n", f.fillTrackerDeleteCycles, f.fillTrackerDeleteCyclesThreshold)
return true
}

// TrackFills impl
func (f *FillTracker) TrackFills() error {
// get the last cursor so we only start querying from the current position
Expand All @@ -58,19 +83,26 @@ func (f *FillTracker) TrackFills() error {
for {
select {
case e := <-ech:
// always return an error if any of the fill handlers return an eror
return fmt.Errorf("caught an error when tracking fills: %s", e)
default:
// do nothing
}

tradeHistoryResult, e := f.fillTrackable.GetTradeHistory(*f.GetPair(), lastCursor, nil)
if e != nil {
return fmt.Errorf("error when fetching trades: %s", e)
eMsg := fmt.Sprintf("error when fetching trades: %s", e)
if f.countError() {
return fmt.Errorf(eMsg)
}
log.Printf("%s\n", eMsg)
f.sleep()
continue
}

if len(tradeHistoryResult.Trades) > 0 {
// use a single goroutine so we handle trades sequentially and also respect the handler sequence
f.threadTracker.TriggerGoroutine(func(inputs []interface{}) {
e = f.threadTracker.TriggerGoroutine(func(inputs []interface{}) {
ech := inputs[0].(chan error)
defer handlePanic(ech)

Expand All @@ -87,13 +119,27 @@ func (f *FillTracker) TrackFills() error {
}
}
}, []interface{}{ech, f.handlers, tradeHistoryResult.Trades})
if e != nil {
eMsg := fmt.Sprintf("error spawning fill handler: %s", e)
if f.countError() {
return fmt.Errorf(eMsg)
}
log.Printf("%s\n", eMsg)
f.sleep()
continue
}
}

lastCursor = tradeHistoryResult.Cursor
time.Sleep(time.Duration(f.fillTrackerSleepMillis) * time.Millisecond)
f.fillTrackerDeleteCycles = 0
f.sleep()
}
}

func (f *FillTracker) sleep() {
time.Sleep(time.Duration(f.fillTrackerSleepMillis) * time.Millisecond)
}

func handlePanic(ech chan error) {
if r := recover(); r != nil {
e := r.(error)
Expand Down
Loading

0 comments on commit de12bfe

Please sign in to comment.