Skip to content

Commit

Permalink
services/horizon/ingest: removed legacy core cursor update against du…
Browse files Browse the repository at this point in the history
…ring ledger ingestion (stellar#5158)
  • Loading branch information
sreuland committed Jan 27, 2024
1 parent d5d3218 commit 6a221d7
Show file tree
Hide file tree
Showing 18 changed files with 124 additions and 321 deletions.
5 changes: 4 additions & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
### Added

- Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))
- Deprecate configuration flags related to legacy non-captive core ingestion ([5100](https://github.com/stellar/go/pull/5100))

### Breaking Changes
- Removed configuration flags `--stellar-core-url-db`, `--cursor-name` `--skip-cursor-update` , they were related to legacy non-captive core ingestion and are no longer usable.

## 2.27.0

### Fixed
Expand Down
2 changes: 0 additions & 2 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,8 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
ReingestRetryBackoffSeconds: int(retryBackoffSeconds),
CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: config.CaptiveCoreConfigUseDB,
RemoteCaptiveCoreURL: config.RemoteCaptiveCoreURL,
CaptiveCoreToml: config.CaptiveCoreToml,
CaptiveCoreStoragePath: config.CaptiveCoreStoragePath,
StellarCoreCursor: config.CursorName,
StellarCoreURL: config.StellarCoreURL,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
Expand Down
3 changes: 0 additions & 3 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ var ingestVerifyRangeCmd = &cobra.Command{
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
RemoteCaptiveCoreURL: globalConfig.RemoteCaptiveCoreURL,
CheckpointFrequency: globalConfig.CheckpointFrequency,
CaptiveCoreToml: globalConfig.CaptiveCoreToml,
CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath,
Expand Down Expand Up @@ -213,7 +212,6 @@ var ingestStressTestCmd = &cobra.Command{
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
RemoteCaptiveCoreURL: globalConfig.RemoteCaptiveCoreURL,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
}

Expand Down Expand Up @@ -353,7 +351,6 @@ var ingestBuildStateCmd = &cobra.Command{
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
RemoteCaptiveCoreURL: globalConfig.RemoteCaptiveCoreURL,
CheckpointFrequency: globalConfig.CheckpointFrequency,
CaptiveCoreToml: globalConfig.CaptiveCoreToml,
CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath,
Expand Down
9 changes: 0 additions & 9 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type Config struct {

EnableIngestionFiltering bool
CaptiveCoreBinaryPath string
RemoteCaptiveCoreURL string
CaptiveCoreConfigPath string
CaptiveCoreTomlParams ledgerbackend.CaptiveCoreTomlParams
CaptiveCoreToml *ledgerbackend.CaptiveCoreToml
Expand Down Expand Up @@ -68,11 +67,6 @@ type Config struct {
TLSKey string
// Ingest toggles whether this horizon instance should run the data ingestion subsystem.
Ingest bool
// CursorName is the cursor used for ingesting from stellar-core.
// Setting multiple cursors in different Horizon instances allows multiple
// Horizons to ingest from the same stellar-core instance without cursor
// collisions.
CursorName string
// HistoryRetentionCount represents the minimum number of ledgers worth of
// history data to retain in the horizon database. For the purposes of
// determining a "retention duration", each ledger roughly corresponds to 10
Expand All @@ -82,9 +76,6 @@ type Config struct {
// out-of-date by before horizon begins to respond with an error to history
// requests.
StaleThreshold uint
// SkipCursorUpdate causes the ingestor to skip reporting the "last imported
// ledger" state to stellar-core.
SkipCursorUpdate bool
// IngestDisableStateVerification disables state verification
// `System.verifyState()` when set to `true`.
IngestDisableStateVerification bool
Expand Down
28 changes: 16 additions & 12 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,7 @@ func Flags() (*Config, support.ConfigOptions) {
Hidden: true,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetString(opt.Name); val != "" {
stdLog.Printf(
"DEPRECATED - The usage of the flag --stellar-core-db-url has been deprecated. " +
"Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in " +
"the future.",
)
return fmt.Errorf("flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion")
}
return nil
},
Expand Down Expand Up @@ -595,11 +591,15 @@ func Flags() (*Config, support.ConfigOptions) {
&support.ConfigOption{
Name: "cursor-name",
EnvVar: "CURSOR_NAME",
ConfigKey: &config.CursorName,
OptType: types.String,
FlagDefault: "HORIZON",
Usage: "ingestor cursor used by horizon to ingest from stellar core. must be uppercase and unique for each horizon instance ingesting from that core instance.",
Hidden: true,
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetString(opt.Name); val != "" {
return fmt.Errorf("flag --cursor-name has been removed and no longer valid, must use captive core configuration for ingestion")
}
return nil
},
},
&support.ConfigOption{
Name: "history-retention-count",
Expand All @@ -619,11 +619,15 @@ func Flags() (*Config, support.ConfigOptions) {
},
&support.ConfigOption{
Name: "skip-cursor-update",
ConfigKey: &config.SkipCursorUpdate,
OptType: types.Bool,
FlagDefault: false,
Usage: "causes the ingester to skip reporting the last imported ledger state to stellar-core",
OptType: types.String,
Hidden: true,
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetString(opt.Name); val != "" {
return fmt.Errorf("flag --skip-cursor-update has been removed and no longer valid, must use captive core configuration for ingestion")
}
return nil
},
},
&support.ConfigOption{
Name: "ingest-disable-state-verification",
Expand Down
70 changes: 70 additions & 0 deletions services/horizon/internal/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/spf13/cobra"

"github.com/stellar/go/services/horizon/internal/test"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -259,3 +260,72 @@ func TestEnvironmentVariables(t *testing.T) {
assert.Equal(t, config.CaptiveCoreConfigPath, "../docker/captive-core-classic-integration-tests.cfg")
assert.Equal(t, config.CaptiveCoreConfigUseDB, true)
}

func TestRemovedFlags(t *testing.T) {
tests := []struct {
name string
environmentVars map[string]string
errStr string
cmdArgs []string
}{
{
name: "STELLAR_CORE_DATABASE_URL removed",
environmentVars: map[string]string{
"INGEST": "false",
"STELLAR_CORE_DATABASE_URL": "coredb",
"DATABASE_URL": "dburl",
},
errStr: "flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion",
},
{
name: "--stellar-core-db-url removed",
environmentVars: map[string]string{
"INGEST": "false",
"DATABASE_URL": "dburl",
},
errStr: "flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion",
cmdArgs: []string{"--stellar-core-db-url=coredb"},
},
{
name: "CURSOR_NAME removed",
environmentVars: map[string]string{
"INGEST": "false",
"CURSOR_NAME": "cursor",
"DATABASE_URL": "dburl",
},
errStr: "flag --cursor-name has been removed and no longer valid, must use captive core configuration for ingestion",
},
{
name: "SKIP_CURSOR_UPDATE removed",
environmentVars: map[string]string{
"INGEST": "false",
"SKIP_CURSOR_UPDATE": "true",
"DATABASE_URL": "dburl",
},
errStr: "flag --skip-cursor-update has been removed and no longer valid, must use captive core configuration for ingestion",
},
}

envManager := test.NewEnvironmentManager()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
envManager.Restore()
}()
err := envManager.InitializeEnvironmentVariables(tt.environmentVars)
require.NoError(t, err)

config, flags := Flags()
testCmd := &cobra.Command{
Use: "test",
}

require.NoError(t, flags.Init(testCmd))
require.NoError(t, testCmd.ParseFlags(tt.cmdArgs))

err = ApplyFlags(config, flags, ApplyOptions{})
require.Error(t, err)
assert.Equal(t, tt.errStr, err.Error())
})
}
}
32 changes: 0 additions & 32 deletions services/horizon/internal/ingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -83,12 +82,6 @@ func (s *BuildStateTestSuite) mockCommonHistoryQ() {
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()
}

func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() {
Expand Down Expand Up @@ -175,12 +168,6 @@ func (s *BuildStateTestSuite) TestUpdateLastLedgerIngestReturnsError() {
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(s.lastLedger, nil).Once()
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(errors.New("my error")).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

Expand All @@ -194,12 +181,6 @@ func (s *BuildStateTestSuite) TestUpdateExpStateInvalidReturnsError() {
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(errors.New("my error")).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

Expand All @@ -215,13 +196,6 @@ func (s *BuildStateTestSuite) TestTruncateIngestStateTablesReturnsError() {
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(errors.New("my error")).Once()

s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

s.Assert().Error(err)
Expand Down Expand Up @@ -251,12 +225,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError(
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(0)).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(0),
).Return(nil).Once()

s.runner.
On("RunGenesisStateIngestion").
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/ingest/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (s *DBTestSuite) SetupTest() {
s.historyAdapter = &mockHistoryArchiveAdapter{}
var err error
sIface, err := NewSystem(Config{
CoreSession: s.tt.CoreSession(),
HistorySession: s.tt.HorizonSession(),
HistoryArchiveURLs: []string{"http://ignore.test"},
DisableStateVerification: false,
Expand Down
19 changes: 0 additions & 19 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,6 @@ func (b buildState) run(s *system) (transition, error) {
return nextFailState, nil
}

if err = s.updateCursor(b.checkpointLedger - 1); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

log.Info("Starting ingestion system from empty state...")

// Clear last_ingested_ledger in key value store
Expand Down Expand Up @@ -454,14 +449,6 @@ func (r resumeState) run(s *system) (transition, error) {
WithField("lastIngestedLedger", lastIngestedLedger).
Info("bumping ingest ledger to next ledger after ingested ledger in db")

// Update cursor if there's more than one ingesting instance: either
// Captive-Core or DB ingestion connected to another Stellar-Core.
// remove now?
if err = s.updateCursor(lastIngestedLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

// resume immediately so Captive-Core catchup is not slowed down
return resumeImmediately(lastIngestedLedger), nil
}
Expand Down Expand Up @@ -522,12 +509,6 @@ func (r resumeState) run(s *system) (transition, error) {
return retryResume(r), err
}

//TODO remove now? stellar-core-db-url is removed
if err = s.updateCursor(ingestLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

duration = time.Since(startTime).Seconds()
s.Metrics().LedgerIngestionDuration.Observe(float64(duration))

Expand Down
Loading

0 comments on commit 6a221d7

Please sign in to comment.