Merge #65124

65124: kvserver: add lock table spans to spanset assertions r=sumeerbhola,tbg a=erikgrinaker

During race builds, we assert that engine accesses satisfy the declared
key spans. However, these assertions did not take lock table accesses
into account, causing test failures when separated intents were enabled
(as they randomly are in tests).

This patch threads the lock spans through the replica write path and
translates them into lock table spans before passing the spans to
`spanset.NewBatch`, which performs the assertions.

Resolves #64088.

Release note: None

/cc @cockroachdb/kv 
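
For illustration, the core of the change is the translation from lock spans to lock table spans in `newBatchedEngine` (full diff below). Here is a minimal sketch, pulled out into a hypothetical standalone helper: `lockTableSpans` is not part of the patch, and it assumes the `keys`, `roachpb`, and `spanset` packages as imported in the diff.

```go
// lockTableSpans returns a copy of latchSpans extended with the lock table
// equivalent of every declared lock span, mirroring the logic this patch
// adds to newBatchedEngine.
func lockTableSpans(latchSpans, lockSpans *spanset.SpanSet) *spanset.SpanSet {
	spans := latchSpans.Copy()
	lockSpans.Iterate(func(sa spanset.SpanAccess, _ spanset.SpanScope, span spanset.Span) {
		// A point lock on key k lives at a single lock table key, while a
		// ranged lock span [k1, k2) maps to the matching lock table key range.
		ltKey, _ := keys.LockTableSingleKey(span.Key, nil)
		var ltEndKey roachpb.Key
		if span.EndKey != nil {
			ltEndKey, _ = keys.LockTableSingleKey(span.EndKey, nil)
		}
		spans.AddNonMVCC(sa, roachpb.Span{Key: ltKey, EndKey: ltEndKey})
	})
	return spans
}
```

Race builds then hand the combined span set to `spanset.NewBatch`, so engine accesses to separated intents in the lock table now fall inside the declared spans.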

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed May 18, 2021
2 parents 3ea71eb + e4bb550 commit 08322ff
Showing 8 changed files with 113 additions and 22 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -465,6 +465,11 @@ func (g *Guard) LatchSpans() *spanset.SpanSet {
return g.Req.LatchSpans
}

+ // LockSpans returns the maximal set of lock spans that the request will access.
+ func (g *Guard) LockSpans() *spanset.SpanSet {
+ 	return g.Req.LockSpans
+ }
+
// HoldingLatches returns whether the guard is holding latches or not.
func (g *Guard) HoldingLatches() bool {
return g != nil && g.lg != nil
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_proposal.go
@@ -765,7 +765,7 @@ func (r *Replica) evaluateProposal(
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
lul hlc.Timestamp,
- latchSpans *spanset.SpanSet,
+ latchSpans, lockSpans *spanset.SpanSet,
) (*result.Result, bool, *roachpb.Error) {
if ba.Timestamp.IsEmpty() {
return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp")
@@ -781,7 +781,7 @@
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
- batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, lul, latchSpans)
+ batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, lul, latchSpans, lockSpans)

// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
@@ -912,9 +912,9 @@ func (r *Replica) requestToProposal(
ba *roachpb.BatchRequest,
st kvserverpb.LeaseStatus,
lul hlc.Timestamp,
- latchSpans *spanset.SpanSet,
+ latchSpans, lockSpans *spanset.SpanSet,
) (*ProposalData, *roachpb.Error) {
- res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, lul, latchSpans)
+ res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, lul, latchSpans, lockSpans)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -86,7 +86,7 @@ func (r *Replica) evalAndPropose(
) (chan proposalResult, func(), int64, *roachpb.Error) {
defer tok.DoneIfNotMoved(ctx)
idKey := makeIDKey()
- proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, lul, g.LatchSpans())
+ proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, lul, g.LatchSpans(), g.LockSpans())
log.Event(proposal.ctx, "evaluated request")

// If the request hit a server-side concurrency retry error, immediately
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_test.go
@@ -8115,7 +8115,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
ba.Timestamp = tc.Clock().Now()
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}})
st := r.CurrentLeaseStatus(ctx)
- cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, hlc.Timestamp{}, &allSpans)
+ cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, hlc.Timestamp{}, &allSpans, &allSpans)
if pErr != nil {
t.Fatal(pErr)
}
@@ -8237,7 +8237,7 @@ func TestReplicaRefreshMultiple(t *testing.T) {

incCmdID = makeIDKey()
atomic.StoreInt32(&filterActive, 1)
- proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans)
+ proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans, &allSpans)
if pErr != nil {
t.Fatal(pErr)
}
@@ -8687,7 +8687,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
origTxn := txn.Clone()

- batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, hlc.Timestamp{}, &allSpans)
+ batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, hlc.Timestamp{}, &allSpans, &allSpans)
defer batch.Close()
if pErr != nil {
t.Fatal(pErr)
@@ -12906,7 +12906,7 @@ func TestContainsEstimatesClampProposal(t *testing.T) {
ba.Timestamp = tc.Clock().Now()
req := putArgs(roachpb.Key("some-key"), []byte("some-value"))
ba.Add(&req)
- proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans)
+ proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans, &allSpans)
if err != nil {
t.Error(err)
}
37 changes: 26 additions & 11 deletions pkg/kv/kvserver/replica_write.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
@@ -450,7 +451,7 @@ func (r *Replica) evaluateWriteBatch(
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
lul hlc.Timestamp,
- latchSpans *spanset.SpanSet,
+ latchSpans, lockSpans *spanset.SpanSet,
) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
log.Event(ctx, "executing read-write batch")

@@ -466,7 +467,7 @@
return nil, enginepb.MVCCStats{}, nil, result.Result{}, pErr
}
if ok {
- res := r.evaluate1PC(ctx, idKey, ba, latchSpans)
+ res := r.evaluate1PC(ctx, idKey, ba, latchSpans, lockSpans)
switch res.success {
case onePCSucceeded:
return res.batch, res.stats, res.br, res.res, nil
@@ -482,7 +483,7 @@
ms := new(enginepb.MVCCStats)
rec := NewReplicaEvalContext(r, latchSpans)
batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
- ctx, idKey, rec, ms, ba, lul, latchSpans, nil /* deadline */)
+ ctx, idKey, rec, ms, ba, lul, latchSpans, lockSpans, nil /* deadline */)
return batch, *ms, br, res, pErr
}

@@ -523,7 +524,7 @@ func (r *Replica) evaluate1PC(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
- latchSpans *spanset.SpanSet,
+ latchSpans, lockSpans *spanset.SpanSet,
) (onePCRes onePCResult) {
log.VEventf(ctx, 2, "attempting 1PC execution")

@@ -556,10 +557,10 @@
ms := new(enginepb.MVCCStats)
if ba.CanForwardReadTimestamp {
batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
- ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans, etArg.Deadline)
+ ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans, lockSpans, etArg.Deadline)
} else {
batch, br, res, pErr = r.evaluateWriteBatchWrapper(
- ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans)
+ ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans, lockSpans)
}

if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) {
@@ -652,7 +653,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
lul hlc.Timestamp,
- latchSpans *spanset.SpanSet,
+ latchSpans, lockSpans *spanset.SpanSet,
deadline *hlc.Timestamp,
) (batch storage.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
goldenMS := *ms
@@ -666,7 +667,8 @@
batch.Close()
}

- batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, lul, latchSpans)
+ batch, br, res, pErr = r.evaluateWriteBatchWrapper(
+ 	ctx, idKey, rec, ms, ba, lul, latchSpans, lockSpans)

var success bool
if pErr == nil {
@@ -695,9 +697,9 @@ func (r *Replica) evaluateWriteBatchWrapper(
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
lul hlc.Timestamp,
- latchSpans *spanset.SpanSet,
+ latchSpans, lockSpans *spanset.SpanSet,
) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
- batch, opLogger := r.newBatchedEngine(latchSpans)
+ batch, opLogger := r.newBatchedEngine(latchSpans, lockSpans)
br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, lul, false /* readOnly */)
if pErr == nil {
if opLogger != nil {
@@ -713,7 +715,9 @@
// are enabled, it also returns an engine.OpLoggerBatch. If non-nil, then this
// OpLogger is attached to the returned engine.Batch, recording all operations.
// Its recording should be attached to the Result of request evaluation.
- func (r *Replica) newBatchedEngine(spans *spanset.SpanSet) (storage.Batch, *storage.OpLoggerBatch) {
+ func (r *Replica) newBatchedEngine(
+ 	latchSpans, lockSpans *spanset.SpanSet,
+ ) (storage.Batch, *storage.OpLoggerBatch) {
batch := r.store.Engine().NewBatch()
if !batch.ConsistentIterators() {
// This is not currently needed for correctness, but future optimizations
@@ -758,6 +762,17 @@ func (r *Replica) newBatchedEngine(spans *spanset.SpanSet) (storage.Batch, *storage.OpLoggerBatch) {
batch = opLogger
}
if util.RaceEnabled {
+ // To account for separated intent accesses, we translate the lock spans
+ // to lock table spans.
+ spans := latchSpans.Copy()
+ lockSpans.Iterate(func(sa spanset.SpanAccess, _ spanset.SpanScope, span spanset.Span) {
+ 	ltKey, _ := keys.LockTableSingleKey(span.Key, nil)
+ 	var ltEndKey roachpb.Key
+ 	if span.EndKey != nil {
+ 		ltEndKey, _ = keys.LockTableSingleKey(span.EndKey, nil)
+ 	}
+ 	spans.AddNonMVCC(sa, roachpb.Span{Key: ltKey, EndKey: ltEndKey})
+ })
// During writes we may encounter a versioned value newer than the request
// timestamp, and may have to retry at a higher timestamp. This is still
// safe as we're only ever writing at timestamps higher than the timestamp
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/spanset/spanset.go
@@ -114,6 +114,31 @@ func (s *SpanSet) Empty() bool {
return s.Len() == 0
}

+ // Copy copies the SpanSet.
+ func (s *SpanSet) Copy() *SpanSet {
+ 	n := &SpanSet{}
+ 	for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
+ 		for ss := SpanScope(0); ss < NumSpanScope; ss++ {
+ 			n.spans[sa][ss] = append(n.spans[sa][ss], s.spans[sa][ss]...)
+ 		}
+ 	}
+ 	return n
+ }
+
+ // Iterate iterates over the SpanSet, calling the given function for each
+ // contained span.
+ func (s *SpanSet) Iterate(f func(SpanAccess, SpanScope, Span)) {
+ 	if s == nil {
+ 		return
+ 	}
+ 	for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
+ 		for ss := SpanScope(0); ss < NumSpanScope; ss++ {
+ 			for _, span := range s.spans[sa][ss] {
+ 				f(sa, ss, span)
+ 			}
+ 		}
+ 	}
+ }
+
// Reserve space for N additional spans.
func (s *SpanSet) Reserve(access SpanAccess, scope SpanScope, n int) {
existing := s.spans[access][scope]
48 changes: 48 additions & 0 deletions pkg/kv/kvserver/spanset/spanset_test.go
@@ -49,6 +49,54 @@ func TestSpanSetGetSpansScope(t *testing.T) {
}
}

+ func TestSpanSetCopy(t *testing.T) {
+ 	defer leaktest.AfterTest(t)()
+
+ 	ss := new(SpanSet)
+ 	ss.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("abc")}, hlc.Timestamp{WallTime: 123, Logical: 7})
+ 	ss.AddNonMVCC(SpanReadWrite, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")})
+
+ 	c := ss.Copy()
+ 	require.Equal(t, ss, c)
+
+ 	// Mutating ss should not modify the copy.
+ 	ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")})
+ 	require.NotEqual(t, ss, c)
+ }
+
+ func TestSpanSetIterate(t *testing.T) {
+ 	defer leaktest.AfterTest(t)()
+
+ 	spA := roachpb.Span{Key: roachpb.Key("a")}
+ 	spRO := roachpb.Span{Key: roachpb.Key("r"), EndKey: roachpb.Key("o")}
+ 	spRW := roachpb.Span{Key: roachpb.Key("r"), EndKey: roachpb.Key("w")}
+ 	spLocal := roachpb.Span{Key: keys.RangeLastGCKey(1)}
+
+ 	ss := new(SpanSet)
+ 	ss.AddNonMVCC(SpanReadOnly, spLocal)
+ 	ss.AddNonMVCC(SpanReadOnly, spRO)
+ 	ss.AddNonMVCC(SpanReadOnly, spA)
+ 	ss.AddNonMVCC(SpanReadWrite, spRW)
+
+ 	type item struct {
+ 		sa   SpanAccess
+ 		ss   SpanScope
+ 		span Span
+ 	}
+ 	expect := []item{
+ 		{sa: SpanReadOnly, ss: SpanGlobal, span: Span{Span: spRO}},
+ 		{sa: SpanReadOnly, ss: SpanGlobal, span: Span{Span: spA}},
+ 		{sa: SpanReadOnly, ss: SpanLocal, span: Span{Span: spLocal}},
+ 		{sa: SpanReadWrite, ss: SpanGlobal, span: Span{Span: spRW}},
+ 	}
+ 	items := []item{}
+ 	ss.Iterate(func(sa SpanAccess, ss SpanScope, span Span) {
+ 		items = append(items, item{sa: sa, ss: ss, span: span})
+ 	})
+
+ 	require.Equal(t, expect, items)
+ }
+
func TestSpanSetMerge(t *testing.T) {
defer leaktest.AfterTest(t)()

2 changes: 0 additions & 2 deletions pkg/kv/kvserver/txn_recovery_integration_test.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -340,7 +339,6 @@
// are disabled. See also: https://github.com/cockroachdb/cockroach/issues/46764
func TestTxnClearRangeIntents(t *testing.T) {
defer leaktest.AfterTest(t)()
- skip.UnderRaceWithIssue(t, 64088, "flaky test")
defer log.Scope(t).Close(t)

ctx := context.Background()