Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fork Digest For Gossip Topics #5191

Merged
merged 15 commits into from
Mar 26, 2020
12 changes: 6 additions & 6 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
// GossipTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup.
var GossipTopicMappings = map[string]proto.Message{
"/eth2/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/committee_index%d_beacon_attestation": &pb.Attestation{},
"/eth2/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/attester_slashing": &pb.AttesterSlashing{},
"/eth2/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
"/eth2/%x/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/%x/committee_index%d_beacon_attestation": &pb.Attestation{},
"/eth2/%x/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/%x/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/%x/attester_slashing": &pb.AttesterSlashing{},
"/eth2/%x/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
}

// GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ConnectionHandler interface {
// EncodingProvider provides p2p network encoding.
type EncodingProvider interface {
Encoding() encoder.NetworkEncoding
ForkDigest() [4]byte
}

// PubSubProvider provides the p2p pubsub protocol.
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ func (s *Service) Peers() *peers.Status {
return s.peers
}

// ForkDigest returns the current fork digest of
// the node.
func (s *Service) ForkDigest() [4]byte {
//no-op
return [4]byte{}
}

// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee id's for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee id's.
Expand Down
9 changes: 8 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
if _, err := p.Encoding().Encode(buf, msg); err != nil {
p.t.Fatalf("Failed to encode message: %v", err)
}
topic = fmt.Sprintf(topic, p.ForkDigest())
topic = topic + p.Encoding().ProtocolSuffix()

if err := ps.Publish(topic+p.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := ps.Publish(topic, buf.Bytes()); err != nil {
p.t.Fatalf("Failed to publish message; %v", err)
}
}
Expand Down Expand Up @@ -237,3 +239,8 @@ func (p *TestP2P) FindPeersWithSubnet(index uint64) (bool, error) {
func (p *TestP2P) RefreshENR(epoch uint64) {
return
}

// ForkDigest mocks the p2p func.
func (p *TestP2P) ForkDigest() [4]byte {
return [4]byte{}
}
8 changes: 8 additions & 0 deletions beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error
}
topic := msg.TopicIDs[0]
topic = strings.TrimSuffix(topic, r.p2p.Encoding().ProtocolSuffix())
topic = r.replaceForkDigest(topic)
base, ok := p2p.GossipTopicMappings[topic]
if !ok {
return nil, fmt.Errorf("no message mapped for topic %s", topic)
Expand All @@ -26,3 +27,10 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error
}
return m, nil
}

// replaces our fork digest with the formatter
nisdas marked this conversation as resolved.
Show resolved Hide resolved
func (r *Service) replaceForkDigest(topic string) string {
subStrings := strings.Split(topic, "/")
subStrings[2] = "%x"
return strings.Join(subStrings, "/")
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const genericError = "internal service error"
const rateLimitedError = "rate limited"

var errWrongForkVersion = errors.New("wrong fork version")
var errWrongForkVersion = errors.New("wrong fork digest version")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var errWrongForkVersion = errors.New("wrong fork digest version")
var errWrongForkDigestVersion = errors.New("wrong fork digest version")

var errInvalidEpoch = errors.New("invalid epoch")

var responseCodeSuccess = byte(0x00)
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/initial-sync-old/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus
peerStatus.Add(peer.PeerID(), nil, network.DirOutbound, []uint64{})
peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected)
peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
ForkDigest: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
})
}
}
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/initial-sync/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus
peerStatus.Add(peer.PeerID(), nil, network.DirOutbound, []uint64{})
peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected)
peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
ForkDigest: params.BeaconConfig().GenesisForkVersion,
FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)),
FinalizedEpoch: datum.finalizedEpoch,
HeadRoot: []byte("head_root"),
HeadSlot: datum.headSlot,
})
}
}
Expand Down
25 changes: 14 additions & 11 deletions beacon-chain/sync/rpc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
return err
}

forkDigest := r.p2p.ForkDigest()
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
ForkDigest: forkDigest[:],
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}
stream, err := r.p2p.Send(ctx, resp, id)
if err != nil {
Expand Down Expand Up @@ -155,12 +156,13 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
return err
}

forkDigest := r.p2p.ForkDigest()
resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
ForkDigest: forkDigest[:],
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}

if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
Expand All @@ -172,7 +174,8 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
}

func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) error {
if !bytes.Equal(params.BeaconConfig().GenesisForkVersion, msg.HeadForkVersion) {
forkDigest := r.p2p.ForkDigest()
if !bytes.Equal(forkDigest[:], msg.ForkDigest) {
return errWrongForkVersion
}
genesis := r.chain.GenesisTime()
Expand Down
36 changes: 18 additions & 18 deletions beacon-chain/sync/rpc_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
t.Fatal(err)
}

err = r.statusRPCHandler(context.Background(), &pb.Status{HeadForkVersion: []byte("fake")}, stream1)
err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: []byte("fake")}, stream1)
if err != errWrongForkVersion {
t.Errorf("Expected error %v, got %v", errWrongForkVersion, err)
}
Expand Down Expand Up @@ -130,11 +130,11 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) {
t.Fatal(err)
}
expected := &pb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
HeadSlot: genesisState.Slot(),
HeadRoot: headRoot[:],
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot[:],
ForkDigest: params.BeaconConfig().GenesisForkVersion,
HeadSlot: genesisState.Slot(),
HeadRoot: headRoot[:],
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot[:],
}
if !proto.Equal(out, expected) {
t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected)
Expand All @@ -145,7 +145,7 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) {
t.Fatal(err)
}

err = r.statusRPCHandler(context.Background(), &pb.Status{HeadForkVersion: params.BeaconConfig().GenesisForkVersion}, stream1)
err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: params.BeaconConfig().GenesisForkVersion}, stream1)
if err != nil {
t.Errorf("Unxpected error: %v", err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) {
}
log.WithField("status", out).Warn("received status")

resp := &pb.Status{HeadSlot: 100, HeadForkVersion: params.BeaconConfig().GenesisForkVersion}
resp := &pb.Status{HeadSlot: 100, ForkDigest: params.BeaconConfig().GenesisForkVersion}

if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -300,11 +300,11 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
t.Fatal(err)
}
expected := &pb.Status{
HeadForkVersion: params.BeaconConfig().GenesisForkVersion,
HeadSlot: genesisState.Slot(),
HeadRoot: headRoot[:],
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot[:],
ForkDigest: params.BeaconConfig().GenesisForkVersion,
HeadSlot: genesisState.Slot(),
HeadRoot: headRoot[:],
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot[:],
}
if !proto.Equal(out, expected) {
t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected)
Expand Down Expand Up @@ -378,11 +378,11 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
t.Fatal(err)
}
expected := &pb.Status{
HeadForkVersion: []byte{1, 1, 1, 1},
HeadSlot: genesisState.Slot(),
HeadRoot: headRoot[:],
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot[:],
ForkDigest: []byte{1, 1, 1, 1},
HeadSlot: genesisState.Slot(),
HeadRoot: headRoot[:],
FinalizedEpoch: 5,
FinalizedRoot: finalizedRoot[:],
}
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
log.WithError(err).Error("Failed to write to stream")
Expand Down
Loading