Skip to content

Commit

Permalink
Merge pull request #3264 from oasisprotocol/ptrus/fix/runtime-p2p-client
Browse files Browse the repository at this point in the history
go/worker/p2p: configurable libp2p buffer sizes
  • Loading branch information
ptrus authored Sep 11, 2020
2 parents 01ce036 + 9f45e3d commit b53a134
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .buildkite/code.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ steps:
env:
OASIS_E2E_COVERAGE: enable
TEST_BASE_DIR: /tmp
# libp2p logging.
IPFS_LOGGING: info
retry:
<<: *retry_agent_failure
plugins:
Expand All @@ -242,6 +244,8 @@ steps:
OASIS_E2E_COVERAGE: enable
OASIS_EXCLUDE_E2E: e2e/runtime/txsource-multi
TEST_BASE_DIR: /tmp
# libp2p logging.
IPFS_LOGGING: info
agents:
queue: intel-sgx
retry:
Expand All @@ -266,6 +270,8 @@ steps:
env:
OASIS_E2E_COVERAGE: enable
TEST_BASE_DIR: /tmp
# libp2p logging.
IPFS_LOGGING: info
agents:
queue: intel-sgx
retry:
Expand Down
2 changes: 2 additions & 0 deletions .buildkite/longtests.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ steps:
- .buildkite/scripts/daily_txsource.sh
env:
TEST_BASE_DIR: /var/tmp/longtests
# libp2p logging.
IPFS_LOGGING: info
agents:
queue: default-daily
buildkite_agent_class: stable
Expand Down
1 change: 1 addition & 0 deletions .changelog/3264.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
runtime/client: Skip not yet published requests in retry check
4 changes: 4 additions & 0 deletions .changelog/3264.cfg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/p2p: Configurable libp2p buffer sizes

Added `worker.p2p.peer_outbound_queue_size` and
`worker.p2p.validate_queue_size` flags for configuring libp2p buffer sizes.
11 changes: 10 additions & 1 deletion go/runtime/client/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (w *blockWatcher) watch() {

// If we were just started, refresh the committee information from any
// block, otherwise just from epoch transition blocks.
gotInitialCommittee := false
var gotInitialCommittee bool
// latestGroupVersion contains the latest known committee group version.
var latestGroupVersion int64
// latestHeight contains the latest known consensus block height.
Expand Down Expand Up @@ -183,12 +183,21 @@ func (w *blockWatcher) watch() {

// Check if any transactions are due for a retry.
for key, watch := range w.watched {
if watch.height == 0 {
continue
}
if (latestHeight - retryInterval) < watch.height {
continue
}
res := &watchResult{
groupVersion: latestGroupVersion,
}
w.Logger.Debug("resending message",
"key", key,
"latest_height", latestHeight,
"retry_interval", retryInterval,
"watch_height", watch.height,
)
if watch.send(res, latestHeight) != nil {
delete(w.watched, key)
}
Expand Down
9 changes: 8 additions & 1 deletion go/worker/common/p2p/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const (
CfgP2pPort = "worker.p2p.port"

cfgP2pAddresses = "worker.p2p.addresses"

// CfgP2PPeerOutboundQueueSize sets the libp2p gossipsub buffer size for outbound messages.
CfgP2PPeerOutboundQueueSize = "worker.p2p.peer_outbound_queue_size"
// CfgP2PValidateQueueSize sets the libp2p gossipsub buffer size of the validate queue.
CfgP2PValidateQueueSize = "worker.p2p.validate_queue_size"
)

// Enabled reads our enabled flag from viper.
Expand All @@ -24,9 +29,11 @@ func Enabled() bool {
var Flags = flag.NewFlagSet("", flag.ContinueOnError)

func init() {
Flags.Bool(CfgP2PEnabled, false, "Enable P2P worker (automatically enabled if compute worker enabled).")
Flags.Bool(CfgP2PEnabled, false, "Enable P2P worker (automatically enabled if compute worker enabled)")
Flags.Uint16(CfgP2pPort, 9200, "Port to use for incoming P2P connections")
Flags.StringSlice(cfgP2pAddresses, []string{}, "Address/port(s) to use for P2P connections when registering this node (if not set, all non-loopback local interfaces will be used)")
Flags.Int64(CfgP2PPeerOutboundQueueSize, 32, "Set libp2p gossipsub buffer size for outbound messages")
Flags.Int64(CfgP2PValidateQueueSize, 32, "Set libp2p gossipsub buffer size of the validate queue")

_ = viper.BindPFlags(Flags)
}
2 changes: 2 additions & 0 deletions go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ func New(ctx context.Context, identity *identity.Identity, consensus consensus.B
pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerOutboundQueueSize(viper.GetInt(CfgP2PPeerOutboundQueueSize)),
pubsub.WithValidateQueueSize(viper.GetInt(CfgP2PValidateQueueSize)),
)
if err != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to initialize libp2p gossipsub: %w", err)
Expand Down

0 comments on commit b53a134

Please sign in to comment.