From 2d1ac71dba1eb0e2a5357759adb276f6d65d3dbb Mon Sep 17 00:00:00 2001 From: Stephen Date: Tue, 11 Apr 2023 12:18:25 -0400 Subject: [PATCH] Expose consensus-app-concurrency --- chains/manager.go | 3 +++ config/config.go | 6 ++++++ config/flags.go | 1 + config/keys.go | 1 + node/config.go | 3 +++ node/node.go | 1 + snow/networking/handler/handler.go | 2 +- snow/networking/handler/handler_test.go | 8 ++++++++ snow/networking/router/chain_router_test.go | 13 ++++++++++++- snow/networking/sender/sender_test.go | 5 +++++ utils/constants/networking.go | 1 + vms/platformvm/vm_test.go | 1 + 12 files changed, 43 insertions(+), 2 deletions(-) diff --git a/chains/manager.go b/chains/manager.go index c821cbc6fb26..b3614139cb18 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -205,6 +205,7 @@ type ManagerConfig struct { Metrics metrics.MultiGatherer ConsensusGossipFrequency time.Duration + ConsensusAppConcurrency int // Max Time to spend fetching a container and its // ancestors when responding to a GetAncestors @@ -818,6 +819,7 @@ func (m *manager) createAvalancheChain( vdrs, msgChan, m.ConsensusGossipFrequency, + m.ConsensusAppConcurrency, m.ResourceTracker, validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector sb, @@ -1173,6 +1175,7 @@ func (m *manager) createSnowmanChain( vdrs, msgChan, m.ConsensusGossipFrequency, + m.ConsensusAppConcurrency, m.ResourceTracker, subnetConnector, sb, diff --git a/config/config.go b/config/config.go index 70a1c782fcec..e955c749b01b 100644 --- a/config/config.go +++ b/config/config.go @@ -1310,6 +1310,12 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) { return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusGossipFrequencyKey) } + // App handling + nodeConfig.ConsensusAppConcurrency = int(v.GetUint(ConsensusAppConcurrencyKey)) + if nodeConfig.ConsensusAppConcurrency <= 0 { + return node.Config{}, fmt.Errorf("%s must be > 0", ConsensusAppConcurrencyKey) + } + nodeConfig.UseCurrentHeight = v.GetBool(ProposerVMUseCurrentHeightKey) // Logging diff --git a/config/flags.go b/config/flags.go index 98c782d0f161..c30f6d27e94e 100644 --- a/config/flags.go +++ b/config/flags.go @@ -178,6 +178,7 @@ func addNodeFlags(fs *pflag.FlagSet) { // Router fs.Duration(ConsensusGossipFrequencyKey, constants.DefaultConsensusGossipFrequency, "Frequency of gossiping accepted frontiers") + fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain") fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain") fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier") fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier") diff --git a/config/keys.go b/config/keys.go index 1f9e0a00b21d..01c3364393c3 100644 --- a/config/keys.go +++ b/config/keys.go @@ -143,6 +143,7 @@ const ( IpcsPathKey = "ipcs-path" MeterVMsEnabledKey = "meter-vms-enabled" ConsensusGossipFrequencyKey = "consensus-gossip-frequency" + ConsensusAppConcurrencyKey = "consensus-app-concurrency" ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size" ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size" ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size" diff --git a/node/config.go b/node/config.go index 4fce225eb172..d32a0b071726 100644 --- a/node/config.go +++ b/node/config.go @@ -179,6 +179,9 @@ type Config struct { ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"` // Gossip a container in the accepted frontier every [ConsensusGossipFrequency] ConsensusGossipFrequency time.Duration `json:"consensusGossipFreq"` + // ConsensusAppConcurrency defines the maximum number of goroutines to + // handle App messages per chain. + ConsensusAppConcurrency int `json:"consensusAppConcurrency"` TrackedSubnets set.Set[ids.ID] `json:"trackedSubnets"` diff --git a/node/node.go b/node/node.go index 7e46ff066f6e..0e258f4841a2 100644 --- a/node/node.go +++ b/node/node.go @@ -740,6 +740,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { SubnetConfigs: n.Config.SubnetConfigs, ChainConfigs: n.Config.ChainConfigs, ConsensusGossipFrequency: n.Config.ConsensusGossipFrequency, + ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency, BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors, BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent, BootstrapAncestorsMaxContainersReceived: n.Config.BootstrapAncestorsMaxContainersReceived, diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 2934af800cbe..81b8a9c5e282 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -32,7 +32,6 @@ import ( ) const ( - threadPoolSize = 2 numDispatchersToClose = 3 // If a consensus message takes longer than this to process, the handler // will log a warning. @@ -121,6 +120,7 @@ func New( validators validators.Set, msgFromVMChan <-chan common.Message, gossipFrequency time.Duration, + threadPoolSize int, resourceTracker tracker.ResourceTracker, subnetConnector validators.SubnetConnector, subnet subnets.Subnet, diff --git a/snow/networking/handler/handler_test.go b/snow/networking/handler/handler_test.go index 9b51748b92c4..b9e757b7d99d 100644 --- a/snow/networking/handler/handler_test.go +++ b/snow/networking/handler/handler_test.go @@ -28,6 +28,8 @@ import ( "github.com/ava-labs/avalanchego/utils/resource" ) +const testThreadPoolSize = 2 + var errFatal = errors.New("error should cause handler to close") func TestHandlerDropsTimedOutMessages(t *testing.T) { @@ -52,6 +54,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -146,6 +149,7 @@ func TestHandlerClosesOnError(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -236,6 +240,7 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) { vdrs, nil, 1, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -315,6 +320,7 @@ func TestHandlerDispatchInternal(t *testing.T) { vdrs, msgFromVMChan, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -392,6 +398,7 @@ func TestHandlerSubnetConnector(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, connector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -561,6 +568,7 @@ func TestDynamicEngineTypeDispatch(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ids.EmptyNodeID, subnets.Config{}), diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index 27a83b6f3036..f235064b28f4 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -36,7 +36,10 @@ import ( "github.com/ava-labs/avalanchego/version" ) -const engineType = p2p.EngineType_ENGINE_TYPE_AVALANCHE +const ( + engineType = p2p.EngineType_ENGINE_TYPE_AVALANCHE + testThreadPoolSize = 2 +) func TestShutdown(t *testing.T) { vdrs := validators.NewSet() @@ -89,6 +92,7 @@ func TestShutdown(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -224,6 +228,7 @@ func TestShutdownTimesOut(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -380,6 +385,7 @@ func TestRouterTimeout(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -848,6 +854,7 @@ func TestRouterClearTimeouts(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -1138,6 +1145,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, sb, @@ -1287,6 +1295,7 @@ func TestRouterCrossChainMessages(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(requester.NodeID, subnets.Config{}), @@ -1304,6 +1313,7 @@ func TestRouterCrossChainMessages(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(responder.NodeID, subnets.Config{}), @@ -1552,6 +1562,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, sb, diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index db20b12b3c44..04bfeea6d561 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -37,6 +37,8 @@ import ( "github.com/ava-labs/avalanchego/version" ) +const testThreadPoolSize = 2 + var defaultSubnetConfig = subnets.Config{ GossipConfig: subnets.GossipConfig{ AcceptedFrontierPeerSize: 2, @@ -122,6 +124,7 @@ func TestTimeout(t *testing.T) { vdrs, nil, time.Hour, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -395,6 +398,7 @@ func TestReliableMessages(t *testing.T) { vdrs, nil, 1, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), @@ -543,6 +547,7 @@ func TestReliableMessagesToMyself(t *testing.T) { vdrs, nil, time.Second, + testThreadPoolSize, resourceTracker, validators.UnhandledSubnetConnector, subnets.New(ctx.NodeID, subnets.Config{}), diff --git a/utils/constants/networking.go b/utils/constants/networking.go index b958e2c40889..7290718c267a 100644 --- a/utils/constants/networking.go +++ b/utils/constants/networking.go @@ -79,6 +79,7 @@ const ( // Router DefaultConsensusGossipFrequency = 10 * time.Second + DefaultConsensusAppConcurrency = 2 DefaultConsensusShutdownTimeout = 30 * time.Second DefaultConsensusGossipAcceptedFrontierValidatorSize = 0 DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0 diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index f7b679d9a63e..539fd3862f84 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1827,6 +1827,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { beacons, msgChan, time.Hour, + 2, cpuTracker, vm, subnets.New(ctx.NodeID, subnets.Config{}),