diff --git a/chains/manager.go b/chains/manager.go index b4f28b1290be..a1158e67716c 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -205,8 +205,8 @@ type ManagerConfig struct { MeterVMEnabled bool // Should each VM be wrapped with a MeterVM Metrics metrics.MultiGatherer - AcceptedFrontierGossipFrequency time.Duration - ConsensusAppConcurrency int + FrontierPollFrequency time.Duration + ConsensusAppConcurrency int // Max Time to spend fetching a container and its // ancestors when responding to a GetAncestors @@ -824,7 +824,7 @@ func (m *manager) createAvalancheChain( ctx, vdrs, msgChan, - m.AcceptedFrontierGossipFrequency, + m.FrontierPollFrequency, m.ConsensusAppConcurrency, m.ResourceTracker, validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector @@ -1166,7 +1166,7 @@ func (m *manager) createSnowmanChain( ctx, vdrs, msgChan, - m.AcceptedFrontierGossipFrequency, + m.FrontierPollFrequency, m.ConsensusAppConcurrency, m.ResourceTracker, subnetConnector, diff --git a/config/config.go b/config/config.go index db080ec2514a..d894e832d889 100644 --- a/config/config.go +++ b/config/config.go @@ -60,8 +60,9 @@ const ( subnetConfigFileExt = ".json" ipResolutionTimeout = 30 * time.Second - ipcDeprecationMsg = "IPC API is deprecated" - keystoreDeprecationMsg = "keystore API is deprecated" + ipcDeprecationMsg = "IPC API is deprecated" + keystoreDeprecationMsg = "keystore API is deprecated" + acceptedFrontierGossipDeprecationMsg = "push-based accepted frontier gossip is deprecated" ) var ( @@ -72,6 +73,12 @@ var ( IpcsChainIDsKey: ipcDeprecationMsg, IpcsPathKey: ipcDeprecationMsg, KeystoreAPIEnabledKey: keystoreDeprecationMsg, + ConsensusGossipAcceptedFrontierValidatorSizeKey: acceptedFrontierGossipDeprecationMsg, + ConsensusGossipAcceptedFrontierNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg, + ConsensusGossipAcceptedFrontierPeerSizeKey: acceptedFrontierGossipDeprecationMsg, + ConsensusGossipOnAcceptValidatorSizeKey: acceptedFrontierGossipDeprecationMsg, + ConsensusGossipOnAcceptNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg, + ConsensusGossipOnAcceptPeerSizeKey: acceptedFrontierGossipDeprecationMsg, } errSybilProtectionDisabledStakerWeights = errors.New("sybil protection disabled weights must be positive") @@ -1320,9 +1327,9 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) { } // Gossiping - nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusAcceptedFrontierGossipFrequencyKey) - if nodeConfig.AcceptedFrontierGossipFrequency < 0 { - return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusAcceptedFrontierGossipFrequencyKey) + nodeConfig.FrontierPollFrequency = v.GetDuration(ConsensusFrontierPollFrequencyKey) + if nodeConfig.FrontierPollFrequency < 0 { + return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusFrontierPollFrequencyKey) } // App handling diff --git a/config/flags.go b/config/flags.go index 20f42381320f..6e381e1c6a85 100644 --- a/config/flags.go +++ b/config/flags.go @@ -177,9 +177,9 @@ func addNodeFlags(fs *pflag.FlagSet) { fs.Duration(BenchlistMinFailingDurationKey, constants.DefaultBenchlistMinFailingDuration, "Minimum amount of time messages to a peer must be failing before the peer is benched") // Router - fs.Duration(ConsensusAcceptedFrontierGossipFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of gossiping accepted frontiers") fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain") fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain") + fs.Duration(ConsensusFrontierPollFrequencyKey, constants.DefaultFrontierPollFrequency, "Frequency of polling for new consensus frontiers") fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier") fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier") fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier") diff --git a/config/keys.go b/config/keys.go index 1fe19cd2424a..c627abfc1b57 100644 --- a/config/keys.go +++ b/config/keys.go @@ -143,8 +143,9 @@ const ( IpcsChainIDsKey = "ipcs-chain-ids" IpcsPathKey = "ipcs-path" MeterVMsEnabledKey = "meter-vms-enabled" - ConsensusAcceptedFrontierGossipFrequencyKey = "consensus-accepted-frontier-gossip-frequency" ConsensusAppConcurrencyKey = "consensus-app-concurrency" + ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout" + ConsensusFrontierPollFrequencyKey = "consensus-frontier-poll-frequency" ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size" ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size" ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size" @@ -154,7 +155,6 @@ const ( AppGossipValidatorSizeKey = "consensus-app-gossip-validator-size" AppGossipNonValidatorSizeKey = "consensus-app-gossip-non-validator-size" AppGossipPeerSizeKey = "consensus-app-gossip-peer-size" - ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout" ProposerVMUseCurrentHeightKey = "proposervm-use-current-height" FdLimitKey = "fd-limit" IndexEnabledKey = "index-enabled" diff --git a/node/config.go b/node/config.go index 87783bd9935b..5839da75960a 100644 --- a/node/config.go +++ b/node/config.go @@ -181,8 +181,8 @@ type Config struct { ConsensusRouter router.Router `json:"-"` RouterHealthConfig router.HealthConfig `json:"routerHealthConfig"` ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"` - // Gossip a container in the accepted frontier every [AcceptedFrontierGossipFrequency] - AcceptedFrontierGossipFrequency time.Duration `json:"consensusGossipFreq"` + // Poll for new frontiers every [FrontierPollFrequency] + FrontierPollFrequency time.Duration `json:"consensusGossipFreq"` // ConsensusAppConcurrency defines the maximum number of goroutines to // handle App messages per chain. ConsensusAppConcurrency int `json:"consensusAppConcurrency"` diff --git a/node/node.go b/node/node.go index 7867625fc982..40eb8dc16bef 100644 --- a/node/node.go +++ b/node/node.go @@ -1014,7 +1014,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { Metrics: n.MetricsGatherer, SubnetConfigs: n.Config.SubnetConfigs, ChainConfigs: n.Config.ChainConfigs, - AcceptedFrontierGossipFrequency: n.Config.AcceptedFrontierGossipFrequency, + FrontierPollFrequency: n.Config.FrontierPollFrequency, ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency, BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors, BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent, diff --git a/node/overridden_manager.go b/node/overridden_manager.go index 91d8c198a4c3..80295f8636ea 100644 --- a/node/overridden_manager.go +++ b/node/overridden_manager.go @@ -68,6 +68,10 @@ func (o *overriddenManager) Sample(_ ids.ID, size int) ([]ids.NodeID, error) { return o.manager.Sample(o.subnetID, size) } +func (o *overriddenManager) UniformSample(_ ids.ID, size int) ([]ids.NodeID, error) { + return o.manager.UniformSample(o.subnetID, size) +} + func (o *overriddenManager) GetMap(ids.ID) map[ids.NodeID]*validators.GetValidatorOutput { return o.manager.GetMap(o.subnetID) } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 7675cff931fe..7f06698cbab0 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -34,7 +34,14 @@ import ( "github.com/ava-labs/avalanchego/utils/wrappers" ) -const nonVerifiedCacheSize = 64 * units.MiB +const ( + nonVerifiedCacheSize = 64 * units.MiB + + // putGossipPeriod specifies the number of times Gossip will be called per + // Put gossip. This is done to avoid splitting Gossip into multiple + // functions and to allow more frequent pull gossip than push gossip. + putGossipPeriod = 10 +) var _ Engine = (*Transitive)(nil) @@ -63,6 +70,8 @@ type Transitive struct { requestID uint32 + gossipCounter int + // track outstanding preference requests polls poll.Set @@ -151,6 +160,69 @@ func newTransitive(config Config) (*Transitive, error) { return t, t.metrics.Initialize("", config.Ctx.Registerer) } +func (t *Transitive) Gossip(ctx context.Context) error { + lastAcceptedID, lastAcceptedHeight := t.Consensus.LastAccepted() + if numProcessing := t.Consensus.NumProcessing(); numProcessing == 0 { + t.Ctx.Log.Verbo("sampling from validators", + zap.Stringer("validators", t.Validators), + ) + + // Uniform sampling is used here to reduce bandwidth requirements of + // nodes with a large amount of stake weight. + vdrIDs, err := t.Validators.UniformSample(t.Ctx.SubnetID, 1) + if err != nil { + t.Ctx.Log.Error("skipping block gossip", + zap.String("reason", "no validators"), + zap.Error(err), + ) + return nil + } + + nextHeightToAccept, err := math.Add64(lastAcceptedHeight, 1) + if err != nil { + t.Ctx.Log.Error("skipping block gossip", + zap.String("reason", "block height overflow"), + zap.Stringer("blkID", lastAcceptedID), + zap.Uint64("lastAcceptedHeight", lastAcceptedHeight), + zap.Error(err), + ) + return nil + } + + t.requestID++ + vdrSet := set.Of(vdrIDs...) + preferredID := t.Consensus.Preference() + t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, preferredID, nextHeightToAccept) + } else { + t.Ctx.Log.Debug("skipping block gossip", + zap.String("reason", "blocks currently processing"), + zap.Int("numProcessing", numProcessing), + ) + } + + // TODO: Remove periodic push gossip after v1.11.x is activated + t.gossipCounter++ + t.gossipCounter %= putGossipPeriod + if t.gossipCounter > 0 { + return nil + } + + lastAccepted, err := t.GetBlock(ctx, lastAcceptedID) + if err != nil { + t.Ctx.Log.Warn("dropping gossip request", + zap.String("reason", "block couldn't be loaded"), + zap.Stringer("blkID", lastAcceptedID), + zap.Error(err), + ) + return nil + } + t.Ctx.Log.Verbo("gossiping accepted block to the network", + zap.Stringer("blkID", lastAcceptedID), + ) + t.Sender.SendGossip(ctx, lastAccepted.Bytes()) + return nil +} + func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte) error { blk, err := t.VM.ParseBlock(ctx, blkBytes) if err != nil { @@ -383,28 +455,6 @@ func (*Transitive) Timeout(context.Context) error { return nil } -func (t *Transitive) Gossip(ctx context.Context) error { - blkID, err := t.VM.LastAccepted(ctx) - if err != nil { - return err - } - - blk, err := t.GetBlock(ctx, blkID) - if err != nil { - t.Ctx.Log.Warn("dropping gossip request", - zap.String("reason", "block couldn't be loaded"), - zap.Stringer("blkID", blkID), - zap.Error(err), - ) - return nil - } - t.Ctx.Log.Verbo("gossiping accepted block to the network", - zap.Stringer("blkID", blkID), - ) - t.Sender.SendGossip(ctx, blk.Bytes()) - return nil -} - func (*Transitive) Halt(context.Context) {} func (t *Transitive) Shutdown(ctx context.Context) error { @@ -873,6 +923,7 @@ func (t *Transitive) sendQuery( t.Ctx.Log.Error("dropped query for block", zap.String("reason", "insufficient number of validators"), zap.Stringer("blkID", blkID), + zap.Int("size", t.Params.K), ) return } diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 69ca9611cc3f..26f6c1127bbf 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -1382,7 +1382,7 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) { func TestEngineGossip(t *testing.T) { require := require.New(t) - _, _, sender, vm, te, gBlk := setupDefaultConfig(t) + nodeID, _, sender, vm, te, gBlk := setupDefaultConfig(t) vm.LastAcceptedF = func(context.Context) (ids.ID, error) { return gBlk.ID(), nil @@ -1392,15 +1392,15 @@ func TestEngineGossip(t *testing.T) { return gBlk, nil } - called := new(bool) - sender.SendGossipF = func(_ context.Context, blkBytes []byte) { - *called = true - require.Equal(gBlk.Bytes(), blkBytes) + var calledSendPullQuery bool + sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], _ uint32, _ ids.ID, _ uint64) { + calledSendPullQuery = true + require.Equal(set.Of(nodeID), nodeIDs) } require.NoError(te.Gossip(context.Background())) - require.True(*called) + require.True(calledSendPullQuery) } func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) { diff --git a/snow/validators/manager.go b/snow/validators/manager.go index c42ea779d96b..8cf634f29bd7 100644 --- a/snow/validators/manager.go +++ b/snow/validators/manager.go @@ -85,6 +85,10 @@ type Manager interface { // If sampling the requested size isn't possible, an error will be returned. Sample(subnetID ids.ID, size int) ([]ids.NodeID, error) + // UniformSample returns a collection of validatorIDs in the subnet. + // If sampling the requested size isn't possible, an error will be returned. + UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error) + // Map of the validators in this subnet GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput @@ -253,6 +257,21 @@ func (m *manager) Sample(subnetID ids.ID, size int) ([]ids.NodeID, error) { return set.Sample(size) } +func (m *manager) UniformSample(subnetID ids.ID, size int) ([]ids.NodeID, error) { + if size == 0 { + return nil, nil + } + + m.lock.RLock() + set, exists := m.subnetToVdrs[subnetID] + m.lock.RUnlock() + if !exists { + return nil, ErrMissingValidators + } + + return set.UniformSample(size) +} + func (m *manager) GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput { m.lock.RLock() set, exists := m.subnetToVdrs[subnetID] diff --git a/snow/validators/set.go b/snow/validators/set.go index dfa294a70bbe..564cd107153a 100644 --- a/snow/validators/set.go +++ b/snow/validators/set.go @@ -243,6 +243,13 @@ func (s *vdrSet) Sample(size int) ([]ids.NodeID, error) { return s.sample(size) } +func (s *vdrSet) UniformSample(size int) ([]ids.NodeID, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.uniformSample(size) +} + func (s *vdrSet) sample(size int) ([]ids.NodeID, error) { if !s.samplerInitialized { if err := s.sampler.Initialize(s.weights); err != nil { @@ -263,6 +270,22 @@ func (s *vdrSet) sample(size int) ([]ids.NodeID, error) { return list, nil } +func (s *vdrSet) uniformSample(size int) ([]ids.NodeID, error) { + uniform := sampler.NewUniform() + uniform.Initialize(uint64(len(s.vdrSlice))) + + indices, err := uniform.Sample(size) + if err != nil { + return nil, err + } + + list := make([]ids.NodeID, size) + for i, index := range indices { + list[i] = s.vdrSlice[index].NodeID + } + return list, nil +} + func (s *vdrSet) TotalWeight() (uint64, error) { s.lock.RLock() defer s.lock.RUnlock() diff --git a/utils/constants/networking.go b/utils/constants/networking.go index d26f3db1070d..a9417eac37c9 100644 --- a/utils/constants/networking.go +++ b/utils/constants/networking.go @@ -71,12 +71,12 @@ const ( DefaultBenchlistMinFailingDuration = 2*time.Minute + 30*time.Second // Router - DefaultAcceptedFrontierGossipFrequency = 10 * time.Second DefaultConsensusAppConcurrency = 2 DefaultConsensusShutdownTimeout = time.Minute + DefaultFrontierPollFrequency = 100 * time.Millisecond DefaultConsensusGossipAcceptedFrontierValidatorSize = 0 DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0 - DefaultConsensusGossipAcceptedFrontierPeerSize = 15 + DefaultConsensusGossipAcceptedFrontierPeerSize = 1 DefaultConsensusGossipOnAcceptValidatorSize = 0 DefaultConsensusGossipOnAcceptNonValidatorSize = 0 DefaultConsensusGossipOnAcceptPeerSize = 10