Skip to content

Commit

Permalink
Remove init-genesis-state command (#5504)
Browse files Browse the repository at this point in the history
* Remove init-genesis-state command

* fix the expectation of call

* remove the test mock setup for inserting account in ledger 1

* Redo the test fixture

* cleanup

* remove comments
  • Loading branch information
karthikiyer56 authored Nov 5, 2024
1 parent 64efc32 commit 39a8d36
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 246 deletions.
31 changes: 0 additions & 31 deletions ingest/genesis.go

This file was deleted.

19 changes: 0 additions & 19 deletions ingest/genesis_test.go

This file was deleted.

53 changes: 1 addition & 52 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,56 +259,6 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{
},
}

var ingestInitGenesisStateCmd = &cobra.Command{
Use: "init-genesis-state",
Short: "ingests genesis state (ledger 1)",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil {
return err
}

horizonSession, err := db.Open("postgres", globalConfig.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{SessionInterface: horizonSession}

lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(ctx)
if err != nil {
return fmt.Errorf("cannot get last ledger value: %v", err)
}

if lastIngestedLedger != 0 {
return fmt.Errorf("cannot run on non-empty DB")
}

ingestConfig := ingest.Config{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
CheckpointFrequency: globalConfig.CheckpointFrequency,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
}

system, err := ingest.NewSystem(ingestConfig)
if err != nil {
return err
}

err = system.BuildGenesisState()
if err != nil {
return err
}

log.Info("Genesis ledger stat successfully ingested!")
return nil
},
}

