diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index bb85dbd3fb65..fd899bd1ea85 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -177,7 +177,7 @@ func (r *Service) subscribeWithBase(base proto.Message, topic string, validator msg, err := sub.Next(r.ctx) if err != nil { // This should only happen when the context is cancelled or subscription is cancelled. - log.WithError(err).Error("Subscription next failed") + log.WithError(err).Warn("Subscription next failed") return } @@ -241,7 +241,7 @@ func (r *Service) subscribeDynamicWithSubnets( // Update desired topic indices for aggregator wantedSubs := r.aggregatorCommitteeIndices(currentSlot) // Resize as appropriate. - r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat) + r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) for _, idx := range wantedSubs { if _, exists := subscriptions[idx]; !exists { @@ -311,7 +311,7 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i // revalidate that our currently connected subnets are valid. func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, topicFormat string) { + wantedSubs []uint64, topicFormat string, digest [4]byte) { for k, v := range subscriptions { var wanted bool for _, idx := range wantedSubs { @@ -322,7 +322,8 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc } if !wanted && v != nil { v.Cancel() - if err := r.p2p.PubSub().UnregisterTopicValidator(fmt.Sprintf(topicFormat, k)); err != nil { + fullTopic := fmt.Sprintf(topicFormat, digest, k) + r.p2p.Encoding().ProtocolSuffix() + if err := r.p2p.PubSub().UnregisterTopicValidator(fullTopic); err != nil { log.WithError(err).Error("Failed to unregister topic validator") } delete(subscriptions, k)