Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SDK Router message handling #316

Merged
merged 21 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 47 additions & 30 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
Expand Down Expand Up @@ -87,23 +88,33 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
router *p2p.Router
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
//
// Invariant: Even though `closed` is an atomic, `lock` is required to be
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
// held when sending requests and receiving responses to guarantee that the
// network isn't closed during these calls. This is because closing the
// network cancels all outstanding requests, which means we must guarantee
// never to fulfill, or cancel again, after having cancelled the request.
// Similarly, we must ensure we don't register a request that will never be
// fulfilled or cancelled.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
Expand Down Expand Up @@ -172,10 +183,7 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand
log.Debug("sending request to peer", "nodeID", nodeID, "requestLen", len(request))
n.peers.TrackPeer(nodeID)

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = responseHandler

nodeIDs := set.NewSet[ids.NodeID](1)
Expand Down Expand Up @@ -209,10 +217,7 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler
return nil
}

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = handler

// Send cross chain request to [chainID].
Expand Down Expand Up @@ -335,8 +340,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
log.Debug("forwarding AppRequest to SDK router", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -366,21 +371,21 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
n.lock.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since closed is atomic, could we move this lock to directly before
handler, exists := n.markRequestFulfilled(requestID)?

Should we move it to inside markRequestFulfilled instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes much more sense than what I was currently doing.

defer n.lock.Unlock()

if n.closed.Get() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very subtle change. So we should be terrified to make it.

I believe we must hold the lock when checking for n.closed on all of the inbound response + requestFailed flows to avoid a potential panic due to a write to a closed channel (or closing a channel twice).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline I'll see if I can make a separate regression test for this invariant as a follow-up to this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I documented the invariant on n.closed as well 👍

n.lock.Unlock()
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we okay with dropping responses (and timeouts) that should have been sent to the SDK router after we close the network?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are not okay with this - then I think we'll need to be fairly careful around what gets passed through to the router.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We chatted offline but agreed it was cleanest to leave the Router code as-is, and just drop the closed check on the server-end because we're guaranteed that the outstanding request is empty on shutdown because we empty it on Shutdown and stop sending requests after the flag is set.

}

log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
n.lock.Unlock()
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return nil
log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -395,21 +400,21 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
n.lock.Unlock()
return nil
}

log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
n.lock.Unlock()
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
return nil
log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down Expand Up @@ -564,3 +569,15 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {

n.peers.TrackBandwidth(nodeID, bandwidth)
}

// invariant: peer/network must use explicitly even request ids.
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
// for this reason, [n.requestID] is initialized as zero and incremented by 2.
// This is for backwards-compatibility while the SDK router exists with the
// legacy coreth handlers to avoid a (very) narrow edge case where request ids
// can overlap, resulting in a dropped timeout.
func (n *network) nextRequestID() uint32 {
next := n.requestIDGen
n.requestIDGen += 2

return next
}
100 changes: 82 additions & 18 deletions peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
ethcommon "github.com/ethereum/go-ethereum/common"

Expand Down Expand Up @@ -49,11 +51,13 @@ var (

_ message.CrossChainRequest = &ExampleCrossChainRequest{}
_ message.CrossChainRequestHandler = &testCrossChainHandler{}

_ p2p.Handler = &testSDKHandler{}
)

func TestNetworkDoesNotConnectToItself(t *testing.T) {
selfNodeID := ids.GenerateTestNodeID()
n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1)
n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1)
assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion))
assert.EqualValues(t, 0, n.Size())
}
Expand Down Expand Up @@ -89,7 +93,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
Expand Down Expand Up @@ -164,7 +168,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -244,7 +248,7 @@ func TestAppRequestOnShutdown(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand Down Expand Up @@ -293,7 +297,7 @@ func TestRequestMinVersion(t *testing.T) {
}

// passing nil as codec works because the net.AppRequest is never called
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
client := NewNetworkClient(net)
requestMessage := TestMessage{Message: "this is a request"}
requestBytes, err := message.RequestToBytes(codecManager, requestMessage)
Expand Down Expand Up @@ -356,7 +360,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) {
processingDuration: 500 * time.Millisecond,
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetRequestHandler(requestHandler)
nodeID := ids.GenerateTestNodeID()

Expand Down Expand Up @@ -396,7 +400,7 @@ func TestGossip(t *testing.T) {
}

gossipHandler := &testGossipHandler{}
clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(gossipHandler)

assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand All @@ -423,7 +427,7 @@ func TestHandleInvalidMessages(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{})

Expand Down Expand Up @@ -457,12 +461,11 @@ func TestHandleInvalidMessages(t *testing.T) {
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), garbageResponse))
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), emptyResponse))
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), nilResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
Comment on lines +464 to +468
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was written to ensure that an invalid message NEVER triggers an unintentional fatal error, so it seems a bit weird to change it in this way.

