diff --git a/api/exchange.go b/api/exchange.go index 346cb403e..9d3966399 100644 --- a/api/exchange.go +++ b/api/exchange.go @@ -266,7 +266,13 @@ func ConvertOperation2TM(ops []txnbuild.Operation) []build.TransactionMutator { // ConvertTM2Operation is a temporary adapter to support transitioning from the old Go SDK to the new SDK without having to bump the major version func ConvertTM2Operation(muts []build.TransactionMutator) []txnbuild.Operation { - ops := []txnbuild.Operation{} + msos := ConvertTM2MSO(muts) + return ConvertMSO2Ops(msos) +} + +// ConvertTM2MSO converts mutators from the old SDK to ManageSellOffer ops in the new one. +func ConvertTM2MSO(muts []build.TransactionMutator) []*txnbuild.ManageSellOffer { + msos := []*txnbuild.ManageSellOffer{} for _, m := range muts { var mso *txnbuild.ManageSellOffer if mob, ok := m.(build.ManageOfferBuilder); ok { @@ -276,6 +282,15 @@ func ConvertTM2Operation(muts []build.TransactionMutator) []txnbuild.Operation { } else { panic(fmt.Sprintf("could not convert build.TransactionMutator to txnbuild.Operation: %v (type=%T)\n", m, m)) } + msos = append(msos, mso) + } + return msos +} + +// ConvertMSO2Ops converts manage sell offers into Operations. +func ConvertMSO2Ops(msos []*txnbuild.ManageSellOffer) []txnbuild.Operation { + ops := []txnbuild.Operation{} + for _, mso := range msos { ops = append(ops, mso) } return ops diff --git a/plugins/metricsTracker.go b/plugins/metricsTracker.go index e35434c57..2174d1619 100644 --- a/plugins/metricsTracker.go +++ b/plugins/metricsTracker.go @@ -91,6 +91,10 @@ type updateProps struct { Success bool `json:"success"` MillisForUpdate int64 `json:"millis_for_update"` SecondsSinceLastUpdateMetric float64 `json:"seconds_since_last_update_metric"` // helps understand total runtime of bot when summing this field across events + NumPruneOps int `json:"num_prune_ops"` + NumUpdateOpsDelete int `json:"num_update_ops_delete"` + NumUpdateOpsUpdate int `json:"num_update_ops_update"` + NumUpdateOpsCreate int `json:"num_update_ops_create"` } // deleteProps holds the properties for the delete Amplitude event. @@ -106,6 +110,16 @@ type eventWrapper struct { Events []event `json:"events"` } +// UpdateLoopResult contains the results of the orderbook update. +// Note that this is used in `trader/trader.go`, but it is defined here to avoid an import cycle. +type UpdateLoopResult struct { + Success bool + NumPruneOps int + NumUpdateOpsDelete int + NumUpdateOpsUpdate int + NumUpdateOpsCreate int +} + // response structure taken from here: https://help.amplitude.com/hc/en-us/articles/360032842391-HTTP-API-V2#tocSsuccesssummary type amplitudeResponse struct { Code int `json:"code"` @@ -273,7 +287,7 @@ func (mt *MetricsTracker) SendStartupEvent(now time.Time) error { } // SendUpdateEvent sends the update Amplitude event. -func (mt *MetricsTracker) SendUpdateEvent(now time.Time, success bool, millisForUpdate int64) error { +func (mt *MetricsTracker) SendUpdateEvent(now time.Time, updateResult UpdateLoopResult, millisForUpdate int64) error { var secondsSinceLastUpdateMetric float64 if mt.updateEventSentTime == nil { secondsSinceLastUpdateMetric = now.Sub(mt.botStartTime).Seconds() @@ -282,9 +296,13 @@ func (mt *MetricsTracker) SendUpdateEvent(now time.Time, success bool, millisFor } updateProps := updateProps{ - Success: success, + Success: updateResult.Success, MillisForUpdate: millisForUpdate, SecondsSinceLastUpdateMetric: secondsSinceLastUpdateMetric, + NumPruneOps: updateResult.NumPruneOps, + NumUpdateOpsDelete: updateResult.NumUpdateOpsDelete, + NumUpdateOpsUpdate: updateResult.NumUpdateOpsUpdate, + NumUpdateOpsCreate: updateResult.NumUpdateOpsCreate, } e := mt.SendEvent(updateEventName, updateProps, now) diff --git a/trader/trader.go b/trader/trader.go index 28c645838..64c009ac7 100644 --- a/trader/trader.go +++ b/trader/trader.go @@ -5,6 +5,7 @@ import ( "log" "math" "sort" + "strconv" "time" "github.com/nikhilsaraf/go-tools/multithreading" @@ -122,12 +123,12 @@ func (t *Trader) Start() { for { currentUpdateTime := time.Now() if lastUpdateTime.IsZero() || t.timeController.ShouldUpdate(lastUpdateTime, currentUpdateTime) { - success := t.update() + updateResult := t.update() millisForUpdate := time.Since(currentUpdateTime).Milliseconds() log.Printf("time taken for update loop: %d millis\n", millisForUpdate) if shouldSendUpdateMetric(t.startTime, currentUpdateTime, t.metricsTracker.GetUpdateEventSentTime()) { e := t.threadTracker.TriggerGoroutine(func(inputs []interface{}) { - e := t.metricsTracker.SendUpdateEvent(currentUpdateTime, success, millisForUpdate) + e := t.metricsTracker.SendUpdateEvent(currentUpdateTime, updateResult, millisForUpdate) if e != nil { log.Printf("failed to send update event metric: %s", e) } @@ -137,7 +138,7 @@ func (t *Trader) Start() { } } - if t.fixedIterations != nil && success { + if t.fixedIterations != nil && updateResult.Success { *t.fixedIterations = *t.fixedIterations - 1 if *t.fixedIterations <= 0 { log.Printf("finished requested number of iterations, waiting for all threads to finish...\n") @@ -352,12 +353,24 @@ func isStateSynchronized( // time to update the order book and possibly readjust the offers // returns true if the update was successful, otherwise false -func (t *Trader) update() bool { +func (t *Trader) update() plugins.UpdateLoopResult { + // initialize counts of types of ops + numPruneOps := 0 + numUpdateOpsDelete := 0 + numUpdateOpsUpdate := 0 + numUpdateOpsCreate := 0 + e := t.synchronizeFetchBalancesOffersTrades() if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } pair := &model.TradingPair{ @@ -376,7 +389,13 @@ func (t *Trader) update() bool { if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } // strategy has a chance to set any state it needs @@ -384,20 +403,33 @@ func (t *Trader) update() bool { if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } // delete excess offers var pruneOps []build.TransactionMutator pruneOps, t.buyingAOffers, t.sellingAOffers = t.strategy.PruneExistingOffers(t.buyingAOffers, t.sellingAOffers) - log.Printf("created %d operations to prune excess offers\n", len(pruneOps)) - if len(pruneOps) > 0 { + numPruneOps = len(pruneOps) + log.Printf("created %d operations to prune excess offers\n", numPruneOps) + if numPruneOps > 0 { // to prune/delete offers the submitMode doesn't matter, so use api.SubmitModeBoth as the default e = t.exchangeShim.SubmitOps(pruneOps, api.SubmitModeBoth, nil) if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } // TODO 2 streamline the request data instead of caching - may not need this since result of PruneOps is async @@ -410,7 +442,13 @@ func (t *Trader) update() bool { if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } } @@ -422,16 +460,42 @@ func (t *Trader) update() bool { log.Printf("liabilities (force recomputed) after encountering an error after a call to UpdateWithOps\n") t.sdex.IEIF().RecomputeAndLogCachedLiabilities(t.assetBase, t.assetQuote) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } + } + + msos := api.ConvertTM2MSO(opsOld) + numUpdateOpsDelete, numUpdateOpsUpdate, numUpdateOpsCreate, e = countOfferChangeTypes(msos) + if e != nil { + log.Println(e) + t.deleteAllOffers(false) + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } - ops := api.ConvertTM2Operation(opsOld) + ops := api.ConvertMSO2Ops(msos) for i, filter := range t.submitFilters { ops, e = filter.Apply(ops, t.sellingAOffers, t.buyingAOffers) if e != nil { log.Printf("error in filter index %d: %s\n", i, e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } } @@ -443,11 +507,16 @@ func (t *Trader) update() bool { t.deleteAllOffers(true) } }) - if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } } @@ -455,12 +524,24 @@ func (t *Trader) update() bool { if e != nil { log.Println(e) t.deleteAllOffers(false) - return false + return plugins.UpdateLoopResult{ + Success: false, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } // reset deleteCycles on every successful run t.deleteCycles = 0 - return true + return plugins.UpdateLoopResult{ + Success: true, + NumPruneOps: numPruneOps, + NumUpdateOpsDelete: numUpdateOpsDelete, + NumUpdateOpsUpdate: numUpdateOpsUpdate, + NumUpdateOpsCreate: numUpdateOpsCreate, + } } func (t *Trader) getBalances() (*api.Balance /*baseBalance*/, *api.Balance /*quoteBalance*/, error) { @@ -534,3 +615,30 @@ func (t *Trader) getExistingOffers() ([]hProtocol.Offer /*sellingAOffers*/, []hP func (t *Trader) setExistingOffers(sellingAOffers []hProtocol.Offer, buyingAOffers []hProtocol.Offer) { t.sellingAOffers, t.buyingAOffers = sellingAOffers, buyingAOffers } + +func countOfferChangeTypes(offers []*txnbuild.ManageSellOffer) (int /*numDelete*/, int /*numUpdate*/, int /*numCreate*/, error) { + numDelete, numUpdate, numCreate := 0, 0, 0 + for i, o := range offers { + if o == nil { + return 0, 0, 0, fmt.Errorf("offer at index %d was not of expected type ManageSellOffer (actual type = %T): %+v", i, o, o) + } + + opAmount, e := strconv.ParseFloat(o.Amount, 64) + if e != nil { + return 0, 0, 0, fmt.Errorf("invalid operation amount (%s) could not be parsed as float for operation at index %d: %v", o.Amount, i, o) + } + + // 0 amount represents deletion + // 0 offer id represents creating a new offer + // anything else represents updating an extiing offer + if opAmount == 0 { + numDelete++ + } else if o.OfferID == 0 { + numCreate++ + } else { + numUpdate++ + } + } + + return numDelete, numUpdate, numCreate, nil +} diff --git a/trader/trader_test.go b/trader/trader_test.go index 2fa24429a..bce41f156 100644 --- a/trader/trader_test.go +++ b/trader/trader_test.go @@ -1,12 +1,14 @@ package trader import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" hProtocol "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/txnbuild" "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" ) @@ -247,3 +249,100 @@ func TestShouldSendUpdateMetric_NilLastMetricUpdate(t *testing.T) { shouldUpdate := shouldSendUpdateMetric(now, now, nil) assert.Equal(t, true, shouldUpdate) } + +func TestCountOfferChangeTypes(t *testing.T) { + testCases := []struct { + name string + offers []*txnbuild.ManageSellOffer + wantNumCreate int + wantNumDelete int + wantNumUpdate int + }{ + // We shorten the types of ManageSellOffers for test readability. + // C = create a sell offer, U = update existing offer, D = delete offer + { + name: "C-C-C-U-D", + offers: []*txnbuild.ManageSellOffer{ + createTestMSO("create"), + createTestMSO("create"), + createTestMSO("create"), + createTestMSO("update"), + createTestMSO("delete"), + }, + wantNumCreate: 3, + wantNumDelete: 1, + wantNumUpdate: 1, + }, + { + name: "D-C-U-C-U-D", + offers: []*txnbuild.ManageSellOffer{ + createTestMSO("delete"), + createTestMSO("create"), + createTestMSO("update"), + createTestMSO("create"), + createTestMSO("update"), + createTestMSO("delete"), + }, + wantNumCreate: 2, + wantNumDelete: 2, + wantNumUpdate: 2, + }, + { + name: "U-U", + offers: []*txnbuild.ManageSellOffer{ + createTestMSO("update"), + createTestMSO("update"), + }, + wantNumCreate: 0, + wantNumDelete: 0, + wantNumUpdate: 2, + }, + { + name: "C", + offers: []*txnbuild.ManageSellOffer{ + createTestMSO("create"), + }, + wantNumCreate: 1, + wantNumDelete: 0, + wantNumUpdate: 0, + }, + } + + for _, k := range testCases { + t.Run(k.name, func(t *testing.T) { + gotNumDelete, gotNumUpdate, gotNumCreate, gotErr := countOfferChangeTypes(k.offers) + if !assert.Nil(t, gotErr) { + return + } + + assert.Equal(t, k.wantNumDelete, gotNumDelete) + assert.Equal(t, k.wantNumUpdate, gotNumUpdate) + assert.Equal(t, k.wantNumCreate, gotNumCreate) + }) + } + +} + +func createTestMSO(msoType string) *txnbuild.ManageSellOffer { + mso := txnbuild.ManageSellOffer{} + switch msoType { + case "create": + mso = txnbuild.ManageSellOffer{ + Amount: "1.0", + OfferID: 0, + } + case "update": + mso = txnbuild.ManageSellOffer{ + Amount: "1.0", + OfferID: 1, + } + case "delete": + mso = txnbuild.ManageSellOffer{ + Amount: "0.0", + OfferID: 1, + } + default: + panic(fmt.Sprintf("invalid manage sell offer type: %s", msoType)) + } + return &mso +}