diff --git a/go/oasis-node/cmd/debug/byzantine/p2p.go b/go/oasis-node/cmd/debug/byzantine/p2p.go index 5acf9052e3a..c5fb26b961e 100644 --- a/go/oasis-node/cmd/debug/byzantine/p2p.go +++ b/go/oasis-node/cmd/debug/byzantine/p2p.go @@ -37,7 +37,7 @@ type p2pRecvHandler struct { } // AuthenticatePeer implements p2p Handler. -func (h *p2pRecvHandler) AuthenticatePeer(peerID signature.PublicKey) error { +func (h *p2pRecvHandler) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) error { // The Byzantine node itself isn't especially robust. We assume that // the other nodes are honest. return nil diff --git a/go/worker/common/committee/group.go b/go/worker/common/committee/group.go index 4a4d021089e..54ccd2e753d 100644 --- a/go/worker/common/committee/group.go +++ b/go/worker/common/committee/group.go @@ -31,7 +31,7 @@ type MessageHandler interface { // HandlePeerMessage handles a message. // // The message has already been authenticated to come from a registered node. - HandlePeerMessage(ctx context.Context, message *p2p.Message) error + HandlePeerMessage(ctx context.Context, msg *p2p.Message) error } // CommitteeInfo contains information about a committee of nodes. @@ -485,7 +485,7 @@ func (g *Group) GetEpochSnapshot() *EpochSnapshot { } // AuthenticatePeer handles authenticating a peer that send an incoming message. -func (g *Group) AuthenticatePeer(peerID signature.PublicKey) error { +func (g *Group) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) error { g.RLock() defer g.RUnlock() @@ -514,14 +514,21 @@ func (g *Group) AuthenticatePeer(peerID signature.PublicKey) error { } if !authorized { - return fmt.Errorf("group: peer is not authorized") + err := fmt.Errorf("group: peer is not authorized") + + // If the group version is in the past, make the error permanent to avoid retrying on stale + // messages. + if msg.GroupVersion < g.activeEpoch.groupVersion { + err = p2pError.Permanent(err) + } + return err } return nil } // HandlePeerMessage handles an incoming message from a peer. -func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p.Message) error { +func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Message) error { // Perform some checks on the incoming message. We make sure to release the // lock before running the handler. ctx, err := func() (context.Context, error) { @@ -532,10 +539,10 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p // is not the case, this means that one of the nodes processed an epoch // transition and the other one didn't. switch { - case message.GroupVersion < g.activeEpoch.groupVersion: + case msg.GroupVersion < g.activeEpoch.groupVersion: // Stale messages will never become valid. return nil, p2pError.Permanent(fmt.Errorf("group version in the past")) - case message.GroupVersion > g.activeEpoch.groupVersion: + case msg.GroupVersion > g.activeEpoch.groupVersion: // Messages from the future may eventually become valid. return nil, fmt.Errorf("group version from the future") } @@ -547,8 +554,8 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p } // Import SpanContext from the message and store it in the current Context. - if message.SpanContext != nil { - sc, err := tracing.SpanContextFromBinary(message.SpanContext) + if msg.SpanContext != nil { + sc, err := tracing.SpanContextFromBinary(msg.SpanContext) if err == nil { parentSpan := opentracing.StartSpan("parent", opentracingExt.RPCServerOption(sc)) span := opentracing.StartSpan("HandleBatch", opentracing.FollowsFrom(parentSpan.Context())) @@ -557,7 +564,7 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, message *p2p } } - return g.handler.HandlePeerMessage(ctx, message) + return g.handler.HandlePeerMessage(ctx, msg) } func (g *Group) publishLocked( diff --git a/go/worker/common/p2p/dispatch.go b/go/worker/common/p2p/dispatch.go index c8ad2fd4899..05a8f9597f0 100644 --- a/go/worker/common/p2p/dispatch.go +++ b/go/worker/common/p2p/dispatch.go @@ -28,7 +28,7 @@ type Handler interface { // // The message handler will be re-invoked on error with a periodic // backoff unless errors are wrapped via `p2pError.Permanent`. - AuthenticatePeer(peerID signature.PublicKey) error + AuthenticatePeer(peerID signature.PublicKey, msg *Message) error // HandlePeerMessage handles an incoming message from a peer. // @@ -153,7 +153,7 @@ func (h *topicHandler) dispatchMessage(peerID core.PeerID, m *queuedMsg, isIniti // Perhaps this should reject the message, but it is possible that // the local node is just behind. This does result in stale messages // getting retried though. - if err = h.handler.AuthenticatePeer(m.from); err != nil { + if err = h.handler.AuthenticatePeer(m.from, m.msg); err != nil { return err }