Skip to content

Commit

Permalink
go/worker/p2p: configurable libp2p buffer sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Sep 11, 2020
1 parent 0662c6c commit c9d3c72
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
9 changes: 8 additions & 1 deletion go/worker/common/p2p/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const (
CfgP2pPort = "worker.p2p.port"

cfgP2pAddresses = "worker.p2p.addresses"

// CfgP2PPeerOutboundQueueSize sets the libp2p gossipsub buffer size for outbound messages.
CfgP2PPeerOutboundQueueSize = "worker.p2p.peer_outbound_queue_size"
// CfgP2PValidateQueueSize sets the libp2p gossipsub buffer size of the validate queue.
CfgP2PValidateQueueSize = "worker.p2p.validate_queue_size"
)

// Enabled reads our enabled flag from viper.
Expand All @@ -24,9 +29,11 @@ func Enabled() bool {
var Flags = flag.NewFlagSet("", flag.ContinueOnError)

func init() {
Flags.Bool(CfgP2PEnabled, false, "Enable P2P worker (automatically enabled if compute worker enabled).")
Flags.Bool(CfgP2PEnabled, false, "Enable P2P worker (automatically enabled if compute worker enabled)")
Flags.Uint16(CfgP2pPort, 9200, "Port to use for incoming P2P connections")
Flags.StringSlice(cfgP2pAddresses, []string{}, "Address/port(s) to use for P2P connections when registering this node (if not set, all non-loopback local interfaces will be used)")
Flags.Int64(CfgP2PPeerOutboundQueueSize, 32, "Set libp2p gossipsub buffer size for outbound messages")
Flags.Int64(CfgP2PValidateQueueSize, 32, "Set libp2p gossipsub buffer size of the validate queue")

_ = viper.BindPFlags(Flags)
}
2 changes: 2 additions & 0 deletions go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ func New(ctx context.Context, identity *identity.Identity, consensus consensus.B
pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerOutboundQueueSize(viper.GetInt(CfgP2PPeerOutboundQueueSize)),
pubsub.WithValidateQueueSize(viper.GetInt(CfgP2PValidateQueueSize)),
)
if err != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to initialize libp2p gossipsub: %w", err)
Expand Down

0 comments on commit c9d3c72

Please sign in to comment.