Skip to content

Commit

Permalink
Add execution_payload and payload_attestation_message topics (#14304
Browse files Browse the repository at this point in the history
)

* Add `execution_payload` and `payload_attestation_message` topics

* Set `SourcePubkey` to 48 bytes long

* Add randomly populated `PayloadAttestationMessage` object

* Add tests for `execution_payload` and `payload_attestation_message` topics
  • Loading branch information
jihoonsong authored and terencechain committed Dec 3, 2024
1 parent 8b63196 commit bb809d8
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 25 deletions.
60 changes: 38 additions & 22 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
Expand All @@ -19,14 +20,14 @@ import (
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
testpb "github.com/prysmaticlabs/prysm/v5/proto/testing"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/testing/util/random"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

func TestService_Broadcast(t *testing.T) {
Expand Down Expand Up @@ -523,6 +524,22 @@ func TestService_BroadcastBlob(t *testing.T) {
}

func TestService_BroadcastExecutionPayloadHeader(t *testing.T) {
msg := random.SignedExecutionPayloadHeader(t)
testBroadcast(t, SignedExecutionPayloadHeaderTopicFormat, msg)
}

func TestService_BroadcastExecutionPayloadEnvelope(t *testing.T) {
msg := random.SignedExecutionPayloadEnvelope(t)
testBroadcast(t, SignedExecutionPayloadEnvelopeTopicFormat, msg)
}

func TestService_BroadcastPayloadAttestationMessage(t *testing.T) {
msg := random.PayloadAttestationMessage(t)
testBroadcast(t, PayloadAttestationMessageTopicFormat, msg)
}

func testBroadcast(t *testing.T, topicFormat string, msg interface{}) {
// Create two peers and let them connect.
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
Expand All @@ -531,53 +548,52 @@ func TestService_BroadcastExecutionPayloadHeader(t *testing.T) {
t.Fatal("No peers")
}

p := &Service{
// Create a `Service` for the first peer.
s1 := &Service{
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{},
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
}

msg := random.SignedExecutionPayloadHeader(t)

// External peer subscribes to the topic.
topic := SignedExecutionPayloadHeaderTopicFormat
GossipTypeMapping[reflect.TypeOf(msg)] = topic

digest, err := p.currentForkDigest()
// The second peer subscribes to the topic.
digest, err := s1.currentForkDigest()
require.NoError(t, err)

topic = fmt.Sprintf("%s%s", fmt.Sprintf(topic, digest), p.Encoding().ProtocolSuffix())
sub, err := p2.SubscribeToTopic(topic)
topic := fmt.Sprintf(topicFormat, digest) + s1.Encoding().ProtocolSuffix()
subscription, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // Necessary delay for libp2p.
time.Sleep(50 * time.Millisecond) // Wait for libp2p to be set up.

// Async listen for the pubsub, must be before the broadcast.
// Start a goroutine listening for a pubsub message.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

incomingMessage, err := sub.Next(ctx)
incomingMessage, err := subscription.Next(ctx)
require.NoError(t, err)

// Same message received from other peer.
result := &enginev1.SignedExecutionPayloadHeader{}
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
result := msg.(ssz.Unmarshaler)
require.NoError(t, s1.Encoding().DecodeGossip(incomingMessage.Data, result))
require.DeepEqual(t, result, msg)
}()

// Unknown message to broadcast.
// An attempt to broadcast a message unmapped to a topic should fail.
ctx := context.Background()
require.ErrorContains(t, "message type is not mapped to a PubSub topic", p.Broadcast(ctx, nil))
require.ErrorContains(t, "message type is not mapped to a PubSub topic", s1.Broadcast(ctx, nil))

// Broadcast to second peer and wait.
require.NoError(t, p.Broadcast(context.Background(), msg))
// The first peer broadcasts the message to the second peer.
require.NoError(t, s1.Broadcast(ctx, msg.(protoreflect.ProtoMessage)))

// Wait for one second for the message to be delivered and processed.
if util.WaitTimeout(&wg, 1*time.Second) {
t.Error("Failed to receive pubsub within 1s")
}
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var gossipTopicMappings = map[string]func() proto.Message{
BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return &ethpb.SignedBLSToExecutionChange{} },
BlobSubnetTopicFormat: func() proto.Message { return &ethpb.BlobSidecar{} },
SignedExecutionPayloadHeaderTopicFormat: func() proto.Message { return &enginev1.SignedExecutionPayloadHeader{} },
SignedExecutionPayloadEnvelopeTopicFormat: func() proto.Message { return &enginev1.SignedExecutionPayloadEnvelope{} },
PayloadAttestationMessageTopicFormat: func() proto.Message { return &ethpb.PayloadAttestationMessage{} },
}

// GossipTopicMappings is a function to return the assigned data type
Expand Down Expand Up @@ -108,4 +110,6 @@ func init() {
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat
// Handle ePBS objects.
GossipTypeMapping[reflect.TypeOf(&enginev1.SignedExecutionPayloadHeader{})] = SignedExecutionPayloadHeaderTopicFormat
GossipTypeMapping[reflect.TypeOf(&enginev1.SignedExecutionPayloadEnvelope{})] = SignedExecutionPayloadEnvelopeTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.PayloadAttestationMessage{})] = PayloadAttestationMessageTopicFormat
}
6 changes: 6 additions & 0 deletions beacon-chain/p2p/gossip_topic_mappings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,10 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) {
pMessage = GossipTopicMappings(SignedExecutionPayloadHeaderTopicFormat, epbsForkEpoch)
_, ok = pMessage.(*enginev1.SignedExecutionPayloadHeader)
require.Equal(t, true, ok)
pMessage = GossipTopicMappings(SignedExecutionPayloadEnvelopeTopicFormat, epbsForkEpoch)
_, ok = pMessage.(*enginev1.SignedExecutionPayloadEnvelope)
require.Equal(t, true, ok)
pMessage = GossipTopicMappings(PayloadAttestationMessageTopicFormat, epbsForkEpoch)
_, ok = pMessage.(*ethpb.PayloadAttestationMessage)
require.Equal(t, true, ok)
}
9 changes: 7 additions & 2 deletions beacon-chain/p2p/topics_epbs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package p2p