var ingestBuildStateCmd = &cobra.Command{
Use: "build-state",
Short: "builds state at a given checkpoint. warning! requires clean DB.",
Expand Down Expand Up @@ -342,7 +292,7 @@ var ingestBuildStateCmd = &cobra.Command{
}

mngr := historyarchive.NewCheckpointManager(globalConfig.CheckpointFrequency)
if !mngr.IsCheckpoint(ingestBuildStateSequence) && ingestBuildStateSequence != 1 {
if !mngr.IsCheckpoint(ingestBuildStateSequence) {
return fmt.Errorf("`--sequence` must be a checkpoint ledger")
}

Expand Down Expand Up @@ -406,7 +356,6 @@ func init() {
ingestVerifyRangeCmd,
ingestStressTestCmd,
ingestTriggerStateRebuildCmd,
ingestInitGenesisStateCmd,
ingestBuildStateCmd,
)
}
2 changes: 0 additions & 2 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ var (
DbFillGapsCmd = "fill-gaps"
DbReingestCmd = "reingest"
IngestTriggerStateRebuild = "trigger-state-rebuild"
IngestInitGenesisStateCmd = "init-genesis-state"
IngestBuildStateCmd = "build-state"
IngestStressTestCmd = "stress-test"
IngestVerifyRangeCmd = "verify-range"

ApiServerCommands = []string{HorizonCmd, ServeCmd}
IngestionCommands = append(ApiServerCommands,
IngestInitGenesisStateCmd,
IngestBuildStateCmd,
IngestStressTestCmd,
IngestVerifyRangeCmd,
Expand Down
21 changes: 0 additions & 21 deletions services/horizon/internal/ingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,6 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() {
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError() {
// Recreate mock in this single test to remove assertions.
*s.ledgerBackend = ledgerbackend.MockDatabaseBackend{}

s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once()
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(0)).Return(nil).Once()
s.historyQ.On("UpdateExpStateInvalid", s.ctx, false).Return(nil).Once()
s.historyQ.On("TruncateIngestStateTables", s.ctx).Return(nil).Once()

s.runner.
On("RunGenesisStateIngestion").
Return(ingest.StatsChangeProcessorResults{}, errors.New("my error")).
Once()
next, err := buildState{checkpointLedger: 1}.run(s.system)

s.Assert().Error(err)
s.Assert().EqualError(err, "Error ingesting history archive: my error")
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

func (s *BuildStateTestSuite) TestUpdateLastLedgerIngestAfterIngestReturnsError() {
s.mockCommonHistoryQ()
s.runner.
Expand Down
26 changes: 9 additions & 17 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,11 @@ func (b buildState) run(s *system) (transition, error) {
return nextFailState, errors.New("unexpected checkpointLedger value")
}

// We don't need to prepare range for genesis checkpoint because we don't
// perform protocol version and bucket list hash checks.
// In the long term we should probably create artificial xdr.LedgerCloseMeta
// for ledger #1 instead of using `ingest.GenesisChange` reader in
// ProcessorRunner.RunHistoryArchiveIngestion().
// We can also skip preparing range if `skipChecks` is `true` because we
// We can skip preparing range if `skipChecks` is `true` because we
// won't need bucket list hash and protocol version.
var protocolVersion uint32
var bucketListHash xdr.Hash
if b.checkpointLedger != 1 && !b.skipChecks {
if !b.skipChecks {
err := s.maybePrepareRange(s.ctx, b.checkpointLedger)
if err != nil {
return nextFailState, err
Expand Down Expand Up @@ -381,16 +376,13 @@ func (b buildState) run(s *system) (transition, error) {
startTime := time.Now()

var stats ingest.StatsChangeProcessorResults
if b.checkpointLedger == 1 {
stats, err = s.runner.RunGenesisStateIngestion()
} else {
stats, err = s.runner.RunHistoryArchiveIngestion(
b.checkpointLedger,
b.skipChecks,
protocolVersion,
bucketListHash,
)
}

stats, err = s.runner.RunHistoryArchiveIngestion(
b.checkpointLedger,
b.skipChecks,
protocolVersion,
bucketListHash,
)

if err != nil {
return nextFailState, errors.Wrap(err, "Error ingesting history archive")
Expand Down
10 changes: 0 additions & 10 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ type System interface {
VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
BuildState(sequence uint32, skipChecks bool) error
ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error
BuildGenesisState() error
Shutdown()
GetCurrentState() State
RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error
Expand Down Expand Up @@ -675,15 +674,6 @@ func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) err
return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter)
}

// BuildGenesisState runs the ingestion pipeline on genesis ledger. Transitions
// to stopState when done.
func (s *system) BuildGenesisState() error {
return s.runStateMachine(buildState{
checkpointLedger: 1,
stop: true,
})
}

func (s *system) runStateMachine(cur stateMachineNode) error {
s.wg.Add(1)
defer func() {
Expand Down
10 changes: 0 additions & 10 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,11 +642,6 @@ func (m *mockProcessorsRunner) DisableMemoryStatsLogging() {
m.Called()
}

func (m *mockProcessorsRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) {
args := m.Called()
return args.Get(0).(ingest.StatsChangeProcessorResults), args.Error(1)
}

func (m *mockProcessorsRunner) RunHistoryArchiveIngestion(
checkpointLedger uint32,
skipChecks bool,
Expand Down Expand Up @@ -721,11 +716,6 @@ func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force boo
return args.Error(0)
}

func (m *mockSystem) BuildGenesisState() error {
args := m.Called()
return args.Error(0)
}

func (m *mockSystem) GetCurrentState() State {
args := m.Called()
return args.Get(0).(State)
Expand Down
59 changes: 24 additions & 35 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type ProcessorRunnerInterface interface {
SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)
EnableMemoryStatsLogging()
DisableMemoryStatsLogging()
RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)
RunHistoryArchiveIngestion(
checkpointLedger uint32,
skipChecks bool,
Expand Down Expand Up @@ -192,10 +191,6 @@ func (s *ProcessorRunner) checkIfProtocolVersionSupported(ledgerProtocolVersion
return nil
}

func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error) {
return s.RunHistoryArchiveIngestion(1, false, 0, xdr.Hash{})
}

func (s *ProcessorRunner) RunHistoryArchiveIngestion(
checkpointLedger uint32,
skipChecks bool,
Expand All @@ -218,43 +213,37 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
return ingest.StatsChangeProcessorResults{}, err
}

if checkpointLedger == 1 {
if err := changeProcessor.ProcessChange(s.ctx, ingest.GenesisChange(s.config.NetworkPassphrase)); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error ingesting genesis ledger")
}
} else {
if !skipChecks {
if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version")
}
if !skipChecks {
if err := s.checkIfProtocolVersionSupported(ledgerProtocolVersion); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error while checking for supported protocol version")
}
}

changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger)
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader")
}
changeReader, err := s.historyAdapter.GetState(s.ctx, checkpointLedger)
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error creating HAS reader")
}

if !skipChecks {
if err = changeReader.VerifyBucketList(bucketListHash); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS")
}
if !skipChecks {
if err = changeReader.VerifyBucketList(bucketListHash); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error validating bucket list from HAS")
}
}

defer changeReader.Close()
defer changeReader.Close()

log.WithField("sequence", checkpointLedger).
Info("Processing entries from History Archive Snapshot")
log.WithField("sequence", checkpointLedger).
Info("Processing entries from History Archive Snapshot")

err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader(
changeReader,
"historyArchive",
checkpointLedger,
logFrequency,
s.logMemoryStats,
))
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error streaming changes from HAS")
}
err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader(
changeReader,
"historyArchive",
checkpointLedger,
logFrequency,
s.logMemoryStats,
))
if err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error streaming changes from HAS")
}

if err := changeProcessor.Commit(s.ctx); err != nil {
Expand Down
Loading

0 comments on commit 39a8d36

Please sign in to comment.