Skip to content

Commit

Permalink
storage: fix panic tracing raft requests
Browse files Browse the repository at this point in the history
Before this PR, when using vmodule=raft >= 2, it was possible for concurrent
proposal cancelation to lead to span being closed before logging an event.
This PR adds a contract to ProposalData.ctx that after a proposal has been
submitted to raft it can only be modified while holding the Replica.raftMu.
This is acceptable because cancelation is not performance sensitive.

The test reliably reproduced the panic when run under stress within a minute
and has been run for 10 minutes after the change without a failure.

Release note: None
  • Loading branch information
ajwerner committed Apr 25, 2019
1 parent 3e65c95 commit 4da0e1a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 4 deletions.
58 changes: 58 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math"
"math/rand"
"reflect"
"runtime"
"sync"
Expand Down Expand Up @@ -50,6 +51,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)
Expand Down Expand Up @@ -4519,3 +4521,59 @@ func TestStoreWaitForReplicaInit(t *testing.T) {
}
}
}

// TestTracingDoesNotRaceWithCancelation ensures that the tracing underneath
// raft does not race with tracing operations which might occur concurrently
// due to a request cancelation. When this bug existed this test only
// uncovered it when run under stress.
func TestTracingDoesNotRaceWithCancelation(t *testing.T) {
defer leaktest.AfterTest(t)()

sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.TraceAllRaftEvents = true
sc.TestingKnobs.DisableSplitQueue = true
sc.TestingKnobs.DisableMergeQueue = true
mtc := &multiTestContext{
storeConfig: &sc,
clock: hlc.NewClock(hlc.UnixNano, time.Millisecond),
}
mtc.Start(t, 3)
defer mtc.Stop()

db := mtc.Store(0).DB()
ctx := context.Background()
// Make the transport flaky for the range in question to encourage proposals
// to be sent more times and ultimately traced more.
ri, err := getRangeInfo(ctx, db, roachpb.Key("foo"))
require.Nil(t, err)

for i := 0; i < 3; i++ {
mtc.transport.Listen(mtc.stores[i].Ident.StoreID, &unreliableRaftHandler{
rangeID: ri.Desc.RangeID,
RaftMessageHandler: mtc.stores[i],
dropReq: func(req *storage.RaftMessageRequest) bool {
return rand.Intn(2) == 0
},
})
}
val := []byte("asdf")
var wg sync.WaitGroup
put := func(i int) func() {
wg.Add(1)
return func() {
defer wg.Done()
totalDelay := 1 * time.Millisecond
delay := time.Duration(rand.Intn(int(totalDelay)))
startDelay := totalDelay - delay
time.Sleep(startDelay)
ctx, cancel := context.WithTimeout(context.Background(), delay)
defer cancel()
_ = db.Put(ctx, roachpb.Key(fmt.Sprintf("foo%d", i)), val)
}
}
const N = 256
for i := 0; i < N; i++ {
go put(i)()
}
wg.Wait()
}
4 changes: 2 additions & 2 deletions pkg/storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (h *SnapshotRequest_Header) IsPreemptive() bool {
// to the entries contained in ents. The vmodule level for raft must be at
// least 1.
func (r *Replica) traceEntries(ents []raftpb.Entry, event string) {
if log.V(1) {
if log.V(1) || r.store.TestingKnobs().TraceAllRaftEvents {
ids := extractIDs(nil, ents)
traceProposals(r, ids, event)
}
Expand All @@ -234,7 +234,7 @@ func (r *Replica) traceEntries(ents []raftpb.Entry, event string) {
// in entries contained in msgs. The vmodule level for raft must be at
// least 1.
func (r *Replica) traceMessageSends(msgs []raftpb.Message, event string) {
if log.V(1) {
if log.V(1) || r.store.TestingKnobs().TraceAllRaftEvents {
var ids []storagebase.CmdIDKey
for _, m := range msgs {
ids = extractIDs(ids, m.Entries)
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ import (
// evaluated, proposed to raft, and for the result of the command to
// be returned to the caller.
type ProposalData struct {
// The caller's context, used for logging proposals and reproposals.
// The caller's context, used for logging proposals, reproposals, message
// sends, and command application. In order to enable safely tracing events
// beneath, modifying this ctx field in *ProposalData requires holding the
// raftMu.
ctx context.Context

// An optional tracing span bound to the proposal. Will be cleaned
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,17 @@ func (r *Replica) evalAndPropose(
// processed): the process crashes or the local replica is removed from the
// range.
tryAbandon := func() bool {
// The raftMu must be locked to modify the context of a proposal.
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
p, ok := r.mu.proposals[idKey]
if ok {
// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
// We'd need to make sure the span is finished eventually.
p.ctx = r.AnnotateCtx(context.TODO())
}
r.mu.Unlock()
return ok
}
return proposalCh, tryAbandon, maxLeaseIndex, nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ type StoreTestingKnobs struct {
// errors from failed txn pushes immediately instead of utilizing the txn
// recovery manager to recovery from the indeterminate state.
DontRecoverIndeterminateCommits bool
// TraceAllRaftEvents enables raft event tracing even when the current
// vmodule would not have enabled it.
TraceAllRaftEvents bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 4da0e1a

Please sign in to comment.