Skip to content

Commit

Permalink
Reindex P-chain blocks (#2869)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Mar 26, 2024
1 parent f945aa5 commit f57f0f2
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 63 deletions.
16 changes: 16 additions & 0 deletions vms/platformvm/state/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

166 changes: 161 additions & 5 deletions vms/platformvm/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

"github.com/google/btree"
Expand All @@ -28,6 +30,8 @@ import (
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/hashing"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/platformvm/block"
Expand All @@ -42,6 +46,13 @@ import (
safemath "github.com/ava-labs/avalanchego/utils/math"
)

const (
	// indexIterationLimit is the number of block indices checked between
	// commits while reindexing; every indexIterationLimit-th check triggers a
	// commit, an iterator refresh, and a throttling sleep.
	indexIterationLimit = 4096
	// indexIterationSleepMultiplier scales the time spent processing a batch
	// to determine how long to sleep before starting the next batch.
	indexIterationSleepMultiplier = 5
	// indexIterationSleepCap is the maximum sleep between batches.
	indexIterationSleepCap = 10 * time.Second
	// indexLogFrequency is how often reindexing progress is logged.
	indexLogFrequency = 30 * time.Second
)

var (
_ State = (*state)(nil)

Expand Down Expand Up @@ -69,11 +80,12 @@ var (
ChainPrefix = []byte("chain")
SingletonPrefix = []byte("singleton")

TimestampKey = []byte("timestamp")
CurrentSupplyKey = []byte("current supply")
LastAcceptedKey = []byte("last accepted")
HeightsIndexedKey = []byte("heights indexed")
InitializedKey = []byte("initialized")
TimestampKey = []byte("timestamp")
CurrentSupplyKey = []byte("current supply")
LastAcceptedKey = []byte("last accepted")
HeightsIndexedKey = []byte("heights indexed")
InitializedKey = []byte("initialized")
BlocksReindexedKey = []byte("blocks reindexed")
)

// Chain collects all methods to manage the state of the chain for block
Expand Down Expand Up @@ -167,6 +179,14 @@ type State interface {
// Discard uncommitted changes to the database.
Abort()

// ReindexBlocks converts any block indices using the legacy storage format
// to the new format. If this database has already updated the indices,
// this function will return immediately, without iterating over the
// database.
//
// TODO: Remove after v1.12.x is activated
ReindexBlocks(lock sync.Locker, log logging.Logger) error

// Commit changes to the base database.
Commit() error

Expand Down Expand Up @@ -245,6 +265,7 @@ type stateBlk struct {
* | '-- txID -> nil
* '-. singletons
* |-- initializedKey -> nil
* |-- blocksReindexedKey -> nil
* |-- timestampKey -> timestamp
* |-- currentSupplyKey -> currentSupply
* |-- lastAcceptedKey -> lastAccepted
Expand Down Expand Up @@ -2292,3 +2313,138 @@ func parseStoredBlock(blkBytes []byte) (block.Block, bool, error) {
blk, err = block.Parse(block.GenesisCodec, blkState.Bytes)
return blk, true, err
}

// ReindexBlocks converts any block indices persisted in the legacy
// stateBlk-wrapped storage format to the new format, which stores the raw
// block bytes directly. If the database has already been reindexed (the
// BlocksReindexedKey singleton is present), this returns immediately without
// iterating over the database.
//
// The provided lock is held only while committing to disk, so block
// acceptance can proceed concurrently with the iteration itself.
//
// TODO: Remove after v1.12.x is activated
func (s *state) ReindexBlocks(lock sync.Locker, log logging.Logger) error {
	has, err := s.singletonDB.Has(BlocksReindexedKey)
	if err != nil {
		return err
	}
	if has {
		log.Info("blocks already reindexed")
		return nil
	}

	// It is possible that new blocks are added after grabbing this iterator.
	// New blocks are guaranteed to be persisted in the new format, so we don't
	// need to check them.
	blockIterator := s.blockDB.NewIterator()
	// Releasing is done using a closure to ensure that updating blockIterator
	// will result in having the most recent iterator released when executing
	// the deferred function.
	defer func() {
		blockIterator.Release()
	}()

	log.Info("starting block reindexing")

	var (
		startTime         = time.Now()
		lastCommit        = startTime
		nextUpdate        = startTime.Add(indexLogFrequency)
		numIndicesChecked = 0
		numIndicesUpdated = 0
	)

	for blockIterator.Next() {
		valueBytes := blockIterator.Value()
		blk, isStateBlk, err := parseStoredBlock(valueBytes)
		if err != nil {
			return fmt.Errorf("failed to parse block: %w", err)
		}

		blkID := blk.ID()

		// This block was previously stored using the legacy format, update the
		// index to remove the usage of stateBlk.
		if isStateBlk {
			blkBytes := blk.Bytes()
			if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
				return fmt.Errorf("failed to write block: %w", err)
			}

			numIndicesUpdated++
		}

		numIndicesChecked++

		now := time.Now()
		if now.After(nextUpdate) {
			nextUpdate = now.Add(indexLogFrequency)

			// Block IDs are hashes, so keys are uniformly distributed over the
			// key space; the position of the current key therefore estimates
			// the fraction of the database already processed.
			progress := timer.ProgressFromHash(blkID[:])
			eta := timer.EstimateETA(
				startTime,
				progress,
				math.MaxUint64,
			)

			log.Info("reindexing blocks",
				zap.Int("numIndicesUpdated", numIndicesUpdated),
				zap.Int("numIndicesChecked", numIndicesChecked),
				zap.Duration("eta", eta),
			)
		}

		if numIndicesChecked%indexIterationLimit == 0 {
			// We must hold the lock during committing to make sure we don't
			// attempt to commit to disk while a block is concurrently being
			// accepted.
			lock.Lock()
			err := utils.Err(
				s.Commit(),
				blockIterator.Error(),
			)
			lock.Unlock()
			if err != nil {
				return err
			}

			// We release the iterator here to allow the underlying database to
			// clean up deleted state.
			blockIterator.Release()

			// We take the minimum here because it's possible that the node is
			// currently bootstrapping. This would mean that grabbing the lock
			// could take an extremely long period of time; which we should not
			// delay processing for.
			indexDuration := now.Sub(lastCommit)
			sleepDuration := min(
				indexIterationSleepMultiplier*indexDuration,
				indexIterationSleepCap,
			)
			time.Sleep(sleepDuration)

			// Make sure not to include the sleep duration into the next index
			// duration.
			lastCommit = time.Now()

			// Resume from the last processed key. Note that the start key is
			// inclusive, so the current block is parsed again; re-checking an
			// already-updated index is a harmless no-op.
			blockIterator = s.blockDB.NewIteratorWithStart(blkID[:])
		}
	}

	// Ensure we fully iterated over all blocks before writing that indexing
	// has finished.
	//
	// Note: This is needed because a transient read error could cause the
	// iterator to stop early.
	if err := blockIterator.Error(); err != nil {
		return fmt.Errorf("failed to iterate over historical blocks: %w", err)
	}

	if err := s.singletonDB.Put(BlocksReindexedKey, nil); err != nil {
		return fmt.Errorf("failed to mark blocks as reindexed: %w", err)
	}

	// We must hold the lock during committing to make sure we don't attempt to
	// commit to disk while a block is concurrently being accepted.
	lock.Lock()
	defer lock.Unlock()

	log.Info("finished block reindexing",
		zap.Int("numIndicesUpdated", numIndicesUpdated),
		zap.Int("numIndicesChecked", numIndicesChecked),
		zap.Duration("duration", time.Since(startTime)),
	)

	return s.Commit()
}
Loading

0 comments on commit f57f0f2

Please sign in to comment.