From fbd36c29a26190f831a8d8524173499b924fbf67 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Sun, 31 May 2020 13:29:50 +0200 Subject: [PATCH 1/4] Remove orderbook graph from ingestion system --- services/horizon/cmd/ingest.go | 3 - services/horizon/internal/app.go | 24 +- .../internal/expingest/build_state_test.go | 42 --- .../internal/expingest/db_integration_test.go | 4 +- services/horizon/internal/expingest/fsm.go | 95 +----- .../internal/expingest/init_state_test.go | 118 -------- services/horizon/internal/expingest/main.go | 107 +------ .../horizon/internal/expingest/main_test.go | 8 - .../horizon/internal/expingest/orderbook.go | 179 +++++------- .../internal/expingest/orderbook_test.go | 246 +++++++++++++--- .../internal/expingest/processor_runner.go | 24 -- .../expingest/processor_runner_test.go | 79 +---- .../processors/new_orderbook_processor.go | 54 ---- .../processors/orderbook_processor_test.go | 273 ------------------ .../internal/expingest/resume_state_test.go | 85 ++---- .../horizon/internal/expingest/stress_test.go | 20 -- services/horizon/internal/expingest/verify.go | 50 +--- .../expingest/verify_range_state_test.go | 15 - services/horizon/internal/init.go | 4 +- 19 files changed, 354 insertions(+), 1076 deletions(-) delete mode 100644 services/horizon/internal/expingest/processors/new_orderbook_processor.go delete mode 100644 services/horizon/internal/expingest/processors/orderbook_processor_test.go diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 0d8cc8a32c..2d6597e434 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -8,7 +8,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/expingest" support "github.com/stellar/go/support/config" "github.com/stellar/go/support/db" @@ -107,7 +106,6 @@ var ingestVerifyRangeCmd = &cobra.Command{ NetworkPassphrase: config.NetworkPassphrase, HistorySession: horizonSession, HistoryArchiveURL: config.HistoryArchiveURLs[0], - OrderBookGraph: orderbook.NewOrderBookGraph(), IngestFailedTransactions: config.IngestFailedTransactions, } @@ -185,7 +183,6 @@ var ingestStressTestCmd = &cobra.Command{ NetworkPassphrase: config.NetworkPassphrase, HistorySession: horizonSession, HistoryArchiveURL: config.HistoryArchiveURLs[0], - OrderBookGraph: orderbook.NewOrderBookGraph(), IngestFailedTransactions: config.IngestFailedTransactions, } diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index 7d316419a3..faaeb334ab 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -417,10 +417,8 @@ func (a *App) Tick() { wg.Add(4) go func() { defer wg.Done() - if a.orderBookStream != nil { - if err := a.orderBookStream.Update(); err != nil { - log.WithField("error", err).Error("could not apply updates from order book stream") - } + if err := a.orderBookStream.Update(); err != nil { + log.WithField("error", err).Error("could not apply updates from order book stream") } }() go func() { a.UpdateLedgerState(); wg.Done() }() @@ -462,19 +460,15 @@ func (a *App) init() { mustInitCoreDB(a) if a.config.Ingest { - orderBookGraph := orderbook.NewOrderBookGraph() // expingester - initExpIngester(a, orderBookGraph) - // path-finder - initPathFinder(a, orderBookGraph) - } else { - orderBookGraph := orderbook.NewOrderBookGraph() - a.orderBookStream = &expingest.OrderBookStream{ - OrderBookGraph: orderBookGraph, - HistoryQ: &history.Q{a.HorizonSession(a.ctx)}, - } - initPathFinder(a, orderBookGraph) + initExpIngester(a) + } + orderBookGraph := orderbook.NewOrderBookGraph() + a.orderBookStream = &expingest.OrderBookStream{ + OrderBookGraph: orderBookGraph, + HistoryQ: &history.Q{a.HorizonSession(a.ctx)}, } + initPathFinder(a, orderBookGraph) // txsub initSubmissionSystem(a) diff --git a/services/horizon/internal/expingest/build_state_test.go b/services/horizon/internal/expingest/build_state_test.go index 5e6f640e09..f984f9d779 100644 --- a/services/horizon/internal/expingest/build_state_test.go +++ b/services/horizon/internal/expingest/build_state_test.go @@ -17,7 +17,6 @@ func TestBuildStateTestSuite(t *testing.T) { type BuildStateTestSuite struct { suite.Suite - graph *mockOrderBookGraph historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter system *System @@ -28,7 +27,6 @@ type BuildStateTestSuite struct { } func (s *BuildStateTestSuite) SetupTest() { - s.graph = &mockOrderBookGraph{} s.historyQ = &mockDBQ{} s.runner = &mockProcessorsRunner{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} @@ -39,7 +37,6 @@ func (s *BuildStateTestSuite) SetupTest() { ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, - graph: s.graph, runner: s.runner, stellarCoreClient: s.stellarCoreClient, } @@ -47,7 +44,6 @@ func (s *BuildStateTestSuite) SetupTest() { s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.graph.On("Discard").Once() } func (s *BuildStateTestSuite) TearDownTest() { @@ -56,7 +52,6 @@ func (s *BuildStateTestSuite) TearDownTest() { s.historyAdapter.AssertExpectations(t) s.runner.AssertExpectations(t) s.stellarCoreClient.AssertExpectations(t) - s.graph.AssertExpectations(t) } func (s *BuildStateTestSuite) mockCommonHistoryQ() { @@ -77,9 +72,6 @@ func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} - // Recreate orderbook graph to remove Discard assertion - *s.graph = mockOrderBookGraph{} - next, err := buildState{checkpointLedger: 0}.run(s.system) s.Assert().Error(err) s.Assert().EqualError(err, "unexpected checkpointLedger value") @@ -91,9 +83,6 @@ func (s *BuildStateTestSuite) TestBeginReturnsError() { *s.historyQ = mockDBQ{} s.historyQ.On("Begin").Return(errors.New("my error")).Once() - // Recreate orderbook graph to remove Discard assertion - *s.graph = mockOrderBookGraph{} - next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) s.Assert().Error(err) s.Assert().EqualError(err, "Error starting a transaction: my error") @@ -187,7 +176,6 @@ func (s *BuildStateTestSuite) TestTruncateExpingestStateTablesReturnsError() { func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() { s.mockCommonHistoryQ() - s.graph.On("Clear").Return().Once() s.runner. On("RunHistoryArchiveIngestion", s.checkpointLedger). Return(io.StatsChangeProcessorResults{}, errors.New("my error")). @@ -201,7 +189,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() { func (s *BuildStateTestSuite) TestUpdateLastLedgerExpIngestAfterIngestReturnsError() { s.mockCommonHistoryQ() - s.graph.On("Clear").Return(nil).Once() s.runner. On("RunHistoryArchiveIngestion", s.checkpointLedger). Return(io.StatsChangeProcessorResults{}, nil). @@ -221,7 +208,6 @@ func (s *BuildStateTestSuite) TestUpdateLastLedgerExpIngestAfterIngestReturnsErr func (s *BuildStateTestSuite) TestUpdateExpIngestVersionIngestReturnsError() { s.mockCommonHistoryQ() - s.graph.On("Clear").Return(nil).Once() s.runner. On("RunHistoryArchiveIngestion", s.checkpointLedger). Return(io.StatsChangeProcessorResults{}, nil). @@ -248,7 +234,6 @@ func (s *BuildStateTestSuite) TestUpdateCommitReturnsError() { s.historyQ.On("UpdateExpIngestVersion", CurrentVersion). Return(nil). Once() - s.graph.On("Clear").Return(nil).Once() s.historyQ.On("Commit"). Return(errors.New("my error")). Once() @@ -259,31 +244,6 @@ func (s *BuildStateTestSuite) TestUpdateCommitReturnsError() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *BuildStateTestSuite) TestOBGraphApplyReturnsError() { - s.mockCommonHistoryQ() - s.runner. - On("RunHistoryArchiveIngestion", s.checkpointLedger). - Return(io.StatsChangeProcessorResults{}, nil). - Once() - s.historyQ.On("UpdateLastLedgerExpIngest", s.checkpointLedger). - Return(nil). - Once() - s.historyQ.On("UpdateExpIngestVersion", CurrentVersion). - Return(nil). - Once() - s.historyQ.On("Commit"). - Return(nil). - Once() - - s.graph.On("Clear").Return().Once() - s.graph.On("Apply", s.checkpointLedger).Return(errors.New("my error")).Once() - next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) - - s.Assert().Error(err) - s.Assert().EqualError(err, "Error applying order book changes: my error") - s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) -} - func (s *BuildStateTestSuite) TestBuildStateSucceeds() { s.mockCommonHistoryQ() s.runner. @@ -300,8 +260,6 @@ func (s *BuildStateTestSuite) TestBuildStateSucceeds() { Return(nil). Once() - s.graph.On("Clear").Return(nil).Once() - s.graph.On("Apply", s.checkpointLedger).Return(nil).Once() next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) s.Assert().NoError(err) diff --git a/services/horizon/internal/expingest/db_integration_test.go b/services/horizon/internal/expingest/db_integration_test.go index 9de4ec91db..700f7d99c3 100644 --- a/services/horizon/internal/expingest/db_integration_test.go +++ b/services/horizon/internal/expingest/db_integration_test.go @@ -9,7 +9,6 @@ import ( "github.com/stellar/go/exp/ingest/adapters" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/xdr" "github.com/stretchr/testify/suite" @@ -82,7 +81,6 @@ func (s *DBTestSuite) SetupTest() { CoreSession: s.tt.CoreSession(), HistorySession: s.tt.HorizonSession(), HistoryArchiveURL: "http://ignore.test", - OrderBookGraph: orderbook.NewOrderBookGraph(), MaxStreamRetries: 3, DisableStateVerification: false, IngestFailedTransactions: true, @@ -144,7 +142,7 @@ func (s *DBTestSuite) TestBuildState() { s.Assert().Equal(s.sequence, resume.latestSuccessfullyProcessedLedger) s.mockChangeReader() - s.Assert().NoError(s.system.verifyState(s.system.graph.OffersMap(), false)) + s.Assert().NoError(s.system.verifyState(false)) } func (s *DBTestSuite) TestVersionMismatchTriggersRebuild() { diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index 563a161b50..ee9e4f1771 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -201,10 +201,6 @@ func (startState) run(s *System) (transition, error) { log.WithField("last_ledger", lastIngestedLedger). Info("Resuming ingestion system from last processed ledger...") - if err = s.loadOffersIntoMemory(lastIngestedLedger); err != nil { - return start(), errors.Wrap(err, "Error loading offers into in memory graph") - } - return resume(lastIngestedLedger), nil } } @@ -226,7 +222,6 @@ func (b buildState) run(s *System) (transition, error) { return start(), errors.Wrap(err, "Error starting a transaction") } defer s.historyQ.Rollback() - defer s.graph.Discard() // We need to get this value `FOR UPDATE` so all other instances // are blocked. @@ -274,9 +269,6 @@ func (b buildState) run(s *System) (transition, error) { return start(), errors.Wrap(err, "Error clearing ingest tables") } - // Graph should be empty. - s.graph.Clear() - log.WithFields(logpkg.F{ "ledger": b.checkpointLedger, }).Info("Processing state") @@ -325,7 +317,6 @@ func (r resumeState) run(s *System) (transition, error) { errors.Wrap(err, "Error starting a transaction") } defer s.historyQ.Rollback() - defer s.graph.Discard() // This will get the value `FOR UPDATE`, blocking it for other nodes. lastIngestedLedger, err := s.historyQ.GetLastLedgerExpIngest() @@ -338,6 +329,11 @@ func (r resumeState) run(s *System) (transition, error) { if ingestLedger > lastIngestedLedger+1 { return start(), errors.New("expected ingest ledger to be at most one greater " + "than last ingested ledger in db") + } else if ingestLedger <= lastIngestedLedger { + log.WithField("ingestLedger", ingestLedger). + WithField("lastIngestedLedger", lastIngestedLedger). + Info("bumping ingest ledger to next ledger after ingested ledger in db") + ingestLedger = lastIngestedLedger + 1 } ingestVersion, err := s.historyQ.GetExpIngestVersion() @@ -386,71 +382,10 @@ func (r resumeState) run(s *System) (transition, error) { startTime := time.Now() - if ingestLedger <= lastIngestedLedger { - // this ingestion node is behind so we will update the order book stream - // without doing any verification. the verification routine only works - // when we are updating both the db and order book stream at the same time - if s.verifyOrderBookStream { - var status ingestionStatus - status, err = s.orderBookStream.getIngestionStatus() - if err != nil { - return retryResume(r), errors.Wrap(err, "Error obtaining ingestion status") - } - - _, _, err = s.orderBookStream.update(status) - if err != nil { - return retryResume(r), errors.Wrap(err, "Error updating order book stream") - } - } - - // rollback because we will not be updating the DB - // so there is no need to hold on to the distributed lock - // and thereby block the other nodes from ingesting - if err = s.historyQ.Rollback(); err != nil { - return retryResume(r), errors.Wrap(err, "Error rolling back transaction") - } - - log.WithFields(logpkg.F{ - "sequence": ingestLedger, - "state": false, - "ledger": false, - "graph": true, - "commit": false, - }).Info("Processing ledger") - - var stats io.StatsChangeProcessorResults - stats, err = s.runner.RunOrderBookProcessorOnLedger(ingestLedger) - if err != nil { - return retryResume(r), errors.Wrap(err, "Error running change processor on ledger") - - } - - if err = s.graphApply(ingestLedger); err != nil { - return retryResume(r), errors.Wrap(err, "Error applying graph changes from ledger") - } - - duration := time.Since(startTime) - s.Metrics.LedgerInMemoryIngestionTimer.Update(duration) - log. - WithFields(stats.Map()). - WithFields(logpkg.F{ - "sequence": ingestLedger, - "duration": duration.Seconds(), - "state": false, - "ledger": false, - "graph": true, - "commit": false, - }). - Info("Processed ledger") - - return resumeImmediately(ingestLedger), nil - } - log.WithFields(logpkg.F{ "sequence": ingestLedger, "state": true, "ledger": true, - "graph": true, "commit": true, }).Info("Processing ledger") @@ -478,7 +413,6 @@ func (r resumeState) run(s *System) (transition, error) { "duration": duration.Seconds(), "state": true, "ledger": true, - "graph": true, "commit": true, }). Info("Processed ledger") @@ -549,7 +483,6 @@ func runTransactionProcessorsOnLedger(s *System, ledger uint32) error { "sequence": ledger, "state": false, "ledger": true, - "graph": false, "commit": false, }).Info("Processing ledger") startTime := time.Now() @@ -566,7 +499,6 @@ func runTransactionProcessorsOnLedger(s *System, ledger uint32) error { "duration": time.Since(startTime).Seconds(), "state": false, "ledger": true, - "graph": false, "commit": false, }). Info("Processed ledger") @@ -717,7 +649,6 @@ func (v verifyRangeState) run(s *System) (transition, error) { return stop(), err } defer s.historyQ.Rollback() - defer s.graph.Discard() // Simple check if DB clean lastIngestedLedger, err := s.historyQ.GetLastLedgerExpIngest() @@ -759,7 +690,6 @@ func (v verifyRangeState) run(s *System) (transition, error) { "sequence": sequence, "state": true, "ledger": true, - "graph": true, "commit": true, }).Info("Processing ledger") startTime := time.Now() @@ -789,14 +719,13 @@ func (v verifyRangeState) run(s *System) (transition, error) { "duration": time.Since(startTime).Seconds(), "state": true, "ledger": true, - "graph": true, "commit": true, }). Info("Processed ledger") } if v.verifyState { - err = s.verifyState(s.graph.OffersMap(), false) + err = s.verifyState(false) } return stop(), err @@ -814,7 +743,6 @@ func (stressTestState) run(s *System) (transition, error) { return stop(), err } defer s.historyQ.Rollback() - defer s.graph.Discard() // Simple check if DB clean lastIngestedLedger, err := s.historyQ.GetLastLedgerExpIngest() @@ -836,7 +764,6 @@ func (stressTestState) run(s *System) (transition, error) { "sequence": sequence, "state": true, "ledger": true, - "graph": true, "commit": true, }).Info("Processing ledger") startTime := time.Now() @@ -862,7 +789,6 @@ func (stressTestState) run(s *System) (transition, error) { "duration": time.Since(startTime).Seconds(), "state": true, "ledger": true, - "graph": true, "commit": true, }). Info("Processed ledger") @@ -880,18 +806,9 @@ func (s *System) completeIngestion(ledger uint32) error { return err } - if s.verifyOrderBookStream { - s.orderBookStream.updateAndVerify(s.graph, ledger) - } - if err := s.historyQ.Commit(); err != nil { return errors.Wrap(err, commitErrMsg) } - if err := s.graphApply(ledger); err != nil { - err = errors.Wrap(err, "Error applying order book changes") - return err - } - return nil } diff --git a/services/horizon/internal/expingest/init_state_test.go b/services/horizon/internal/expingest/init_state_test.go index 36b92d9f0b..a27e345261 100644 --- a/services/horizon/internal/expingest/init_state_test.go +++ b/services/horizon/internal/expingest/init_state_test.go @@ -5,9 +5,7 @@ import ( "testing" "github.com/stellar/go/exp/ingest/adapters" - "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" "github.com/stretchr/testify/suite" ) @@ -17,21 +15,18 @@ func TestInitStateTestSuite(t *testing.T) { type InitStateTestSuite struct { suite.Suite - graph *mockOrderBookGraph historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter system *System } func (s *InitStateTestSuite) SetupTest() { - s.graph = &mockOrderBookGraph{} s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.system = &System{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, - graph: s.graph, } s.system.initMetrics() @@ -42,7 +37,6 @@ func (s *InitStateTestSuite) TearDownTest() { t := s.T() s.historyQ.AssertExpectations(t) s.historyAdapter.AssertExpectations(t) - s.graph.AssertExpectations(t) } func (s *InitStateTestSuite) TestBeginReturnsError() { @@ -225,62 +219,6 @@ func (s *InitStateTestSuite) TestResumeStateBehindHistory0() { s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() s.historyQ.On("GetLatestLedger").Return(uint32(0), nil).Once() - s.historyQ.On("GetAllOffers").Return( - []history.Offer{ - history.Offer{ - OfferID: eurOffer.OfferId, - SellerID: eurOffer.SellerId.Address(), - SellingAsset: eurOffer.Selling, - BuyingAsset: eurOffer.Buying, - Amount: eurOffer.Amount, - Pricen: int32(eurOffer.Price.N), - Priced: int32(eurOffer.Price.D), - Price: float64(eurOffer.Price.N) / float64(eurOffer.Price.D), - Flags: uint32(eurOffer.Flags), - }, - history.Offer{ - OfferID: twoEurOffer.OfferId, - SellerID: twoEurOffer.SellerId.Address(), - SellingAsset: twoEurOffer.Selling, - BuyingAsset: twoEurOffer.Buying, - Amount: twoEurOffer.Amount, - Pricen: int32(twoEurOffer.Price.N), - Priced: int32(twoEurOffer.Price.D), - Price: float64(twoEurOffer.Price.N) / float64(twoEurOffer.Price.D), - Flags: uint32(twoEurOffer.Flags), - }, - }, - nil, - ).Once() - - s.graph.On("Clear").Once() - s.graph.On("AddOffer", xdr.OfferEntry{ - SellerId: eurOffer.SellerId, - OfferId: eurOffer.OfferId, - Selling: eurOffer.Selling, - Buying: eurOffer.Buying, - Amount: eurOffer.Amount, - Price: xdr.Price{ - N: xdr.Int32(eurOffer.Price.N), - D: xdr.Int32(eurOffer.Price.D), - }, - Flags: xdr.Uint32(eurOffer.Flags), - }).Once() - s.graph.On("AddOffer", xdr.OfferEntry{ - SellerId: twoEurOffer.SellerId, - OfferId: twoEurOffer.OfferId, - Selling: twoEurOffer.Selling, - Buying: twoEurOffer.Buying, - Amount: twoEurOffer.Amount, - Price: xdr.Price{ - N: xdr.Int32(twoEurOffer.Price.N), - D: xdr.Int32(twoEurOffer.Price.D), - }, - Flags: xdr.Uint32(twoEurOffer.Flags), - }).Once() - s.graph.On("Apply", uint32(130)).Return(nil).Once() - s.graph.On("Discard").Once() - next, err := startState{}.run(s.system) s.Assert().NoError(err) s.Assert().Equal( @@ -301,62 +239,6 @@ func (s *InitStateTestSuite) TestResumeStateSync() { s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() s.historyQ.On("GetLatestLedger").Return(uint32(130), nil).Once() - s.historyQ.On("GetAllOffers").Return( - []history.Offer{ - history.Offer{ - OfferID: eurOffer.OfferId, - SellerID: eurOffer.SellerId.Address(), - SellingAsset: eurOffer.Selling, - BuyingAsset: eurOffer.Buying, - Amount: eurOffer.Amount, - Pricen: int32(eurOffer.Price.N), - Priced: int32(eurOffer.Price.D), - Price: float64(eurOffer.Price.N) / float64(eurOffer.Price.D), - Flags: uint32(eurOffer.Flags), - }, - history.Offer{ - OfferID: twoEurOffer.OfferId, - SellerID: twoEurOffer.SellerId.Address(), - SellingAsset: twoEurOffer.Selling, - BuyingAsset: twoEurOffer.Buying, - Amount: twoEurOffer.Amount, - Pricen: int32(twoEurOffer.Price.N), - Priced: int32(twoEurOffer.Price.D), - Price: float64(twoEurOffer.Price.N) / float64(twoEurOffer.Price.D), - Flags: uint32(twoEurOffer.Flags), - }, - }, - nil, - ).Once() - - s.graph.On("Clear").Once() - s.graph.On("AddOffer", xdr.OfferEntry{ - SellerId: eurOffer.SellerId, - OfferId: eurOffer.OfferId, - Selling: eurOffer.Selling, - Buying: eurOffer.Buying, - Amount: eurOffer.Amount, - Price: xdr.Price{ - N: xdr.Int32(eurOffer.Price.N), - D: xdr.Int32(eurOffer.Price.D), - }, - Flags: xdr.Uint32(eurOffer.Flags), - }).Once() - s.graph.On("AddOffer", xdr.OfferEntry{ - SellerId: twoEurOffer.SellerId, - OfferId: twoEurOffer.OfferId, - Selling: twoEurOffer.Selling, - Buying: twoEurOffer.Buying, - Amount: twoEurOffer.Amount, - Price: xdr.Price{ - N: xdr.Int32(twoEurOffer.Price.N), - D: xdr.Int32(twoEurOffer.Price.D), - }, - Flags: xdr.Uint32(twoEurOffer.Flags), - }).Once() - s.graph.On("Apply", uint32(130)).Return(nil).Once() - s.graph.On("Discard").Once() - next, err := startState{}.run(s.system) s.Assert().NoError(err) s.Assert().Equal( diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 3f70f66265..5762a515b1 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -13,13 +13,11 @@ import ( "github.com/stellar/go/exp/ingest/adapters" ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/historyarchive" logpkg "github.com/stellar/go/support/log" - "github.com/stellar/go/xdr" ) const ( @@ -68,7 +66,6 @@ type Config struct { // Set MaxStreamRetries to 0 if there should be no retry attempts MaxStreamRetries int - OrderBookGraph *orderbook.OrderBookGraph IngestFailedTransactions bool } @@ -97,10 +94,6 @@ type System struct { // StateVerifyTimer exposes timing metrics about the rate and // duration of state verification. StateVerifyTimer metrics.Timer - - // LocalLatestLedgerGauge exposes the local (order book graph) - // latest processed ledger - LocalLatestLedgerGauge metrics.Gauge } ctx context.Context @@ -108,11 +101,8 @@ type System struct { config Config - graph orderbook.OBGraph - verifyOrderBookStream bool - orderBookStream *OrderBookStream - historyQ history.IngestionQ - runner ProcessorRunnerInterface + historyQ history.IngestionQ + runner ProcessorRunnerInterface ledgerBackend ledgerbackend.LedgerBackend historyAdapter adapters.HistoryArchiveAdapterInterface @@ -160,18 +150,12 @@ func NewSystem(config Config) (*System, error) { historyAdapter := adapters.MakeHistoryArchiveAdapter(archive) system := &System{ - ctx: ctx, - cancel: cancel, - historyAdapter: historyAdapter, - ledgerBackend: ledgerBackend, - config: config, - historyQ: historyQ, - graph: config.OrderBookGraph, - orderBookStream: &OrderBookStream{ - HistoryQ: historyQ, - OrderBookGraph: orderbook.NewOrderBookGraph(), - }, - verifyOrderBookStream: true, + ctx: ctx, + cancel: cancel, + historyAdapter: historyAdapter, + ledgerBackend: ledgerBackend, + config: config, + historyQ: historyQ, disableStateVerification: config.DisableStateVerification, maxStreamRetries: config.MaxStreamRetries, stellarCoreClient: &stellarcore.Client{ @@ -180,7 +164,6 @@ func NewSystem(config Config) (*System, error) { runner: &ProcessorRunner{ ctx: ctx, config: config, - graph: config.OrderBookGraph, historyQ: historyQ, historyAdapter: historyAdapter, ledgerBackend: ledgerBackend, @@ -195,7 +178,6 @@ func (s *System) initMetrics() { s.Metrics.LedgerIngestionTimer = metrics.NewTimer() s.Metrics.LedgerInMemoryIngestionTimer = metrics.NewTimer() s.Metrics.StateVerifyTimer = metrics.NewTimer() - s.Metrics.LocalLatestLedgerGauge = metrics.NewGauge() } // Run starts ingestion system. Ingestion system supports distributed ingestion @@ -320,69 +302,6 @@ func (s *System) runStateMachine(cur stateMachineNode) error { } } -func (s *System) graphApply(sequence uint32) error { - if err := s.graph.Apply(sequence); err != nil { - return err - } - s.Metrics.LocalLatestLedgerGauge.Update(int64(sequence)) - return nil -} - -func addOffersToGraph(offers []history.Offer, graph orderbook.OBGraph) { - for _, offer := range offers { - sellerID := xdr.MustAddress(offer.SellerID) - graph.AddOffer(xdr.OfferEntry{ - SellerId: sellerID, - OfferId: offer.OfferID, - Selling: offer.SellingAsset, - Buying: offer.BuyingAsset, - Amount: offer.Amount, - Price: xdr.Price{ - N: xdr.Int32(offer.Pricen), - D: xdr.Int32(offer.Priced), - }, - Flags: xdr.Uint32(offer.Flags), - }) - } -} - -func loadOffersIntoGraph(q history.IngestionQ, graph orderbook.OBGraph) ([]history.Offer, error) { - offers, err := q.GetAllOffers() - if err != nil { - return nil, errors.Wrap(err, "GetAllOffers error") - } - - addOffersToGraph(offers, graph) - return offers, nil -} - -func (s *System) loadOffersIntoMemory(sequence uint32) error { - defer s.graph.Discard() - s.graph.Clear() - - log.Info("Loading offers from a database into memory store...") - start := time.Now() - - if _, err := loadOffersIntoGraph(s.historyQ, s.graph); err != nil { - return errors.Wrap(err, "GetAllOffers error") - } - - if s.verifyOrderBookStream { - s.orderBookStream.updateAndVerify(s.graph, sequence) - } - - if err := s.graphApply(sequence); err != nil { - return errors.Wrap(err, "Error running graph.Apply") - } - - log.WithField( - "duration", - time.Since(start).Seconds(), - ).Info("Finished loading offers from a database into memory store") - - return nil -} - func (s *System) maybeVerifyState(lastIngestedLedger uint32) { stateInvalid, err := s.historyQ.GetExpStateInvalid() if err != nil && !isCancelledError(err) { @@ -393,15 +312,11 @@ func (s *System) maybeVerifyState(lastIngestedLedger uint32) { if !stateInvalid && // state has not been proved to be invalid... !s.disableStateVerification && // state verification is not disabled... historyarchive.IsCheckpoint(lastIngestedLedger) { // it's a checkpoint ledger. - if s.verifyOrderBookStream { - s.orderBookStream.verifyGraph(s.graph) - } - s.wg.Add(1) - go func(graphOffersMap map[xdr.Int64]xdr.OfferEntry) { + go func() { defer s.wg.Done() - err := s.verifyState(graphOffersMap, true) + err := s.verifyState(true) if err != nil { if isCancelledError(err) { return @@ -421,7 +336,7 @@ func (s *System) maybeVerifyState(lastIngestedLedger uint32) { } else { s.resetStateVerificationErrors() } - }(s.graph.OffersMap()) + }() } } diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index dba84fe29a..d3bbb95cc4 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -94,12 +94,10 @@ func TestNewSystem(t *testing.T) { assert.NoError(t, err) assert.Equal(t, config, system.config) - assert.Equal(t, config.OrderBookGraph, system.graph) assert.Equal(t, config.DisableStateVerification, system.disableStateVerification) assert.Equal(t, config.MaxStreamRetries, system.maxStreamRetries) assert.Equal(t, config, system.runner.(*ProcessorRunner).config) - assert.Equal(t, config.OrderBookGraph, system.runner.(*ProcessorRunner).graph) assert.Equal(t, system.ctx, system.runner.(*ProcessorRunner).ctx) } @@ -154,11 +152,9 @@ func TestContextCancel(t *testing.T) { // non-zero exit code. func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing.T) { historyQ := &mockDBQ{} - graph := &mockOrderBookGraph{} system := &System{ ctx: context.Background(), historyQ: historyQ, - graph: graph, } historyQ.On("GetTx").Return(nil).Once() @@ -196,11 +192,9 @@ func TestMaybeVerifyStateGetExpStateInvalidDBErrCancelOrContextCanceled(t *testi } func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { historyQ := &mockDBQ{} - graph := &mockOrderBookGraph{} system := &System{ historyQ: historyQ, ctx: context.Background(), - graph: graph, } var out bytes.Buffer @@ -212,7 +206,6 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { log = logger defer func() { log = oldLogger }() - graph.On("OffersMap").Return(map[xdr.Int64]xdr.OfferEntry{}).Twice() historyQ.On("GetExpStateInvalid").Return(false, nil).Twice() historyQ.On("Rollback").Return(nil).Twice() historyQ.On("CloneIngestionQ").Return(historyQ).Twice() @@ -230,7 +223,6 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { // it logs "State verification finished" twice, but no errors assert.Len(t, logged, 2) - graph.AssertExpectations(t) historyQ.AssertExpectations(t) } diff --git a/services/horizon/internal/expingest/orderbook.go b/services/horizon/internal/expingest/orderbook.go index cb2b31c42f..5cbbe77743 100644 --- a/services/horizon/internal/expingest/orderbook.go +++ b/services/horizon/internal/expingest/orderbook.go @@ -2,7 +2,9 @@ package expingest import ( "database/sql" + "math/rand" "sort" + "time" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -10,6 +12,8 @@ import ( "github.com/stellar/go/xdr" ) +const verificationFrequency = time.Hour + // OrderBookStream updates an in memory graph to be consistent with // offers in the Horizon DB. Any offers which are created, modified, or removed // from the Horizon DB during ingestion will be applied to the in memory order book @@ -20,6 +24,7 @@ type OrderBookStream struct { OrderBookGraph orderbook.OBGraph HistoryQ history.IngestionQ lastLedger uint32 + lastUpdate time.Time } type ingestionStatus struct { @@ -60,7 +65,23 @@ func (o *OrderBookStream) getIngestionStatus() (ingestionStatus, error) { return status, nil } -func (o *OrderBookStream) update(status ingestionStatus) ([]history.Offer, []xdr.Int64, error) { +func addOfferToGraph(graph orderbook.OBGraph, offer history.Offer) { + sellerID := xdr.MustAddress(offer.SellerID) + graph.AddOffer(xdr.OfferEntry{ + SellerId: sellerID, + OfferId: offer.OfferID, + Selling: offer.SellingAsset, + Buying: offer.BuyingAsset, + Amount: offer.Amount, + Price: xdr.Price{ + N: xdr.Int32(offer.Pricen), + D: xdr.Int32(offer.Priced), + }, + Flags: xdr.Uint32(offer.Flags), + }) +} + +func (o *OrderBookStream) update(status ingestionStatus) error { reset := o.lastLedger == 0 if status.StateInvalid || !status.HistoryConsistentWithState { log.WithField("status", status).Warn("ingestion state is invalid") @@ -85,59 +106,65 @@ func (o *OrderBookStream) update(status ingestionStatus) ([]history.Offer, []xdr if status.StateInvalid || !status.HistoryConsistentWithState { log.WithField("status", status). Info("waiting for ingestion to update offers table") - return []history.Offer{}, []xdr.Int64{}, nil + return nil } defer o.OrderBookGraph.Discard() - offers, err := loadOffersIntoGraph(o.HistoryQ, o.OrderBookGraph) + + offers, err := o.HistoryQ.GetAllOffers() if err != nil { - return nil, nil, errors.Wrap(err, "Error from loadOffersIntoGraph") + return errors.Wrap(err, "Error from GetAllOffers") + } + + for _, offer := range offers { + addOfferToGraph(o.OrderBookGraph, offer) } - if err = o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil { - return nil, nil, errors.Wrap(err, "Error applying changes to order book") + if err := o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil { + return errors.Wrap(err, "Error applying changes to order book") } o.lastLedger = status.LastIngestedLedger - return offers, []xdr.Int64{}, nil + return nil } if status.LastIngestedLedger == o.lastLedger { - return []history.Offer{}, []xdr.Int64{}, nil + return nil } defer o.OrderBookGraph.Discard() - var updated []history.Offer - var removed []xdr.Int64 - rows, err := o.HistoryQ.GetUpdatedOffers(o.lastLedger) + offers, err := o.HistoryQ.GetUpdatedOffers(o.lastLedger) if err != nil { - return nil, nil, errors.Wrap(err, "Error from GetUpdatedOffers") + return errors.Wrap(err, "Error from GetUpdatedOffers") } - for _, row := range rows { - if row.Deleted { - removed = append(removed, row.OfferID) + for _, offer := range offers { + if offer.Deleted { + o.OrderBookGraph.RemoveOffer(offer.OfferID) } else { - updated = append(updated, row) + addOfferToGraph(o.OrderBookGraph, offer) } } - addOffersToGraph(updated, o.OrderBookGraph) - - for _, offerID := range removed { - o.OrderBookGraph.RemoveOffer(offerID) - } if err = o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil { - return nil, nil, errors.Wrap(err, "Error applying changes to order book") + return errors.Wrap(err, "Error applying changes to order book") } + o.lastUpdate = time.Now() o.lastLedger = status.LastIngestedLedger - return updated, removed, nil + return nil } -func (o *OrderBookStream) verifyGraph(ingestion orderbook.OBGraph) { +func (o *OrderBookStream) verifyAllOffers() { offers := o.OrderBookGraph.Offers() - ingestionOffers := ingestion.Offers() + ingestionOffers, err := o.HistoryQ.GetAllOffers() + if err != nil { + // reset last update so that we retry verification on next update + o.lastUpdate = time.Now().Add(time.Hour * -2) + log.WithError(err).Info("Could not verify offers because of error from GetAllOffers") + return + } + mismatch := len(offers) != len(ingestionOffers) if !mismatch { @@ -145,40 +172,11 @@ func (o *OrderBookStream) verifyGraph(ingestion orderbook.OBGraph) { return offers[i].OfferId < offers[j].OfferId }) sort.Slice(ingestionOffers, func(i, j int) bool { - return ingestionOffers[i].OfferId < ingestionOffers[j].OfferId + return ingestionOffers[i].OfferID < ingestionOffers[j].OfferID }) - offerBase64, err := xdr.MarshalBase64(offers) - if err != nil { - log.WithError(err).Error("could not serialize offers") - return - } - ingestionOffersBase64, err := xdr.MarshalBase64(ingestionOffers) - if err != nil { - log.WithError(err).Error("could not serialize ingestion offers") - return - } - mismatch = offerBase64 != ingestionOffersBase64 - } - - if mismatch { - log.WithField("stream_offers", offers). - WithField("ingestion_offers", ingestionOffers). - Error("offers derived from order book stream does not match offers from ingestion") - } -} - -func verifyUpdatedOffers(ledger uint32, fromDB []history.Offer, fromIngestion []xdr.OfferEntry) { - sort.Slice(fromDB, func(i, j int) bool { - return fromDB[i].OfferID < fromDB[j].OfferID - }) - sort.Slice(fromIngestion, func(i, j int) bool { - return fromIngestion[i].OfferId < fromIngestion[j].OfferId - }) - mismatch := len(fromDB) != len(fromIngestion) - if !mismatch { - for i, offerRow := range fromDB { - offerEntry := fromIngestion[i] + for i, offerRow := range ingestionOffers { + offerEntry := offers[i] if offerRow.OfferID != offerEntry.OfferId || offerRow.Amount != offerEntry.Amount || offerRow.Priced != int32(offerEntry.Price.D) || @@ -191,53 +189,16 @@ func verifyUpdatedOffers(ledger uint32, fromDB []history.Offer, fromIngestion [] } } } - if mismatch { - log.WithField("fromDB", fromDB). - WithField("fromIngestion", fromIngestion). - WithField("sequence", ledger). - Warn("offers from db does not match offers from ingestion") - } -} -func verifyRemovedOffers(ledger uint32, fromDB []xdr.Int64, fromIngestion []xdr.Int64) { - sort.Slice(fromDB, func(i, j int) bool { - return fromDB[i] < fromDB[j] - }) - sort.Slice(fromIngestion, func(i, j int) bool { - return fromIngestion[i] < fromIngestion[j] - }) - mismatch := len(fromDB) != len(fromIngestion) - if !mismatch { - for i, offerRow := range fromDB { - if offerRow != fromIngestion[i] { - mismatch = true - break - } - } - } if mismatch { - log.WithField("fromDB", fromDB). - WithField("fromIngestion", fromIngestion). - WithField("sequence", ledger). - Warn("offers from db does not match offers from ingestion") - } -} - -func (o *OrderBookStream) updateAndVerify(graph orderbook.OBGraph, sequence uint32) { - status, err := o.getIngestionStatus() - if err != nil { - log.WithError(err).WithField("sequence", sequence).Info("Error obtaining ingestion status") - return - } - - dbUpdates, dbRemoved, err := o.update(status) - if err != nil { - log.WithError(err).WithField("sequence", sequence).Info("Error consuming from order book stream") - return + log.WithField("stream_offers", offers). + WithField("ingestion_offers", ingestionOffers). + Error("offers derived from order book stream does not match offers from ingestion") + // set last ledger to 0 so that we reset on next update + o.lastLedger = 0 + } else { + log.Info("order book stream verification succeeded") } - ingestionUpdates, ingestionRemoved := graph.Pending() - verifyUpdatedOffers(sequence, dbUpdates, ingestionUpdates) - verifyRemovedOffers(sequence, dbRemoved, ingestionRemoved) } // Update will query the Horizon DB for offers which have been created, removed, or updated since the @@ -250,11 +211,23 @@ func (o *OrderBookStream) Update() error { } defer o.HistoryQ.Rollback() + // add 15 minute jitter so that not all horizon nodes are calling + // HistoryQ.GetAllOffers at the same time + jitter := time.Duration(rand.Int63n(int64(15 * time.Minute))) + requiresVerification := !o.lastUpdate.Equal(time.Time{}) && + time.Since(o.lastUpdate) >= verificationFrequency+jitter + status, err := o.getIngestionStatus() if err != nil { return errors.Wrap(err, "Error obtaining ingestion status") } - _, _, err = o.update(status) - return err + if err := o.update(status); err != nil { + return errors.Wrap(err, "Error updating") + } + + if requiresVerification { + o.verifyAllOffers() + } + return nil } diff --git a/services/horizon/internal/expingest/orderbook_test.go b/services/horizon/internal/expingest/orderbook_test.go index e7257000c8..e3af36e293 100644 --- a/services/horizon/internal/expingest/orderbook_test.go +++ b/services/horizon/internal/expingest/orderbook_test.go @@ -6,6 +6,7 @@ import ( "github.com/stellar/go/xdr" "github.com/stretchr/testify/suite" "testing" + "time" ) type IngestionStatusTestSuite struct { @@ -202,9 +203,10 @@ func (t *UpdateOrderBookStreamTestSuite) TestGetAllOffersError() { Once() t.stream.lastLedger = 300 - _, _, err := t.stream.update(status) - t.Assert().EqualError(err, "Error from loadOffersIntoGraph: GetAllOffers error: offers error") + err := t.stream.update(status) + t.Assert().EqualError(err, "Error from GetAllOffers: offers error") t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestResetApplyError() { @@ -240,12 +242,13 @@ func (t *UpdateOrderBookStreamTestSuite) TestResetApplyError() { Once() t.stream.lastLedger = 300 - _, _, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().EqualError(err, "Error applying changes to order book: apply error") t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } -func (t *UpdateOrderBookStreamTestSuite) mockReset(status ingestionStatus) []history.Offer { +func (t *UpdateOrderBookStreamTestSuite) mockReset(status ingestionStatus) { t.graph.On("Clear").Return().Once() t.graph.On("Discard").Return().Once() @@ -271,7 +274,6 @@ func (t *UpdateOrderBookStreamTestSuite) mockReset(status ingestionStatus) []his t.graph.On("Apply", status.LastIngestedLedger). Return(nil). Once() - return offers } func (t *UpdateOrderBookStreamTestSuite) TestFirstUpdateSucceeds() { @@ -281,13 +283,12 @@ func (t *UpdateOrderBookStreamTestSuite) TestFirstUpdateSucceeds() { LastIngestedLedger: 201, LastOfferCompactionLedger: 100, } - offers := t.mockReset(status) + t.mockReset(status) - updated, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Equal(offers, updated) t.Assert().Equal(uint32(201), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestInvalidState() { @@ -299,21 +300,18 @@ func (t *UpdateOrderBookStreamTestSuite) TestInvalidState() { } t.graph.On("Clear").Return().Once() - updated, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Empty(updated) t.Assert().Equal(uint32(0), t.stream.lastLedger) t.stream.lastLedger = 123 t.graph.On("Clear").Return().Once() - updated, removed, err = t.stream.update(status) + err = t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Empty(updated) t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestHistoryInconsistentWithState() { @@ -325,21 +323,18 @@ func (t *UpdateOrderBookStreamTestSuite) TestHistoryInconsistentWithState() { } t.graph.On("Clear").Return().Once() - updated, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Empty(updated) t.Assert().Equal(uint32(0), t.stream.lastLedger) t.stream.lastLedger = 123 t.graph.On("Clear").Return().Once() - updated, removed, err = t.stream.update(status) + err = t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Empty(updated) t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestLastIngestedLedgerBehindStream() { @@ -349,14 +344,13 @@ func (t *UpdateOrderBookStreamTestSuite) TestLastIngestedLedgerBehindStream() { LastIngestedLedger: 201, LastOfferCompactionLedger: 100, } - offers := t.mockReset(status) + t.mockReset(status) t.stream.lastLedger = 300 - updated, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Equal(offers, updated) t.Assert().Equal(uint32(201), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestStreamBehindLastCompactionLedger() { @@ -366,14 +360,13 @@ func (t *UpdateOrderBookStreamTestSuite) TestStreamBehindLastCompactionLedger() LastIngestedLedger: 201, LastOfferCompactionLedger: 100, } - offers := t.mockReset(status) + t.mockReset(status) t.stream.lastLedger = 99 - updated, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Equal(offers, updated) t.Assert().Equal(uint32(201), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestStreamLedgerEqualsLastIngestedLedger() { @@ -385,11 +378,10 @@ func (t *UpdateOrderBookStreamTestSuite) TestStreamLedgerEqualsLastIngestedLedge } t.stream.lastLedger = 201 - updated, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) - t.Assert().Empty(removed) - t.Assert().Empty(updated) t.Assert().Equal(uint32(201), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestGetUpdatedOffersError() { @@ -406,12 +398,13 @@ func (t *UpdateOrderBookStreamTestSuite) TestGetUpdatedOffersError() { Return([]history.Offer{}, fmt.Errorf("updated offers error")). Once() - _, _, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().EqualError(err, "Error from GetUpdatedOffers: updated offers error") t.Assert().Equal(uint32(100), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } -func (t *UpdateOrderBookStreamTestSuite) mockUpdate() ([]history.Offer, []xdr.Int64) { +func (t *UpdateOrderBookStreamTestSuite) mockUpdate() { t.stream.lastLedger = 100 t.graph.On("Discard").Return().Once() @@ -435,8 +428,6 @@ func (t *UpdateOrderBookStreamTestSuite) mockUpdate() ([]history.Offer, []xdr.In t.graph.On("AddOffer", offerEntry).Return().Once() t.graph.On("AddOffer", otherOfferEntry).Return().Once() t.graph.On("RemoveOffer", deletedOffer.OfferID).Return(t.graph).Once() - - return offers[:2], []xdr.Int64{deletedOffer.OfferID} } func (t *UpdateOrderBookStreamTestSuite) TestApplyUpdatesError() { @@ -453,9 +444,10 @@ func (t *UpdateOrderBookStreamTestSuite) TestApplyUpdatesError() { Return(fmt.Errorf("apply error")). Once() - _, _, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().EqualError(err, "Error applying changes to order book: apply error") t.Assert().Equal(uint32(100), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } func (t *UpdateOrderBookStreamTestSuite) TestApplyUpdatesSucceeds() { @@ -466,15 +458,187 @@ func (t *UpdateOrderBookStreamTestSuite) TestApplyUpdatesSucceeds() { LastOfferCompactionLedger: 100, } - expectedUpdates, expectedRemoved := t.mockUpdate() + t.mockUpdate() t.graph.On("Apply", status.LastIngestedLedger). Return(nil). Once() - updates, removed, err := t.stream.update(status) + err := t.stream.update(status) t.Assert().NoError(err) t.Assert().Equal(status.LastIngestedLedger, t.stream.lastLedger) - t.Assert().Equal(expectedUpdates, updates) - t.Assert().Equal(expectedRemoved, removed) + t.Assert().False(t.stream.lastUpdate.Equal(time.Time{})) +} + +type VerifyOrderBookStreamTestSuite struct { + suite.Suite + historyQ *mockDBQ + graph *mockOrderBookGraph + stream *OrderBookStream +} + +func TestVerifyOrderBookStream(t *testing.T) { + suite.Run(t, new(VerifyOrderBookStreamTestSuite)) +} + +func (t *VerifyOrderBookStreamTestSuite) SetupTest() { + t.historyQ = &mockDBQ{} + t.graph = &mockOrderBookGraph{} + t.stream = &OrderBookStream{OrderBookGraph: t.graph, HistoryQ: t.historyQ} + + sellerID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" + otherSellerID := "GAXI33UCLQTCKM2NMRBS7XYBR535LLEVAHL5YBN4FTCB4HZHT7ZA5CVK" + t.graph.On("Offers").Return([]xdr.OfferEntry{ + { + SellerId: xdr.MustAddress(sellerID), + OfferId: 1, + Selling: xdr.MustNewNativeAsset(), + Buying: xdr.MustNewCreditAsset("USD", sellerID), + Amount: 123, + Price: xdr.Price{ + N: 1, + D: 2, + }, + Flags: 1, + Ext: xdr.OfferEntryExt{}, + }, + { + SellerId: xdr.MustAddress(otherSellerID), + OfferId: 3, + Selling: xdr.MustNewCreditAsset("EUR", sellerID), + Buying: xdr.MustNewCreditAsset("CHF", sellerID), + Amount: 9, + Price: xdr.Price{ + N: 3, + D: 1, + }, + Flags: 0, + Ext: xdr.OfferEntryExt{}, + }, + }).Once() +} + +func (t *VerifyOrderBookStreamTestSuite) TearDownTest() { + t.historyQ.AssertExpectations(t.T()) + t.graph.AssertExpectations(t.T()) +} + +func (t *VerifyOrderBookStreamTestSuite) TestGetAllOffersError() { + t.historyQ.On("GetAllOffers"). + Return([]history.Offer{}, fmt.Errorf("offers error")). + Once() + + t.stream.lastLedger = 300 + t.stream.verifyAllOffers() + t.Assert().Equal(uint32(300), t.stream.lastLedger) + t.Assert().False(t.stream.lastUpdate.Equal(time.Time{})) + t.Assert().True(t.stream.lastUpdate.Before(time.Now())) +} + +func (t *VerifyOrderBookStreamTestSuite) TestEmptyDBOffers() { + var offers []history.Offer + t.historyQ.On("GetAllOffers").Return(offers, nil).Once() + + t.stream.lastLedger = 300 + t.stream.verifyAllOffers() + t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) +} + +func (t *VerifyOrderBookStreamTestSuite) TestLengthMismatch() { + offers := []history.Offer{ + { + OfferID: 1, + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SellingAsset: xdr.MustNewNativeAsset(), + BuyingAsset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 123, + Pricen: 1, + Priced: 2, + Price: 0.5, + Flags: 1, + Deleted: false, + LastModifiedLedger: 1, + }, + } + t.historyQ.On("GetAllOffers").Return(offers, nil).Once() + + t.stream.lastLedger = 300 + t.stream.verifyAllOffers() + t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) +} + +func (t *VerifyOrderBookStreamTestSuite) TestContentMismatch() { + offers := []history.Offer{ + { + OfferID: 1, + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SellingAsset: xdr.MustNewNativeAsset(), + BuyingAsset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 123, + Pricen: 1, + Priced: 2, + Price: 0.5, + Flags: 1, + Deleted: false, + LastModifiedLedger: 1, + }, + { + OfferID: 3, + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SellingAsset: xdr.MustNewNativeAsset(), + BuyingAsset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 123, + Pricen: 1, + Priced: 2, + Price: 0.5, + Flags: 1, + Deleted: false, + LastModifiedLedger: 1, + }, + } + t.historyQ.On("GetAllOffers").Return(offers, nil).Once() + + t.stream.lastLedger = 300 + t.stream.verifyAllOffers() + t.Assert().Equal(uint32(0), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) +} + +func (t *VerifyOrderBookStreamTestSuite) TestSuccess() { + offers := []history.Offer{ + { + OfferID: 1, + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SellingAsset: xdr.MustNewNativeAsset(), + BuyingAsset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 123, + Pricen: 1, + Priced: 2, + Price: 0.5, + Flags: 1, + Deleted: false, + LastModifiedLedger: 1, + }, + { + OfferID: 3, + SellerID: "GAXI33UCLQTCKM2NMRBS7XYBR535LLEVAHL5YBN4FTCB4HZHT7ZA5CVK", + SellingAsset: xdr.MustNewCreditAsset("EUR", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + BuyingAsset: xdr.MustNewCreditAsset("CHF", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 9, + Pricen: 3, + Priced: 1, + Price: 3, + Flags: 0, + Deleted: false, + LastModifiedLedger: 1, + }, + } + t.historyQ.On("GetAllOffers").Return(offers, nil).Once() + + t.stream.lastLedger = 300 + t.stream.verifyAllOffers() + t.Assert().Equal(uint32(300), t.stream.lastLedger) + t.Assert().True(t.stream.lastUpdate.Equal(time.Time{})) } diff --git a/services/horizon/internal/expingest/processor_runner.go b/services/horizon/internal/expingest/processor_runner.go index 4e858e833a..71c87570a5 100644 --- a/services/horizon/internal/expingest/processor_runner.go +++ b/services/horizon/internal/expingest/processor_runner.go @@ -8,7 +8,6 @@ import ( "github.com/stellar/go/exp/ingest/adapters" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/expingest/processors" "github.com/stellar/go/support/errors" @@ -53,7 +52,6 @@ type ProcessorRunnerInterface interface { DisableMemoryStatsLogging() RunHistoryArchiveIngestion(checkpointLedger uint32) (io.StatsChangeProcessorResults, error) RunTransactionProcessorsOnLedger(sequence uint32) (io.StatsLedgerTransactionProcessorResults, error) - RunOrderBookProcessorOnLedger(sequence uint32) (io.StatsChangeProcessorResults, error) RunAllProcessorsOnLedger(sequence uint32) ( io.StatsChangeProcessorResults, io.StatsLedgerTransactionProcessorResults, @@ -67,7 +65,6 @@ type ProcessorRunner struct { config Config ctx context.Context - graph orderbook.OBGraph historyQ history.IngestionQ historyAdapter adapters.HistoryArchiveAdapterInterface ledgerBackend ledgerbackend.LedgerBackend @@ -102,7 +99,6 @@ func (s *ProcessorRunner) buildChangeProcessor( useLedgerCache := source == ledgerSource return groupChangeProcessors{ statsChangeProcessor, - processors.NewOrderbookProcessor(s.graph), processors.NewAccountDataProcessor(s.historyQ), processors.NewAccountsProcessor(s.historyQ), processors.NewOffersProcessor(s.historyQ, sequence), @@ -306,23 +302,3 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(sequence uint32) (io.StatsCha return changeStats.GetResults(), statsLedgerTransactionProcessorResults, nil } - -func (s *ProcessorRunner) RunOrderBookProcessorOnLedger(ledger uint32) (io.StatsChangeProcessorResults, error) { - changeStats := io.StatsChangeProcessor{} - - statsChangeProcessor := &statsChangeProcessor{ - StatsChangeProcessor: &changeStats, - } - - groupProcessor := groupChangeProcessors{ - statsChangeProcessor, - processors.NewOrderbookProcessor(s.graph), - } - - err := s.runChangeProcessorOnLedger(groupProcessor, ledger) - if err != nil { - return changeStats.GetResults(), err - } - - return changeStats.GetResults(), nil -} diff --git a/services/horizon/internal/expingest/processor_runner_test.go b/services/horizon/internal/expingest/processor_runner_test.go index bdfa4bcde7..355dc1dfab 100644 --- a/services/horizon/internal/expingest/processor_runner_test.go +++ b/services/horizon/internal/expingest/processor_runner_test.go @@ -19,7 +19,6 @@ import ( func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { maxBatchSize := 100000 - graph := &mockOrderBookGraph{} q := &mockDBQ{} // Batches @@ -68,7 +67,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { config: Config{ NetworkPassphrase: network.PublicNetworkPassphrase, }, - graph: graph, historyQ: q, } @@ -84,8 +82,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { MaxStreamRetries: 3, } - graph := &mockOrderBookGraph{} - defer mock.AssertExpectationsForObjects(t, graph) q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) historyAdapter := &adapters.MockHistoryArchiveAdapter{} @@ -168,7 +164,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { runner := ProcessorRunner{ ctx: context.Background(), config: config, - graph: graph, historyQ: q, historyAdapter: historyAdapter, ledgerBackend: ledgerBackend, @@ -201,17 +196,16 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { assert.IsType(t, groupChangeProcessors{}, processor) assert.IsType(t, &statsChangeProcessor{}, processor.(groupChangeProcessors)[0]) - assert.IsType(t, &processors.OrderbookProcessor{}, processor.(groupChangeProcessors)[1]) - assert.IsType(t, &processors.AccountDataProcessor{}, processor.(groupChangeProcessors)[2]) - assert.IsType(t, &processors.AccountsProcessor{}, processor.(groupChangeProcessors)[3]) - assert.IsType(t, &processors.OffersProcessor{}, processor.(groupChangeProcessors)[4]) - assert.IsType(t, &processors.AssetStatsProcessor{}, processor.(groupChangeProcessors)[5]) - assert.True(t, reflect.ValueOf(processor.(groupChangeProcessors)[5]). + assert.IsType(t, &processors.AccountDataProcessor{}, processor.(groupChangeProcessors)[1]) + assert.IsType(t, &processors.AccountsProcessor{}, processor.(groupChangeProcessors)[2]) + assert.IsType(t, &processors.OffersProcessor{}, processor.(groupChangeProcessors)[3]) + assert.IsType(t, &processors.AssetStatsProcessor{}, processor.(groupChangeProcessors)[4]) + assert.True(t, reflect.ValueOf(processor.(groupChangeProcessors)[4]). Elem().FieldByName("useLedgerEntryCache").Bool()) - assert.IsType(t, &processors.SignersProcessor{}, processor.(groupChangeProcessors)[6]) - assert.True(t, reflect.ValueOf(processor.(groupChangeProcessors)[6]). + assert.IsType(t, &processors.SignersProcessor{}, processor.(groupChangeProcessors)[5]) + assert.True(t, reflect.ValueOf(processor.(groupChangeProcessors)[5]). Elem().FieldByName("useLedgerEntryCache").Bool()) - assert.IsType(t, &processors.TrustLinesProcessor{}, processor.(groupChangeProcessors)[7]) + assert.IsType(t, &processors.TrustLinesProcessor{}, processor.(groupChangeProcessors)[6]) runner = ProcessorRunner{ historyQ: q, @@ -221,17 +215,16 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { assert.IsType(t, groupChangeProcessors{}, processor) assert.IsType(t, &statsChangeProcessor{}, processor.(groupChangeProcessors)[0]) - assert.IsType(t, &processors.OrderbookProcessor{}, processor.(groupChangeProcessors)[1]) - assert.IsType(t, &processors.AccountDataProcessor{}, processor.(groupChangeProcessors)[2]) - assert.IsType(t, &processors.AccountsProcessor{}, processor.(groupChangeProcessors)[3]) - assert.IsType(t, &processors.OffersProcessor{}, processor.(groupChangeProcessors)[4]) - assert.IsType(t, &processors.AssetStatsProcessor{}, processor.(groupChangeProcessors)[5]) - assert.False(t, reflect.ValueOf(processor.(groupChangeProcessors)[5]). + assert.IsType(t, &processors.AccountDataProcessor{}, processor.(groupChangeProcessors)[1]) + assert.IsType(t, &processors.AccountsProcessor{}, processor.(groupChangeProcessors)[2]) + assert.IsType(t, &processors.OffersProcessor{}, processor.(groupChangeProcessors)[3]) + assert.IsType(t, &processors.AssetStatsProcessor{}, processor.(groupChangeProcessors)[4]) + assert.False(t, reflect.ValueOf(processor.(groupChangeProcessors)[4]). Elem().FieldByName("useLedgerEntryCache").Bool()) - assert.IsType(t, &processors.SignersProcessor{}, processor.(groupChangeProcessors)[6]) - assert.False(t, reflect.ValueOf(processor.(groupChangeProcessors)[6]). + assert.IsType(t, &processors.SignersProcessor{}, processor.(groupChangeProcessors)[5]) + assert.False(t, reflect.ValueOf(processor.(groupChangeProcessors)[5]). Elem().FieldByName("useLedgerEntryCache").Bool()) - assert.IsType(t, &processors.TrustLinesProcessor{}, processor.(groupChangeProcessors)[7]) + assert.IsType(t, &processors.TrustLinesProcessor{}, processor.(groupChangeProcessors)[6]) } func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { @@ -284,8 +277,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { MaxStreamRetries: 3, } - graph := &mockOrderBookGraph{} - defer mock.AssertExpectationsForObjects(t, graph) q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) ledgerBackend := &ledgerbackend.MockDatabaseBackend{} @@ -342,7 +333,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { runner := ProcessorRunner{ ctx: context.Background(), config: config, - graph: graph, historyQ: q, ledgerBackend: ledgerBackend, } @@ -351,43 +341,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { assert.NoError(t, err) } -func TestProcessorRunnerRunOrderBookProcessorOnLedger(t *testing.T) { - config := Config{ - NetworkPassphrase: network.PublicNetworkPassphrase, - MaxStreamRetries: 3, - } - - graph := &mockOrderBookGraph{} - defer mock.AssertExpectationsForObjects(t, graph) - ledgerBackend := &ledgerbackend.MockDatabaseBackend{} - defer mock.AssertExpectationsForObjects(t, ledgerBackend) - - ledger := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - BucketListHash: xdr.Hash([32]byte{0, 1, 2}), - }, - } - - ledgerBackend.On("GetLedger", uint32(63)). - Return( - true, - ledgerbackend.LedgerCloseMeta{ - LedgerHeader: ledger, - }, - nil, - ).Once() - - runner := ProcessorRunner{ - ctx: context.Background(), - config: config, - graph: graph, - ledgerBackend: ledgerBackend, - } - - _, err := runner.RunOrderBookProcessorOnLedger(63) - assert.NoError(t, err) -} - func TestSkipFailedTransactions(t *testing.T) { mockProcessor := &mockHorizonTransactionProcessor{} successfulTx := io.LedgerTransaction{ diff --git a/services/horizon/internal/expingest/processors/new_orderbook_processor.go b/services/horizon/internal/expingest/processors/new_orderbook_processor.go deleted file mode 100644 index a6889bc0cf..0000000000 --- a/services/horizon/internal/expingest/processors/new_orderbook_processor.go +++ /dev/null @@ -1,54 +0,0 @@ -package processors - -import ( - "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/orderbook" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// OrderbookProcessor is a processor (both state and ledger) that's responsible -// for updating orderbook graph with new/updated/removed offers. Orderbook graph -// can be later used for path finding. -type OrderbookProcessor struct { - graph orderbook.OBGraph - - cache *io.LedgerEntryChangeCache -} - -func NewOrderbookProcessor(graph orderbook.OBGraph) *OrderbookProcessor { - return &OrderbookProcessor{ - graph: graph, - cache: io.NewLedgerEntryChangeCache(), - } -} - -func (p *OrderbookProcessor) ProcessChange(change io.Change) error { - if change.Type != xdr.LedgerEntryTypeOffer { - return nil - } - - err := p.cache.AddChange(change) - if err != nil { - return errors.Wrap(err, "error adding to ledgerCache") - } - - return nil -} - -func (p *OrderbookProcessor) Commit() error { - changes := p.cache.GetChanges() - for _, change := range changes { - switch { - case change.Post != nil: - // Created or updated - offer := change.Post.Data.MustOffer() - p.graph.AddOffer(offer) - case change.Pre != nil && change.Post == nil: - // Removed - offer := change.Pre.Data.MustOffer() - p.graph.RemoveOffer(offer.OfferId) - } - } - return nil -} diff --git a/services/horizon/internal/expingest/processors/orderbook_processor_test.go b/services/horizon/internal/expingest/processors/orderbook_processor_test.go deleted file mode 100644 index bab6039323..0000000000 --- a/services/horizon/internal/expingest/processors/orderbook_processor_test.go +++ /dev/null @@ -1,273 +0,0 @@ -package processors - -import ( - "testing" - - "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/orderbook" - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" -) - -func TestProcessOrderBookState(t *testing.T) { - graph := orderbook.NewOrderBookGraph() - processor := NewOrderbookProcessor(graph) - - err := processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(1), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 2}, - }, - }, - }, - }) - assert.NoError(t, err) - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(2), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 2}, - }, - }, - }, - }) - assert.NoError(t, err) - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 2}, - }, - }, - }, - }) - assert.NoError(t, err) - - assert.NoError(t, processor.Commit()) - if err := graph.Apply(2); err != nil { - t.Fatalf("unexpected error %v", err) - } - - expectedOffers := map[xdr.Int64]bool{ - xdr.Int64(1): true, - xdr.Int64(2): true, - xdr.Int64(3): true, - } - - offers := graph.Offers() - for _, offer := range offers { - if !expectedOffers[offer.OfferId] { - t.Fatalf("unexpected offer id %v", offer.OfferId) - } - delete(expectedOffers, offer.OfferId) - } - if len(expectedOffers) != 0 { - t.Fatal("expected offers does not match offers in graph") - } -} - -func TestProcessOrderBookLedger(t *testing.T) { - graph := orderbook.NewOrderBookGraph() - processor := NewOrderbookProcessor(graph) - - // should be ignored because it's not an offer type - err := processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeAccount, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.AccountEntry{ - AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Thresholds: [4]byte{1, 1, 1, 1}, - }, - }, - }, - }) - assert.NoError(t, err) - - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(1), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 2}, - }, - }, - }, - }) - assert.NoError(t, err) - - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(2), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 3}, - }, - }, - }, - }) - assert.NoError(t, err) - - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{3, 1}, - }, - }, - }, - }) - assert.NoError(t, err) - - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(2), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 3}, - }, - }, - }, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(2), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 6}, - }, - }, - }, - }) - assert.NoError(t, err) - - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{3, 1}, - }, - }, - }, - Post: nil, - }) - assert.NoError(t, err) - - assert.NoError(t, processor.Commit()) - - if err := graph.Apply(2); err != nil { - t.Fatalf("unexpected error %v", err) - } - - expectedOffers := map[xdr.Int64]xdr.Price{ - xdr.Int64(1): xdr.Price{1, 2}, - xdr.Int64(2): xdr.Price{1, 6}, - } - - offers := graph.Offers() - for _, offer := range offers { - if price, ok := expectedOffers[offer.OfferId]; !ok { - t.Fatalf("unexpected offer id %v", offer.OfferId) - } else if offer.Price != price { - t.Fatalf("unexpected offer price %v for offer with id %v", offer.Price, offer.OfferId) - } - delete(expectedOffers, offer.OfferId) - } - if len(expectedOffers) != 0 { - t.Fatal("expected offers does not match offers in graph") - } -} - -func TestProcessOrderBookLedgerProcessUpgradeChanges(t *testing.T) { - graph := orderbook.NewOrderBookGraph() - processor := NewOrderbookProcessor(graph) - - err := processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: nil, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(1), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 2}, - }, - }, - }, - }) - assert.NoError(t, err) - - err = processor.ProcessChange(io.Change{ - Type: xdr.LedgerEntryTypeOffer, - Pre: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(1), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{1, 2}, - }, - }, - }, - Post: &xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeOffer, - Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(1), - SellerId: xdr.MustAddress("GCFMARUTEFR6NW5HU5JZIVHEN5MO764GQKGRLHOIRY265Z343NZ7AK3J"), - Price: xdr.Price{100, 2}, - }, - }, - }, - }) - assert.NoError(t, err) - - assert.NoError(t, processor.Commit()) - - if err := graph.Apply(2); err != nil { - t.Fatalf("unexpected error %v", err) - } - - offers := graph.Offers() - assert.Equal(t, xdr.Int32(100), offers[0].Price.N) - assert.Equal(t, xdr.Int32(2), offers[0].Price.D) -} diff --git a/services/horizon/internal/expingest/resume_state_test.go b/services/horizon/internal/expingest/resume_state_test.go index 88144195ec..04ec556be1 100644 --- a/services/horizon/internal/expingest/resume_state_test.go +++ b/services/horizon/internal/expingest/resume_state_test.go @@ -18,7 +18,6 @@ func TestResumeTestTestSuite(t *testing.T) { type ResumeTestTestSuite struct { suite.Suite - graph *mockOrderBookGraph ledgeBackend *ledgerbackend.MockDatabaseBackend historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter @@ -28,7 +27,6 @@ type ResumeTestTestSuite struct { } func (s *ResumeTestTestSuite) SetupTest() { - s.graph = &mockOrderBookGraph{} s.ledgeBackend = &ledgerbackend.MockDatabaseBackend{} s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} @@ -40,13 +38,11 @@ func (s *ResumeTestTestSuite) SetupTest() { historyAdapter: s.historyAdapter, runner: s.runner, ledgerBackend: s.ledgeBackend, - graph: s.graph, stellarCoreClient: s.stellarCoreClient, } s.system.initMetrics() s.historyQ.On("Rollback").Return(nil).Once() - s.graph.On("Discard").Once() } func (s *ResumeTestTestSuite) TearDownTest() { @@ -56,16 +52,12 @@ func (s *ResumeTestTestSuite) TearDownTest() { s.historyAdapter.AssertExpectations(t) s.ledgeBackend.AssertExpectations(t) s.stellarCoreClient.AssertExpectations(t) - s.graph.AssertExpectations(t) } func (s *ResumeTestTestSuite) TestInvalidParam() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} - // Recreate orderbook graph to remove Discard assertion - *s.graph = mockOrderBookGraph{} - next, err := resumeState{latestSuccessfullyProcessedLedger: 0}.run(s.system) s.Assert().Error(err) s.Assert().EqualError(err, "unexpected latestSuccessfullyProcessedLedger value") @@ -80,9 +72,6 @@ func (s *ResumeTestTestSuite) TestBeginReturnsError() { *s.historyQ = mockDBQ{} s.historyQ.On("Begin").Return(errors.New("my error")).Once() - // Recreate orderbook graph to remove Discard assertion - *s.graph = mockOrderBookGraph{} - next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) s.Assert().Error(err) s.Assert().EqualError(err, "Error starting a transaction: my error") @@ -213,51 +202,49 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() { ) } -func (s *ResumeTestTestSuite) TestIngestOrderbookOnly() { +func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { s.historyQ.On("Begin").Return(nil).Once() - s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(110), nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(101), nil).Once() s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() - s.historyQ.On("GetLatestLedger").Return(uint32(0), nil) + s.historyQ.On("GetLatestLedger").Return(uint32(101), nil) s.ledgeBackend.On("GetLatestLedgerSequence").Return(uint32(111), nil).Once() - // Rollback to release the lock as we're not updating DB - s.historyQ.On("Rollback").Return(nil).Once() - s.runner.On("RunOrderBookProcessorOnLedger", uint32(101)).Return(io.StatsChangeProcessorResults{}, nil).Once() - s.graph.On("Apply", uint32(101)).Return(nil).Once() + s.runner.On("RunAllProcessorsOnLedger", uint32(102)).Return(io.StatsChangeProcessorResults{}, io.StatsLedgerTransactionProcessorResults{}, nil).Once() + s.historyQ.On("UpdateLastLedgerExpIngest", uint32(102)).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() - next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) + s.stellarCoreClient.On( + "SetCursor", + mock.AnythingOfType("*context.timerCtx"), + defaultCoreCursorName, + int32(102), + ).Return(nil).Once() + + s.historyQ.On("GetExpStateInvalid").Return(false, nil).Once() +} +func (s *ResumeTestTestSuite) TestBumpIngestLedger() { + s.mockSuccessfulIngestion() + + next, err := resumeState{latestSuccessfullyProcessedLedger: 99}.run(s.system) s.Assert().NoError(err) s.Assert().Equal( transition{ - node: resumeState{latestSuccessfullyProcessedLedger: 101}, + node: resumeState{latestSuccessfullyProcessedLedger: 102}, sleepDuration: 0, }, next, ) } -// TestIngestOrderbookOnlyWhenLastLedgerExpEqualsCurrent is very similar to the test above -// but it checks the `ingestLedger <= lastIngestedLedger` that, if incorrect, could lead -// to a nasty off-by-one error. -func (s *ResumeTestTestSuite) TestIngestOrderbookOnlyWhenLastLedgerExpEqualsCurrent() { - s.historyQ.On("Begin").Return(nil).Once() - s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(101), nil).Once() - s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() - s.historyQ.On("GetLatestLedger").Return(uint32(101), nil) - - s.ledgeBackend.On("GetLatestLedgerSequence").Return(uint32(111), nil).Once() - - // Rollback to release the lock as we're not updating DB - s.historyQ.On("Rollback").Return(nil).Once() - s.runner.On("RunOrderBookProcessorOnLedger", uint32(101)).Return(io.StatsChangeProcessorResults{}, nil).Once() - s.graph.On("Apply", uint32(101)).Return(nil).Once() +func (s *ResumeTestTestSuite) TestBumpIngestLedgerWhenIngestLedgerEqualsLastLedgerExpIngest() { + s.mockSuccessfulIngestion() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) s.Assert().NoError(err) s.Assert().Equal( transition{ - node: resumeState{latestSuccessfullyProcessedLedger: 101}, + node: resumeState{latestSuccessfullyProcessedLedger: 102}, sleepDuration: 0, }, next, @@ -265,32 +252,13 @@ func (s *ResumeTestTestSuite) TestIngestOrderbookOnlyWhenLastLedgerExpEqualsCurr } func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { - s.historyQ.On("Begin").Return(nil).Once() - s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once() - s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() - s.historyQ.On("GetLatestLedger").Return(uint32(0), nil) - - s.ledgeBackend.On("GetLatestLedgerSequence").Return(uint32(111), nil).Once() - - s.runner.On("RunAllProcessorsOnLedger", uint32(101)).Return(io.StatsChangeProcessorResults{}, io.StatsLedgerTransactionProcessorResults{}, nil).Once() - s.historyQ.On("UpdateLastLedgerExpIngest", uint32(101)).Return(nil).Once() - s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", uint32(101)).Return(nil).Once() - - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(101), - ).Return(nil).Once() - - s.historyQ.On("GetExpStateInvalid").Return(false, nil).Once() + s.mockSuccessfulIngestion() - next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) + next, err := resumeState{latestSuccessfullyProcessedLedger: 101}.run(s.system) s.Assert().NoError(err) s.Assert().Equal( transition{ - node: resumeState{latestSuccessfullyProcessedLedger: 101}, + node: resumeState{latestSuccessfullyProcessedLedger: 102}, sleepDuration: 0, }, next, @@ -308,7 +276,6 @@ func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() { s.runner.On("RunAllProcessorsOnLedger", uint32(101)).Return(io.StatsChangeProcessorResults{}, io.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerExpIngest", uint32(101)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", uint32(101)).Return(nil).Once() s.stellarCoreClient.On( "SetCursor", diff --git a/services/horizon/internal/expingest/stress_test.go b/services/horizon/internal/expingest/stress_test.go index 93981268a5..c0d02f1b44 100644 --- a/services/horizon/internal/expingest/stress_test.go +++ b/services/horizon/internal/expingest/stress_test.go @@ -15,7 +15,6 @@ func TestStressTestStateTestSuite(t *testing.T) { type StressTestStateTestSuite struct { suite.Suite - graph *mockOrderBookGraph historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner @@ -23,7 +22,6 @@ type StressTestStateTestSuite struct { } func (s *StressTestStateTestSuite) SetupTest() { - s.graph = &mockOrderBookGraph{} s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} @@ -31,13 +29,11 @@ func (s *StressTestStateTestSuite) SetupTest() { historyQ: s.historyQ, historyAdapter: s.historyAdapter, runner: s.runner, - graph: s.graph, } s.system.initMetrics() s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.graph.On("Discard").Once() s.runner.On("EnableMemoryStatsLogging").Return() s.runner.On("SetLedgerBackend", fakeLedgerBackend{ numTransactions: 10, @@ -51,12 +47,10 @@ func (s *StressTestStateTestSuite) TearDownTest() { s.historyQ.AssertExpectations(t) s.historyAdapter.AssertExpectations(t) s.runner.AssertExpectations(t) - s.graph.AssertExpectations(t) } func (s *StressTestStateTestSuite) TestBounds() { *s.historyQ = mockDBQ{} - *s.graph = mockOrderBookGraph{} *s.runner = mockProcessorsRunner{} err := s.system.StressTest(-1, 4) @@ -71,7 +65,6 @@ func (s *StressTestStateTestSuite) TestBounds() { func (s *StressTestStateTestSuite) TestBeginReturnsError() { *s.historyQ = mockDBQ{} - *s.graph = mockOrderBookGraph{} s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("Begin").Return(errors.New("my error")).Once() @@ -114,18 +107,6 @@ func (s *StressTestStateTestSuite) TestUpdateLastLedgerExpIngestReturnsError() { s.Assert().EqualError(err, "Error updating last ingested ledger: my error") } -func (s *StressTestStateTestSuite) TestApplyReturnsError() { - s.historyQ.On("Begin").Return(nil).Once() - s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() - s.runner.On("RunAllProcessorsOnLedger", uint32(1)).Return(io.StatsChangeProcessorResults{}, io.StatsLedgerTransactionProcessorResults{}, nil).Once() - s.historyQ.On("UpdateLastLedgerExpIngest", uint32(1)).Return(nil).Once() - s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", uint32(1)).Return(errors.New("my error")).Once() - - err := s.system.StressTest(10, 4) - s.Assert().EqualError(err, "Error applying order book changes: my error") -} - func (s *StressTestStateTestSuite) TestCommitReturnsError() { s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() @@ -143,7 +124,6 @@ func (s *StressTestStateTestSuite) TestSucceeds() { s.runner.On("RunAllProcessorsOnLedger", uint32(1)).Return(io.StatsChangeProcessorResults{}, io.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerExpIngest", uint32(1)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", uint32(1)).Return(nil).Once() err := s.system.StressTest(10, 4) s.Assert().NoError(err) diff --git a/services/horizon/internal/expingest/verify.go b/services/horizon/internal/expingest/verify.go index 9d3ee391f3..aa40fc9b68 100644 --- a/services/horizon/internal/expingest/verify.go +++ b/services/horizon/internal/expingest/verify.go @@ -1,7 +1,6 @@ package expingest import ( - "bytes" "database/sql" "fmt" "time" @@ -31,10 +30,7 @@ const stateVerifierExpectedIngestionVersion = 10 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already // running it exits. -func (s *System) verifyState( - graphOffers map[xdr.Int64]xdr.OfferEntry, - verifyAgainstLatestCheckpoint bool, -) error { +func (s *System) verifyState(verifyAgainstLatestCheckpoint bool) error { s.stateVerificationMutex.Lock() if s.stateVerificationRunning { log.Warn("State verification is already running...") @@ -178,7 +174,7 @@ func (s *System) verifyState( return errors.Wrap(err, "addDataToStateVerifier failed") } - err = addOffersToStateVerifier(verifier, historyQ, offers, graphOffers) + err = addOffersToStateVerifier(verifier, historyQ, offers) if err != nil { return errors.Wrap(err, "addOffersToStateVerifier failed") } @@ -194,16 +190,6 @@ func (s *System) verifyState( localLog.WithField("total", total).Info("Finished writing to StateVerifier") - if len(graphOffers) != 0 { - offerIDs := make([]xdr.Int64, 0, len(graphOffers)) - for id := range graphOffers { - offerIDs = append(offerIDs, id) - } - return ingesterrors.NewStateError( - fmt.Errorf("orderbook graph contains offers missing from HAS: %v", offerIDs), - ) - } - countAccounts, err := historyQ.CountAccounts() if err != nil { return errors.Wrap(err, "Error running historyQ.CountAccounts") @@ -413,24 +399,10 @@ func addDataToStateVerifier(verifier *verify.StateVerifier, q history.IngestionQ return nil } -func offerEntryEquals(offer, other xdr.OfferEntry) (bool, error) { - serialized, err := offer.MarshalBinary() - if err != nil { - return false, errors.Wrap(err, "could not serialize offer") - } - otherSerialized, err := other.MarshalBinary() - if err != nil { - return false, errors.Wrap(err, "could not serialize offer") - } - - return bytes.Equal(serialized, otherSerialized), nil -} - func addOffersToStateVerifier( verifier *verify.StateVerifier, q history.IngestionQ, ids []int64, - graphOffers map[xdr.Int64]xdr.OfferEntry, ) error { if len(ids) == 0 { return nil @@ -460,23 +432,7 @@ func addOffersToStateVerifier( }, }, } - graphOffer, ok := graphOffers[row.OfferID] - if !ok { - return ingesterrors.NewStateError( - fmt.Errorf("offer %v is not in orderbook graph", row.OfferID), - ) - } - if equal, err := offerEntryEquals(graphOffer, *entry.Data.Offer); err != nil { - return errors.Wrap(err, "could not compare offers") - } else if !equal { - return ingesterrors.NewStateError( - fmt.Errorf( - "offer %v from db does not match offer in orderbook graph", - row.OfferID, - ), - ) - } - delete(graphOffers, row.OfferID) + err := verifier.Write(entry) if err != nil { return err diff --git a/services/horizon/internal/expingest/verify_range_state_test.go b/services/horizon/internal/expingest/verify_range_state_test.go index f8883cbe17..a9cbcd99a8 100644 --- a/services/horizon/internal/expingest/verify_range_state_test.go +++ b/services/horizon/internal/expingest/verify_range_state_test.go @@ -22,7 +22,6 @@ func TestVerifyRangeStateTestSuite(t *testing.T) { type VerifyRangeStateTestSuite struct { suite.Suite - graph *mockOrderBookGraph historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner @@ -30,7 +29,6 @@ type VerifyRangeStateTestSuite struct { } func (s *VerifyRangeStateTestSuite) SetupTest() { - s.graph = &mockOrderBookGraph{} s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} @@ -38,12 +36,10 @@ func (s *VerifyRangeStateTestSuite) SetupTest() { historyQ: s.historyQ, historyAdapter: s.historyAdapter, runner: s.runner, - graph: s.graph, } s.system.initMetrics() s.historyQ.On("Rollback").Return(nil).Once() - s.graph.On("Discard").Once() } func (s *VerifyRangeStateTestSuite) TearDownTest() { @@ -51,13 +47,11 @@ func (s *VerifyRangeStateTestSuite) TearDownTest() { s.historyQ.AssertExpectations(t) s.historyAdapter.AssertExpectations(t) s.runner.AssertExpectations(t) - s.graph.AssertExpectations(t) } func (s *VerifyRangeStateTestSuite) TestInvalidRange() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} - *s.graph = mockOrderBookGraph{} next, err := verifyRangeState{fromLedger: 0, toLedger: 0}.run(s.system) s.Assert().Error(err) @@ -95,7 +89,6 @@ func (s *VerifyRangeStateTestSuite) TestInvalidRange() { func (s *VerifyRangeStateTestSuite) TestBeginReturnsError() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} - *s.graph = mockOrderBookGraph{} s.historyQ.On("Begin").Return(errors.New("my error")).Once() next, err := verifyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -154,7 +147,6 @@ func (s *VerifyRangeStateTestSuite) TestSuccess() { s.runner.On("RunHistoryArchiveIngestion", uint32(100)).Return(ingestio.StatsChangeProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerExpIngest", uint32(100)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", uint32(100)).Return(nil).Once() for i := uint32(101); i <= 200; i++ { s.historyQ.On("Begin").Return(nil).Once() @@ -162,7 +154,6 @@ func (s *VerifyRangeStateTestSuite) TestSuccess() { ingestio.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerExpIngest", i).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", i).Return(nil).Once() } next, err := verifyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -179,7 +170,6 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { s.runner.On("RunHistoryArchiveIngestion", uint32(100)).Return(ingestio.StatsChangeProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerExpIngest", uint32(100)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", uint32(100)).Return(nil).Once() for i := uint32(101); i <= 110; i++ { s.historyQ.On("Begin").Return(nil).Once() @@ -187,13 +177,8 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { ingestio.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerExpIngest", i).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.graph.On("Apply", i).Return(nil).Once() } - s.graph.On("OffersMap").Return(map[xdr.Int64]xdr.OfferEntry{ - eurOffer.OfferId: eurOffer, - }).Once() - clonedQ := &mockDBQ{} s.historyQ.On("CloneIngestionQ").Return(clonedQ).Once() diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index a962c89aa0..0fb84b12dc 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -72,7 +72,7 @@ func mustInitCoreDB(app *App) { )} } -func initExpIngester(app *App, orderBookGraph *orderbook.OrderBookGraph) { +func initExpIngester(app *App) { var err error app.expingester, err = expingest.NewSystem(expingest.Config{ CoreSession: mustNewDBSession( @@ -88,7 +88,6 @@ func initExpIngester(app *App, orderBookGraph *orderbook.OrderBookGraph) { HistoryArchiveURL: app.config.HistoryArchiveURLs[0], StellarCoreURL: app.config.StellarCoreURL, StellarCoreCursor: app.config.CursorName, - OrderBookGraph: orderBookGraph, MaxStreamRetries: 3, DisableStateVerification: app.config.IngestDisableStateVerification, IngestFailedTransactions: app.config.IngestFailedTransactions, @@ -159,7 +158,6 @@ func initIngestMetrics(app *App) { app.metrics.Register("ingest.ledger_ingestion", app.expingester.Metrics.LedgerIngestionTimer) app.metrics.Register("ingest.ledger_in_memory_ingestion", app.expingester.Metrics.LedgerInMemoryIngestionTimer) app.metrics.Register("ingest.state_verify", app.expingester.Metrics.StateVerifyTimer) - app.metrics.Register("ingest.local_latest_ledger", app.expingester.Metrics.LocalLatestLedgerGauge) } func initTxSubMetrics(app *App) { From dda6b4aa8411fd46aece2dd4904b96ad73b0809b Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Mon, 1 Jun 2020 20:09:42 +0200 Subject: [PATCH 2/4] Code review feedback --- services/horizon/internal/app.go | 9 ++----- .../horizon/internal/expingest/orderbook.go | 26 +++++++++++++++++-- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index faaeb334ab..02a24c9fee 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -113,6 +113,7 @@ func (a *App) Serve() { } go a.run() + go a.orderBookStream.Run(a.ctx) // WaitGroup for all go routines. Makes sure that DB is closed when // all services gracefully shutdown. @@ -414,13 +415,7 @@ func (a *App) Tick() { var wg sync.WaitGroup log.Debug("ticking app") // update ledger state, operation fee state, and stellar-core info in parallel - wg.Add(4) - go func() { - defer wg.Done() - if err := a.orderBookStream.Update(); err != nil { - log.WithField("error", err).Error("could not apply updates from order book stream") - } - }() + wg.Add(3) go func() { a.UpdateLedgerState(); wg.Done() }() go func() { a.UpdateFeeStatsState(); wg.Done() }() go func() { a.UpdateStellarCoreInfo(); wg.Done() }() diff --git a/services/horizon/internal/expingest/orderbook.go b/services/horizon/internal/expingest/orderbook.go index 5cbbe77743..b391a0245b 100644 --- a/services/horizon/internal/expingest/orderbook.go +++ b/services/horizon/internal/expingest/orderbook.go @@ -1,6 +1,7 @@ package expingest import ( + "context" "database/sql" "math/rand" "sort" @@ -12,7 +13,10 @@ import ( "github.com/stellar/go/xdr" ) -const verificationFrequency = time.Hour +const ( + verificationFrequency = time.Hour + updateFrequency = 30 * time.Second +) // OrderBookStream updates an in memory graph to be consistent with // offers in the Horizon DB. Any offers which are created, modified, or removed @@ -160,7 +164,7 @@ func (o *OrderBookStream) verifyAllOffers() { ingestionOffers, err := o.HistoryQ.GetAllOffers() if err != nil { // reset last update so that we retry verification on next update - o.lastUpdate = time.Now().Add(time.Hour * -2) + o.lastUpdate = time.Now().Add(verificationFrequency * -2) log.WithError(err).Info("Could not verify offers because of error from GetAllOffers") return } @@ -231,3 +235,21 @@ func (o *OrderBookStream) Update() error { } return nil } + +// Run will call Update() every 30 seconds until the given context is terminated. +func (o *OrderBookStream) Run(ctx context.Context) { + ticker := time.NewTicker(updateFrequency) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := o.Update(); err != nil { + log.WithError(err).Error("could not apply updates from order book stream") + } + case <-ctx.Done(): + log.Info("finished background ticker") + return + } + } +} From 35eb86e660c6d28b86cf91331d75728d70d93b7e Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 1 Jun 2020 21:45:27 +0200 Subject: [PATCH 3/4] Update services/horizon/internal/expingest/orderbook.go Co-authored-by: Bartek Nowotarski --- services/horizon/internal/expingest/orderbook.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/expingest/orderbook.go b/services/horizon/internal/expingest/orderbook.go index b391a0245b..fee42d8452 100644 --- a/services/horizon/internal/expingest/orderbook.go +++ b/services/horizon/internal/expingest/orderbook.go @@ -248,7 +248,7 @@ func (o *OrderBookStream) Run(ctx context.Context) { log.WithError(err).Error("could not apply updates from order book stream") } case <-ctx.Done(): - log.Info("finished background ticker") + log.Info("shutting down OrderBookStream") return } } From 571b8edc8fbc60c26d2d401b5ac3c5786c880d86 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Mon, 1 Jun 2020 21:46:25 +0200 Subject: [PATCH 4/4] change updateFrequency --- services/horizon/internal/expingest/orderbook.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/expingest/orderbook.go b/services/horizon/internal/expingest/orderbook.go index fee42d8452..91ab819b4e 100644 --- a/services/horizon/internal/expingest/orderbook.go +++ b/services/horizon/internal/expingest/orderbook.go @@ -15,7 +15,7 @@ import ( const ( verificationFrequency = time.Hour - updateFrequency = 30 * time.Second + updateFrequency = 2 * time.Second ) // OrderBookStream updates an in memory graph to be consistent with