Skip to content

Commit

Permalink
Process Execution Payload Envelope in Chain Service (#14295)
Browse files Browse the repository at this point in the history
Adds the processing of execution payload envelope
Corrects the protos for attestations and slashings in Electra versions
Adds generators of full blocks for Electra
  • Loading branch information
potuz committed Nov 4, 2024
1 parent 79eeef1 commit ea0b439
Show file tree
Hide file tree
Showing 43 changed files with 1,849 additions and 1,089 deletions.
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"chain_info.go",
"chain_info_forkchoice.go",
"currently_syncing_block.go",
"currently_syncing_execution_payload_envelope.go",
"defragment.go",
"error.go",
"execution_engine.go",
Expand All @@ -25,6 +26,7 @@ go_library(
"receive_attestation.go",
"receive_blob.go",
"receive_block.go",
"receive_execution_payload_envelope.go",
"service.go",
"tracked_proposer.go",
"weak_subjectivity_checks.go",
Expand All @@ -43,6 +45,7 @@ go_library(
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/epbs:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
Expand Down Expand Up @@ -97,6 +100,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
Expand Down Expand Up @@ -124,6 +128,7 @@ go_test(
"process_block_test.go",
"receive_attestation_test.go",
"receive_block_test.go",
"receive_execution_payload_envelope_test.go",
"service_norace_test.go",
"service_test.go",
"setup_test.go",
Expand Down Expand Up @@ -165,6 +170,7 @@ go_test(
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/epbs:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/trie:go_default_library",
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ func (s *Service) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (
return nil, errBlockDoesNotExist
}

// PayloadBeingSynced returns whether the payload for the block with the given root is currently being synced
func (s *Service) PayloadBeingSynced(root [32]byte) bool {
return s.payloadBeingSynced.isSyncing(root)
}

// BlockBeingSynced returns whether the block with the given root is currently being synced
func (s *Service) BlockBeingSynced(root [32]byte) bool {
return s.blockBeingSynced.isSyncing(root)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package blockchain

import "sync"

type currentlySyncingPayload struct {
sync.Mutex
roots map[[32]byte]struct{}
}

func (b *currentlySyncingPayload) set(root [32]byte) {
b.Lock()
defer b.Unlock()
b.roots[root] = struct{}{}
}

func (b *currentlySyncingPayload) unset(root [32]byte) {
b.Lock()
defer b.Unlock()
delete(b.roots, root)
}

func (b *currentlySyncingPayload) isSyncing(root [32]byte) bool {
b.Lock()
defer b.Unlock()
_, ok := b.roots[root]
return ok
}
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/execution_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,8 +1056,8 @@ func TestService_removeInvalidBlockAndState(t *testing.T) {

require.NoError(t, service.removeInvalidBlockAndState(ctx, [][32]byte{r1, r2}))

require.Equal(t, false, service.hasBlock(ctx, r1))
require.Equal(t, false, service.hasBlock(ctx, r2))
require.Equal(t, false, service.chainHasBlock(ctx, r1))
require.Equal(t, false, service.chainHasBlock(ctx, r2))
require.Equal(t, false, service.cfg.BeaconDB.HasStateSummary(ctx, r1))
require.Equal(t, false, service.cfg.BeaconDB.HasStateSummary(ctx, r2))
has, err := service.cfg.StateGen.HasState(ctx, r1)
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ var (
Name: "chain_service_processing_milliseconds",
Help: "Total time to call a chain service in ReceiveBlock()",
})
executionEngineProcessingTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "execution_engine_processing_milliseconds",
Help: "Total time to process an execution payload envelope in ReceiveExecutionPayloadEnvelope()",
})
dataAvailWaitedTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "da_waited_time_milliseconds",
Help: "Total time spent waiting for a data availability check in ReceiveBlock()",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *Service) processAttestations(ctx context.Context, disparity time.Durati
}

hasState := s.cfg.BeaconDB.HasStateSummary(ctx, bytesutil.ToBytes32(a.GetData().BeaconBlockRoot))
hasBlock := s.hasBlock(ctx, bytesutil.ToBytes32(a.GetData().BeaconBlockRoot))
hasBlock := s.chainHasBlock(ctx, bytesutil.ToBytes32(a.GetData().BeaconBlockRoot))
if !(hasState && hasBlock) {
continue
}
Expand Down
166 changes: 166 additions & 0 deletions beacon-chain/blockchain/receive_execution_payload_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package blockchain

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/epbs"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
)

// ReceiveExecutionPayloadEnvelope is a function that defines the operations (minus pubsub)
// that are performed on a received execution payload envelope. The operations consist of:
// 1. Validate the payload, apply state transition.
// 2. Apply fork choice to the processed payload
// 3. Save latest head info
func (s *Service) ReceiveExecutionPayloadEnvelope(ctx context.Context, envelope interfaces.ROExecutionPayloadEnvelope, _ das.AvailabilityStore) error {
receivedTime := time.Now()
root, err := envelope.BeaconBlockRoot()
if err != nil {
return errors.Wrap(err, "could not get beacon block root")
}
s.payloadBeingSynced.set(root)
defer s.payloadBeingSynced.unset(root)

preState, err := s.getPayloadEnvelopePrestate(ctx, envelope)
if err != nil {
return errors.Wrap(err, "could not get prestate")
}

eg, _ := errgroup.WithContext(ctx)
var postState state.BeaconState
eg.Go(func() error {
if err := epbs.ValidatePayloadStateTransition(ctx, preState, envelope); err != nil {
return errors.Wrap(err, "failed to validate consensus state transition function")
}
return nil
})
var isValidPayload bool
eg.Go(func() error {
var err error
isValidPayload, err = s.validateExecutionOnEnvelope(ctx, envelope)
if err != nil {
return errors.Wrap(err, "could not notify the engine of the new payload")
}
return nil
})

if err := eg.Wait(); err != nil {
return err
}
_ = isValidPayload
_ = postState
daStartTime := time.Now()
// TODO: Add DA check
daWaitedTime := time.Since(daStartTime)
dataAvailWaitedTime.Observe(float64(daWaitedTime.Milliseconds()))
// TODO: Add Head update, cache handling, postProcessing
timeWithoutDaWait := time.Since(receivedTime) - daWaitedTime
executionEngineProcessingTime.Observe(float64(timeWithoutDaWait.Milliseconds()))
return nil
}

// notifyNewPayload signals execution engine on a new payload.
// It returns true if the EL has returned VALID for the block
func (s *Service) notifyNewEnvelope(ctx context.Context, envelope interfaces.ROExecutionPayloadEnvelope) (bool, error) {
ctx, span := trace.StartSpan(ctx, "blockChain.notifyNewPayload")
defer span.End()

payload, err := envelope.Execution()
if err != nil {
return false, errors.Wrap(err, "could not get execution payload")
}

var lastValidHash []byte
var versionedHashes []common.Hash
versionedHashes, err = envelope.VersionedHashes()
if err != nil {
return false, errors.Wrap(err, "could not get versioned hashes to feed the engine")
}
root, err := envelope.BeaconBlockRoot()
if err != nil {
return false, errors.Wrap(err, "could not get beacon block root")
}
parentRoot, err := s.ParentRoot(root)
if err != nil {
return false, errors.Wrap(err, "could not get parent block root")
}
pr := common.Hash(parentRoot)
lastValidHash, err = s.cfg.ExecutionEngineCaller.NewPayload(ctx, payload, versionedHashes, &pr)
switch {
case err == nil:
newPayloadValidNodeCount.Inc()
return true, nil
case errors.Is(err, execution.ErrAcceptedSyncingPayloadStatus):
newPayloadOptimisticNodeCount.Inc()
log.WithFields(logrus.Fields{
"payloadBlockHash": fmt.Sprintf("%#x", bytesutil.Trunc(payload.BlockHash())),
}).Info("Called new payload with optimistic block")
return false, nil
case errors.Is(err, execution.ErrInvalidPayloadStatus):
lvh := bytesutil.ToBytes32(lastValidHash)
return false, invalidBlock{
error: ErrInvalidPayload,
lastValidHash: lvh,
}
default:
return false, errors.WithMessage(ErrUndefinedExecutionEngineError, err.Error())
}
}

// validateExecutionOnEnvelope notifies the engine of the incoming execution payload and returns true if the payload is valid
func (s *Service) validateExecutionOnEnvelope(ctx context.Context, e interfaces.ROExecutionPayloadEnvelope) (bool, error) {
isValidPayload, err := s.notifyNewEnvelope(ctx, e)
if err == nil {
return isValidPayload, nil
}
blockRoot, rootErr := e.BeaconBlockRoot()
if rootErr != nil {
return false, errors.Wrap(rootErr, "could not get beacon block root")
}
parentRoot, rootErr := s.ParentRoot(blockRoot)
if rootErr != nil {
return false, errors.Wrap(rootErr, "could not get parent block root")
}
s.cfg.ForkChoiceStore.Lock()
err = s.handleInvalidExecutionError(ctx, err, blockRoot, parentRoot)
s.cfg.ForkChoiceStore.Unlock()
return false, err
}

func (s *Service) getPayloadEnvelopePrestate(ctx context.Context, e interfaces.ROExecutionPayloadEnvelope) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "blockChain.getPayloadEnvelopePreState")
defer span.End()

// Verify incoming payload has a valid pre state.
root, err := e.BeaconBlockRoot()
if err != nil {
return nil, errors.Wrap(err, "could not get beacon block root")
}
// Verify the referred block is known to forkchoice
if !s.InForkchoice(root) {
return nil, errors.New("Cannot import execution payload envelope for unknown block")
}
if err := s.verifyBlkPreState(ctx, root); err != nil {
return nil, errors.Wrap(err, "could not verify payload prestate")
}

preState, err := s.cfg.StateGen.StateByRoot(ctx, root)
if err != nil {
return nil, errors.Wrap(err, "could not get pre state")
}
if preState == nil || preState.IsNil() {
return nil, errors.Wrap(err, "nil pre state")
}
return preState, nil
}
104 changes: 104 additions & 0 deletions beacon-chain/blockchain/receive_execution_payload_envelope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package blockchain

