Skip to content

Commit

Permalink
Merge branch 'master' into v0.11
Browse files Browse the repository at this point in the history
  • Loading branch information
rauljordan authored Apr 1, 2020
2 parents ae1b4aa + 3dd5576 commit dbb4d02
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 46 deletions.
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ func (s *Service) pruneGarbageState(ctx context.Context, slot uint64) error {
return err
}

if err := s.beaconDB.SaveLastArchivedIndex(ctx, 0); err != nil {
return err
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/db/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ go_library(
"attestations.go",
"backup.go",
"blocks.go",
"check_state.go",
"check_historical_state.go",
"checkpoint.go",
"deposit_contract.go",
"encoding.go",
"finalized_block_roots.go",
"kv.go",
"operations.go",
"powchain.go",
"regen_historical_states.go",
"schema.go",
"slashings.go",
"state.go",
Expand All @@ -28,13 +29,15 @@ go_library(
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//proto/beacon/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/cmd:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
Expand Down
16 changes: 16 additions & 0 deletions beacon-chain/db/kv/archived_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kv

import (
"context"
"encoding/binary"

"github.com/prysmaticlabs/prysm/shared/bytesutil"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -29,6 +30,21 @@ func (k *Store) SaveLastArchivedIndex(ctx context.Context, index uint64) error {
})
}

// LastArchivedIndex from the db.
func (k *Store) LastArchivedIndex(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex")
defer span.End()
var index uint64
err := k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
b := bucket.Get(lastArchivedIndexKey)
index = binary.LittleEndian.Uint64(b)
return nil
})

return index, err
}

// LastArchivedIndexRoot from the db.
func (k *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndexRoot")
Expand Down
55 changes: 55 additions & 0 deletions beacon-chain/db/kv/check_historical_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package kv

import (
"context"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
bolt "go.etcd.io/bbolt"
)

var historicalStateDeletedKey = []byte("historical-states-deleted")

func (kv *Store) ensureNewStateServiceCompatible(ctx context.Context) error {
if !featureconfig.Get().NewStateMgmt {
return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)
return bkt.Put(historicalStateDeletedKey, []byte{0x01})
})
}

var historicalStateDeleted bool
kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)
v := bkt.Get(historicalStateDeletedKey)
historicalStateDeleted = len(v) == 1 && v[0] == 0x01
return nil
})

regenHistoricalStatesConfirmed := false
var err error
if historicalStateDeleted {
actionText := "Looks like you stopped using --new-state-mgmt. To reuse it, the node will need " +
"to generate and save historical states. The process may take a while, - do you want to proceed? (Y/N)"
deniedText := "Historical states will not be generated. Please remove usage --new-state-mgmt"

regenHistoricalStatesConfirmed, err = cmd.ConfirmAction(actionText, deniedText)
if err != nil {
return err
}

if !regenHistoricalStatesConfirmed {
return errors.New("exiting... please do not run with flag --new-state-mgmt")
}

if err := kv.regenHistoricalStates(ctx); err != nil {
return errors.Wrap(err, "could not regenerate historical states, please retry")
}
}

return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)
return bkt.Put(historicalStateDeletedKey, []byte{0x00})
})
}
33 changes: 0 additions & 33 deletions beacon-chain/db/kv/check_state.go

This file was deleted.

3 changes: 2 additions & 1 deletion beacon-chain/db/kv/kv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"context"
"os"
"path"
"sync"
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St
return nil, err
}

