Skip to content

Commit

Permalink
go/worker/common/committee/group: Don't use roundCtx for P2P messages
Browse files Browse the repository at this point in the history
Since P2P message delivery is async, the roundCtx could be for the previous
round and so could get cancelled prematurely. Introduce a timeout instead.
  • Loading branch information
kostko committed Aug 11, 2020
1 parent 04a57b4 commit c6f88c2
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions go/worker/common/committee/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/opentracing/opentracing-go"
opentracingExt "github.com/opentracing/opentracing-go/ext"
Expand All @@ -23,6 +24,9 @@ import (
p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error"
)

// peerMessageProcessTimeout is the maximum time that peer message processing can take.
const peerMessageProcessTimeout = 10 * time.Second

// MessageHandler handles messages from other nodes.
type MessageHandler interface {
// HandlePeerMessage handles a message.
Expand Down Expand Up @@ -427,7 +431,7 @@ func (g *Group) AuthenticatePeer(peerID signature.PublicKey, msg *p2p.Message) e
func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Message) error {
// Perform some checks on the incoming message. We make sure to release the
// lock before running the handler.
ctx, err := func() (context.Context, error) {
err := func() error {
g.RLock()
defer g.RUnlock()

Expand All @@ -437,18 +441,21 @@ func (g *Group) HandlePeerMessage(unusedPeerID signature.PublicKey, msg *p2p.Mes
switch {
case msg.GroupVersion < g.activeEpoch.groupVersion:
// Stale messages will never become valid.
return nil, p2pError.Permanent(fmt.Errorf("group version in the past"))
return p2pError.Permanent(fmt.Errorf("group version in the past"))
case msg.GroupVersion > g.activeEpoch.groupVersion:
// Messages from the future may eventually become valid.
return nil, fmt.Errorf("group version from the future")
return fmt.Errorf("group version from the future")
}

return g.activeEpoch.roundCtx, nil
return nil
}()
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), peerMessageProcessTimeout)
defer cancel()

// Import SpanContext from the message and store it in the current Context.
if msg.SpanContext != nil {
sc, err := tracing.SpanContextFromBinary(msg.SpanContext)
Expand Down

0 comments on commit c6f88c2

Please sign in to comment.