From c3271c22721466be130e7afa49893506dc9a32ee Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Wed, 6 Sep 2023 03:10:59 -0400 Subject: [PATCH] nit --- go.mod | 2 +- go.sum | 2 ++ peer/network_test.go | 2 +- plugin/evm/gossip_mempool.go | 44 +++++++++----------------- plugin/evm/gossip_mempool_test.go | 52 ++++++++++++++++++------------- plugin/evm/mempool.go | 33 ++++++++++---------- plugin/evm/tx_gossip_test.go | 31 +++++++++--------- plugin/evm/vm.go | 1 + 8 files changed, 82 insertions(+), 85 deletions(-) diff --git a/go.mod b/go.mod index 10ced6c0fd..97cc1e1440 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/VictoriaMetrics/fastcache v1.10.0 - github.com/ava-labs/avalanchego v1.10.10-rc.0 + github.com/ava-labs/avalanchego v1.10.10-rc.1 github.com/cespare/cp v0.1.0 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 github.com/davecgh/go-spew v1.1.1 diff --git a/go.sum b/go.sum index e83a87d369..fab801edea 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLW github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw= github.com/ava-labs/avalanchego v1.10.10-rc.0 h1:6VjkpwhAJ0tDNJK+UIUD8WIb5VelgH3w61mgk7JAkDQ= github.com/ava-labs/avalanchego v1.10.10-rc.0/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw= +github.com/ava-labs/avalanchego v1.10.10-rc.1 h1:dPJISEWqL3tdUShe6RuB8CFuXl3rsH8617sXbLBjkIE= +github.com/ava-labs/avalanchego v1.10.10-rc.1/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/peer/network_test.go b/peer/network_test.go index add57616de..c130bd5136 100644 --- a/peer/network_test.go +++ b/peer/network_test.go @@ -665,7 +665,7 @@ func TestSDKRouting(t *testing.T) { protocol := 0 handler := &testSDKHandler{} router := p2p.NewRouter(logging.NoLog{}, sender) - _, err := router.RegisterAppProtocol(uint64(protocol), handler) + _, err := router.RegisterAppProtocol(uint64(protocol), handler, &p2p.Peers{}) require.NoError(err) networkCodec := codec.NewManager(0) diff --git a/plugin/evm/gossip_mempool.go b/plugin/evm/gossip_mempool.go index c14e67f67f..d6a1cba044 100644 --- a/plugin/evm/gossip_mempool.go +++ b/plugin/evm/gossip_mempool.go @@ -7,6 +7,7 @@ import ( "fmt" "sync" + "github.com/ava-labs/avalanchego/ids" "github.com/ethereum/go-ethereum/log" "github.com/ava-labs/avalanchego/network/p2p/gossip" @@ -27,12 +28,8 @@ type GossipAtomicTx struct { Tx *Tx `serialize:"true"` } -func (tx *GossipAtomicTx) GetHash() gossip.Hash { - id := tx.Tx.ID() - hash := gossip.Hash{} - copy(hash[:], id[:]) - - return hash +func (tx *GossipAtomicTx) GetID() ids.ID { + return tx.Tx.ID() } func (tx *GossipAtomicTx) Marshal() ([]byte, error) { @@ -79,7 +76,13 @@ func (g *GossipEthTxPool) Subscribe(shutdownChan chan struct{}, shutdownWg *sync for _, pendingTx := range pendingTxs.Txs { tx := &GossipEthTx{Tx: pendingTx} g.bloom.Add(tx) - if gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio) { + reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio) + if err != nil { + log.Error("failed to reset bloom filter", "err", err) + continue + } + + if reset { log.Debug("resetting bloom filter", "reason", "reached max filled ratio") pending := g.mempool.Pending(false) @@ -105,25 +108,10 @@ func (g *GossipEthTxPool) Add(tx *GossipEthTx) error { return nil } -func (g *GossipEthTxPool) Get(filter func(tx *GossipEthTx) bool) []*GossipEthTx { - limit := 1000 - resultSize := 0 - result := make([]*GossipEthTx, 0) - +func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) { g.mempool.IteratePending(func(tx *types.Transaction) bool { - resultSize += int(tx.Size()) - if resultSize > limit { - return false - } - - gossipTx := &GossipEthTx{ - Tx: tx, - } - result = append(result, gossipTx) - return true + return f(&GossipEthTx{Tx: tx}) }) - - return result } func (g *GossipEthTxPool) GetFilter() ([]byte, []byte, error) { @@ -138,12 +126,8 @@ type GossipEthTx struct { Tx *types.Transaction } -func (tx *GossipEthTx) GetHash() gossip.Hash { - txHash := tx.Tx.Hash() - hash := gossip.Hash{} - copy(hash[:], txHash[:]) - - return hash +func (tx *GossipEthTx) GetID() ids.ID { + return ids.ID(tx.Tx.Hash()) } func (tx *GossipEthTx) Marshal() ([]byte, error) { diff --git a/plugin/evm/gossip_mempool_test.go b/plugin/evm/gossip_mempool_test.go index f5dbb3a697..fcc15c74ff 100644 --- a/plugin/evm/gossip_mempool_test.go +++ b/plugin/evm/gossip_mempool_test.go @@ -11,7 +11,7 @@ import ( "github.com/ava-labs/avalanchego/ids" ) -func TestAtomicMempoolAddTx(t *testing.T) { +func TestAtomicMempoolIterate(t *testing.T) { txs := []*GossipAtomicTx{ { Tx: &Tx{ @@ -30,44 +30,43 @@ func TestAtomicMempoolAddTx(t *testing.T) { } tests := []struct { - name string - add []*GossipAtomicTx - filter func(tx *GossipAtomicTx) bool - expected []*GossipAtomicTx + name string + add []*GossipAtomicTx + f func(tx *GossipAtomicTx) bool + possibleValues []*GossipAtomicTx + expectedLen int }{ { - name: "empty", - }, - { - name: "filter matches nothing", + name: "func matches nothing", add: txs, - filter: func(*GossipAtomicTx) bool { + f: func(*GossipAtomicTx) bool { return false }, - expected: nil, + possibleValues: nil, }, { - name: "filter matches all", + name: "func matches all", add: txs, - filter: func(*GossipAtomicTx) bool { + f: func(*GossipAtomicTx) bool { return true }, - expected: txs, + possibleValues: txs, + expectedLen: 2, }, { - name: "filter matches subset", + name: "func matches subset", add: txs, - filter: func(tx *GossipAtomicTx) bool { + f: func(tx *GossipAtomicTx) bool { return tx.Tx == txs[0].Tx }, - expected: txs[:1], + possibleValues: txs, + expectedLen: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - m, err := NewMempool(ids.Empty, 10) require.NoError(err) @@ -75,12 +74,21 @@ func TestAtomicMempoolAddTx(t *testing.T) { require.NoError(m.Add(add)) } - txs := m.Get(tt.filter) - require.Len(txs, len(tt.expected)) + matches := make([]*GossipAtomicTx, 0) + f := func(tx *GossipAtomicTx) bool { + match := tt.f(tx) - for _, expected := range tt.expected { - require.Contains(txs, expected) + if match { + matches = append(matches, tx) + } + + return match } + + m.Iterate(f) + + require.Len(matches, tt.expectedLen) + require.Subset(tt.possibleValues, matches) }) } } diff --git a/plugin/evm/mempool.go b/plugin/evm/mempool.go index 97e8ac5b2e..9d180c6956 100644 --- a/plugin/evm/mempool.go +++ b/plugin/evm/mempool.go @@ -274,16 +274,14 @@ func (m *Mempool) addTx(tx *Tx, force bool) error { for utxoID := range utxoSet { m.utxoSpenders[utxoID] = tx } - // When adding [tx] to the mempool make sure that there is an item in Pending - // to signal the VM to produce a block. Note: if the VM's buildStatus has already - // been set to something other than [dontBuild], this will be ignored and won't be - // reset until the engine calls BuildBlock. This case is handled in IssueCurrentTx - // and CancelCurrentTx. - m.newTxs = append(m.newTxs, tx) - m.addPending() m.bloom.Add(&GossipAtomicTx{Tx: tx}) - if gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio) { + reset, err := gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio) + if err != nil { + return err + } + + if reset { log.Debug("resetting bloom filter", "reason", "reached max filled ratio") for _, pendingTx := range m.txHeap.minHeap.items { @@ -291,23 +289,26 @@ func (m *Mempool) addTx(tx *Tx, force bool) error { } } + // When adding [tx] to the mempool make sure that there is an item in Pending + // to signal the VM to produce a block. Note: if the VM's buildStatus has already + // been set to something other than [dontBuild], this will be ignored and won't be + // reset until the engine calls BuildBlock. This case is handled in IssueCurrentTx + // and CancelCurrentTx. + m.newTxs = append(m.newTxs, tx) + m.addPending() + return nil } -func (m *Mempool) Get(filter func(tx *GossipAtomicTx) bool) []*GossipAtomicTx { +func (m *Mempool) Iterate(f func(tx *GossipAtomicTx) bool) { m.lock.RLock() defer m.lock.RUnlock() - gossipTxs := make([]*GossipAtomicTx, 0, len(m.txHeap.maxHeap.items)) for _, item := range m.txHeap.maxHeap.items { - gossipTx := &GossipAtomicTx{Tx: item.tx} - if !filter(gossipTx) { - continue + if !f(&GossipAtomicTx{Tx: item.tx}) { + return } - gossipTxs = append(gossipTxs, gossipTx) } - - return gossipTxs } func (m *Mempool) GetFilter() ([]byte, []byte, error) { diff --git a/plugin/evm/tx_gossip_test.go b/plugin/evm/tx_gossip_test.go index 70575d8e5c..1ad2857911 100644 --- a/plugin/evm/tx_gossip_test.go +++ b/plugin/evm/tx_gossip_test.go @@ -10,20 +10,21 @@ import ( "testing" "time" - "github.com/ava-labs/avalanchego/p2p/gossip" - "github.com/ava-labs/avalanchego/p2p/gossip/proto/pb" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "google.golang.org/protobuf/proto" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" - commonEng "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" + "github.com/stretchr/testify/require" + + "go.uber.org/mock/gomock" + + "google.golang.org/protobuf/proto" "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/core/types" @@ -60,7 +61,7 @@ func TestEthTxGossip(t *testing.T) { // sender for the peer requesting gossip from [vm] ctrl := gomock.NewController(t) - peerSender := commonEng.NewMockSender(ctrl) + peerSender := common.NewMockSender(ctrl) router := p2p.NewRouter(logging.NoLog{}, peerSender) // we're only making client requests, so we don't need a server handler @@ -71,7 +72,7 @@ func TestEthTxGossip(t *testing.T) { require.NoError(err) emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary() require.NoError(err) - request := &pb.PullGossipRequest{ + request := &sdk.PullGossipRequest{ Filter: emptyBloomFilterBytes, Salt: utils.RandomBytes(32), } @@ -110,7 +111,7 @@ func TestEthTxGossip(t *testing.T) { onResponse := func(nodeID ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &pb.PullGossipResponse{} + response := &sdk.PullGossipResponse{} require.NoError(proto.Unmarshal(responseBytes, response)) require.Empty(response.Gossip) wg.Done() @@ -137,7 +138,7 @@ func TestEthTxGossip(t *testing.T) { onResponse = func(nodeID ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &pb.PullGossipResponse{} + response := &sdk.PullGossipResponse{} require.NoError(proto.Unmarshal(responseBytes, response)) require.Len(response.Gossip, 1) @@ -166,7 +167,7 @@ func TestAtomicTxGossip(t *testing.T) { // sender for the peer requesting gossip from [vm] ctrl := gomock.NewController(t) - peerSender := commonEng.NewMockSender(ctrl) + peerSender := common.NewMockSender(ctrl) router := p2p.NewRouter(logging.NoLog{}, peerSender) // we're only making client requests, so we don't need a server handler @@ -177,7 +178,7 @@ func TestAtomicTxGossip(t *testing.T) { require.NoError(err) bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary() require.NoError(err) - request := &pb.PullGossipRequest{ + request := &sdk.PullGossipRequest{ Filter: bloomBytes, Salt: emptyBloomFilter.Salt[:], } @@ -214,7 +215,7 @@ func TestAtomicTxGossip(t *testing.T) { onResponse := func(nodeID ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &pb.PullGossipResponse{} + response := &sdk.PullGossipResponse{} require.NoError(proto.Unmarshal(responseBytes, response)) require.Empty(response.Gossip) wg.Done() @@ -237,7 +238,7 @@ func TestAtomicTxGossip(t *testing.T) { onResponse = func(nodeID ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &pb.PullGossipResponse{} + response := &sdk.PullGossipResponse{} require.NoError(proto.Unmarshal(responseBytes, response)) require.Len(response.Gossip, 1) diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index d2146d28b2..c4fdd6b905 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -18,6 +18,7 @@ import ( avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/coreth/consensus/dummy" corethConstants "github.com/ava-labs/coreth/constants"