diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 3b8b8dff57bf..8bb4b3f53fc7 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3904,7 +3904,71 @@ func TestOptimisticEvalNoContention(t *testing.T) { require.NoError(t, txn1.Commit(ctx)) } -func BenchmarkOptimisticEval(b *testing.B) { +// TestOptimisticEvalWithConcurrentWriters tests concurrently running writes +// and optimistic reads where the latter always conflict. This is just a +// sanity check to confirm that nothing fails. +func TestOptimisticEvalWithConcurrentWriters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db := setupDBAndWriteAAndB(t) + defer s.Stopper().Stop(ctx) + + finish := make(chan struct{}) + var workers sync.WaitGroup + for i := 0; i < 4; i++ { + workers.Add(1) + go func() { + for { + require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + if err := txn.Put(ctx, "a", "a"); err != nil { + return err + } + return txn.Commit(ctx) + })) + select { + case _, recv := <-finish: + if !recv { + workers.Done() + return + } + default: + } + } + }() + } + for i := 0; i < 4; i++ { + workers.Add(1) + go func() { + for { + require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + _, err = txn.Scan(ctx, "a", "c", 1) + if err != nil { + return err + } + err = txn.Commit(ctx) + return err + })) + select { + case _, recv := <-finish: + if !recv { + workers.Done() + return + } + default: + } + } + }() + } + time.Sleep(10 * time.Second) + close(finish) + workers.Wait() +} + +// BenchmarkOptimisticEvalForLocks benchmarks optimistic evaluation when the +// potentially conflicting lock is explicitly held for a duration of time. +func BenchmarkOptimisticEvalForLocks(b *testing.B) { defer log.Scope(b).Close(b) ctx := context.Background() args := base.TestServerArgs{} @@ -3980,3 +4044,102 @@ func BenchmarkOptimisticEval(b *testing.B) { }) } } + +// BenchmarkOptimisticEval benchmarks optimistic evaluation with +// - potentially conflicting latches held by 1PC transactions doing writes. +// - potentially conflicting latches or locks held by transactions doing +// writes. +func BenchmarkOptimisticEval(b *testing.B) { + defer log.Scope(b).Close(b) + ctx := context.Background() + args := base.TestServerArgs{} + + for _, latches := range []bool{false, true} { + conflictWith := "latches-and-locks" + if latches { + conflictWith = "latches" + } + b.Run(conflictWith, func(b *testing.B) { + for _, realContention := range []bool{false, true} { + b.Run(fmt.Sprintf("real-contention=%t", realContention), func(b *testing.B) { + for _, numWriters := range []int{1, 4} { + b.Run(fmt.Sprintf("num-writers=%d", numWriters), func(b *testing.B) { + // Since we are doing writes in the benchmark, start with a + // fresh server each time so that we start with a fresh engine + // without many versions for a key. + s, _, db := serverutils.StartServer(b, args) + defer s.Stopper().Stop(ctx) + + require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + if err := txn.Put(ctx, "a", "a"); err != nil { + return err + } + if err := txn.Put(ctx, "b", "b"); err != nil { + return err + } + return txn.Commit(ctx) + })) + tup, err := db.Get(ctx, "a") + require.NoError(b, err) + require.NotNil(b, tup.Value) + tup, err = db.Get(ctx, "b") + require.NoError(b, err) + require.NotNil(b, tup.Value) + + writeKey := "b" + if realContention { + writeKey = "a" + } + finishWrites := make(chan struct{}) + var writers sync.WaitGroup + for i := 0; i < numWriters; i++ { + writers.Add(1) + go func() { + for { + if latches { + require.NoError(b, db.Put(ctx, writeKey, "foo")) + + } else { + require.NoError(b, db.Txn(ctx, + func(ctx context.Context, txn *kv.Txn) (err error) { + if err := txn.Put(ctx, writeKey, "foo"); err != nil { + return err + } + return txn.Commit(ctx) + })) + } + select { + case _, recv := <-finishWrites: + if !recv { + writers.Done() + return + } + default: + } + } + }() + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + _, err = txn.Scan(ctx, "a", "c", 1) + if err != nil { + panic(err) + } + err = txn.Commit(ctx) + if err != nil { + panic(err) + } + return err + }) + } + b.StopTimer() + close(finishWrites) + writers.Wait() + }) + } + }) + } + }) + } +} diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index ca1723691bd7..c9eeaa2ad735 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -394,6 +394,7 @@ type Request struct { type Guard struct { Req Request lg latchGuard + lm latchManager ltg lockTableGuard // The latest RequestEvalKind passed to SequenceReq. EvalKind RequestEvalKind @@ -411,13 +412,30 @@ type Error = roachpb.Error // Internal Structure Interfaces // /////////////////////////////////// -// latchManager serializes access to keys and key ranges. +// latchManager serializes access to keys and key ranges. The +// {AcquireOptimistic,CheckOptimisticNoConflicts,WaitUntilAcquired} methods +// are only for use in optimistic latching. // // See additional documentation in pkg/storage/spanlatch. type latchManager interface { // Acquires latches, providing mutual exclusion for conflicting requests. Acquire(context.Context, Request) (latchGuard, *Error) + // AcquireOptimistic is like Acquire in that it inserts latches, but it does + // not wait for conflicting latches on overlapping spans to be released + // before returning. This should be followed by CheckOptimisticNoConflicts + // to validate that not waiting did not violate correctness. + AcquireOptimistic(req Request) latchGuard + + // CheckOptimisticNoConflicts returns true iff the spans in the provided + // spanset do not conflict with existing latches. + CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool + + // WaitUntilAcquired is meant to be called when CheckOptimisticNoConflicts + // returned false, or some other occurrence (like conflicting locks) is + // causing this request to switch to pessimistic latching. + WaitUntilAcquired(ctx context.Context, lg latchGuard) (latchGuard, *Error) + // Releases latches, relinquish its protection from conflicting requests. Release(latchGuard) diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index fc854f7243ef..25f0d908348f 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -189,25 +189,43 @@ func (m *managerImpl) sequenceReqWithGuard( // Only the first iteration can sometimes already be holding latches -- we // use this to assert below. - first := true + firstIteration := true for { - if !first { - g.AssertNoLatches() - } - first = false if !g.HoldingLatches() { - // TODO(sumeer): optimistic requests could register their need for - // latches, but not actually wait until acquisition. - // https://github.com/cockroachdb/cockroach/issues/9521 - - // Acquire latches for the request. This synchronizes the request - // with all conflicting in-flight requests. - log.Event(ctx, "acquiring latches") - g.lg, err = m.lm.Acquire(ctx, req) + if g.EvalKind == OptimisticEval { + if !firstIteration { + // The only way we loop more than once is when conflicting locks are + // found -- see below where that happens and the comment there on + // why it will never happen with OptimisticEval. + panic("optimistic eval should not loop in sequenceReqWithGuard") + } + log.Event(ctx, "optimistically acquiring latches") + g.lg = m.lm.AcquireOptimistic(req) + g.lm = m.lm + } else { + // Acquire latches for the request. This synchronizes the request + // with all conflicting in-flight requests. + log.Event(ctx, "acquiring latches") + g.lg, err = m.lm.Acquire(ctx, req) + if err != nil { + return nil, err + } + g.lm = m.lm + } + } else { + if !firstIteration { + panic(errors.AssertionFailedf("second or later iteration cannot be holding latches")) + } + if g.EvalKind != PessimisticAfterFailedOptimisticEval { + panic("must not be holding latches") + } + log.Event(ctx, "optimistic failed, so waiting for latches") + g.lg, err = m.lm.WaitUntilAcquired(ctx, g.lg) if err != nil { return nil, err } } + firstIteration = false // Some requests don't want the wait on locks. if req.LockSpans.Empty() { @@ -226,7 +244,9 @@ func (m *managerImpl) sequenceReqWithGuard( g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg) } - // Wait on conflicting locks, if necessary. + // Wait on conflicting locks, if necessary. Note that this will never be + // true if ScanOptimistic was called above. Therefore it will also never + // be true if latchManager.AcquireOptimistic was called. if g.ltg.ShouldWait() { m.lm.Release(g.moveLatchGuard()) @@ -547,21 +567,44 @@ func (g *Guard) AssertNoLatches() { } } -// CheckOptimisticNoConflicts checks that the lockSpansRead do not have a -// conflicting lock. -func (g *Guard) CheckOptimisticNoConflicts(lockSpansRead *spanset.SpanSet) (ok bool) { +// CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not +// have a conflicting latch, lock. +func (g *Guard) CheckOptimisticNoConflicts( + latchSpansRead *spanset.SpanSet, lockSpansRead *spanset.SpanSet, +) (ok bool) { if g.EvalKind != OptimisticEval { panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind)) } - if g.ltg == nil { + if g.lg == nil && g.ltg == nil { + return true + } + if g.lg == nil { + panic("expected non-nil latchGuard") + } + // First check the latches, since a conflict there could mean that racing + // requests in the lock table caused a conflicting lock to not be noticed. + if g.lm.CheckOptimisticNoConflicts(g.lg, latchSpansRead) { + return g.ltg.CheckOptimisticNoConflicts(lockSpansRead) + } + return false +} + +// CheckOptimisticNoLatchConflicts checks that the declared latch spans for +// the request do not have a conflicting latch. +func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) { + if g.EvalKind != OptimisticEval { + panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind)) + } + if g.lg == nil { return true } - return g.ltg.CheckOptimisticNoConflicts(lockSpansRead) + return g.lm.CheckOptimisticNoConflicts(g.lg, g.Req.LatchSpans) } func (g *Guard) moveLatchGuard() latchGuard { lg := g.lg g.lg = nil + g.lm = nil return lg } diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 4d2a6ef5bf8c..101d8f2b80bf 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -300,8 +300,8 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.Fatalf(t, "unknown request: %s", reqName) } reqs, _ := scanRequests(t, d, c) - _, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs) - return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(lockSpans)) + latchSpans, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs) + return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(latchSpans, lockSpans)) case "on-lock-acquired": var reqName string diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index 8befa34c6689..73c37bf85c7f 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" ) @@ -31,6 +32,25 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard return lg, nil } +func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { + lg := m.m.AcquireOptimistic(req.LatchSpans) + return lg +} + +func (m *latchManagerImpl) CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool { + return m.m.CheckOptimisticNoConflicts(lg.(*spanlatch.Guard), spans) +} + +func (m *latchManagerImpl) WaitUntilAcquired( + ctx context.Context, lg latchGuard, +) (latchGuard, *Error) { + lg, err := m.m.WaitUntilAcquired(ctx, lg.(*spanlatch.Guard)) + if err != nil { + return nil, roachpb.NewError(err) + } + return lg, nil +} + func (m *latchManagerImpl) Release(lg latchGuard) { m.m.Release(lg.(*spanlatch.Guard)) } diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic index 073e596a03df..dbd46e8bdd42 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic @@ -38,7 +38,7 @@ new-request name=req2 txn=txn2 ts=12,1 sequence req=req2 eval-kind=opt ---- [2] sequence req2: optimistically sequencing request -[2] sequence req2: acquiring latches +[2] sequence req2: optimistically acquiring latches [2] sequence req2: optimistically scanning lock table for conflicting locks [2] sequence req2: sequencing complete, returned guard @@ -56,33 +56,135 @@ check-opt-no-conflicts req=req2 ---- no-conflicts: true -# Wider span has a conflict. -check-opt-no-conflicts req=req2 +finish req=req2 +---- +[-] finish req2: finishing request + +new-request name=req3 txn=txn2 ts=12,1 + scan key=a endkey=e +---- + +# Optimistic locking for req3 +sequence req=req3 eval-kind=opt +---- +[3] sequence req3: optimistically sequencing request +[3] sequence req3: optimistically acquiring latches +[3] sequence req3: optimistically scanning lock table for conflicting locks +[3] sequence req3: sequencing complete, returned guard + +debug-lock-table +---- +global: num=1 + lock: "d" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [0] +local: num=0 + +# Wider span for req3 has a conflict. +check-opt-no-conflicts req=req3 scan key=a endkey=e ---- no-conflicts: false # Sequence again -- latches are already held. -sequence req=req2 eval-kind=pess-after-opt +sequence req=req3 eval-kind=pess-after-opt ---- -[3] sequence req2: re-sequencing request after optimistic sequencing failed -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: waiting in lock wait-queues -[3] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "d" (queuedWriters: 0, queuedReaders: 1) -[3] sequence req2: pushing timestamp of txn 00000001 above 12.000000000,1 -[3] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction +[4] sequence req3: re-sequencing request after optimistic sequencing failed +[4] sequence req3: optimistic failed, so waiting for latches +[4] sequence req3: scanning lock table for conflicting locks +[4] sequence req3: waiting in lock wait-queues +[4] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "d" (queuedWriters: 0, queuedReaders: 1) +[4] sequence req3: pushing timestamp of txn 00000001 above 12.000000000,1 +[4] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction # Conflicting transaction commits. on-txn-updated txn=txn1 status=committed ---- [-] update txn: committing txn1 -[3] sequence req2: resolving intent "d" for txn 00000001 with COMMITTED status -[3] sequence req2: lock wait-queue event: done waiting -[3] sequence req2: conflicted with 00000001-0000-0000-0000-000000000000 on "d" for 1.234s -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: sequencing complete, returned guard +[4] sequence req3: resolving intent "d" for txn 00000001 with COMMITTED status +[4] sequence req3: lock wait-queue event: done waiting +[4] sequence req3: conflicted with 00000001-0000-0000-0000-000000000000 on "d" for 1.234s +[4] sequence req3: acquiring latches +[4] sequence req3: scanning lock table for conflicting locks +[4] sequence req3: sequencing complete, returned guard -finish req=req2 + +finish req=req3 ---- -[-] finish req2: finishing request +[-] finish req3: finishing request + +# Another transaction that writes, which will hold latches but not locks. +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-request name=req4 txn=txn3 ts=10,1 + put key=d value=d +---- + +sequence req=req4 +---- +[5] sequence req4: sequencing request +[5] sequence req4: acquiring latches +[5] sequence req4: scanning lock table for conflicting locks +[5] sequence req4: sequencing complete, returned guard + +debug-lock-table +---- +global: num=0 +local: num=0 + +new-request name=req5 txn=txn2 ts=12,1 + scan key=a endkey=e +---- + +sequence req=req5 eval-kind=opt +---- +[6] sequence req5: optimistically sequencing request +[6] sequence req5: optimistically acquiring latches +[6] sequence req5: optimistically scanning lock table for conflicting locks +[6] sequence req5: sequencing complete, returned guard + +# When checking with a span that does not include the existing latch, there is +# no conflict. +check-opt-no-conflicts req=req5 + scan key=a endkey=c +---- +no-conflicts: true + +finish req=req5 +---- +[-] finish req5: finishing request + +new-request name=req6 txn=txn2 ts=12,1 + scan key=a endkey=e +---- + +sequence req=req6 eval-kind=opt +---- +[7] sequence req6: optimistically sequencing request +[7] sequence req6: optimistically acquiring latches +[7] sequence req6: optimistically scanning lock table for conflicting locks +[7] sequence req6: sequencing complete, returned guard + +# Wider span for req6 has a conflict with the latch held by req4. +check-opt-no-conflicts req=req6 + scan key=a endkey=e +---- +no-conflicts: false + +sequence req=req6 eval-kind=pess-after-opt +---- +[8] sequence req6: re-sequencing request after optimistic sequencing failed +[8] sequence req6: optimistic failed, so waiting for latches +[8] sequence req6: waiting to acquire read latch {a-e}@12.000000000,1, held by write latch d@10.000000000,1 +[8] sequence req6: blocked on select in spanlatch.(*Manager).waitForSignal + +# req4 finishing releases the latch and allows req6 to proceed. +finish req=req4 +---- +[-] finish req4: finishing request +[8] sequence req6: scanning lock table for conflicting locks +[8] sequence req6: sequencing complete, returned guard + +finish req=req6 +---- +[-] finish req6: finishing request diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index e3a51cbdfdb5..ccf3c7e82597 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -73,11 +73,11 @@ func (r *Replica) executeReadOnlyBatch( // release latches immediately after we acquire an engine iterator as long // as we're performing a non-locking read. Note that this also requires that // the request is not being optimistically evaluated (optimistic evaluation - // does not check locks). It would also be nice, but not required for - // correctness, that the read-only engine eagerly create an iterator (that - // is later cloned) while the latches are held, so that this request does - // not "see" the effect of any later requests that happen after the latches - // are released. + // does not wait for latches or check locks). It would also be nice, but not + // required for correctness, that the read-only engine eagerly create an + // iterator (that is later cloned) while the latches are held, so that this + // request does not "see" the effect of any later requests that happen after + // the latches are released. var result result.Result br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes( @@ -87,26 +87,53 @@ func (r *Replica) executeReadOnlyBatch( // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. if isConcurrencyRetryError(pErr) { + if g.EvalKind == concurrency.OptimisticEval { + // Since this request was not holding latches, it could have raced with + // intent resolution. So we can't trust it to add discovered locks, if + // there is a latch conflict. This means that a discovered lock plus a + // latch conflict will likely cause the request to evaluate at least 3 + // times: optimistically; pessimistically and add the discovered lock; + // wait until resolution and evaluate pessimistically again. + // + // TODO(sumeer): scans and gets are correctly setting the resume span + // when returning a WriteIntentError. I am not sure about other + // concurrency errors. We could narrow the spans we check the latch + // conflicts for by using collectSpansRead as done below in the + // non-error path. + if !g.CheckOptimisticNoLatchConflicts() { + return nil, g, roachpb.NewError(roachpb.NewOptimisticEvalConflictsError()) + } + } pErr = maybeAttachLease(pErr, &st.Lease) return nil, g, pErr } - if pErr == nil && g.EvalKind == concurrency.OptimisticEval { - // Gather the spans that were read -- we distinguish the spans in the - // request from the spans that were actually read, using resume spans in - // the response. For now we ignore the latch spans, but when we stop - // waiting for latches in optimistic evaluation we will use these to check - // latches first. - _, lockSpansRead, err := r.collectSpansRead(ba, br) - if err != nil { - return nil, g, roachpb.NewError(err) - } - if ok := g.CheckOptimisticNoConflicts(lockSpansRead); !ok { + if g.EvalKind == concurrency.OptimisticEval { + if pErr == nil { + // Gather the spans that were read -- we distinguish the spans in the + // request from the spans that were actually read, using resume spans in + // the response. + latchSpansRead, lockSpansRead, err := r.collectSpansRead(ba, br) + if err != nil { + return nil, g, roachpb.NewError(err) + } + if ok := g.CheckOptimisticNoConflicts(latchSpansRead, lockSpansRead); !ok { + return nil, g, roachpb.NewError(roachpb.NewOptimisticEvalConflictsError()) + } + } else { + // There was an error, that was not classified as a concurrency retry + // error, and this request was not holding latches. This should be rare, + // and in the interest of not having subtle correctness bugs, we retry + // pessimistically. return nil, g, roachpb.NewError(roachpb.NewOptimisticEvalConflictsError()) } } // Handle any local (leaseholder-only) side-effects of the request. + // + // Processing these is fine for optimistic evaluation since these were non + // conflicting intents so intent resolution could have been racing with this + // request even if latches were held. intents := result.Local.DetachEncounteredIntents() if pErr == nil { pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 00dbfdd47831..8d324b3dbd8a 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -29,8 +29,8 @@ import ( var optimisticEvalLimitedScans = settings.RegisterBoolSetting( "kv.concurrency.optimistic_eval_limited_scans.enabled", "when true, limited scans are optimistically evaluated in the sense of not checking for "+ - "conflicting locks up front for the full key range of the scan, and instead subsequently "+ - "checking for conflicts only over the key range that was read", + "conflicting latches or locks up front for the full key range of the scan, and instead "+ + "subsequently checking for conflicts only over the key range that was read", true, ) @@ -434,9 +434,12 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, pErr } case *roachpb.OptimisticEvalConflictsError: - // We are deliberately not dropping latches. The next iteration will - // pessimistically check for locks while holding these latches, and will - // find them again and queue up, and then release latches. + // We are deliberately not dropping latches. Note that the latches are + // also optimistically acquired, in the sense of being inserted but not + // waited on. The next iteration will wait on these latches to ensure + // acquisition, and then pessimistically check for locks while holding + // these latches. If conflicting locks are found, the request will queue + // for those locks and release latches. requestEvalKind = concurrency.PessimisticAfterFailedOptimisticEval default: log.Fatalf(ctx, "unexpected concurrency retry error %T", t) @@ -779,7 +782,6 @@ func (r *Replica) collectSpans( ba *roachpb.BatchRequest, ) (latchSpans, lockSpans *spanset.SpanSet, requestEvalKind concurrency.RequestEvalKind, _ error) { latchSpans, lockSpans = new(spanset.SpanSet), new(spanset.SpanSet) - isReadOnly := ba.IsReadOnly() r.mu.RLock() desc := r.descRLocked() liveCount := r.mu.state.Stats.LiveCount @@ -808,7 +810,9 @@ func (r *Replica) collectSpans( lockSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests)) } - considerOptEval := isReadOnly && ba.Header.MaxSpanRequestKeys > 0 && + // Note that we are letting locking readers be considered for optimistic + // evaluation. This is correct, though not necessarily beneficial. + considerOptEval := ba.IsReadOnly() && ba.IsAllTransactional() && ba.Header.MaxSpanRequestKeys > 0 && optimisticEvalLimitedScans.Get(&r.ClusterSettings().SV) // When considerOptEval, these are computed below and used to decide whether // to actually do optimistic evaluation. diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 4859f3aee3e4..393486f6aa81 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2879,6 +2879,123 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) { } } +// TestReplicaLatchingOptimisticEvaluation verifies that limited scans +// evaluate optimistically without waiting for latches to be acquired. In some +// cases, this allows them to avoid waiting on writes that their +// over-estimated declared spans overlapped with. +func TestReplicaLatchingOptimisticEvaluation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + // Split into two back-to-back scans for better test coverage. + sArgs1 := scanArgs([]byte("a"), []byte("c")) + sArgs2 := scanArgs([]byte("c"), []byte("e")) + baScan := roachpb.BatchRequest{} + baScan.Add(sArgs1, sArgs2) + // The state that will block a write while holding latches. + var blockKey, blockWriter atomic.Value + blockKey.Store(roachpb.Key("a")) + blockWriter.Store(false) + blockCh := make(chan struct{}, 1) + blockedCh := make(chan struct{}, 1) + // Setup filter to block the write. + tc := testContext{} + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = + func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { + // Make sure the direct GC path doesn't interfere with this test. + if !filterArgs.Req.Header().Key.Equal(blockKey.Load().(roachpb.Key)) { + return nil + } + if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { + blockedCh <- struct{}{} + <-blockCh + } + return nil + } + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + tc.StartWithStoreConfig(t, stopper, tsc) + // Write initial keys. + for _, k := range []string{"a", "b", "c", "d"} { + pArgs := putArgs([]byte(k), []byte("value")) + if _, pErr := tc.SendWrapped(&pArgs); pErr != nil { + t.Fatal(pErr) + } + } + testCases := []struct { + writeKey string + limit int64 + interferes bool + }{ + // No limit, so pessimistic latching. + {"a", 0, true}, + {"b", 0, true}, + {"c", 0, true}, + {"d", 0, true}, + {"e", 0, false}, // Only scanning from [a,e) + // Limited, with optimistic latching. + {"a", 1, true}, + {"b", 1, false}, + {"b", 2, true}, + {"c", 2, false}, + {"c", 3, true}, + {"d", 3, false}, + // Limited, with pessimistic latching since limit count is equal to number + // of keys in range. + {"d", 4, true}, + {"e", 4, false}, // Only scanning from [a,e) + {"e", 5, false}, // Only scanning from [a,e) + } + for _, test := range testCases { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + errCh := make(chan *roachpb.Error, 2) + pArgs := putArgs([]byte(test.writeKey), []byte("value")) + blockKey.Store(roachpb.Key(test.writeKey)) + blockWriter.Store(true) + go func() { + _, pErr := tc.SendWrapped(&pArgs) + errCh <- pErr + }() + <-blockedCh + // Write is now blocked while holding latches. + blockWriter.Store(false) + baScanCopy := baScan + baScanCopy.MaxSpanRequestKeys = test.limit + go func() { + _, pErr := tc.Sender().Send(context.Background(), baScanCopy) + errCh <- pErr + }() + if test.interferes { + // Neither request should complete until the write is unblocked. + select { + case <-time.After(10 * time.Millisecond): + // Expected. + case pErr := <-errCh: + t.Fatalf("expected interference: got error %s", pErr) + } + // Unblock the write. + blockCh <- struct{}{} + // Both read and write should complete with no errors. + for j := 0; j < 2; j++ { + if pErr := <-errCh; pErr != nil { + t.Errorf("error %d: unexpected error: %s", j, pErr) + } + } + } else { + // The read should complete first. + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + // The write should complete next, after it is unblocked. + blockCh <- struct{}{} + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + } + }) + } +} + // TestReplicaUseTSCache verifies that write timestamps are upgraded based on // the timestamp cache. The test performs the operations with and without the // use of synthetic timestamps. diff --git a/pkg/kv/kvserver/spanlatch/BUILD.bazel b/pkg/kv/kvserver/spanlatch/BUILD.bazel index 5fe03015bddc..131181525bcf 100644 --- a/pkg/kv/kvserver/spanlatch/BUILD.bazel +++ b/pkg/kv/kvserver/spanlatch/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 7a20fe8171b7..66bcd4f326fa 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) // A Manager maintains an interval tree of key and key range latches. Latch @@ -113,6 +114,9 @@ type Guard struct { // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 + // Non-nil only when AcquireOptimistic has retained the snapshot for later + // checking of conflicts, and waiting. + snap *snapshot } func (lg *Guard) latches(s spanset.SpanScope, a spanset.SpanAccess) []latch { @@ -212,6 +216,106 @@ func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, return lg, nil } +// AcquireOptimistic is like Acquire, except it does not wait for latches over +// overlapping spans to be released before returning. Instead, it +// optimistically assumes that there are no currently held latches that need +// to be waited on. This can be verified after the fact by passing the Guard +// and the spans actually read to CheckOptimisticNoConflicts. +// +// Despite existing latches being ignored by this method, future calls to +// Acquire will observe the latches inserted here and will wait for them to be +// Released, as usual. +// +// The method returns a Guard which must be provided to the +// CheckOptimisticNoConflicts, Release methods. +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { + lg, snap := m.sequence(spans) + lg.snap = &snap + return lg +} + +// CheckOptimisticNoConflicts returns true iff the spans in the provided +// spanset do not conflict with any existing latches (in the snapshot created +// in AcquireOptimistic). It must only be called after AcquireOptimistic, and +// if it returns true, the caller can skip calling WaitUntilAcquired and it is +// sufficient to only call Release. If it returns false, the caller will +// typically call WaitUntilAcquired to wait for latch acquisition. It is also +// acceptable for the caller to skip WaitUntilAcquired and directly call +// Release, in which case it never held the latches. +func (m *Manager) CheckOptimisticNoConflicts(lg *Guard, spans *spanset.SpanSet) bool { + if lg.snap == nil { + panic(errors.AssertionFailedf("snap must not be nil")) + } + snap := lg.snap + var search latch + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + tr := &snap.trees[s] + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + ss := spans.GetSpans(a, s) + for _, sp := range ss { + search.span = sp.Span + search.ts = sp.Timestamp + switch a { + case spanset.SpanReadOnly: + // Search for writes at equal or lower timestamps. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreLater) { + return false + } + case spanset.SpanReadWrite: + // Search for all other writes. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreNothing) { + return false + } + // Search for reads at equal or higher timestamps. + it = tr[spanset.SpanReadOnly].MakeIter() + if overlaps(&it, &search, ignoreEarlier) { + return false + } + default: + panic("unknown access") + } + } + } + } + // Note that we don't call lg.snap.close() since even when this returns + // true, it is acceptable for the caller to call WaitUntilAcquired. + return true +} + +func overlaps(it *iterator, search *latch, ignore ignoreFn) bool { + for it.FirstOverlap(search); it.Valid(); it.NextOverlap(search) { + // The held latch may have already been signaled, but that doesn't allow + // us to ignore it, since it could have been held while we were + // concurrently evaluating, and we may not have observed the result of + // evaluation of that conflicting latch holder. + held := it.Cur() + if !ignore(search.ts, held.ts) { + return true + } + } + return false +} + +// WaitUntilAcquired is meant to be called when CheckOptimisticNoConflicts has +// returned false, and so the caller needs to do pessimistic latching. +func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, error) { + if lg.snap == nil { + panic(errors.AssertionFailedf("snap must not be nil")) + } + defer func() { + lg.snap.close() + lg.snap = nil + }() + err := m.wait(ctx, lg, *lg.snap) + if err != nil { + m.Release(lg) + return nil, err + } + return lg, nil +} + // sequence locks the manager, captures an immutable snapshot, inserts latches // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition @@ -427,6 +531,9 @@ func (m *Manager) waitForSignal( // owned latches. func (m *Manager) Release(lg *Guard) { lg.done.signal() + if lg.snap != nil { + lg.snap.close() + } m.mu.Lock() m.removeLocked(lg) diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go index 129f8b8e0f44..c8f461c82d2b 100644 --- a/pkg/kv/kvserver/spanlatch/manager_test.go +++ b/pkg/kv/kvserver/spanlatch/manager_test.go @@ -536,6 +536,56 @@ func TestLatchManagerContextCancellation(t *testing.T) { testLatchSucceeds(t, lg3C) } +func TestLatchManagerOptimistic(t *testing.T) { + defer leaktest.AfterTest(t)() + var m Manager + + // Acquire latches, no conflict. + lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS)) + require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS))) + lg1, err := m.WaitUntilAcquired(context.Background(), lg1) + require.NoError(t, err) + + // Optimistic acquire encounters conflict in some cases. + lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS)) + require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS))) + require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS))) + waitUntilAcquiredCh := func(g *Guard) <-chan *Guard { + ch := make(chan *Guard) + go func() { + g, err := m.WaitUntilAcquired(context.Background(), g) + require.NoError(t, err) + ch <- g + }() + return ch + } + ch2 := waitUntilAcquiredCh(lg2) + testLatchBlocks(t, ch2) + m.Release(lg1) + testLatchSucceeds(t, ch2) + + // Optimistic acquire encounters conflict. + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS)) + require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) + m.Release(lg2) + // There is still a conflict even though lg2 has been released. + require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) + lg3, err = m.WaitUntilAcquired(context.Background(), lg3) + require.NoError(t, err) + m.Release(lg3) + + // Optimistic acquire for read below write encounters no conflict. + oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} + lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS)) + require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) + require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) + lg5, err = m.WaitUntilAcquired(context.Background(), lg5) + require.NoError(t, err) + m.Release(lg5) + m.Release(lg4) +} + func BenchmarkLatchManagerReadOnlyMix(b *testing.B) { for _, size := range []int{1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {