Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT/20.10.x] go/worker/p2p: configurable libp2p buffer sizes #3277

Merged
merged 3 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .buildkite/code.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ steps:
env:
OASIS_E2E_COVERAGE: enable
TEST_BASE_DIR: /tmp
# libp2p logging.
IPFS_LOGGING: info
retry:
<<: *retry_agent_failure
plugins:
Expand All @@ -242,6 +244,8 @@ steps:
OASIS_E2E_COVERAGE: enable
OASIS_EXCLUDE_E2E: e2e/runtime/txsource-multi
TEST_BASE_DIR: /tmp
# libp2p logging.
IPFS_LOGGING: info
agents:
queue: intel-sgx
retry:
Expand All @@ -266,6 +270,8 @@ steps:
env:
OASIS_E2E_COVERAGE: enable
TEST_BASE_DIR: /tmp
# libp2p logging.
IPFS_LOGGING: info
agents:
queue: intel-sgx
retry:
Expand Down
2 changes: 2 additions & 0 deletions .buildkite/longtests.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ steps:
- .buildkite/scripts/daily_txsource.sh
env:
TEST_BASE_DIR: /var/tmp/longtests
# libp2p logging.
IPFS_LOGGING: info
agents:
queue: default-daily
buildkite_agent_class: stable
Expand Down
1 change: 1 addition & 0 deletions .changelog/3264.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
runtime/client: Skip not yet published requests in retry check
4 changes: 4 additions & 0 deletions .changelog/3264.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/p2p: Configurable libp2p buffer sizes

Added `worker.p2p.peer_outbound_queue_size` and
`worker.p2p.validate_queue_size` flags for configuring libp2p buffer sizes.
11 changes: 10 additions & 1 deletion go/runtime/client/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (w *blockWatcher) watch() {

// If we were just started, refresh the committee information from any
// block, otherwise just from epoch transition blocks.
gotInitialCommittee := false
var gotInitialCommittee bool
// latestGroupVersion contains the latest known committee group version.
var latestGroupVersion int64
// latestHeight contains the latest known consensus block height.
Expand Down Expand Up @@ -183,12 +183,21 @@ func (w *blockWatcher) watch() {

// Check if any transactions are due for a retry.
for key, watch := range w.watched {
if watch.height == 0 {
continue
}
if (latestHeight - retryInterval) < watch.height {
continue
}
res := &watchResult{
groupVersion: latestGroupVersion,
}
w.Logger.Debug("resending message",
"key", key,
"latest_height", latestHeight,
"retry_interval", retryInterval,
"watch_height", watch.height,
)
if watch.send(res, latestHeight) != nil {
delete(w.watched, key)
}
Expand Down
9 changes: 8 additions & 1 deletion go/worker/common/p2p/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const (
CfgP2pPort = "worker.p2p.port"

cfgP2pAddresses = "worker.p2p.addresses"

// CfgP2PPeerOutboundQueueSize sets the libp2p gossipsub buffer size for outbound messages.
CfgP2PPeerOutboundQueueSize = "worker.p2p.peer_outbound_queue_size"
// CfgP2PValidateQueueSize sets the libp2p gossipsub buffer size of the validate queue.
CfgP2PValidateQueueSize = "worker.p2p.validate_queue_size"
)

// Enabled reads our enabled flag from viper.
Expand All @@ -24,9 +29,11 @@ func Enabled() bool {
var Flags = flag.NewFlagSet("", flag.ContinueOnError)

func init() {
Flags.Bool(CfgP2PEnabled, false, "Enable P2P worker (automatically enabled if compute worker enabled).")
Flags.Bool(CfgP2PEnabled, false, "Enable P2P worker (automatically enabled if compute worker enabled)")
Flags.Uint16(CfgP2pPort, 9200, "Port to use for incoming P2P connections")
Flags.StringSlice(cfgP2pAddresses, []string{}, "Address/port(s) to use for P2P connections when registering this node (if not set, all non-loopback local interfaces will be used)")
Flags.Int64(CfgP2PPeerOutboundQueueSize, 32, "Set libp2p gossipsub buffer size for outbound messages")
Flags.Int64(CfgP2PValidateQueueSize, 32, "Set libp2p gossipsub buffer size of the validate queue")

_ = viper.BindPFlags(Flags)
}
2 changes: 2 additions & 0 deletions go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ func New(ctx context.Context, identity *identity.Identity, consensus consensus.B
pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerOutboundQueueSize(viper.GetInt(CfgP2PPeerOutboundQueueSize)),
pubsub.WithValidateQueueSize(viper.GetInt(CfgP2PValidateQueueSize)),
)
if err != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to initialize libp2p gossipsub: %w", err)
Expand Down