Skip to content

Commit

Permalink
experimental mempool: Backport cometbft#1558 and cometbft#1584 to 0.3…
Browse files Browse the repository at this point in the history
…4.x (cometbft#1630)

* Backports cometbft#1558 and cometbft#1584 to 0.37.x (cometbft#1611)
Backports cometbft#1558 and cometbft#1584 to 0.38.x (cometbft#1592)

* mempool: Limit gossip connections to persistent and non-persistent peers (experimental) (cometbft#1584)

* Experimental - Reduce # of connections effectively used to gossip transactions out (cometbft#1558)

* maxpeers for mempool

* mempool: fix max_peers bcast routine active flag

* Use semaphore to limit concurrency

* Rename MaxPeers to MaxOutboundPeers

* Add max_outbound_peers to config toml template

* Rename in error message

* Renams the parameter to highlight its experimental nature. Extend the AddPeer method to return an error. Moves the semaphone to outside the broadcast routine

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* Fixing lint issue

* renaming semaphore to something more meaningful

* make default value 0, which is the same as the current behavior. 10 is the recommended value.

* adding new flag to manifest.go

* Adding changelog

* Improve the description of the parameter in the generated config file.

* Add metric to track the current number of active connections.

* Change metric to gauge type and rename it.

* e2e: Allow disabling the PEX reactor on all nodes in the testnet

* Apply suggestions from code review

* Update config/config.go comment

* fix lint error

* Improve config description

* Rename metric (remove experimental prefix)

* Add unit test

* Improve unit test

* Update mempool/reactor.go comment

---------

* Updating test file, leaving it broken for now

* mempool: Limit gossip connections to persistent and non-persistent peers (experimental) (cometbft#1584)

* Ignore persistent peers from limiting of outbound connections

* Update 1558-experimental-gossip-limiting.md

Update changeling

* Fix typo in mempool/metrics.go

* Use two independent configs and semaphores for persistent and non-persistent peers

* Forgot to rename in test

* Update metric description

* Rename semaphores

* Add comment to unit test

---------

* Reverting to old way of reporting errors

* Reverting change that shouldn't have been included in cherry-pick

* Reverting tests to use older functions

* fix rebase merge

---------

Co-authored-by: Adi Seredinschi <[email protected]>
Co-authored-by: Ethan Buchman <[email protected]>
Co-authored-by: Daniel Cason <[email protected]>
Co-authored-by: hvanz <[email protected]>
Co-authored-by: Andy Nogueira <[email protected]>
Co-authored-by: Sergio Mena <[email protected]>

* Comment that feature only applies to v0 mempool

* Fix new metric

* This commit makes the test be the same as in main, that is, it ignores the order of transactions in the receiving reactor. (cometbft#1629)

* Update .changelog/unreleased/improvements/1558-experimental-gossip-limiting.md

Co-authored-by: Thane Thomson <[email protected]>

---------

Co-authored-by: Adi Seredinschi <[email protected]>
Co-authored-by: Ethan Buchman <[email protected]>
Co-authored-by: Daniel Cason <[email protected]>
Co-authored-by: Andy Nogueira <[email protected]>
Co-authored-by: Sergio Mena <[email protected]>
Co-authored-by: lasaro <[email protected]>
Co-authored-by: Thane Thomson <[email protected]>
  • Loading branch information
8 people authored and dpierret committed Dec 7, 2023
1 parent 65f7a2f commit d4e4098
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- `[mempool]` Add experimental feature to limit the number of persistent peers and non-persistent
peers to which the node gossip transactions (only for "v0" mempool).
([\#1558](https://github.com/cometbft/cometbft/pull/1558),
([\#1584](https://github.com/cometbft/cometbft/pull/1584))
- `[config]` Add mempool parameters `experimental_max_gossip_connections_to_persistent_peers` and
`experimental_max_gossip_connections_to_non_persistent_peers` for limiting the number of peers to
which the node gossip transactions.
([\#1558](https://github.com/cometbft/cometbft/pull/1558))
([\#1584](https://github.com/cometbft/cometbft/pull/1584))
27 changes: 27 additions & 0 deletions abci/example/kvstore/helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kvstore

import (
"fmt"
"strings"

"github.com/cometbft/cometbft/abci/types"
cmtrand "github.com/cometbft/cometbft/libs/rand"
)
Expand Down Expand Up @@ -34,3 +37,27 @@ func InitKVStore(app *PersistentKVStoreApplication) {
Validators: RandVals(1),
})
}

// Create a new transaction
func NewTx(key, value string) []byte {
return []byte(strings.Join([]string{key, value}, "="))
}

func NewRandomTx(size int) []byte {
if size < 4 {
panic("random tx size must be greater than 3")
}
return NewTx(cmtrand.Str(2), cmtrand.Str(size-3))
}

func NewRandomTxs(n int) [][]byte {
txs := make([][]byte, n)
for i := 0; i < n; i++ {
txs[i] = NewRandomTx(10)
}
return txs
}

func NewTxFromID(i int) []byte {
return []byte(fmt.Sprintf("%d=%d", i, i))
}
36 changes: 29 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
// DefaultLogLevel defines a default log level as INFO.
DefaultLogLevel = "info"

// Mempool versions. V1 is prioritized mempool (deprecated), v0 is regular mempool.
// Mempool versions. V1 is prioritized mempool, v0 is regular mempool.
// Default is v0.
MempoolV0 = "v0"
MempoolV1 = "v1"
Expand Down Expand Up @@ -759,6 +759,20 @@ type MempoolConfig struct {
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameters to limit gossiping txs to up to the specified number of peers.
// This feature is only available for the default mempool (version config set to "v0").
// We use two independent upper values for persistent peers and for non-persistent peers.
// Unconditional peers are not affected by this feature.
// If we are connected to more than the specified number of persistent peers, only send txs to
// the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
// peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the
// number of active connections to that group of peers is not bounded.
// For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
// performance results using the default P2P configuration.
ExperimentalMaxGossipConnectionsToPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_non_persistent_peers"`

// TTLDuration, if non-zero, defines the maximum amount of time a transaction
// can exist for in the mempool.
Expand Down Expand Up @@ -792,12 +806,14 @@ func DefaultMempoolConfig() *MempoolConfig {
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
TTLDuration: 0 * time.Second,
TTLNumBlocks: 0,
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
ExperimentalMaxGossipConnectionsToNonPersistentPeers: 0,
ExperimentalMaxGossipConnectionsToPersistentPeers: 0,
TTLDuration: 0 * time.Second,
TTLNumBlocks: 0,
KeepInvalidTxsInCache: false,
}
}
Expand Down Expand Up @@ -834,6 +850,12 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.MaxTxBytes < 0 {
return errors.New("max_tx_bytes can't be negative")
}
if cfg.ExperimentalMaxGossipConnectionsToPersistentPeers < 0 {
return errors.New("experimental_max_gossip_connections_to_persistent_peers can't be negative")
}
if cfg.ExperimentalMaxGossipConnectionsToNonPersistentPeers < 0 {
return errors.New("experimental_max_gossip_connections_to_non_persistent_peers can't be negative")
}
return nil
}

Expand Down
15 changes: 15 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,21 @@ ttl-duration = "{{ .Mempool.TTLDuration }}"
# it's insertion time into the mempool is beyond ttl-duration.
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
# Experimental parameters to limit gossiping txs to up to the specified number of peers.
# This feature is only available for the default mempool (version config set to "v0").
# We use two independent upper values for persistent peers and for non-persistent peers.
# Unconditional peers are not affected by this feature.
# If we are connected to more than the specified number of persistent peers, only send txs to
# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those
# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers.
# If set to 0, the feature is disabled for the corresponding group of peers, that is, the
# number of active connections to that group of peers is not bounded.
# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
# performance results using the default P2P configuration.
experimental_max_gossip_connections_to_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers }}
experimental_max_gossip_connections_to_non_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers }}
#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/btcsuite/btcd/btcutil v1.1.2
github.com/cometbft/cometbft-db v0.7.0
github.com/cosmos/cosmos-proto v1.0.0-beta.2
github.com/cosmos/gogoproto v1.4.6
github.com/go-git/go-git/v5 v5.5.2
github.com/gogo/protobuf v1.3.2
github.com/vektra/mockery/v2 v2.14.0
golang.org/x/sync v0.1.0
gonum.org/v1/gonum v0.8.2
google.golang.org/protobuf v1.28.2-0.20230208135220-49eaa78c6c9c
)
Expand Down Expand Up @@ -96,7 +99,6 @@ require (
github.com/containerd/continuity v0.3.0 // indirect
github.com/containerd/typeurl v1.0.2 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
Expand Down Expand Up @@ -139,7 +141,6 @@ require (
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down Expand Up @@ -287,7 +288,6 @@ require (
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ type Metrics struct {

// Number of times transactions are rechecked in the mempool.
RecheckTimes metrics.Counter

// Number of connections being actively used for gossiping transactions
// (experimental feature).
ActiveOutboundConnections metrics.Gauge
}
22 changes: 16 additions & 6 deletions mempool/v0/clist_mempool_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package v0

import (
"crypto/rand"
"encoding/binary"
"fmt"
mrand "math/rand"
Expand Down Expand Up @@ -96,16 +95,27 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
}
}

func callCheckTx(t *testing.T, mp mempool.Mempool, txs types.Txs) {
txInfo := mempool.TxInfo{SenderID: 0}
for i, tx := range txs {
if err := mp.CheckTx(tx, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
// returned.
if mempool.IsPreCheckError(err) {
continue
}
t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
}
}
}

func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types.Txs {
txs := make(types.Txs, count)
txInfo := mempool.TxInfo{SenderID: peerID}
for i := 0; i < count; i++ {
txBytes := make([]byte, 20)
txBytes := kvstore.NewRandomTx(20)
txs[i] = txBytes
_, err := rand.Read(txBytes)
if err != nil {
t.Error(err)
}
if err := mp.CheckTx(txBytes, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
Expand Down
46 changes: 45 additions & 1 deletion mempool/v0/reactor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package v0

import (
"context"
"errors"
"fmt"
"time"

"github.com/gogo/protobuf/proto"

cfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/libs/clist"
"github.com/cometbft/cometbft/libs/log"
Expand All @@ -13,6 +16,8 @@ import (
"github.com/cometbft/cometbft/p2p"
protomem "github.com/cometbft/cometbft/proto/tendermint/mempool"
"github.com/cometbft/cometbft/types"

"golang.org/x/sync/semaphore"
)

// Reactor handles mempool tx broadcasting amongst peers.
Expand All @@ -23,6 +28,12 @@ type Reactor struct {
config *cfg.MempoolConfig
mempool *CListMempool
ids *mempoolIDs

// Semaphores to keep track of how many connections to peers are active for broadcasting
// transactions. Each semaphore has a capacity that puts an upper bound on the number of
// connections for different groups of peers.
activePersistentPeersSemaphore *semaphore.Weighted
activeNonPersistentPeersSemaphore *semaphore.Weighted
}

type mempoolIDs struct {
Expand Down Expand Up @@ -96,6 +107,9 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers))
memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers))

return memR
}

Expand Down Expand Up @@ -143,7 +157,37 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
go memR.broadcastTxRoutine(peer)
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activePersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}

if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activeNonPersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}
}

memR.mempool.metrics.ActiveOutboundConnections.Add(1)
memR.broadcastTxRoutine(peer)
}()
}
}

