diff --git a/exp/orderbook/batch.go b/exp/orderbook/batch.go index b9cddb169e..20709f172f 100644 --- a/exp/orderbook/batch.go +++ b/exp/orderbook/batch.go @@ -60,7 +60,7 @@ func (tx *orderBookBatchedUpdates) apply(ledger uint32) error { } tx.committed = true - if tx.orderbook.lastLedger > 0 && ledger != tx.orderbook.lastLedger+1 { + if tx.orderbook.lastLedger > 0 && ledger <= tx.orderbook.lastLedger { return errUnexpectedLedger } @@ -71,6 +71,9 @@ func (tx *orderBookBatchedUpdates) apply(ledger uint32) error { panic(errors.Wrap(err, "could not apply update in batch")) } case removeOfferOperationType: + if _, ok := tx.orderbook.tradingPairForOffer[operation.offerID]; !ok { + continue + } if err := tx.orderbook.remove(operation.offerID); err != nil { panic(errors.Wrap(err, "could not apply update in batch")) } diff --git a/exp/orderbook/graph_test.go b/exp/orderbook/graph_test.go index 09b975f819..0e86f44e2c 100644 --- a/exp/orderbook/graph_test.go +++ b/exp/orderbook/graph_test.go @@ -328,13 +328,23 @@ func TestApplyOutdatedLedger(t *testing.T) { graph.Discard() graph.AddOffer(eurOffer) - err = graph.Apply(4) + err = graph.Apply(2) if err != errUnexpectedLedger { t.Fatalf("expected error %v but got %v", errUnexpectedLedger, err) } if graph.lastLedger != 2 { t.Fatalf("expected last ledger to be %v but got %v", 2, graph.lastLedger) } + + graph.Discard() + + err = graph.Apply(4) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if graph.lastLedger != 4 { + t.Fatalf("expected last ledger to be %v but got %v", 4, graph.lastLedger) + } } func TestAddOfferOrderBook(t *testing.T) { @@ -815,6 +825,17 @@ func TestRemoveOfferOrderBook(t *testing.T) { t.Fatalf("expected last ledger to be %v but got %v", 3, graph.lastLedger) } + // Skip over offer ids which are not present in the graph + err = graph. + RemoveOffer(988888). + Apply(5) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if graph.lastLedger != 5 { + t.Fatalf("expected last ledger to be %v but got %v", 3, graph.lastLedger) + } + expectedGraph.edgesForSellingAsset = map[string]edgeSet{} expectedGraph.tradingPairForOffer = map[xdr.Int64]tradingPair{} assertGraphEquals(t, graph, expectedGraph) diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index 02a24c9fee..9698288b17 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -12,7 +12,6 @@ import ( metrics "github.com/rcrowley/go-metrics" "github.com/stellar/go/clients/stellarcore" - "github.com/stellar/go/exp/orderbook" horizonContext "github.com/stellar/go/services/horizon/internal/context" "github.com/stellar/go/services/horizon/internal/db2/core" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -458,12 +457,7 @@ func (a *App) init() { // expingester initExpIngester(a) } - orderBookGraph := orderbook.NewOrderBookGraph() - a.orderBookStream = &expingest.OrderBookStream{ - OrderBookGraph: orderBookGraph, - HistoryQ: &history.Q{a.HorizonSession(a.ctx)}, - } - initPathFinder(a, orderBookGraph) + initPathFinder(a) // txsub initSubmissionSystem(a) diff --git a/services/horizon/internal/expingest/orderbook.go b/services/horizon/internal/expingest/orderbook.go index 91ab819b4e..a9578bcf34 100644 --- a/services/horizon/internal/expingest/orderbook.go +++ b/services/horizon/internal/expingest/orderbook.go @@ -7,6 +7,8 @@ import ( "sort" "time" + "github.com/rcrowley/go-metrics" + "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -25,10 +27,22 @@ const ( // in memory graph. However, it is safe for other go routines to use the // in memory graph for read operations. type OrderBookStream struct { - OrderBookGraph orderbook.OBGraph - HistoryQ history.IngestionQ - lastLedger uint32 - lastUpdate time.Time + graph orderbook.OBGraph + historyQ history.IngestionQ + // LatestLedgerGauge exposes the local (order book graph) + // latest processed ledger + LatestLedgerGauge metrics.Gauge + lastLedger uint32 + lastUpdate time.Time +} + +// NewOrderBookStream constructs and initializes an OrderBookStream instance +func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *OrderBookStream { + return &OrderBookStream{ + graph: graph, + historyQ: historyQ, + LatestLedgerGauge: metrics.NewGauge(), + } } type ingestionStatus struct { @@ -42,21 +56,21 @@ func (o *OrderBookStream) getIngestionStatus() (ingestionStatus, error) { var status ingestionStatus var err error - status.StateInvalid, err = o.HistoryQ.GetExpStateInvalid() + status.StateInvalid, err = o.historyQ.GetExpStateInvalid() if err != nil { return status, errors.Wrap(err, "Error from GetExpStateInvalid") } var lastHistoryLedger uint32 - lastHistoryLedger, err = o.HistoryQ.GetLatestLedger() + lastHistoryLedger, err = o.historyQ.GetLatestLedger() if err != nil { return status, errors.Wrap(err, "Error from GetLatestLedger") } - status.LastIngestedLedger, err = o.HistoryQ.GetLastLedgerExpIngestNonBlocking() + status.LastIngestedLedger, err = o.historyQ.GetLastLedgerExpIngestNonBlocking() if err != nil { return status, errors.Wrap(err, "Error from GetLastLedgerExpIngestNonBlocking") } - status.LastOfferCompactionLedger, err = o.HistoryQ.GetOfferCompactionSequence() + status.LastOfferCompactionLedger, err = o.historyQ.GetOfferCompactionSequence() if err != nil { return status, errors.Wrap(err, "Error from GetOfferCompactionSequence") } @@ -103,7 +117,7 @@ func (o *OrderBookStream) update(status ingestionStatus) error { } if reset { - o.OrderBookGraph.Clear() + o.graph.Clear() o.lastLedger = 0 // wait until offers in horizon db is valid before populating order book graph @@ -113,22 +127,23 @@ func (o *OrderBookStream) update(status ingestionStatus) error { return nil } - defer o.OrderBookGraph.Discard() + defer o.graph.Discard() - offers, err := o.HistoryQ.GetAllOffers() + offers, err := o.historyQ.GetAllOffers() if err != nil { return errors.Wrap(err, "Error from GetAllOffers") } for _, offer := range offers { - addOfferToGraph(o.OrderBookGraph, offer) + addOfferToGraph(o.graph, offer) } - if err := o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil { + if err := o.graph.Apply(status.LastIngestedLedger); err != nil { return errors.Wrap(err, "Error applying changes to order book") } o.lastLedger = status.LastIngestedLedger + o.LatestLedgerGauge.Update(int64(status.LastIngestedLedger)) return nil } @@ -136,32 +151,33 @@ func (o *OrderBookStream) update(status ingestionStatus) error { return nil } - defer o.OrderBookGraph.Discard() + defer o.graph.Discard() - offers, err := o.HistoryQ.GetUpdatedOffers(o.lastLedger) + offers, err := o.historyQ.GetUpdatedOffers(o.lastLedger) if err != nil { return errors.Wrap(err, "Error from GetUpdatedOffers") } for _, offer := range offers { if offer.Deleted { - o.OrderBookGraph.RemoveOffer(offer.OfferID) + o.graph.RemoveOffer(offer.OfferID) } else { - addOfferToGraph(o.OrderBookGraph, offer) + addOfferToGraph(o.graph, offer) } } - if err = o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil { + if err = o.graph.Apply(status.LastIngestedLedger); err != nil { return errors.Wrap(err, "Error applying changes to order book") } o.lastUpdate = time.Now() o.lastLedger = status.LastIngestedLedger + o.LatestLedgerGauge.Update(int64(status.LastIngestedLedger)) return nil } func (o *OrderBookStream) verifyAllOffers() { - offers := o.OrderBookGraph.Offers() - ingestionOffers, err := o.HistoryQ.GetAllOffers() + offers := o.graph.Offers() + ingestionOffers, err := o.historyQ.GetAllOffers() if err != nil { // reset last update so that we retry verification on next update o.lastUpdate = time.Now().Add(verificationFrequency * -2) @@ -210,13 +226,13 @@ func (o *OrderBookStream) verifyAllOffers() { // After calling this function, the the in memory order book graph should be consistent with the // Horizon DB (assuming no error is returned). func (o *OrderBookStream) Update() error { - if err := o.HistoryQ.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}); err != nil { + if err := o.historyQ.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}); err != nil { return errors.Wrap(err, "Error starting repeatable read transaction") } - defer o.HistoryQ.Rollback() + defer o.historyQ.Rollback() // add 15 minute jitter so that not all horizon nodes are calling - // HistoryQ.GetAllOffers at the same time + // historyQ.GetAllOffers at the same time jitter := time.Duration(rand.Int63n(int64(15 * time.Minute))) requiresVerification := !o.lastUpdate.Equal(time.Time{}) && time.Since(o.lastUpdate) >= verificationFrequency+jitter diff --git a/services/horizon/internal/expingest/orderbook_test.go b/services/horizon/internal/expingest/orderbook_test.go index e3af36e293..7f73f92c72 100644 --- a/services/horizon/internal/expingest/orderbook_test.go +++ b/services/horizon/internal/expingest/orderbook_test.go @@ -21,7 +21,7 @@ func TestIngestionStatus(t *testing.T) { func (t *IngestionStatusTestSuite) SetupTest() { t.historyQ = &mockDBQ{} - t.stream = &OrderBookStream{HistoryQ: t.historyQ} + t.stream = NewOrderBookStream(t.historyQ, &mockOrderBookGraph{}) } func (t *IngestionStatusTestSuite) TearDownTest() { @@ -181,7 +181,7 @@ func TestUpdateOrderBookStream(t *testing.T) { func (t *UpdateOrderBookStreamTestSuite) SetupTest() { t.historyQ = &mockDBQ{} t.graph = &mockOrderBookGraph{} - t.stream = &OrderBookStream{OrderBookGraph: t.graph, HistoryQ: t.historyQ} + t.stream = NewOrderBookStream(t.historyQ, t.graph) } func (t *UpdateOrderBookStreamTestSuite) TearDownTest() { @@ -484,7 +484,7 @@ func TestVerifyOrderBookStream(t *testing.T) { func (t *VerifyOrderBookStreamTestSuite) SetupTest() { t.historyQ = &mockDBQ{} t.graph = &mockOrderBookGraph{} - t.stream = &OrderBookStream{OrderBookGraph: t.graph, HistoryQ: t.historyQ} + t.stream = NewOrderBookStream(t.historyQ, t.graph) sellerID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" otherSellerID := "GAXI33UCLQTCKM2NMRBS7XYBR535LLEVAHL5YBN4FTCB4HZHT7ZA5CVK" diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 0fb84b12dc..066e867fda 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -97,7 +97,13 @@ func initExpIngester(app *App) { } } -func initPathFinder(app *App, orderBookGraph *orderbook.OrderBookGraph) { +func initPathFinder(app *App) { + orderBookGraph := orderbook.NewOrderBookGraph() + app.orderBookStream = expingest.NewOrderBookStream( + &history.Q{app.HorizonSession(app.ctx)}, + orderBookGraph, + ) + app.paths = simplepath.NewInMemoryFinder(orderBookGraph) } @@ -144,6 +150,7 @@ func initDbMetrics(app *App) { app.metrics.Register("history.latest_ledger", app.historyLatestLedgerGauge) app.metrics.Register("history.elder_ledger", app.historyElderLedgerGauge) app.metrics.Register("stellar_core.latest_ledger", app.coreLatestLedgerGauge) + app.metrics.Register("order_book_stream.latest_ledger", app.orderBookStream.LatestLedgerGauge) app.metrics.Register("history.open_connections", app.horizonConnGauge) app.metrics.Register("stellar_core.open_connections", app.coreConnGauge) app.metrics.Register("goroutines", app.goroutineGauge)