Copy link
Contributor Author

@joshua-kim joshua-kim Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was unsure if it made more sense to just remove these test cases or just test for the sdk error. I guess now the property that we have is that invalid messages are always forwarded into the router, so we can either check for this error, get rid these tests, or write a Router interface that p2p.Router implements but maybe that's overkill.

}

func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
Expand All @@ -473,7 +476,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler

Expand Down Expand Up @@ -513,7 +516,7 @@ func TestCrossChainAppRequest(t *testing.T) {
},
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -568,7 +571,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -628,7 +631,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
}
codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)

exampleCrossChainRequest := ExampleCrossChainRequest{
Expand All @@ -649,6 +652,48 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
require.True(t, called)
}

func TestSDKRouting(t *testing.T) {
require := require.New(t)
sender := &testAppSender{
sendAppRequestFn: func(s set.Set[ids.NodeID], u uint32, bytes []byte) error {
return nil
},
sendAppResponseFn: func(id ids.NodeID, u uint32, bytes []byte) error {
return nil
},
}
protocol := 0
handler := &testSDKHandler{}
router := p2p.NewRouter(logging.NoLog{}, sender)
_, err := router.RegisterAppProtocol(uint64(protocol), handler)
require.NoError(err)

networkCodec := codec.NewManager(0)
crossChainCodec := codec.NewManager(0)

network := NewNetwork(
router,
nil,
networkCodec,
crossChainCodec,
ids.EmptyNodeID,
1,
1,
)

nodeID := ids.GenerateTestNodeID()
foobar := append([]byte{byte(protocol)}, []byte("foobar")...)
err = network.AppRequest(context.Background(), nodeID, 0, time.Time{}, foobar)
require.NoError(err)
require.True(handler.appRequested)

err = network.AppResponse(context.Background(), ids.GenerateTestNodeID(), 0, foobar)
require.ErrorIs(err, p2p.ErrUnrequestedResponse)

err = network.AppRequestFailed(context.Background(), nodeID, 0)
require.ErrorIs(err, p2p.ErrUnrequestedResponse)
}

func buildCodec(t *testing.T, types ...interface{}) codec.Manager {
codecManager := codec.NewDefaultManager()
c := linearcodec.NewDefault()
Expand Down Expand Up @@ -850,3 +895,22 @@ type testCrossChainHandler struct {
func (t *testCrossChainHandler) HandleCrossChainRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) {
return t.codec.Marshal(message.Version, ExampleCrossChainResponse{Response: "this is an example response"})
}

type testSDKHandler struct {
appRequested bool
}

func (t *testSDKHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
// TODO implement me
panic("implement me")
}

func (t *testSDKHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
t.appRequested = true
return nil, nil
}

func (t *testSDKHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) {
// TODO implement me
panic("implement me")
}
6 changes: 5 additions & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/network/p2p"

"github.com/ava-labs/coreth/consensus/dummy"
corethConstants "github.com/ava-labs/coreth/constants"
Expand Down Expand Up @@ -276,6 +277,8 @@ type VM struct {
client peer.NetworkClient
networkCodec codec.Manager

router *p2p.Router

// Metrics
multiGatherer avalanchegoMetrics.MultiGatherer

Expand Down Expand Up @@ -506,8 +509,9 @@ func (vm *VM) Initialize(
}

// initialize peer network
vm.router = p2p.NewRouter(vm.ctx.Log, appSender)
vm.networkCodec = message.Codec
vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.client = peer.NewNetworkClient(vm.Network)

if err := vm.initializeChain(lastAcceptedHash); err != nil {
Expand Down
Loading