Skip to content

Commit

Permalink
Backports #1558 and #1584 to 0.38.x (#1592) (#1611)
Browse files Browse the repository at this point in the history
* Experimental - Reduce # of connections effectively used to gossip transactions out (#1558)

* maxpeers for mempool

* mempool: fix max_peers bcast routine active flag

* Use semaphore to limit concurrency

* Rename MaxPeers to MaxOutboundPeers

* Add max_outbound_peers to config toml template

* Rename in error message

* Renams the parameter to highlight its experimental nature. Extend the AddPeer method to return an error. Moves the semaphone to outside the broadcast routine

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* Fixing lint issue

* renaming semaphore to something more meaningful

* make default value 0, which is the same as the current behavior. 10 is the recommended value.

* adding new flag to manifest.go

* Adding changelog

* Improve the description of the parameter in the generated config file.

* Add metric to track the current number of active connections.

* Change metric to gauge type and rename it.

* e2e: Allow disabling the PEX reactor on all nodes in the testnet

* Apply suggestions from code review



* Update config/config.go comment

* fix lint error

* Improve config description

* Rename metric (remove experimental prefix)

* Add unit test

* Improve unit test

* Update mempool/reactor.go comment

---------








* Updating test file, leaving it broken for now

* mempool: Limit gossip connections to persistent and non-persistent peers (experimental) (#1584)

* Ignore persistent peers from limiting of outbound connections

* Update 1558-experimental-gossip-limiting.md

Update changeling

* Fix typo in mempool/metrics.go

* Use two independent configs and semaphores for persistent and non-persistent peers

* Forgot to rename in test

* Update metric description

* Rename semaphores

* Add comment to unit test

---------



* Reverting to old way of reporting errors

* Reverting change that shouldn't have been included in cherry-pick

* Reverting tests to use older functions

* fix rebase merge

---------

Co-authored-by: Adi Seredinschi <[email protected]>
Co-authored-by: Ethan Buchman <[email protected]>
Co-authored-by: Daniel Cason <[email protected]>
Co-authored-by: hvanz <[email protected]>
Co-authored-by: Andy Nogueira <[email protected]>
Co-authored-by: Sergio Mena <[email protected]>
  • Loading branch information
7 people authored Nov 15, 2023
1 parent 5b1e711 commit 47ffffa
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- `[mempool]` Add experimental feature to limit the number of persistent peers and non-persistent
peers to which the node gossip transactions.
([\#1558](https://github.com/cometbft/cometbft/pull/1558))
([\#1584](https://github.com/cometbft/cometbft/pull/1584))
- `[config]` Add mempool parameters `experimental_max_gossip_connections_to_persistent_peers` and
`experimental_max_gossip_connections_to_non_persistent_peers` for limiting the number of peers to
which the node gossip transactions.
([\#1558](https://github.com/cometbft/cometbft/pull/1558))
([\#1584](https://github.com/cometbft/cometbft/pull/1584))
27 changes: 27 additions & 0 deletions abci/example/kvstore/helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kvstore

import (
"fmt"
"strings"

"github.com/cometbft/cometbft/abci/types"
cmtrand "github.com/cometbft/cometbft/libs/rand"
)
Expand Down Expand Up @@ -34,3 +37,27 @@ func InitKVStore(app *PersistentKVStoreApplication) {
Validators: RandVals(1),
})
}

// Create a new transaction
func NewTx(key, value string) []byte {
return []byte(strings.Join([]string{key, value}, "="))
}

func NewRandomTx(size int) []byte {
if size < 4 {
panic("random tx size must be greater than 3")
}
return NewTx(cmtrand.Str(2), cmtrand.Str(size-3))
}

func NewRandomTxs(n int) [][]byte {
txs := make([][]byte, n)
for i := 0; i < n; i++ {
txs[i] = NewRandomTx(10)
}
return txs
}

func NewTxFromID(i int) []byte {
return []byte(fmt.Sprintf("%d=%d", i, i))
}
29 changes: 25 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,19 @@ type MempoolConfig struct {
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameters to limit gossiping txs to up to the specified number of peers.
// We use two independent upper values for persistent peers and for non-persistent peers.
// Unconditional peers are not affected by this feature.
// If we are connected to more than the specified number of persistent peers, only send txs to
// the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
// peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the
// number of active connections to that group of peers is not bounded.
// For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
// performance results using the default P2P configuration.
ExperimentalMaxGossipConnectionsToPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_non_persistent_peers"`

// TTLDuration, if non-zero, defines the maximum amount of time a transaction
// can exist for in the mempool.
Expand Down Expand Up @@ -794,10 +807,12 @@ func DefaultMempoolConfig() *MempoolConfig {
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
ExperimentalMaxGossipConnectionsToNonPersistentPeers: 0,
ExperimentalMaxGossipConnectionsToPersistentPeers: 0,
TTLDuration: 0 * time.Second,
TTLNumBlocks: 0,
}
Expand Down Expand Up @@ -835,6 +850,12 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.MaxTxBytes < 0 {
return errors.New("max_tx_bytes can't be negative")
}
if cfg.ExperimentalMaxGossipConnectionsToPersistentPeers < 0 {
return errors.New("experimental_max_gossip_connections_to_persistent_peers can't be negative")
}
if cfg.ExperimentalMaxGossipConnectionsToNonPersistentPeers < 0 {
return errors.New("experimental_max_gossip_connections_to_non_persistent_peers can't be negative")
}
return nil
}

Expand Down
14 changes: 14 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,20 @@ ttl-duration = "{{ .Mempool.TTLDuration }}"
# it's insertion time into the mempool is beyond ttl-duration.
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
# Experimental parameters to limit gossiping txs to up to the specified number of peers.
# We use two independent upper values for persistent peers and for non-persistent peers.
# Unconditional peers are not affected by this feature.
# If we are connected to more than the specified number of persistent peers, only send txs to
# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those
# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers.
# If set to 0, the feature is disabled for the corresponding group of peers, that is, the
# number of active connections to that group of peers is not bounded.
# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
# performance results using the default P2P configuration.
experimental_max_gossip_connections_to_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers }}
experimental_max_gossip_connections_to_non_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers }}
#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
21 changes: 14 additions & 7 deletions mempool/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ type Metrics struct {

// Number of times transactions are rechecked in the mempool.
RecheckTimes metrics.Counter

// Number of connections being actively used for gossiping transactions
// (experimental feature).
ActiveOutboundConnections metrics.Gauge
}
22 changes: 16 additions & 6 deletions mempool/v0/clist_mempool_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package v0

import (
"crypto/rand"
"encoding/binary"
"fmt"
mrand "math/rand"
Expand Down Expand Up @@ -96,16 +95,27 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
}
}

func callCheckTx(t *testing.T, mp mempool.Mempool, txs types.Txs) {
txInfo := mempool.TxInfo{SenderID: 0}
for i, tx := range txs {
if err := mp.CheckTx(tx, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
// returned.
if mempool.IsPreCheckError(err) {
continue
}
t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
}
}
}

func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types.Txs {
txs := make(types.Txs, count)
txInfo := mempool.TxInfo{SenderID: peerID}
for i := 0; i < count; i++ {
txBytes := make([]byte, 20)
txBytes := kvstore.NewRandomTx(20)
txs[i] = txBytes
_, err := rand.Read(txBytes)
if err != nil {
t.Error(err)
}
if err := mp.CheckTx(txBytes, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
Expand Down
44 changes: 43 additions & 1 deletion mempool/v0/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v0

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/cometbft/cometbft/p2p"
protomem "github.com/cometbft/cometbft/proto/tendermint/mempool"
"github.com/cometbft/cometbft/types"
"golang.org/x/sync/semaphore"
)

// Reactor handles mempool tx broadcasting amongst peers.
Expand All @@ -23,6 +25,12 @@ type Reactor struct {
config *cfg.MempoolConfig
mempool *CListMempool
ids *mempoolIDs

// Semaphores to keep track of how many connections to peers are active for broadcasting
// transactions. Each semaphore has a capacity that puts an upper bound on the number of
// connections for different groups of peers.
activePersistentPeersSemaphore *semaphore.Weighted
activeNonPersistentPeersSemaphore *semaphore.Weighted
}

type mempoolIDs struct {
Expand Down Expand Up @@ -96,6 +104,9 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers))
memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers))

return memR
}

