diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go
index 3fddde6ecd2a..c4bad6ce5e67 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -465,6 +465,11 @@ func (g *Guard) LatchSpans() *spanset.SpanSet {
 	return g.Req.LatchSpans
 }
 
+// LockSpans returns the maximal set of lock spans that the request will access.
+func (g *Guard) LockSpans() *spanset.SpanSet {
+	return g.Req.LockSpans
+}
+
 // HoldingLatches returned whether the guard is holding latches or not.
 func (g *Guard) HoldingLatches() bool {
 	return g != nil && g.lg != nil
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 1672cc79cb8f..5837fa2bd751 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -765,7 +765,7 @@ func (r *Replica) evaluateProposal(
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
 	lul hlc.Timestamp,
-	latchSpans *spanset.SpanSet,
+	latchSpans, lockSpans *spanset.SpanSet,
 ) (*result.Result, bool, *roachpb.Error) {
 	if ba.Timestamp.IsEmpty() {
 		return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp")
@@ -781,7 +781,7 @@ func (r *Replica) evaluateProposal(
 	//
 	// TODO(tschottdorf): absorb all returned values in `res` below this point
 	// in the call stack as well.
-	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, lul, latchSpans)
+	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, lul, latchSpans, lockSpans)
 
 	// Note: reusing the proposer's batch when applying the command on the
 	// proposer was explored as an optimization but resulted in no performance
@@ -912,9 +912,9 @@ func (r *Replica) requestToProposal(
 	ba *roachpb.BatchRequest,
 	st kvserverpb.LeaseStatus,
 	lul hlc.Timestamp,
-	latchSpans *spanset.SpanSet,
+	latchSpans, lockSpans *spanset.SpanSet,
 ) (*ProposalData, *roachpb.Error) {
-	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, lul, latchSpans)
+	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, lul, latchSpans, lockSpans)
 
 	// Fill out the results even if pErr != nil; we'll return the error below.
 	proposal := &ProposalData{
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 534946493657..aac31229ff18 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -86,7 +86,7 @@ func (r *Replica) evalAndPropose(
 ) (chan proposalResult, func(), int64, *roachpb.Error) {
 	defer tok.DoneIfNotMoved(ctx)
 	idKey := makeIDKey()
-	proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, lul, g.LatchSpans())
+	proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, lul, g.LatchSpans(), g.LockSpans())
 	log.Event(proposal.ctx, "evaluated request")
 
 	// If the request hit a server-side concurrency retry error, immediately
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 92c6e27c1a5e..48d16f6b15c1 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -8115,7 +8115,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 			ba.Timestamp = tc.Clock().Now()
 			ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(id)}})
 			st := r.CurrentLeaseStatus(ctx)
-			cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, hlc.Timestamp{}, &allSpans)
+			cmd, pErr := r.requestToProposal(ctx, kvserverbase.CmdIDKey(id), &ba, st, hlc.Timestamp{}, &allSpans, &allSpans)
 			if pErr != nil {
 				t.Fatal(pErr)
 			}
@@ -8237,7 +8237,7 @@ func TestReplicaRefreshMultiple(t *testing.T) {
 
 	incCmdID = makeIDKey()
 	atomic.StoreInt32(&filterActive, 1)
-	proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans)
+	proposal, pErr := repl.requestToProposal(ctx, incCmdID, &ba, repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans, &allSpans)
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -8687,7 +8687,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
 	assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
 	origTxn := txn.Clone()
-	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, hlc.Timestamp{}, &allSpans)
+	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, makeIDKey(), &ba, hlc.Timestamp{}, &allSpans, &allSpans)
 	defer batch.Close()
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -12906,7 +12906,7 @@ func TestContainsEstimatesClampProposal(t *testing.T) {
 	ba.Timestamp = tc.Clock().Now()
 	req := putArgs(roachpb.Key("some-key"), []byte("some-value"))
 	ba.Add(&req)
-	proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans)
+	proposal, err := tc.repl.requestToProposal(ctx, cmdIDKey, &ba, tc.repl.CurrentLeaseStatus(ctx), hlc.Timestamp{}, &allSpans, &allSpans)
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index b12e1e6d578e..20bdf8256e1d 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
@@ -450,7 +451,7 @@ func (r *Replica) evaluateWriteBatch(
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
 	lul hlc.Timestamp,
-	latchSpans *spanset.SpanSet,
+	latchSpans, lockSpans *spanset.SpanSet,
 ) (storage.Batch, enginepb.MVCCStats, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
 	log.Event(ctx, "executing read-write batch")
 
@@ -466,7 +467,7 @@ func (r *Replica) evaluateWriteBatch(
 		return nil, enginepb.MVCCStats{}, nil, result.Result{}, pErr
 	}
 	if ok {
-		res := r.evaluate1PC(ctx, idKey, ba, latchSpans)
+		res := r.evaluate1PC(ctx, idKey, ba, latchSpans, lockSpans)
 		switch res.success {
 		case onePCSucceeded:
 			return res.batch, res.stats, res.br, res.res, nil
@@ -482,7 +483,7 @@ func (r *Replica) evaluateWriteBatch(
 	ms := new(enginepb.MVCCStats)
 	rec := NewReplicaEvalContext(r, latchSpans)
 	batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
-		ctx, idKey, rec, ms, ba, lul, latchSpans, nil /* deadline */)
+		ctx, idKey, rec, ms, ba, lul, latchSpans, lockSpans, nil /* deadline */)
 	return batch, *ms, br, res, pErr
 }
 
@@ -523,7 +524,7 @@ func (r *Replica) evaluate1PC(
 	ctx context.Context,
 	idKey kvserverbase.CmdIDKey,
 	ba *roachpb.BatchRequest,
-	latchSpans *spanset.SpanSet,
+	latchSpans, lockSpans *spanset.SpanSet,
 ) (onePCRes onePCResult) {
 	log.VEventf(ctx, 2, "attempting 1PC execution")
 
@@ -556,10 +557,10 @@ func (r *Replica) evaluate1PC(
 	ms := new(enginepb.MVCCStats)
 	if ba.CanForwardReadTimestamp {
 		batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
-			ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans, etArg.Deadline)
+			ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans, lockSpans, etArg.Deadline)
 	} else {
 		batch, br, res, pErr = r.evaluateWriteBatchWrapper(
-			ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans)
+			ctx, idKey, rec, ms, &strippedBa, localUncertaintyLimit, latchSpans, lockSpans)
 	}
 
 	if pErr != nil || (!ba.CanForwardReadTimestamp && ba.Timestamp != br.Timestamp) {
@@ -652,7 +653,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
 	lul hlc.Timestamp,
-	latchSpans *spanset.SpanSet,
+	latchSpans, lockSpans *spanset.SpanSet,
 	deadline *hlc.Timestamp,
 ) (batch storage.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
 	goldenMS := *ms
@@ -666,7 +667,8 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 			batch.Close()
 		}
 
-		batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, lul, latchSpans)
+		batch, br, res, pErr = r.evaluateWriteBatchWrapper(
+			ctx, idKey, rec, ms, ba, lul, latchSpans, lockSpans)
 
 		var success bool
 		if pErr == nil {
@@ -695,9 +697,9 @@ func (r *Replica) evaluateWriteBatchWrapper(
 	ms *enginepb.MVCCStats,
 	ba *roachpb.BatchRequest,
 	lul hlc.Timestamp,
-	latchSpans *spanset.SpanSet,
+	latchSpans, lockSpans *spanset.SpanSet,
 ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
-	batch, opLogger := r.newBatchedEngine(latchSpans)
+	batch, opLogger := r.newBatchedEngine(latchSpans, lockSpans)
 	br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, lul, false /* readOnly */)
 	if pErr == nil {
 		if opLogger != nil {
@@ -713,7 +715,9 @@ func (r *Replica) evaluateWriteBatchWrapper(
 // are enabled, it also returns an engine.OpLoggerBatch. If non-nil, then this
 // OpLogger is attached to the returned engine.Batch, recording all operations.
 // Its recording should be attached to the Result of request evaluation.
-func (r *Replica) newBatchedEngine(spans *spanset.SpanSet) (storage.Batch, *storage.OpLoggerBatch) {
+func (r *Replica) newBatchedEngine(
+	latchSpans, lockSpans *spanset.SpanSet,
+) (storage.Batch, *storage.OpLoggerBatch) {
 	batch := r.store.Engine().NewBatch()
 	if !batch.ConsistentIterators() {
 		// This is not currently needed for correctness, but future optimizations
@@ -758,6 +762,17 @@ func (r *Replica) newBatchedEngine(spans *spanset.SpanSet) (storage.Batch, *stor
 		batch = opLogger
 	}
 	if util.RaceEnabled {
+		// To account for separated intent accesses, we translate the lock spans
+		// to lock table spans.
+		spans := latchSpans.Copy()
+		lockSpans.Iterate(func(sa spanset.SpanAccess, _ spanset.SpanScope, span spanset.Span) {
+			ltKey, _ := keys.LockTableSingleKey(span.Key, nil)
+			var ltEndKey roachpb.Key
+			if span.EndKey != nil {
+				ltEndKey, _ = keys.LockTableSingleKey(span.EndKey, nil)
+			}
+			spans.AddNonMVCC(sa, roachpb.Span{Key: ltKey, EndKey: ltEndKey})
+		})
 		// During writes we may encounter a versioned value newer than the request
 		// timestamp, and may have to retry at a higher timestamp. This is still
 		// safe as we're only ever writing at timestamps higher than the timestamp
diff --git a/pkg/kv/kvserver/spanset/spanset.go b/pkg/kv/kvserver/spanset/spanset.go
index 236105ee8ff6..49a24f59b39d 100644
--- a/pkg/kv/kvserver/spanset/spanset.go
+++ b/pkg/kv/kvserver/spanset/spanset.go
@@ -114,6 +114,31 @@ func (s *SpanSet) Empty() bool {
 	return s.Len() == 0
 }
 
+// Copy copies the SpanSet.
+func (s *SpanSet) Copy() *SpanSet {
+	n := &SpanSet{}
+	for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
+		for ss := SpanScope(0); ss < NumSpanScope; ss++ {
+			n.spans[sa][ss] = append(n.spans[sa][ss], s.spans[sa][ss]...)
+		}
+	}
+	return n
+}
+
+// Iterate iterates over a SpanSet, calling the given function.
+func (s *SpanSet) Iterate(f func(SpanAccess, SpanScope, Span)) {
+	if s == nil {
+		return
+	}
+	for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
+		for ss := SpanScope(0); ss < NumSpanScope; ss++ {
+			for _, span := range s.spans[sa][ss] {
+				f(sa, ss, span)
+			}
+		}
+	}
+}
+
 // Reserve space for N additional spans.
 func (s *SpanSet) Reserve(access SpanAccess, scope SpanScope, n int) {
 	existing := s.spans[access][scope]
diff --git a/pkg/kv/kvserver/spanset/spanset_test.go b/pkg/kv/kvserver/spanset/spanset_test.go
index 3721f69a7f64..2827f67db39a 100644
--- a/pkg/kv/kvserver/spanset/spanset_test.go
+++ b/pkg/kv/kvserver/spanset/spanset_test.go
@@ -49,6 +49,54 @@ func TestSpanSetGetSpansScope(t *testing.T) {
 	}
 }
 
+func TestSpanSetCopy(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ss := new(SpanSet)
+	ss.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("abc")}, hlc.Timestamp{WallTime: 123, Logical: 7})
+	ss.AddNonMVCC(SpanReadWrite, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")})
+
+	c := ss.Copy()
+	require.Equal(t, ss, c)
+
+	// Modifying the original SpanSet should not modify the copy.
+	ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")})
+	require.NotEqual(t, ss, c)
+}
+
+func TestSpanSetIterate(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	spA := roachpb.Span{Key: roachpb.Key("a")}
+	spRO := roachpb.Span{Key: roachpb.Key("r"), EndKey: roachpb.Key("o")}
+	spRW := roachpb.Span{Key: roachpb.Key("r"), EndKey: roachpb.Key("w")}
+	spLocal := roachpb.Span{Key: keys.RangeLastGCKey(1)}
+
+	ss := new(SpanSet)
+	ss.AddNonMVCC(SpanReadOnly, spLocal)
+	ss.AddNonMVCC(SpanReadOnly, spRO)
+	ss.AddNonMVCC(SpanReadOnly, spA)
+	ss.AddNonMVCC(SpanReadWrite, spRW)
+
+	type item struct {
+		sa   SpanAccess
+		ss   SpanScope
+		span Span
+	}
+	expect := []item{
+		{sa: SpanReadOnly, ss: SpanGlobal, span: Span{Span: spRO}},
+		{sa: SpanReadOnly, ss: SpanGlobal, span: Span{Span: spA}},
+		{sa: SpanReadOnly, ss: SpanLocal, span: Span{Span: spLocal}},
+		{sa: SpanReadWrite, ss: SpanGlobal, span: Span{Span: spRW}},
+	}
+	items := []item{}
+	ss.Iterate(func(sa SpanAccess, ss SpanScope, span Span) {
+		items = append(items, item{sa: sa, ss: ss, span: span})
+	})
+
+	require.Equal(t, expect, items)
+}
+
 func TestSpanSetMerge(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
diff --git a/pkg/kv/kvserver/txn_recovery_integration_test.go b/pkg/kv/kvserver/txn_recovery_integration_test.go
index 8c43987d3d23..b4188bf6f074 100644
--- a/pkg/kv/kvserver/txn_recovery_integration_test.go
+++ b/pkg/kv/kvserver/txn_recovery_integration_test.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -340,7 +339,6 @@ func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
 // are disabled. See also: https://github.com/cockroachdb/cockroach/issues/46764
 func TestTxnClearRangeIntents(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	skip.UnderRaceWithIssue(t, 64088, "flaky test")
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
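For reviewers, a minimal sketch (not part of the diff) of how the new `SpanSet.Copy` and `SpanSet.Iterate` helpers combine with `keys.LockTableSingleKey` in the race-build assertion path of `newBatchedEngine` above. The `main` wrapper and the example spans are illustrative assumptions, not code from this change.

```go
// Sketch: widen a copy of the latch spans with the lock-table form of each
// lock span, mirroring the util.RaceEnabled branch added to newBatchedEngine.
// Example spans are hypothetical.
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	latchSpans, lockSpans := new(spanset.SpanSet), new(spanset.SpanSet)
	latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")})
	lockSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("b")})

	// Copy the latch spans, then add the lock-table key span for every lock
	// span, so the spanset-asserting batch also permits separated intent
	// reads and writes under the lock table keyspace.
	spans := latchSpans.Copy()
	lockSpans.Iterate(func(sa spanset.SpanAccess, _ spanset.SpanScope, span spanset.Span) {
		ltKey, _ := keys.LockTableSingleKey(span.Key, nil)
		var ltEndKey roachpb.Key
		if span.EndKey != nil {
			ltEndKey, _ = keys.LockTableSingleKey(span.EndKey, nil)
		}
		spans.AddNonMVCC(sa, roachpb.Span{Key: ltKey, EndKey: ltEndKey})
	})

	// The widened set now covers both "a"-"c" and the lock table key for "b".
	fmt.Println(spans)
}
```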