From 4321fa8d20842c4e40e7bd295a0d9299270126e1 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Wed, 2 Jun 2021 16:56:08 -0400 Subject: [PATCH] concurrency,kvserver: limited scans do optimistic latching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Latches for the full spans get inserted up front in the spanlatch.Manager, and the conflict checking happens after evaluation, only over the spans that were actually read. If there is a conflict, the existing inserted latches are waited on, and execution switches to pessimistic latching and locking. The existing cluster setting for optimistic locking is used to gate this behavior. Numbers for the new OptimisticEval benchmark show improvement when real-contention=false, compared to master which only had optimistic locking. There is a modest slowdown in the real-contention=true case since every optimistic read has to try twice. The benchmark has concurrent writes of two different kinds: latches represents 1PC writers, so no intents, while latches-and-locks represent writers that will leave intents. For the latter, when the optimistic read finds an intent during evaluation it cannot necessarily add it as a discovered lock, if there is also a conflicting latch (since it could be racing with intent resolution). This case is rare in these benchmarks (latches-and-locks/real-contention=true): 13% found an intent and had a conflicting latch when num-writers=1 and < 1% for the same case when num-writers=4. The remainder (the common case) was to find a conflict when looking for conflicting latches/locks using CheckOptimisticNoConflicts. name old time/op new time/op delta OptimisticEvalForLocks/real-contention=false-16 28.2µs ± 4% 28.5µs ± 5% ~ (p=0.393 n=10+10) OptimisticEvalForLocks/real-contention=true-16 5.52ms ± 1% 5.52ms ± 1% ~ (p=0.796 n=10+10) OptimisticEval/latches-and-locks/real-contention=false/num-writers=1-16 220µs ±62% 89µs ±38% -59.37% (p=0.000 n=10+8) OptimisticEval/latches-and-locks/real-contention=false/num-writers=4-16 213µs ±65% 155µs ±82% -27.33% (p=0.015 n=10+10) OptimisticEval/latches-and-locks/real-contention=true/num-writers=1-16 1.33ms ± 3% 1.27ms ±16% ~ (p=0.829 n=8+10) OptimisticEval/latches-and-locks/real-contention=true/num-writers=4-16 2.02ms ±10% 2.25ms ± 9% +11.31% (p=0.000 n=10+10) OptimisticEval/latches/real-contention=false/num-writers=1-16 175µs ± 2% 45µs ± 5% -74.05% (p=0.000 n=10+10) OptimisticEval/latches/real-contention=false/num-writers=4-16 613µs ± 2% 44µs ± 3% -92.74% (p=0.000 n=10+9) OptimisticEval/latches/real-contention=true/num-writers=1-16 181µs ± 4% 179µs ± 5% ~ (p=0.315 n=10+10) OptimisticEval/latches/real-contention=true/num-writers=4-16 624µs ± 3% 620µs ± 3% ~ (p=0.247 n=10+10) name old alloc/op new alloc/op delta OptimisticEvalForLocks/real-contention=false-16 8.40kB ± 7% 8.33kB ± 3% ~ (p=1.000 n=10+8) OptimisticEvalForLocks/real-contention=true-16 31.8kB ± 7% 32.6kB ± 9% ~ (p=0.382 n=8+8) OptimisticEval/latches-and-locks/real-contention=false/num-writers=1-16 23.9kB ±21% 17.8kB ±25% -25.55% (p=0.003 n=10+10) OptimisticEval/latches-and-locks/real-contention=false/num-writers=4-16 24.1kB ±20% 19.4kB ±22% -19.56% (p=0.015 n=10+10) OptimisticEval/latches-and-locks/real-contention=true/num-writers=1-16 104kB ± 1% 101kB ± 1% -2.89% (p=0.000 n=8+9) OptimisticEval/latches-and-locks/real-contention=true/num-writers=4-16 197kB ± 6% 217kB ±11% +10.19% (p=0.000 n=9+10) OptimisticEval/latches/real-contention=false/num-writers=1-16 29.9kB ± 0% 13.4kB ± 1% -55.07% (p=0.000 n=9+10) OptimisticEval/latches/real-contention=false/num-writers=4-16 94.9kB ± 0% 14.5kB ± 1% -84.74% (p=0.000 n=9+8) OptimisticEval/latches/real-contention=true/num-writers=1-16 29.9kB ± 0% 31.3kB ± 0% +4.59% (p=0.000 n=10+10) OptimisticEval/latches/real-contention=true/num-writers=4-16 94.8kB ± 0% 96.2kB ± 0% +1.48% (p=0.000 n=10+10) name old allocs/op new allocs/op delta OptimisticEvalForLocks/real-contention=false-16 68.6 ± 1% 69.6 ± 2% +1.52% (p=0.005 n=9+10) OptimisticEvalForLocks/real-contention=true-16 271 ± 2% 272 ± 2% ~ (p=0.336 n=8+8) OptimisticEval/latches-and-locks/real-contention=false/num-writers=1-16 149 ±17% 117 ±18% -21.00% (p=0.002 n=10+10) OptimisticEval/latches-and-locks/real-contention=false/num-writers=4-16 151 ±16% 126 ±18% -16.31% (p=0.013 n=10+10) OptimisticEval/latches-and-locks/real-contention=true/num-writers=1-16 610 ± 0% 559 ± 1% -8.32% (p=0.000 n=8+9) OptimisticEval/latches-and-locks/real-contention=true/num-writers=4-16 1.12k ± 5% 1.19k ±12% ~ (p=0.053 n=9+10) OptimisticEval/latches/real-contention=false/num-writers=1-16 162 ± 0% 91 ± 0% -43.83% (p=0.001 n=8+9) OptimisticEval/latches/real-contention=false/num-writers=4-16 445 ± 0% 96 ± 0% -78.44% (p=0.000 n=9+9) OptimisticEval/latches/real-contention=true/num-writers=1-16 163 ± 0% 184 ± 0% +13.07% (p=0.000 n=10+10) OptimisticEval/latches/real-contention=true/num-writers=4-16 445 ± 0% 467 ± 0% +4.94% (p=0.000 n=10+10) Fixes #9521 Release note (performance improvement): A limited scan now checks for conflicting latches in an optimistic manner, which means it will not conflict with latches that were held in the scan's full spans, but were not in the spans that were scanned until the limit was reached. This behavior can be turned off (along with optimistic locking) by changing the value of the cluster setting kv.concurrency.optimistic_eval_limited_scans.enabled to false. --- pkg/kv/kvserver/client_replica_test.go | 165 +++++++++++++++++- .../concurrency/concurrency_control.go | 20 ++- .../concurrency/concurrency_manager.go | 81 +++++++-- .../concurrency/concurrency_manager_test.go | 4 +- pkg/kv/kvserver/concurrency/latch_manager.go | 20 +++ .../testdata/concurrency_manager/optimistic | 138 +++++++++++++-- pkg/kv/kvserver/replica_read.go | 59 +++++-- pkg/kv/kvserver/replica_send.go | 18 +- pkg/kv/kvserver/replica_test.go | 117 +++++++++++++ pkg/kv/kvserver/spanlatch/BUILD.bazel | 1 + pkg/kv/kvserver/spanlatch/manager.go | 107 ++++++++++++ pkg/kv/kvserver/spanlatch/manager_test.go | 50 ++++++ 12 files changed, 716 insertions(+), 64 deletions(-) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 45d48dfa6ef4..29d102407c1b 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3904,7 +3904,71 @@ func TestOptimisticEvalNoContention(t *testing.T) { require.NoError(t, txn1.Commit(ctx)) } -func BenchmarkOptimisticEval(b *testing.B) { +// TestOptimisticEvalWithConcurrentWriters tests concurrently running writes +// and optimistic reads where the latter always conflict. This is just a +// sanity check to confirm that nothing fails. +func TestOptimisticEvalWithConcurrentWriters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db := setupDBAndWriteAAndB(t) + defer s.Stopper().Stop(ctx) + + finish := make(chan struct{}) + var workers sync.WaitGroup + for i := 0; i < 4; i++ { + workers.Add(1) + go func() { + for { + require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + if err := txn.Put(ctx, "a", "a"); err != nil { + return err + } + return txn.Commit(ctx) + })) + select { + case _, recv := <-finish: + if !recv { + workers.Done() + return + } + default: + } + } + }() + } + for i := 0; i < 4; i++ { + workers.Add(1) + go func() { + for { + require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + _, err = txn.Scan(ctx, "a", "c", 1) + if err != nil { + return err + } + err = txn.Commit(ctx) + return err + })) + select { + case _, recv := <-finish: + if !recv { + workers.Done() + return + } + default: + } + } + }() + } + time.Sleep(10 * time.Second) + close(finish) + workers.Wait() +} + +// BenchmarkOptimisticEvalForLocks benchmarks optimistic evaluation when the +// potentially conflicting lock is explicitly held for a duration of time. +func BenchmarkOptimisticEvalForLocks(b *testing.B) { defer log.Scope(b).Close(b) ctx := context.Background() args := base.TestServerArgs{} @@ -3980,3 +4044,102 @@ func BenchmarkOptimisticEval(b *testing.B) { }) } } + +// BenchmarkOptimisticEval benchmarks optimistic evaluation with +// - potentially conflicting latches held by 1PC transactions doing writes. +// - potentially conflicting latches or locks held by transactions doing +// writes. +func BenchmarkOptimisticEval(b *testing.B) { + defer log.Scope(b).Close(b) + ctx := context.Background() + args := base.TestServerArgs{} + + for _, latches := range []bool{false, true} { + conflictWith := "latches-and-locks" + if latches { + conflictWith = "latches" + } + b.Run(conflictWith, func(b *testing.B) { + for _, realContention := range []bool{false, true} { + b.Run(fmt.Sprintf("real-contention=%t", realContention), func(b *testing.B) { + for _, numWriters := range []int{1, 4} { + b.Run(fmt.Sprintf("num-writers=%d", numWriters), func(b *testing.B) { + // Since we are doing writes in the benchmark, start with a + // fresh server each time so that we start with a fresh engine + // without many versions for a key. + s, _, db := serverutils.StartServer(b, args) + defer s.Stopper().Stop(ctx) + + require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + if err := txn.Put(ctx, "a", "a"); err != nil { + return err + } + if err := txn.Put(ctx, "b", "b"); err != nil { + return err + } + return txn.Commit(ctx) + })) + tup, err := db.Get(ctx, "a") + require.NoError(b, err) + require.NotNil(b, tup.Value) + tup, err = db.Get(ctx, "b") + require.NoError(b, err) + require.NotNil(b, tup.Value) + + writeKey := "b" + if realContention { + writeKey = "a" + } + finishWrites := make(chan struct{}) + var writers sync.WaitGroup + for i := 0; i < numWriters; i++ { + writers.Add(1) + go func() { + for { + if latches { + require.NoError(b, db.Put(ctx, writeKey, "foo")) + + } else { + require.NoError(b, db.Txn(ctx, + func(ctx context.Context, txn *kv.Txn) (err error) { + if err := txn.Put(ctx, writeKey, "foo"); err != nil { + return err + } + return txn.Commit(ctx) + })) + } + select { + case _, recv := <-finishWrites: + if !recv { + writers.Done() + return + } + default: + } + } + }() + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + _, err = txn.Scan(ctx, "a", "c", 1) + if err != nil { + panic(err) + } + err = txn.Commit(ctx) + if err != nil { + panic(err) + } + return err + }) + } + b.StopTimer() + close(finishWrites) + writers.Wait() + }) + } + }) + } + }) + } +} diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index ca1723691bd7..c9eeaa2ad735 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -394,6 +394,7 @@ type Request struct { type Guard struct { Req Request lg latchGuard + lm latchManager ltg lockTableGuard // The latest RequestEvalKind passed to SequenceReq. EvalKind RequestEvalKind @@ -411,13 +412,30 @@ type Error = roachpb.Error // Internal Structure Interfaces // /////////////////////////////////// -// latchManager serializes access to keys and key ranges. +// latchManager serializes access to keys and key ranges. The +// {AcquireOptimistic,CheckOptimisticNoConflicts,WaitUntilAcquired} methods +// are only for use in optimistic latching. // // See additional documentation in pkg/storage/spanlatch. type latchManager interface { // Acquires latches, providing mutual exclusion for conflicting requests. Acquire(context.Context, Request) (latchGuard, *Error) + // AcquireOptimistic is like Acquire in that it inserts latches, but it does + // not wait for conflicting latches on overlapping spans to be released + // before returning. This should be followed by CheckOptimisticNoConflicts + // to validate that not waiting did not violate correctness. + AcquireOptimistic(req Request) latchGuard + + // CheckOptimisticNoConflicts returns true iff the spans in the provided + // spanset do not conflict with existing latches. + CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool + + // WaitUntilAcquired is meant to be called when CheckOptimisticNoConflicts + // returned false, or some other occurrence (like conflicting locks) is + // causing this request to switch to pessimistic latching. + WaitUntilAcquired(ctx context.Context, lg latchGuard) (latchGuard, *Error) + // Releases latches, relinquish its protection from conflicting requests. Release(latchGuard) diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index eaff4c2d683c..8b5899d5f521 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -191,25 +191,43 @@ func (m *managerImpl) sequenceReqWithGuard( // Only the first iteration can sometimes already be holding latches -- we // use this to assert below. - first := true + firstIteration := true for { - if !first { - g.AssertNoLatches() - } - first = false if !g.HoldingLatches() { - // TODO(sumeer): optimistic requests could register their need for - // latches, but not actually wait until acquisition. - // https://github.com/cockroachdb/cockroach/issues/9521 - - // Acquire latches for the request. This synchronizes the request - // with all conflicting in-flight requests. - log.Event(ctx, "acquiring latches") - g.lg, err = m.lm.Acquire(ctx, req) + if g.EvalKind == OptimisticEval { + if !firstIteration { + // The only way we loop more than once is when conflicting locks are + // found -- see below where that happens and the comment there on + // why it will never happen with OptimisticEval. + panic("optimistic eval should not loop in sequenceReqWithGuard") + } + log.Event(ctx, "optimistically acquiring latches") + g.lg = m.lm.AcquireOptimistic(req) + g.lm = m.lm + } else { + // Acquire latches for the request. This synchronizes the request + // with all conflicting in-flight requests. + log.Event(ctx, "acquiring latches") + g.lg, err = m.lm.Acquire(ctx, req) + if err != nil { + return nil, err + } + g.lm = m.lm + } + } else { + if !firstIteration { + panic(errors.AssertionFailedf("second or later iteration cannot be holding latches")) + } + if g.EvalKind != PessimisticAfterFailedOptimisticEval { + panic("must not be holding latches") + } + log.Event(ctx, "optimistic failed, so waiting for latches") + g.lg, err = m.lm.WaitUntilAcquired(ctx, g.lg) if err != nil { return nil, err } } + firstIteration = false // Some requests don't want the wait on locks. if req.LockSpans.Empty() { @@ -228,7 +246,9 @@ func (m *managerImpl) sequenceReqWithGuard( g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg) } - // Wait on conflicting locks, if necessary. + // Wait on conflicting locks, if necessary. Note that this will never be + // true if ScanOptimistic was called above. Therefore it will also never + // be true if latchManager.AcquireOptimistic was called. if g.ltg.ShouldWait() { m.lm.Release(g.moveLatchGuard()) @@ -549,21 +569,44 @@ func (g *Guard) AssertNoLatches() { } } -// CheckOptimisticNoConflicts checks that the lockSpansRead do not have a -// conflicting lock. -func (g *Guard) CheckOptimisticNoConflicts(lockSpansRead *spanset.SpanSet) (ok bool) { +// CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not +// have a conflicting latch, lock. +func (g *Guard) CheckOptimisticNoConflicts( + latchSpansRead *spanset.SpanSet, lockSpansRead *spanset.SpanSet, +) (ok bool) { if g.EvalKind != OptimisticEval { panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind)) } - if g.ltg == nil { + if g.lg == nil && g.ltg == nil { + return true + } + if g.lg == nil { + panic("expected non-nil latchGuard") + } + // First check the latches, since a conflict there could mean that racing + // requests in the lock table caused a conflicting lock to not be noticed. + if g.lm.CheckOptimisticNoConflicts(g.lg, latchSpansRead) { + return g.ltg.CheckOptimisticNoConflicts(lockSpansRead) + } + return false +} + +// CheckOptimisticNoLatchConflicts checks that the declared latch spans for +// the request do not have a conflicting latch. +func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) { + if g.EvalKind != OptimisticEval { + panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind)) + } + if g.lg == nil { return true } - return g.ltg.CheckOptimisticNoConflicts(lockSpansRead) + return g.lm.CheckOptimisticNoConflicts(g.lg, g.Req.LatchSpans) } func (g *Guard) moveLatchGuard() latchGuard { lg := g.lg g.lg = nil + g.lm = nil return lg } diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index f0e487b12f2c..55442276ed0c 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -301,8 +301,8 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.Fatalf(t, "unknown request: %s", reqName) } reqs, _ := scanRequests(t, d, c) - _, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs) - return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(lockSpans)) + latchSpans, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs) + return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(latchSpans, lockSpans)) case "on-lock-acquired": var reqName string diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index 8befa34c6689..73c37bf85c7f 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" ) @@ -31,6 +32,25 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard return lg, nil } +func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { + lg := m.m.AcquireOptimistic(req.LatchSpans) + return lg +} + +func (m *latchManagerImpl) CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool { + return m.m.CheckOptimisticNoConflicts(lg.(*spanlatch.Guard), spans) +} + +func (m *latchManagerImpl) WaitUntilAcquired( + ctx context.Context, lg latchGuard, +) (latchGuard, *Error) { + lg, err := m.m.WaitUntilAcquired(ctx, lg.(*spanlatch.Guard)) + if err != nil { + return nil, roachpb.NewError(err) + } + return lg, nil +} + func (m *latchManagerImpl) Release(lg latchGuard) { m.m.Release(lg.(*spanlatch.Guard)) } diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic index 073e596a03df..dbd46e8bdd42 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic @@ -38,7 +38,7 @@ new-request name=req2 txn=txn2 ts=12,1 sequence req=req2 eval-kind=opt ---- [2] sequence req2: optimistically sequencing request -[2] sequence req2: acquiring latches +[2] sequence req2: optimistically acquiring latches [2] sequence req2: optimistically scanning lock table for conflicting locks [2] sequence req2: sequencing complete, returned guard @@ -56,33 +56,135 @@ check-opt-no-conflicts req=req2 ---- no-conflicts: true -# Wider span has a conflict. -check-opt-no-conflicts req=req2 +finish req=req2 +---- +[-] finish req2: finishing request + +new-request name=req3 txn=txn2 ts=12,1 + scan key=a endkey=e +---- + +# Optimistic locking for req3 +sequence req=req3 eval-kind=opt +---- +[3] sequence req3: optimistically sequencing request +[3] sequence req3: optimistically acquiring latches +[3] sequence req3: optimistically scanning lock table for conflicting locks +[3] sequence req3: sequencing complete, returned guard + +debug-lock-table +---- +global: num=1 + lock: "d" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [0] +local: num=0 + +# Wider span for req3 has a conflict. +check-opt-no-conflicts req=req3 scan key=a endkey=e ---- no-conflicts: false # Sequence again -- latches are already held. -sequence req=req2 eval-kind=pess-after-opt +sequence req=req3 eval-kind=pess-after-opt ---- -[3] sequence req2: re-sequencing request after optimistic sequencing failed -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: waiting in lock wait-queues -[3] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "d" (queuedWriters: 0, queuedReaders: 1) -[3] sequence req2: pushing timestamp of txn 00000001 above 12.000000000,1 -[3] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction +[4] sequence req3: re-sequencing request after optimistic sequencing failed +[4] sequence req3: optimistic failed, so waiting for latches +[4] sequence req3: scanning lock table for conflicting locks +[4] sequence req3: waiting in lock wait-queues +[4] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "d" (queuedWriters: 0, queuedReaders: 1) +[4] sequence req3: pushing timestamp of txn 00000001 above 12.000000000,1 +[4] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction # Conflicting transaction commits. on-txn-updated txn=txn1 status=committed ---- [-] update txn: committing txn1 -[3] sequence req2: resolving intent "d" for txn 00000001 with COMMITTED status -[3] sequence req2: lock wait-queue event: done waiting -[3] sequence req2: conflicted with 00000001-0000-0000-0000-000000000000 on "d" for 1.234s -[3] sequence req2: acquiring latches -[3] sequence req2: scanning lock table for conflicting locks -[3] sequence req2: sequencing complete, returned guard +[4] sequence req3: resolving intent "d" for txn 00000001 with COMMITTED status +[4] sequence req3: lock wait-queue event: done waiting +[4] sequence req3: conflicted with 00000001-0000-0000-0000-000000000000 on "d" for 1.234s +[4] sequence req3: acquiring latches +[4] sequence req3: scanning lock table for conflicting locks +[4] sequence req3: sequencing complete, returned guard -finish req=req2 + +finish req=req3 ---- -[-] finish req2: finishing request +[-] finish req3: finishing request + +# Another transaction that writes, which will hold latches but not locks. +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-request name=req4 txn=txn3 ts=10,1 + put key=d value=d +---- + +sequence req=req4 +---- +[5] sequence req4: sequencing request +[5] sequence req4: acquiring latches +[5] sequence req4: scanning lock table for conflicting locks +[5] sequence req4: sequencing complete, returned guard + +debug-lock-table +---- +global: num=0 +local: num=0 + +new-request name=req5 txn=txn2 ts=12,1 + scan key=a endkey=e +---- + +sequence req=req5 eval-kind=opt +---- +[6] sequence req5: optimistically sequencing request +[6] sequence req5: optimistically acquiring latches +[6] sequence req5: optimistically scanning lock table for conflicting locks +[6] sequence req5: sequencing complete, returned guard + +# When checking with a span that does not include the existing latch, there is +# no conflict. +check-opt-no-conflicts req=req5 + scan key=a endkey=c +---- +no-conflicts: true + +finish req=req5 +---- +[-] finish req5: finishing request + +new-request name=req6 txn=txn2 ts=12,1 + scan key=a endkey=e +---- + +sequence req=req6 eval-kind=opt +---- +[7] sequence req6: optimistically sequencing request +[7] sequence req6: optimistically acquiring latches +[7] sequence req6: optimistically scanning lock table for conflicting locks +[7] sequence req6: sequencing complete, returned guard + +# Wider span for req6 has a conflict with the latch held by req4. +check-opt-no-conflicts req=req6 + scan key=a endkey=e +---- +no-conflicts: false + +sequence req=req6 eval-kind=pess-after-opt +---- +[8] sequence req6: re-sequencing request after optimistic sequencing failed +[8] sequence req6: optimistic failed, so waiting for latches +[8] sequence req6: waiting to acquire read latch {a-e}@12.000000000,1, held by write latch d@10.000000000,1 +[8] sequence req6: blocked on select in spanlatch.(*Manager).waitForSignal + +# req4 finishing releases the latch and allows req6 to proceed. +finish req=req4 +---- +[-] finish req4: finishing request +[8] sequence req6: scanning lock table for conflicting locks +[8] sequence req6: sequencing complete, returned guard + +finish req=req6 +---- +[-] finish req6: finishing request diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 28f12fa22595..ab97286ceeb6 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -73,11 +73,11 @@ func (r *Replica) executeReadOnlyBatch( // release latches immediately after we acquire an engine iterator as long // as we're performing a non-locking read. Note that this also requires that // the request is not being optimistically evaluated (optimistic evaluation - // does not check locks). It would also be nice, but not required for - // correctness, that the read-only engine eagerly create an iterator (that - // is later cloned) while the latches are held, so that this request does - // not "see" the effect of any later requests that happen after the latches - // are released. + // does not wait for latches or check locks). It would also be nice, but not + // required for correctness, that the read-only engine eagerly create an + // iterator (that is later cloned) while the latches are held, so that this + // request does not "see" the effect of any later requests that happen after + // the latches are released. var result result.Result br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes( @@ -87,26 +87,53 @@ func (r *Replica) executeReadOnlyBatch( // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. if isConcurrencyRetryError(pErr) { + if g.EvalKind == concurrency.OptimisticEval { + // Since this request was not holding latches, it could have raced with + // intent resolution. So we can't trust it to add discovered locks, if + // there is a latch conflict. This means that a discovered lock plus a + // latch conflict will likely cause the request to evaluate at least 3 + // times: optimistically; pessimistically and add the discovered lock; + // wait until resolution and evaluate pessimistically again. + // + // TODO(sumeer): scans and gets are correctly setting the resume span + // when returning a WriteIntentError. I am not sure about other + // concurrency errors. We could narrow the spans we check the latch + // conflicts for by using collectSpansRead as done below in the + // non-error path. + if !g.CheckOptimisticNoLatchConflicts() { + return nil, g, roachpb.NewError(roachpb.NewOptimisticEvalConflictsError()) + } + } pErr = maybeAttachLease(pErr, &st.Lease) return nil, g, pErr } - if pErr == nil && g.EvalKind == concurrency.OptimisticEval { - // Gather the spans that were read -- we distinguish the spans in the - // request from the spans that were actually read, using resume spans in - // the response. For now we ignore the latch spans, but when we stop - // waiting for latches in optimistic evaluation we will use these to check - // latches first. - _, lockSpansRead, err := r.collectSpansRead(ba, br) - if err != nil { - return nil, g, roachpb.NewError(err) - } - if ok := g.CheckOptimisticNoConflicts(lockSpansRead); !ok { + if g.EvalKind == concurrency.OptimisticEval { + if pErr == nil { + // Gather the spans that were read -- we distinguish the spans in the + // request from the spans that were actually read, using resume spans in + // the response. + latchSpansRead, lockSpansRead, err := r.collectSpansRead(ba, br) + if err != nil { + return nil, g, roachpb.NewError(err) + } + if ok := g.CheckOptimisticNoConflicts(latchSpansRead, lockSpansRead); !ok { + return nil, g, roachpb.NewError(roachpb.NewOptimisticEvalConflictsError()) + } + } else { + // There was an error, that was not classified as a concurrency retry + // error, and this request was not holding latches. This should be rare, + // and in the interest of not having subtle correctness bugs, we retry + // pessimistically. return nil, g, roachpb.NewError(roachpb.NewOptimisticEvalConflictsError()) } } // Handle any local (leaseholder-only) side-effects of the request. + // + // Processing these is fine for optimistic evaluation since these were non + // conflicting intents so intent resolution could have been racing with this + // request even if latches were held. intents := result.Local.DetachEncounteredIntents() if pErr == nil { pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 04eb0b1499f7..dfeb47eb0e8c 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -29,8 +29,8 @@ import ( var optimisticEvalLimitedScans = settings.RegisterBoolSetting( "kv.concurrency.optimistic_eval_limited_scans.enabled", "when true, limited scans are optimistically evaluated in the sense of not checking for "+ - "conflicting locks up front for the full key range of the scan, and instead subsequently "+ - "checking for conflicts only over the key range that was read", + "conflicting latches or locks up front for the full key range of the scan, and instead "+ + "subsequently checking for conflicts only over the key range that was read", true, ) @@ -434,9 +434,12 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, pErr } case *roachpb.OptimisticEvalConflictsError: - // We are deliberately not dropping latches. The next iteration will - // pessimistically check for locks while holding these latches, and will - // find them again and queue up, and then release latches. + // We are deliberately not dropping latches. Note that the latches are + // also optimistically acquired, in the sense of being inserted but not + // waited on. The next iteration will wait on these latches to ensure + // acquisition, and then pessimistically check for locks while holding + // these latches. If conflicting locks are found, the request will queue + // for those locks and release latches. requestEvalKind = concurrency.PessimisticAfterFailedOptimisticEval default: log.Fatalf(ctx, "unexpected concurrency retry error %T", t) @@ -779,7 +782,6 @@ func (r *Replica) collectSpans( ba *roachpb.BatchRequest, ) (latchSpans, lockSpans *spanset.SpanSet, requestEvalKind concurrency.RequestEvalKind, _ error) { latchSpans, lockSpans = new(spanset.SpanSet), new(spanset.SpanSet) - isReadOnly := ba.IsReadOnly() r.mu.RLock() desc := r.descRLocked() liveCount := r.mu.state.Stats.LiveCount @@ -808,7 +810,9 @@ func (r *Replica) collectSpans( lockSpans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests)) } - considerOptEval := isReadOnly && ba.Header.MaxSpanRequestKeys > 0 && + // Note that we are letting locking readers be considered for optimistic + // evaluation. This is correct, though not necessarily beneficial. + considerOptEval := ba.IsReadOnly() && ba.IsAllTransactional() && ba.Header.MaxSpanRequestKeys > 0 && optimisticEvalLimitedScans.Get(&r.ClusterSettings().SV) // When considerOptEval, these are computed below and used to decide whether // to actually do optimistic evaluation. diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 4859f3aee3e4..393486f6aa81 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2879,6 +2879,123 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) { } } +// TestReplicaLatchingOptimisticEvaluation verifies that limited scans +// evaluate optimistically without waiting for latches to be acquired. In some +// cases, this allows them to avoid waiting on writes that their +// over-estimated declared spans overlapped with. +func TestReplicaLatchingOptimisticEvaluation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + // Split into two back-to-back scans for better test coverage. + sArgs1 := scanArgs([]byte("a"), []byte("c")) + sArgs2 := scanArgs([]byte("c"), []byte("e")) + baScan := roachpb.BatchRequest{} + baScan.Add(sArgs1, sArgs2) + // The state that will block a write while holding latches. + var blockKey, blockWriter atomic.Value + blockKey.Store(roachpb.Key("a")) + blockWriter.Store(false) + blockCh := make(chan struct{}, 1) + blockedCh := make(chan struct{}, 1) + // Setup filter to block the write. + tc := testContext{} + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = + func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { + // Make sure the direct GC path doesn't interfere with this test. + if !filterArgs.Req.Header().Key.Equal(blockKey.Load().(roachpb.Key)) { + return nil + } + if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { + blockedCh <- struct{}{} + <-blockCh + } + return nil + } + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + tc.StartWithStoreConfig(t, stopper, tsc) + // Write initial keys. + for _, k := range []string{"a", "b", "c", "d"} { + pArgs := putArgs([]byte(k), []byte("value")) + if _, pErr := tc.SendWrapped(&pArgs); pErr != nil { + t.Fatal(pErr) + } + } + testCases := []struct { + writeKey string + limit int64 + interferes bool + }{ + // No limit, so pessimistic latching. + {"a", 0, true}, + {"b", 0, true}, + {"c", 0, true}, + {"d", 0, true}, + {"e", 0, false}, // Only scanning from [a,e) + // Limited, with optimistic latching. + {"a", 1, true}, + {"b", 1, false}, + {"b", 2, true}, + {"c", 2, false}, + {"c", 3, true}, + {"d", 3, false}, + // Limited, with pessimistic latching since limit count is equal to number + // of keys in range. + {"d", 4, true}, + {"e", 4, false}, // Only scanning from [a,e) + {"e", 5, false}, // Only scanning from [a,e) + } + for _, test := range testCases { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + errCh := make(chan *roachpb.Error, 2) + pArgs := putArgs([]byte(test.writeKey), []byte("value")) + blockKey.Store(roachpb.Key(test.writeKey)) + blockWriter.Store(true) + go func() { + _, pErr := tc.SendWrapped(&pArgs) + errCh <- pErr + }() + <-blockedCh + // Write is now blocked while holding latches. + blockWriter.Store(false) + baScanCopy := baScan + baScanCopy.MaxSpanRequestKeys = test.limit + go func() { + _, pErr := tc.Sender().Send(context.Background(), baScanCopy) + errCh <- pErr + }() + if test.interferes { + // Neither request should complete until the write is unblocked. + select { + case <-time.After(10 * time.Millisecond): + // Expected. + case pErr := <-errCh: + t.Fatalf("expected interference: got error %s", pErr) + } + // Unblock the write. + blockCh <- struct{}{} + // Both read and write should complete with no errors. + for j := 0; j < 2; j++ { + if pErr := <-errCh; pErr != nil { + t.Errorf("error %d: unexpected error: %s", j, pErr) + } + } + } else { + // The read should complete first. + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + // The write should complete next, after it is unblocked. + blockCh <- struct{}{} + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + } + }) + } +} + // TestReplicaUseTSCache verifies that write timestamps are upgraded based on // the timestamp cache. The test performs the operations with and without the // use of synthetic timestamps. diff --git a/pkg/kv/kvserver/spanlatch/BUILD.bazel b/pkg/kv/kvserver/spanlatch/BUILD.bazel index 5fe03015bddc..131181525bcf 100644 --- a/pkg/kv/kvserver/spanlatch/BUILD.bazel +++ b/pkg/kv/kvserver/spanlatch/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 7a20fe8171b7..66bcd4f326fa 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) // A Manager maintains an interval tree of key and key range latches. Latch @@ -113,6 +114,9 @@ type Guard struct { // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 + // Non-nil only when AcquireOptimistic has retained the snapshot for later + // checking of conflicts, and waiting. + snap *snapshot } func (lg *Guard) latches(s spanset.SpanScope, a spanset.SpanAccess) []latch { @@ -212,6 +216,106 @@ func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, return lg, nil } +// AcquireOptimistic is like Acquire, except it does not wait for latches over +// overlapping spans to be released before returning. Instead, it +// optimistically assumes that there are no currently held latches that need +// to be waited on. This can be verified after the fact by passing the Guard +// and the spans actually read to CheckOptimisticNoConflicts. +// +// Despite existing latches being ignored by this method, future calls to +// Acquire will observe the latches inserted here and will wait for them to be +// Released, as usual. +// +// The method returns a Guard which must be provided to the +// CheckOptimisticNoConflicts, Release methods. +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { + lg, snap := m.sequence(spans) + lg.snap = &snap + return lg +} + +// CheckOptimisticNoConflicts returns true iff the spans in the provided +// spanset do not conflict with any existing latches (in the snapshot created +// in AcquireOptimistic). It must only be called after AcquireOptimistic, and +// if it returns true, the caller can skip calling WaitUntilAcquired and it is +// sufficient to only call Release. If it returns false, the caller will +// typically call WaitUntilAcquired to wait for latch acquisition. It is also +// acceptable for the caller to skip WaitUntilAcquired and directly call +// Release, in which case it never held the latches. +func (m *Manager) CheckOptimisticNoConflicts(lg *Guard, spans *spanset.SpanSet) bool { + if lg.snap == nil { + panic(errors.AssertionFailedf("snap must not be nil")) + } + snap := lg.snap + var search latch + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + tr := &snap.trees[s] + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + ss := spans.GetSpans(a, s) + for _, sp := range ss { + search.span = sp.Span + search.ts = sp.Timestamp + switch a { + case spanset.SpanReadOnly: + // Search for writes at equal or lower timestamps. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreLater) { + return false + } + case spanset.SpanReadWrite: + // Search for all other writes. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreNothing) { + return false + } + // Search for reads at equal or higher timestamps. + it = tr[spanset.SpanReadOnly].MakeIter() + if overlaps(&it, &search, ignoreEarlier) { + return false + } + default: + panic("unknown access") + } + } + } + } + // Note that we don't call lg.snap.close() since even when this returns + // true, it is acceptable for the caller to call WaitUntilAcquired. + return true +} + +func overlaps(it *iterator, search *latch, ignore ignoreFn) bool { + for it.FirstOverlap(search); it.Valid(); it.NextOverlap(search) { + // The held latch may have already been signaled, but that doesn't allow + // us to ignore it, since it could have been held while we were + // concurrently evaluating, and we may not have observed the result of + // evaluation of that conflicting latch holder. + held := it.Cur() + if !ignore(search.ts, held.ts) { + return true + } + } + return false +} + +// WaitUntilAcquired is meant to be called when CheckOptimisticNoConflicts has +// returned false, and so the caller needs to do pessimistic latching. +func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, error) { + if lg.snap == nil { + panic(errors.AssertionFailedf("snap must not be nil")) + } + defer func() { + lg.snap.close() + lg.snap = nil + }() + err := m.wait(ctx, lg, *lg.snap) + if err != nil { + m.Release(lg) + return nil, err + } + return lg, nil +} + // sequence locks the manager, captures an immutable snapshot, inserts latches // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition @@ -427,6 +531,9 @@ func (m *Manager) waitForSignal( // owned latches. func (m *Manager) Release(lg *Guard) { lg.done.signal() + if lg.snap != nil { + lg.snap.close() + } m.mu.Lock() m.removeLocked(lg) diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go index 129f8b8e0f44..c8f461c82d2b 100644 --- a/pkg/kv/kvserver/spanlatch/manager_test.go +++ b/pkg/kv/kvserver/spanlatch/manager_test.go @@ -536,6 +536,56 @@ func TestLatchManagerContextCancellation(t *testing.T) { testLatchSucceeds(t, lg3C) } +func TestLatchManagerOptimistic(t *testing.T) { + defer leaktest.AfterTest(t)() + var m Manager + + // Acquire latches, no conflict. + lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS)) + require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS))) + lg1, err := m.WaitUntilAcquired(context.Background(), lg1) + require.NoError(t, err) + + // Optimistic acquire encounters conflict in some cases. + lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS)) + require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS))) + require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS))) + waitUntilAcquiredCh := func(g *Guard) <-chan *Guard { + ch := make(chan *Guard) + go func() { + g, err := m.WaitUntilAcquired(context.Background(), g) + require.NoError(t, err) + ch <- g + }() + return ch + } + ch2 := waitUntilAcquiredCh(lg2) + testLatchBlocks(t, ch2) + m.Release(lg1) + testLatchSucceeds(t, ch2) + + // Optimistic acquire encounters conflict. + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS)) + require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) + m.Release(lg2) + // There is still a conflict even though lg2 has been released. + require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) + lg3, err = m.WaitUntilAcquired(context.Background(), lg3) + require.NoError(t, err) + m.Release(lg3) + + // Optimistic acquire for read below write encounters no conflict. + oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} + lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS)) + require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) + require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) + lg5, err = m.WaitUntilAcquired(context.Background(), lg5) + require.NoError(t, err) + m.Release(lg5) + m.Release(lg4) +} + func BenchmarkLatchManagerReadOnlyMix(b *testing.B) { for _, size := range []int{1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {