Skip to content

Commit

Permalink
Expose consensus-app-concurrency (#1322)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Apr 11, 2023
1 parent d3af2b5 commit 725108f
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 2 deletions.
3 changes: 3 additions & 0 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type ManagerConfig struct {
Metrics metrics.MultiGatherer

ConsensusGossipFrequency time.Duration
ConsensusAppConcurrency int

// Max Time to spend fetching a container and its
// ancestors when responding to a GetAncestors
Expand Down Expand Up @@ -818,6 +819,7 @@ func (m *manager) createAvalancheChain(
vdrs,
msgChan,
m.ConsensusGossipFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector
sb,
Expand Down Expand Up @@ -1173,6 +1175,7 @@ func (m *manager) createSnowmanChain(
vdrs,
msgChan,
m.ConsensusGossipFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
subnetConnector,
sb,
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,12 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusGossipFrequencyKey)
}

// App handling
nodeConfig.ConsensusAppConcurrency = int(v.GetUint(ConsensusAppConcurrencyKey))
if nodeConfig.ConsensusAppConcurrency <= 0 {
return node.Config{}, fmt.Errorf("%s must be > 0", ConsensusAppConcurrencyKey)
}

nodeConfig.UseCurrentHeight = v.GetBool(ProposerVMUseCurrentHeightKey)

// Logging
Expand Down
1 change: 1 addition & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func addNodeFlags(fs *pflag.FlagSet) {

// Router
fs.Duration(ConsensusGossipFrequencyKey, constants.DefaultConsensusGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
Expand Down
1 change: 1 addition & 0 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ const (
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusGossipFrequencyKey = "consensus-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand Down
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type Config struct {
ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"`
// Gossip a container in the accepted frontier every [ConsensusGossipFrequency]
ConsensusGossipFrequency time.Duration `json:"consensusGossipFreq"`
// ConsensusAppConcurrency defines the maximum number of goroutines to
// handle App messages per chain.
ConsensusAppConcurrency int `json:"consensusAppConcurrency"`

TrackedSubnets set.Set[ids.ID] `json:"trackedSubnets"`

Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {
SubnetConfigs: n.Config.SubnetConfigs,
ChainConfigs: n.Config.ChainConfigs,
ConsensusGossipFrequency: n.Config.ConsensusGossipFrequency,
ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency,
BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors,
BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent,
BootstrapAncestorsMaxContainersReceived: n.Config.BootstrapAncestorsMaxContainersReceived,
Expand Down
2 changes: 1 addition & 1 deletion snow/networking/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
)

const (
threadPoolSize = 2
numDispatchersToClose = 3
// If a consensus message takes longer than this to process, the handler
// will log a warning.
Expand Down Expand Up @@ -121,6 +120,7 @@ func New(
validators validators.Set,
msgFromVMChan <-chan common.Message,
gossipFrequency time.Duration,
threadPoolSize int,
resourceTracker tracker.ResourceTracker,
subnetConnector validators.SubnetConnector,
subnet subnets.Subnet,
Expand Down
8 changes: 8 additions & 0 deletions snow/networking/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/ava-labs/avalanchego/utils/resource"
)

const testThreadPoolSize = 2

var errFatal = errors.New("error should cause handler to close")

func TestHandlerDropsTimedOutMessages(t *testing.T) {
Expand All @@ -52,6 +54,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -146,6 +149,7 @@ func TestHandlerClosesOnError(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -236,6 +240,7 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) {
vdrs,
nil,
1,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -315,6 +320,7 @@ func TestHandlerDispatchInternal(t *testing.T) {
vdrs,
msgFromVMChan,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -392,6 +398,7 @@ func TestHandlerSubnetConnector(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
connector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -561,6 +568,7 @@ func TestDynamicEngineTypeDispatch(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ids.EmptyNodeID, subnets.Config{}),
Expand Down
13 changes: 12 additions & 1 deletion snow/networking/router/chain_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import (
"github.com/ava-labs/avalanchego/version"
)

const engineType = p2p.EngineType_ENGINE_TYPE_AVALANCHE
const (
engineType = p2p.EngineType_ENGINE_TYPE_AVALANCHE
testThreadPoolSize = 2
)

func TestShutdown(t *testing.T) {
vdrs := validators.NewSet()
Expand Down Expand Up @@ -89,6 +92,7 @@ func TestShutdown(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -224,6 +228,7 @@ func TestShutdownTimesOut(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -380,6 +385,7 @@ func TestRouterTimeout(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -848,6 +854,7 @@ func TestRouterClearTimeouts(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -1138,6 +1145,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
sb,
Expand Down Expand Up @@ -1287,6 +1295,7 @@ func TestRouterCrossChainMessages(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(requester.NodeID, subnets.Config{}),
Expand All @@ -1304,6 +1313,7 @@ func TestRouterCrossChainMessages(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(responder.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -1552,6 +1562,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
sb,
Expand Down
5 changes: 5 additions & 0 deletions snow/networking/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/ava-labs/avalanchego/version"
)

const testThreadPoolSize = 2

var defaultSubnetConfig = subnets.Config{
GossipConfig: subnets.GossipConfig{
AcceptedFrontierPeerSize: 2,
Expand Down Expand Up @@ -122,6 +124,7 @@ func TestTimeout(t *testing.T) {
vdrs,
nil,
time.Hour,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -395,6 +398,7 @@ func TestReliableMessages(t *testing.T) {
vdrs,
nil,
1,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -543,6 +547,7 @@ func TestReliableMessagesToMyself(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down
1 change: 1 addition & 0 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (

// Router
DefaultConsensusGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = 30 * time.Second
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
Expand Down
1 change: 1 addition & 0 deletions vms/platformvm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
beacons,
msgChan,
time.Hour,
2,
cpuTracker,
vm,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down

0 comments on commit 725108f

Please sign in to comment.