Skip to content

Commit

Permalink
Add ePBS to db (#13971)
Browse files Browse the repository at this point in the history
* Add ePBS to db
  • Loading branch information
potuz committed Jul 20, 2024
1 parent aec2a05 commit db58cfd
Show file tree
Hide file tree
Showing 20 changed files with 2,255 additions and 28 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type ReadOnlyDatabase interface {
DepositContractAddress(ctx context.Context) ([]byte, error)
// ExecutionChainData operations.
ExecutionChainData(ctx context.Context) (*ethpb.ETH1ChainData, error)
SignedBlindPayloadEnvelope(ctx context.Context, blockRoot []byte) (*ethpb.SignedBlindPayloadEnvelope, error)
// Fee recipients operations.
FeeRecipientByValidatorID(ctx context.Context, id primitives.ValidatorIndex) (common.Address, error)
RegistrationByValidatorID(ctx context.Context, id primitives.ValidatorIndex) (*ethpb.ValidatorRegistrationV1, error)
Expand Down Expand Up @@ -87,6 +88,7 @@ type NoHeadAccessDatabase interface {
SaveDepositContractAddress(ctx context.Context, addr common.Address) error
// SaveExecutionChainData operations.
SaveExecutionChainData(ctx context.Context, data *ethpb.ETH1ChainData) error
SaveBlindPayloadEnvelope(ctx context.Context, envelope *ethpb.SignedBlindPayloadEnvelope) error
// Run any required database migrations.
RunMigrations(ctx context.Context) error
// Fee recipients operations.
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/db/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"archived_point.go",
"backfill.go",
"backup.go",
"blind_payload_envelope.go",
"blocks.go",
"checkpoint.go",
"deposit_contract.go",
Expand Down Expand Up @@ -78,6 +79,7 @@ go_test(
"archived_point_test.go",
"backfill_test.go",
"backup_test.go",
"blind_payload_envelope_test.go",
"blocks_test.go",
"checkpoint_test.go",
"deposit_contract_test.go",
Expand Down Expand Up @@ -119,6 +121,7 @@ go_test(
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//testing/util/random:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand Down
45 changes: 45 additions & 0 deletions beacon-chain/db/kv/blind_payload_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kv

import (
"context"

ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)

// SaveBlindPayloadEnvelope saves a signed execution payload envelope blind in the database.
func (s *Store) SaveBlindPayloadEnvelope(ctx context.Context, env *ethpb.SignedBlindPayloadEnvelope) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlindPayloadEnvelope")
defer span.End()

enc, err := encode(ctx, env)
if err != nil {
return err
}

r := env.Message.BeaconBlockRoot
err = s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(executionPayloadEnvelopeBucket)
return bucket.Put(r, enc)
})

return err
}

// SignedBlindPayloadEnvelope retrieves a signed execution payload envelope blind from the database.
func (s *Store) SignedBlindPayloadEnvelope(ctx context.Context, blockRoot []byte) (*ethpb.SignedBlindPayloadEnvelope, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SignedBlindPayloadEnvelope")
defer span.End()

env := &ethpb.SignedBlindPayloadEnvelope{}
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(executionPayloadEnvelopeBucket)
enc := bkt.Get(blockRoot)
if enc == nil {
return ErrNotFound
}
return decode(ctx, enc, env)
})
return env, err
}
23 changes: 23 additions & 0 deletions beacon-chain/db/kv/blind_payload_envelope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kv

import (
"context"
"testing"

"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util/random"
)

