Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
shiqizng committed Jun 10, 2022
1 parent 30e53cc commit 64fb50a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 47 deletions.
22 changes: 20 additions & 2 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
localledger "github.com/algorand/indexer/migrations/local_ledger"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/util/metrics"
Expand Down Expand Up @@ -55,6 +56,8 @@ var (
defaultApplicationsLimit uint32
enableAllParameters bool
indexerDataDir string
initLedger bool
catchpoint string
)

var daemonCmd = &cobra.Command{
Expand Down Expand Up @@ -133,6 +136,19 @@ var daemonCmd = &cobra.Command{
fmt.Fprint(os.Stderr, "missing indexer data directory")
os.Exit(1)
}

// sync local ledger
nextDBRound, err := db.GetNextRoundToAccount()
maybeFail(err, "Error getting DB round")
if nextDBRound > 0 {
if catchpoint != "" {
err = localledger.RunMigrationFastCatchup(catchpoint, &opts)
maybeFail(err, "Error running ledger migration in fast catchup mode")
}
err = localledger.RunMigrationSimple(nextDBRound, &opts)
maybeFail(err, "Error running ledger migration")
}

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -154,9 +170,9 @@ var daemonCmd = &cobra.Command{
genesisBlock, err := getGenesisBlock(bot.Algod())
maybeFail(err, "Error getting genesis block")

proc, err := blockprocessor.MakeProcessor(&genesis, &genesisBlock, indexerDataDir, imp.ImportBlock)
proc, err := blockprocessor.MakeProcessor(&genesis, &genesisBlock, nextDBRound, indexerDataDir, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeProcessor() err %v", err)
maybeFail(err, "Error creating a block processor")
}

bot.SetNextRound(proc.NextRoundToProcess())
Expand Down Expand Up @@ -217,6 +233,8 @@ func init() {
daemonCmd.Flags().Uint32VarP(&defaultApplicationsLimit, "default-applications-limit", "", 100, "set the default Limit parameter for querying applications, if none is provided")

daemonCmd.Flags().StringVarP(&indexerDataDir, "data-dir", "i", "", "path to indexer data dir, or $INDEXER_DATA")
daemonCmd.Flags().BoolVar(&initLedger, "init-ledger", true, "initialize local ledger using sequential mode")
daemonCmd.Flags().StringVarP(&catchpoint, "catchpoint", "c", "", "initialize local ledger using fast catchup")

viper.RegisterAlias("algod", "algod-data-dir")
viper.RegisterAlias("algod-net", "algod-address")
Expand Down
32 changes: 0 additions & 32 deletions idb/postgres/postgres_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/algorand/indexer/idb/migration"
"github.com/algorand/indexer/idb/postgres/internal/encoding"
cad "github.com/algorand/indexer/idb/postgres/internal/migrations/convert_account_data"
ledger "github.com/algorand/indexer/idb/postgres/internal/migrations/local_ledger"
"github.com/algorand/indexer/idb/postgres/internal/schema"
"github.com/algorand/indexer/idb/postgres/internal/types"
)
Expand Down Expand Up @@ -51,9 +50,6 @@ func init() {
{upgradeNotSupported, true, "notify the user that upgrade is not supported"},
{dropTxnBytesColumn, true, "drop txnbytes column"},
{convertAccountData, true, "convert account.account_data column"},

// Migrations for x.x.x release
{ledgerMigration, true, "sync between local ledger and database"},
}
}

Expand Down Expand Up @@ -253,31 +249,3 @@ func convertAccountData(db *IndexerDb, migrationState *types.MigrationState, opt
*migrationState = newMigrationState
return nil
}

func ledgerMigration(db *IndexerDb, migrationState *types.MigrationState, opts *idb.IndexerDbOptions) error {
newMigrationState := *migrationState
newMigrationState.NextMigration++
f := func(tx pgx.Tx) error {
round, err := db.getMaxRoundAccounted(context.Background(), tx)
if err != nil {
return fmt.Errorf("ledgerMigration() err: %w", err)
}
err = ledger.RunMigrationSimple(round, opts)
if err != nil {
return fmt.Errorf("ledgerMigration() err: %w", err)
}
err = db.setMigrationState(tx, &newMigrationState)
if err != nil {
return fmt.Errorf("ledgerMigration() err: %w", err)
}

return nil
}
err := db.txWithRetry(serializable, f)
if err != nil {
return fmt.Errorf("ledgerMigration() err: %w", err)
}

*migrationState = newMigrationState
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func RunMigrationSimple(round uint64, opts *idb.IndexerDbOptions) error {
return fmt.Errorf("RunMigration() err: %w", err)
}

proc, err := blockprocessor.MakeProcessor(&genesis, genesisBlock, opts.IndexerDatadir, nil)
proc, err := blockprocessor.MakeProcessor(&genesis, genesisBlock, 0, opts.IndexerDatadir, nil)
if err != nil {
return fmt.Errorf("RunMigration() err: %w", err)
}
Expand Down
File renamed without changes.
25 changes: 13 additions & 12 deletions processor/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/algorand/indexer/util"
)

const Prefix = "ledger"

type blockProcessor struct {
handler func(block *ledgercore.ValidatedBlock) error
ledger *ledger.Ledger
Expand All @@ -40,21 +42,20 @@ func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.Va
}

// MakeProcessor creates a block processor
func MakeProcessor(genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block, datadir string, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) {
func MakeProcessor(genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block, dbRound uint64, datadir string, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) {
initState, err := util.CreateInitState(genesis, genesisBlock)
if err != nil {
return nil, fmt.Errorf("MakeProcessor() err: %w", err)
}
if !ledgerExists(datadir) {
msg := []string{
"The ledger cache was not found in the data directory and must be initialized. There are several ways to initialize it:\n",
"1.Fetch blocks and re-initialize, this takes a long time and is the most secure\n",
"2.Initialize with a catchpoint, this requires trusting that the relay is providing the correct ledger snapshot.\n",
"3.Copy files X/Y/Z from an existing node installation from before round 1234 into the indexer data directory.\n",
}
if dbRound != 0 && !ledgerExists(datadir, Prefix) {
msg := fmt.Sprintf("%s\n%s\n%s\n%s\n",
"The ledger cache was not found in the data directory and must be initialized. There are several ways to initialize it:",
"1.Fetch blocks and re-initialize, this takes a long time and is the most secure",
"2.Initialize with a catchpoint, this requires trusting that the relay is providing the correct ledger snapshot.",
fmt.Sprintf("3.Copy files ledger.block.sqlite and ledger.tracker.sqlite from an existing node installation from before round %d into the indexer data directory.", dbRound))
return nil, fmt.Errorf("MakeProcessor() err: %s", msg)
}
l, err := ledger.OpenLedger(logging.NewLogger(), filepath.Join(path.Dir(datadir), "ledger"), false, initState, algodConfig.GetDefaultLocal())
l, err := ledger.OpenLedger(logging.NewLogger(), filepath.Join(path.Dir(datadir), Prefix), false, initState, algodConfig.GetDefaultLocal())
if err != nil {
return nil, fmt.Errorf("MakeProcessor() err: %w", err)
}
Expand Down Expand Up @@ -214,10 +215,10 @@ func prepareAccountsResources(l *indexerledger.LedgerForEvaluator, payset transa
return accounts, resources, nil
}

func ledgerExists(datadir string) bool {
func ledgerExists(datadir, prefix string) bool {
ledgerFiles := []string{
"ledger.block.sqlite",
"ledger.tracker.sqlite",
fmt.Sprintf("%s.block.sqlite", prefix),
fmt.Sprintf("%s.tracker.sqlite", prefix),
}
for _, f := range ledgerFiles {
if _, err := os.Stat(filepath.Join(path.Dir(datadir), f)); errors.Is(err, os.ErrNotExist) {
Expand Down

0 comments on commit 64fb50a

Please sign in to comment.