Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SDK Router message handling #316

Merged
merged 21 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 40 additions & 56 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
Expand Down Expand Up @@ -87,6 +88,7 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
router *p2p.Router // handles messages being sent to the generic networking SDK
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
Expand All @@ -99,11 +101,18 @@ type network struct {

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
//
// Invariant: Even though `closed` is an atomic, `lock` is required to be
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
// held when sending requests to guarantee that the network isn't closed
// during these calls. This is because closing the network cancels all
// outstanding requests, which means we must guarantee never to register a
// request that will never be fulfilled or cancelled.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
Expand Down Expand Up @@ -172,10 +181,7 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand
log.Debug("sending request to peer", "nodeID", nodeID, "requestLen", len(request))
n.peers.TrackPeer(nodeID)

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = responseHandler

nodeIDs := set.NewSet[ids.NodeID](1)
Expand Down Expand Up @@ -209,10 +215,7 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler
return nil
}

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = handler

// Send cross chain request to [chainID].
Expand Down Expand Up @@ -272,19 +275,12 @@ func (n *network) CrossChainAppRequest(ctx context.Context, requestingChainID id
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChainID ids.ID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppRequestFailed from chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID)
// Can happen after the network has been closed.
log.Debug("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID)
return nil
}

Expand All @@ -299,19 +295,12 @@ func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChai
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID ids.ID, requestID uint32, response []byte) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppResponse from responding chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response))
// Can happen after the network has been closed.
log.Debug("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response))
return nil
}

Expand All @@ -335,8 +324,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
log.Debug("forwarding AppRequest to SDK router", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -366,21 +355,13 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return nil
log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -395,21 +376,13 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
return nil
log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down Expand Up @@ -442,8 +415,11 @@ func calculateTimeUntilDeadline(deadline time.Time, stats stats.RequestHandlerSt

// markRequestFulfilled fetches the handler for [requestID] and marks the request with [requestID] as having been fulfilled.
// This is called by either [AppResponse] or [AppRequestFailed].
// Assumes that the write lock is held.
// Assumes that the write lock is not held.
func (n *network) markRequestFulfilled(requestID uint32) (message.ResponseHandler, bool) {
n.lock.Lock()
defer n.lock.Unlock()

handler, exists := n.outstandingRequestHandlers[requestID]
if !exists {
return nil, false
Expand All @@ -467,10 +443,6 @@ func (n *network) Gossip(gossip []byte) error {
// error returned by this function is expected to be treated as fatal by the engine
// returns error if request could not be parsed as message.Request or when the requestHandler returns an error
func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
if n.closed.Get() {
return nil
}

var gossipMsg message.GossipMessage
if _, err := n.codec.Unmarshal(gossipBytes, &gossipMsg); err != nil {
log.Debug("could not parse app gossip", "nodeID", nodeID, "gossipLen", len(gossipBytes), "err", err)
Expand Down Expand Up @@ -564,3 +536,15 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {

n.peers.TrackBandwidth(nodeID, bandwidth)
}

// invariant: peer/network must use explicitly even request ids.
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
// for this reason, [n.requestID] is initialized as zero and incremented by 2.
// This is for backwards-compatibility while the SDK router exists with the
// legacy coreth handlers to avoid a (very) narrow edge case where request ids
// can overlap, resulting in a dropped timeout.
func (n *network) nextRequestID() uint32 {
next := n.requestIDGen
n.requestIDGen += 2

return next
}
Loading
Loading