Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/expingest: Remove orderbook graph from ingestion system #2639

Merged
merged 5 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stellar/go/exp/orderbook"
"github.com/stellar/go/services/horizon/internal/expingest"
support "github.com/stellar/go/support/config"
"github.com/stellar/go/support/db"
Expand Down Expand Up @@ -107,7 +106,6 @@ var ingestVerifyRangeCmd = &cobra.Command{
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
OrderBookGraph: orderbook.NewOrderBookGraph(),
IngestFailedTransactions: config.IngestFailedTransactions,
}

Expand Down Expand Up @@ -185,7 +183,6 @@ var ingestStressTestCmd = &cobra.Command{
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
OrderBookGraph: orderbook.NewOrderBookGraph(),
IngestFailedTransactions: config.IngestFailedTransactions,
}

Expand Down
29 changes: 9 additions & 20 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (a *App) Serve() {
}

go a.run()
go a.orderBookStream.Run(a.ctx)

// WaitGroup for all go routines. Makes sure that DB is closed when
// all services gracefully shutdown.
Expand Down Expand Up @@ -414,15 +415,7 @@ func (a *App) Tick() {
var wg sync.WaitGroup
log.Debug("ticking app")
// update ledger state, operation fee state, and stellar-core info in parallel
wg.Add(4)
go func() {
defer wg.Done()
if a.orderBookStream != nil {
if err := a.orderBookStream.Update(); err != nil {
log.WithField("error", err).Error("could not apply updates from order book stream")
}
}
}()
wg.Add(3)
go func() { a.UpdateLedgerState(); wg.Done() }()
go func() { a.UpdateFeeStatsState(); wg.Done() }()
go func() { a.UpdateStellarCoreInfo(); wg.Done() }()
Expand Down Expand Up @@ -462,19 +455,15 @@ func (a *App) init() {
mustInitCoreDB(a)

if a.config.Ingest {
orderBookGraph := orderbook.NewOrderBookGraph()
// expingester
initExpIngester(a, orderBookGraph)
// path-finder
initPathFinder(a, orderBookGraph)
} else {
orderBookGraph := orderbook.NewOrderBookGraph()
a.orderBookStream = &expingest.OrderBookStream{
OrderBookGraph: orderBookGraph,
HistoryQ: &history.Q{a.HorizonSession(a.ctx)},
}
initPathFinder(a, orderBookGraph)
initExpIngester(a)
}
orderBookGraph := orderbook.NewOrderBookGraph()
a.orderBookStream = &expingest.OrderBookStream{
OrderBookGraph: orderBookGraph,
HistoryQ: &history.Q{a.HorizonSession(a.ctx)},
}
initPathFinder(a, orderBookGraph)

// txsub
initSubmissionSystem(a)
Expand Down
42 changes: 0 additions & 42 deletions services/horizon/internal/expingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func TestBuildStateTestSuite(t *testing.T) {

type BuildStateTestSuite struct {
suite.Suite
graph *mockOrderBookGraph
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
Expand All @@ -28,7 +27,6 @@ type BuildStateTestSuite struct {
}

func (s *BuildStateTestSuite) SetupTest() {
s.graph = &mockOrderBookGraph{}
s.historyQ = &mockDBQ{}
s.runner = &mockProcessorsRunner{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
Expand All @@ -39,15 +37,13 @@ func (s *BuildStateTestSuite) SetupTest() {
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
graph: s.graph,
runner: s.runner,
stellarCoreClient: s.stellarCoreClient,
}
s.system.initMetrics()

s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
s.graph.On("Discard").Once()
}

func (s *BuildStateTestSuite) TearDownTest() {
Expand All @@ -56,7 +52,6 @@ func (s *BuildStateTestSuite) TearDownTest() {
s.historyAdapter.AssertExpectations(t)
s.runner.AssertExpectations(t)
s.stellarCoreClient.AssertExpectations(t)
s.graph.AssertExpectations(t)
}

func (s *BuildStateTestSuite) mockCommonHistoryQ() {
Expand All @@ -77,9 +72,6 @@ func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() {
// Recreate mock in this single test to remove Rollback assertion.
*s.historyQ = mockDBQ{}

// Recreate orderbook graph to remove Discard assertion
*s.graph = mockOrderBookGraph{}

next, err := buildState{checkpointLedger: 0}.run(s.system)
s.Assert().Error(err)
s.Assert().EqualError(err, "unexpected checkpointLedger value")
Expand All @@ -91,9 +83,6 @@ func (s *BuildStateTestSuite) TestBeginReturnsError() {
*s.historyQ = mockDBQ{}
s.historyQ.On("Begin").Return(errors.New("my error")).Once()

// Recreate orderbook graph to remove Discard assertion
*s.graph = mockOrderBookGraph{}

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)
s.Assert().Error(err)
s.Assert().EqualError(err, "Error starting a transaction: my error")
Expand Down Expand Up @@ -187,7 +176,6 @@ func (s *BuildStateTestSuite) TestTruncateExpingestStateTablesReturnsError() {

func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() {
s.mockCommonHistoryQ()
s.graph.On("Clear").Return().Once()
s.runner.
On("RunHistoryArchiveIngestion", s.checkpointLedger).
Return(io.StatsChangeProcessorResults{}, errors.New("my error")).
Expand All @@ -201,7 +189,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() {

func (s *BuildStateTestSuite) TestUpdateLastLedgerExpIngestAfterIngestReturnsError() {
s.mockCommonHistoryQ()
s.graph.On("Clear").Return(nil).Once()
s.runner.
On("RunHistoryArchiveIngestion", s.checkpointLedger).
Return(io.StatsChangeProcessorResults{}, nil).
Expand All @@ -221,7 +208,6 @@ func (s *BuildStateTestSuite) TestUpdateLastLedgerExpIngestAfterIngestReturnsErr

func (s *BuildStateTestSuite) TestUpdateExpIngestVersionIngestReturnsError() {
s.mockCommonHistoryQ()
s.graph.On("Clear").Return(nil).Once()
s.runner.
On("RunHistoryArchiveIngestion", s.checkpointLedger).
Return(io.StatsChangeProcessorResults{}, nil).
Expand All @@ -248,7 +234,6 @@ func (s *BuildStateTestSuite) TestUpdateCommitReturnsError() {
s.historyQ.On("UpdateExpIngestVersion", CurrentVersion).
Return(nil).
Once()
s.graph.On("Clear").Return(nil).Once()
s.historyQ.On("Commit").
Return(errors.New("my error")).
Once()
Expand All @@ -259,31 +244,6 @@ func (s *BuildStateTestSuite) TestUpdateCommitReturnsError() {
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestOBGraphApplyReturnsError() {
s.mockCommonHistoryQ()
s.runner.
On("RunHistoryArchiveIngestion", s.checkpointLedger).
Return(io.StatsChangeProcessorResults{}, nil).
Once()
s.historyQ.On("UpdateLastLedgerExpIngest", s.checkpointLedger).
Return(nil).
Once()
s.historyQ.On("UpdateExpIngestVersion", CurrentVersion).
Return(nil).
Once()
s.historyQ.On("Commit").
Return(nil).
Once()

s.graph.On("Clear").Return().Once()
s.graph.On("Apply", s.checkpointLedger).Return(errors.New("my error")).Once()
next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

s.Assert().Error(err)
s.Assert().EqualError(err, "Error applying order book changes: my error")
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestBuildStateSucceeds() {
s.mockCommonHistoryQ()
s.runner.
Expand All @@ -300,8 +260,6 @@ func (s *BuildStateTestSuite) TestBuildStateSucceeds() {
Return(nil).
Once()

s.graph.On("Clear").Return(nil).Once()
s.graph.On("Apply", s.checkpointLedger).Return(nil).Once()
next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

s.Assert().NoError(err)
Expand Down
4 changes: 1 addition & 3 deletions services/horizon/internal/expingest/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/stellar/go/exp/ingest/adapters"
"github.com/stellar/go/exp/ingest/io"
"github.com/stellar/go/exp/ingest/ledgerbackend"
"github.com/stellar/go/exp/orderbook"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -82,7 +81,6 @@ func (s *DBTestSuite) SetupTest() {
CoreSession: s.tt.CoreSession(),
HistorySession: s.tt.HorizonSession(),
HistoryArchiveURL: "http://ignore.test",
OrderBookGraph: orderbook.NewOrderBookGraph(),
MaxStreamRetries: 3,
DisableStateVerification: false,
IngestFailedTransactions: true,
Expand Down Expand Up @@ -144,7 +142,7 @@ func (s *DBTestSuite) TestBuildState() {
s.Assert().Equal(s.sequence, resume.latestSuccessfullyProcessedLedger)

s.mockChangeReader()
s.Assert().NoError(s.system.verifyState(s.system.graph.OffersMap(), false))
s.Assert().NoError(s.system.verifyState(false))
}

func (s *DBTestSuite) TestVersionMismatchTriggersRebuild() {
Expand Down
Loading