From fc55eb79c5f4951baaf60b9ef03724a1b68c9e5b Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 11 Aug 2020 18:36:59 +0200 Subject: [PATCH] go/worker/common/committee/group: Don't use roundCtx for P2P messages Since P2P message delivery is async, the roundCtx could be for the previous round and so could get cancelled prematurely. Introduce a timeout instead. --- go/worker/common/committee/group.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/go/worker/common/committee/group.go b/go/worker/common/committee/group.go index fd90e3b521a..c3889130a39 100644 --- a/go/worker/common/committee/group.go +++ b/go/worker/common/committee/group.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/opentracing/opentracing-go" opentracingExt "github.com/opentracing/opentracing-go/ext" @@ -23,11 +24,16 @@ import ( p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error" ) +// peerMessageProcessTimeout is the maximum time that peer message processing can take. +const peerMessageProcessTimeout = 10 * time.Second + // MessageHandler handles messages from other nodes. type MessageHandler interface { - // HandlePeerMessage handles a message. + // HandlePeerMessage handles a message that has already been authenticated to come from a + // registered node. // - // The message has already been authenticated to come from a registered node. + // The provided context is short-lived so if the handler needs to perform additional work, that + // should be dispatched to a separate goroutine and not block delivery. HandlePeerMessage(ctx context.Context, msg *p2p.Message) error } @@ -427,7 +433,7 @@ func (g *Group) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) e func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Message) error { // Perform some checks on the incoming message. We make sure to release the // lock before running the handler. - ctx, err := func() (context.Context, error) { + err := func() error { g.RLock() defer g.RUnlock() @@ -437,18 +443,21 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Mes switch { case msg.GroupVersion < g.activeEpoch.groupVersion: // Stale messages will never become valid. - return nil, p2pError.Permanent(fmt.Errorf("group version in the past")) + return p2pError.Permanent(fmt.Errorf("group version in the past")) case msg.GroupVersion > g.activeEpoch.groupVersion: // Messages from the future may eventually become valid. - return nil, fmt.Errorf("group version from the future") + return fmt.Errorf("group version from the future") } - return g.activeEpoch.roundCtx, nil + return nil }() if err != nil { return err } + ctx, cancel := context.WithTimeout(context.Background(), peerMessageProcessTimeout) + defer cancel() + // Import SpanContext from the message and store it in the current Context. if msg.SpanContext != nil { sc, err := tracing.SpanContextFromBinary(msg.SpanContext)