Skip to content

Commit

Permalink
stellar#5153: removed cursor update against core during ledger ingest…
Browse files Browse the repository at this point in the history
…ion and unused core db related config flags
  • Loading branch information
sreuland committed Jan 10, 2024
1 parent 495d18c commit a7e2ab6
Show file tree
Hide file tree
Showing 17 changed files with 106 additions and 274 deletions.
5 changes: 4 additions & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
### Added

- Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))
- Deprecate configuration flags related to legacy non-captive core ingestion ([5100](https://github.com/stellar/go/pull/5100))

### Breaking Changes
- Removed configuration flags `--stellar-core-url-db`, `--cursor-name` `--skip-cursor-update` , they were related to legacy non-captive core ingestion and are no longer usable.

## 2.27.0

### Fixed
Expand Down
1 change: 0 additions & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
RemoteCaptiveCoreURL: config.RemoteCaptiveCoreURL,
CaptiveCoreToml: config.CaptiveCoreToml,
CaptiveCoreStoragePath: config.CaptiveCoreStoragePath,
StellarCoreCursor: config.CursorName,
StellarCoreURL: config.StellarCoreURL,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
Expand Down
8 changes: 0 additions & 8 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ type Config struct {
TLSKey string
// Ingest toggles whether this horizon instance should run the data ingestion subsystem.
Ingest bool
// CursorName is the cursor used for ingesting from stellar-core.
// Setting multiple cursors in different Horizon instances allows multiple
// Horizons to ingest from the same stellar-core instance without cursor
// collisions.
CursorName string
// HistoryRetentionCount represents the minimum number of ledgers worth of
// history data to retain in the horizon database. For the purposes of
// determining a "retention duration", each ledger roughly corresponds to 10
Expand All @@ -82,9 +77,6 @@ type Config struct {
// out-of-date by before horizon begins to respond with an error to history
// requests.
StaleThreshold uint
// SkipCursorUpdate causes the ingestor to skip reporting the "last imported
// ledger" state to stellar-core.
SkipCursorUpdate bool
// IngestDisableStateVerification disables state verification
// `System.verifyState()` when set to `true`.
IngestDisableStateVerification bool
Expand Down
28 changes: 16 additions & 12 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,7 @@ func Flags() (*Config, support.ConfigOptions) {
Hidden: true,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetString(opt.Name); val != "" {
stdLog.Printf(
"DEPRECATED - The usage of the flag --stellar-core-db-url has been deprecated. " +
"Horizon now uses Captive-Core ingestion by default and this flag will soon be removed in " +
"the future.",
)
return fmt.Errorf("flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion")
}
return nil
},
Expand Down Expand Up @@ -595,11 +591,15 @@ func Flags() (*Config, support.ConfigOptions) {
&support.ConfigOption{
Name: "cursor-name",
EnvVar: "CURSOR_NAME",
ConfigKey: &config.CursorName,
OptType: types.String,
FlagDefault: "HORIZON",
Usage: "ingestor cursor used by horizon to ingest from stellar core. must be uppercase and unique for each horizon instance ingesting from that core instance.",
Hidden: true,
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetString(opt.Name); val != "" {
return fmt.Errorf("flag --cursor-name has been removed and no longer valid, must use captive core configuration for ingestion")
}
return nil
},
},
&support.ConfigOption{
Name: "history-retention-count",
Expand All @@ -619,11 +619,15 @@ func Flags() (*Config, support.ConfigOptions) {
},
&support.ConfigOption{
Name: "skip-cursor-update",
ConfigKey: &config.SkipCursorUpdate,
OptType: types.Bool,
FlagDefault: false,
Usage: "causes the ingester to skip reporting the last imported ledger state to stellar-core",
OptType: types.String,
Hidden: true,
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetString(opt.Name); val != "" {
return fmt.Errorf("flag --skip-cursor-update has been removed and no longer valid, must use captive core configuration for ingestion")
}
return nil
},
},
&support.ConfigOption{
Name: "ingest-disable-state-verification",
Expand Down
70 changes: 70 additions & 0 deletions services/horizon/internal/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/spf13/cobra"

"github.com/stellar/go/services/horizon/internal/test"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -259,3 +260,72 @@ func TestEnvironmentVariables(t *testing.T) {
assert.Equal(t, config.CaptiveCoreConfigPath, "../docker/captive-core-classic-integration-tests.cfg")
assert.Equal(t, config.CaptiveCoreConfigUseDB, true)
}

func TestRemovedFlags(t *testing.T) {
tests := []struct {
name string
environmentVars map[string]string
errStr string
cmdArgs []string
}{
{
name: "STELLAR_CORE_DATABASE_URL removed",
environmentVars: map[string]string{
"INGEST": "false",
"STELLAR_CORE_DATABASE_URL": "coredb",
"DATABASE_URL": "dburl",
},
errStr: "flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion",
},
{
name: "--stellar-core-db-url removed",
environmentVars: map[string]string{
"INGEST": "false",
"DATABASE_URL": "dburl",
},
errStr: "flag --stellar-core-db-url and environment variable STELLAR_CORE_DATABASE_URL have been removed and no longer valid, must use captive core configuration for ingestion",
cmdArgs: []string{"--stellar-core-db-url=coredb"},
},
{
name: "CURSOR_NAME removed",
environmentVars: map[string]string{
"INGEST": "false",
"CURSOR_NAME": "cursor",
"DATABASE_URL": "dburl",
},
errStr: "flag --cursor-name has been removed and no longer valid, must use captive core configuration for ingestion",
},
{
name: "SKIP_CURSOR_UPDATE removed",
environmentVars: map[string]string{
"INGEST": "false",
"SKIP_CURSOR_UPDATE": "true",
"DATABASE_URL": "dburl",
},
errStr: "flag --skip-cursor-update has been removed and no longer valid, must use captive core configuration for ingestion",
},
}

envManager := test.NewEnvironmentManager()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
envManager.Restore()
}()
err := envManager.InitializeEnvironmentVariables(tt.environmentVars)
require.NoError(t, err)

config, flags := Flags()
testCmd := &cobra.Command{
Use: "test",
}

require.NoError(t, flags.Init(testCmd))
require.NoError(t, testCmd.ParseFlags(tt.cmdArgs))

err = ApplyFlags(config, flags, ApplyOptions{})
require.Error(t, err)
assert.Equal(t, tt.errStr, err.Error())
})
}
}
32 changes: 0 additions & 32 deletions services/horizon/internal/ingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -83,12 +82,6 @@ func (s *BuildStateTestSuite) mockCommonHistoryQ() {
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()
}

func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() {
Expand Down Expand Up @@ -175,12 +168,6 @@ func (s *BuildStateTestSuite) TestUpdateLastLedgerIngestReturnsError() {
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(s.lastLedger, nil).Once()
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(errors.New("my error")).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

Expand All @@ -194,12 +181,6 @@ func (s *BuildStateTestSuite) TestUpdateExpStateInvalidReturnsError() {
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, s.lastLedger).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(errors.New("my error")).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

Expand All @@ -215,13 +196,6 @@ func (s *BuildStateTestSuite) TestTruncateIngestStateTablesReturnsError() {
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(errors.New("my error")).Once()

s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(62),
).Return(nil).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)

s.Assert().Error(err)
Expand Down Expand Up @@ -251,12 +225,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError(
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(0)).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once()
s.stellarCoreClient.On(
"SetCursor",
mock.AnythingOfType("*context.timerCtx"),
defaultCoreCursorName,
int32(0),
).Return(nil).Once()

s.runner.
On("RunGenesisStateIngestion").
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/ingest/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (s *DBTestSuite) SetupTest() {
s.historyAdapter = &mockHistoryArchiveAdapter{}
var err error
sIface, err := NewSystem(Config{
CoreSession: s.tt.CoreSession(),
HistorySession: s.tt.HorizonSession(),
HistoryArchiveURLs: []string{"http://ignore.test"},
DisableStateVerification: false,
Expand Down
19 changes: 0 additions & 19 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,6 @@ func (b buildState) run(s *system) (transition, error) {
return nextFailState, nil
}

if err = s.updateCursor(b.checkpointLedger - 1); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

log.Info("Starting ingestion system from empty state...")

// Clear last_ingested_ledger in key value store
Expand Down Expand Up @@ -454,14 +449,6 @@ func (r resumeState) run(s *system) (transition, error) {
WithField("lastIngestedLedger", lastIngestedLedger).
Info("bumping ingest ledger to next ledger after ingested ledger in db")

// Update cursor if there's more than one ingesting instance: either
// Captive-Core or DB ingestion connected to another Stellar-Core.
// remove now?
if err = s.updateCursor(lastIngestedLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

// resume immediately so Captive-Core catchup is not slowed down
return resumeImmediately(lastIngestedLedger), nil
}
Expand Down Expand Up @@ -522,12 +509,6 @@ func (r resumeState) run(s *system) (transition, error) {
return retryResume(r), err
}

//TODO remove now? stellar-core-db-url is removed
if err = s.updateCursor(ingestLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
}

duration = time.Since(startTime).Seconds()
s.Metrics().LedgerIngestionDuration.Observe(float64(duration))

Expand Down
32 changes: 2 additions & 30 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ const (
var log = logpkg.DefaultLogger.WithField("service", "ingest")

type Config struct {
CoreSession db.SessionInterface
StellarCoreURL string
StellarCoreCursor string
CaptiveCoreBinaryPath string
CaptiveCoreStoragePath string
CaptiveCoreToml *ledgerbackend.CaptiveCoreToml
Expand Down Expand Up @@ -252,7 +250,8 @@ func NewSystem(config Config) (System, error) {
cancel()
return nil, errors.Wrap(err, "error creating captive core backend")
}
} else if config.LocalCaptiveCoreEnabled() {
} else {
// the only other option is local captive core config
logger := log.WithField("subservice", "stellar-core")
ledgerBackend, err = ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
Expand All @@ -273,13 +272,6 @@ func NewSystem(config Config) (System, error) {
cancel()
return nil, errors.Wrap(err, "error creating captive core backend")
}
} else {
coreSession := config.CoreSession.Clone()
ledgerBackend, err = ledgerbackend.NewDatabaseBackendFromSession(coreSession, config.NetworkPassphrase)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error creating ledger backend")
}
}

historyQ := &history.Q{config.HistorySession.Clone()}
Expand Down Expand Up @@ -752,26 +744,6 @@ func (s *system) resetStateVerificationErrors() {
s.stateVerificationErrors = 0
}

func (s *system) updateCursor(ledgerSequence uint32) error {
if s.stellarCoreClient == nil {
return nil
}

cursor := defaultCoreCursorName
if s.config.StellarCoreCursor != "" {
cursor = s.config.StellarCoreCursor
}

ctx, cancel := context.WithTimeout(s.ctx, time.Second)
defer cancel()
err := s.stellarCoreClient.SetCursor(ctx, cursor, int32(ledgerSequence))
if err != nil {
return errors.Wrap(err, "Setting stellar-core cursor failed")
}

return nil
}

func (s *system) Shutdown() {
log.Info("Shutting down ingestion system...")
s.stateVerificationMutex.Lock()
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func TestLedgerEligibleForStateVerification(t *testing.T) {

func TestNewSystem(t *testing.T) {
config := Config{
CoreSession: &db.Session{DB: &sqlx.DB{}},
HistorySession: &db.Session{DB: &sqlx.DB{}},
DisableStateVerification: true,
HistoryArchiveURLs: []string{"https://history.stellar.org/prd/core-live/core_live_001"},
Expand Down
3 changes: 0 additions & 3 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ func (ps *ParallelSystems) Shutdown() {
if ps.config.HistorySession != nil {
ps.config.HistorySession.Close()
}
if ps.config.CoreSession != nil {
ps.config.CoreSession.Close()
}
}

func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan history.LedgerRange) rangeError {
Expand Down
Loading

0 comments on commit a7e2ab6

Please sign in to comment.