From 388d4816de4c4e63208cc330aec173fe2cd1dbd7 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 24 Mar 2020 20:27:28 +0800 Subject: [PATCH 01/10] update for the day --- beacon-chain/p2p/gossip_topic_mappings.go | 12 +-- beacon-chain/p2p/interfaces.go | 1 + beacon-chain/p2p/service.go | 7 ++ beacon-chain/p2p/testing/p2p.go | 9 ++- beacon-chain/sync/decode_pubsub.go | 8 ++ .../sync/initial-sync-old/round_robin_test.go | 10 +-- .../sync/initial-sync/round_robin_test.go | 10 +-- beacon-chain/sync/rpc_status.go | 25 +++--- beacon-chain/sync/rpc_status_test.go | 36 ++++----- beacon-chain/sync/subscriber.go | 77 +++++++++++-------- ...committee_index_beacon_attestation_test.go | 2 +- beacon-chain/sync/subscriber_test.go | 8 +- ...date_committee_index_beacon_attestation.go | 2 +- ...committee_index_beacon_attestation_test.go | 12 +-- proto/beacon/p2p/v1/messages.pb.go | 69 ++++++++--------- proto/beacon/p2p/v1/messages.proto | 2 +- 16 files changed, 162 insertions(+), 128 deletions(-) diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index 8bbd1bc97f60..c39b192e3dda 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -10,12 +10,12 @@ import ( // GossipTopicMappings represent the protocol ID to protobuf message type map for easy // lookup. var GossipTopicMappings = map[string]proto.Message{ - "/eth2/beacon_block": &pb.SignedBeaconBlock{}, - "/eth2/committee_index%d_beacon_attestation": &pb.Attestation{}, - "/eth2/voluntary_exit": &pb.SignedVoluntaryExit{}, - "/eth2/proposer_slashing": &pb.ProposerSlashing{}, - "/eth2/attester_slashing": &pb.AttesterSlashing{}, - "/eth2/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{}, + "/eth2/%x/beacon_block": &pb.SignedBeaconBlock{}, + "/eth2/%x/committee_index%d_beacon_attestation": &pb.Attestation{}, + "/eth2/%x/voluntary_exit": &pb.SignedVoluntaryExit{}, + "/eth2/%x/proposer_slashing": &pb.ProposerSlashing{}, + "/eth2/%x/attester_slashing": &pb.AttesterSlashing{}, + "/eth2/%x/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{}, } // GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 88fd333b2fd9..9940270a0b46 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -42,6 +42,7 @@ type ConnectionHandler interface { // EncodingProvider provides p2p network encoding. type EncodingProvider interface { Encoding() encoder.NetworkEncoding + ForkDigest() [4]byte } // PubSubProvider provides the p2p pubsub protocol. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 8749c9b551a9..7a872e7e4351 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -300,6 +300,13 @@ func (s *Service) Peers() *peers.Status { return s.peers } +// ForkDigest returns the current fork digest of +// the node. +func (s *Service) ForkDigest() [4]byte { + //no-op + return [4]byte{} +} + // RefreshENR uses an epoch to refresh the enr entry for our node // with the tracked committee id's for the epoch, allowing our node // to be dynamically discoverable by others given our tracked committee id's. diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 36aef6bf33c5..d9cb5c9ba8cc 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -115,8 +115,10 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) { if _, err := p.Encoding().Encode(buf, msg); err != nil { p.t.Fatalf("Failed to encode message: %v", err) } + topic = fmt.Sprintf(topic, p.ForkDigest()) + topic = topic + p.Encoding().ProtocolSuffix() - if err := ps.Publish(topic+p.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil { + if err := ps.Publish(topic, buf.Bytes()); err != nil { p.t.Fatalf("Failed to publish message; %v", err) } } @@ -237,3 +239,8 @@ func (p *TestP2P) FindPeersWithSubnet(index uint64) (bool, error) { func (p *TestP2P) RefreshENR(epoch uint64) { return } + +// ForkDigest mocks the p2p func. +func (p *TestP2P) ForkDigest() [4]byte { + return [4]byte{} +} diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index bde115cc7c2d..bf07a01395a0 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -16,6 +16,7 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error } topic := msg.TopicIDs[0] topic = strings.TrimSuffix(topic, r.p2p.Encoding().ProtocolSuffix()) + topic = r.replaceForkDigest(topic) base, ok := p2p.GossipTopicMappings[topic] if !ok { return nil, fmt.Errorf("no message mapped for topic %s", topic) @@ -26,3 +27,10 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error } return m, nil } + +// replaces our fork digest with the formatter +func (r *Service) replaceForkDigest(topic string) string { + subStrings := strings.Split(topic, "/") + subStrings[2] = "%x" + return strings.Join(subStrings, "/") +} diff --git a/beacon-chain/sync/initial-sync-old/round_robin_test.go b/beacon-chain/sync/initial-sync-old/round_robin_test.go index fa529bb56cda..4a273d1b7e36 100644 --- a/beacon-chain/sync/initial-sync-old/round_robin_test.go +++ b/beacon-chain/sync/initial-sync-old/round_robin_test.go @@ -373,11 +373,11 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus peerStatus.Add(peer.PeerID(), nil, network.DirOutbound, []uint64{}) peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected) peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{ - HeadForkVersion: params.BeaconConfig().GenesisForkVersion, - FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)), - FinalizedEpoch: datum.finalizedEpoch, - HeadRoot: []byte("head_root"), - HeadSlot: datum.headSlot, + ForkDigest: params.BeaconConfig().GenesisForkVersion, + FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)), + FinalizedEpoch: datum.finalizedEpoch, + HeadRoot: []byte("head_root"), + HeadSlot: datum.headSlot, }) } } diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 855830535e2f..15bf29c4fbf5 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -373,11 +373,11 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus peerStatus.Add(peer.PeerID(), nil, network.DirOutbound, []uint64{}) peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected) peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{ - HeadForkVersion: params.BeaconConfig().GenesisForkVersion, - FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)), - FinalizedEpoch: datum.finalizedEpoch, - HeadRoot: []byte("head_root"), - HeadSlot: datum.headSlot, + ForkDigest: params.BeaconConfig().GenesisForkVersion, + FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)), + FinalizedEpoch: datum.finalizedEpoch, + HeadRoot: []byte("head_root"), + HeadSlot: datum.headSlot, }) } } diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 48ae6f197b73..bf51d5334d89 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -77,12 +77,13 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { return err } + forkDigest := r.p2p.ForkDigest() resp := &pb.Status{ - HeadForkVersion: r.chain.CurrentFork().CurrentVersion, - FinalizedRoot: r.chain.FinalizedCheckpt().Root, - FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch, - HeadRoot: headRoot, - HeadSlot: r.chain.HeadSlot(), + ForkDigest: forkDigest[:], + FinalizedRoot: r.chain.FinalizedCheckpt().Root, + FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch, + HeadRoot: headRoot, + HeadSlot: r.chain.HeadSlot(), } stream, err := r.p2p.Send(ctx, resp, id) if err != nil { @@ -155,12 +156,13 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream return err } + forkDigest := r.p2p.ForkDigest() resp := &pb.Status{ - HeadForkVersion: r.chain.CurrentFork().CurrentVersion, - FinalizedRoot: r.chain.FinalizedCheckpt().Root, - FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch, - HeadRoot: headRoot, - HeadSlot: r.chain.HeadSlot(), + ForkDigest: forkDigest[:], + FinalizedRoot: r.chain.FinalizedCheckpt().Root, + FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch, + HeadRoot: headRoot, + HeadSlot: r.chain.HeadSlot(), } if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { @@ -172,7 +174,8 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream } func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) error { - if !bytes.Equal(params.BeaconConfig().GenesisForkVersion, msg.HeadForkVersion) { + forkDigest := r.p2p.ForkDigest() + if !bytes.Equal(forkDigest[:], msg.ForkDigest) { return errWrongForkVersion } genesis := r.chain.GenesisTime() diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index f7ceb5023b89..92f520ad83c1 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -59,7 +59,7 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { t.Fatal(err) } - err = r.statusRPCHandler(context.Background(), &pb.Status{HeadForkVersion: []byte("fake")}, stream1) + err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: []byte("fake")}, stream1) if err != errWrongForkVersion { t.Errorf("Expected error %v, got %v", errWrongForkVersion, err) } @@ -130,11 +130,11 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) { t.Fatal(err) } expected := &pb.Status{ - HeadForkVersion: params.BeaconConfig().GenesisForkVersion, - HeadSlot: genesisState.Slot(), - HeadRoot: headRoot[:], - FinalizedEpoch: 5, - FinalizedRoot: finalizedRoot[:], + ForkDigest: params.BeaconConfig().GenesisForkVersion, + HeadSlot: genesisState.Slot(), + HeadRoot: headRoot[:], + FinalizedEpoch: 5, + FinalizedRoot: finalizedRoot[:], } if !proto.Equal(out, expected) { t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected) @@ -145,7 +145,7 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) { t.Fatal(err) } - err = r.statusRPCHandler(context.Background(), &pb.Status{HeadForkVersion: params.BeaconConfig().GenesisForkVersion}, stream1) + err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: params.BeaconConfig().GenesisForkVersion}, stream1) if err != nil { t.Errorf("Unxpected error: %v", err) } @@ -194,7 +194,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { } log.WithField("status", out).Warn("received status") - resp := &pb.Status{HeadSlot: 100, HeadForkVersion: params.BeaconConfig().GenesisForkVersion} + resp := &pb.Status{HeadSlot: 100, ForkDigest: params.BeaconConfig().GenesisForkVersion} if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { t.Fatal(err) @@ -300,11 +300,11 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) { t.Fatal(err) } expected := &pb.Status{ - HeadForkVersion: params.BeaconConfig().GenesisForkVersion, - HeadSlot: genesisState.Slot(), - HeadRoot: headRoot[:], - FinalizedEpoch: 5, - FinalizedRoot: finalizedRoot[:], + ForkDigest: params.BeaconConfig().GenesisForkVersion, + HeadSlot: genesisState.Slot(), + HeadRoot: headRoot[:], + FinalizedEpoch: 5, + FinalizedRoot: finalizedRoot[:], } if !proto.Equal(out, expected) { t.Errorf("Did not receive expected message. Got %+v wanted %+v", out, expected) @@ -378,11 +378,11 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { t.Fatal(err) } expected := &pb.Status{ - HeadForkVersion: []byte{1, 1, 1, 1}, - HeadSlot: genesisState.Slot(), - HeadRoot: headRoot[:], - FinalizedEpoch: 5, - FinalizedRoot: finalizedRoot[:], + ForkDigest: []byte{1, 1, 1, 1}, + HeadSlot: genesisState.Slot(), + HeadRoot: headRoot[:], + FinalizedEpoch: 5, + FinalizedRoot: finalizedRoot[:], } if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { log.WithError(err).Error("Failed to write to stream") diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 733bcbf50a98..b6912424032c 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "runtime/debug" + "strings" "time" "github.com/gogo/protobuf/proto" @@ -41,67 +42,65 @@ func (r *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Mess // Register PubSub subscribers func (r *Service) registerSubscribers() { - go func() { - // Wait until chain start. - stateChannel := make(chan *feed.Event, 1) - stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel) - defer stateSub.Unsubscribe() - for r.chainStarted == false { - select { - case event := <-stateChannel: - if event.Type == statefeed.Initialized { - data := event.Data.(*statefeed.InitializedData) - log.WithField("starttime", data.StartTime).Debug("Received state initialized event") - if data.StartTime.After(roughtime.Now()) { - stateSub.Unsubscribe() - time.Sleep(roughtime.Until(data.StartTime)) - } - r.chainStarted = true + // Wait until chain start. + stateChannel := make(chan *feed.Event, 1) + stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + for r.chainStarted == false { + select { + case event := <-stateChannel: + if event.Type == statefeed.Initialized { + data := event.Data.(*statefeed.InitializedData) + log.WithField("starttime", data.StartTime).Debug("Received state initialized event") + if data.StartTime.After(roughtime.Now()) { + stateSub.Unsubscribe() + time.Sleep(roughtime.Until(data.StartTime)) } - case <-r.ctx.Done(): - log.Debug("Context closed, exiting goroutine") - return - case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state notifier failed") - return + r.chainStarted = true } + case <-r.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state notifier failed") + return } - }() + } r.subscribe( - "/eth2/beacon_block", + "/eth2/%x/beacon_block", r.validateBeaconBlockPubSub, r.beaconBlockSubscriber, ) r.subscribe( - "/eth2/beacon_aggregate_and_proof", + "/eth2/%x/beacon_aggregate_and_proof", r.validateAggregateAndProof, r.beaconAggregateProofSubscriber, ) r.subscribe( - "/eth2/voluntary_exit", + "/eth2/%x/voluntary_exit", r.validateVoluntaryExit, r.voluntaryExitSubscriber, ) r.subscribe( - "/eth2/proposer_slashing", + "/eth2/%x/proposer_slashing", r.validateProposerSlashing, r.proposerSlashingSubscriber, ) r.subscribe( - "/eth2/attester_slashing", + "/eth2/%x/attester_slashing", r.validateAttesterSlashing, r.attesterSlashingSubscriber, ) if featureconfig.Get().EnableDynamicCommitteeSubnets { r.subscribeDynamicWithSubnets( - "/eth2/committee_index%d_beacon_attestation", + "/eth2/%x/committee_index%d_beacon_attestation", r.committeeIndices, /* determineSubsLen */ r.validateCommitteeIndexBeaconAttestation, /* validator */ r.committeeIndexBeaconAttestationSubscriber, /* message handler */ ) } else { r.subscribeDynamic( - "/eth2/committee_index%d_beacon_attestation", + "/eth2/%x/committee_index%d_beacon_attestation", r.committeesCount, /* determineSubsLen */ r.validateCommitteeIndexBeaconAttestation, /* validator */ r.committeeIndexBeaconAttestationSubscriber, /* message handler */ @@ -116,7 +115,7 @@ func (r *Service) subscribe(topic string, validator pubsub.Validator, handle sub if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) } - return r.subscribeWithBase(base, topic, validator, handle) + return r.subscribeWithBase(base, r.addDigestToTopic(topic), validator, handle) } func (r *Service) subscribeWithBase(base proto.Message, topic string, validator pubsub.Validator, handle subHandler) *pubsub.Subscription { @@ -216,6 +215,7 @@ func (r *Service) subscribeDynamicWithSubnets( if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } + digest := r.p2p.ForkDigest() subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot) @@ -253,7 +253,7 @@ func (r *Service) subscribeDynamicWithSubnets( // do not subscribe if we have no peers in the same // subnet topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] - subnetTopic := fmt.Sprintf(topic, idx) + subnetTopic := fmt.Sprintf(topic, digest, idx) numOfPeers := r.p2p.PubSub().ListPeers(subnetTopic) if len(r.p2p.Peers().SubscribedToSubnet(idx)) == 0 && len(numOfPeers) == 0 { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ @@ -290,7 +290,7 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } - + digest := r.p2p.ForkDigest() var subscriptions []*pubsub.Subscription stateChannel := make(chan *feed.Event, 1) @@ -317,7 +317,7 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i } } else if len(subscriptions) < wantedSubs { // Increase topics for i := len(subscriptions); i < wantedSubs; i++ { - sub := r.subscribeWithBase(base, fmt.Sprintf(topicFormat, i), validate, handle) + sub := r.subscribeWithBase(base, fmt.Sprintf(topicFormat, digest, i), validate, handle) subscriptions = append(subscriptions, sub) } } @@ -325,3 +325,12 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i } }() } + +// add fork digest to topic +func (r *Service) addDigestToTopic(topic string) string { + if !strings.Contains(topic, "%x") { + panic("topic does not have appropriate formatter for digest") + } + digest := r.p2p.ForkDigest() + return fmt.Sprintf(topic, digest) +} diff --git a/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go b/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go index 1553ac00fbc5..fb9b072807eb 100644 --- a/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go +++ b/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go @@ -79,7 +79,7 @@ func TestService_committeeIndexBeaconAttestationSubscriber_ValidMessage(t *testi Signature: sKeys[0].Sign([]byte("foo")).Marshal(), } - p.ReceivePubSub("/eth2/committee_index0_beacon_attestation", att) + p.ReceivePubSub("/eth2/00000000/committee_index0_beacon_attestation", att) time.Sleep(time.Second) diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 88753f06d76c..cf550b4451de 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -32,7 +32,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) { p2p: p2p, initialSync: &mockSync.Sync{IsSyncing: false}, } - topic := "/eth2/voluntary_exit" + topic := "/eth2/%x/voluntary_exit" var wg sync.WaitGroup wg.Add(1) @@ -69,7 +69,7 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) { db: d, seenAttesterSlashingCache: c, } - topic := "/eth2/attester_slashing" + topic := "/eth2/%x/attester_slashing" var wg sync.WaitGroup wg.Add(1) params.OverrideBeaconConfig(params.MinimalSpecConfig()) @@ -117,7 +117,7 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) { db: d, seenProposerSlashingCache: c, } - topic := "/eth2/proposer_slashing" + topic := "/eth2/%x/proposer_slashing" var wg sync.WaitGroup wg.Add(1) params.OverrideBeaconConfig(params.MinimalSpecConfig()) @@ -161,7 +161,7 @@ func TestSubscribe_WaitToSync(t *testing.T) { initialSync: &mockSync.Sync{IsSyncing: false}, } - topic := "/eth2/beacon_block" + topic := "/eth2/%x/beacon_block" r.registerSubscribers() i := r.stateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.Initialized, diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go index 0390e777aed4..7d326dff5c35 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go @@ -62,7 +62,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p } // The attestation's committee index (attestation.data.index) is for the correct subnet. - if !strings.HasPrefix(originalTopic, fmt.Sprintf(format, att.Data.CommitteeIndex)) { + if !strings.HasPrefix(originalTopic, fmt.Sprintf(format, s.p2p.ForkDigest(), att.Data.CommitteeIndex)) { return false } diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go index ccca1aa0680d..c9bab57903e1 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go @@ -76,7 +76,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { Slot: 63, }, }, - topic: "/eth2/committee_index1_beacon_attestation", + topic: "/eth2/00000000/committee_index1_beacon_attestation", validAttestationSignature: true, want: true, }, @@ -90,7 +90,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { Slot: 63, }, }, - topic: "/eth2/committee_index1_beacon_attestation", + topic: "/eth2/00000000/committee_index1_beacon_attestation", validAttestationSignature: true, want: false, }, @@ -104,7 +104,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { Slot: 63, }, }, - topic: "/eth2/committee_index3_beacon_attestation", + topic: "/eth2/00000000/committee_index3_beacon_attestation", validAttestationSignature: true, want: false, }, @@ -118,7 +118,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { Slot: 63, }, }, - topic: "/eth2/committee_index1_beacon_attestation", + topic: "/eth2/00000000/committee_index1_beacon_attestation", validAttestationSignature: true, want: false, }, @@ -132,7 +132,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { Slot: 63, }, }, - topic: "/eth2/committee_index1_beacon_attestation", + topic: "/eth2/00000000/committee_index1_beacon_attestation", validAttestationSignature: true, want: false, }, @@ -146,7 +146,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { Slot: 63, }, }, - topic: "/eth2/committee_index1_beacon_attestation", + topic: "/eth2/00000000/committee_index1_beacon_attestation", validAttestationSignature: false, want: false, }, diff --git a/proto/beacon/p2p/v1/messages.pb.go b/proto/beacon/p2p/v1/messages.pb.go index 78b5f3fa7a3b..67b12338acfc 100755 --- a/proto/beacon/p2p/v1/messages.pb.go +++ b/proto/beacon/p2p/v1/messages.pb.go @@ -24,7 +24,7 @@ var _ = math.Inf const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Status struct { - HeadForkVersion []byte `protobuf:"bytes,1,opt,name=head_fork_version,json=headForkVersion,proto3" json:"head_fork_version,omitempty" ssz-size:"4"` + ForkDigest []byte `protobuf:"bytes,1,opt,name=fork_digest,json=forkDigest,proto3" json:"fork_digest,omitempty" ssz-size:"4"` FinalizedRoot []byte `protobuf:"bytes,2,opt,name=finalized_root,json=finalizedRoot,proto3" json:"finalized_root,omitempty" ssz-size:"32"` FinalizedEpoch uint64 `protobuf:"varint,3,opt,name=finalized_epoch,json=finalizedEpoch,proto3" json:"finalized_epoch,omitempty"` HeadRoot []byte `protobuf:"bytes,4,opt,name=head_root,json=headRoot,proto3" json:"head_root,omitempty" ssz-size:"32"` @@ -67,9 +67,9 @@ func (m *Status) XXX_DiscardUnknown() { var xxx_messageInfo_Status proto.InternalMessageInfo -func (m *Status) GetHeadForkVersion() []byte { +func (m *Status) GetForkDigest() []byte { if m != nil { - return m.HeadForkVersion + return m.ForkDigest } return nil } @@ -175,29 +175,28 @@ func init() { } var fileDescriptor_a1d590cda035b632 = []byte{ - // 341 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xc1, 0x4e, 0xc2, 0x40, - 0x10, 0x86, 0xb3, 0x5a, 0x88, 0x6c, 0x40, 0x64, 0x63, 0x4c, 0x83, 0x11, 0x48, 0x2f, 0x72, 0xa1, - 0x0d, 0xe0, 0xc1, 0x18, 0x4f, 0x4d, 0xf4, 0x01, 0x4a, 0xe2, 0x95, 0xb4, 0x65, 0x68, 0x1b, 0x4a, - 0xa7, 0x76, 0xb7, 0x24, 0xf2, 0x84, 0x1e, 0x7d, 0x02, 0x62, 0x78, 0x04, 0x0e, 0x9e, 0x4d, 0x67, - 0x89, 0x9c, 0xbc, 0xed, 0xcc, 0x7c, 0xff, 0x97, 0xd9, 0xe1, 0x56, 0x5e, 0xa0, 0x42, 0x27, 0x00, - 0x3f, 0xc4, 0xcc, 0xc9, 0x27, 0xb9, 0xb3, 0x19, 0x3b, 0x6b, 0x90, 0xd2, 0x8f, 0x40, 0xda, 0x34, - 0x14, 0x37, 0xa0, 0x62, 0x28, 0xa0, 0x5c, 0xdb, 0x1a, 0xb3, 0xf3, 0x49, 0x6e, 0x6f, 0xc6, 0xdd, - 0x51, 0x94, 0xa8, 0xb8, 0x0c, 0xec, 0x10, 0xd7, 0x4e, 0x84, 0x11, 0x3a, 0x84, 0x07, 0xe5, 0x92, - 0x2a, 0x2d, 0xae, 0x5e, 0x5a, 0x63, 0xfd, 0x30, 0x5e, 0x9f, 0x29, 0x5f, 0x95, 0x52, 0x3c, 0xf3, - 0x4e, 0x0c, 0xfe, 0x62, 0xbe, 0xc4, 0x62, 0x35, 0xdf, 0x40, 0x21, 0x13, 0xcc, 0x4c, 0x36, 0x60, - 0xc3, 0xa6, 0x7b, 0x75, 0xd8, 0xf5, 0x9b, 0x52, 0x6e, 0x47, 0x32, 0xd9, 0xc2, 0x93, 0xf5, 0x60, - 0x79, 0xed, 0x0a, 0x7d, 0xc5, 0x62, 0xf5, 0xa6, 0x41, 0xf1, 0xc8, 0x2f, 0x97, 0x49, 0xe6, 0xa7, - 0xc9, 0x16, 0x16, 0xf3, 0x02, 0x51, 0x99, 0x67, 0x14, 0xed, 0x1c, 0x76, 0xfd, 0xd6, 0x29, 0x3a, - 0x9d, 0x58, 0x5e, 0xeb, 0x0f, 0xf4, 0x10, 0x95, 0xb8, 0xe7, 0xed, 0x53, 0x12, 0x72, 0x0c, 0x63, - 0xf3, 0x7c, 0xc0, 0x86, 0x86, 0x77, 0x12, 0xbe, 0x54, 0x5d, 0x61, 0xf3, 0x06, 0x2d, 0x48, 0x76, - 0xe3, 0x3f, 0xfb, 0x45, 0xc5, 0x90, 0xf8, 0xf6, 0xc8, 0xcb, 0x14, 0x95, 0x59, 0x23, 0x25, 0x0d, - 0x67, 0x29, 0x2a, 0x0b, 0x78, 0xd7, 0xa5, 0xc3, 0xb9, 0x29, 0x86, 0x2b, 0xe9, 0x7e, 0x78, 0x7e, - 0x16, 0x81, 0x07, 0xef, 0x25, 0x48, 0x25, 0xee, 0x38, 0x97, 0xca, 0x2f, 0x94, 0xce, 0x32, 0xca, - 0x36, 0xa8, 0x53, 0x85, 0xc5, 0x35, 0xaf, 0x85, 0x58, 0x66, 0xfa, 0x8f, 0x86, 0xa7, 0x0b, 0x21, - 0xb8, 0x21, 0x15, 0xe4, 0xc7, 0xed, 0xe9, 0xed, 0x36, 0x3f, 0xf7, 0x3d, 0xf6, 0xb5, 0xef, 0xb1, - 0xef, 0x7d, 0x8f, 0x05, 0x75, 0x3a, 0xfa, 0xf4, 0x37, 0x00, 0x00, 0xff, 0xff, 0x60, 0xbe, 0x44, - 0x15, 0xe1, 0x01, 0x00, 0x00, + // 334 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xcf, 0x4e, 0xfa, 0x40, + 0x10, 0xc7, 0xb3, 0xbf, 0x5f, 0x21, 0xb2, 0x82, 0x7f, 0x36, 0xc6, 0x34, 0x18, 0x81, 0xf4, 0x22, + 0x17, 0xda, 0x00, 0x1e, 0x8c, 0xc7, 0x46, 0x5f, 0xa0, 0x3c, 0x00, 0xd9, 0x96, 0xa1, 0x6d, 0x28, + 0x9d, 0xb5, 0xbb, 0x25, 0x91, 0x27, 0xf4, 0xe8, 0x13, 0x10, 0xc3, 0xd5, 0x1b, 0x4f, 0x60, 0x3a, + 0x25, 0x72, 0xf2, 0xb6, 0x33, 0xf3, 0xf9, 0x7e, 0xb2, 0x33, 0xdc, 0x51, 0x05, 0x1a, 0xf4, 0x42, + 0x90, 0x11, 0xe6, 0x9e, 0x9a, 0x28, 0x6f, 0x33, 0xf6, 0xd6, 0xa0, 0xb5, 0x8c, 0x41, 0xbb, 0x34, + 0x14, 0xb7, 0x60, 0x12, 0x28, 0xa0, 0x5c, 0xbb, 0x35, 0xe6, 0xaa, 0x89, 0x72, 0x37, 0xe3, 0xee, + 0x28, 0x4e, 0x4d, 0x52, 0x86, 0x6e, 0x84, 0x6b, 0x2f, 0xc6, 0x18, 0x3d, 0xc2, 0xc3, 0x72, 0x49, + 0x55, 0x2d, 0xae, 0x5e, 0xb5, 0xc6, 0xf9, 0x66, 0xbc, 0x39, 0x33, 0xd2, 0x94, 0x5a, 0x8c, 0xf9, + 0xf9, 0x12, 0x8b, 0xd5, 0x7c, 0x91, 0xc6, 0xa0, 0x8d, 0xcd, 0x06, 0x6c, 0xd8, 0xf6, 0xaf, 0x0e, + 0xbb, 0x7e, 0x5b, 0xeb, 0xed, 0x48, 0xa7, 0x5b, 0x78, 0x76, 0x1e, 0x9d, 0x80, 0x57, 0xd0, 0x0b, + 0x31, 0xe2, 0x89, 0x5f, 0x2c, 0xd3, 0x5c, 0x66, 0xe9, 0x16, 0x16, 0xf3, 0x02, 0xd1, 0xd8, 0xff, + 0x28, 0x75, 0x7d, 0xd8, 0xf5, 0x3b, 0xa7, 0xd4, 0x74, 0xe2, 0x04, 0x9d, 0x5f, 0x30, 0x40, 0x34, + 0xe2, 0x81, 0x5f, 0x9e, 0x92, 0xa0, 0x30, 0x4a, 0xec, 0xff, 0x03, 0x36, 0xb4, 0x82, 0x93, 0xf0, + 0xb5, 0xea, 0x0a, 0x97, 0xb7, 0x12, 0x90, 0x47, 0xbb, 0xf5, 0x97, 0xfd, 0xac, 0x62, 0x48, 0x7c, + 0x77, 0xe4, 0x75, 0x86, 0xc6, 0x6e, 0x90, 0x92, 0x86, 0xb3, 0x0c, 0x8d, 0x03, 0xbc, 0xeb, 0xd3, + 0xb5, 0xfc, 0x0c, 0xa3, 0x95, 0xf6, 0xdf, 0x03, 0x99, 0xc7, 0x10, 0xc0, 0x5b, 0x59, 0x6d, 0x73, + 0xcf, 0xb9, 0x36, 0xb2, 0x30, 0x75, 0x96, 0x51, 0xb6, 0x45, 0x9d, 0x2a, 0x2c, 0x6e, 0x78, 0x23, + 0xc2, 0x32, 0xaf, 0x77, 0xb4, 0x82, 0xba, 0x10, 0x82, 0x5b, 0xda, 0x80, 0x3a, 0xfe, 0x9e, 0xde, + 0x7e, 0xfb, 0x63, 0xdf, 0x63, 0x9f, 0xfb, 0x1e, 0xfb, 0xda, 0xf7, 0x58, 0xd8, 0xa4, 0x4b, 0x4f, + 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x4b, 0xb5, 0x3a, 0xe7, 0xd6, 0x01, 0x00, 0x00, } func (m *Status) Marshal() (dAtA []byte, err error) { @@ -248,10 +247,10 @@ func (m *Status) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x12 } - if len(m.HeadForkVersion) > 0 { - i -= len(m.HeadForkVersion) - copy(dAtA[i:], m.HeadForkVersion) - i = encodeVarintMessages(dAtA, i, uint64(len(m.HeadForkVersion))) + if len(m.ForkDigest) > 0 { + i -= len(m.ForkDigest) + copy(dAtA[i:], m.ForkDigest) + i = encodeVarintMessages(dAtA, i, uint64(len(m.ForkDigest))) i-- dAtA[i] = 0xa } @@ -317,7 +316,7 @@ func (m *Status) Size() (n int) { } var l int _ = l - l = len(m.HeadForkVersion) + l = len(m.ForkDigest) if l > 0 { n += 1 + l + sovMessages(uint64(l)) } @@ -399,7 +398,7 @@ func (m *Status) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field HeadForkVersion", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ForkDigest", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -426,9 +425,9 @@ func (m *Status) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.HeadForkVersion = append(m.HeadForkVersion[:0], dAtA[iNdEx:postIndex]...) - if m.HeadForkVersion == nil { - m.HeadForkVersion = []byte{} + m.ForkDigest = append(m.ForkDigest[:0], dAtA[iNdEx:postIndex]...) + if m.ForkDigest == nil { + m.ForkDigest = []byte{} } iNdEx = postIndex case 2: diff --git a/proto/beacon/p2p/v1/messages.proto b/proto/beacon/p2p/v1/messages.proto index 2dd1762cda82..47534e5e349e 100644 --- a/proto/beacon/p2p/v1/messages.proto +++ b/proto/beacon/p2p/v1/messages.proto @@ -5,7 +5,7 @@ package ethereum.beacon.p2p.v1; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; message Status { - bytes head_fork_version = 1 [(gogoproto.moretags) = "ssz-size:\"4\""]; + bytes fork_digest = 1 [(gogoproto.moretags) = "ssz-size:\"4\""]; bytes finalized_root = 2 [(gogoproto.moretags) = "ssz-size:\"32\""]; uint64 finalized_epoch = 3; bytes head_root = 4 [(gogoproto.moretags) = "ssz-size:\"32\""]; From 03c7d15b7c087060bc237fc4d3cc9be503380d7d Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 24 Mar 2020 20:28:35 +0800 Subject: [PATCH 02/10] fix remaining failing test --- .../sync/subscriber_committee_index_beacon_attestation_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go b/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go index fb9b072807eb..ae821c843278 100644 --- a/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go +++ b/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go @@ -79,7 +79,7 @@ func TestService_committeeIndexBeaconAttestationSubscriber_ValidMessage(t *testi Signature: sKeys[0].Sign([]byte("foo")).Marshal(), } - p.ReceivePubSub("/eth2/00000000/committee_index0_beacon_attestation", att) + p.ReceivePubSub("/eth2/%x/committee_index0_beacon_attestation", att) time.Sleep(time.Second) From db70a02650f80299328f4562aa98d0b05ce52a53 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 24 Mar 2020 20:41:10 +0800 Subject: [PATCH 03/10] fix one more test --- beacon-chain/sync/subscriber_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index cf550b4451de..24055ce5f5a0 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -162,7 +162,8 @@ func TestSubscribe_WaitToSync(t *testing.T) { } topic := "/eth2/%x/beacon_block" - r.registerSubscribers() + go r.registerSubscribers() + time.Sleep(100 * time.Millisecond) i := r.stateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.Initialized, Data: &statefeed.InitializedData{ From fb75dd3e54a7553c399476a773111e919d0afb30 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 24 Mar 2020 20:55:38 +0800 Subject: [PATCH 04/10] change message --- beacon-chain/sync/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 162178866095..72414a2c829c 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -11,7 +11,7 @@ import ( const genericError = "internal service error" const rateLimitedError = "rate limited" -var errWrongForkVersion = errors.New("wrong fork version") +var errWrongForkVersion = errors.New("wrong fork digest version") var errInvalidEpoch = errors.New("invalid epoch") var responseCodeSuccess = byte(0x00) From 210a6592e27580028aeb82e6d196ed3c8369010f Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Tue, 24 Mar 2020 23:24:04 +0800 Subject: [PATCH 05/10] Apply suggestions from code review Co-Authored-By: terence tsao --- beacon-chain/sync/decode_pubsub.go | 2 +- beacon-chain/sync/subscriber.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index bf07a01395a0..922cf3091cdd 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -28,7 +28,7 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error return m, nil } -// replaces our fork digest with the formatter +// Replaces our fork digest with the formatter. func (r *Service) replaceForkDigest(topic string) string { subStrings := strings.Split(topic, "/") subStrings[2] = "%x" diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index b6912424032c..cc941338245e 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -326,7 +326,7 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i }() } -// add fork digest to topic +// Add fork digest to topic. func (r *Service) addDigestToTopic(topic string) string { if !strings.Contains(topic, "%x") { panic("topic does not have appropriate formatter for digest") From fb0a2b147d83d2582e8997586f171fe7cb7d0638 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 24 Mar 2020 23:42:34 +0800 Subject: [PATCH 06/10] terence's review --- beacon-chain/sync/error.go | 2 +- beacon-chain/sync/pending_blocks_queue_test.go | 6 +++--- beacon-chain/sync/rpc.go | 4 ++-- beacon-chain/sync/rpc_status.go | 2 +- beacon-chain/sync/rpc_status_test.go | 10 +++++----- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 72414a2c829c..8a6f4d8729b6 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -11,7 +11,7 @@ import ( const genericError = "internal service error" const rateLimitedError = "rate limited" -var errWrongForkVersion = errors.New("wrong fork digest version") +var errWrongForkDigestVersion = errors.New("wrong fork digest version") var errInvalidEpoch = errors.New("invalid epoch") var responseCodeSuccess = byte(0x00) diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go index 29fa6d84b527..5531b1c72a3b 100644 --- a/beacon-chain/sync/pending_blocks_queue_test.go +++ b/beacon-chain/sync/pending_blocks_queue_test.go @@ -113,9 +113,9 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) { if code == 0 { t.Error("Expected a non-zero code") } - if errMsg != errWrongForkVersion.Error() { - t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkVersion.Error())) - t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkVersion.Error()) + if errMsg != errWrongForkDigestVersion.Error() { + t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkDigestVersion.Error())) + t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkDigestVersion.Error()) } }) diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 8e1a9819f089..915a5de1d617 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -87,7 +87,7 @@ func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler) } if err := handle(ctx, msg.Interface(), stream); err != nil { messageFailedProcessingCounter.WithLabelValues(topic).Inc() - if err != errWrongForkVersion { + if err != errWrongForkDigestVersion { log.WithError(err).Warn("Failed to handle p2p RPC") } traceutil.AnnotateError(span, err) @@ -101,7 +101,7 @@ func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler) } if err := handle(ctx, msg.Elem().Interface(), stream); err != nil { messageFailedProcessingCounter.WithLabelValues(topic).Inc() - if err != errWrongForkVersion { + if err != errWrongForkDigestVersion { log.WithError(err).Warn("Failed to handle p2p RPC") } traceutil.AnnotateError(span, err) diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index bf51d5334d89..332f74828361 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -176,7 +176,7 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) error { forkDigest := r.p2p.ForkDigest() if !bytes.Equal(forkDigest[:], msg.ForkDigest) { - return errWrongForkVersion + return errWrongForkDigestVersion } genesis := r.chain.GenesisTime() maxEpoch := slotutil.EpochsSinceGenesis(genesis) diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index 92f520ad83c1..6422cb229a5e 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -48,9 +48,9 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { if code == 0 { t.Error("Expected a non-zero code") } - if errMsg != errWrongForkVersion.Error() { - t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkVersion.Error())) - t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkVersion.Error()) + if errMsg != errWrongForkDigestVersion.Error() { + t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkDigestVersion.Error())) + t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkDigestVersion.Error()) } }) @@ -60,8 +60,8 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { } err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: []byte("fake")}, stream1) - if err != errWrongForkVersion { - t.Errorf("Expected error %v, got %v", errWrongForkVersion, err) + if err != errWrongForkDigestVersion { + t.Errorf("Expected error %v, got %v", errWrongForkDigestVersion, err) } if testutil.WaitTimeout(&wg, 1*time.Second) { From a67be397d17323340306f729e85de1e107ac733b Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 25 Mar 2020 15:20:06 -0500 Subject: [PATCH 07/10] implement fork digest' --- beacon-chain/p2p/fork.go | 36 ++++++++++++++++++++++++++++-------- beacon-chain/p2p/service.go | 7 ------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/beacon-chain/p2p/fork.go b/beacon-chain/p2p/fork.go index 0d968e2edd3a..07c29ad9e71f 100644 --- a/beacon-chain/p2p/fork.go +++ b/beacon-chain/p2p/fork.go @@ -19,6 +19,12 @@ import ( // ENR key used for eth2-related fork data. const eth2ENRKey = "eth2" +// ForkDigest returns the current fork digest of +// the node. +func (s *Service) ForkDigest() ([4]byte, error) { + return createForkDigest(s.genesisTime, s.genesisValidatorsRoot) +} + // Compares fork ENRs between an incoming peer's record and our node's // local record values for current and next fork version/epoch. func (s *Service) compareForkENR(record *enr.Record) error { @@ -66,16 +72,13 @@ func (s *Service) compareForkENR(record *enr.Record) error { return nil } -// Adds a fork entry as an ENR record under the eth2EnrKey for -// the local node. The fork entry is an ssz-encoded enrForkID type -// which takes into account the current fork version from the current -// epoch to create a fork digest, the next fork version, -// and the next fork epoch. -func addForkEntry( - node *enode.LocalNode, +// Creates a fork digest from a genesis time and genesis +// validators root, utilizing the current slot to determine +// the active fork version in the node. +func createForkDigest( genesisTime time.Time, genesisValidatorsRoot []byte, -) (*enode.LocalNode, error) { +) ([4]byte, error) { currentSlot := helpers.SlotsSince(genesisTime) currentEpoch := helpers.SlotToEpoch(currentSlot) @@ -92,6 +95,23 @@ func addForkEntry( } digest, err := helpers.ComputeForkDigest(currentForkVersion, genesisValidatorsRoot) + if err != nil { + return [4]byte{}, err + } + return digest, nil +} + +// Adds a fork entry as an ENR record under the eth2EnrKey for +// the local node. The fork entry is an ssz-encoded enrForkID type +// which takes into account the current fork version from the current +// epoch to create a fork digest, the next fork version, +// and the next fork epoch. +func addForkEntry( + node *enode.LocalNode, + genesisTime time.Time, + genesisValidatorsRoot []byte, +) (*enode.LocalNode, error) { + digest, err := createForkDigest(genesisTime, genesisValidatorsRoot) if err != nil { return nil, err } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 8538e2885498..80d1162c30b9 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -319,13 +319,6 @@ func (s *Service) Peers() *peers.Status { return s.peers } -// ForkDigest returns the current fork digest of -// the node. -func (s *Service) ForkDigest() [4]byte { - //no-op - return [4]byte{} -} - // RefreshENR uses an epoch to refresh the enr entry for our node // with the tracked committee id's for the epoch, allowing our node // to be dynamically discoverable by others given our tracked committee id's. From 72d15833e9ef4b0e2d94f478eef273f8633541e9 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 25 Mar 2020 15:31:53 -0500 Subject: [PATCH 08/10] align digest to interface' --- beacon-chain/p2p/interfaces.go | 2 +- beacon-chain/p2p/testing/p2p.go | 10 ++++++--- beacon-chain/sync/rpc_status.go | 15 ++++++++++--- beacon-chain/sync/subscriber.go | 22 +++++++++++++------ ...date_committee_index_beacon_attestation.go | 8 ++++++- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 9940270a0b46..f2ab18c441a6 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -42,7 +42,7 @@ type ConnectionHandler interface { // EncodingProvider provides p2p network encoding. type EncodingProvider interface { Encoding() encoder.NetworkEncoding - ForkDigest() [4]byte + ForkDigest() ([4]byte, error) } // PubSubProvider provides the p2p pubsub protocol. diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index d9cb5c9ba8cc..6507da72a52c 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -115,7 +115,11 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) { if _, err := p.Encoding().Encode(buf, msg); err != nil { p.t.Fatalf("Failed to encode message: %v", err) } - topic = fmt.Sprintf(topic, p.ForkDigest()) + digest, err := p.ForkDigest() + if err != nil { + p.t.Fatal(err) + } + topic = fmt.Sprintf(topic, digest) topic = topic + p.Encoding().ProtocolSuffix() if err := ps.Publish(topic, buf.Bytes()); err != nil { @@ -241,6 +245,6 @@ func (p *TestP2P) RefreshENR(epoch uint64) { } // ForkDigest mocks the p2p func. -func (p *TestP2P) ForkDigest() [4]byte { - return [4]byte{} +func (p *TestP2P) ForkDigest() ([4]byte, error) { + return [4]byte{}, nil } diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 332f74828361..63c208fea229 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -77,7 +77,10 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { return err } - forkDigest := r.p2p.ForkDigest() + forkDigest, err := r.p2p.ForkDigest() + if err != nil { + return err + } resp := &pb.Status{ ForkDigest: forkDigest[:], FinalizedRoot: r.chain.FinalizedCheckpt().Root, @@ -156,7 +159,10 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream return err } - forkDigest := r.p2p.ForkDigest() + forkDigest, err := r.p2p.ForkDigest() + if err != nil { + return err + } resp := &pb.Status{ ForkDigest: forkDigest[:], FinalizedRoot: r.chain.FinalizedCheckpt().Root, @@ -174,7 +180,10 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream } func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) error { - forkDigest := r.p2p.ForkDigest() + forkDigest, err := r.p2p.ForkDigest() + if err != nil { + return err + } if !bytes.Equal(forkDigest[:], msg.ForkDigest) { return errWrongForkDigestVersion } diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index cc941338245e..45e694aab098 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -213,10 +213,12 @@ func (r *Service) subscribeDynamicWithSubnets( ) { base := p2p.GossipTopicMappings[topicFormat] if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) + log.Fatalf("%s is not mapped to any message in GossipTopicMappings", topicFormat) + } + digest, err := r.p2p.ForkDigest() + if err != nil { + log.WithError(err).Fatal("Could not compute fork digest") } - digest := r.p2p.ForkDigest() - subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot) stateChannel := make(chan *feed.Event, 1) @@ -288,9 +290,12 @@ func (r *Service) subscribeDynamicWithSubnets( func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() int, validate pubsub.Validator, handle subHandler) { base := p2p.GossipTopicMappings[topicFormat] if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) + log.Fatalf("%s is not mapped to any message in GossipTopicMappings", topicFormat) + } + digest, err := r.p2p.ForkDigest() + if err != nil { + log.WithError(err).Fatal("Could not compute fork digest") } - digest := r.p2p.ForkDigest() var subscriptions []*pubsub.Subscription stateChannel := make(chan *feed.Event, 1) @@ -329,8 +334,11 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i // Add fork digest to topic. func (r *Service) addDigestToTopic(topic string) string { if !strings.Contains(topic, "%x") { - panic("topic does not have appropriate formatter for digest") + log.Fatal("Topic does not have appropriate formatter for digest") + } + digest, err := r.p2p.ForkDigest() + if err != nil { + log.WithError(err).Fatal("Could not compute fork digest") } - digest := r.p2p.ForkDigest() return fmt.Sprintf(topic, digest) } diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go index bae25b751649..11906871a929 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go @@ -62,7 +62,13 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p } // The attestation's committee index (attestation.data.index) is for the correct subnet. - if !strings.HasPrefix(originalTopic, fmt.Sprintf(format, s.p2p.ForkDigest(), att.Data.CommitteeIndex)) { + digest, err := s.p2p.ForkDigest() + if err != nil { + log.WithError(err).Error("Failed to compute fork digest") + traceutil.AnnotateError(span, err) + return false + } + if !strings.HasPrefix(originalTopic, fmt.Sprintf(format, digest, att.Data.CommitteeIndex)) { return false } From 36367c9aaf1e84fb28e912d122d187416382f6fb Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 25 Mar 2020 15:38:13 -0500 Subject: [PATCH 09/10] passed all tests --- beacon-chain/p2p/broadcaster.go | 12 ++++++++---- beacon-chain/p2p/broadcaster_test.go | 8 ++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index 342a8005521d..310f89ca63b4 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -22,11 +22,15 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error { ctx, span := trace.StartSpan(ctx, "p2p.Broadcast") defer span.End() + forkDigest, err := s.ForkDigest() + if err != nil { + return err + } var topic string switch msg.(type) { case *eth.Attestation: - topic = attestationToTopic(msg.(*eth.Attestation)) + topic = attestationToTopic(msg.(*eth.Attestation), forkDigest) default: var ok bool topic, ok = GossipTypeMapping[reflect.TypeOf(msg)] @@ -59,11 +63,11 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error { return nil } -const attestationSubnetTopicFormat = "/eth2/committee_index%d_beacon_attestation" +const attestationSubnetTopicFormat = "/eth2/%x/committee_index%d_beacon_attestation" -func attestationToTopic(att *eth.Attestation) string { +func attestationToTopic(att *eth.Attestation, forkDigest [4]byte) string { if att == nil || att.Data == nil { return "" } - return fmt.Sprintf(attestationSubnetTopicFormat, att.Data.CommitteeIndex) + return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, att.Data.CommitteeIndex) } diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 38298759a093..8e4eef3fee9d 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -99,7 +99,7 @@ func TestService_Attestation_Subnet(t *testing.T) { CommitteeIndex: 0, }, }, - topic: "/eth2/committee_index0_beacon_attestation", + topic: "/eth2/00000000/committee_index0_beacon_attestation", }, { att: ð.Attestation{ @@ -107,7 +107,7 @@ func TestService_Attestation_Subnet(t *testing.T) { CommitteeIndex: 11, }, }, - topic: "/eth2/committee_index11_beacon_attestation", + topic: "/eth2/00000000/committee_index11_beacon_attestation", }, { att: ð.Attestation{ @@ -115,7 +115,7 @@ func TestService_Attestation_Subnet(t *testing.T) { CommitteeIndex: 55, }, }, - topic: "/eth2/committee_index55_beacon_attestation", + topic: "/eth2/00000000/committee_index55_beacon_attestation", }, { att: ð.Attestation{}, @@ -126,7 +126,7 @@ func TestService_Attestation_Subnet(t *testing.T) { }, } for _, tt := range tests { - if res := attestationToTopic(tt.att); res != tt.topic { + if res := attestationToTopic(tt.att, [4]byte{} /* fork digest */); res != tt.topic { t.Errorf("Wrong topic, got %s wanted %s", res, tt.topic) } } From f329aea73d909a0d669a7fa2bb2ebc99fb035bf7 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 25 Mar 2020 17:32:06 -0500 Subject: [PATCH 10/10] spawn in goroutine --- beacon-chain/sync/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 4f1bea073421..318ad8ae1245 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -84,7 +84,7 @@ func NewRegularSync(cfg *Config) *Service { } r.registerRPCHandlers() - r.registerSubscribers() + go r.registerSubscribers() return r }