From 2d6b24f3b7daa715ca2976c454e9c14597ea9b7e Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Mon, 16 Jan 2023 10:25:14 +0100 Subject: [PATCH 01/12] assemble cat subcomponents: cache's, store and request manager --- mempool/cat/cache.go | 291 +++++++++++++++++++++++++++++++++++ mempool/cat/cache_test.go | 110 +++++++++++++ mempool/cat/peers.go | 115 ++++++++++++++ mempool/cat/peers_test.go | 37 +++++ mempool/cat/requests.go | 153 ++++++++++++++++++ mempool/cat/requests_test.go | 136 ++++++++++++++++ mempool/cat/store.go | 158 +++++++++++++++++++ mempool/cat/store_test.go | 178 +++++++++++++++++++++ mempool/cat/tx.go | 36 +++++ 9 files changed, 1214 insertions(+) create mode 100644 mempool/cat/cache.go create mode 100644 mempool/cat/cache_test.go create mode 100644 mempool/cat/peers.go create mode 100644 mempool/cat/peers_test.go create mode 100644 mempool/cat/requests.go create mode 100644 mempool/cat/requests_test.go create mode 100644 mempool/cat/store.go create mode 100644 mempool/cat/store_test.go create mode 100644 mempool/cat/tx.go diff --git a/mempool/cat/cache.go b/mempool/cat/cache.go new file mode 100644 index 0000000000..2584452d8d --- /dev/null +++ b/mempool/cat/cache.go @@ -0,0 +1,291 @@ +package cat + +import ( + "container/list" + "time" + + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/types" +) + +// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache +// only stores the hash of the raw transaction. +// NOTE: This has been copied from mempool/cache with the main diffence of using +// tx keys instead of raw transactions. +type LRUTxCache struct { + staticSize int + + mtx tmsync.Mutex + cacheMap map[types.TxKey]*list.Element + list *list.List +} + +func NewLRUTxCache(cacheSize int) *LRUTxCache { + return &LRUTxCache{ + staticSize: cacheSize, + cacheMap: make(map[types.TxKey]*list.Element, cacheSize), + list: list.New(), + } +} + +func (c *LRUTxCache) Reset() { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.cacheMap = make(map[types.TxKey]*list.Element, c.staticSize) + c.list.Init() +} + +func (c *LRUTxCache) Push(txKey types.TxKey) bool { + if c.staticSize == 0 { + return true + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + moved, ok := c.cacheMap[txKey] + if ok { + c.list.MoveToBack(moved) + return false + } + + if c.list.Len() >= c.staticSize { + front := c.list.Front() + if front != nil { + frontKey := front.Value.(types.TxKey) + delete(c.cacheMap, frontKey) + c.list.Remove(front) + } + } + + e := c.list.PushBack(txKey) + c.cacheMap[txKey] = e + + return true +} + +func (c *LRUTxCache) Remove(txKey types.TxKey) { + c.mtx.Lock() + defer c.mtx.Unlock() + + e := c.cacheMap[txKey] + delete(c.cacheMap, txKey) + + if e != nil { + c.list.Remove(e) + } +} + +func (c *LRUTxCache) Has(txKey types.TxKey) bool { + if c.staticSize == 0 { + return false + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + _, ok := c.cacheMap[txKey] + return ok +} + +type EvictedTxInfo struct { + timeEvicted time.Time + priority int64 + gasWanted int64 + sender string + size int64 +} + +type EvictedTxCache struct { + staticSize int + + mtx tmsync.Mutex + cache map[types.TxKey]*EvictedTxInfo +} + +func NewEvictedTxCache(size int) *EvictedTxCache { + return &EvictedTxCache{ + staticSize: size, + cache: make(map[types.TxKey]*EvictedTxInfo), + } +} + +func (c *EvictedTxCache) Has(txKey types.TxKey) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + _, exists := c.cache[txKey] + return exists +} + +func (c *EvictedTxCache) Get(txKey types.TxKey) 
*EvictedTxInfo { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.cache[txKey] +} + +func (c *EvictedTxCache) Push(wtx *wrappedTx) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.cache[wtx.key] = &EvictedTxInfo{ + timeEvicted: time.Now().UTC(), + priority: wtx.priority, + gasWanted: wtx.gasWanted, + sender: wtx.sender, + size: wtx.size(), + } + // if cache too large, remove the oldest entry + if len(c.cache) > c.staticSize { + oldestTxKey := wtx.key + oldestTxTime := time.Now().UTC() + for key, info := range c.cache { + if info.timeEvicted.Before(oldestTxTime) { + oldestTxTime = info.timeEvicted + oldestTxKey = key + } + } + delete(c.cache, oldestTxKey) + } +} + +func (c *EvictedTxCache) Pop(txKey types.TxKey) *EvictedTxInfo { + c.mtx.Lock() + defer c.mtx.Unlock() + info, exists := c.cache[txKey] + if !exists { + return nil + } + delete(c.cache, txKey) + return info +} + +func (c *EvictedTxCache) Prune(limit time.Time) { + c.mtx.Lock() + defer c.mtx.Unlock() + for key, info := range c.cache { + if info.timeEvicted.Before(limit) { + delete(c.cache, key) + } + } +} + +func (c *EvictedTxCache) Reset() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.cache = make(map[types.TxKey]*EvictedTxInfo) +} + +// SeenTxSet records transactions that have been +// seen by other peers but not yet by us +type SeenTxSet struct { + mtx tmsync.Mutex + set map[types.TxKey]timestampedPeerSet +} + +type timestampedPeerSet struct { + peers map[uint16]bool + time time.Time +} + +func NewSeenTxSet() *SeenTxSet { + return &SeenTxSet{ + set: make(map[types.TxKey]timestampedPeerSet), + } +} + +func (s *SeenTxSet) Add(txKey types.TxKey, peer uint16) { + if peer == 0 { + return + } + s.mtx.Lock() + defer s.mtx.Unlock() + seenSet, exists := s.set[txKey] + if !exists { + s.set[txKey] = timestampedPeerSet{ + peers: map[uint16]bool{peer: true}, + time: time.Now().UTC(), + } + } else { + seenSet.peers[peer] = true + } +} + +func (s *SeenTxSet) Pop(txKey types.TxKey) uint16 { + s.mtx.Lock() + defer s.mtx.Unlock() + seenSet, exists := s.set[txKey] + if exists { + for peer := range seenSet.peers { + delete(seenSet.peers, peer) + return peer + } + } + return 0 +} + +func (s *SeenTxSet) RemoveKey(txKey types.TxKey) { + s.mtx.Lock() + defer s.mtx.Unlock() + delete(s.set, txKey) +} + +func (s *SeenTxSet) Remove(txKey types.TxKey, peer uint16) { + s.mtx.Lock() + defer s.mtx.Unlock() + set, exists := s.set[txKey] + if exists { + if len(set.peers) == 1 { + delete(s.set, txKey) + } else { + delete(set.peers, peer) + } + } +} + +func (s *SeenTxSet) Prune(limit time.Time) { + s.mtx.Lock() + defer s.mtx.Unlock() + for key, seenSet := range s.set { + if seenSet.time.Before(limit) { + delete(s.set, key) + } + } +} + +func (s *SeenTxSet) Has(txKey types.TxKey, peer uint16) bool { + s.mtx.Lock() + defer s.mtx.Unlock() + seenSet, exists := s.set[txKey] + if !exists { + return false + } + return seenSet.peers[peer] +} + +func (s *SeenTxSet) Get(txKey types.TxKey) map[uint16]struct{} { + s.mtx.Lock() + defer s.mtx.Unlock() + seenSet, exists := s.set[txKey] + if !exists { + return nil + } + // make a copy of the struct to avoid concurrency issues + peers := make(map[uint16]struct{}, len(seenSet.peers)) + for peer := range seenSet.peers { + peers[peer] = struct{}{} + } + return peers +} + +// Len returns the amount of cached items. Mostly used for testing. 
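+// Len counts tracked transaction keys, not peers. A usage sketch of the
+// set as a whole (the key and peer IDs here are illustrative):
+//
+//	set := NewSeenTxSet()
+//	set.Add(key, 1) // peer 1 claims to have the tx
+//	set.Add(key, 2) // so does peer 2
+//	set.Len()       // 1: one key, however many peers have seen it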
+func (s *SeenTxSet) Len() int { + s.mtx.Lock() + defer s.mtx.Unlock() + return len(s.set) +} + +func (s *SeenTxSet) Reset() { + s.mtx.Lock() + defer s.mtx.Unlock() + s.set = make(map[types.TxKey]timestampedPeerSet) +} diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go new file mode 100644 index 0000000000..d4bc6eb6f9 --- /dev/null +++ b/mempool/cat/cache_test.go @@ -0,0 +1,110 @@ +package cat + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/types" +) + +func TestSeenTxSet(t *testing.T) { + var ( + tx1Key = types.Tx("tx1").Key() + tx2Key = types.Tx("tx2").Key() + tx3Key = types.Tx("tx3").Key() + peer1 uint16 = 1 + peer2 uint16 = 2 + ) + + seenSet := NewSeenTxSet() + require.Zero(t, seenSet.Pop(tx1Key)) + + seenSet.Add(tx1Key, peer1) + seenSet.Add(tx1Key, peer1) + require.Equal(t, 1, seenSet.Len()) + seenSet.Add(tx1Key, peer2) + peers := seenSet.Get(tx1Key) + require.NotNil(t, peers) + require.Equal(t, map[uint16]struct{}{peer1: {}, peer2: {}}, peers) + seenSet.Add(tx2Key, peer1) + seenSet.Add(tx3Key, peer1) + require.Equal(t, 3, seenSet.Len()) + seenSet.RemoveKey(tx2Key) + require.Equal(t, 2, seenSet.Len()) + require.Zero(t, seenSet.Pop(tx2Key)) + require.Equal(t, peer1, seenSet.Pop(tx3Key)) +} + +func TestLRUTxCacheRemove(t *testing.T) { + cache := NewLRUTxCache(100) + numTxs := 10 + + txs := make([][32]byte, numTxs) + for i := 0; i < numTxs; i++ { + // probability of collision is 2**-256 + txBytes := make([]byte, 32) + _, err := rand.Read(txBytes) + require.NoError(t, err) + + copy(txs[i][:], txBytes) + cache.Push(txs[i]) + + // make sure its added to both the linked list and the map + require.Equal(t, i+1, cache.list.Len()) + } + + for i := 0; i < numTxs; i++ { + cache.Remove(txs[i]) + // make sure its removed from both the map and the linked list + require.Equal(t, numTxs-(i+1), cache.list.Len()) + } +} + +func TestLRUTxCacheSize(t *testing.T) { + const size = 10 + cache := NewLRUTxCache(size) + + for i := 0; i < size*2; i++ { + tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) + cache.Push(tx.Key()) + require.Less(t, cache.list.Len(), size+1) + } +} + +func TestEvictedTxCache(t *testing.T) { + var ( + tx1 = types.Tx("tx1") + tx2 = types.Tx("tx2") + tx3 = types.Tx("tx3") + wtx1 = newWrappedTx( + tx1, tx1.Key(), 10, 1, 5, "", + ) + wtx2 = newWrappedTx( + tx2, tx2.Key(), 10, 1, 5, "", + ) + wtx3 = newWrappedTx( + tx3, tx3.Key(), 10, 1, 5, "", + ) + ) + + cache := NewEvictedTxCache(2) + require.False(t, cache.Has(tx1.Key())) + require.Nil(t, cache.Pop(tx1.Key())) + cache.Push(wtx1) + require.True(t, cache.Has(tx1.Key())) + require.NotNil(t, cache.Pop(tx1.Key())) + cache.Push(wtx1) + time.Sleep(1 * time.Millisecond) + cache.Push(wtx2) + time.Sleep(1 * time.Millisecond) + cache.Push(wtx3) + // cache should have reached limit and thus evicted the oldest tx + require.False(t, cache.Has(tx1.Key())) + cache.Prune(time.Now().UTC().Add(1 * time.Second)) + require.False(t, cache.Has(tx2.Key())) + require.False(t, cache.Has(tx3.Key())) +} diff --git a/mempool/cat/peers.go b/mempool/cat/peers.go new file mode 100644 index 0000000000..300a6a2f26 --- /dev/null +++ b/mempool/cat/peers.go @@ -0,0 +1,115 @@ +package cat + +import ( + "fmt" + + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" +) + +const firstPeerID = 1 + +// mempoolIDs is a thread-safe map of peer IDs to short IDs used for tracking what peers have sent 
what +// NOTE: taken from mempool/v1/reactor.go +type mempoolIDs struct { + mtx tmsync.RWMutex + peerMap map[p2p.ID]uint16 + nextID uint16 // assumes that a node will never have over 65536 active peers + activeIDs map[uint16]p2p.Peer // used to check if a given peerID key is used, the value doesn't matter +} + +func newMempoolIDs() *mempoolIDs { + return &mempoolIDs{ + peerMap: make(map[p2p.ID]uint16), + activeIDs: make(map[uint16]p2p.Peer), + nextID: firstPeerID, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx + } +} + +// Reserve searches for the next unused ID and assigns it to the +// peer. +func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + if _, ok := ids.peerMap[peer.ID()]; ok { + panic("duplicate peer added to mempool") + } + + curID := ids.nextPeerID() + ids.peerMap[peer.ID()] = curID + ids.activeIDs[curID] = peer +} + +// nextPeerID returns the next unused peer ID to use. +// This assumes that ids's mutex is already locked. +func (ids *mempoolIDs) nextPeerID() uint16 { + if len(ids.activeIDs) == mempool.MaxActiveIDs { + panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", mempool.MaxActiveIDs)) + } + + _, idExists := ids.activeIDs[ids.nextID] + for idExists { + ids.nextID++ + _, idExists = ids.activeIDs[ids.nextID] + } + curID := ids.nextID + ids.nextID++ + return curID +} + +// Reclaim returns the ID reserved for the peer back to unused pool. +func (ids *mempoolIDs) Reclaim(peerID p2p.ID) uint16 { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + removedID, ok := ids.peerMap[peerID] + if ok { + delete(ids.activeIDs, removedID) + delete(ids.peerMap, peerID) + return removedID + } + return 0 +} + +// GetIDForPeer returns the shorthand ID reserved for the peer. +func (ids *mempoolIDs) GetIDForPeer(peerID p2p.ID) uint16 { + ids.mtx.RLock() + defer ids.mtx.RUnlock() + + id, exists := ids.peerMap[peerID] + if !exists { + return 0 + } + return id +} + +// GetPeer returns the peer for the given shorthand ID. +func (ids *mempoolIDs) GetPeer(id uint16) p2p.Peer { + ids.mtx.RLock() + defer ids.mtx.RUnlock() + + return ids.activeIDs[id] +} + +// GetAll returns all active peers. +func (ids *mempoolIDs) GetAll() map[uint16]p2p.Peer { + ids.mtx.RLock() + defer ids.mtx.RUnlock() + + // make a copy of the map. + peers := make(map[uint16]p2p.Peer, len(ids.activeIDs)) + for id, peer := range ids.activeIDs { + peers[id] = peer + } + return peers +} + +// Len returns the number of active peers. 
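+// A lifecycle sketch (the peer value is illustrative):
+//
+//	ids := newMempoolIDs()
+//	ids.ReserveForPeer(peer)          // assigns the next unused short ID (>= 1)
+//	id := ids.GetIDForPeer(peer.ID()) // shorthand ID used for gossip bookkeeping
+//	ids.Reclaim(peer.ID())            // returns the ID to the pool on disconnect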
+func (ids *mempoolIDs) Len() int {
+	ids.mtx.RLock()
+	defer ids.mtx.RUnlock()
+
+	return len(ids.activeIDs)
+}
diff --git a/mempool/cat/peers_test.go b/mempool/cat/peers_test.go
new file mode 100644
index 0000000000..5edbb06667
--- /dev/null
+++ b/mempool/cat/peers_test.go
@@ -0,0 +1,37 @@
+package cat
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/tendermint/tendermint/p2p"
+	"github.com/tendermint/tendermint/p2p/mocks"
+)
+
+func TestPeerLifecycle(t *testing.T) {
+	ids := newMempoolIDs()
+	peer1 := &mocks.Peer{}
+	peerID := p2p.ID("peer1")
+	peer1.On("ID").Return(peerID)
+
+	require.Nil(t, ids.GetPeer(1))
+	require.Zero(t, ids.GetIDForPeer(peerID))
+	require.Len(t, ids.GetAll(), 0)
+	ids.ReserveForPeer(peer1)
+
+	id := ids.GetIDForPeer(peerID)
+	require.Equal(t, uint16(1), id)
+	require.Equal(t, peer1, ids.GetPeer(id))
+	require.Len(t, ids.GetAll(), 1)
+
+	// duplicate peer should panic
+	require.Panics(t, func() {
+		ids.ReserveForPeer(peer1)
+	})
+
+	require.Equal(t, ids.Reclaim(peerID), id)
+	require.Nil(t, ids.GetPeer(id))
+	require.Zero(t, ids.GetIDForPeer(peerID))
+	require.Len(t, ids.GetAll(), 0)
+}
diff --git a/mempool/cat/requests.go b/mempool/cat/requests.go
new file mode 100644
index 0000000000..878737e2b3
--- /dev/null
+++ b/mempool/cat/requests.go
@@ -0,0 +1,153 @@
+package cat
+
+import (
+	"sync"
+	"time"
+
+	"github.com/tendermint/tendermint/types"
+)
+
+const defaultGlobalRequestTimeout = 1 * time.Hour
+
+// requestScheduler tracks the lifecycle of outbound transaction requests.
+type requestScheduler struct {
+	mtx sync.Mutex
+
+	// responseTime is the time the scheduler
+	// waits for a response from a peer before
+	// invoking the callback
+	responseTime time.Duration
+
+	// globalTimeout represents the longest duration
+	// to wait for any late response (after the responseTime).
+	// After this period the request is garbage collected.
+	globalTimeout time.Duration
+
+	// requestsByPeer is a lookup table of requests by peer.
+	// Multiple transactions can be requested from a single peer at once.
+	requestsByPeer map[uint16]requestSet
+
+	// requestsByTx is a lookup table for requested txs.
+	// There can only be one request per tx.
+	requestsByTx map[types.TxKey]uint16
+}
+
+type requestSet map[types.TxKey]*time.Timer
+
+func newRequestScheduler(responseTime, globalTimeout time.Duration) *requestScheduler {
+	return &requestScheduler{
+		responseTime:   responseTime,
+		globalTimeout:  globalTimeout,
+		requestsByPeer: make(map[uint16]requestSet),
+		requestsByTx:   make(map[types.TxKey]uint16),
+	}
+}
+
+func (r *requestScheduler) Add(key types.TxKey, peer uint16, onTimeout func(key types.TxKey)) bool {
+	if peer == 0 {
+		return false
+	}
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	// not allowed to have more than one outgoing request per transaction at once
+	if _, ok := r.requestsByTx[key]; ok {
+		return false
+	}
+
+	timer := time.AfterFunc(r.responseTime, func() {
+		r.mtx.Lock()
+		delete(r.requestsByTx, key)
+		r.mtx.Unlock()
+
+		// trigger callback. Callback can `Add` the tx back to the scheduler
+		if onTimeout != nil {
+			onTimeout(key)
+		}
+
+		// We set another timeout because the peer could still send
+		// a late response after the first timeout and it's important
+		// to recognise that it is a transaction in response to a
+		// request and not a new transaction being broadcast to the entire
+		// network. This timer cannot be stopped and is used to ensure
+		// garbage collection.
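+		// At this point the key has already been deleted from requestsByTx,
+		// but requestsByPeer still holds it; this is what lets MarkReceived
+		// attribute a late response to this request during the grace period.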
+ time.AfterFunc(r.globalTimeout, func() { + r.mtx.Lock() + defer r.mtx.Unlock() + delete(r.requestsByPeer[peer], key) + }) + }) + if _, ok := r.requestsByPeer[peer]; !ok { + r.requestsByPeer[peer] = requestSet{key: timer} + } else { + r.requestsByPeer[peer][key] = timer + } + r.requestsByTx[key] = peer + return true +} + +func (r *requestScheduler) ForTx(key types.TxKey) uint16 { + r.mtx.Lock() + defer r.mtx.Unlock() + + return r.requestsByTx[key] +} + +func (r *requestScheduler) Has(peer uint16, key types.TxKey) bool { + r.mtx.Lock() + defer r.mtx.Unlock() + + requestSet, ok := r.requestsByPeer[peer] + if !ok { + return false + } + _, ok = requestSet[key] + return ok +} + +func (r *requestScheduler) ClearAllRequestsFrom(peer uint16) requestSet { + r.mtx.Lock() + defer r.mtx.Unlock() + + requests, ok := r.requestsByPeer[peer] + if !ok { + return requestSet{} + } + for _, timer := range requests { + timer.Stop() + } + delete(r.requestsByPeer, peer) + return requests +} + +func (r *requestScheduler) MarkReceived(peer uint16, key types.TxKey) bool { + r.mtx.Lock() + defer r.mtx.Unlock() + + if _, ok := r.requestsByPeer[peer]; !ok { + return false + } + + if timer, ok := r.requestsByPeer[peer][key]; ok { + timer.Stop() + } else { + return false + } + + delete(r.requestsByPeer[peer], key) + delete(r.requestsByTx, key) + return true +} + +// Close stops all timers and clears all requests. +// Add should never be called after `Close`. +func (r *requestScheduler) Close() { + r.mtx.Lock() + defer r.mtx.Unlock() + + for _, requestSet := range r.requestsByPeer { + for _, timer := range requestSet { + timer.Stop() + } + } +} diff --git a/mempool/cat/requests_test.go b/mempool/cat/requests_test.go new file mode 100644 index 0000000000..16065e0a34 --- /dev/null +++ b/mempool/cat/requests_test.go @@ -0,0 +1,136 @@ +package cat + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/types" +) + +func TestRequestSchedulerRerequest(t *testing.T) { + var ( + requests = newRequestScheduler(10*time.Millisecond, 1*time.Minute) + tx = types.Tx("tx") + key = tx.Key() + peerA uint16 = 1 // should be non-zero + peerB uint16 = 2 + ) + t.Cleanup(requests.Close) + + // check zero state + require.Zero(t, requests.ForTx(key)) + require.False(t, requests.Has(peerA, key)) + // marking a tx that was never requested should return false + require.False(t, requests.MarkReceived(peerA, key)) + + // create a request + closeCh := make(chan struct{}) + require.True(t, requests.Add(key, peerA, func(key types.TxKey) { + require.Equal(t, key, key) + // the first peer times out to respond so we ask the second peer + require.True(t, requests.Add(key, peerB, func(key types.TxKey) { + t.Fatal("did not expect to timeout") + })) + close(closeCh) + })) + + // check that the request was added + require.Equal(t, peerA, requests.ForTx(key)) + require.True(t, requests.Has(peerA, key)) + + // should not be able to add the same request again + require.False(t, requests.Add(key, peerA, nil)) + + // wait for the scheduler to invoke the timeout + <-closeCh + + // check that the request stil exists + require.True(t, requests.Has(peerA, key)) + // check that peerB was requested + require.True(t, requests.Has(peerB, key)) + + // There should still be a request for the Tx + require.Equal(t, peerB, requests.ForTx(key)) + + // record a response from peerB + require.True(t, requests.MarkReceived(peerB, key)) + + // peerA comes in later with a response 
but it's still + // considered a response from an earlier request + require.True(t, requests.MarkReceived(peerA, key)) +} + +func TestRequestSchedulerNonResponsivePeer(t *testing.T) { + var ( + requests = newRequestScheduler(10*time.Millisecond, time.Millisecond) + tx = types.Tx("tx") + key = tx.Key() + peerA uint16 = 1 // should be non-zero + ) + + require.True(t, requests.Add(key, peerA, nil)) + require.Eventually(t, func() bool { + return requests.ForTx(key) == 0 + }, 100*time.Millisecond, 5*time.Millisecond) +} + +func TestRequestSchedulerConcurrencyAddsAndReads(t *testing.T) { + leaktest.CheckTimeout(t, time.Second)() + requests := newRequestScheduler(10*time.Millisecond, time.Millisecond) + defer requests.Close() + + N := 5 + keys := make([]types.TxKey, N) + for i := 0; i < N; i++ { + tx := types.Tx(fmt.Sprintf("tx%d", i)) + keys[i] = tx.Key() + } + + addWg := sync.WaitGroup{} + receiveWg := sync.WaitGroup{} + doneCh := make(chan struct{}) + for i := 1; i < N*N; i++ { + addWg.Add(1) + go func(peer uint16) { + defer addWg.Done() + requests.Add(keys[int(peer)%N], peer, nil) + }(uint16(i)) + } + for i := 1; i < N*N; i++ { + receiveWg.Add(1) + go func(peer uint16) { + defer receiveWg.Done() + markReceived := func() { + for _, key := range keys { + if requests.Has(peer, key) { + requests.MarkReceived(peer, key) + } + } + } + for { + select { + case <-doneCh: + // need to ensure this is run + // at least once after all adds + // are done + markReceived() + return + default: + markReceived() + } + } + }(uint16(i)) + } + addWg.Wait() + close(doneCh) + + receiveWg.Wait() + + for _, key := range keys { + require.Zero(t, requests.ForTx(key)) + } +} diff --git a/mempool/cat/store.go b/mempool/cat/store.go new file mode 100644 index 0000000000..8187b24cc5 --- /dev/null +++ b/mempool/cat/store.go @@ -0,0 +1,158 @@ +package cat + +import ( + "sync" + "time" + + "github.com/tendermint/tendermint/types" +) + +// simple, thread-safe in memory store for transactions +type store struct { + mtx sync.RWMutex + bytes int64 + txs map[types.TxKey]*wrappedTx +} + +func newStore() *store { + return &store{ + bytes: 0, + txs: make(map[types.TxKey]*wrappedTx), + } +} + +func (s *store) set(wtx *wrappedTx) bool { + if wtx == nil { + return false + } + s.mtx.Lock() + defer s.mtx.Unlock() + if tx, exists := s.txs[wtx.key]; !exists || tx.height == -1 { + s.txs[wtx.key] = wtx + s.bytes += wtx.size() + return true + } + return false +} + +func (s *store) get(txKey types.TxKey) *wrappedTx { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.txs[txKey] +} + +func (s *store) has(txKey types.TxKey) bool { + s.mtx.RLock() + defer s.mtx.RUnlock() + _, has := s.txs[txKey] + return has +} + +func (s *store) remove(txKey types.TxKey) bool { + s.mtx.Lock() + defer s.mtx.Unlock() + tx, exists := s.txs[txKey] + if !exists { + return false + } + s.bytes -= tx.size() + delete(s.txs, txKey) + return true +} + +// reserve adds an empty placeholder for the specified key to prevent +// a transaction with the same key from being added +func (s *store) reserve(txKey types.TxKey) bool { + s.mtx.Lock() + defer s.mtx.Unlock() + if _, ok := s.txs[txKey]; ok { + return false // already reserved + } else { + s.txs[txKey] = &wrappedTx{height: -1} + } + return true +} + +// release is called when a pending transaction failed +// to enter the mempool. The empty element and key is removed. 
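+// If a concrete transaction was stored under the key in the meantime
+// (height != -1), release leaves it untouched.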
+func (s *store) release(txKey types.TxKey) { + s.mtx.Lock() + defer s.mtx.Unlock() + value, ok := s.txs[txKey] + if ok && value.height == -1 { + delete(s.txs, txKey) + } +} + +func (s *store) size() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return len(s.txs) +} + +func (s *store) totalBytes() int64 { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.bytes +} + +func (s *store) getAllKeys() []types.TxKey { + s.mtx.RLock() + defer s.mtx.RUnlock() + keys := make([]types.TxKey, len(s.txs)) + idx := 0 + for key := range s.txs { + keys[idx] = key + idx++ + } + return keys +} + +func (s *store) getAllTxs() []*wrappedTx { + s.mtx.RLock() + defer s.mtx.RUnlock() + txs := make([]*wrappedTx, len(s.txs)) + idx := 0 + for _, tx := range s.txs { + txs[idx] = tx + idx++ + } + return txs +} + +func (s *store) getTxsBelowPriority(priority int64) ([]*wrappedTx, int64) { + s.mtx.RLock() + defer s.mtx.RUnlock() + txs := make([]*wrappedTx, 0, len(s.txs)) + bytes := int64(0) + for _, tx := range s.txs { + if tx.priority < priority { + txs = append(txs, tx) + bytes += tx.size() + } + } + return txs, bytes +} + +// purgeExpiredTxs removes all transactions that are older than the given height +// and time. Returns the amount of transactions that were removed +func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) int { + s.mtx.Lock() + defer s.mtx.Unlock() + counter := 0 + for key, tx := range s.txs { + if tx.height < expirationHeight || tx.timestamp.Before(expirationAge) { + s.bytes -= tx.size() + delete(s.txs, key) + counter++ + } + } + return counter +} + +func (s *store) reset() { + s.mtx.Lock() + defer s.mtx.Unlock() + s.bytes = 0 + s.txs = make(map[types.TxKey]*wrappedTx) +} diff --git a/mempool/cat/store_test.go b/mempool/cat/store_test.go new file mode 100644 index 0000000000..3cfc7225a3 --- /dev/null +++ b/mempool/cat/store_test.go @@ -0,0 +1,178 @@ +package cat + +import ( + "bytes" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/types" +) + +func TestStoreSimple(t *testing.T) { + store := newStore() + + tx := types.Tx("tx1") + key := tx.Key() + wtx := newWrappedTx(tx, key, 1, 1, 1, "") + + // asset zero state + require.Nil(t, store.get(key)) + require.False(t, store.has(key)) + require.False(t, store.remove(key)) + require.Zero(t, store.size()) + require.Zero(t, store.totalBytes()) + require.Empty(t, store.getAllKeys()) + require.Empty(t, store.getAllTxs()) + + // add a tx + store.set(wtx) + require.True(t, store.has(key)) + require.Equal(t, wtx, store.get(key)) + require.Equal(t, int(1), store.size()) + require.Equal(t, wtx.size(), store.totalBytes()) + + // remove a tx + store.remove(key) + require.False(t, store.has(key)) + require.Nil(t, store.get(key)) + require.Zero(t, store.size()) + require.Zero(t, store.totalBytes()) +} + +func TestStoreReservingTxs(t *testing.T) { + store := newStore() + + tx := types.Tx("tx1") + key := tx.Key() + wtx := newWrappedTx(tx, key, 1, 1, 1, "") + + // asset zero state + store.release(key) + + // reserve a tx + store.reserve(key) + require.True(t, store.has(key)) + // should not update the total bytes + require.Zero(t, store.totalBytes()) + + // should be able to add a tx + store.set(wtx) + require.Equal(t, tx, store.get(key).tx) + require.Equal(t, wtx.size(), store.totalBytes()) + + // releasing should do nothing on a set tx + store.release(key) + require.True(t, store.has(key)) + require.Equal(t, tx, store.get(key).tx) + + store.remove(key) + require.False(t, 
store.has(key)) + + // reserve the tx again + store.reserve(key) + require.True(t, store.has(key)) + + // release should remove the tx + store.release(key) + require.False(t, store.has(key)) +} + +func TestStoreConcurrentAccess(t *testing.T) { + store := newStore() + + numTxs := 100 + + wg := &sync.WaitGroup{} + for i := 0; i < numTxs; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + ticker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-ticker.C: + tx := types.Tx(fmt.Sprintf("tx%d", i%(numTxs/10))) + key := tx.Key() + wtx := newWrappedTx(tx, key, 1, 1, 1, "") + existingTx := store.get(key) + if existingTx != nil && bytes.Equal(existingTx.tx, tx) { + // tx has already been added + return + } + if store.reserve(key) { + // some fail + if i%3 == 0 { + store.release(key) + return + } + store.set(wtx) + // this should be a noop + store.release(key) + return + } + // already reserved so we retry in 10 milliseconds + } + } + }(i) + } + wg.Wait() + + require.Equal(t, numTxs/10, store.size()) +} + +func TestStoreGetTxs(t *testing.T) { + store := newStore() + + numTxs := 100 + for i := 0; i < numTxs; i++ { + tx := types.Tx(fmt.Sprintf("tx%d", i)) + key := tx.Key() + wtx := newWrappedTx(tx, key, 1, 1, int64(i), "") + store.set(wtx) + } + + require.Equal(t, numTxs, store.size()) + + // get all txs + txs := store.getAllTxs() + require.Equal(t, numTxs, len(txs)) + + // get txs by keys + keys := store.getAllKeys() + require.Equal(t, numTxs, len(keys)) + + // get txs below a certain priority + txs, bz := store.getTxsBelowPriority(int64(numTxs / 2)) + require.Equal(t, numTxs/2, len(txs)) + var actualBz int64 = 0 + for _, tx := range txs { + actualBz += tx.size() + } + require.Equal(t, actualBz, bz) +} + +func TestStoreExpiredTxs(t *testing.T) { + store := newStore() + numTxs := 100 + for i := 0; i < numTxs; i++ { + tx := types.Tx(fmt.Sprintf("tx%d", i)) + key := tx.Key() + wtx := newWrappedTx(tx, key, int64(i), 1, 1, "") + store.set(wtx) + } + + // half of them should get purged + store.purgeExpiredTxs(int64(numTxs/2), time.Time{}) + + remainingTxs := store.getAllTxs() + require.Equal(t, numTxs/2, len(remainingTxs)) + for _, tx := range remainingTxs { + require.GreaterOrEqual(t, tx.height, int64(numTxs/2)) + } + + store.purgeExpiredTxs(int64(0), time.Now().Add(time.Second)) + require.Empty(t, store.getAllTxs()) +} diff --git a/mempool/cat/tx.go b/mempool/cat/tx.go new file mode 100644 index 0000000000..8de425cee4 --- /dev/null +++ b/mempool/cat/tx.go @@ -0,0 +1,36 @@ +package cat + +import ( + "time" + + "github.com/tendermint/tendermint/types" +) + +// wrappedTx defines a wrapper around a raw transaction with additional metadata +// that is used for indexing. 
With the exception of the map of peers who have
+// seen this transaction, this struct should never be modified
+type wrappedTx struct {
+	// these fields are immutable
+	tx        types.Tx    // the original transaction data
+	key       types.TxKey // the transaction hash
+	height    int64       // height when this transaction was initially checked (for expiry)
+	timestamp time.Time   // time when transaction was entered (for TTL)
+	gasWanted int64       // app: gas required to execute this transaction
+	priority  int64       // app: priority value for this transaction
+	sender    string      // app: assigned sender label
+}
+
+func newWrappedTx(tx types.Tx, key types.TxKey, height, gasWanted, priority int64, sender string) *wrappedTx {
+	return &wrappedTx{
+		tx:        tx,
+		key:       key,
+		height:    height,
+		timestamp: time.Now().UTC(),
+		gasWanted: gasWanted,
+		priority:  priority,
+		sender:    sender,
+	}
+}
+
+// size reports the size of the raw transaction in bytes.
+func (w *wrappedTx) size() int64 { return int64(len(w.tx)) }

From ce037ccec5a6a82147245f3c4fb6e71afe359200 Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Mon, 16 Jan 2023 11:28:30 +0100
Subject: [PATCH 02/12] add concurrency tests

---
 mempool/cat/cache_test.go | 75 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go
index d4bc6eb6f9..e46a8c62e9 100644
--- a/mempool/cat/cache_test.go
+++ b/mempool/cat/cache_test.go
@@ -3,6 +3,7 @@ package cat
 import (
 	"fmt"
 	"math/rand"
+	"sync"
 	"testing"
 	"time"
 
@@ -108,3 +109,77 @@ func TestEvictedTxCache(t *testing.T) {
 	require.False(t, cache.Has(tx2.Key()))
 	require.False(t, cache.Has(tx3.Key()))
 }
+
+func TestSeenTxSetConcurrency(t *testing.T) {
+	seenSet := NewSeenTxSet()
+
+	const (
+		concurrency = 10
+		numTx       = 100
+	)
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func(peer uint16) {
+			defer wg.Done()
+			for i := 0; i < numTx; i++ {
+				tx := types.Tx([]byte(fmt.Sprintf("tx%d", i)))
+				seenSet.Add(tx.Key(), peer)
+			}
+		}(uint16(i%2))
+	}
+	time.Sleep(time.Millisecond)
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func(peer uint16) {
+			defer wg.Done()
+			for i := 0; i < numTx; i++ {
+				tx := types.Tx([]byte(fmt.Sprintf("tx%d", i)))
+				seenSet.Has(tx.Key(), peer)
+			}
+		}(uint16(i%2))
+	}
+	time.Sleep(time.Millisecond)
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func(peer uint16) {
+			defer wg.Done()
+			for i := numTx - 1; i >= 0; i-- {
+				tx := types.Tx([]byte(fmt.Sprintf("tx%d", i)))
+				seenSet.RemoveKey(tx.Key())
+			}
+		}(uint16(i%2))
+	}
+	wg.Wait()
+}
+
+func TestLRUTxCacheConcurrency(t *testing.T) {
+	cache := NewLRUTxCache(100)
+
+	const (
+		concurrency = 10
+		numTx       = 100
+	)
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < numTx; i++ {
+				tx := types.Tx([]byte(fmt.Sprintf("tx%d", i)))
+				cache.Push(tx.Key())
+			}
+			for i := 0; i < numTx; i++ {
+				tx := types.Tx([]byte(fmt.Sprintf("tx%d", i)))
+				cache.Has(tx.Key())
+			}
+			for i := numTx - 1; i >= 0; i-- {
+				tx := types.Tx([]byte(fmt.Sprintf("tx%d", i)))
+				cache.Remove(tx.Key())
+			}
+		}()
+	}
+	wg.Wait()
+}
\ No newline at end of file

From 85295f39a0deb3681c04e0c3b2c817d7b64786ba Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Mon, 16 Jan 2023 11:39:45 +0100
Subject: [PATCH 03/12] CAT Part 2: write pool for handling CRUD-like operations

---
 mempool/cat/pool.go      | 765 +++++++++++++++++++++++++++++++++++++++
 mempool/cat/pool_test.go | 669 ++++++++++++++++++++++++++++++++++
 types/tx.go              |   4 +
 3 files changed, 1438 insertions(+)
create mode 100644 mempool/cat/pool.go create mode 100644 mempool/cat/pool_test.go diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go new file mode 100644 index 0000000000..40e533093d --- /dev/null +++ b/mempool/cat/pool.go @@ -0,0 +1,765 @@ +package cat + +import ( + "errors" + "fmt" + "runtime" + "sort" + "sync" + "time" + + "github.com/creachadair/taskgroup" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +var _ mempool.Mempool = (*TxPool)(nil) + +const evictedTxCacheSize = 200 + +var ( + ErrTxInMempool = errors.New("tx already exists in mempool") + ErrTxAlreadyRejected = errors.New("tx was previously rejected") +) + +// TxPoolOption sets an optional parameter on the TxPool. +type TxPoolOption func(*TxPool) + +// TxPool implemements the Mempool interface and allows the application to +// set priority values on transactions in the CheckTx response. When selecting +// transactions to include in a block, higher-priority transactions are chosen +// first. When evicting transactions from the mempool for size constraints, +// lower-priority transactions are evicted sooner. +// +// Within the txpool, transactions are ordered by time of arrival, and are +// gossiped to the rest of the network based on that order (gossip order does +// not take priority into account). +type TxPool struct { + // Immutable fields + logger log.Logger + config *config.MempoolConfig + proxyAppConn proxy.AppConnMempool + metrics *mempool.Metrics + + updateMtx sync.Mutex + notifiedTxsAvailable bool + txsAvailable chan struct{} // one value sent per height when mempool is not empty + preCheckFn mempool.PreCheckFunc + postCheckFn mempool.PostCheckFunc + height int64 // the latest height passed to Update + + // Thread-safe cache of rejected transactions for quick look-up + rejectedTxCache *LRUTxCache + // Thread-safe cache of valid txs that were evicted + evictedTxs *EvictedTxCache + // Thread-safe list of transactions peers have seen that we have not yet seen + seenByPeersSet *SeenTxSet + + // Store of wrapped transactions + store *store + + // broadcastCh is an unbuffered channel of new transactions that need to + // be broadcasted to peers. Only populated if `broadcast` is enabled + broadcastCh chan types.TxKey + broadcastMtx sync.Mutex + txsToBeBroadcast map[types.TxKey]struct{} +} + +// NewTxPool constructs a new, empty content addressable txpool at the specified +// initial height and using the given config and options. +func NewTxPool( + logger log.Logger, + cfg *config.MempoolConfig, + proxyAppConn proxy.AppConnMempool, + height int64, + options ...TxPoolOption, +) *TxPool { + txmp := &TxPool{ + logger: logger, + config: cfg, + proxyAppConn: proxyAppConn, + metrics: mempool.NopMetrics(), + rejectedTxCache: NewLRUTxCache(cfg.CacheSize), + evictedTxs: NewEvictedTxCache(evictedTxCacheSize), + seenByPeersSet: NewSeenTxSet(), + height: height, + preCheckFn: func(_ types.Tx) error { return nil }, + postCheckFn: func(_ types.Tx, _ *abci.ResponseCheckTx) error { return nil }, + store: newStore(), + broadcastCh: make(chan types.TxKey, 1), + txsToBeBroadcast: make(map[types.TxKey]struct{}), + } + + for _, opt := range options { + opt(txmp) + } + + return txmp +} + +// WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) +// returns an error. 
This is executed before CheckTx. It only applies to the +// first created block. After that, Update() overwrites the existing value. +func WithPreCheck(f mempool.PreCheckFunc) TxPoolOption { + return func(txmp *TxPool) { txmp.preCheckFn = f } +} + +// WithPostCheck sets a filter for the mempool to reject a transaction if +// f(tx, resp) returns an error. This is executed after CheckTx. It only applies +// to the first created block. After that, Update overwrites the existing value. +func WithPostCheck(f mempool.PostCheckFunc) TxPoolOption { + return func(txmp *TxPool) { txmp.postCheckFn = f } +} + +// WithMetrics sets the mempool's metrics collector. +func WithMetrics(metrics *mempool.Metrics) TxPoolOption { + return func(txmp *TxPool) { txmp.metrics = metrics } +} + +// Lock is a noop as ABCI calls are serialized +func (txmp *TxPool) Lock() { +} + +// Unlock is a noop as ABCI calls are serialized +func (txmp *TxPool) Unlock() { +} + +// Size returns the number of valid transactions in the mempool. It is +// thread-safe. +func (txmp *TxPool) Size() int { return txmp.store.size() } + +// SizeBytes return the total sum in bytes of all the valid transactions in the +// mempool. It is thread-safe. +func (txmp *TxPool) SizeBytes() int64 { return txmp.store.totalBytes() } + +// FlushAppConn executes FlushSync on the mempool's proxyAppConn. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling FlushAppConn. +func (txmp *TxPool) FlushAppConn() error { + return txmp.proxyAppConn.FlushSync() +} + +// EnableTxsAvailable enables the mempool to trigger events when transactions +// are available on a block by block basis. +func (txmp *TxPool) EnableTxsAvailable() { + txmp.txsAvailable = make(chan struct{}, 1) +} + +// TxsAvailable returns a channel which fires once for every height, and only +// when transactions are available in the mempool. It is thread-safe. +func (txmp *TxPool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } + +func (txmp *TxPool) Height() int64 { + txmp.updateMtx.Lock() + defer txmp.updateMtx.Unlock() + return txmp.height +} + +func (txmp *TxPool) Has(txKey types.TxKey) bool { + return txmp.store.has(txKey) +} + +func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) { + wtx := txmp.store.get(txKey) + if wtx != nil { + return wtx.tx, true + } + return types.Tx{}, false +} + +func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool { + return txmp.rejectedTxCache.Has(txKey) +} + +func (txmp *TxPool) WasRecentlyEvicted(txKey types.TxKey) bool { + return txmp.evictedTxs.Has(txKey) +} + +func (txmp *TxPool) CanFitEvictedTx(txKey types.TxKey) bool { + info := txmp.evictedTxs.Get(txKey) + if info == nil { + return false + } + return txmp.canAddTx(info.size) +} + +// TryReinsertEvictedTx attempts to reinsert an evicted tx into the mempool. +func (txmp *TxPool) TryReinsertEvictedTx(txKey types.TxKey, tx types.Tx, peer uint16) (*abci.ResponseCheckTx, error) { + info := txmp.evictedTxs.Pop(txKey) + if info == nil { + return nil, fmt.Errorf("evicted tx %v no longer in cache. 
Please try again", txKey)
+	}
+	txmp.logger.Debug("attempting to reinsert evicted tx", "txKey", fmt.Sprintf("%X", txKey))
+	wtx := newWrappedTx(
+		tx, txKey, txmp.Height(), info.gasWanted, info.priority, info.sender,
+	)
+	checkTxResp := &abci.ResponseCheckTx{
+		Code:      abci.CodeTypeOK,
+		Priority:  info.priority,
+		Sender:    info.sender,
+		GasWanted: info.gasWanted,
+	}
+	if err := txmp.addNewTransaction(wtx, checkTxResp); err != nil {
+		return nil, err
+	}
+	return checkTxResp, nil
+}
+
+// CheckTx adds the given transaction to the mempool if it fits and passes the
+// application's ABCI CheckTx method. This should be viewed as the entry method
+// for new transactions into the network. In practice this happens via an RPC
+// endpoint.
+func (txmp *TxPool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo mempool.TxInfo) error {
+	// Reject transactions in excess of the configured maximum transaction size.
+	if len(tx) > txmp.config.MaxTxBytes {
+		return mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}
+	}
+
+	// This is a new transaction that we haven't seen before. Verify it against the app and attempt
+	// to add it to the transaction pool.
+	key := tx.Key()
+	rsp, err := txmp.TryAddNewTx(tx, key, txInfo)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// call the callback if it is set
+		if cb != nil {
+			cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}})
+		}
+	}()
+
+	// push to the broadcast queue that a new transaction is ready
+	txmp.markToBeBroadcast(key)
+	return nil
+}
+
+// next is used by the reactor to get the next transaction to broadcast
+// to all other peers.
+func (txmp *TxPool) next() <-chan types.TxKey {
+	txmp.broadcastMtx.Lock()
+	defer txmp.broadcastMtx.Unlock()
+	if len(txmp.txsToBeBroadcast) != 0 {
+		ch := make(chan types.TxKey, 1)
+		for key := range txmp.txsToBeBroadcast {
+			delete(txmp.txsToBeBroadcast, key)
+			ch <- key
+			return ch
+		}
+	}
+	return txmp.broadcastCh
+}
+
+// markToBeBroadcast marks a transaction as ready to be broadcast to peers.
+// This should never block, so we use a map to create an unbounded queue
+// of transactions that need to be gossiped.
+func (txmp *TxPool) markToBeBroadcast(key types.TxKey) {
+	if !txmp.config.Broadcast {
+		return
+	}
+
+	select {
+	case txmp.broadcastCh <- key:
+	default:
+		txmp.broadcastMtx.Lock()
+		defer txmp.broadcastMtx.Unlock()
+		txmp.txsToBeBroadcast[key] = struct{}{}
+	}
+}
+
+// TryAddNewTx attempts to add a tx that has not been seen before. It first marks it as seen
+// to avoid races with the same tx. It then calls `CheckTx` so that the application can validate it.
+// If it passes `CheckTx`, the new transaction is added to the mempool as long as it has
+// sufficient priority and space; otherwise it is evicted and an error is returned.
+func (txmp *TxPool) TryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxInfo) (*abci.ResponseCheckTx, error) {
+	// First check any of the caches to see if we can conclude early. We may have already seen and processed
+	// the transaction if:
+	// - We are connected to nodes running v0 or v1 which simply flood the network
+	// - A client submits a transaction to multiple nodes (via RPC)
+	// - We send multiple requests and the first peer eventually responds after the second peer has already provided the tx
+	if txmp.IsRejectedTx(key) {
+		// The peer has sent us a transaction that we have previously marked as invalid. 
Since `CheckTx` can + // be non-deterministic, we don't punish the peer but instead just ignore the tx + return nil, ErrTxAlreadyRejected + } + + if txmp.WasRecentlyEvicted(key) { + // the transaction was recently evicted. If true, we attempt to re-add it to the mempool + // skipping check tx. + return txmp.TryReinsertEvictedTx(key, tx, txInfo.SenderID) + } + + if txmp.Has(key) { + txmp.metrics.AlreadySeenTxs.Add(1) + // The peer has sent us a transaction that we have already seen + return nil, ErrTxInMempool + } + + // reserve the key + if !txmp.store.reserve(key) { + txmp.logger.Debug("mempool already attempting to verify and add transaction", "txKey", fmt.Sprintf("%X", key)) + txmp.PeerHasTx(txInfo.SenderID, key) + return nil, ErrTxInMempool + } + defer txmp.store.release(key) + + // If a precheck hook is defined, call it before invoking the application. + if err := txmp.preCheck(tx); err != nil { + txmp.metrics.FailedTxs.Add(1) + return nil, mempool.ErrPreCheck{Reason: err} + } + + // Early exit if the proxy connection has an error. + if err := txmp.proxyAppConn.Error(); err != nil { + return nil, err + } + + // Invoke an ABCI CheckTx for this transaction. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx}) + if err != nil { + return rsp, err + } + if rsp.Code != abci.CodeTypeOK { + txmp.rejectedTxCache.Push(key) + txmp.metrics.FailedTxs.Add(1) + return rsp, fmt.Errorf("application rejected transaction with code %d and log %s", rsp.Code, rsp.Log) + } + + // Create wrapped tx + wtx := newWrappedTx( + tx, key, txmp.Height(), rsp.GasWanted, rsp.Priority, rsp.Sender, + ) + + // Perform the post check + err = txmp.postCheck(wtx.tx, rsp) + if err != nil { + txmp.rejectedTxCache.Push(key) + txmp.metrics.FailedTxs.Add(1) + return rsp, fmt.Errorf("rejected bad transaction after post check: %w", err) + } + + // Now we consider the transaction to be valid. Once a transaction is valid, it + // can only become invalid if recheckTx is enabled and RecheckTx returns a non zero code + if err := txmp.addNewTransaction(wtx, rsp); err != nil { + return nil, err + } + return rsp, nil +} + +// RemoveTxByKey removes the transaction with the specified key from the +// mempool. It adds it to the rejectedTxCache so it will not be added again +func (txmp *TxPool) RemoveTxByKey(txKey types.TxKey) error { + txmp.removeTxByKey(txKey) + txmp.metrics.EvictedTxs.Add(1) + return nil +} + +func (txmp *TxPool) removeTxByKey(txKey types.TxKey) { + txmp.rejectedTxCache.Push(txKey) + _ = txmp.evictedTxs.Pop(txKey) + _ = txmp.store.remove(txKey) + txmp.seenByPeersSet.RemoveKey(txKey) + txmp.metrics.EvictedTxs.Add(1) + txmp.broadcastMtx.Lock() + defer txmp.broadcastMtx.Unlock() + txmp.txsToBeBroadcast = make(map[types.TxKey]struct{}) +} + +// Flush purges the contents of the mempool and the cache, leaving both empty. +// The current height is not modified by this operation. +func (txmp *TxPool) Flush() { + // Remove all the transactions in the list explicitly, so that the sizes + // and indexes get updated properly. + size := txmp.Size() + txmp.store.reset() + txmp.seenByPeersSet.Reset() + txmp.evictedTxs.Reset() + txmp.rejectedTxCache.Reset() + txmp.metrics.EvictedTxs.Add(float64(size)) + txmp.broadcastMtx.Lock() + defer txmp.broadcastMtx.Unlock() + txmp.txsToBeBroadcast = make(map[types.TxKey]struct{}) +} + +// PeerHasTx marks that the transaction has been seen by a peer. 
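+// The peer is recorded in the seenByPeersSet so the pool can later tell
+// which peers already have this transaction (a peer ID of 0 is ignored).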
+func (txmp *TxPool) PeerHasTx(peer uint16, txKey types.TxKey) { + txmp.logger.Debug("peer has tx", "peer", peer, "txKey", fmt.Sprintf("%X", txKey)) + txmp.seenByPeersSet.Add(txKey, peer) +} + +// allEntriesSorted returns a slice of all the transactions currently in the +// mempool, sorted in nonincreasing order by priority with ties broken by +// increasing order of arrival time. +func (txmp *TxPool) allEntriesSorted() []*wrappedTx { + txs := txmp.store.getAllTxs() + sort.Slice(txs, func(i, j int) bool { + if txs[i].priority == txs[j].priority { + return txs[i].timestamp.Before(txs[j].timestamp) + } + return txs[i].priority > txs[j].priority // N.B. higher priorities first + }) + return txs +} + +// ReapMaxBytesMaxGas returns a slice of valid transactions that fit within the +// size and gas constraints. The results are ordered by nonincreasing priority, +// with ties broken by increasing order of arrival. Reaping transactions does +// not remove them from the mempool +// +// If maxBytes < 0, no limit is set on the total size in bytes. +// If maxGas < 0, no limit is set on the total gas cost. +// +// If the mempool is empty or has no transactions fitting within the given +// constraints, the result will also be empty. +func (txmp *TxPool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { + var totalGas, totalBytes int64 + + var keep []types.Tx //nolint:prealloc + for _, w := range txmp.allEntriesSorted() { + // N.B. When computing byte size, we need to include the overhead for + // encoding as protobuf to send to the application. + totalGas += w.gasWanted + totalBytes += types.ComputeProtoSizeForTxs([]types.Tx{w.tx}) + if (maxGas >= 0 && totalGas > maxGas) || (maxBytes >= 0 && totalBytes > maxBytes) { + break + } + keep = append(keep, w.tx) + } + return keep +} + +// ReapMaxTxs returns up to max transactions from the mempool. The results are +// ordered by nonincreasing priority with ties broken by increasing order of +// arrival. Reaping transactions does not remove them from the mempool. +// +// If max < 0, all transactions in the mempool are reaped. +// +// The result may have fewer than max elements (possibly zero) if the mempool +// does not have that many transactions available. +func (txmp *TxPool) ReapMaxTxs(max int) types.Txs { + var keep []types.Tx //nolint:prealloc + + for _, w := range txmp.allEntriesSorted() { + if max >= 0 && len(keep) >= max { + break + } + keep = append(keep, w.tx) + } + return keep +} + +// Update removes all the given transactions from the mempool and the cache, +// and updates the current block height. The blockTxs and deliverTxResponses +// must have the same length with each response corresponding to the tx at the +// same offset. +// +// If the configuration enables recheck, Update sends each remaining +// transaction after removing blockTxs to the ABCI CheckTx method. Any +// transactions marked as invalid during recheck are also removed. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling Update. +func (txmp *TxPool) Update( + blockHeight int64, + blockTxs types.Txs, + deliverTxResponses []*abci.ResponseDeliverTx, + newPreFn mempool.PreCheckFunc, + newPostFn mempool.PostCheckFunc, +) error { + // Safety check: Transactions and responses must match in number. 
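+	// A mismatch indicates a programming error in the caller, so we panic
+	// rather than continue with an inconsistent mempool.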
+ if len(blockTxs) != len(deliverTxResponses) { + panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses", + len(blockTxs), len(deliverTxResponses))) + } + txmp.logger.Debug("updating mempool", "height", blockHeight, "txs", len(blockTxs)) + + txmp.updateMtx.Lock() + txmp.height = blockHeight + txmp.notifiedTxsAvailable = false + + if newPreFn != nil { + txmp.preCheckFn = newPreFn + } + if newPostFn != nil { + txmp.postCheckFn = newPostFn + } + txmp.updateMtx.Unlock() + + txmp.metrics.SuccessfulTxs.Add(float64(len(blockTxs))) + for _, tx := range blockTxs { + // Regardless of success, remove the transaction from the mempool. + txmp.removeTxByKey(tx.Key()) + } + + txmp.purgeExpiredTxs(blockHeight) + + // If there any uncommitted transactions left in the mempool, we either + // initiate re-CheckTx per remaining transaction or notify that remaining + // transactions are left. + size := txmp.Size() + txmp.metrics.Size.Set(float64(size)) + if size > 0 { + if txmp.config.Recheck { + txmp.recheckTransactions() + } else { + txmp.notifyTxsAvailable() + } + } + return nil +} + +// addNewTransaction handles the ABCI CheckTx response for the first time a +// transaction is added to the mempool. A recheck after a block is committed +// goes to handleRecheckResult. +// +// If either the application rejected the transaction or a post-check hook is +// defined and rejects the transaction, it is discarded. +// +// Otherwise, if the mempool is full, check for lower-priority transactions +// that can be evicted to make room for the new one. If no such transactions +// exist, this transaction is logged and dropped; otherwise the selected +// transactions are evicted. +// +// Finally, the new transaction is added and size stats updated. +func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseCheckTx) error { + // At this point the application has ruled the transaction valid, but the + // mempool might be full. If so, find the lowest-priority items with lower + // priority than the application assigned to this new one, and evict as many + // of them as necessary to make room for tx. If no such items exist, we + // discard tx. + if !txmp.canAddTx(wtx.size()) { + victims, victimBytes := txmp.store.getTxsBelowPriority(wtx.priority) + + // If there are no suitable eviction candidates, or the total size of + // those candidates is not enough to make room for the new transaction, + // drop the new one. + if len(victims) == 0 || victimBytes < wtx.size() { + txmp.metrics.EvictedTxs.Add(1) + txmp.evictedTxs.Push(wtx) + checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", + wtx.key) + return fmt.Errorf("rejected valid incoming transaction; mempool is full (%X). Size: (%d:%d)", + wtx.key.String(), txmp.Size(), txmp.SizeBytes()) + } + + txmp.logger.Debug("evicting lower-priority transactions", + "new_tx", wtx.key.String(), + "new_priority", wtx.priority, + ) + + // Sort lowest priority items first so they will be evicted first. Break + // ties in favor of newer items (to maintain FIFO semantics in a group). + sort.Slice(victims, func(i, j int) bool { + iw := victims[i] + jw := victims[j] + if iw.priority == jw.priority { + return iw.timestamp.After(jw.timestamp) + } + return iw.priority < jw.priority + }) + + // Evict as many of the victims as necessary to make room. + var evictedBytes int64 + for _, tx := range victims { + txmp.evictTx(tx) + + // We may not need to evict all the eligible transactions. 
Bail out + // early if we have made enough room. + evictedBytes += tx.size() + if evictedBytes >= wtx.size() { + break + } + } + } + + txmp.store.set(wtx) + + txmp.metrics.TxSizeBytes.Observe(float64(wtx.size())) + txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.logger.Debug( + "inserted new valid transaction", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.key), + "height", wtx.height, + "num_txs", txmp.Size(), + ) + txmp.notifyTxsAvailable() + return nil +} + +func (txmp *TxPool) evictTx(wtx *wrappedTx) { + txmp.store.remove(wtx.key) + txmp.metrics.EvictedTxs.Add(1) + txmp.evictedTxs.Push(wtx) + txmp.logger.Debug( + "evicted valid existing transaction; mempool full", + "old_tx", fmt.Sprintf("%X", wtx.key), + "old_priority", wtx.priority, + ) +} + +// handleRecheckResult handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It removes any transactions +// invalidated by the application. +// +// This method is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by addNewTransaction instead. +func (txmp *TxPool) handleRecheckResult(wtx *wrappedTx, checkTxRes *abci.ResponseCheckTx) { + txmp.metrics.RecheckTimes.Add(1) + + // If a postcheck hook is defined, call it before checking the result. + err := txmp.postCheck(wtx.tx, checkTxRes) + + if checkTxRes.Code == abci.CodeTypeOK && err == nil { + // Note that we do not update the transaction with any of the values returned in + // recheck tx + return // N.B. Size of mempool did not change + } + + txmp.logger.Debug( + "existing transaction no longer valid; failed re-CheckTx callback", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.key), + "err", err, + "code", checkTxRes.Code, + ) + txmp.store.remove(wtx.key) + txmp.rejectedTxCache.Push(wtx.key) + txmp.metrics.FailedTxs.Add(1) + txmp.metrics.Size.Set(float64(txmp.Size())) +} + +// recheckTransactions initiates re-CheckTx ABCI calls for all the transactions +// currently in the mempool. It reports the number of recheck calls that were +// successfully initiated. +// +// Precondition: The mempool is not empty. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxPool) recheckTransactions() { + if txmp.Size() == 0 { + panic("mempool: cannot run recheck on an empty mempool") + } + txmp.logger.Debug( + "executing re-CheckTx for all remaining transactions", + "num_txs", txmp.Size(), + "height", txmp.Height(), + ) + + // Collect transactions currently in the mempool requiring recheck. + wtxs := txmp.store.getAllTxs() + + // Issue CheckTx calls for each remaining transaction, and when all the + // rechecks are complete signal watchers that transactions may be available. + go func() { + g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) + + for _, wtx := range wtxs { + wtx := wtx + start(func() error { + // The response for this CheckTx is handled by the default recheckTxCallback. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "key", fmt.Sprintf("%x", wtx.key)) + } else { + txmp.handleRecheckResult(wtx, rsp) + } + return nil + }) + } + _ = txmp.proxyAppConn.FlushAsync() + + // When recheck is complete, trigger a notification for more transactions. 
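+		// g.Wait blocks until every CheckTx issued above has been handled.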
+		_ = g.Wait()
+		txmp.notifyTxsAvailable()
+	}()
+}
+
+// canAddTx returns true if a transaction of the given size can be inserted
+// into the mempool without exceeding the configured count and byte limits,
+// and false otherwise.
+func (txmp *TxPool) canAddTx(size int64) bool {
+	numTxs := txmp.Size()
+	txBytes := txmp.SizeBytes()
+
+	if numTxs > txmp.config.Size || size+txBytes > txmp.config.MaxTxsBytes {
+		return false
+	}
+
+	return true
+}
+
+// purgeExpiredTxs removes all transactions from the mempool that have exceeded
+// their respective height or time-based limits as of the given blockHeight.
+// Transactions removed by this operation are not removed from the rejectedTxCache.
+//
+// The caller must hold txmp.mtx exclusively.
+func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
+	if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 {
+		return // nothing to do
+	}
+
+	expirationHeight := blockHeight - txmp.config.TTLNumBlocks
+	if txmp.config.TTLNumBlocks == 0 {
+		expirationHeight = 0
+	}
+
+	now := time.Now()
+	expirationAge := now.Add(-txmp.config.TTLDuration)
+	if txmp.config.TTLDuration == 0 {
+		expirationAge = time.Time{}
+	}
+
+	numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
+	txmp.metrics.EvictedTxs.Add(float64(numExpired))
+
+	// purge old evicted and seen transactions
+	if txmp.config.TTLDuration == 0 {
+		// ensure that evictedTxs and seenByPeersSet are eventually pruned
+		expirationAge = now.Add(-time.Hour)
+	}
+	txmp.evictedTxs.Prune(expirationAge)
+	txmp.seenByPeersSet.Prune(expirationAge)
+}
+
+func (txmp *TxPool) notifyTxsAvailable() {
+	if txmp.Size() == 0 {
+		return // nothing to do
+	}
+
+	if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable {
+		// channel cap is 1, so this will send once
+		txmp.notifiedTxsAvailable = true
+
+		select {
+		case txmp.txsAvailable <- struct{}{}:
+		default:
+		}
+	}
+}
+
+func (txmp *TxPool) preCheck(tx types.Tx) error {
+	txmp.updateMtx.Lock()
+	defer txmp.updateMtx.Unlock()
+	if txmp.preCheckFn != nil {
+		return txmp.preCheckFn(tx)
+	}
+	return nil
+}
+
+func (txmp *TxPool) postCheck(tx types.Tx, res *abci.ResponseCheckTx) error {
+	txmp.updateMtx.Lock()
+	defer txmp.updateMtx.Unlock()
+	if txmp.postCheckFn != nil {
+		return txmp.postCheckFn(tx, res)
+	}
+	return nil
+}
diff --git a/mempool/cat/pool_test.go b/mempool/cat/pool_test.go
new file mode 100644
index 0000000000..d108f76780
--- /dev/null
+++ b/mempool/cat/pool_test.go
@@ -0,0 +1,669 @@
+package cat
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"math/rand"
+	"os"
+	"sort"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/tendermint/tendermint/abci/example/code"
+	"github.com/tendermint/tendermint/abci/example/kvstore"
+	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/mempool"
+	"github.com/tendermint/tendermint/proxy"
+	"github.com/tendermint/tendermint/types"
+)
+
+// application extends the KV store application by overriding CheckTx to provide
+// transaction priority based on the value in the key/value pair.
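// For reference, the sender=key=priority convention that CheckTx parses below,
// traced on one concrete (made-up) transaction:
//
//	tx := []byte("alice=0000=42")
//	parts := bytes.Split(tx, []byte("=")) // ["alice", "0000", "42"]
//	priority, err := strconv.ParseInt(string(parts[2]), 10, 64)
//	// priority == 42 and err == nil; a non-numeric priority field is
//	// rejected with code 100, any other shape with code 101.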
+type application struct { + *kvstore.Application +} + +type testTx struct { + tx types.Tx + priority int64 +} + +func newTx(i int, peerID uint16, msg []byte, priority int64) []byte { + return []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, msg, priority)) +} + +func newDefaultTx(msg string) types.Tx { + return types.Tx(newTx(0, 0, []byte(msg), 1)) +} + +func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { + var ( + priority int64 + sender string + ) + + // infer the priority from the raw transaction value (sender=key=value) + parts := bytes.Split(req.Tx, []byte("=")) + if len(parts) == 3 { + v, err := strconv.ParseInt(string(parts[2]), 10, 64) + if err != nil { + return abci.ResponseCheckTx{ + Priority: priority, + Code: 100, + GasWanted: 1, + } + } + + priority = v + sender = string(parts[0]) + } else { + return abci.ResponseCheckTx{ + Priority: priority, + Code: 101, + GasWanted: 1, + } + } + + return abci.ResponseCheckTx{ + Priority: priority, + Sender: sender, + Code: code.CodeTypeOK, + GasWanted: 1, + } +} + +func setup(t testing.TB, cacheSize int, options ...TxPoolOption) *TxPool { + t.Helper() + + app := &application{kvstore.NewApplication()} + cc := proxy.NewLocalClientCreator(app) + + cfg := config.TestMempoolConfig() + cfg.CacheSize = cacheSize + + appConnMem, err := cc.NewABCIClient() + require.NoError(t, err) + require.NoError(t, appConnMem.Start()) + + t.Cleanup(func() { + os.RemoveAll(cfg.RootDir) + require.NoError(t, appConnMem.Stop()) + }) + + return NewTxPool(log.TestingLogger().With("test", t.Name()), cfg, appConnMem, 1, options...) +} + +// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until +// its callback has finished executing. It fails t if CheckTx fails. +func mustCheckTx(t *testing.T, txmp *TxPool, spec string) { + require.NoError(t, txmp.CheckTx([]byte(spec), nil, mempool.TxInfo{})) +} + +func checkTxs(t *testing.T, txmp *TxPool, numTxs int, peerID uint16) []testTx { + txs := make([]testTx, numTxs) + txInfo := mempool.TxInfo{SenderID: peerID} + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + current := txmp.Size() + for i := 0; i < numTxs; i++ { + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(t, err) + + priority := int64(rng.Intn(9999-1000) + 1000) + + txs[i] = testTx{ + tx: newTx(i, peerID, prefix, priority), + priority: priority, + } + require.NoError(t, txmp.CheckTx(txs[i].tx, nil, txInfo)) + // assert that none of them get silently evicted + require.Equal(t, current+i+1, txmp.Size()) + } + + return txs +} + +func TestTxPool_TxsAvailable(t *testing.T) { + txmp := setup(t, 0) + txmp.EnableTxsAvailable() + + ensureNoTxFire := func() { + timer := time.NewTimer(500 * time.Millisecond) + select { + case <-txmp.TxsAvailable(): + require.Fail(t, "unexpected transactions event") + case <-timer.C: + } + } + + ensureTxFire := func() { + timer := time.NewTimer(500 * time.Millisecond) + select { + case <-txmp.TxsAvailable(): + case <-timer.C: + require.Fail(t, "expected transactions event") + } + } + + // ensure no event as we have not executed any transactions yet + ensureNoTxFire() + + // Execute CheckTx for some transactions and ensure TxsAvailable only fires + // once. 
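// For reference, a hedged sketch of how a consumer would drain the one-shot
// TxsAvailable signal exercised by this test; the timeout is illustrative only.
//
//	select {
//	case <-txmp.TxsAvailable():
//		// build a proposal; the next Update re-arms the notification
//	case <-time.After(500 * time.Millisecond):
//		// no new transactions this round
//	}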
+ txs := checkTxs(t, txmp, 100, 0) + ensureTxFire() + ensureNoTxFire() + + rawTxs := make([]types.Tx, len(txs)) + for i, tx := range txs { + rawTxs[i] = tx.tx + } + + responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + require.Equal(t, 100, txmp.Size()) + + // commit half the transactions and ensure we fire an event + txmp.Lock() + require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + txmp.Unlock() + ensureTxFire() + ensureNoTxFire() + + // Execute CheckTx for more transactions and ensure we do not fire another + // event as we're still on the same height (1). + _ = checkTxs(t, txmp, 100, 0) + ensureNoTxFire() +} + +func TestTxPool_Size(t *testing.T) { + txmp := setup(t, 0) + txs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + rawTxs := make([]types.Tx, len(txs)) + for i, tx := range txs { + rawTxs[i] = tx.tx + } + + responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + txmp.Unlock() + + require.Equal(t, len(rawTxs)/2, txmp.Size()) + require.Equal(t, int64(2850), txmp.SizeBytes()) +} + +func TestTxPool_Eviction(t *testing.T) { + txmp := setup(t, 1000) + txmp.config.Size = 5 + txmp.config.MaxTxsBytes = 60 + txExists := func(spec string) bool { + return txmp.store.has(types.Tx(spec).Key()) + } + + txEvicted := func(spec string) bool { + return txmp.evictedTxs.Has(types.Tx(spec).Key()) + } + + // A transaction bigger than the mempool should be rejected even when there + // are slots available. + err := txmp.CheckTx(types.Tx("big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1"), nil, mempool.TxInfo{}) + require.Error(t, err) + require.Contains(t, err.Error(), "mempool is full") + require.Equal(t, 0, txmp.Size()) + + // Nearly-fill the mempool with a low-priority transaction, to show that it + // is evicted even when slots are available for a higher-priority tx. + const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" + mustCheckTx(t, txmp, bigTx) + require.Equal(t, 1, txmp.Size()) // bigTx is the only element + require.True(t, txExists(bigTx)) + require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) + + // The next transaction should evict bigTx, because it is higher priority + // but does not fit on size. + mustCheckTx(t, txmp, "key1=0000=25") + require.True(t, txExists("key1=0000=25")) + require.False(t, txExists(bigTx)) + require.True(t, txEvicted(bigTx)) + require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) + + // Now fill up the rest of the slots with other transactions. + mustCheckTx(t, txmp, "key2=0001=5") + mustCheckTx(t, txmp, "key3=0002=10") + mustCheckTx(t, txmp, "key4=0003=3") + mustCheckTx(t, txmp, "key5=0004=3") + + // A new transaction with low priority should be discarded. + err = txmp.CheckTx(types.Tx("key6=0005=1"), nil, mempool.TxInfo{}) + require.Error(t, err) + require.Contains(t, err.Error(), "mempool is full") + require.False(t, txExists("key6=0005=1")) + // transactions instantly evicted should still be cached + require.True(t, txEvicted("key6=0005=1")) + + // A new transaction with higher priority should evict key5, which is the + // newest of the two transactions with lowest priority. 
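// For reference, the order asserted below mirrors the eviction comparator in
// addNewTransaction: ascending priority, with ties broken toward the newer
// timestamp so the most recent arrival is evicted first.
//
//	sort.Slice(victims, func(i, j int) bool {
//		if victims[i].priority == victims[j].priority {
//			return victims[i].timestamp.After(victims[j].timestamp) // newer first
//		}
//		return victims[i].priority < victims[j].priority // lowest priority first
//	})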
+ mustCheckTx(t, txmp, "key7=0006=7") + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + + // Another new transaction evicts the other low-priority element. + mustCheckTx(t, txmp, "key8=0007=20") + require.True(t, txExists("key8=0007=20")) + require.False(t, txExists("key4=0003=3")) + + // Now the lowest-priority tx is 5, so that should be the next to go. + mustCheckTx(t, txmp, "key9=0008=9") + require.True(t, txExists("key9=0008=9")) + require.False(t, txExists("k3y2=0001=5")) + + // Add a transaction that requires eviction of multiple lower-priority + // entries, in order to fit the size of the element. + mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 + require.True(t, txExists("key1=0000=25")) + require.True(t, txExists("key8=0007=20")) + require.True(t, txExists("key10=0123456789abcdef=11")) + require.False(t, txExists("key3=0002=10")) + require.False(t, txExists("key9=0008=9")) + require.False(t, txExists("key7=0006=7")) + + // Free up some space so we can add back previously evicted txs + err = txmp.Update(1, types.Txs{types.Tx("key10=0123456789abcdef=11")}, []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}}, nil, nil) + require.NoError(t, err) + require.False(t, txExists("key10=0123456789abcdef=11")) + mustCheckTx(t, txmp, "key3=0002=10") + require.True(t, txExists("key3=0002=10")) + + // remove a high priority tx and check if there is + // space for the previously evicted tx + require.NoError(t, txmp.RemoveTxByKey(types.Tx("key8=0007=20").Key())) + require.False(t, txExists("key8=0007=20")) + require.True(t, txmp.CanFitEvictedTx(types.Tx("key9=0008=9").Key())) +} + +func TestTxPool_Flush(t *testing.T) { + txmp := setup(t, 0) + txs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + rawTxs := make([]types.Tx, len(txs)) + for i, tx := range txs { + rawTxs[i] = tx.tx + } + + responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + txmp.Unlock() + + txmp.Flush() + require.Zero(t, txmp.Size()) + require.Equal(t, int64(0), txmp.SizeBytes()) +} + +func TestTxPool_ReapMaxBytesMaxGas(t *testing.T) { + txmp := setup(t, 0) + tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + txMap := make(map[types.TxKey]testTx) + priorities := make([]int64, len(tTxs)) + for i, tTx := range tTxs { + txMap[tTx.tx.Key()] = tTx + priorities[i] = tTx.priority + } + + sort.Slice(priorities, func(i, j int) bool { + // sort by priority, i.e. 
decreasing order + return priorities[i] > priorities[j] + }) + + ensurePrioritized := func(reapedTxs types.Txs) { + reapedPriorities := make([]int64, len(reapedTxs)) + for i, rTx := range reapedTxs { + reapedPriorities[i] = txMap[rTx.Key()].priority + } + + require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) + } + + // reap by gas capacity only + reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 50) + + // reap by transaction bytes only + reapedTxs = txmp.ReapMaxBytesMaxGas(1200, -1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.GreaterOrEqual(t, len(reapedTxs), 16) + + // Reap by both transaction bytes and gas, where the size yields 31 reaped + // transactions and the gas limit reaps 25 transactions. + reapedTxs = txmp.ReapMaxBytesMaxGas(2000, 25) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 25) +} + +func TestTxPool_ReapMaxTxs(t *testing.T) { + txmp := setup(t, 0) + txs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + txMap := make(map[types.TxKey]int64) + for _, tx := range txs { + txMap[tx.tx.Key()] = tx.priority + } + + ensurePrioritized := func(reapedTxs types.Txs) { + for i := 0; i < len(reapedTxs)-1; i++ { + currPriority := txMap[reapedTxs[i].Key()] + nextPriority := txMap[reapedTxs[i+1].Key()] + require.GreaterOrEqual(t, currPriority, nextPriority) + } + } + + // reap all transactions + reapedTxs := txmp.ReapMaxTxs(-1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, len(txs)) + + // reap a single transaction + reapedTxs = txmp.ReapMaxTxs(1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 1) + + // reap half of the transactions + reapedTxs = txmp.ReapMaxTxs(len(txs) / 2) + ensurePrioritized(reapedTxs) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, len(txs)/2) +} + +func TestTxPool_CheckTxExceedsMaxSize(t *testing.T) { + txmp := setup(t, 0) + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + tx := make([]byte, txmp.config.MaxTxBytes+1) + _, err := rng.Read(tx) + require.NoError(t, err) + + err = txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0}) + require.Equal(t, mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}, err) + + tx = make([]byte, txmp.config.MaxTxBytes-1) + _, err = rng.Read(tx) + require.NoError(t, err) + + err = txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0}) + require.NotEqual(t, mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}, err) +} + +func TestTxPool_CheckTxSamePeer(t *testing.T) { + txmp := setup(t, 100) + peerID := uint16(1) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(t, err) + + tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50)) + + require.NoError(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) + require.Error(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) +} + +func 
TestTxPool_ConcurrentTxs(t *testing.T) { + txmp := setup(t, 100) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + checkTxDone := make(chan struct{}) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + for i := 0; i < 20; i++ { + _ = checkTxs(t, txmp, 100, 0) + dur := rng.Intn(1000-500) + 500 + time.Sleep(time.Duration(dur) * time.Millisecond) + } + + wg.Done() + close(checkTxDone) + }() + + wg.Add(1) + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + defer wg.Done() + + var height int64 = 1 + + for range ticker.C { + reapedTxs := txmp.ReapMaxTxs(200) + if len(reapedTxs) > 0 { + responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) + for i := 0; i < len(responses); i++ { + var code uint32 + + if i%10 == 0 { + code = 100 + } else { + code = abci.CodeTypeOK + } + + responses[i] = &abci.ResponseDeliverTx{Code: code} + } + + txmp.Lock() + require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil)) + txmp.Unlock() + + height++ + } else { + // only return once we know we finished the CheckTx loop + select { + case <-checkTxDone: + return + default: + } + } + } + }() + + wg.Wait() + require.Zero(t, txmp.Size()) + require.Zero(t, txmp.SizeBytes()) +} + +func TestTxPool_ExpiredTxs_Timestamp(t *testing.T) { + txmp := setup(t, 5000) + txmp.config.TTLDuration = 5 * time.Millisecond + + added1 := checkTxs(t, txmp, 10, 0) + require.Equal(t, len(added1), txmp.Size()) + + // Wait a while, then add some more transactions that should not be expired + // when the first batch TTLs out. + // + // ms: 0 1 2 3 4 5 6 + // ^ ^ ^ ^ + // | | | +-- Update (triggers pruning) + // | | +------ first batch expires + // | +-------------- second batch added + // +-------------------------- first batch added + // + // The exact intervals are not important except that the delta should be + // large relative to the cost of CheckTx (ms vs. ns is fine here). + time.Sleep(3 * time.Millisecond) + added2 := checkTxs(t, txmp, 10, 1) + + // Wait a while longer, so that the first batch will expire. + time.Sleep(3 * time.Millisecond) + + // Trigger an update so that pruning will occur. + txmp.Lock() + defer txmp.Unlock() + require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil)) + + // All the transactions in the original set should have been purged. + for _, tx := range added1 { + if txmp.store.has(tx.tx.Key()) { + t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) + } + if txmp.rejectedTxCache.Has(tx.tx.Key()) { + t.Errorf("Transaction %X should have been removed from the cache", tx.tx.Key()) + } + } + + // All the transactions added later should still be around. 
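// For reference, purgeExpiredTxs drives the purge below by deriving a single
// cutoff from the configured TTL (the 5ms value comes from this test):
//
//	expirationAge := time.Now().Add(-txmp.config.TTLDuration) // now - 5ms
//	// every tx whose timestamp is Before(expirationAge) is removed on Update,
//	// so the first batch (older than 5ms) goes while the second batch stays.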
+	for _, tx := range added2 {
+		if !txmp.store.has(tx.tx.Key()) {
+			t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key())
+		}
+	}
+}
+
+func TestTxPool_ExpiredTxs_NumBlocks(t *testing.T) {
+	txmp := setup(t, 500)
+	txmp.height = 100
+	txmp.config.TTLNumBlocks = 10
+
+	tTxs := checkTxs(t, txmp, 100, 0)
+	require.Equal(t, len(tTxs), txmp.Size())
+
+	// reap 5 txs at the next height -- no txs should expire
+	reapedTxs := txmp.ReapMaxTxs(5)
+	responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
+	for i := 0; i < len(responses); i++ {
+		responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
+	}
+
+	txmp.Lock()
+	require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
+	txmp.Unlock()
+
+	require.Equal(t, 95, txmp.Size())
+
+	// check more txs at height 101
+	_ = checkTxs(t, txmp, 50, 1)
+	require.Equal(t, 145, txmp.Size())
+
+	// Reap 5 txs at a height that would expire all the transactions from before
+	// the previous Update (height 100).
+	//
+	// NOTE: When we reap txs below, we do not know if we're picking txs from the
+	// initial CheckTx calls or from the second round of CheckTx calls. Thus, we
+	// cannot guarantee that exactly the 95 remaining first-round txs are expired
+	// and removed. However, we do know that at most 95 txs can be expired and
+	// removed.
+	reapedTxs = txmp.ReapMaxTxs(5)
+	responses = make([]*abci.ResponseDeliverTx, len(reapedTxs))
+	for i := 0; i < len(responses); i++ {
+		responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
+	}
+
+	txmp.Lock()
+	require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil))
+	txmp.Unlock()
+
+	require.GreaterOrEqual(t, txmp.Size(), 45)
+}
+
+func TestTxPool_CheckTxPostCheckError(t *testing.T) {
+	cases := []struct {
+		name string
+		err  error
+	}{
+		{
+			name: "error",
+			err:  errors.New("test error"),
+		},
+		{
+			name: "no error",
+			err:  nil,
+		},
+	}
+	for _, tc := range cases {
+		testCase := tc
+		t.Run(testCase.name, func(t *testing.T) {
+			postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
+				return testCase.err
+			}
+			txmp := setup(t, 0, WithPostCheck(postCheckFn))
+			tx := []byte("sender=0000=1")
+			err := txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0})
+			require.True(t, errors.Is(err, testCase.err))
+		})
+	}
+}
+
+func TestConcurrentlyAddingTx(t *testing.T) {
+	txmp := setup(t, 500)
+	tx := types.Tx("sender=0000=1")
+
+	numTxs := 10
+	errCh := make(chan error, numTxs)
+	wg := &sync.WaitGroup{}
+	for i := 0; i < numTxs; i++ {
+		wg.Add(1)
+		go func(sender uint16) {
+			defer wg.Done()
+			_, err := txmp.TryAddNewTx(tx, tx.Key(), mempool.TxInfo{SenderID: sender})
+			errCh <- err
+		}(uint16(i + 1))
+	}
+	go func() {
+		wg.Wait()
+		close(errCh)
+	}()
+
+	errCount := 0
+	for err := range errCh {
+		if err != nil {
+			require.Equal(t, ErrTxInMempool, err)
+			errCount++
+		}
+	}
+	require.Equal(t, numTxs-1, errCount)
+}
diff --git a/types/tx.go b/types/tx.go
index 7767df7051..1555ec2b6e 100644
--- a/types/tx.go
+++ b/types/tx.go
@@ -58,6 +58,10 @@ func (tx Tx) String() string {
 	return fmt.Sprintf("Tx{%X}", []byte(tx))
 }
 
+func (key TxKey) String() string {
+	return fmt.Sprintf("TxKey{%X}", key[:])
+}
+
 // Txs is a slice of Tx.
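// For reference, a small usage sketch of the TxKey.String helper added above;
// TxKey is the fixed-size hash of a transaction, so %X renders uppercase hex
// (the digits shown are made up).
//
//	key := types.Tx("sender=0000=1").Key()
//	fmt.Println(key.String()) // TxKey{AB12...}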
type Txs []Tx From 56ce24c25575176d7789737d0df68d3ecba26ad9 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Mon, 16 Jan 2023 11:41:16 +0100 Subject: [PATCH 04/12] add missing defers --- mempool/cat/cache_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go index e46a8c62e9..dc650ef611 100644 --- a/mempool/cat/cache_test.go +++ b/mempool/cat/cache_test.go @@ -133,6 +133,7 @@ func TestSeenTxSetConcurrency(t *testing.T) { for i := 0; i < concurrency; i++ { wg.Add(1) go func(peer uint16) { + defer wg.Done() for i := 0; i < numTx; i++ { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.Has(tx.Key(), peer) @@ -143,6 +144,7 @@ func TestSeenTxSetConcurrency(t *testing.T) { for i := 0; i < concurrency; i++ { wg.Add(1) go func(peer uint16) { + defer wg.Done() for i := numTx-1; i >= 0; i-- { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.RemoveKey(tx.Key()) From 36fdb73123467675bb04e0a713c7e7d4d677420c Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Mon, 16 Jan 2023 12:31:38 +0100 Subject: [PATCH 05/12] add bench test --- mempool/cat/pool_bench_test.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 mempool/cat/pool_bench_test.go diff --git a/mempool/cat/pool_bench_test.go b/mempool/cat/pool_bench_test.go new file mode 100644 index 0000000000..9fd687d349 --- /dev/null +++ b/mempool/cat/pool_bench_test.go @@ -0,0 +1,32 @@ +package cat + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/mempool" +) + +func BenchmarkTxPool_CheckTx(b *testing.B) { + txmp := setup(b, 10000) + txmp.config.Size = b.N + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + b.StopTimer() + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(b, err) + + priority := int64(rng.Intn(9999-1000) + 1000) + tx := []byte(fmt.Sprintf("sender%d=%X=%d", n, prefix, priority)) + b.StartTimer() + + require.NoError(b, txmp.CheckTx(tx, nil, mempool.TxInfo{})) + } +} From 6bc8c4c8f978cf6f68457493888becf0cb137748 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 19 Jan 2023 15:32:26 +0100 Subject: [PATCH 06/12] Apply suggestions from code review Co-authored-by: CHAMI Rachid --- mempool/cat/cache_test.go | 2 +- mempool/cat/peers.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go index dc650ef611..01ddb96c2d 100644 --- a/mempool/cat/cache_test.go +++ b/mempool/cat/cache_test.go @@ -182,4 +182,4 @@ func TestLRUTxCacheConcurrency(t *testing.T) { }() } wg.Wait() -} \ No newline at end of file +} diff --git a/mempool/cat/peers.go b/mempool/cat/peers.go index 300a6a2f26..e579c9e899 100644 --- a/mempool/cat/peers.go +++ b/mempool/cat/peers.go @@ -27,7 +27,7 @@ func newMempoolIDs() *mempoolIDs { } } -// Reserve searches for the next unused ID and assigns it to the +// ReserveForPeer searches for the next unused ID and assigns it to the // peer. 
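// For reference, a hedged sketch of the linear scan ReserveForPeer performs
// over the fields shown in this diff; the loop is illustrative, not the
// verbatim implementation.
//
//	curID := ids.nextID
//	for {
//		if _, used := ids.activeIDs[curID]; !used {
//			break // curID is unused and can be handed to the peer
//		}
//		curID++
//	}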
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() From 08f8785a2b0768366461229dab01794c7ffe66ee Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 26 Jan 2023 10:05:06 +0100 Subject: [PATCH 07/12] incorporate suggestions --- mempool/cat/cache.go | 21 ++++++++++++++------- mempool/cat/cache_test.go | 10 +++++----- mempool/cat/peers.go | 8 ++++---- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/mempool/cat/cache.go b/mempool/cat/cache.go index 2584452d8d..26af547eab 100644 --- a/mempool/cat/cache.go +++ b/mempool/cat/cache.go @@ -15,9 +15,11 @@ import ( type LRUTxCache struct { staticSize int - mtx tmsync.Mutex + mtx tmsync.Mutex + // cacheMap is used as a quick look up table cacheMap map[types.TxKey]*list.Element - list *list.List + // list is a doubly linked list used to capture the FIFO nature of the cache + list *list.List } func NewLRUTxCache(cacheSize int) *LRUTxCache { @@ -66,6 +68,10 @@ func (c *LRUTxCache) Push(txKey types.TxKey) bool { } func (c *LRUTxCache) Remove(txKey types.TxKey) { + if c.staticSize == 0 { + return + } + c.mtx.Lock() defer c.mtx.Unlock() @@ -107,7 +113,7 @@ type EvictedTxCache struct { func NewEvictedTxCache(size int) *EvictedTxCache { return &EvictedTxCache{ staticSize: size, - cache: make(map[types.TxKey]*EvictedTxInfo), + cache: make(map[types.TxKey]*EvictedTxInfo, size+1), } } @@ -183,7 +189,7 @@ type SeenTxSet struct { } type timestampedPeerSet struct { - peers map[uint16]bool + peers map[uint16]struct{} time time.Time } @@ -202,11 +208,11 @@ func (s *SeenTxSet) Add(txKey types.TxKey, peer uint16) { seenSet, exists := s.set[txKey] if !exists { s.set[txKey] = timestampedPeerSet{ - peers: map[uint16]bool{peer: true}, + peers: map[uint16]struct{}{peer: struct{}{}}, time: time.Now().UTC(), } } else { - seenSet.peers[peer] = true + seenSet.peers[peer] = struct{}{} } } @@ -259,7 +265,8 @@ func (s *SeenTxSet) Has(txKey types.TxKey, peer uint16) bool { if !exists { return false } - return seenSet.peers[peer] + _, has := seenSet.peers[peer] + return has } func (s *SeenTxSet) Get(txKey types.TxKey) map[uint16]struct{} { diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go index 01ddb96c2d..e23ef71114 100644 --- a/mempool/cat/cache_test.go +++ b/mempool/cat/cache_test.go @@ -127,7 +127,7 @@ func TestSeenTxSetConcurrency(t *testing.T) { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.Add(tx.Key(), peer) } - }(uint16(i%2)) + }(uint16(i % 2)) } time.Sleep(time.Millisecond) for i := 0; i < concurrency; i++ { @@ -138,18 +138,18 @@ func TestSeenTxSetConcurrency(t *testing.T) { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.Has(tx.Key(), peer) } - }(uint16(i%2)) + }(uint16(i % 2)) } time.Sleep(time.Millisecond) for i := 0; i < concurrency; i++ { wg.Add(1) go func(peer uint16) { defer wg.Done() - for i := numTx-1; i >= 0; i-- { + for i := numTx - 1; i >= 0; i-- { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.RemoveKey(tx.Key()) } - }(uint16(i%2)) + }(uint16(i % 2)) } wg.Wait() } @@ -175,7 +175,7 @@ func TestLRUTxCacheConcurrency(t *testing.T) { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) cache.Has(tx.Key()) } - for i := numTx-1; i >= 0; i-- { + for i := numTx - 1; i >= 0; i-- { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) cache.Remove(tx.Key()) } diff --git a/mempool/cat/peers.go b/mempool/cat/peers.go index e579c9e899..86a6f4c010 100644 --- a/mempool/cat/peers.go +++ b/mempool/cat/peers.go @@ -8,13 +8,13 @@ import ( "github.com/tendermint/tendermint/p2p" ) -const firstPeerID = 1 +const 
firstPeerID = mempool.UnknownPeerID + 1
 
-// mempoolIDs is a thread-safe map of peer IDs to short IDs used for tracking what peers have sent what
-// NOTE: taken from mempool/v1/reactor.go
+// mempoolIDs is a thread-safe map of peer IDs to shorter uint16 IDs used by the Reactor for tracking peer
+// messages and peer state such as what transactions peers have seen
 type mempoolIDs struct {
 	mtx       tmsync.RWMutex
-	peerMap   map[p2p.ID]uint16
+	peerMap   map[p2p.ID]uint16   // quick lookup table for peer ID to short ID
 	nextID    uint16              // assumes that a node will never have over 65536 active peers
 	activeIDs map[uint16]p2p.Peer // used to check if a given peerID key is used, the value doesn't matter
 }

From 30ae48e0928f8625a31047d80d5de10a4c3f7589 Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Thu, 26 Jan 2023 10:05:06 +0100
Subject: [PATCH 08/12] apply suggestions

---
 mempool/cat/pool.go | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go
index 40e533093d..7a5ab8272a 100644
--- a/mempool/cat/pool.go
+++ b/mempool/cat/pool.go
@@ -18,8 +18,13 @@ import (
 	"github.com/tendermint/tendermint/types"
 )
 
+// enforce compile-time satisfaction of the Mempool interface
 var _ mempool.Mempool = (*TxPool)(nil)
 
+// Amount of evicted txs to cache. This is low enough that, during bursts of
+// high tx throughput, the txpool can quickly recover txs that were evicted,
+// but not so large that sustained excessive tx load takes up needless memory.
 const evictedTxCacheSize = 200
 
 var (
@@ -367,7 +372,7 @@ func (txmp *TxPool) removeTxByKey(txKey types.TxKey) {
 	txmp.metrics.EvictedTxs.Add(1)
 	txmp.broadcastMtx.Lock()
 	defer txmp.broadcastMtx.Unlock()
-	txmp.txsToBeBroadcast = make(map[types.TxKey]struct{})
+	delete(txmp.txsToBeBroadcast, txKey)
 }
 
 // Flush purges the contents of the mempool and the cache, leaving both empty.
@@ -383,6 +388,11 @@ func (txmp *TxPool) Flush() {
 	txmp.metrics.EvictedTxs.Add(float64(size))
 	txmp.broadcastMtx.Lock()
 	defer txmp.broadcastMtx.Unlock()
+	// drain the broadcast channel
+	select {
+	case <-txmp.broadcastCh:
+	default:
+	}
 	txmp.txsToBeBroadcast = make(map[types.TxKey]struct{})
 }
 
@@ -564,14 +574,14 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC
 		})
 
 		// Evict as many of the victims as necessary to make room.
-		var evictedBytes int64
+		availableBytes := txmp.availableBytes()
 		for _, tx := range victims {
 			txmp.evictTx(tx)
 
 			// We may not need to evict all the eligible transactions. Bail out
 			// early if we have made enough room.
-			evictedBytes += tx.size()
-			if evictedBytes >= wtx.size() {
+			availableBytes += tx.size()
+			if availableBytes >= wtx.size() {
 				break
 			}
 		}
@@ -683,6 +693,11 @@ func (txmp *TxPool) recheckTransactions() {
 	}()
 }
 
+// availableBytes returns the number of bytes available in the mempool.
+func (txmp *TxPool) availableBytes() int64 {
+	return txmp.config.MaxTxsBytes - txmp.SizeBytes()
+}
+
 // canAddTx returns true if a transaction of the given size can be inserted
 // into the mempool without exceeding the configured count and byte limits,
 // and false otherwise.
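Note on patch 08: the eviction loop is now seeded with the mempool's current
headroom (availableBytes) instead of counting evicted bytes from zero, so it
stops as soon as the incoming transaction actually fits. A minimal,
self-contained sketch of that accounting follows; evictUntilFits and its
arguments are illustrative names, not part of the pool's API.

package main

import "fmt"

// evictUntilFits mimics the availableBytes accounting: starting from the
// current headroom, reclaim victims until a transaction of size wanted fits,
// and report how many victims were evicted.
func evictUntilFits(headroom, wanted int64, victimSizes []int64) (evicted int) {
	for _, sz := range victimSizes {
		if headroom >= wanted {
			break // enough room already; keep the remaining candidates
		}
		headroom += sz // evicting a victim frees its bytes
		evicted++
	}
	return evicted
}

func main() {
	// 10 bytes free, 25 needed: the 8- and 12-byte victims go, the 9-byte stays.
	fmt.Println(evictUntilFits(10, 25, []int64{8, 12, 9})) // 2
}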
From a7e402d5cfc84300bc7e2f056b573cdfb72cfdc2 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 31 Jan 2023 11:53:40 +0100 Subject: [PATCH 09/12] update tests --- mempool/cat/pool_test.go | 5 +++-- mempool/v1/mempool_test.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/mempool/cat/pool_test.go b/mempool/cat/pool_test.go index d108f76780..8bdec39edb 100644 --- a/mempool/cat/pool_test.go +++ b/mempool/cat/pool_test.go @@ -276,7 +276,7 @@ func TestTxPool_Eviction(t *testing.T) { // Now the lowest-priority tx is 5, so that should be the next to go. mustCheckTx(t, txmp, "key9=0008=9") require.True(t, txExists("key9=0008=9")) - require.False(t, txExists("k3y2=0001=5")) + require.False(t, txExists("key2=0001=5")) // Add a transaction that requires eviction of multiple lower-priority // entries, in order to fit the size of the element. @@ -366,7 +366,8 @@ func TestTxPool_ReapMaxBytesMaxGas(t *testing.T) { ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) - require.GreaterOrEqual(t, len(reapedTxs), 16) + // each tx is 57 bytes, 20 * 57 = 1140 + overhead for proto encoding + require.Equal(t, len(reapedTxs), 20) // Reap by both transaction bytes and gas, where the size yields 31 reaped // transactions and the gas limit reaps 25 transactions. diff --git a/mempool/v1/mempool_test.go b/mempool/v1/mempool_test.go index c93e091a6e..e88979585c 100644 --- a/mempool/v1/mempool_test.go +++ b/mempool/v1/mempool_test.go @@ -266,7 +266,7 @@ func TestTxMempool_Eviction(t *testing.T) { // Now the lowest-priority tx is 5, so that should be the next to go. mustCheckTx(t, txmp, "key9=0008=9") require.True(t, txExists("key9=0008=9")) - require.False(t, txExists("k3y2=0001=5")) + require.False(t, txExists("key2=0001=5")) // Add a transaction that requires eviction of multiple lower-priority // entries, in order to fit the size of the element. From d07ee429432757286bd4f6230bac11e0f477c3fc Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 31 Jan 2023 12:08:28 +0100 Subject: [PATCH 10/12] add TestRemoveBlobTx --- mempool/cat/pool.go | 4 ++-- mempool/cat/pool_test.go | 49 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go index 7a5ab8272a..181fcede50 100644 --- a/mempool/cat/pool.go +++ b/mempool/cat/pool.go @@ -332,7 +332,7 @@ func (txmp *TxPool) TryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxI if rsp.Code != abci.CodeTypeOK { txmp.rejectedTxCache.Push(key) txmp.metrics.FailedTxs.Add(1) - return rsp, fmt.Errorf("application rejected transaction with code %d and log %s", rsp.Code, rsp.Log) + return rsp, fmt.Errorf("application rejected transaction with code %d (Log: %s)", rsp.Code, rsp.Log) } // Create wrapped tx @@ -695,7 +695,7 @@ func (txmp *TxPool) recheckTransactions() { // availableBytes returns the number of bytes available in the mempool. 
func (txmp *TxPool) availableBytes() int64 {
-	return txmp.config.MaxTxsBytes - txmp.SizeBytes()
+	return txmp.config.MaxTxsBytes - txmp.SizeBytes()
 }
 
 // canAddTx returns true if a transaction of the given size can be inserted
diff --git a/mempool/cat/pool_test.go b/mempool/cat/pool_test.go
index 8bdec39edb..c2964788c6 100644
--- a/mempool/cat/pool_test.go
+++ b/mempool/cat/pool_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/mempool"
+	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
 	"github.com/tendermint/tendermint/proxy"
 	"github.com/tendermint/tendermint/types"
 )
@@ -639,6 +640,54 @@ func TestTxPool_CheckTxPostCheckError(t *testing.T) {
 	}
 }
 
+func TestRemoveBlobTx(t *testing.T) {
+	app := kvstore.NewApplication()
+	cc := proxy.NewLocalClientCreator(app)
+
+	cfg := config.TestMempoolConfig()
+	cfg.CacheSize = 100
+
+	appConnMem, err := cc.NewABCIClient()
+	require.NoError(t, err)
+	require.NoError(t, appConnMem.Start())
+
+	t.Cleanup(func() {
+		os.RemoveAll(cfg.RootDir)
+		require.NoError(t, appConnMem.Stop())
+	})
+
+	txmp := NewTxPool(log.TestingLogger(), cfg, appConnMem, 1)
+
+	originalTx := []byte{1, 2, 3, 4}
+	indexWrapper, err := types.MarshalIndexWrapper(originalTx, 100)
+	require.NoError(t, err)
+
+	// create the blobTx
+	b := tmproto.Blob{
+		NamespaceId:  []byte{1, 2, 3, 4, 5, 6, 7, 8},
+		Data:         []byte{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9},
+		ShareVersion: 0,
+	}
+	bTx, err := types.MarshalBlobTx(originalTx, &b)
+	require.NoError(t, err)
+
+	err = txmp.CheckTx(bTx, nil, mempool.TxInfo{})
+	require.NoError(t, err)
+
+	err = txmp.Update(1, []types.Tx{indexWrapper}, abciResponses(1, abci.CodeTypeOK), nil, nil)
+	require.NoError(t, err)
+	require.EqualValues(t, 0, txmp.Size())
+	require.EqualValues(t, 0, txmp.SizeBytes())
+}
+
+func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
+	responses := make([]*abci.ResponseDeliverTx, 0, n)
+	for i := 0; i < n; i++ {
+		responses = append(responses, &abci.ResponseDeliverTx{Code: code})
+	}
+	return responses
+}
+
 func TestConcurrentlyAddingTx(t *testing.T) {
 	txmp := setup(t, 500)
 	tx := types.Tx("sender=0000=1")

From f09710ab30b3dfd635dd9e1af1209da1b260043a Mon Sep 17 00:00:00 2001
From: Callum Waters
Date: Tue, 31 Jan 2023 16:48:20 +0100
Subject: [PATCH 11/12] add a comment about the ordering of transactions.

---
 mempool/cat/pool.go | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go
index 181fcede50..99ecb29d5b 100644
--- a/mempool/cat/pool.go
+++ b/mempool/cat/pool.go
@@ -39,11 +39,19 @@ type TxPoolOption func(*TxPool)
 // set priority values on transactions in the CheckTx response. When selecting
 // transactions to include in a block, higher-priority transactions are chosen
 // first. When evicting transactions from the mempool for size constraints,
-// lower-priority transactions are evicted sooner.
+// lower-priority transactions are evicted first. Transactions themselves are
+// unordered (a map is used), so they may be broadcast in an order different
+// from the order in which they were received. When CheckTx passes, there is
+// no guarantee that the transaction has already been broadcast to any peers.
 //
-// Within the txpool, transactions are ordered by time of arrival, and are
-// gossiped to the rest of the network based on that order (gossip order does
-// not take priority into account). 
+// A TTL can be set to remove transactions after a period of time or a number +// of heights. +// +// A cache of rejectedTxs can be set in the mempool config. Transactions that +// are rejected because of `CheckTx` or other validity checks will be instantly +// rejected if they are seen again. Committed transactions are also added to +// this cache. This serves somewhat as replay protection but applications should +// implement something more comprehensive type TxPool struct { // Immutable fields logger log.Logger @@ -127,12 +135,10 @@ func WithMetrics(metrics *mempool.Metrics) TxPoolOption { } // Lock is a noop as ABCI calls are serialized -func (txmp *TxPool) Lock() { -} +func (txmp *TxPool) Lock() {} // Unlock is a noop as ABCI calls are serialized -func (txmp *TxPool) Unlock() { -} +func (txmp *TxPool) Unlock() {} // Size returns the number of valid transactions in the mempool. It is // thread-safe. From ff576464057a2d4a73baf2283efa01718263da61 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Sat, 4 Feb 2023 06:59:20 +1300 Subject: [PATCH 12/12] remove evicted cache and order broadcast in FIFO --- mempool/cat/cache.go | 86 ---------------------------- mempool/cat/cache_test.go | 46 ++------------- mempool/cat/pool.go | 114 +++++++++++++------------------------- mempool/cat/pool_test.go | 42 ++++++++++---- 4 files changed, 78 insertions(+), 210 deletions(-) diff --git a/mempool/cat/cache.go b/mempool/cat/cache.go index 2584452d8d..9fb7813679 100644 --- a/mempool/cat/cache.go +++ b/mempool/cat/cache.go @@ -89,92 +89,6 @@ func (c *LRUTxCache) Has(txKey types.TxKey) bool { return ok } -type EvictedTxInfo struct { - timeEvicted time.Time - priority int64 - gasWanted int64 - sender string - size int64 -} - -type EvictedTxCache struct { - staticSize int - - mtx tmsync.Mutex - cache map[types.TxKey]*EvictedTxInfo -} - -func NewEvictedTxCache(size int) *EvictedTxCache { - return &EvictedTxCache{ - staticSize: size, - cache: make(map[types.TxKey]*EvictedTxInfo), - } -} - -func (c *EvictedTxCache) Has(txKey types.TxKey) bool { - c.mtx.Lock() - defer c.mtx.Unlock() - _, exists := c.cache[txKey] - return exists -} - -func (c *EvictedTxCache) Get(txKey types.TxKey) *EvictedTxInfo { - c.mtx.Lock() - defer c.mtx.Unlock() - return c.cache[txKey] -} - -func (c *EvictedTxCache) Push(wtx *wrappedTx) { - c.mtx.Lock() - defer c.mtx.Unlock() - c.cache[wtx.key] = &EvictedTxInfo{ - timeEvicted: time.Now().UTC(), - priority: wtx.priority, - gasWanted: wtx.gasWanted, - sender: wtx.sender, - size: wtx.size(), - } - // if cache too large, remove the oldest entry - if len(c.cache) > c.staticSize { - oldestTxKey := wtx.key - oldestTxTime := time.Now().UTC() - for key, info := range c.cache { - if info.timeEvicted.Before(oldestTxTime) { - oldestTxTime = info.timeEvicted - oldestTxKey = key - } - } - delete(c.cache, oldestTxKey) - } -} - -func (c *EvictedTxCache) Pop(txKey types.TxKey) *EvictedTxInfo { - c.mtx.Lock() - defer c.mtx.Unlock() - info, exists := c.cache[txKey] - if !exists { - return nil - } - delete(c.cache, txKey) - return info -} - -func (c *EvictedTxCache) Prune(limit time.Time) { - c.mtx.Lock() - defer c.mtx.Unlock() - for key, info := range c.cache { - if info.timeEvicted.Before(limit) { - delete(c.cache, key) - } - } -} - -func (c *EvictedTxCache) Reset() { - c.mtx.Lock() - defer c.mtx.Unlock() - c.cache = make(map[types.TxKey]*EvictedTxInfo) -} - // SeenTxSet records transactions that have been // seen by other peers but not yet by us type SeenTxSet struct { diff --git 
a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go index dc650ef611..8800ee8dce 100644 --- a/mempool/cat/cache_test.go +++ b/mempool/cat/cache_test.go @@ -76,40 +76,6 @@ func TestLRUTxCacheSize(t *testing.T) { } } -func TestEvictedTxCache(t *testing.T) { - var ( - tx1 = types.Tx("tx1") - tx2 = types.Tx("tx2") - tx3 = types.Tx("tx3") - wtx1 = newWrappedTx( - tx1, tx1.Key(), 10, 1, 5, "", - ) - wtx2 = newWrappedTx( - tx2, tx2.Key(), 10, 1, 5, "", - ) - wtx3 = newWrappedTx( - tx3, tx3.Key(), 10, 1, 5, "", - ) - ) - - cache := NewEvictedTxCache(2) - require.False(t, cache.Has(tx1.Key())) - require.Nil(t, cache.Pop(tx1.Key())) - cache.Push(wtx1) - require.True(t, cache.Has(tx1.Key())) - require.NotNil(t, cache.Pop(tx1.Key())) - cache.Push(wtx1) - time.Sleep(1 * time.Millisecond) - cache.Push(wtx2) - time.Sleep(1 * time.Millisecond) - cache.Push(wtx3) - // cache should have reached limit and thus evicted the oldest tx - require.False(t, cache.Has(tx1.Key())) - cache.Prune(time.Now().UTC().Add(1 * time.Second)) - require.False(t, cache.Has(tx2.Key())) - require.False(t, cache.Has(tx3.Key())) -} - func TestSeenTxSetConcurrency(t *testing.T) { seenSet := NewSeenTxSet() @@ -127,7 +93,7 @@ func TestSeenTxSetConcurrency(t *testing.T) { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.Add(tx.Key(), peer) } - }(uint16(i%2)) + }(uint16(i % 2)) } time.Sleep(time.Millisecond) for i := 0; i < concurrency; i++ { @@ -138,18 +104,18 @@ func TestSeenTxSetConcurrency(t *testing.T) { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.Has(tx.Key(), peer) } - }(uint16(i%2)) + }(uint16(i % 2)) } time.Sleep(time.Millisecond) for i := 0; i < concurrency; i++ { wg.Add(1) go func(peer uint16) { defer wg.Done() - for i := numTx-1; i >= 0; i-- { + for i := numTx - 1; i >= 0; i-- { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) seenSet.RemoveKey(tx.Key()) } - }(uint16(i%2)) + }(uint16(i % 2)) } wg.Wait() } @@ -175,11 +141,11 @@ func TestLRUTxCacheConcurrency(t *testing.T) { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) cache.Has(tx.Key()) } - for i := numTx-1; i >= 0; i-- { + for i := numTx - 1; i >= 0; i-- { tx := types.Tx([]byte(fmt.Sprintf("tx%d", i))) cache.Remove(tx.Key()) } }() } wg.Wait() -} \ No newline at end of file +} diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go index 40e533093d..c0b8ea34b6 100644 --- a/mempool/cat/pool.go +++ b/mempool/cat/pool.go @@ -46,6 +46,7 @@ type TxPool struct { proxyAppConn proxy.AppConnMempool metrics *mempool.Metrics + // these values are modified once per height updateMtx sync.Mutex notifiedTxsAvailable bool txsAvailable chan struct{} // one value sent per height when mempool is not empty @@ -55,8 +56,6 @@ type TxPool struct { // Thread-safe cache of rejected transactions for quick look-up rejectedTxCache *LRUTxCache - // Thread-safe cache of valid txs that were evicted - evictedTxs *EvictedTxCache // Thread-safe list of transactions peers have seen that we have not yet seen seenByPeersSet *SeenTxSet @@ -64,10 +63,10 @@ type TxPool struct { store *store // broadcastCh is an unbuffered channel of new transactions that need to - // be broadcasted to peers. Only populated if `broadcast` is enabled - broadcastCh chan types.TxKey + // be broadcasted to peers. 
Only populated if `broadcast` in the config is enabled + broadcastCh chan types.Tx broadcastMtx sync.Mutex - txsToBeBroadcast map[types.TxKey]struct{} + txsToBeBroadcast []types.TxKey } // NewTxPool constructs a new, empty content addressable txpool at the specified @@ -85,14 +84,13 @@ func NewTxPool( proxyAppConn: proxyAppConn, metrics: mempool.NopMetrics(), rejectedTxCache: NewLRUTxCache(cfg.CacheSize), - evictedTxs: NewEvictedTxCache(evictedTxCacheSize), seenByPeersSet: NewSeenTxSet(), height: height, preCheckFn: func(_ types.Tx) error { return nil }, postCheckFn: func(_ types.Tx, _ *abci.ResponseCheckTx) error { return nil }, store: newStore(), - broadcastCh: make(chan types.TxKey, 1), - txsToBeBroadcast: make(map[types.TxKey]struct{}), + broadcastCh: make(chan types.Tx), + txsToBeBroadcast: make([]types.TxKey, 0), } for _, opt := range options { @@ -133,7 +131,7 @@ func (txmp *TxPool) Unlock() { // thread-safe. func (txmp *TxPool) Size() int { return txmp.store.size() } -// SizeBytes return the total sum in bytes of all the valid transactions in the +// SizeBytes returns the total sum in bytes of all the valid transactions in the // mempool. It is thread-safe. func (txmp *TxPool) SizeBytes() int64 { return txmp.store.totalBytes() } @@ -155,16 +153,20 @@ func (txmp *TxPool) EnableTxsAvailable() { // when transactions are available in the mempool. It is thread-safe. func (txmp *TxPool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } +// Height returns the latest height that the mempool is at func (txmp *TxPool) Height() int64 { txmp.updateMtx.Lock() defer txmp.updateMtx.Unlock() return txmp.height } +// Has returns true if the transaction is currently in the mempool func (txmp *TxPool) Has(txKey types.TxKey) bool { return txmp.store.has(txKey) } +// Get retrieves a transaction based on the key. It returns a bool +// if the transaction exists or not func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) { wtx := txmp.store.get(txKey) if wtx != nil { @@ -173,44 +175,12 @@ func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) { return types.Tx{}, false } +// IsRejectedTx returns true if the transaction was recently rejected and is +// currently within the cache func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool { return txmp.rejectedTxCache.Has(txKey) } -func (txmp *TxPool) WasRecentlyEvicted(txKey types.TxKey) bool { - return txmp.evictedTxs.Has(txKey) -} - -func (txmp *TxPool) CanFitEvictedTx(txKey types.TxKey) bool { - info := txmp.evictedTxs.Get(txKey) - if info == nil { - return false - } - return txmp.canAddTx(info.size) -} - -// TryReinsertEvictedTx attempts to reinsert an evicted tx into the mempool. -func (txmp *TxPool) TryReinsertEvictedTx(txKey types.TxKey, tx types.Tx, peer uint16) (*abci.ResponseCheckTx, error) { - info := txmp.evictedTxs.Pop(txKey) - if info == nil { - return nil, fmt.Errorf("evicted tx %v no longer in cache. Please try again", txKey) - } - txmp.logger.Debug("attempting to reinsert evicted tx", "txKey", fmt.Sprintf("%X", txKey)) - wtx := newWrappedTx( - tx, txKey, txmp.Height(), info.gasWanted, info.priority, info.sender, - ) - checkTxResp := &abci.ResponseCheckTx{ - Code: abci.CodeTypeOK, - Priority: info.priority, - Sender: info.sender, - GasWanted: info.gasWanted, - } - if err := txmp.addNewTransaction(wtx, checkTxResp); err != nil { - return nil, err - } - return checkTxResp, nil -} - // CheckTx adds the given transaction to the mempool if it fits and passes the // application's ABCI CheckTx method. 
This should be viewed as the entry method for new transactions // into the network. In practice this happens via an RPC endpoint @@ -235,40 +205,45 @@ func (txmp *TxPool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo mempool }() // push to the broadcast queue that a new transaction is ready - txmp.markToBeBroadcast(key) + txmp.markToBeBroadcast(key, tx) return nil } // next is used by the reactor to get the next transaction to broadcast // to all other peers. -func (txmp *TxPool) next() <-chan types.TxKey { +func (txmp *TxPool) next() <-chan types.Tx { txmp.broadcastMtx.Lock() defer txmp.broadcastMtx.Unlock() - if len(txmp.txsToBeBroadcast) != 0 { - ch := make(chan types.TxKey, 1) - for key := range txmp.txsToBeBroadcast { - delete(txmp.txsToBeBroadcast, key) - ch <- key - return ch + for len(txmp.txsToBeBroadcast) != 0 { + ch := make(chan types.Tx, 1) + key := txmp.txsToBeBroadcast[0] + txmp.txsToBeBroadcast = txmp.txsToBeBroadcast[1:] + tx, exists := txmp.Get(key) + if !exists { + + continue } + ch <- tx + return ch } + return txmp.broadcastCh } // markToBeBroadcast marks a transaction to be broadcasted to peers. // This should never block so we use a map to create an unbounded queue // of transactions that need to be gossiped. -func (txmp *TxPool) markToBeBroadcast(key types.TxKey) { +func (txmp *TxPool) markToBeBroadcast(key types.TxKey, tx types.Tx) { if !txmp.config.Broadcast { return } select { - case txmp.broadcastCh <- key: + case txmp.broadcastCh <- tx: default: txmp.broadcastMtx.Lock() defer txmp.broadcastMtx.Unlock() - txmp.txsToBeBroadcast[key] = struct{}{} + txmp.txsToBeBroadcast = append(txmp.txsToBeBroadcast, key) } } @@ -288,12 +263,6 @@ func (txmp *TxPool) TryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxI return nil, ErrTxAlreadyRejected } - if txmp.WasRecentlyEvicted(key) { - // the transaction was recently evicted. If true, we attempt to re-add it to the mempool - // skipping check tx. 
- return txmp.TryReinsertEvictedTx(key, tx, txInfo.SenderID) - } - if txmp.Has(key) { txmp.metrics.AlreadySeenTxs.Add(1) // The peer has sent us a transaction that we have already seen @@ -325,9 +294,11 @@ func (txmp *TxPool) TryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxI return rsp, err } if rsp.Code != abci.CodeTypeOK { - txmp.rejectedTxCache.Push(key) + if txmp.config.KeepInvalidTxsInCache { + txmp.rejectedTxCache.Push(key) + } txmp.metrics.FailedTxs.Add(1) - return rsp, fmt.Errorf("application rejected transaction with code %d and log %s", rsp.Code, rsp.Log) + return rsp, fmt.Errorf("application rejected transaction with code %d (Log: %s)", rsp.Code, rsp.Log) } // Create wrapped tx @@ -338,7 +309,9 @@ func (txmp *TxPool) TryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxI // Perform the post check err = txmp.postCheck(wtx.tx, rsp) if err != nil { - txmp.rejectedTxCache.Push(key) + if txmp.config.KeepInvalidTxsInCache { + txmp.rejectedTxCache.Push(key) + } txmp.metrics.FailedTxs.Add(1) return rsp, fmt.Errorf("rejected bad transaction after post check: %w", err) } @@ -361,13 +334,8 @@ func (txmp *TxPool) RemoveTxByKey(txKey types.TxKey) error { func (txmp *TxPool) removeTxByKey(txKey types.TxKey) { txmp.rejectedTxCache.Push(txKey) - _ = txmp.evictedTxs.Pop(txKey) _ = txmp.store.remove(txKey) txmp.seenByPeersSet.RemoveKey(txKey) - txmp.metrics.EvictedTxs.Add(1) - txmp.broadcastMtx.Lock() - defer txmp.broadcastMtx.Unlock() - txmp.txsToBeBroadcast = make(map[types.TxKey]struct{}) } // Flush purges the contents of the mempool and the cache, leaving both empty. @@ -378,12 +346,11 @@ func (txmp *TxPool) Flush() { size := txmp.Size() txmp.store.reset() txmp.seenByPeersSet.Reset() - txmp.evictedTxs.Reset() txmp.rejectedTxCache.Reset() txmp.metrics.EvictedTxs.Add(float64(size)) txmp.broadcastMtx.Lock() defer txmp.broadcastMtx.Unlock() - txmp.txsToBeBroadcast = make(map[types.TxKey]struct{}) + txmp.txsToBeBroadcast = make([]types.TxKey, 0) } // PeerHasTx marks that the transaction has been seen by a peer. @@ -540,7 +507,6 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC // drop the new one. if len(victims) == 0 || victimBytes < wtx.size() { txmp.metrics.EvictedTxs.Add(1) - txmp.evictedTxs.Push(wtx) checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", wtx.key) return fmt.Errorf("rejected valid incoming transaction; mempool is full (%X). 
Size: (%d:%d)", @@ -595,7 +561,6 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC func (txmp *TxPool) evictTx(wtx *wrappedTx) { txmp.store.remove(wtx.key) txmp.metrics.EvictedTxs.Add(1) - txmp.evictedTxs.Push(wtx) txmp.logger.Debug( "evicted valid existing transaction; mempool full", "old_tx", fmt.Sprintf("%X", wtx.key), @@ -629,7 +594,9 @@ func (txmp *TxPool) handleRecheckResult(wtx *wrappedTx, checkTxRes *abci.Respons "code", checkTxRes.Code, ) txmp.store.remove(wtx.key) - txmp.rejectedTxCache.Push(wtx.key) + if txmp.config.KeepInvalidTxsInCache { + txmp.rejectedTxCache.Push(wtx.key) + } txmp.metrics.FailedTxs.Add(1) txmp.metrics.Size.Set(float64(txmp.Size())) } @@ -726,7 +693,6 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) { // ensure that evictedTxs and seenByPeersSet are eventually pruned expirationAge = now.Add(-time.Hour) } - txmp.evictedTxs.Prune(expirationAge) txmp.seenByPeersSet.Prune(expirationAge) } diff --git a/mempool/cat/pool_test.go b/mempool/cat/pool_test.go index d108f76780..06c66c25dd 100644 --- a/mempool/cat/pool_test.go +++ b/mempool/cat/pool_test.go @@ -2,6 +2,7 @@ package cat import ( "bytes" + "context" "errors" "fmt" "math/rand" @@ -217,11 +218,7 @@ func TestTxPool_Eviction(t *testing.T) { txmp.config.Size = 5 txmp.config.MaxTxsBytes = 60 txExists := func(spec string) bool { - return txmp.store.has(types.Tx(spec).Key()) - } - - txEvicted := func(spec string) bool { - return txmp.evictedTxs.Has(types.Tx(spec).Key()) + return txmp.Has(types.Tx(spec).Key()) } // A transaction bigger than the mempool should be rejected even when there @@ -244,7 +241,6 @@ func TestTxPool_Eviction(t *testing.T) { mustCheckTx(t, txmp, "key1=0000=25") require.True(t, txExists("key1=0000=25")) require.False(t, txExists(bigTx)) - require.True(t, txEvicted(bigTx)) require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) // Now fill up the rest of the slots with other transactions. @@ -258,8 +254,6 @@ func TestTxPool_Eviction(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "mempool is full") require.False(t, txExists("key6=0005=1")) - // transactions instantly evicted should still be cached - require.True(t, txEvicted("key6=0005=1")) // A new transaction with higher priority should evict key5, which is the // newest of the two transactions with lowest priority. 
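// For reference, patch 12 gates the rejected-tx cache on the existing
// KeepInvalidTxsInCache mempool config flag; condensed from the hunks above:
//
//	if rsp.Code != abci.CodeTypeOK {
//		if txmp.config.KeepInvalidTxsInCache {
//			txmp.rejectedTxCache.Push(key) // repeat submissions short-circuit
//		}
//		txmp.metrics.FailedTxs.Add(1)
//	}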
@@ -299,7 +293,6 @@ func TestTxPool_Eviction(t *testing.T) {
 	// space for the previously evicted tx
 	require.NoError(t, txmp.RemoveTxByKey(types.Tx("key8=0007=20").Key()))
 	require.False(t, txExists("key8=0007=20"))
-	require.True(t, txmp.CanFitEvictedTx(types.Tx("key9=0008=9").Key()))
 }
 
 func TestTxPool_Flush(t *testing.T) {
@@ -638,7 +631,7 @@ func TestTxPool_CheckTxPostCheckError(t *testing.T) {
 	}
 }
 
-func TestConcurrentlyAddingTx(t *testing.T) {
+func TestTxPool_ConcurrentlyAddingTx(t *testing.T) {
 	txmp := setup(t, 500)
 	tx := types.Tx("sender=0000=1")
 
@@ -667,3 +660,32 @@ func TestConcurrentlyAddingTx(t *testing.T) {
 	}
 	require.Equal(t, numTxs-1, errCount)
 }
+
+func TestTxPool_BroadcastQueue(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	txmp := setup(t, 1)
+	txs := 10
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < txs; i++ {
+			select {
+			case <-ctx.Done():
+				// t.Fatalf must not be called from a non-test goroutine;
+				// report the failure and stop receiving instead.
+				t.Errorf("failed to receive all txs (got %d/%d)", i+1, txs)
+				return
+			case tx := <-txmp.next():
+				require.Equal(t, tx, newDefaultTx(fmt.Sprintf("%d", i)))
+			}
+			time.Sleep(10 * time.Millisecond)
+		}
+	}()
+
+	for i := 0; i < txs; i++ {
+		tx := newDefaultTx(fmt.Sprintf("%d", i))
+		require.NoError(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0}))
+	}
+
+	wg.Wait()
+}
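Closing note on the broadcast path introduced in patch 12: markToBeBroadcast
prefers a direct handoff on the unbuffered channel when a reader is already
waiting in next(), and otherwise appends the key to an unbounded slice that
next() drains in FIFO order before ever blocking on the channel. A minimal,
self-contained sketch of that pattern under the same assumptions (the
fifoBroadcast type and string items are illustrative, not the pool's API):

package main

import (
	"fmt"
	"sync"
)

// fifoBroadcast mirrors the markToBeBroadcast/next pairing: hand an item to a
// waiting receiver if one is ready, otherwise queue it for later FIFO drain.
type fifoBroadcast struct {
	mtx   sync.Mutex
	queue []string
	ch    chan string
}

func (b *fifoBroadcast) mark(item string) {
	select {
	case b.ch <- item: // a receiver is already blocked in next()
	default:
		b.mtx.Lock()
		b.queue = append(b.queue, item) // nobody waiting: enqueue
		b.mtx.Unlock()
	}
}

func (b *fifoBroadcast) next() <-chan string {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	if len(b.queue) != 0 {
		out := make(chan string, 1)
		out <- b.queue[0] // buffered, so this never blocks
		b.queue = b.queue[1:]
		return out
	}
	return b.ch // empty queue: wait for a live handoff
}

func main() {
	b := &fifoBroadcast{ch: make(chan string)}
	b.mark("tx1") // no receiver yet, so both get queued
	b.mark("tx2")
	fmt.Println(<-b.next(), <-b.next()) // tx1 tx2 (FIFO)
}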