From df9a534826037ddac8dcaac0b1b470ce9fa8ecd4 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Tue, 31 Mar 2020 16:54:24 -0700 Subject: [PATCH 1/2] Regen historical states for `new-state-mgmt` compatibility (#5261) --- beacon-chain/blockchain/service.go | 4 + beacon-chain/db/kv/BUILD.bazel | 5 +- beacon-chain/db/kv/archived_point.go | 16 ++ beacon-chain/db/kv/check_historical_state.go | 55 +++++ beacon-chain/db/kv/check_state.go | 33 --- beacon-chain/db/kv/kv.go | 3 +- beacon-chain/db/kv/regen_historical_states.go | 194 ++++++++++++++++++ shared/params/config.go | 2 + 8 files changed, 277 insertions(+), 35 deletions(-) create mode 100644 beacon-chain/db/kv/check_historical_state.go delete mode 100644 beacon-chain/db/kv/check_state.go create mode 100644 beacon-chain/db/kv/regen_historical_states.go diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 560a2616e05b..486a7f98c2e4 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -468,6 +468,10 @@ func (s *Service) pruneGarbageState(ctx context.Context, slot uint64) error { return err } + if err := s.beaconDB.SaveLastArchivedIndex(ctx, 0); err != nil { + return err + } + return nil } diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index 3028065ad71b..76c8dfd82296 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -8,7 +8,7 @@ go_library( "attestations.go", "backup.go", "blocks.go", - "check_state.go", + "check_historical_state.go", "checkpoint.go", "deposit_contract.go", "encoding.go", @@ -16,6 +16,7 @@ go_library( "kv.go", "operations.go", "powchain.go", + "regen_historical_states.go", "schema.go", "slashings.go", "state.go", @@ -28,6 +29,7 @@ go_library( deps = [ "//beacon-chain/cache:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/state:go_default_library", "//beacon-chain/db/filters:go_default_library", 
"//beacon-chain/db/iface:go_default_library", "//beacon-chain/state:go_default_library", @@ -35,6 +37,7 @@ go_library( "//proto/beacon/db:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/cmd:go_default_library", "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", "//shared/sliceutil:go_default_library", diff --git a/beacon-chain/db/kv/archived_point.go b/beacon-chain/db/kv/archived_point.go index 1afd568fde13..f543bdd2cf3f 100644 --- a/beacon-chain/db/kv/archived_point.go +++ b/beacon-chain/db/kv/archived_point.go @@ -2,6 +2,7 @@ package kv import ( "context" + "encoding/binary" "github.com/prysmaticlabs/prysm/shared/bytesutil" bolt "go.etcd.io/bbolt" @@ -29,6 +30,21 @@ func (k *Store) SaveLastArchivedIndex(ctx context.Context, index uint64) error { }) } +// LastArchivedIndex from the db. +func (k *Store) LastArchivedIndex(ctx context.Context) (uint64, error) { + ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex") + defer span.End() + var index uint64 + err := k.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(archivedIndexRootBucket) + b := bucket.Get(lastArchivedIndexKey) + index = binary.LittleEndian.Uint64(b) + return nil + }) + + return index, err +} + // LastArchivedIndexRoot from the db. 
func (k *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte { ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndexRoot") diff --git a/beacon-chain/db/kv/check_historical_state.go b/beacon-chain/db/kv/check_historical_state.go new file mode 100644 index 000000000000..6210cd400ed9 --- /dev/null +++ b/beacon-chain/db/kv/check_historical_state.go @@ -0,0 +1,55 @@ +package kv + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/shared/cmd" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + bolt "go.etcd.io/bbolt" +) + +var historicalStateDeletedKey = []byte("historical-states-deleted") + +func (kv *Store) ensureNewStateServiceCompatible(ctx context.Context) error { + if !featureconfig.Get().NewStateMgmt { + return kv.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(newStateServiceCompatibleBucket) + return bkt.Put(historicalStateDeletedKey, []byte{0x01}) + }) + } + + var historicalStateDeleted bool + kv.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(newStateServiceCompatibleBucket) + v := bkt.Get(historicalStateDeletedKey) + historicalStateDeleted = len(v) == 1 && v[0] == 0x01 + return nil + }) + + regenHistoricalStatesConfirmed := false + var err error + if historicalStateDeleted { + actionText := "Looks like you stopped using --new-state-mgmt. To reuse it, the node will need " + + "to generate and save historical states. The process may take a while, - do you want to proceed? (Y/N)" + deniedText := "Historical states will not be generated. Please remove usage --new-state-mgmt" + + regenHistoricalStatesConfirmed, err = cmd.ConfirmAction(actionText, deniedText) + if err != nil { + return err + } + + if !regenHistoricalStatesConfirmed { + return errors.New("exiting... 
please do not run with flag --new-state-mgmt") + } + + if err := kv.regenHistoricalStates(ctx); err != nil { + return errors.Wrap(err, "could not regenerate historical states, please retry") + } + } + + return kv.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(newStateServiceCompatibleBucket) + return bkt.Put(historicalStateDeletedKey, []byte{0x00}) + }) +} diff --git a/beacon-chain/db/kv/check_state.go b/beacon-chain/db/kv/check_state.go deleted file mode 100644 index f83fe1755b6c..000000000000 --- a/beacon-chain/db/kv/check_state.go +++ /dev/null @@ -1,33 +0,0 @@ -package kv - -import ( - "errors" - - "github.com/prysmaticlabs/prysm/shared/featureconfig" - bolt "go.etcd.io/bbolt" -) - -var historicalStateDeletedKey = []byte("historical-states-deleted") - -func (kv *Store) ensureNewStateServiceCompatible() error { - if !featureconfig.Get().NewStateMgmt { - return kv.db.Update(func(tx *bolt.Tx) error { - bkt := tx.Bucket(newStateServiceCompatibleBucket) - return bkt.Put(historicalStateDeletedKey, []byte{0x01}) - }) - } - - var historicalStateDeleted bool - kv.db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(newStateServiceCompatibleBucket) - v := bkt.Get(historicalStateDeletedKey) - historicalStateDeleted = len(v) == 1 && v[0] == 0x01 - return nil - }) - - if historicalStateDeleted { - return errors.New("historical states were pruned in db, do not run with flag --new-state-mgmt") - } - - return nil -} diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index 4dff30d2973e..feb570bf0788 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -1,6 +1,7 @@ package kv import ( + "context" "os" "path" "sync" @@ -120,7 +121,7 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St return nil, err } - if err := kv.ensureNewStateServiceCompatible(); err != nil { + if err := kv.ensureNewStateServiceCompatible(context.Background()); err != nil { return nil, err } diff --git 
a/beacon-chain/db/kv/regen_historical_states.go b/beacon-chain/db/kv/regen_historical_states.go new file mode 100644 index 000000000000..d0931578b555 --- /dev/null +++ b/beacon-chain/db/kv/regen_historical_states.go @@ -0,0 +1,194 @@ +package kv + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state" + "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/shared/params" + log "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +func (kv *Store) regenHistoricalStates(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates") + defer span.End() + + genesisState, err := kv.GenesisState(ctx) + if err != nil { + return err + } + currentState := genesisState.Copy() + startSlot := genesisState.Slot() + + // Restore from last archived point if this process was previously interrupted. 
+ slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint + lastArchivedIndex, err := kv.LastArchivedIndex(ctx) + if err != nil { + return err + } + if lastArchivedIndex > 0 { + archivedIndexStart := lastArchivedIndex - 1 + wantedSlotBelow := archivedIndexStart*slotsPerArchivedPoint + 1 + states, err := kv.HighestSlotStatesBelow(ctx, wantedSlotBelow) + if err != nil { + return err + } + if len(states) == 0 { + return errors.New("states can't be empty") + } + if states[0] == nil { + return errors.New("nil last state") + } + currentState = states[0] + startSlot = currentState.Slot() + } + + lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx) + if err != nil { + return err + } + for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ { + targetSlot := startSlot + slotsPerArchivedPoint + filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot) + blocks, err := kv.Blocks(ctx, filter) + if err != nil { + return err + } + + // Replay blocks and replay slots if necessary. + if len(blocks) > 0 { + for i := 0; i < len(blocks); i++ { + if blocks[i].Block.Slot == 0 { + continue + } + currentState, err = regenHistoricalStateTransition(ctx, currentState, blocks[i]) + if err != nil { + return err + } + } + } + if targetSlot > currentState.Slot() { + currentState, err = regenHistoricalStateProcessSlots(ctx, currentState, targetSlot) + if err != nil { + return err + } + } + + if len(blocks) > 0 { + // Save the historical root, state and highest index to the DB. 
+ if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 && blocks[len(blocks)-1].Block.Slot%slotsPerArchivedPoint == 0 { + if err := kv.saveArchivedInfo(ctx, currentState, blocks, i); err != nil { + return err + } + log.WithFields(log.Fields{ + "currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex), + "archivedStateSlot": currentState.Slot()}).Info("Saved historical state") + } + } + startSlot += slotsPerArchivedPoint + } + return nil +} + +// This runs state transition to recompute historical state. +func regenHistoricalStateTransition( + ctx context.Context, + state *stateTrie.BeaconState, + signed *ethpb.SignedBeaconBlock, +) (*stateTrie.BeaconState, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if signed == nil || signed.Block == nil { + return nil, errors.New("block can't be nil") + } + ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition") + defer span.End() + var err error + state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot) + if err != nil { + return nil, errors.Wrap(err, "could not process slot") + } + state, err = transition.ProcessBlockForStateRoot(ctx, state, signed) + if err != nil { + return nil, errors.Wrap(err, "could not process block") + } + return state, nil +} + +// This runs slot transition to recompute historical state. 
+func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots") + defer span.End() + if state == nil { + return nil, errors.New("state can't be nil") + } + if state.Slot() > slot { + err := fmt.Errorf("expected state.slot %d < slot %d", state.Slot(), slot) + return nil, err + } + if state.Slot() == slot { + return state, nil + } + for state.Slot() < slot { + var err error; state, err = transition.ProcessSlot(ctx, state) + if err != nil { + return nil, errors.Wrap(err, "could not process slot") + } + if transition.CanProcessEpoch(state) { + state, err = transition.ProcessEpochPrecompute(ctx, state) + if err != nil { + return nil, errors.Wrap(err, "could not process epoch with optimizations") + } + } + state.SetSlot(state.Slot() + 1) + } + return state, nil +} + +// This retrieves the last saved block's archived index. +func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) { + b, err := kv.HighestSlotBlocks(ctx) + if err != nil { + return 0, err + } + if len(b) == 0 { + return 0, errors.New("blocks can't be empty") + } + if b[0] == nil { + return 0, errors.New("nil last block") + } + lastSavedBlockSlot := b[0].Block.Slot + slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint + lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1 + + return lastSavedBlockArchivedIndex, nil +} + +// This saved archived info (state, root, index) into the db. 
+func (kv *Store) saveArchivedInfo(ctx context.Context, + currentState *stateTrie.BeaconState, + blocks []*ethpb.SignedBeaconBlock, + archivedIndex uint64) error { + lastBlocksRoot, err := ssz.HashTreeRoot(blocks[len(blocks)-1].Block) + if err != nil { + return err + } + if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil { + return err + } + if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil { + return err + } + if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil { + return err + } + return nil +} diff --git a/shared/params/config.go b/shared/params/config.go index 10027b725c75..e57ca44e4d87 100644 --- a/shared/params/config.go +++ b/shared/params/config.go @@ -94,6 +94,7 @@ type BeaconChainConfig struct { EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature. DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request. MaxPeersToSync int // MaxPeersToSync describes the limit for number of peers in round robin sync. + SlotsPerArchivedPoint uint64 // SlotsPerArchivedPoint defines the number of slots per one archived point. // Slasher constants. WeakSubjectivityPeriod uint64 // WeakSubjectivityPeriod defines the time period expressed in number of epochs were proof of stake network should validate block headers and attestations for slashable events. @@ -185,6 +186,7 @@ var defaultBeaconConfig = &BeaconChainConfig{ EmptySignature: [96]byte{}, DefaultPageSize: 250, MaxPeersToSync: 15, + SlotsPerArchivedPoint: 256, // Slasher related values. 
WeakSubjectivityPeriod: 54000, From 3dd5576e33b2130df295bc3c95f8d80e28a429ae Mon Sep 17 00:00:00 2001 From: Ivan Martinez Date: Wed, 1 Apr 2020 11:23:23 -0400 Subject: [PATCH 2/2] Improvement, flake fixes (#5263) --- endtoend/endtoend_test.go | 6 +++--- endtoend/minimal_antiflake_e2e_1_test.go | 1 - endtoend/minimal_antiflake_e2e_2_test.go | 1 - endtoend/params/params.go | 13 +++++++------ 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/endtoend/endtoend_test.go b/endtoend/endtoend_test.go index 11dfc38405b8..a8e83feab757 100644 --- a/endtoend/endtoend_test.go +++ b/endtoend/endtoend_test.go @@ -27,7 +27,7 @@ func runEndToEndTest(t *testing.T, config *types.E2EConfig) { keystorePath, eth1PID := components.StartEth1Node(t) multiAddrs, bProcessIDs := components.StartBeaconNodes(t, config) valProcessIDs := components.StartValidators(t, config, keystorePath) - processIDs := append(bProcessIDs, valProcessIDs...) + processIDs := append(valProcessIDs, bProcessIDs...) processIDs = append(processIDs, eth1PID) defer helpers.LogOutput(t, config) defer helpers.KillProcesses(t, processIDs) @@ -95,7 +95,7 @@ func runEndToEndTest(t *testing.T, config *types.E2EConfig) { return } - multiAddr, pID := components.StartNewBeaconNode(t, config, multiAddrs) + multiAddr, processID := components.StartNewBeaconNode(t, config, multiAddrs) multiAddrs = append(multiAddrs, multiAddr) index := e2e.TestParams.BeaconNodeCount syncConn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", e2e.TestParams.BeaconNodeRPCPort+index), grpc.WithInsecure()) @@ -115,7 +115,7 @@ func runEndToEndTest(t *testing.T, config *types.E2EConfig) { t.Fatal(err) } defer helpers.LogErrorOutput(t, syncLogFile, "beacon chain node", index) - defer helpers.KillProcesses(t, []int{pID}) + defer helpers.KillProcesses(t, []int{processID}) if err := helpers.WaitForTextInFile(syncLogFile, "Synced up to"); err != nil { t.Fatalf("Failed to sync: %v", err) } diff --git a/endtoend/minimal_antiflake_e2e_1_test.go 
b/endtoend/minimal_antiflake_e2e_1_test.go index 5e5c3cfefac0..5b8914f49d74 100644 --- a/endtoend/minimal_antiflake_e2e_1_test.go +++ b/endtoend/minimal_antiflake_e2e_1_test.go @@ -11,7 +11,6 @@ import ( ) func TestEndToEnd_AntiFlake_MinimalConfig_1(t *testing.T) { - t.Skip("skipping to prevent E2E flakes") testutil.ResetCache() params.UseMinimalConfig() diff --git a/endtoend/minimal_antiflake_e2e_2_test.go b/endtoend/minimal_antiflake_e2e_2_test.go index e95112f20372..547ea4876955 100644 --- a/endtoend/minimal_antiflake_e2e_2_test.go +++ b/endtoend/minimal_antiflake_e2e_2_test.go @@ -11,7 +11,6 @@ import ( ) func TestEndToEnd_AntiFlake_MinimalConfig_2(t *testing.T) { - t.Skip("skipping to prevent E2E flakes") testutil.ResetCache() params.UseMinimalConfig() diff --git a/endtoend/params/params.go b/endtoend/params/params.go index 931a5627234a..d0c7c6fb4a58 100644 --- a/endtoend/params/params.go +++ b/endtoend/params/params.go @@ -45,6 +45,7 @@ func Init(beaconNodeCount int) error { } testIndexStr, ok := os.LookupEnv("TEST_SHARD_INDEX") if !ok { + // Picking an index that won't normally be used. testIndexStr = "8" } testIndex, err := strconv.Atoi(testIndexStr) @@ -57,12 +58,12 @@ func Init(beaconNodeCount int) error { LogPath: logPath, TestShardIndex: testIndex, BeaconNodeCount: beaconNodeCount, - Eth1RPCPort: 3000 + testIndex*100, // Multiplying 100 here so the test index doesn't conflict with the other node ports. - BeaconNodeRPCPort: 4000 + testIndex*100, - BeaconNodeMetricsPort: 5000 + testIndex*100, - ValidatorMetricsPort: 6000 + testIndex*100, - SlasherRPCPort: 7000 + testIndex*100, - SlasherMetricsPort: 8000 + testIndex*100, + Eth1RPCPort: 3100 + testIndex*100, // Multiplying 100 here so the test index doesn't conflict with the other node ports. 
+ BeaconNodeRPCPort: 4100 + testIndex*100, + BeaconNodeMetricsPort: 5100 + testIndex*100, + ValidatorMetricsPort: 6100 + testIndex*100, + SlasherRPCPort: 7100 + testIndex*100, + SlasherMetricsPort: 8100 + testIndex*100, } return nil }