diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index fd899bd1ea85..3bd2f2a1d045 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -243,10 +243,9 @@ func (r *Service) subscribeDynamicWithSubnets( // Resize as appropriate. r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) + // subscribe desired aggregator subnets. for _, idx := range wantedSubs { - if _, exists := subscriptions[idx]; !exists { - r.subscribeMissingSubnet(subscriptions, idx, base, digest, validate, handle) - } + r.subscribeAggregatorSubnet(subscriptions, idx, base, digest, validate, handle) } // find desired subs for attesters attesterSubs := r.attesterCommitteeIndices(currentSlot) @@ -332,30 +331,28 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc } // subscribe missing subnets for our aggregators. -func (r *Service) subscribeMissingSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64, +func (r *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64, base proto.Message, digest [4]byte, validate pubsub.Validator, handle subHandler) { // do not subscribe if we have no peers in the same // subnet topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] subnetTopic := fmt.Sprintf(topic, digest, idx) + // check if subscription exists and if not subscribe the relevant subnet. + if _, exists := subscriptions[idx]; !exists { + subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle) + } if !r.validPeersExist(subnetTopic, idx) { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", idx) go func(idx uint64) { - peerExists, err := r.p2p.FindPeersWithSubnet(idx) + _, err := r.p2p.FindPeersWithSubnet(idx) if err != nil { log.Errorf("Could not search for peers: %v", err) return } - // do not subscribe if we couldn't find a connected peer. - if !peerExists { - return - } - subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle) }(idx) return } - subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle) } // lookup peers for attester specific subnets. diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 243ae7df602a..495949d2dae2 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -2,6 +2,7 @@ package sync import ( "context" + "fmt" "reflect" "sync" "testing" @@ -9,6 +10,7 @@ import ( "github.com/gogo/protobuf/proto" lru "github.com/hashicorp/golang-lru" + pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" @@ -23,6 +25,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" + logTest "github.com/sirupsen/logrus/hooks/test" ) func TestSubscribe_ReceivesValidMessage(t *testing.T) { @@ -270,3 +273,47 @@ func TestSubscribe_HandlesPanic(t *testing.T) { t.Fatal("Did not receive PubSub in 1 second") } } + +func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { + p := p2ptest.NewTestP2P(t) + hook := logTest.NewGlobal() + r := Service{ + ctx: context.Background(), + chain: &mockChain.ChainService{ + Genesis: time.Now(), + ValidatorsRoot: [32]byte{'A'}, + }, + p2p: p, + } + digest, err := r.forkDigest() + if err != nil { + t.Fatal(err) + } + subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot) + + defaultTopic := "/eth2/testing/%#x/committee%d" + // committee index 1 + fullTopic := fmt.Sprintf(defaultTopic, digest, 1) + r.p2p.Encoding().ProtocolSuffix() + err = r.p2p.PubSub().RegisterTopicValidator(fullTopic, r.noopValidator) + if err != nil { + t.Fatal(err) + } + subscriptions[1], err = r.p2p.PubSub().Subscribe(fullTopic) + if err != nil { + t.Fatal(err) + } + + // committee index 2 + fullTopic = fmt.Sprintf(defaultTopic, digest, 2) + r.p2p.Encoding().ProtocolSuffix() + err = r.p2p.PubSub().RegisterTopicValidator(fullTopic, r.noopValidator) + if err != nil { + t.Fatal(err) + } + subscriptions[2], err = r.p2p.PubSub().Subscribe(fullTopic) + if err != nil { + t.Fatal(err) + } + + r.reValidateSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest) + testutil.AssertLogsDoNotContain(t, hook, "Failed to unregister topic validator") +}