Skip to content

Commit

Permalink
nit
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim committed Sep 6, 2023
1 parent b001238 commit c3271c2
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 85 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.10-rc.0
github.com/ava-labs/avalanchego v1.10.10-rc.1
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLW
github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw=
github.com/ava-labs/avalanchego v1.10.10-rc.0 h1:6VjkpwhAJ0tDNJK+UIUD8WIb5VelgH3w61mgk7JAkDQ=
github.com/ava-labs/avalanchego v1.10.10-rc.0/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw=
github.com/ava-labs/avalanchego v1.10.10-rc.1 h1:dPJISEWqL3tdUShe6RuB8CFuXl3rsH8617sXbLBjkIE=
github.com/ava-labs/avalanchego v1.10.10-rc.1/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
2 changes: 1 addition & 1 deletion peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func TestSDKRouting(t *testing.T) {
protocol := 0
handler := &testSDKHandler{}
router := p2p.NewRouter(logging.NoLog{}, sender)
_, err := router.RegisterAppProtocol(uint64(protocol), handler)
_, err := router.RegisterAppProtocol(uint64(protocol), handler, &p2p.Peers{})
require.NoError(err)

networkCodec := codec.NewManager(0)
Expand Down
44 changes: 14 additions & 30 deletions plugin/evm/gossip_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/network/p2p/gossip"
Expand All @@ -27,12 +28,8 @@ type GossipAtomicTx struct {
Tx *Tx `serialize:"true"`
}

func (tx *GossipAtomicTx) GetHash() gossip.Hash {
id := tx.Tx.ID()
hash := gossip.Hash{}
copy(hash[:], id[:])

return hash
func (tx *GossipAtomicTx) GetID() ids.ID {
return tx.Tx.ID()
}

func (tx *GossipAtomicTx) Marshal() ([]byte, error) {
Expand Down Expand Up @@ -79,7 +76,13 @@ func (g *GossipEthTxPool) Subscribe(shutdownChan chan struct{}, shutdownWg *sync
for _, pendingTx := range pendingTxs.Txs {
tx := &GossipEthTx{Tx: pendingTx}
g.bloom.Add(tx)
if gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio) {
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio)
if err != nil {
log.Error("failed to reset bloom filter", "err", err)
continue
}

if reset {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

pending := g.mempool.Pending(false)
Expand All @@ -105,25 +108,10 @@ func (g *GossipEthTxPool) Add(tx *GossipEthTx) error {
return nil
}

func (g *GossipEthTxPool) Get(filter func(tx *GossipEthTx) bool) []*GossipEthTx {
limit := 1000
resultSize := 0
result := make([]*GossipEthTx, 0)

func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) {
g.mempool.IteratePending(func(tx *types.Transaction) bool {
resultSize += int(tx.Size())
if resultSize > limit {
return false
}

gossipTx := &GossipEthTx{
Tx: tx,
}
result = append(result, gossipTx)
return true
return f(&GossipEthTx{Tx: tx})
})

return result
}

func (g *GossipEthTxPool) GetFilter() ([]byte, []byte, error) {
Expand All @@ -138,12 +126,8 @@ type GossipEthTx struct {
Tx *types.Transaction
}

func (tx *GossipEthTx) GetHash() gossip.Hash {
txHash := tx.Tx.Hash()
hash := gossip.Hash{}
copy(hash[:], txHash[:])

return hash
func (tx *GossipEthTx) GetID() ids.ID {
return ids.ID(tx.Tx.Hash())
}

func (tx *GossipEthTx) Marshal() ([]byte, error) {
Expand Down
52 changes: 30 additions & 22 deletions plugin/evm/gossip_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ava-labs/avalanchego/ids"
)

func TestAtomicMempoolAddTx(t *testing.T) {
func TestAtomicMempoolIterate(t *testing.T) {
txs := []*GossipAtomicTx{
{
Tx: &Tx{
Expand All @@ -30,57 +30,65 @@ func TestAtomicMempoolAddTx(t *testing.T) {
}

tests := []struct {
name string
add []*GossipAtomicTx
filter func(tx *GossipAtomicTx) bool
expected []*GossipAtomicTx
name string
add []*GossipAtomicTx
f func(tx *GossipAtomicTx) bool
possibleValues []*GossipAtomicTx
expectedLen int
}{
{
name: "empty",
},
{
name: "filter matches nothing",
name: "func matches nothing",
add: txs,
filter: func(*GossipAtomicTx) bool {
f: func(*GossipAtomicTx) bool {
return false
},
expected: nil,
possibleValues: nil,
},
{
name: "filter matches all",
name: "func matches all",
add: txs,
filter: func(*GossipAtomicTx) bool {
f: func(*GossipAtomicTx) bool {
return true
},
expected: txs,
possibleValues: txs,
expectedLen: 2,
},
{
name: "filter matches subset",
name: "func matches subset",
add: txs,
filter: func(tx *GossipAtomicTx) bool {
f: func(tx *GossipAtomicTx) bool {
return tx.Tx == txs[0].Tx
},
expected: txs[:1],
possibleValues: txs,
expectedLen: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)

m, err := NewMempool(ids.Empty, 10)
require.NoError(err)

for _, add := range tt.add {
require.NoError(m.Add(add))
}

txs := m.Get(tt.filter)
require.Len(txs, len(tt.expected))
matches := make([]*GossipAtomicTx, 0)
f := func(tx *GossipAtomicTx) bool {
match := tt.f(tx)

for _, expected := range tt.expected {
require.Contains(txs, expected)
if match {
matches = append(matches, tx)
}

return match
}

m.Iterate(f)

require.Len(matches, tt.expectedLen)
require.Subset(tt.possibleValues, matches)
})
}
}
33 changes: 17 additions & 16 deletions plugin/evm/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,40 +274,41 @@ func (m *Mempool) addTx(tx *Tx, force bool) error {
for utxoID := range utxoSet {
m.utxoSpenders[utxoID] = tx
}
// When adding [tx] to the mempool make sure that there is an item in Pending
// to signal the VM to produce a block. Note: if the VM's buildStatus has already
// been set to something other than [dontBuild], this will be ignored and won't be
// reset until the engine calls BuildBlock. This case is handled in IssueCurrentTx
// and CancelCurrentTx.
m.newTxs = append(m.newTxs, tx)
m.addPending()

m.bloom.Add(&GossipAtomicTx{Tx: tx})
if gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio) {
reset, err := gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio)
if err != nil {
return err
}

if reset {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

for _, pendingTx := range m.txHeap.minHeap.items {
m.bloom.Add(&GossipAtomicTx{Tx: pendingTx.tx})
}
}

// When adding [tx] to the mempool make sure that there is an item in Pending
// to signal the VM to produce a block. Note: if the VM's buildStatus has already
// been set to something other than [dontBuild], this will be ignored and won't be
// reset until the engine calls BuildBlock. This case is handled in IssueCurrentTx
// and CancelCurrentTx.
m.newTxs = append(m.newTxs, tx)
m.addPending()

return nil
}

func (m *Mempool) Get(filter func(tx *GossipAtomicTx) bool) []*GossipAtomicTx {
func (m *Mempool) Iterate(f func(tx *GossipAtomicTx) bool) {
m.lock.RLock()
defer m.lock.RUnlock()

gossipTxs := make([]*GossipAtomicTx, 0, len(m.txHeap.maxHeap.items))
for _, item := range m.txHeap.maxHeap.items {
gossipTx := &GossipAtomicTx{Tx: item.tx}
if !filter(gossipTx) {
continue
if !f(&GossipAtomicTx{Tx: item.tx}) {
return
}
gossipTxs = append(gossipTxs, gossipTx)
}

return gossipTxs
}

func (m *Mempool) GetFilter() ([]byte, []byte, error) {
Expand Down
31 changes: 16 additions & 15 deletions plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/p2p/gossip"
"github.com/ava-labs/avalanchego/p2p/gossip/proto/pb"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
commonEng "github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/types"
Expand Down Expand Up @@ -60,7 +61,7 @@ func TestEthTxGossip(t *testing.T) {

// sender for the peer requesting gossip from [vm]
ctrl := gomock.NewController(t)
peerSender := commonEng.NewMockSender(ctrl)
peerSender := common.NewMockSender(ctrl)
router := p2p.NewRouter(logging.NoLog{}, peerSender)

// we're only making client requests, so we don't need a server handler
Expand All @@ -71,7 +72,7 @@ func TestEthTxGossip(t *testing.T) {
require.NoError(err)
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
require.NoError(err)
request := &pb.PullGossipRequest{
request := &sdk.PullGossipRequest{
Filter: emptyBloomFilterBytes,
Salt: utils.RandomBytes(32),
}
Expand Down Expand Up @@ -110,7 +111,7 @@ func TestEthTxGossip(t *testing.T) {
onResponse := func(nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &pb.PullGossipResponse{}
response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Empty(response.Gossip)
wg.Done()
Expand All @@ -137,7 +138,7 @@ func TestEthTxGossip(t *testing.T) {
onResponse = func(nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &pb.PullGossipResponse{}
response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Len(response.Gossip, 1)

Expand Down Expand Up @@ -166,7 +167,7 @@ func TestAtomicTxGossip(t *testing.T) {

// sender for the peer requesting gossip from [vm]
ctrl := gomock.NewController(t)
peerSender := commonEng.NewMockSender(ctrl)
peerSender := common.NewMockSender(ctrl)
router := p2p.NewRouter(logging.NoLog{}, peerSender)

// we're only making client requests, so we don't need a server handler
Expand All @@ -177,7 +178,7 @@ func TestAtomicTxGossip(t *testing.T) {
require.NoError(err)
bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary()
require.NoError(err)
request := &pb.PullGossipRequest{
request := &sdk.PullGossipRequest{
Filter: bloomBytes,
Salt: emptyBloomFilter.Salt[:],
}
Expand Down Expand Up @@ -214,7 +215,7 @@ func TestAtomicTxGossip(t *testing.T) {
onResponse := func(nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &pb.PullGossipResponse{}
response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Empty(response.Gossip)
wg.Done()
Expand All @@ -237,7 +238,7 @@ func TestAtomicTxGossip(t *testing.T) {
onResponse = func(nodeID ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &pb.PullGossipResponse{}
response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Len(response.Gossip, 1)

Expand Down
1 change: 1 addition & 0 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"

"github.com/ava-labs/coreth/consensus/dummy"
corethConstants "github.com/ava-labs/coreth/constants"
Expand Down

0 comments on commit c3271c2

Please sign in to comment.