-
Notifications
You must be signed in to change notification settings - Fork 233
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add SDK Router message handling (#316) Co-authored-by: Stephen Buttolph <[email protected]> * Fix hanging requests after Shutdown (#326) * fix requests hanging after shutdown * fix build --------- Signed-off-by: Stephen Buttolph <[email protected]> Co-authored-by: Stephen Buttolph <[email protected]> * Update to 1.10.10-rc.2 (#328) * update to avalanchego 1.10.10-rc.2 * nits * nit * Add P2P SDK Pull Gossip (#318) * add batchsize * sync changes * Drop outbound gossip for non vdrs (#862) * Drop outbound gossip requests for non-validators (#334) * drop outbound gossip requests for non validators * nit * nit * sync changes --------- Co-authored-by: Joshua Kim <[email protected]> --------- Co-authored-by: Joshua Kim <[email protected]>
- Loading branch information
1 parent
3317f7b
commit d91407f
Showing
8 changed files
with
364 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package evm | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ethereum/go-ethereum/log" | ||
|
||
"github.com/ava-labs/avalanchego/network/p2p/gossip" | ||
|
||
"github.com/ava-labs/subnet-evm/core" | ||
"github.com/ava-labs/subnet-evm/core/txpool" | ||
"github.com/ava-labs/subnet-evm/core/types" | ||
) | ||
|
||
var ( | ||
_ gossip.Gossipable = (*GossipTx)(nil) | ||
_ gossip.Set[*GossipTx] = (*GossipTxPool)(nil) | ||
) | ||
|
||
func NewGossipTxPool(mempool *txpool.TxPool) (*GossipTxPool, error) { | ||
bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) | ||
} | ||
|
||
return &GossipTxPool{ | ||
mempool: mempool, | ||
pendingTxs: make(chan core.NewTxsEvent), | ||
bloom: bloom, | ||
}, nil | ||
} | ||
|
||
type GossipTxPool struct { | ||
mempool *txpool.TxPool | ||
pendingTxs chan core.NewTxsEvent | ||
|
||
bloom *gossip.BloomFilter | ||
lock sync.RWMutex | ||
} | ||
|
||
func (g *GossipTxPool) Subscribe(ctx context.Context) { | ||
g.mempool.SubscribeNewTxsEvent(g.pendingTxs) | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
log.Debug("shutting down subscription") | ||
return | ||
case pendingTxs := <-g.pendingTxs: | ||
g.lock.Lock() | ||
for _, pendingTx := range pendingTxs.Txs { | ||
tx := &GossipTx{Tx: pendingTx} | ||
g.bloom.Add(tx) | ||
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipMaxFalsePositiveRate) | ||
if err != nil { | ||
log.Error("failed to reset bloom filter", "err", err) | ||
continue | ||
} | ||
|
||
if reset { | ||
log.Debug("resetting bloom filter", "reason", "reached max filled ratio") | ||
|
||
g.mempool.IteratePending(func(tx *types.Transaction) bool { | ||
g.bloom.Add(&GossipTx{Tx: pendingTx}) | ||
return true | ||
}) | ||
} | ||
} | ||
g.lock.Unlock() | ||
} | ||
} | ||
} | ||
|
||
// Add enqueues the transaction to the mempool. Subscribe should be called | ||
// to receive an event if tx is actually added to the mempool or not. | ||
func (g *GossipTxPool) Add(tx *GossipTx) error { | ||
return g.mempool.AddRemotes([]*types.Transaction{tx.Tx})[0] | ||
} | ||
|
||
func (g *GossipTxPool) Iterate(f func(tx *GossipTx) bool) { | ||
g.mempool.IteratePending(func(tx *types.Transaction) bool { | ||
return f(&GossipTx{Tx: tx}) | ||
}) | ||
} | ||
|
||
func (g *GossipTxPool) GetFilter() ([]byte, []byte, error) { | ||
g.lock.RLock() | ||
defer g.lock.RUnlock() | ||
|
||
bloom, err := g.bloom.Bloom.MarshalBinary() | ||
salt := g.bloom.Salt | ||
|
||
return bloom, salt[:], err | ||
} | ||
|
||
type GossipTx struct { | ||
Tx *types.Transaction | ||
} | ||
|
||
func (tx *GossipTx) GetID() ids.ID { | ||
return ids.ID(tx.Tx.Hash()) | ||
} | ||
|
||
func (tx *GossipTx) Marshal() ([]byte, error) { | ||
return tx.Tx.MarshalBinary() | ||
} | ||
|
||
func (tx *GossipTx) Unmarshal(bytes []byte) error { | ||
tx.Tx = &types.Transaction{} | ||
return tx.Tx.UnmarshalBinary(bytes) | ||
} |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package evm | ||
|
||
import ( | ||
"context" | ||
"math/big" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/network/p2p" | ||
"github.com/ava-labs/avalanchego/network/p2p/gossip" | ||
"github.com/ava-labs/avalanchego/proto/pb/sdk" | ||
"github.com/ava-labs/avalanchego/snow/engine/common" | ||
"github.com/ava-labs/avalanchego/snow/validators" | ||
"github.com/ava-labs/avalanchego/utils" | ||
"github.com/ava-labs/avalanchego/utils/logging" | ||
"github.com/ava-labs/avalanchego/utils/set" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/stretchr/testify/require" | ||
|
||
"go.uber.org/mock/gomock" | ||
|
||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/ava-labs/subnet-evm/core/types" | ||
) | ||
|
||
func TestTxGossip(t *testing.T) { | ||
require := require.New(t) | ||
|
||
// set up prefunded address | ||
_, vm, _, sender := GenesisVM(t, true, genesisJSONLatest, "", "") | ||
defer func() { | ||
require.NoError(vm.Shutdown(context.Background())) | ||
}() | ||
|
||
// sender for the peer requesting gossip from [vm] | ||
ctrl := gomock.NewController(t) | ||
peerSender := common.NewMockSender(ctrl) | ||
router := p2p.NewRouter(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") | ||
|
||
// we're only making client requests, so we don't need a server handler | ||
client, err := router.RegisterAppProtocol(txGossipProtocol, nil, nil) | ||
require.NoError(err) | ||
|
||
emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) | ||
require.NoError(err) | ||
emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary() | ||
require.NoError(err) | ||
request := &sdk.PullGossipRequest{ | ||
Filter: emptyBloomFilterBytes, | ||
Salt: utils.RandomBytes(32), | ||
} | ||
|
||
requestBytes, err := proto.Marshal(request) | ||
require.NoError(err) | ||
|
||
wg := &sync.WaitGroup{} | ||
|
||
requestingNodeID := ids.GenerateTestNodeID() | ||
peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { | ||
go func() { | ||
require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) | ||
}() | ||
}).AnyTimes() | ||
|
||
sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { | ||
go func() { | ||
require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes)) | ||
}() | ||
return nil | ||
} | ||
|
||
// we only accept gossip requests from validators | ||
mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) | ||
require.True(ok) | ||
mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { | ||
return 0, nil | ||
} | ||
mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { | ||
return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil | ||
} | ||
|
||
// Ask the VM for any new transactions. We should get nothing at first. | ||
wg.Add(1) | ||
onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { | ||
require.NoError(err) | ||
|
||
response := &sdk.PullGossipResponse{} | ||
require.NoError(proto.Unmarshal(responseBytes, response)) | ||
require.Empty(response.Gossip) | ||
wg.Done() | ||
} | ||
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) | ||
wg.Wait() | ||
|
||
// Issue a tx to the VM | ||
address := testEthAddrs[0] | ||
key := testKeys[0] | ||
tx := types.NewTransaction(0, address, big.NewInt(10), 21000, big.NewInt(testMinGasPrice), nil) | ||
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainConfig.ChainID), key) | ||
require.NoError(err) | ||
|
||
errs := vm.txPool.AddLocals([]*types.Transaction{signedTx}) | ||
require.Len(errs, 1) | ||
require.Nil(errs[0]) | ||
|
||
// wait so we aren't throttled by the vm | ||
time.Sleep(5 * time.Second) | ||
|
||
// Ask the VM for new transactions. We should get the newly issued tx. | ||
wg.Add(1) | ||
onResponse = func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { | ||
require.NoError(err) | ||
|
||
response := &sdk.PullGossipResponse{} | ||
require.NoError(proto.Unmarshal(responseBytes, response)) | ||
require.Len(response.Gossip, 1) | ||
|
||
gotTx := &GossipTx{} | ||
require.NoError(gotTx.Unmarshal(response.Gossip[0])) | ||
require.Equal(signedTx.Hash(), gotTx.Tx.Hash()) | ||
|
||
wg.Done() | ||
} | ||
require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) | ||
wg.Wait() | ||
} |
Oops, something went wrong.