Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/expingest: Ingest into memory only flag #2299

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions services/horizon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ var configOpts = support.ConfigOptions{
FlagDefault: false,
Usage: "causes this horizon process to ingest data from stellar-core into horizon's db",
},
&support.ConfigOption{
Name: "exp-ingest-in-memory-only",
ConfigKey: &config.IngestInMemoryOnly,
OptType: types.Bool,
FlagDefault: false,
Usage: "[experimental flag!] causes this horizon process to ingest data from stellar-core into memory structures only, ignored when --ingest not set",
},
&support.ConfigOption{
Name: "ingest-failed-transactions",
ConfigKey: &config.IngestFailedTransactions,
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Config struct {
TLSKey string
// Ingest toggles whether this horizon instance should run the data ingestion subsystem.
Ingest bool
// IngestInMemoryOnly causes this horizon process to ingest data from stellar-core into memory structures only.
IngestInMemoryOnly bool
// IngestFailedTransactions toggles whether to ingest failed transactions
IngestFailedTransactions bool
// CursorName is the cursor used for ingesting from stellar-core.
Expand Down
17 changes: 17 additions & 0 deletions services/horizon/internal/expingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() {
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestIngestInMemoryOnly() {
// Recreate mock in this single test to remove Rollback assertion.
*s.historyQ = mockDBQ{}

// Recreate orderbook graph to remove Discard assertion
*s.graph = mockOrderBookGraph{}

s.system.config.IngestInMemoryOnly = true
defer func() {
s.system.config.IngestInMemoryOnly = false
}()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)
s.Assert().NoError(err)
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestBeginReturnsError() {
// Recreate mock in this single test to remove Rollback assertion.
*s.historyQ = mockDBQ{}
Expand Down
24 changes: 24 additions & 0 deletions services/horizon/internal/expingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ func (startState) run(s *System) (transition, error) {

switch {
case lastHistoryLedger > lastIngestedLedger:
// If ingesting into memory wait until other ingesting instance ingests state.
if s.config.IngestInMemoryOnly {
log.Info("Waiting for other ingesting instance to ingest into DB...")
return start(), nil
tamirms marked this conversation as resolved.
Show resolved Hide resolved
}

// Expingest was running at some point the past but was turned off.
// Now it's on by default but the latest history ledger is greater
// than the latest expingest ledger. We reset the exp ledger sequence
Expand Down Expand Up @@ -222,6 +228,12 @@ func (b buildState) run(s *System) (transition, error) {
return start(), errors.New("unexpected checkpointLedger value")
}

// If ingesting into memory wait until other ingesting instance ingests state.
if s.config.IngestInMemoryOnly {
log.Info("Waiting for other ingesting instance to ingest into DB...")
return start(), nil
}

if err := s.historyQ.Begin(); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
}
Expand Down Expand Up @@ -426,6 +438,12 @@ func (r resumeState) run(s *System) (transition, error) {
return resumeImmediately(ingestLedger), nil
}

// If ingesting into memory wait until other ingesting instance updates DB.
if s.config.IngestInMemoryOnly {
log.Info("Waiting for other ingesting instance to ingest into DB...")
return retryResume(r), nil
}

log.WithFields(logpkg.F{
"sequence": ingestLedger,
"state": true,
Expand Down Expand Up @@ -483,6 +501,12 @@ func (h historyRangeState) String() string {

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *System) (transition, error) {
// If ingesting into memory wait until other ingesting instance ingests state.
if s.config.IngestInMemoryOnly {
log.Info("Waiting for other ingesting instance to ingest into DB...")
return start(), nil
}

if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ func (s *IngestHistoryRangeStateTestSuite) TearDownTest() {
s.runner.AssertExpectations(t)
}

func (s *IngestHistoryRangeStateTestSuite) TestIngestInMemory() {
// Recreate mock in this single test to remove Rollback assertion.
*s.historyQ = mockDBQ{}

s.system.config.IngestInMemoryOnly = true
defer func() {
s.system.config.IngestInMemoryOnly = false
}()

next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
s.Assert().NoError(err)
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *IngestHistoryRangeStateTestSuite) TestInvalidRange() {
// Recreate mock in this single test to remove Rollback assertion.
*s.historyQ = mockDBQ{}
Expand Down
22 changes: 21 additions & 1 deletion services/horizon/internal/expingest/init_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *InitStateTestSuite) TestBuildStateOldHistory() {
)
}

// TestResumeStateBehind is testing the case when:
// TestResumeStateInFront is testing the case when:
// * state doesn't need to be rebuilt,
// * history is in front of expingest.
func (s *InitStateTestSuite) TestResumeStateInFront() {
Expand All @@ -195,6 +195,26 @@ func (s *InitStateTestSuite) TestResumeStateInFront() {
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

// TestResumeStateInFrontInMemoryOnly is testing the case when:
// * state doesn't need to be rebuilt,
// * history is in front of expingest,
// * Horizon is ingesting into memory only.
func (s *InitStateTestSuite) TestResumeStateInFrontInMemoryOnly() {
s.system.config.IngestInMemoryOnly = true
defer func() {
s.system.config.IngestInMemoryOnly = false
}()

s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once()
s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once()
s.historyQ.On("GetLatestLedger").Return(uint32(130), nil).Once()

next, err := startState{}.run(s.system)
s.Assert().NoError(err)
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

// TestResumeStateBehind is testing the case when:
// * state doesn't need to be rebuilt,
// * history is behind of expingest.
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Config struct {
MaxStreamRetries int

OrderBookGraph *orderbook.OrderBookGraph
IngestInMemoryOnly bool
IngestFailedTransactions bool
}

Expand Down
27 changes: 27 additions & 0 deletions services/horizon/internal/expingest/resume_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,33 @@ func (s *ResumeTestTestSuite) TestIngestOrderbookOnlyWhenLastLedgerExpEqualsCurr
)
}

// TestNewLedgerButInMemoryOnly tests a scenario when there's a new ledger to
// ingest into a DB but the instances is ingesting into memory only. In such
// case it should wait for another instance to ingest into a DB.
func (s *ResumeTestTestSuite) TestNewLedgerButInMemoryOnly() {
s.system.config.IngestInMemoryOnly = true
defer func() {
s.system.config.IngestInMemoryOnly = false
}()

s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once()
s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once()
s.historyQ.On("GetLatestLedger").Return(uint32(0), nil)

s.ledgeBackend.On("GetLatestLedgerSequence").Return(uint32(111), nil).Once()

next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system)
s.Assert().NoError(err)
s.Assert().Equal(
transition{
node: resumeState{latestSuccessfullyProcessedLedger: 100},
sleepDuration: defaultSleep,
},
next,
)
}

func (s *ResumeTestTestSuite) TestIngestAllMasterNode() {
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once()
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func initExpIngester(app *App, orderBookGraph *orderbook.OrderBookGraph) {
MaxStreamRetries: 3,
DisableStateVerification: app.config.IngestDisableStateVerification,
IngestFailedTransactions: app.config.IngestFailedTransactions,
IngestInMemoryOnly: app.config.IngestInMemoryOnly,
})
if err != nil {
log.Fatal(err)
Expand Down