From 174dbf841fb9d403a6fb0860f9ca349231ff658b Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 20 Feb 2020 16:11:37 +0100 Subject: [PATCH 1/4] services/horizon/expingest: Ingest into memory only flag --- services/horizon/cmd/root.go | 7 +++++ services/horizon/internal/config.go | 2 ++ .../internal/expingest/build_state_test.go | 17 ++++++++++++ services/horizon/internal/expingest/fsm.go | 11 ++++++++ services/horizon/internal/expingest/main.go | 1 + .../internal/expingest/resume_state_test.go | 27 +++++++++++++++++++ services/horizon/internal/init.go | 1 + 7 files changed, 66 insertions(+) diff --git a/services/horizon/cmd/root.go b/services/horizon/cmd/root.go index 98df9ef378..9062cd72a5 100644 --- a/services/horizon/cmd/root.go +++ b/services/horizon/cmd/root.go @@ -309,6 +309,13 @@ var configOpts = support.ConfigOptions{ FlagDefault: false, Usage: "causes this horizon process to ingest data from stellar-core into horizon's db", }, + &support.ConfigOption{ + Name: "ingest-in-memory-only", + ConfigKey: &config.IngestInMemoryOnly, + OptType: types.Bool, + FlagDefault: false, + Usage: "causes this horizon process to ingest data from stellar-core into memory structures only, ignored when --ingest not set", + }, &support.ConfigOption{ Name: "ingest-failed-transactions", ConfigKey: &config.IngestFailedTransactions, diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 1bbebfc5dc..15f5a01c59 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -45,6 +45,8 @@ type Config struct { TLSKey string // Ingest toggles whether this horizon instance should run the data ingestion subsystem. Ingest bool + // IngestInMemoryOnly causes this horizon process to ingest data from stellar-core into memory structures only. + IngestInMemoryOnly bool // IngestFailedTransactions toggles whether to ingest failed transactions IngestFailedTransactions bool // CursorName is the cursor used for ingesting from stellar-core. diff --git a/services/horizon/internal/expingest/build_state_test.go b/services/horizon/internal/expingest/build_state_test.go index 5e6f640e09..b482107a1b 100644 --- a/services/horizon/internal/expingest/build_state_test.go +++ b/services/horizon/internal/expingest/build_state_test.go @@ -86,6 +86,23 @@ func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } +func (s *BuildStateTestSuite) TestIngestInMemoryOnly() { + // Recreate mock in this single test to remove Rollback assertion. + *s.historyQ = mockDBQ{} + + // Recreate orderbook graph to remove Discard assertion + *s.graph = mockOrderBookGraph{} + + s.system.config.IngestInMemoryOnly = true + defer func() { + s.system.config.IngestInMemoryOnly = false + }() + + next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + func (s *BuildStateTestSuite) TestBeginReturnsError() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index 21ed2d90d3..5646043e85 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -222,6 +222,11 @@ func (b buildState) run(s *System) (transition, error) { return start(), errors.New("unexpected checkpointLedger value") } + // If ingesting into memory wait until other ingesting instance ingests state. + if s.config.IngestInMemoryOnly { + return start(), nil + } + if err := s.historyQ.Begin(); err != nil { return start(), errors.Wrap(err, "Error starting a transaction") } @@ -426,6 +431,12 @@ func (r resumeState) run(s *System) (transition, error) { return resumeImmediately(ingestLedger), nil } + // If ingesting into memory wait until other ingesting instance updates DB. + if s.config.IngestInMemoryOnly { + log.Info("Waiting for other ingesting instance to ingest into DB...") + return retryResume(r), nil + } + log.WithFields(logpkg.F{ "sequence": ingestLedger, "state": true, diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 80ad09eaf0..0e1c7799f9 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -66,6 +66,7 @@ type Config struct { MaxStreamRetries int OrderBookGraph *orderbook.OrderBookGraph + IngestInMemoryOnly bool IngestFailedTransactions bool } diff --git a/services/horizon/internal/expingest/resume_state_test.go b/services/horizon/internal/expingest/resume_state_test.go index 88144195ec..f409aadbd5 100644 --- a/services/horizon/internal/expingest/resume_state_test.go +++ b/services/horizon/internal/expingest/resume_state_test.go @@ -264,6 +264,33 @@ func (s *ResumeTestTestSuite) TestIngestOrderbookOnlyWhenLastLedgerExpEqualsCurr ) } +// TestNewLedgerButInMemoryOnly tests a scenario when there's a new ledger to +// ingest into a DB but the instances is ingesting into memory only. In such +// case it should wait for another instance to ingest into a DB. +func (s *ResumeTestTestSuite) TestNewLedgerButInMemoryOnly() { + s.system.config.IngestInMemoryOnly = true + defer func() { + s.system.config.IngestInMemoryOnly = false + }() + + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once() + s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() + s.historyQ.On("GetLatestLedger").Return(uint32(0), nil) + + s.ledgeBackend.On("GetLatestLedgerSequence").Return(uint32(111), nil).Once() + + next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal( + transition{ + node: resumeState{latestSuccessfullyProcessedLedger: 100}, + sleepDuration: defaultSleep, + }, + next, + ) +} + func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once() diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 3e6e360502..8fe2e0ccf7 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -59,6 +59,7 @@ func initExpIngester(app *App, orderBookGraph *orderbook.OrderBookGraph) { MaxStreamRetries: 3, DisableStateVerification: app.config.IngestDisableStateVerification, IngestFailedTransactions: app.config.IngestFailedTransactions, + IngestInMemoryOnly: app.config.IngestInMemoryOnly, }) if err != nil { log.Fatal(err) From 26fcc860b2ebbe13e20c2562c7f09d87d980a663 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 20 Feb 2020 16:20:01 +0100 Subject: [PATCH 2/4] gofmt --- services/horizon/internal/expingest/main.go | 2 +- services/horizon/internal/init.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 0e1c7799f9..cb4993da39 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -66,7 +66,7 @@ type Config struct { MaxStreamRetries int OrderBookGraph *orderbook.OrderBookGraph - IngestInMemoryOnly bool + IngestInMemoryOnly bool IngestFailedTransactions bool } diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 8fe2e0ccf7..6a463de8e9 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -59,7 +59,7 @@ func initExpIngester(app *App, orderBookGraph *orderbook.OrderBookGraph) { MaxStreamRetries: 3, DisableStateVerification: app.config.IngestDisableStateVerification, IngestFailedTransactions: app.config.IngestFailedTransactions, - IngestInMemoryOnly: app.config.IngestInMemoryOnly, + IngestInMemoryOnly: app.config.IngestInMemoryOnly, }) if err != nil { log.Fatal(err) From 8d0d358507a0eea2e98d644f8902dc9925e6afef Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 20 Feb 2020 22:09:12 +0100 Subject: [PATCH 3/4] No DB writes when ingesting in memory --- services/horizon/internal/expingest/fsm.go | 13 +++++++++++ .../ingest_history_range_state_test.go | 14 ++++++++++++ .../internal/expingest/init_state_test.go | 22 ++++++++++++++++++- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index 5646043e85..3fa932044b 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -172,6 +172,12 @@ func (startState) run(s *System) (transition, error) { switch { case lastHistoryLedger > lastIngestedLedger: + // If ingesting into memory wait until other ingesting instance ingests state. + if s.config.IngestInMemoryOnly { + log.Info("Waiting for other ingesting instance to ingest into DB...") + return start(), nil + } + // Expingest was running at some point the past but was turned off. // Now it's on by default but the latest history ledger is greater // than the latest expingest ledger. We reset the exp ledger sequence @@ -224,6 +230,7 @@ func (b buildState) run(s *System) (transition, error) { // If ingesting into memory wait until other ingesting instance ingests state. if s.config.IngestInMemoryOnly { + log.Info("Waiting for other ingesting instance to ingest into DB...") return start(), nil } @@ -494,6 +501,12 @@ func (h historyRangeState) String() string { // historyRangeState is used when catching up history data func (h historyRangeState) run(s *System) (transition, error) { + // If ingesting into memory wait until other ingesting instance ingests state. + if s.config.IngestInMemoryOnly { + log.Info("Waiting for other ingesting instance to ingest into DB...") + return start(), nil + } + if h.fromLedger == 0 || h.toLedger == 0 || h.fromLedger > h.toLedger { return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) diff --git a/services/horizon/internal/expingest/ingest_history_range_state_test.go b/services/horizon/internal/expingest/ingest_history_range_state_test.go index 5a6daff817..e9c279d1dd 100644 --- a/services/horizon/internal/expingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/expingest/ingest_history_range_state_test.go @@ -45,6 +45,20 @@ func (s *IngestHistoryRangeStateTestSuite) TearDownTest() { s.runner.AssertExpectations(t) } +func (s *IngestHistoryRangeStateTestSuite) TestIngestInMemory() { + // Recreate mock in this single test to remove Rollback assertion. + *s.historyQ = mockDBQ{} + + s.system.config.IngestInMemoryOnly = true + defer func() { + s.system.config.IngestInMemoryOnly = false + }() + + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + func (s *IngestHistoryRangeStateTestSuite) TestInvalidRange() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} diff --git a/services/horizon/internal/expingest/init_state_test.go b/services/horizon/internal/expingest/init_state_test.go index 89e04c2992..07d89cd085 100644 --- a/services/horizon/internal/expingest/init_state_test.go +++ b/services/horizon/internal/expingest/init_state_test.go @@ -178,7 +178,7 @@ func (s *InitStateTestSuite) TestBuildStateOldHistory() { ) } -// TestResumeStateBehind is testing the case when: +// TestResumeStateInFront is testing the case when: // * state doesn't need to be rebuilt, // * history is in front of expingest. func (s *InitStateTestSuite) TestResumeStateInFront() { @@ -195,6 +195,26 @@ func (s *InitStateTestSuite) TestResumeStateInFront() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } +// TestResumeStateInFrontInMemoryOnly is testing the case when: +// * state doesn't need to be rebuilt, +// * history is in front of expingest, +// * Horizon is ingesting into memory only. +func (s *InitStateTestSuite) TestResumeStateInFrontInMemoryOnly() { + s.system.config.IngestInMemoryOnly = true + defer func() { + s.system.config.IngestInMemoryOnly = false + }() + + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(100), nil).Once() + s.historyQ.On("GetExpIngestVersion").Return(CurrentVersion, nil).Once() + s.historyQ.On("GetLatestLedger").Return(uint32(130), nil).Once() + + next, err := startState{}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + // TestResumeStateBehind is testing the case when: // * state doesn't need to be rebuilt, // * history is behind of expingest. From 951c73063940dde6c2b52ba273d29a4c62b7015e Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 21 Feb 2020 13:13:52 +0100 Subject: [PATCH 4/4] Change flag to be experimental --- services/horizon/cmd/root.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/cmd/root.go b/services/horizon/cmd/root.go index 9062cd72a5..be56bd6166 100644 --- a/services/horizon/cmd/root.go +++ b/services/horizon/cmd/root.go @@ -310,11 +310,11 @@ var configOpts = support.ConfigOptions{ Usage: "causes this horizon process to ingest data from stellar-core into horizon's db", }, &support.ConfigOption{ - Name: "ingest-in-memory-only", + Name: "exp-ingest-in-memory-only", ConfigKey: &config.IngestInMemoryOnly, OptType: types.Bool, FlagDefault: false, - Usage: "causes this horizon process to ingest data from stellar-core into memory structures only, ignored when --ingest not set", + Usage: "[experimental flag!] causes this horizon process to ingest data from stellar-core into memory structures only, ignored when --ingest not set", }, &support.ConfigOption{ Name: "ingest-failed-transactions",