Expand Down Expand Up @@ -143,7 +154,37 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
go memR.broadcastTxRoutine(peer)
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activePersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}

if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activeNonPersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}
}

memR.mempool.metrics.ActiveOutboundConnections.Add(1)
memR.broadcastTxRoutine(peer)
}()
}
}

Expand Down Expand Up @@ -203,6 +244,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
if !memR.IsRunning() || !peer.IsRunning() {
return
}

// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
Expand Down
53 changes: 53 additions & 0 deletions mempool/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,51 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
}
}

// Test the experimental feature that limits the number of outgoing connections for gossiping
// transactions (only non-persistent peers).
// Note: in this test we know which gossip connections are active or not because of how the p2p
// functions are currently implemented, which affects the order in which peers are added to the
// mempool reactor.
func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) {
config := cfg.TestConfig()
config.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = 1
reactors := makeAndConnectReactors(config, 4)
defer func() {
for _, r := range reactors {
if err := r.Stop(); err != nil {
assert.NoError(t, err)
}
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
peer.Set(types.PeerStateKey, peerState{1})
}
}

// Add a bunch transactions to the first reactor.
txs := newUniqueTxs(100)
callCheckTx(t, reactors[0].mempool, txs)

// Wait for all txs to be in the mempool of the second reactor; the other reactors should not
// receive any tx. (The second reactor only sends transactions to the first reactor.)
waitForTxsOnReactor(t, txs, reactors[1], 0)
for _, r := range reactors[2:] {
require.Zero(t, r.mempool.Size())
}

// Disconnect the second reactor from the first reactor.
firstPeer := reactors[0].Switch.Peers().List()[0]
reactors[0].Switch.StopPeerGracefully(firstPeer)

// Now the third reactor should start receiving transactions from the first reactor; the fourth
// reactor's mempool should still be empty.
waitForTxsOnReactor(t, txs, reactors[2], 0)
for _, r := range reactors[3:] {
require.Zero(t, r.mempool.Size())
}
}

// mempoolLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func mempoolLogger() log.Logger {
Expand Down Expand Up @@ -328,6 +373,14 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
return reactors
}

func newUniqueTxs(n int) types.Txs {
txs := make(types.Txs, n)
for i := 0; i < n; i++ {
txs[i] = kvstore.NewTxFromID(i)
}
return txs
}

func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) {
// wait for the txs in all mempools
wg := new(sync.WaitGroup)
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/pkg/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type Manifest struct {
// Enable or disable Prometheus metrics on all nodes.
// Defaults to false (disabled).
Prometheus bool `toml:"prometheus"`

// Maximum number of peers to which the node gossip transactions
ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`
}

// ManifestNode represents a node in a testnet manifest.
Expand Down
Loading

0 comments on commit 47ffffa

Please sign in to comment.