Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/worker/common: Treat stale unauthorized peer error as permanent #3133

Merged
merged 1 commit into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/3133.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/common: Treat stale unauthorized peer error as permanent

If the message's group version indicates that the message is stale and an
authorization check fails, treat the error as permanent as a stale message
will never become valid.
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/byzantine/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type p2pRecvHandler struct {
}

// AuthenticatePeer implements p2p Handler.
func (h *p2pRecvHandler) AuthenticatePeer(peerID signature.PublicKey) error {
func (h *p2pRecvHandler) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) error {
// The Byzantine node itself isn't especially robust. We assume that
// the other nodes are honest.
return nil
Expand Down
25 changes: 16 additions & 9 deletions go/worker/common/committee/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type MessageHandler interface {
// HandlePeerMessage handles a message.
//
// The message has already been authenticated to come from a registered node.
HandlePeerMessage(ctx context.Context, message *p2p.Message) error
HandlePeerMessage(ctx context.Context, msg *p2p.Message) error
}

// CommitteeInfo contains information about a committee of nodes.
Expand Down Expand Up @@ -485,7 +485,7 @@ func (g *Group) GetEpochSnapshot() *EpochSnapshot {
}

// AuthenticatePeer handles authenticating a peer that send an incoming message.
func (g *Group) AuthenticatePeer(peerID signature.PublicKey) error {
func (g *Group) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) error {
g.RLock()
defer g.RUnlock()

Expand Down Expand Up @@ -514,14 +514,21 @@ func (g *Group) AuthenticatePeer(peerID signature.PublicKey) error {
}

if !authorized {
return fmt.Errorf("group: peer is not authorized")
err := fmt.Errorf("group: peer is not authorized")

// If the group version is in the past, make the error permanent to avoid retrying on stale
// messages.
if msg.GroupVersion < g.activeEpoch.groupVersion {
err = p2pError.Permanent(err)
}
return err
}

return nil
}

// HandlePeerMessage handles an incoming message from a peer.
func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p.Message) error {
func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Message) error {
// Perform some checks on the incoming message. We make sure to release the
// lock before running the handler.
ctx, err := func() (context.Context, error) {
Expand All @@ -532,10 +539,10 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p
// is not the case, this means that one of the nodes processed an epoch
// transition and the other one didn't.
switch {
case message.GroupVersion < g.activeEpoch.groupVersion:
case msg.GroupVersion < g.activeEpoch.groupVersion:
// Stale messages will never become valid.
return nil, p2pError.Permanent(fmt.Errorf("group version in the past"))
case message.GroupVersion > g.activeEpoch.groupVersion:
case msg.GroupVersion > g.activeEpoch.groupVersion:
// Messages from the future may eventually become valid.
return nil, fmt.Errorf("group version from the future")
}
Expand All @@ -547,8 +554,8 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p
}

// Import SpanContext from the message and store it in the current Context.
if message.SpanContext != nil {
sc, err := tracing.SpanContextFromBinary(message.SpanContext)
if msg.SpanContext != nil {
sc, err := tracing.SpanContextFromBinary(msg.SpanContext)
if err == nil {
parentSpan := opentracing.StartSpan("parent", opentracingExt.RPCServerOption(sc))
span := opentracing.StartSpan("HandleBatch", opentracing.FollowsFrom(parentSpan.Context()))
Expand All @@ -557,7 +564,7 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p
}
}

return g.handler.HandlePeerMessage(ctx, message)
return g.handler.HandlePeerMessage(ctx, msg)
}

func (g *Group) publishLocked(
Expand Down
4 changes: 2 additions & 2 deletions go/worker/common/p2p/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Handler interface {
//
// The message handler will be re-invoked on error with a periodic
// backoff unless errors are wrapped via `p2pError.Permanent`.
AuthenticatePeer(peerID signature.PublicKey) error
AuthenticatePeer(peerID signature.PublicKey, msg *Message) error

// HandlePeerMessage handles an incoming message from a peer.
//
Expand Down Expand Up @@ -153,7 +153,7 @@ func (h *topicHandler) dispatchMessage(peerID core.PeerID, m *queuedMsg, isIniti
// Perhaps this should reject the message, but it is possible that
// the local node is just behind. This does result in stale messages
// getting retried though.
if err = h.handler.AuthenticatePeer(m.from); err != nil {
if err = h.handler.AuthenticatePeer(m.from, m.msg); err != nil {
return err
}

Expand Down