func TestStore_SignedBlindPayloadEnvelope(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
_, err := db.SignedBlindPayloadEnvelope(ctx, []byte("test"))
require.ErrorIs(t, err, ErrNotFound)

env := random.SignedBlindPayloadEnvelope(t)
err = db.SaveBlindPayloadEnvelope(ctx, env)
require.NoError(t, err)
got, err := db.SignedBlindPayloadEnvelope(ctx, env.Message.BeaconBlockRoot)
require.NoError(t, err)
require.DeepEqual(t, got, env)
}
7 changes: 7 additions & 0 deletions beacon-chain/db/kv/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,11 @@ func unmarshalBlock(_ context.Context, enc []byte) (interfaces.ReadOnlySignedBea
if err := rawBlock.UnmarshalSSZ(enc[len(electraBlindKey):]); err != nil {
return nil, errors.Wrap(err, "could not unmarshal blinded Electra block")
}
case hasEpbsKey(enc):
rawBlock = &ethpb.SignedBeaconBlockEpbs{}
if err := rawBlock.UnmarshalSSZ(enc[len(epbsKey):]); err != nil {
return nil, errors.Wrap(err, "could not unmarshal EPBS block")
}
default:
// Marshal block bytes to phase 0 beacon block.
rawBlock = &ethpb.SignedBeaconBlock{}
Expand Down Expand Up @@ -852,6 +857,8 @@ func encodeBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {

func keyForBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {
switch blk.Version() {
case version.EPBS:
return epbsKey, nil
case version.Electra:
if blk.IsBlinded() {
return electraBlindKey, nil
Expand Down
34 changes: 23 additions & 11 deletions beacon-chain/db/kv/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/testing/util/random"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -146,6 +147,17 @@ var blockTests = []struct {
}
return blocks.NewSignedBeaconBlock(b)
}},
{
name: "epbs",
newBlock: func(slot primitives.Slot, root []byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
b := random.SignedBeaconBlock(&testing.T{})
b.Block.Slot = slot
if root != nil {
b.Block.ParentRoot = root
}
return blocks.NewSignedBeaconBlock(b)
},
},
}

func TestStore_SaveBlock_NoDuplicates(t *testing.T) {
Expand Down Expand Up @@ -202,7 +214,7 @@ func TestStore_BlocksCRUD(t *testing.T) {
retrievedBlock, err = db.Block(ctx, blockRoot)
require.NoError(t, err)
wanted := retrievedBlock
if retrievedBlock.Version() >= version.Bellatrix {
if retrievedBlock.Version() >= version.Bellatrix && retrievedBlock.Version() < version.EPBS {
wanted, err = retrievedBlock.ToBlinded()
require.NoError(t, err)
}
Expand Down Expand Up @@ -390,7 +402,7 @@ func TestStore_BlocksCRUD_NoCache(t *testing.T) {
require.NoError(t, err)

wanted := blk
if blk.Version() >= version.Bellatrix {
if blk.Version() >= version.Bellatrix && blk.Version() < version.EPBS {
wanted, err = blk.ToBlinded()
require.NoError(t, err)
}
Expand Down Expand Up @@ -609,7 +621,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err := db.Block(ctx, root)
require.NoError(t, err)
wanted := block1
if block1.Version() >= version.Bellatrix {
if block1.Version() >= version.Bellatrix && block1.Version() < version.EPBS {
wanted, err = wanted.ToBlinded()
require.NoError(t, err)
}
Expand All @@ -627,7 +639,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted2 := block2
if block2.Version() >= version.Bellatrix {
if block2.Version() >= version.Bellatrix && block2.Version() < version.EPBS {
wanted2, err = block2.ToBlinded()
require.NoError(t, err)
}
Expand All @@ -645,7 +657,7 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = block3
if block3.Version() >= version.Bellatrix {
if block3.Version() >= version.Bellatrix && block3.Version() < version.EPBS {
wanted, err = wanted.ToBlinded()
require.NoError(t, err)
}
Expand Down Expand Up @@ -681,7 +693,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err := db.Block(ctx, root)
require.NoError(t, err)
wanted := block1
if block1.Version() >= version.Bellatrix {
if block1.Version() >= version.Bellatrix && block1.Version() < version.EPBS {
wanted, err = block1.ToBlinded()
require.NoError(t, err)
}
Expand All @@ -698,7 +710,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = genesisBlock
if genesisBlock.Version() >= version.Bellatrix {
if genesisBlock.Version() >= version.Bellatrix && genesisBlock.Version() < version.EPBS {
wanted, err = genesisBlock.ToBlinded()
require.NoError(t, err)
}
Expand All @@ -715,7 +727,7 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
b, err = db.Block(ctx, root)
require.NoError(t, err)
wanted = genesisBlock
if genesisBlock.Version() >= version.Bellatrix {
if genesisBlock.Version() >= version.Bellatrix && genesisBlock.Version() < version.EPBS {
wanted, err = genesisBlock.ToBlinded()
require.NoError(t, err)
}
Expand Down Expand Up @@ -811,7 +823,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
require.NoError(t, err)

wanted := b1
if b1.Version() >= version.Bellatrix {
if b1.Version() >= version.Bellatrix && b1.Version() < version.EPBS {
wanted, err = b1.ToBlinded()
require.NoError(t, err)
}
Expand All @@ -827,7 +839,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
t.Fatalf("Expected 2 blocks, received %d blocks", len(retrievedBlocks))
}
wanted = b2
if b2.Version() >= version.Bellatrix {
if b2.Version() >= version.Bellatrix && b2.Version() < version.EPBS {
wanted, err = b2.ToBlinded()
require.NoError(t, err)
}
Expand All @@ -837,7 +849,7 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, true, proto.Equal(wantedPb, retrieved0Pb), "Wanted: %v, received: %v", retrievedBlocks[0], wanted)
wanted = b3
if b3.Version() >= version.Bellatrix {
if b3.Version() >= version.Bellatrix && b3.Version() < version.EPBS {
wanted, err = b3.ToBlinded()
require.NoError(t, err)
}
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/db/kv/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func isSSZStorageFormat(obj interface{}) bool {
return true
case *ethpb.VoluntaryExit:
return true
case *ethpb.SignedBlindPayloadEnvelope:
return true
case *ethpb.ValidatorRegistrationV1:
return true
default:
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/db/kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,10 @@ func hasElectraBlindKey(enc []byte) bool {
}
return bytes.Equal(enc[:len(electraBlindKey)], electraBlindKey)
}

func hasEpbsKey(enc []byte) bool {
if len(epbsKey) >= len(enc) {
return false
}
return bytes.Equal(enc[:len(epbsKey)], epbsKey)
}
3 changes: 3 additions & 0 deletions beacon-chain/db/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ var Buckets = [][]byte{

feeRecipientBucket,
registrationBucket,

// ePBS
executionPayloadEnvelopeBucket,
}

// KVStoreOption is a functional option that modifies a kv.Store.
Expand Down
20 changes: 11 additions & 9 deletions beacon-chain/db/kv/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ package kv
// it easy to scan for keys that have a certain shard number as a prefix and return those
// corresponding attestations.
var (
blocksBucket = []byte("blocks")
stateBucket = []byte("state")
stateSummaryBucket = []byte("state-summary")
chainMetadataBucket = []byte("chain-metadata")
checkpointBucket = []byte("check-point")
powchainBucket = []byte("powchain")
stateValidatorsBucket = []byte("state-validators")
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
blocksBucket = []byte("blocks")
stateBucket = []byte("state")
stateSummaryBucket = []byte("state-summary")
chainMetadataBucket = []byte("chain-metadata")
checkpointBucket = []byte("check-point")
powchainBucket = []byte("powchain")
stateValidatorsBucket = []byte("state-validators")
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
executionPayloadEnvelopeBucket = []byte("execution-payload-envelope")

// Deprecated: This bucket was migrated in PR 6461. Do not use, except for migrations.
slotsHasObjectBucket = []byte("slots-has-objects")
Expand Down Expand Up @@ -50,6 +51,7 @@ var (
denebBlindKey = []byte("blind-deneb")
electraKey = []byte("electra")
electraBlindKey = []byte("blind-electra")
epbsKey = []byte("epbs")

// block root included in the beacon state used by weak subjectivity initial sync
originCheckpointBlockRootKey = []byte("origin-checkpoint-block-root")
Expand Down
48 changes: 48 additions & 0 deletions beacon-chain/db/kv/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ func (s *Store) saveStatesEfficientInternal(ctx context.Context, tx *bolt.Tx, bl
if err := s.processElectra(ctx, rawType, rt[:], bucket, valIdxBkt, validatorKeys[i]); err != nil {
return err
}
case *ethpb.BeaconStateEPBS:
if err := s.processEPBS(ctx, rawType, rt[:], bucket, valIdxBkt, validatorKeys[i]); err != nil {
return err
}
default:
return errors.New("invalid state type")
}
Expand Down Expand Up @@ -367,6 +371,24 @@ func (s *Store) processElectra(ctx context.Context, pbState *ethpb.BeaconStateEl
return nil
}

func (s *Store) processEPBS(ctx context.Context, pbState *ethpb.BeaconStateEPBS, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
valEntries := pbState.Validators
pbState.Validators = make([]*ethpb.Validator, 0)
rawObj, err := pbState.MarshalSSZ()
if err != nil {
return err
}
encodedState := snappy.Encode(nil, append(epbsKey, rawObj...))
if err := bucket.Put(rootHash, encodedState); err != nil {
return err
}
pbState.Validators = valEntries
if err := valIdxBkt.Put(rootHash, validatorKey); err != nil {
return err
}
return nil
}

func (s *Store) storeValidatorEntriesSeparately(ctx context.Context, tx *bolt.Tx, validatorsEntries map[string]*ethpb.Validator) error {
valBkt := tx.Bucket(stateValidatorsBucket)
for hashStr, validatorEntry := range validatorsEntries {
Expand Down Expand Up @@ -516,6 +538,19 @@ func (s *Store) unmarshalState(_ context.Context, enc []byte, validatorEntries [
}

switch {
case hasEpbsKey(enc):
protoState := &ethpb.BeaconStateEPBS{}
if err := protoState.UnmarshalSSZ(enc[len(epbsKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for EPBS")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoEpbs(protoState)
case hasElectraKey(enc):
protoState := &ethpb.BeaconStateElectra{}
if err := protoState.UnmarshalSSZ(enc[len(electraKey):]); err != nil {
Expand Down Expand Up @@ -675,6 +710,19 @@ func marshalState(ctx context.Context, st state.ReadOnlyBeaconState) ([]byte, er
return nil, err
}
return snappy.Encode(nil, append(electraKey, rawObj...)), nil
case *ethpb.BeaconStateEPBS:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateEPBS)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
return snappy.Encode(nil, append(epbsKey, rawObj...)), nil
default:
return nil, errors.New("invalid inner state")
}
Expand Down
Loading

0 comments on commit db58cfd

Please sign in to comment.