Expand Down
4 changes: 4 additions & 0 deletions test/e2e/pkg/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type Manifest struct {
// Enable or disable Prometheus metrics on all nodes.
// Defaults to false (disabled).
Prometheus bool `toml:"prometheus"`

// Maximum number of peers to which the node gossip transactions
ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`
}

// ManifestNode represents a node in a testnet manifest.
Expand Down
5 changes: 5 additions & 0 deletions test/e2e/pkg/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Testnet struct {
CheckTxDelay time.Duration
UpgradeVersion string
Prometheus bool
ExperimentalMaxGossipConnectionsToPersistentPeers uint
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint

}

// Node represents a CometBFT node in a testnet.
Expand Down Expand Up @@ -150,6 +153,8 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
CheckTxDelay: manifest.CheckTxDelay,
UpgradeVersion: manifest.UpgradeVersion,
Prometheus: manifest.Prometheus,
ExperimentalMaxGossipConnectionsToPersistentPeers: manifest.ExperimentalMaxGossipConnectionsToPersistentPeers,
ExperimentalMaxGossipConnectionsToNonPersistentPeers: manifest.ExperimentalMaxGossipConnectionsToNonPersistentPeers,
}
if len(manifest.KeyType) != 0 {
testnet.KeyType = manifest.KeyType
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/runner/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg.P2P.AddrBookStrict = false
cfg.DBBackend = node.Database
cfg.StateSync.DiscoveryTime = 5 * time.Second
cfg.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = int(node.Testnet.ExperimentalMaxGossipConnectionsToNonPersistentPeers)
cfg.Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers = int(node.Testnet.ExperimentalMaxGossipConnectionsToPersistentPeers)

switch node.ABCIProtocol {
case e2e.ProtocolUNIX:
Expand Down

0 comments on commit d4e4098

Please sign in to comment.