diff --git a/ingest/genesis.go b/ingest/genesis.go deleted file mode 100644 index ecc38d6243..0000000000 --- a/ingest/genesis.go +++ /dev/null @@ -1,31 +0,0 @@ -package ingest - -import ( - "github.com/stellar/go/amount" - "github.com/stellar/go/keypair" - "github.com/stellar/go/xdr" -) - -// GenesisChange returns the Change occurring at the genesis ledger (ledgerseq = 1).. -func GenesisChange(networkPassPhrase string) Change { - masterKeyPair := keypair.Master(networkPassPhrase) - - masterAccountEntry := xdr.LedgerEntry{ - LastModifiedLedgerSeq: 1, - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.AccountEntry{ - AccountId: xdr.MustAddress(masterKeyPair.Address()), - // 100B - Balance: amount.MustParse("100000000000"), - SeqNum: 0, - Thresholds: xdr.Thresholds{1, 0, 0, 0}, - }, - }, - } - - return Change{ - Type: masterAccountEntry.Data.Type, - Post: &masterAccountEntry, - } -} diff --git a/ingest/genesis_test.go b/ingest/genesis_test.go deleted file mode 100644 index 47fe097fc9..0000000000 --- a/ingest/genesis_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package ingest - -import ( - "testing" - - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" -) - -func TestGenesisLeaderStateReader(t *testing.T) { - change := GenesisChange("Public Global Stellar Network ; September 2015") - assert.Equal(t, xdr.LedgerEntryTypeAccount, change.Type) - assert.Equal(t, xdr.Uint32(1), change.Post.LastModifiedLedgerSeq) - account := change.Post.Data.MustAccount() - assert.Equal(t, "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", account.AccountId.Address()) - assert.Equal(t, xdr.SequenceNumber(0), account.SeqNum) - assert.Equal(t, xdr.Int64(1000000000000000000), account.Balance) - assert.Equal(t, xdr.Thresholds{1, 0, 0, 0}, account.Thresholds) -} diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index f6b94a8f52..50638fc1c5 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -259,56 +259,6 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{ }, } -var ingestInitGenesisStateCmd = &cobra.Command{ - Use: "init-genesis-state", - Short: "ingests genesis state (ledger 1)", - RunE: func(cmd *cobra.Command, args []string) error { - ctx := context.Background() - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { - return err - } - - horizonSession, err := db.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return fmt.Errorf("cannot open Horizon DB: %v", err) - } - - historyQ := &history.Q{SessionInterface: horizonSession} - - lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(ctx) - if err != nil { - return fmt.Errorf("cannot get last ledger value: %v", err) - } - - if lastIngestedLedger != 0 { - return fmt.Errorf("cannot run on non-empty DB") - } - - ingestConfig := ingest.Config{ - NetworkPassphrase: globalConfig.NetworkPassphrase, - HistorySession: horizonSession, - HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, - CheckpointFrequency: globalConfig.CheckpointFrequency, - RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, - CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, - CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, - } - - system, err := ingest.NewSystem(ingestConfig) - if err != nil { - return err - } - - err = system.BuildGenesisState() - if err != nil { - return err - } - - log.Info("Genesis ledger stat successfully ingested!") - return nil - }, -} - var ingestBuildStateCmd = &cobra.Command{ Use: "build-state", Short: "builds state at a given checkpoint. warning! requires clean DB.", @@ -342,7 +292,7 @@ var ingestBuildStateCmd = &cobra.Command{ } mngr := historyarchive.NewCheckpointManager(globalConfig.CheckpointFrequency) - if !mngr.IsCheckpoint(ingestBuildStateSequence) && ingestBuildStateSequence != 1 { + if !mngr.IsCheckpoint(ingestBuildStateSequence) { return fmt.Errorf("`--sequence` must be a checkpoint ledger") } @@ -406,7 +356,6 @@ func init() { ingestVerifyRangeCmd, ingestStressTestCmd, ingestTriggerStateRebuildCmd, - ingestInitGenesisStateCmd, ingestBuildStateCmd, ) } diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 38fca67576..397e3342ad 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -84,14 +84,12 @@ var ( DbFillGapsCmd = "fill-gaps" DbReingestCmd = "reingest" IngestTriggerStateRebuild = "trigger-state-rebuild" - IngestInitGenesisStateCmd = "init-genesis-state" IngestBuildStateCmd = "build-state" IngestStressTestCmd = "stress-test" IngestVerifyRangeCmd = "verify-range" ApiServerCommands = []string{HorizonCmd, ServeCmd} IngestionCommands = append(ApiServerCommands, - IngestInitGenesisStateCmd, IngestBuildStateCmd, IngestStressTestCmd, IngestVerifyRangeCmd, diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index d1409182d9..6da191dd35 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -216,27 +216,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError() { - // Recreate mock in this single test to remove assertions. - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - - s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() - s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(0)).Return(nil).Once() - s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once() - s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once() - - s.runner. - On("RunGenesisStateIngestion"). - Return(ingest.StatsChangeProcessorResults{}, errors.New("my error")). - Once() - next, err := buildState{checkpointLedger: 1}.run(s.system) - - s.Assert().Error(err) - s.Assert().EqualError(err, "Error ingesting history archive: my error") - s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) -} - func (s *BuildStateTestSuite) TestUpdateLastLedgerIngestAfterIngestReturnsError() { s.mockCommonHistoryQ() s.runner. diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index c165faf371..8e241123c9 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -299,16 +299,11 @@ func (b buildState) run(s *system) (transition, error) { return nextFailState, errors.New("unexpected checkpointLedger value") } - // We don't need to prepare range for genesis checkpoint because we don't - // perform protocol version and bucket list hash checks. - // In the long term we should probably create artificial xdr.LedgerCloseMeta - // for ledger #1 instead of using `ingest.GenesisChange` reader in - // ProcessorRunner.RunHistoryArchiveIngestion(). - // We can also skip preparing range if `skipChecks` is `true` because we + // We can skip preparing range if `skipChecks` is `true` because we // won't need bucket list hash and protocol version. var protocolVersion uint32 var bucketListHash xdr.Hash - if b.checkpointLedger != 1 && !b.skipChecks { + if !b.skipChecks { err := s.maybePrepareRange(s.ctx, b.checkpointLedger) if err != nil { return nextFailState, err @@ -381,16 +376,13 @@ func (b buildState) run(s *system) (transition, error) { startTime := time.Now() var stats ingest.StatsChangeProcessorResults - if b.checkpointLedger == 1 { - stats, err = s.runner.RunGenesisStateIngestion() - } else { - stats, err = s.runner.RunHistoryArchiveIngestion( - b.checkpointLedger, - b.skipChecks, - protocolVersion, - bucketListHash, - ) - } + + stats, err = s.runner.RunHistoryArchiveIngestion( + b.checkpointLedger, + b.skipChecks, + protocolVersion, + bucketListHash, + ) if err != nil { return nextFailState, errors.Wrap(err, "Error ingesting history archive") diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 38f9fe1d3a..9ec13078e3 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -225,7 +225,6 @@ type System interface { VerifyRange(fromLedger, toLedger uint32, verifyState bool) error BuildState(sequence uint32, skipChecks bool) error ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error - BuildGenesisState() error Shutdown() GetCurrentState() State RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error @@ -675,15 +674,6 @@ func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) err return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter) } -// BuildGenesisState runs the ingestion pipeline on genesis ledger. Transitions -// to stopState when done. -func (s *system) BuildGenesisState() error { - return s.runStateMachine(buildState{ - checkpointLedger: 1, - stop: true, - }) -} - func (s *system) runStateMachine(cur stateMachineNode) error { s.wg.Add(1) defer func() { diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 470039fd92..a736c7b229 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -642,11 +642,6 @@ func (m *mockProcessorsRunner) DisableMemoryStatsLogging() { m.Called() } -func (m *mockProcessorsRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) { - args := m.Called() - return args.Get(0).(ingest.StatsChangeProcessorResults), args.Error(1) -} - func (m *mockProcessorsRunner) RunHistoryArchiveIngestion( checkpointLedger uint32, skipChecks bool, @@ -721,11 +716,6 @@ func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force boo return args.Error(0) } -func (m *mockSystem) BuildGenesisState() error { - args := m.Called() - return args.Error(0) -} - func (m *mockSystem) GetCurrentState() State { args := m.Called() return args.Get(0).(State) diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 75b8645953..a4dbfdc656 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -69,7 +69,6 @@ type ProcessorRunnerInterface interface { SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface) EnableMemoryStatsLogging() DisableMemoryStatsLogging() - RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) RunHistoryArchiveIngestion( checkpointLedger uint32, skipChecks bool, @@ -192,10 +191,6 @@ func (s *ProcessorRunner) checkIfProtocolVersionSupported(ledgerProtocolVersion return nil } -func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) { - return s.RunHistoryArchiveIngestion(1, false, 0, xdr.Hash{}) -} - func (s *ProcessorRunner) RunHistoryArchiveIngestion( checkpointLedger uint32, skipChecks bool, @@ -218,43 +213,37 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion( return ingest.StatsChangeProcessorResults{}, err } - if checkpointLedger == 1 { - if err := changeProcessor.ProcessChange(s.ctx, ingest.GenesisChange(s.config.NetworkPassphrase)); err != nil { - return changeStats.GetResults(), errors.Wrap(err, "Error ingesting genesis ledger") - } - } else { - if !skipChecks { - if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil { - return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version") - } + if !skipChecks { + if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil { + return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version") } + } - changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger) - if err != nil { - return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader") - } + changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger) + if err != nil { + return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader") + } - if !skipChecks { - if err = changeReader.VerifyBucketList(bucketListHash); err != nil { - return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS") - } + if !skipChecks { + if err = changeReader.VerifyBucketList(bucketListHash); err != nil { + return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS") } + } - defer changeReader.Close() + defer changeReader.Close() - log.WithField("sequence", checkpointLedger). - Info("Processing entries from History Archive Snapshot") + log.WithField("sequence", checkpointLedger). + Info("Processing entries from History Archive Snapshot") - err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader( - changeReader, - "historyArchive", - checkpointLedger, - logFrequency, - s.logMemoryStats, - )) - if err != nil { - return changeStats.GetResults(), errors.Wrap(err, "Error streaming changes from HAS") - } + err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader( + changeReader, + "historyArchive", + checkpointLedger, + logFrequency, + s.logMemoryStats, + )) + if err != nil { + return changeStats.GetResults(), errors.Wrap(err, "Error streaming changes from HAS") } if err := changeProcessor.Commit(s.ctx); err != nil { diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 82c712b737..a4fd59a72b 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -3,12 +3,11 @@ package ingest import ( "context" "fmt" + "github.com/stellar/go/amount" "io" "reflect" "testing" - "github.com/guregu/null" - "github.com/guregu/null/zero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -20,51 +19,6 @@ import ( "github.com/stellar/go/xdr" ) -func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { - ctx := context.Background() - - mockSession := &db.MockSession{} - - q := &mockDBQ{} - - batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true) - defer mock.AssertExpectationsForObjects(t, batchBuilders...) - - assert.IsType(t, &history.MockAccountSignersBatchInsertBuilder{}, batchBuilders[0]) - batchBuilders[0].(*history.MockAccountSignersBatchInsertBuilder).On("Add", history.AccountSigner{ - Account: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Signer: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Weight: 1, - Sponsor: null.String{}, - }).Return(nil).Once() - - assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1]) - batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{ - LastModifiedLedger: 1, - AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Balance: int64(1000000000000000000), - SequenceNumber: 0, - SequenceTime: zero.IntFrom(0), - MasterWeight: 1, - }).Return(nil).Once() - - q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). - Return(nil) - - runner := ProcessorRunner{ - ctx: ctx, - config: Config{ - NetworkPassphrase: network.PublicNetworkPassphrase, - }, - historyQ: q, - filters: &MockFilters{}, - session: mockSession, - } - - _, err := runner.RunGenesisStateIngestion() - assert.NoError(t, err) -} - func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { ctx := context.Background() @@ -79,12 +33,30 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { defer mock.AssertExpectationsForObjects(t, historyAdapter) m := &ingest.MockChangeReader{} - m.On("Read").Return(ingest.GenesisChange(network.PublicNetworkPassphrase), nil).Once() - m.On("Read").Return(ingest.Change{}, io.EOF).Once() m.On("Close").Return(nil).Once() bucketListHash := xdr.Hash([32]byte{0, 1, 2}) m.On("VerifyBucketList", bucketListHash).Return(nil).Once() + changeEntry := ingest.Change{ + Type: xdr.LedgerEntryTypeAccount, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 1, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + // Master account address from Ledger 1 + AccountId: xdr.MustAddress("GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7"), + // 100B + Balance: amount.MustParse("100000000000"), + SeqNum: 0, + Thresholds: xdr.Thresholds{1, 0, 0, 0}, + }, + }, + }, + } + m.On("Read").Return(changeEntry, nil).Once() + m.On("Read").Return(ingest.Change{}, io.EOF).Once() + historyAdapter. On("GetState", ctx, uint32(63)). Return(