if err := kv.ensureNewStateServiceCompatible(); err != nil {
if err := kv.ensureNewStateServiceCompatible(context.Background()); err != nil {
return nil, err
}

Expand Down
194 changes: 194 additions & 0 deletions beacon-chain/db/kv/regen_historical_states.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package kv

import (
"context"
"fmt"

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

func (kv *Store) regenHistoricalStates(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates")
defer span.End()

genesisState, err := kv.GenesisState(ctx)
if err != nil {
return err
}
currentState := genesisState.Copy()
startSlot := genesisState.Slot()

// Restore from last archived point if this process was previously interrupted.
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
lastArchivedIndex, err := kv.LastArchivedIndex(ctx)
if err != nil {
return err
}
if lastArchivedIndex > 0 {
archivedIndexStart := lastArchivedIndex - 1
wantedSlotBelow := archivedIndexStart*slotsPerArchivedPoint + 1
states, err := kv.HighestSlotStatesBelow(ctx, wantedSlotBelow)
if err != nil {
return err
}
if len(states) == 0 {
return errors.New("states can't be empty")
}
if states[0] == nil {
return errors.New("nil last state")
}
currentState = states[0]
startSlot = currentState.Slot()
}

lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx)
if err != nil {
return err
}
for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ {
targetSlot := startSlot + slotsPerArchivedPoint
filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot)
blocks, err := kv.Blocks(ctx, filter)
if err != nil {
return err
}

// Replay blocks and replay slots if necessary.
if len(blocks) > 0 {
for i := 0; i < len(blocks); i++ {
if blocks[i].Block.Slot == 0 {
continue
}
currentState, err = regenHistoricalStateTransition(ctx, currentState, blocks[i])
if err != nil {
return err
}
}
}
if targetSlot > currentState.Slot() {
currentState, err = regenHistoricalStateProcessSlots(ctx, currentState, targetSlot)
if err != nil {
return err
}
}

if len(blocks) > 0 {
// Save the historical root, state and highest index to the DB.
if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 && blocks[len(blocks)-1].Block.Slot&slotsPerArchivedPoint == 0 {
if err := kv.saveArchivedInfo(ctx, currentState, blocks, i); err != nil {
return err
}
log.WithFields(log.Fields{
"currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex),
"archivedStateSlot": currentState.Slot()}).Info("Saved historical state")
}
}
startSlot += slotsPerArchivedPoint
}
return nil
}

// This runs state transition to recompute historical state.
func regenHistoricalStateTransition(
ctx context.Context,
state *stateTrie.BeaconState,
signed *ethpb.SignedBeaconBlock,
) (*stateTrie.BeaconState, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if signed == nil || signed.Block == nil {
return nil, errors.New("block can't be nil")
}
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition")
defer span.End()
var err error
state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot)
if err != nil {
return nil, errors.Wrap(err, "could not process slot")
}
state, err = transition.ProcessBlockForStateRoot(ctx, state, signed)
if err != nil {
return nil, errors.Wrap(err, "could not process block")
}
return state, nil
}

// This runs slot transition to recompute historical state.
func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots")
defer span.End()
if state == nil {
return nil, errors.New("state can't be nil")
}
if state.Slot() > slot {
err := fmt.Errorf("expected state.slot %d < slot %d", state.Slot(), slot)
return nil, err
}
if state.Slot() == slot {
return state, nil
}
for state.Slot() < slot {
state, err := transition.ProcessSlot(ctx, state)
if err != nil {
return nil, errors.Wrap(err, "could not process slot")
}
if transition.CanProcessEpoch(state) {
state, err = transition.ProcessEpochPrecompute(ctx, state)
if err != nil {
return nil, errors.Wrap(err, "could not process epoch with optimizations")
}
}
state.SetSlot(state.Slot() + 1)
}
return state, nil
}

// This retrieves the last saved block's archived index.
func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) {
b, err := kv.HighestSlotBlocks(ctx)
if err != nil {
return 0, err
}
if len(b) == 0 {
return 0, errors.New("blocks can't be empty")
}
if b[0] == nil {
return 0, errors.New("nil last block")
}
lastSavedBlockSlot := b[0].Block.Slot
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1

return lastSavedBlockArchivedIndex, nil
}

// This saved archived info (state, root, index) into the db.
func (kv *Store) saveArchivedInfo(ctx context.Context,
currentState *stateTrie.BeaconState,
blocks []*ethpb.SignedBeaconBlock,
archivedIndex uint64) error {
lastBlocksRoot, err := ssz.HashTreeRoot(blocks[len(blocks)-1].Block)
if err != nil {
return nil
}
if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil {
return err
}
if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil {
return err
}
if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil {
return err
}
return nil
}
Loading

0 comments on commit dbb4d02

Please sign in to comment.