From a7e2ab64d41d8d8359779253b13b1a5c13f011e1 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 10 Jan 2024 10:44:09 -0800 Subject: [PATCH 1/2] #5153: removed cursor update against core during ledger ingestion and unused core db related config flags --- services/horizon/CHANGELOG.md | 5 +- services/horizon/cmd/db.go | 1 - services/horizon/internal/config.go | 8 -- services/horizon/internal/flags.go | 28 ++++--- services/horizon/internal/flags_test.go | 70 +++++++++++++++++ .../internal/ingest/build_state_test.go | 32 -------- .../internal/ingest/db_integration_test.go | 1 - services/horizon/internal/ingest/fsm.go | 19 ----- services/horizon/internal/ingest/main.go | 32 +------- services/horizon/internal/ingest/main_test.go | 1 - services/horizon/internal/ingest/parallel.go | 3 - .../internal/ingest/resume_state_test.go | 68 ---------------- services/horizon/internal/init.go | 3 - .../internal/integration/parameters_test.go | 78 ------------------- services/horizon/internal/test/db/main.go | 6 ++ services/horizon/internal/test/main.go | 11 +-- services/horizon/internal/test/t.go | 14 +--- 17 files changed, 106 insertions(+), 274 deletions(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index c3c5705464..962fca7128 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -8,7 +8,10 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ### Added - Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051)) -- Deprecate configuration flags related to legacy non-captive core ingestion ([5100](https://github.com/stellar/go/pull/5100)) + +### Breaking Changes +- Removed configuration flags `--stellar-core-url-db`, `--cursor-name` `--skip-cursor-update` , they were related to legacy non-captive core ingestion and are no longer usable. + ## 2.27.0 ### Fixed diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index a83597932e..501c3d194d 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -416,7 +416,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, RemoteCaptiveCoreURL: config.RemoteCaptiveCoreURL, CaptiveCoreToml: config.CaptiveCoreToml, CaptiveCoreStoragePath: config.CaptiveCoreStoragePath, - StellarCoreCursor: config.CursorName, StellarCoreURL: config.StellarCoreURL, RoundingSlippageFilter: config.RoundingSlippageFilter, EnableIngestionFiltering: config.EnableIngestionFiltering, diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 1cc14b4900..6300c227ae 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -68,11 +68,6 @@ type Config struct { TLSKey string // Ingest toggles whether this horizon instance should run the data ingestion subsystem. Ingest bool - // CursorName is the cursor used for ingesting from stellar-core. - // Setting multiple cursors in different Horizon instances allows multiple - // Horizons to ingest from the same stellar-core instance without cursor - // collisions. - CursorName string // HistoryRetentionCount represents the minimum number of ledgers worth of // history data to retain in the horizon database. For the purposes of // determining a "retention duration", each ledger roughly corresponds to 10 @@ -82,9 +77,6 @@ type Config struct { // out-of-date by before horizon begins to respond with an error to history // requests. StaleThreshold uint - // SkipCursorUpdate causes the ingestor to skip reporting the "last imported - // ledger" state to stellar-core. - SkipCursorUpdate bool // IngestDisableStateVerification disables state verification // `System.verifyState()` when set to `true`. IngestDisableStateVerification bool diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index e2783680fd..40bfc08afe 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -338,11 +338,7 @@ func Flags() (*Config, support.ConfigOptions) { Hidden: true, CustomSetValue: func(opt *support.ConfigOption) error { if val := viper.GetString(opt.Name); val != "" { - stdLog.Printf( - "DEPRECATED - The usage of the flag --stellar-core-db-url has been deprecated. " + - "Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in " + - "the future.", - ) + return fmt.Errorf("flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion") } return nil }, @@ -595,11 +591,15 @@ func Flags() (*Config, support.ConfigOptions) { &support.ConfigOption{ Name: "cursor-name", EnvVar: "CURSOR_NAME", - ConfigKey: &config.CursorName, OptType: types.String, - FlagDefault: "HORIZON", - Usage: "ingestor cursor used by horizon to ingest from stellar core. must be uppercase and unique for each horizon instance ingesting from that core instance.", + Hidden: true, UsedInCommands: IngestionCommands, + CustomSetValue: func(opt *support.ConfigOption) error { + if val := viper.GetString(opt.Name); val != "" { + return fmt.Errorf("flag --cursor-name has been removed and no longer valid, must use captive core configuration for ingestion") + } + return nil + }, }, &support.ConfigOption{ Name: "history-retention-count", @@ -619,11 +619,15 @@ func Flags() (*Config, support.ConfigOptions) { }, &support.ConfigOption{ Name: "skip-cursor-update", - ConfigKey: &config.SkipCursorUpdate, - OptType: types.Bool, - FlagDefault: false, - Usage: "causes the ingester to skip reporting the last imported ledger state to stellar-core", + OptType: types.String, + Hidden: true, UsedInCommands: IngestionCommands, + CustomSetValue: func(opt *support.ConfigOption) error { + if val := viper.GetString(opt.Name); val != "" { + return fmt.Errorf("flag --skip-cursor-update has been removed and no longer valid, must use captive core configuration for ingestion") + } + return nil + }, }, &support.ConfigOption{ Name: "ingest-disable-state-verification", diff --git a/services/horizon/internal/flags_test.go b/services/horizon/internal/flags_test.go index b2e617bc00..ef2d5d3a02 100644 --- a/services/horizon/internal/flags_test.go +++ b/services/horizon/internal/flags_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/spf13/cobra" + "github.com/stellar/go/services/horizon/internal/test" "github.com/stretchr/testify/assert" @@ -259,3 +260,72 @@ func TestEnvironmentVariables(t *testing.T) { assert.Equal(t, config.CaptiveCoreConfigPath, "../docker/captive-core-classic-integration-tests.cfg") assert.Equal(t, config.CaptiveCoreConfigUseDB, true) } + +func TestRemovedFlags(t *testing.T) { + tests := []struct { + name string + environmentVars map[string]string + errStr string + cmdArgs []string + }{ + { + name: "STELLAR_CORE_DATABASE_URL removed", + environmentVars: map[string]string{ + "INGEST": "false", + "STELLAR_CORE_DATABASE_URL": "coredb", + "DATABASE_URL": "dburl", + }, + errStr: "flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion", + }, + { + name: "--stellar-core-db-url removed", + environmentVars: map[string]string{ + "INGEST": "false", + "DATABASE_URL": "dburl", + }, + errStr: "flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion", + cmdArgs: []string{"--stellar-core-db-url=coredb"}, + }, + { + name: "CURSOR_NAME removed", + environmentVars: map[string]string{ + "INGEST": "false", + "CURSOR_NAME": "cursor", + "DATABASE_URL": "dburl", + }, + errStr: "flag --cursor-name has been removed and no longer valid, must use captive core configuration for ingestion", + }, + { + name: "SKIP_CURSOR_UPDATE removed", + environmentVars: map[string]string{ + "INGEST": "false", + "SKIP_CURSOR_UPDATE": "true", + "DATABASE_URL": "dburl", + }, + errStr: "flag --skip-cursor-update has been removed and no longer valid, must use captive core configuration for ingestion", + }, + } + + envManager := test.NewEnvironmentManager() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { + envManager.Restore() + }() + err := envManager.InitializeEnvironmentVariables(tt.environmentVars) + require.NoError(t, err) + + config, flags := Flags() + testCmd := &cobra.Command{ + Use: "test", + } + + require.NoError(t, flags.Init(testCmd)) + require.NoError(t, testCmd.ParseFlags(tt.cmdArgs)) + + err = ApplyFlags(config, flags, ApplyOptions{}) + require.Error(t, err) + assert.Equal(t, tt.errStr, err.Error()) + }) + } +} diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index 7e03818795..d1409182d9 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -10,7 +10,6 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -83,12 +82,6 @@ func (s *BuildStateTestSuite) mockCommonHistoryQ() { s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(nil).Once() s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once() s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(62), - ).Return(nil).Once() } func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() { @@ -175,12 +168,6 @@ func (s *BuildStateTestSuite) TestUpdateLastLedgerIngestReturnsError() { s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(s.lastLedger, nil).Once() s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(errors.New("my error")).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(62), - ).Return(nil).Once() next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) @@ -194,12 +181,6 @@ func (s *BuildStateTestSuite) TestUpdateExpStateInvalidReturnsError() { s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(nil).Once() s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(errors.New("my error")).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(62), - ).Return(nil).Once() next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) @@ -215,13 +196,6 @@ func (s *BuildStateTestSuite) TestTruncateIngestStateTablesReturnsError() { s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once() s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(errors.New("my error")).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(62), - ).Return(nil).Once() - next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) s.Assert().Error(err) @@ -251,12 +225,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError( s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(0)).Return(nil).Once() s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once() s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(0), - ).Return(nil).Once() s.runner. On("RunGenesisStateIngestion"). diff --git a/services/horizon/internal/ingest/db_integration_test.go b/services/horizon/internal/ingest/db_integration_test.go index 86576db137..60a45f158e 100644 --- a/services/horizon/internal/ingest/db_integration_test.go +++ b/services/horizon/internal/ingest/db_integration_test.go @@ -81,7 +81,6 @@ func (s *DBTestSuite) SetupTest() { s.historyAdapter = &mockHistoryArchiveAdapter{} var err error sIface, err := NewSystem(Config{ - CoreSession: s.tt.CoreSession(), HistorySession: s.tt.HorizonSession(), HistoryArchiveURLs: []string{"http://ignore.test"}, DisableStateVerification: false, diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index f5b4f94456..3cc6d31c7d 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -326,11 +326,6 @@ func (b buildState) run(s *system) (transition, error) { return nextFailState, nil } - if err = s.updateCursor(b.checkpointLedger - 1); err != nil { - // Don't return updateCursor error. - log.WithError(err).Warn("error updating stellar-core cursor") - } - log.Info("Starting ingestion system from empty state...") // Clear last_ingested_ledger in key value store @@ -454,14 +449,6 @@ func (r resumeState) run(s *system) (transition, error) { WithField("lastIngestedLedger", lastIngestedLedger). Info("bumping ingest ledger to next ledger after ingested ledger in db") - // Update cursor if there's more than one ingesting instance: either - // Captive-Core or DB ingestion connected to another Stellar-Core. - // remove now? - if err = s.updateCursor(lastIngestedLedger); err != nil { - // Don't return updateCursor error. - log.WithError(err).Warn("error updating stellar-core cursor") - } - // resume immediately so Captive-Core catchup is not slowed down return resumeImmediately(lastIngestedLedger), nil } @@ -522,12 +509,6 @@ func (r resumeState) run(s *system) (transition, error) { return retryResume(r), err } - //TODO remove now? stellar-core-db-url is removed - if err = s.updateCursor(ingestLedger); err != nil { - // Don't return updateCursor error. - log.WithError(err).Warn("error updating stellar-core cursor") - } - duration = time.Since(startTime).Seconds() s.Metrics().LedgerIngestionDuration.Observe(float64(duration)) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 2cf067441e..3b4dc0ef1a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -79,9 +79,7 @@ const ( var log = logpkg.DefaultLogger.WithField("service", "ingest") type Config struct { - CoreSession db.SessionInterface StellarCoreURL string - StellarCoreCursor string CaptiveCoreBinaryPath string CaptiveCoreStoragePath string CaptiveCoreToml *ledgerbackend.CaptiveCoreToml @@ -252,7 +250,8 @@ func NewSystem(config Config) (System, error) { cancel() return nil, errors.Wrap(err, "error creating captive core backend") } - } else if config.LocalCaptiveCoreEnabled() { + } else { + // the only other option is local captive core config logger := log.WithField("subservice", "stellar-core") ledgerBackend, err = ledgerbackend.NewCaptive( ledgerbackend.CaptiveCoreConfig{ @@ -273,13 +272,6 @@ func NewSystem(config Config) (System, error) { cancel() return nil, errors.Wrap(err, "error creating captive core backend") } - } else { - coreSession := config.CoreSession.Clone() - ledgerBackend, err = ledgerbackend.NewDatabaseBackendFromSession(coreSession, config.NetworkPassphrase) - if err != nil { - cancel() - return nil, errors.Wrap(err, "error creating ledger backend") - } } historyQ := &history.Q{config.HistorySession.Clone()} @@ -752,26 +744,6 @@ func (s *system) resetStateVerificationErrors() { s.stateVerificationErrors = 0 } -func (s *system) updateCursor(ledgerSequence uint32) error { - if s.stellarCoreClient == nil { - return nil - } - - cursor := defaultCoreCursorName - if s.config.StellarCoreCursor != "" { - cursor = s.config.StellarCoreCursor - } - - ctx, cancel := context.WithTimeout(s.ctx, time.Second) - defer cancel() - err := s.stellarCoreClient.SetCursor(ctx, cursor, int32(ledgerSequence)) - if err != nil { - return errors.Wrap(err, "Setting stellar-core cursor failed") - } - - return nil -} - func (s *system) Shutdown() { log.Info("Shutting down ingestion system...") s.stateVerificationMutex.Lock() diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 55860eeaff..460c27e062 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -90,7 +90,6 @@ func TestLedgerEligibleForStateVerification(t *testing.T) { func TestNewSystem(t *testing.T) { config := Config{ - CoreSession: &db.Session{DB: &sqlx.DB{}}, HistorySession: &db.Session{DB: &sqlx.DB{}}, DisableStateVerification: true, HistoryArchiveURLs: []string{"https://history.stellar.org/prd/core-live/core_live_001"}, diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index b3c163689d..525f153b81 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -52,9 +52,6 @@ func (ps *ParallelSystems) Shutdown() { if ps.config.HistorySession != nil { ps.config.HistorySession.Close() } - if ps.config.CoreSession != nil { - ps.config.CoreSession.Close() - } } func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan history.LedgerRange) rangeError { diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 82a7869d4b..013f176ae8 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -273,14 +273,6 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(101)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() - - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(101), - ).Return(nil).Once() - s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() } func (s *ResumeTestTestSuite) TestBumpIngestLedger() { @@ -303,13 +295,6 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(101), nil).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(101), - ).Return(errors.New("my error")).Once() - next, err := resumeState{latestSuccessfullyProcessedLedger: 99}.run(s.system) s.Assert().NoError(err) s.Assert().Equal( @@ -335,45 +320,6 @@ func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { ) } -func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() { - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() - s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() - s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil) - - s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). - Run(func(args mock.Arguments) { - meta := args.Get(0).(xdr.LedgerCloseMeta) - s.Assert().Equal(uint32(101), meta.LedgerSequence()) - }). - Return( - ledgerStats{}, - nil, - ).Once() - s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(101)).Return(nil).Once() - s.historyQ.On("Commit").Return(nil).Once() - - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(101), - ).Return(errors.New("my error")).Once() - - s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() - s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() - - next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) - s.Assert().NoError(err) - s.Assert().Equal( - transition{ - node: resumeState{latestSuccessfullyProcessedLedger: 101}, - sleepDuration: 0, - }, - next, - ) -} - func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() @@ -422,13 +368,6 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(101)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(101), - ).Return(nil).Once() - s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() // Reap lookup tables not executed @@ -466,13 +405,6 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(101)).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.stellarCoreClient.On( - "SetCursor", - mock.AnythingOfType("*context.timerCtx"), - defaultCoreCursorName, - int32(101), - ).Return(nil).Once() - s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() // Reap lookup tables: diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 5d38c86ccf..7ae0e1a4b9 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -91,9 +91,7 @@ func mustInitHorizonDB(app *App) { func initIngester(app *App) { var err error - var coreSession db.SessionInterface app.ingester, err = ingest.NewSystem(ingest.Config{ - CoreSession: coreSession, HistorySession: mustNewDBSession( db.IngestSubservice, app.config.DatabaseURL, ingest.MaxDBConnections, ingest.MaxDBConnections, app.prometheusRegistry, ), @@ -101,7 +99,6 @@ func initIngester(app *App) { HistoryArchiveURLs: app.config.HistoryArchiveURLs, CheckpointFrequency: app.config.CheckpointFrequency, StellarCoreURL: app.config.StellarCoreURL, - StellarCoreCursor: app.config.CursorName, CaptiveCoreBinaryPath: app.config.CaptiveCoreBinaryPath, CaptiveCoreStoragePath: app.config.CaptiveCoreStoragePath, CaptiveCoreConfigUseDB: app.config.CaptiveCoreConfigUseDB, diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index 97fab268bc..ebe3c3bfda 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -541,84 +541,6 @@ func TestDeprecatedOutputs(t *testing.T) { "Configuring section in the developer documentation on how to use them - "+ "https://developers.stellar.org/docs/run-api-server/configuring") }) - t.Run("deprecated output for --stellar-core-db-url and --enable-captive-core-ingestion", func(t *testing.T) { - originalStderr := os.Stderr - r, w, _ := os.Pipe() - os.Stderr = w - stdLog.SetOutput(os.Stderr) - - testConfig := integration.GetTestConfig() - testConfig.HorizonIngestParameters = map[string]string{ - "stellar-core-db-url": "temp-url", - "enable-captive-core-ingestion": "true", - } - test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() - assert.NoError(t, err) - test.WaitForHorizon() - - // Use a wait group to wait for the goroutine to finish before proceeding - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := w.Close(); err != nil { - t.Errorf("Failed to close Stdout") - return - } - }() - - outputBytes, _ := io.ReadAll(r) - wg.Wait() // Wait for the goroutine to finish before proceeding - _ = r.Close() - os.Stderr = originalStderr - - assert.Contains(t, string(outputBytes), "DEPRECATED - The usage of the flag --stellar-core-db-url has been deprecated. "+ - "Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in "+ - "the future.") - assert.Contains(t, string(outputBytes), "DEPRECATED - The usage of the flag --enable-captive-core-ingestion has been deprecated. "+ - "Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in "+ - "the future.") - }) - t.Run("deprecated output for env vars STELLAR_CORE_DATABASE_URL and ENABLE_CAPTIVE_CORE_INGESTION", func(t *testing.T) { - originalStderr := os.Stderr - r, w, _ := os.Pipe() - os.Stderr = w - stdLog.SetOutput(os.Stderr) - - testConfig := integration.GetTestConfig() - testConfig.HorizonEnvironment = map[string]string{ - "STELLAR_CORE_DATABASE_URL": "temp-url", - "ENABLE_CAPTIVE_CORE_INGESTION": "true", - } - test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() - assert.NoError(t, err) - test.WaitForHorizon() - - // Use a wait group to wait for the goroutine to finish before proceeding - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := w.Close(); err != nil { - t.Errorf("Failed to close Stdout") - return - } - }() - - outputBytes, _ := io.ReadAll(r) - wg.Wait() // Wait for the goroutine to finish before proceeding - _ = r.Close() - os.Stderr = originalStderr - - assert.Contains(t, string(outputBytes), "DEPRECATED - The usage of the flag --stellar-core-db-url has been deprecated. "+ - "Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in "+ - "the future.") - assert.Contains(t, string(outputBytes), "DEPRECATED - The usage of the flag --enable-captive-core-ingestion has been deprecated. "+ - "Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in "+ - "the future.") - }) } func TestGlobalFlagsOutput(t *testing.T) { diff --git a/services/horizon/internal/test/db/main.go b/services/horizon/internal/test/db/main.go index 4156ec25fb..6114a677ff 100644 --- a/services/horizon/internal/test/db/main.go +++ b/services/horizon/internal/test/db/main.go @@ -29,6 +29,8 @@ func horizonPostgres(t *testing.T) *db.DB { return horizonDB } +// TODO, remove refs to internal core db, need to remove scenario tests which require this +// to seed core db. func corePostgres(t *testing.T) *db.DB { if coreDB != nil { return coreDB @@ -60,6 +62,8 @@ func HorizonROURL() string { return horizonDB.RO_DSN } +// TODO, remove refs to core db, need to remove scenario tests which require this +// to seed core db. func StellarCore(t *testing.T) *sqlx.DB { if coreDBConn != nil { return coreDBConn @@ -68,6 +72,8 @@ func StellarCore(t *testing.T) *sqlx.DB { return coreDBConn } +// TODO, remove refs to core db, need to remove scenario tests which require this +// to seed core db. func StellarCoreURL() string { if coreDB == nil { log.Panic(fmt.Errorf("StellarCore not initialized")) diff --git a/services/horizon/internal/test/main.go b/services/horizon/internal/test/main.go index fea814b4c3..93ed4a94db 100644 --- a/services/horizon/internal/test/main.go +++ b/services/horizon/internal/test/main.go @@ -25,11 +25,12 @@ type StaticMockServer struct { // T provides a common set of functionality for each test in horizon type T struct { - T *testing.T - Assert *assert.Assertions - Require *require.Assertions - Ctx context.Context - HorizonDB *sqlx.DB + T *testing.T + Assert *assert.Assertions + Require *require.Assertions + Ctx context.Context + HorizonDB *sqlx.DB + //TODO - remove ref to core db once scenario tests are removed. CoreDB *sqlx.DB EndLogTest func() []logrus.Entry } diff --git a/services/horizon/internal/test/t.go b/services/horizon/internal/test/t.go index c2a75da986..2f86f70565 100644 --- a/services/horizon/internal/test/t.go +++ b/services/horizon/internal/test/t.go @@ -18,7 +18,7 @@ import ( "github.com/stellar/go/support/render/hal" ) -// CoreSession returns a db.Session instance pointing at the stellar core test database +// TODO - remove ref to core db once scenario tests are removed. func (t *T) CoreSession() *db.Session { return &db.Session{ DB: t.CoreDB, @@ -143,17 +143,7 @@ func (t *T) UnmarshalExtras(r io.Reader) map[string]string { func (t *T) LoadLedgerStatus() ledger.Status { var next ledger.Status - err := t.CoreSession().GetRaw(t.Ctx, &next, ` - SELECT - COALESCE(MAX(ledgerseq), 0) as core_latest - FROM ledgerheaders - `) - - if err != nil { - panic(err) - } - - err = t.HorizonSession().GetRaw(t.Ctx, &next, ` + err := t.HorizonSession().GetRaw(t.Ctx, &next, ` SELECT COALESCE(MIN(sequence), 0) as history_elder, COALESCE(MAX(sequence), 0) as history_latest From a781a11ea006e418491043bb5482fed073ebcb34 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 11 Jan 2024 11:07:06 -0800 Subject: [PATCH 2/2] #5153: removed remotecaptivecoreurl config var, it's no longer referenced --- services/horizon/cmd/db.go | 1 - services/horizon/cmd/ingest.go | 3 -- services/horizon/internal/config.go | 1 - services/horizon/internal/ingest/main.go | 63 ++++++++---------------- services/horizon/internal/init.go | 1 - 5 files changed, 20 insertions(+), 49 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 501c3d194d..a0d0e6c518 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -413,7 +413,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, ReingestRetryBackoffSeconds: int(retryBackoffSeconds), CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: config.CaptiveCoreConfigUseDB, - RemoteCaptiveCoreURL: config.RemoteCaptiveCoreURL, CaptiveCoreToml: config.CaptiveCoreToml, CaptiveCoreStoragePath: config.CaptiveCoreStoragePath, StellarCoreURL: config.StellarCoreURL, diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index e2d38977ab..3833dba7fd 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -130,7 +130,6 @@ var ingestVerifyRangeCmd = &cobra.Command{ HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, - RemoteCaptiveCoreURL: globalConfig.RemoteCaptiveCoreURL, CheckpointFrequency: globalConfig.CheckpointFrequency, CaptiveCoreToml: globalConfig.CaptiveCoreToml, CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath, @@ -213,7 +212,6 @@ var ingestStressTestCmd = &cobra.Command{ HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, - RemoteCaptiveCoreURL: globalConfig.RemoteCaptiveCoreURL, CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, } @@ -353,7 +351,6 @@ var ingestBuildStateCmd = &cobra.Command{ HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, - RemoteCaptiveCoreURL: globalConfig.RemoteCaptiveCoreURL, CheckpointFrequency: globalConfig.CheckpointFrequency, CaptiveCoreToml: globalConfig.CaptiveCoreToml, CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath, diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 6300c227ae..7454f52bb7 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -21,7 +21,6 @@ type Config struct { EnableIngestionFiltering bool CaptiveCoreBinaryPath string - RemoteCaptiveCoreURL string CaptiveCoreConfigPath string CaptiveCoreTomlParams ledgerbackend.CaptiveCoreTomlParams CaptiveCoreToml *ledgerbackend.CaptiveCoreToml diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 3b4dc0ef1a..13f7017cbf 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -84,7 +84,6 @@ type Config struct { CaptiveCoreStoragePath string CaptiveCoreToml *ledgerbackend.CaptiveCoreToml CaptiveCoreConfigUseDB bool - RemoteCaptiveCoreURL string NetworkPassphrase string HistorySession db.SessionInterface @@ -109,19 +108,6 @@ type Config struct { MaxLedgerPerFlush uint32 } -// LocalCaptiveCoreEnabled returns true if configured to run -// a local captive core instance for ingestion. -func (c Config) LocalCaptiveCoreEnabled() bool { - // c.RemoteCaptiveCoreURL is always empty when running local captive core. - return c.RemoteCaptiveCoreURL == "" -} - -// RemoteCaptiveCoreEnabled returns true if configured to run -// a remote captive core instance for ingestion. -func (c Config) RemoteCaptiveCoreEnabled() bool { - return c.RemoteCaptiveCoreURL != "" -} - const ( getLastIngestedErrMsg string = "Error getting last ingested ledger" getIngestVersionErrMsg string = "Error getting ingestion version" @@ -243,35 +229,26 @@ func NewSystem(config Config) (System, error) { return nil, errors.Wrap(err, "error creating history archive") } - var ledgerBackend ledgerbackend.LedgerBackend - if config.RemoteCaptiveCoreEnabled() { - ledgerBackend, err = ledgerbackend.NewRemoteCaptive(config.RemoteCaptiveCoreURL) - if err != nil { - cancel() - return nil, errors.Wrap(err, "error creating captive core backend") - } - } else { - // the only other option is local captive core config - logger := log.WithField("subservice", "stellar-core") - ledgerBackend, err = ledgerbackend.NewCaptive( - ledgerbackend.CaptiveCoreConfig{ - BinaryPath: config.CaptiveCoreBinaryPath, - StoragePath: config.CaptiveCoreStoragePath, - UseDB: config.CaptiveCoreConfigUseDB, - Toml: config.CaptiveCoreToml, - NetworkPassphrase: config.NetworkPassphrase, - HistoryArchiveURLs: config.HistoryArchiveURLs, - CheckpointFrequency: config.CheckpointFrequency, - LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), - Log: logger, - Context: ctx, - UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()), - }, - ) - if err != nil { - cancel() - return nil, errors.Wrap(err, "error creating captive core backend") - } + // the only ingest option is local captive core config + logger := log.WithField("subservice", "stellar-core") + ledgerBackend, err := ledgerbackend.NewCaptive( + ledgerbackend.CaptiveCoreConfig{ + BinaryPath: config.CaptiveCoreBinaryPath, + StoragePath: config.CaptiveCoreStoragePath, + UseDB: config.CaptiveCoreConfigUseDB, + Toml: config.CaptiveCoreToml, + NetworkPassphrase: config.NetworkPassphrase, + HistoryArchiveURLs: config.HistoryArchiveURLs, + CheckpointFrequency: config.CheckpointFrequency, + LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), + Log: logger, + Context: ctx, + UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()), + }, + ) + if err != nil { + cancel() + return nil, errors.Wrap(err, "error creating captive core backend") } historyQ := &history.Q{config.HistorySession.Clone()} diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 7ae0e1a4b9..1b6664b8ba 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -103,7 +103,6 @@ func initIngester(app *App) { CaptiveCoreStoragePath: app.config.CaptiveCoreStoragePath, CaptiveCoreConfigUseDB: app.config.CaptiveCoreConfigUseDB, CaptiveCoreToml: app.config.CaptiveCoreToml, - RemoteCaptiveCoreURL: app.config.RemoteCaptiveCoreURL, DisableStateVerification: app.config.IngestDisableStateVerification, StateVerificationCheckpointFrequency: uint32(app.config.IngestStateVerificationCheckpointFrequency), StateVerificationTimeout: app.config.IngestStateVerificationTimeout,