import (
"testing"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
mockExecution "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing"
forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v5/consensus-types/epbs"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
)

func Test_getPayloadEnvelopePrestate(t *testing.T) {
service, tr := minimalTestService(t)
ctx, fcs := tr.ctx, tr.fcs

gs, _ := util.DeterministicGenesisStateEpbs(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))
require.NoError(t, fcs.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Root: service.originBlockRoot}))

p := &enginev1.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadElectra{},
BeaconBlockRoot: service.originBlockRoot[:],
}
e, err := epbs.WrappedROExecutionPayloadEnvelope(p)
require.NoError(t, err)

_, err = service.getPayloadEnvelopePrestate(ctx, e)
require.NoError(t, err)
}

func Test_notifyNewEnvelope(t *testing.T) {
service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache()))
ctx, fcs := tr.ctx, tr.fcs
gs, _ := util.DeterministicGenesisStateEpbs(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))
require.NoError(t, fcs.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Root: service.originBlockRoot}))
p := &enginev1.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadElectra{},
BeaconBlockRoot: service.originBlockRoot[:],
}
e, err := epbs.WrappedROExecutionPayloadEnvelope(p)
require.NoError(t, err)
engine := &mockExecution.EngineClient{}
service.cfg.ExecutionEngineCaller = engine
isValidPayload, err := service.notifyNewEnvelope(ctx, e)
require.NoError(t, err)
require.Equal(t, true, isValidPayload)
}