const (
GossipSignedExecutionPayloadHeader = "signed_execution_payload_header"
SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader
GossipSignedExecutionPayloadHeader = "signed_execution_payload_header"
GossipSignedExecutionPayloadEnvelope = "signed_execution_payload_envelope"
GossipPayloadAttestationMessage = "payload_attestation_message"

SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader
SignedExecutionPayloadEnvelopeTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadEnvelope
PayloadAttestationMessageTopicFormat = GossipProtocolAndDigest + GossipPayloadAttestationMessage
)
11 changes: 10 additions & 1 deletion testing/util/random/epbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,15 @@ func PayloadAttestationData(t *testing.T) *ethpb.PayloadAttestationData {
}
}

// PayloadAttestationMessage creates a random PayloadAttestationMessage for testing purposes.
func PayloadAttestationMessage(t *testing.T) *ethpb.PayloadAttestationMessage {
return &ethpb.PayloadAttestationMessage{
ValidatorIndex: primitives.ValidatorIndex(randomUint64(t)),
Data: PayloadAttestationData(t),
Signature: randomBytes(96, t),
}
}

// SignedExecutionPayloadEnvelope creates a random SignedExecutionPayloadEnvelope for testing purposes.
func SignedExecutionPayloadEnvelope(t *testing.T) *enginev1.SignedExecutionPayloadEnvelope {
return &enginev1.SignedExecutionPayloadEnvelope{
Expand Down Expand Up @@ -432,7 +441,7 @@ func WithdrawalRequest(t *testing.T) *enginev1.WithdrawalRequest {
func ConsolidationRequest(t *testing.T) *enginev1.ConsolidationRequest {
return &enginev1.ConsolidationRequest{
SourceAddress: randomBytes(20, t),
SourcePubkey: randomBytes(20, t),
SourcePubkey: randomBytes(48, t),
TargetPubkey: randomBytes(48, t),
}
}
Expand Down

0 comments on commit bb809d8

Please sign in to comment.