Skip to content

Commit

Permalink
add sdk router to network
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim committed Sep 5, 2023
1 parent 3f5dc8a commit b8dcf99
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 33 deletions.
46 changes: 27 additions & 19 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"golang.org/x/sync/semaphore"

"github.com/ava-labs/avalanchego/network/p2p"

"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/codec"
Expand Down Expand Up @@ -87,23 +89,25 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
router *p2p.Router
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
Expand Down Expand Up @@ -336,7 +340,9 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil

// this might be an sdk request
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -366,7 +372,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
n.lock.Lock()
defer n.lock.Unlock()

Expand All @@ -378,9 +384,10 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return nil
log.Debug("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))

// this might be an sdk response
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -395,7 +402,7 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

Expand All @@ -407,9 +414,10 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
return nil
log.Debug("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)

// this might be an sdk request
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down
27 changes: 14 additions & 13 deletions peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
ethcommon "github.com/ethereum/go-ethereum/common"

Expand Down Expand Up @@ -53,7 +55,7 @@ var (

func TestNetworkDoesNotConnectToItself(t *testing.T) {
selfNodeID := ids.GenerateTestNodeID()
n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1)
n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1)
assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion))
assert.EqualValues(t, 0, n.Size())
}
Expand Down Expand Up @@ -89,7 +91,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
Expand Down Expand Up @@ -164,7 +166,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -244,7 +246,7 @@ func TestAppRequestOnShutdown(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand Down Expand Up @@ -293,7 +295,7 @@ func TestRequestMinVersion(t *testing.T) {
}

// passing nil as codec works because the net.AppRequest is never called
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
client := NewNetworkClient(net)
requestMessage := TestMessage{Message: "this is a request"}
requestBytes, err := message.RequestToBytes(codecManager, requestMessage)
Expand Down Expand Up @@ -356,7 +358,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) {
processingDuration: 500 * time.Millisecond,
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetRequestHandler(requestHandler)
nodeID := ids.GenerateTestNodeID()

Expand Down Expand Up @@ -396,7 +398,7 @@ func TestGossip(t *testing.T) {
}

gossipHandler := &testGossipHandler{}
clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(gossipHandler)

assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand All @@ -423,7 +425,7 @@ func TestHandleInvalidMessages(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{})

Expand Down Expand Up @@ -462,7 +464,6 @@ func TestHandleInvalidMessages(t *testing.T) {
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID))
}

func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
Expand All @@ -473,7 +474,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler

Expand Down Expand Up @@ -513,7 +514,7 @@ func TestCrossChainAppRequest(t *testing.T) {
},
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -568,7 +569,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -628,7 +629,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
}
codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)

exampleCrossChainRequest := ExampleCrossChainRequest{
Expand Down
35 changes: 35 additions & 0 deletions plugin/evm/mempool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/stretchr/testify/require"
)

func TestMempoolAddTx(t *testing.T) {
require := require.New(t)
m, err := NewMempool(ids.Empty, 5_000)

Check failure on line 15 in plugin/evm/mempool_test.go

View workflow job for this annotation

GitHub Actions / Lint

assignment mismatch: 2 variables but NewMempool returns 1 value (typecheck)
require.NoError(err)

txs := make([]*GossipAtomicTx, 0)

Check failure on line 18 in plugin/evm/mempool_test.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: GossipAtomicTx (typecheck)
for i := 0; i < 3_000; i++ {
tx := &GossipAtomicTx{

Check failure on line 20 in plugin/evm/mempool_test.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: GossipAtomicTx (typecheck)
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
}

txs = append(txs, tx)
require.NoError(m.Add(tx))
}

for _, tx := range txs {
require.True(m.bloom.Has(tx))
}
}
Loading

0 comments on commit b8dcf99

Please sign in to comment.