Skip to content

Commit

Permalink
go/worker/common/committee/group: Don't use roundCtx for P2P messages
Browse files Browse the repository at this point in the history
Since P2P message delivery is async, the roundCtx could be for the previous
round and so could get cancelled prematurely. Introduce a timeout instead.
  • Loading branch information
kostko committed Aug 12, 2020
1 parent 625d4cb commit fc55eb7
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions go/worker/common/committee/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/opentracing/opentracing-go"
opentracingExt "github.com/opentracing/opentracing-go/ext"
Expand All @@ -23,11 +24,16 @@ import (
p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error"
)

// peerMessageProcessTimeout is the maximum time that peer message processing can take.
const peerMessageProcessTimeout = 10 * time.Second

// MessageHandler handles messages from other nodes.
type MessageHandler interface {
// HandlePeerMessage handles a message.
// HandlePeerMessage handles a message that has already been authenticated to come from a
// registered node.
//
// The message has already been authenticated to come from a registered node.
// The provided context is short-lived so if the handler needs to perform additional work, that
// should be dispatched to a separate goroutine and not block delivery.
HandlePeerMessage(ctx context.Context, msg *p2p.Message) error
}

Expand Down Expand Up @@ -427,7 +433,7 @@ func (g *Group) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) e
func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Message) error {
// Perform some checks on the incoming message. We make sure to release the
// lock before running the handler.
ctx, err := func() (context.Context, error) {
err := func() error {
g.RLock()
defer g.RUnlock()

Expand All @@ -437,18 +443,21 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Mes
switch {
case msg.GroupVersion < g.activeEpoch.groupVersion:
// Stale messages will never become valid.
return nil, p2pError.Permanent(fmt.Errorf("group version in the past"))
return p2pError.Permanent(fmt.Errorf("group version in the past"))
case msg.GroupVersion > g.activeEpoch.groupVersion:
// Messages from the future may eventually become valid.
return nil, fmt.Errorf("group version from the future")
return fmt.Errorf("group version from the future")
}

return g.activeEpoch.roundCtx, nil
return nil
}()
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), peerMessageProcessTimeout)
defer cancel()

// Import SpanContext from the message and store it in the current Context.
if msg.SpanContext != nil {
sc, err := tracing.SpanContextFromBinary(msg.SpanContext)
Expand Down

0 comments on commit fc55eb7

Please sign in to comment.