Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean Up Of Dynamic Subscriptions #5690

Merged
merged 3 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,9 @@ func (r *Service) subscribeDynamicWithSubnets(
// Resize as appropriate.
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

// subscribe desired aggregator subnets.
for _, idx := range wantedSubs {
if _, exists := subscriptions[idx]; !exists {
r.subscribeMissingSubnet(subscriptions, idx, base, digest, validate, handle)
}
r.subscribeAggregatorSubnet(subscriptions, idx, base, digest, validate, handle)
}
// find desired subs for attesters
attesterSubs := r.attesterCommitteeIndices(currentSlot)
Expand Down Expand Up @@ -332,30 +331,28 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc
}

// subscribe missing subnets for our aggregators.
func (r *Service) subscribeMissingSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64,
func (r *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64,
base proto.Message, digest [4]byte, validate pubsub.Validator, handle subHandler) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
}
if !r.validPeersExist(subnetTopic, idx) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
peerExists, err := r.p2p.FindPeersWithSubnet(idx)
_, err := r.p2p.FindPeersWithSubnet(idx)
if err != nil {
log.Errorf("Could not search for peers: %v", err)
return
}
// do not subscribe if we couldn't find a connected peer.
if !peerExists {
return
}
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
}(idx)
return
}
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
}

// lookup peers for attester specific subnets.
Expand Down
47 changes: 47 additions & 0 deletions beacon-chain/sync/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package sync

import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)

func TestSubscribe_ReceivesValidMessage(t *testing.T) {
Expand Down Expand Up @@ -270,3 +273,47 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
t.Fatal("Did not receive PubSub in 1 second")
}
}

func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
p := p2ptest.NewTestP2P(t)
hook := logTest.NewGlobal()
r := Service{
ctx: context.Background(),
chain: &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
},
p2p: p,
}
digest, err := r.forkDigest()
if err != nil {
t.Fatal(err)
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)

defaultTopic := "/eth2/testing/%#x/committee%d"
// committee index 1
fullTopic := fmt.Sprintf(defaultTopic, digest, 1) + r.p2p.Encoding().ProtocolSuffix()
err = r.p2p.PubSub().RegisterTopicValidator(fullTopic, r.noopValidator)
if err != nil {
t.Fatal(err)
}
subscriptions[1], err = r.p2p.PubSub().Subscribe(fullTopic)
if err != nil {
t.Fatal(err)
}

// committee index 2
fullTopic = fmt.Sprintf(defaultTopic, digest, 2) + r.p2p.Encoding().ProtocolSuffix()
err = r.p2p.PubSub().RegisterTopicValidator(fullTopic, r.noopValidator)
if err != nil {
t.Fatal(err)
}
subscriptions[2], err = r.p2p.PubSub().Subscribe(fullTopic)
if err != nil {
t.Fatal(err)
}

r.reValidateSubscriptions(subscriptions, []uint64{2}, defaultTopic, digest)
testutil.AssertLogsDoNotContain(t, hook, "Failed to unregister topic validator")
}