Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose consensus-app-concurrency #1322

Merged
merged 2 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type ManagerConfig struct {
Metrics metrics.MultiGatherer

ConsensusGossipFrequency time.Duration
ConsensusAppConcurrency int

// Max Time to spend fetching a container and its
// ancestors when responding to a GetAncestors
Expand Down Expand Up @@ -818,6 +819,7 @@ func (m *manager) createAvalancheChain(
vdrs,
msgChan,
m.ConsensusGossipFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector
sb,
Expand Down Expand Up @@ -1173,6 +1175,7 @@ func (m *manager) createSnowmanChain(
vdrs,
msgChan,
m.ConsensusGossipFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
subnetConnector,
sb,
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,12 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusGossipFrequencyKey)
}

// App handling
nodeConfig.ConsensusAppConcurrency = int(v.GetUint(ConsensusAppConcurrencyKey))
if nodeConfig.ConsensusAppConcurrency <= 0 {
return node.Config{}, fmt.Errorf("%s must be > 0", ConsensusAppConcurrencyKey)
}

nodeConfig.UseCurrentHeight = v.GetBool(ProposerVMUseCurrentHeightKey)

// Logging
Expand Down
1 change: 1 addition & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func addNodeFlags(fs *pflag.FlagSet) {

// Router
fs.Duration(ConsensusGossipFrequencyKey, constants.DefaultConsensusGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
Expand Down
1 change: 1 addition & 0 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ const (
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusGossipFrequencyKey = "consensus-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
Expand Down
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type Config struct {
ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"`
// Gossip a container in the accepted frontier every [ConsensusGossipFrequency]
ConsensusGossipFrequency time.Duration `json:"consensusGossipFreq"`
// ConsensusAppConcurrency defines the maximum number of goroutines to
// handle App messages per chain.
ConsensusAppConcurrency int `json:"consensusAppConcurrency"`

TrackedSubnets set.Set[ids.ID] `json:"trackedSubnets"`

Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {
SubnetConfigs: n.Config.SubnetConfigs,
ChainConfigs: n.Config.ChainConfigs,
ConsensusGossipFrequency: n.Config.ConsensusGossipFrequency,
ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency,
BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors,
BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent,
BootstrapAncestorsMaxContainersReceived: n.Config.BootstrapAncestorsMaxContainersReceived,
Expand Down
2 changes: 1 addition & 1 deletion snow/networking/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
)

const (
threadPoolSize = 2
numDispatchersToClose = 3
// If a consensus message takes longer than this to process, the handler
// will log a warning.
Expand Down Expand Up @@ -121,6 +120,7 @@ func New(
validators validators.Set,
msgFromVMChan <-chan common.Message,
gossipFrequency time.Duration,
threadPoolSize int,
resourceTracker tracker.ResourceTracker,
subnetConnector validators.SubnetConnector,
subnet subnets.Subnet,
Expand Down
8 changes: 8 additions & 0 deletions snow/networking/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/ava-labs/avalanchego/utils/resource"
)

const testThreadPoolSize = 2

var errFatal = errors.New("error should cause handler to close")

func TestHandlerDropsTimedOutMessages(t *testing.T) {
Expand All @@ -52,6 +54,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -146,6 +149,7 @@ func TestHandlerClosesOnError(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -236,6 +240,7 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) {
vdrs,
nil,
1,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -315,6 +320,7 @@ func TestHandlerDispatchInternal(t *testing.T) {
vdrs,
msgFromVMChan,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -392,6 +398,7 @@ func TestHandlerSubnetConnector(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
connector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -561,6 +568,7 @@ func TestDynamicEngineTypeDispatch(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ids.EmptyNodeID, subnets.Config{}),
Expand Down
13 changes: 12 additions & 1 deletion snow/networking/router/chain_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import (
"github.com/ava-labs/avalanchego/version"
)

const engineType = p2p.EngineType_ENGINE_TYPE_AVALANCHE
const (
engineType = p2p.EngineType_ENGINE_TYPE_AVALANCHE
testThreadPoolSize = 2
)

func TestShutdown(t *testing.T) {
vdrs := validators.NewSet()
Expand Down Expand Up @@ -89,6 +92,7 @@ func TestShutdown(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -224,6 +228,7 @@ func TestShutdownTimesOut(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -380,6 +385,7 @@ func TestRouterTimeout(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -848,6 +854,7 @@ func TestRouterClearTimeouts(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -1138,6 +1145,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
sb,
Expand Down Expand Up @@ -1287,6 +1295,7 @@ func TestRouterCrossChainMessages(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(requester.NodeID, subnets.Config{}),
Expand All @@ -1304,6 +1313,7 @@ func TestRouterCrossChainMessages(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(responder.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -1552,6 +1562,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
sb,
Expand Down
5 changes: 5 additions & 0 deletions snow/networking/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/ava-labs/avalanchego/version"
)

const testThreadPoolSize = 2

var defaultSubnetConfig = subnets.Config{
GossipConfig: subnets.GossipConfig{
AcceptedFrontierPeerSize: 2,
Expand Down Expand Up @@ -122,6 +124,7 @@ func TestTimeout(t *testing.T) {
vdrs,
nil,
time.Hour,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -395,6 +398,7 @@ func TestReliableMessages(t *testing.T) {
vdrs,
nil,
1,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down Expand Up @@ -543,6 +547,7 @@ func TestReliableMessagesToMyself(t *testing.T) {
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down
1 change: 1 addition & 0 deletions utils/constants/networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (

// Router
DefaultConsensusGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = 30 * time.Second
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
Expand Down
1 change: 1 addition & 0 deletions vms/platformvm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
beacons,
msgChan,
time.Hour,
2,
cpuTracker,
vm,
subnets.New(ctx.NodeID, subnets.Config{}),
Expand Down