Skip to content

Commit

Permalink
go/worker/common: Treat stale unauthorized peer error as permanent
Browse files Browse the repository at this point in the history
If the message's group version indicates that the message is stale and an
authorization check fails, treat the error as permanent as a stale message will
never become valid.
  • Loading branch information
kostko committed Jul 27, 2020
1 parent 5464ad3 commit af331fd
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/byzantine/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type p2pRecvHandler struct {
}

// AuthenticatePeer implements p2p Handler.
func (h *p2pRecvHandler) AuthenticatePeer(peerID signature.PublicKey) error {
func (h *p2pRecvHandler) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) error {
// The Byzantine node itself isn't especially robust. We assume that
// the other nodes are honest.
return nil
Expand Down
25 changes: 16 additions & 9 deletions go/worker/common/committee/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type MessageHandler interface {
// HandlePeerMessage handles a message.
//
// The message has already been authenticated to come from a registered node.
HandlePeerMessage(ctx context.Context, message *p2p.Message) error
HandlePeerMessage(ctx context.Context, msg *p2p.Message) error
}

// CommitteeInfo contains information about a committee of nodes.
Expand Down Expand Up @@ -485,7 +485,7 @@ func (g *Group) GetEpochSnapshot() *EpochSnapshot {
}

// AuthenticatePeer handles authenticating a peer that send an incoming message.
func (g *Group) AuthenticatePeer(peerID signature.PublicKey) error {
func (g *Group) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) error {
g.RLock()
defer g.RUnlock()

Expand Down Expand Up @@ -514,14 +514,21 @@ func (g *Group) AuthenticatePeer(peerID signature.PublicKey) error {
}

if !authorized {
return fmt.Errorf("group: peer is not authorized")
err := fmt.Errorf("group: peer is not authorized")

// If the group version is in the past, make the error permanent to avoid retrying on stale
// messages.
if msg.GroupVersion < g.activeEpoch.groupVersion {
err = p2pError.Permanent(err)
}
return err
}

return nil
}

// HandlePeerMessage handles an incoming message from a peer.
func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p.Message) error {
func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Message) error {
// Perform some checks on the incoming message. We make sure to release the
// lock before running the handler.
ctx, err := func() (context.Context, error) {
Expand All @@ -532,10 +539,10 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p
// is not the case, this means that one of the nodes processed an epoch
// transition and the other one didn't.
switch {
case message.GroupVersion < g.activeEpoch.groupVersion:
case msg.GroupVersion < g.activeEpoch.groupVersion:
// Stale messages will never become valid.
return nil, p2pError.Permanent(fmt.Errorf("group version in the past"))
case message.GroupVersion > g.activeEpoch.groupVersion:
case msg.GroupVersion > g.activeEpoch.groupVersion:
// Messages from the future may eventually become valid.
return nil, fmt.Errorf("group version from the future")
}
Expand All @@ -547,8 +554,8 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p
}

// Import SpanContext from the message and store it in the current Context.
if message.SpanContext != nil {
sc, err := tracing.SpanContextFromBinary(message.SpanContext)
if msg.SpanContext != nil {
sc, err := tracing.SpanContextFromBinary(msg.SpanContext)
if err == nil {
parentSpan := opentracing.StartSpan("parent", opentracingExt.RPCServerOption(sc))
span := opentracing.StartSpan("HandleBatch", opentracing.FollowsFrom(parentSpan.Context()))
Expand All @@ -557,7 +564,7 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p
}
}

return g.handler.HandlePeerMessage(ctx, message)
return g.handler.HandlePeerMessage(ctx, msg)
}

func (g *Group) publishLocked(
Expand Down
4 changes: 2 additions & 2 deletions go/worker/common/p2p/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Handler interface {
//
// The message handler will be re-invoked on error with a periodic
// backoff unless errors are wrapped via `p2pError.Permanent`.
AuthenticatePeer(peerID signature.PublicKey) error
AuthenticatePeer(peerID signature.PublicKey, msg *Message) error

// HandlePeerMessage handles an incoming message from a peer.
//
Expand Down Expand Up @@ -153,7 +153,7 @@ func (h *topicHandler) dispatchMessage(peerID core.PeerID, m *queuedMsg, isIniti
// Perhaps this should reject the message, but it is possible that
// the local node is just behind. This does result in stale messages
// getting retried though.
if err = h.handler.AuthenticatePeer(m.from); err != nil {
if err = h.handler.AuthenticatePeer(m.from, m.msg); err != nil {
return err
}

Expand Down

0 comments on commit af331fd

Please sign in to comment.