func Test_validateExecutionOnEnvelope(t *testing.T) {
service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache()))
ctx, fcs := tr.ctx, tr.fcs
gs, _ := util.DeterministicGenesisStateEpbs(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))
require.NoError(t, fcs.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Root: service.originBlockRoot}))
p := &enginev1.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadElectra{},
BeaconBlockRoot: service.originBlockRoot[:],
}
e, err := epbs.WrappedROExecutionPayloadEnvelope(p)
require.NoError(t, err)
engine := &mockExecution.EngineClient{}
service.cfg.ExecutionEngineCaller = engine
isValidPayload, err := service.validateExecutionOnEnvelope(ctx, e)
require.NoError(t, err)
require.Equal(t, true, isValidPayload)
}

func Test_ReceiveExecutionPayloadEnvelope(t *testing.T) {
service, tr := minimalTestService(t, WithPayloadIDCache(cache.NewPayloadIDCache()))
ctx, fcs := tr.ctx, tr.fcs
gs, _ := util.DeterministicGenesisStateEpbs(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))
require.NoError(t, fcs.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Root: service.originBlockRoot}))
post := gs.Copy()
p := &enginev1.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadElectra{
ParentHash: make([]byte, 32),
BlockHash: make([]byte, 32),
},
BeaconBlockRoot: service.originBlockRoot[:],
BlobKzgCommitments: make([][]byte, 0),
StateRoot: make([]byte, 32),
}
e, err := epbs.WrappedROExecutionPayloadEnvelope(p)
require.NoError(t, err)
das := &das.MockAvailabilityStore{}

blockHeader := post.LatestBlockHeader()
prevStateRoot, err := post.HashTreeRoot(ctx)
require.NoError(t, err)
blockHeader.StateRoot = prevStateRoot[:]
require.NoError(t, post.SetLatestBlockHeader(blockHeader))
stRoot, err := post.HashTreeRoot(ctx)
require.NoError(t, err)
p.StateRoot = stRoot[:]
engine := &mockExecution.EngineClient{}
service.cfg.ExecutionEngineCaller = engine
require.NoError(t, service.ReceiveExecutionPayloadEnvelope(ctx, e, das))
}
Loading

0 comments on commit ea0b439

Please sign in to comment.