Skip to content

Commit

Permalink
concurrency,kvserver: limited scans do optimistic latching
Browse files Browse the repository at this point in the history
Latches for the full spans get inserted up front in the
spanlatch.Manager, and the conflict checking happens after
evaluation, only over the spans that were actually read.
If there is a conflict, the existing inserted latches are waited
on, and execution switches to pessimistic latching and locking.
The existing cluster setting for optimistic locking is used to
gate this behavior.

Microbenchmark numbers:
name                                                             old time/op    new time/op    delta
OptimisticEvalForLocks/real-contention=false-16                    28.8µs ± 6%    29.4µs ± 5%     ~     (p=0.089 n=10+10)
OptimisticEvalForLocks/real-contention=true-16                     5.52ms ± 1%    5.54ms ± 1%     ~     (p=0.123 n=10+10)
OptimisticEvalForLatches/real-contention=false/num-writers=1-16    63.1µs ± 9%    32.7µs ± 4%  -48.21%  (p=0.000 n=10+10)
OptimisticEvalForLatches/real-contention=false/num-writers=4-16    66.8µs ± 3%    34.4µs ± 5%  -48.51%  (p=0.000 n=10+10)
OptimisticEvalForLatches/real-contention=false/num-writers=8-16    69.4µs ± 1%    35.7µs ± 2%  -48.47%  (p=0.000 n=10+10)
OptimisticEvalForLatches/real-contention=true/num-writers=1-16      127µs ± 5%     112µs ± 2%  -11.68%  (p=0.000 n=10+10)
OptimisticEvalForLatches/real-contention=true/num-writers=4-16      128µs ± 5%     112µs ± 2%  -12.38%  (p=0.000 n=9+10)
OptimisticEvalForLatches/real-contention=true/num-writers=8-16      140µs ± 2%     115µs ± 2%  -17.56%  (p=0.000 n=9+7)

name                                                             old alloc/op   new alloc/op   delta
OptimisticEvalForLocks/real-contention=false-16                    8.29kB ± 3%    8.38kB ± 5%     ~     (p=0.367 n=9+10)
OptimisticEvalForLocks/real-contention=true-16                     32.2kB ±10%   42.2kB ±108%     ~     (p=0.321 n=8+9)
OptimisticEvalForLatches/real-contention=false/num-writers=1-16    16.6kB ± 4%    13.1kB ± 2%  -21.07%  (p=0.000 n=9+9)
OptimisticEvalForLatches/real-contention=false/num-writers=4-16    16.6kB ± 7%    12.9kB ± 1%  -22.23%  (p=0.000 n=9+9)
OptimisticEvalForLatches/real-contention=false/num-writers=8-16    16.3kB ± 0%    12.7kB ± 1%  -22.20%  (p=0.000 n=9+9)
OptimisticEvalForLatches/real-contention=true/num-writers=1-16     26.8kB ± 0%    26.8kB ± 0%   +0.14%  (p=0.049 n=9+9)
OptimisticEvalForLatches/real-contention=true/num-writers=4-16     26.8kB ± 0%    26.8kB ± 0%   +0.12%  (p=0.027 n=8+9)
OptimisticEvalForLatches/real-contention=true/num-writers=8-16     26.8kB ± 0%    26.8kB ± 0%     ~     (p=1.000 n=10+8)

name                                                             old allocs/op  new allocs/op  delta
OptimisticEvalForLocks/real-contention=false-16                      68.6 ± 1%      70.0 ± 0%   +2.04%  (p=0.000 n=10+7)
OptimisticEvalForLocks/real-contention=true-16                        272 ± 3%       272 ± 2%     ~     (p=0.777 n=8+8)
OptimisticEvalForLatches/real-contention=false/num-writers=1-16       149 ± 0%       117 ± 1%  -21.41%  (p=0.000 n=8+10)
OptimisticEvalForLatches/real-contention=false/num-writers=4-16       150 ± 0%       116 ± 2%  -22.80%  (p=0.000 n=7+10)
OptimisticEvalForLatches/real-contention=false/num-writers=8-16       150 ± 0%       114 ± 1%  -24.20%  (p=0.000 n=9+10)
OptimisticEvalForLatches/real-contention=true/num-writers=1-16        261 ± 0%       261 ± 0%     ~     (all equal)
OptimisticEvalForLatches/real-contention=true/num-writers=4-16        261 ± 0%       261 ± 0%     ~     (p=0.471 n=9+9)
OptimisticEvalForLatches/real-contention=true/num-writers=8-16        262 ± 0%       261 ± 0%   -0.38%  (p=0.000 n=8+8)

Fixes #9521

Release note (performance improvement): A limited scan now checks
for conflicting latches in an optimistic manner, which means it will
not conflict with latches that were held in the scan's full spans,
but were not in the spans that were scanned until the limit was
reached. This behavior can be turned off (along with optimistic
locking) by changing the value of the cluster setting
kv.concurrency.optimistic_eval_limited_scans.enabled to false.
  • Loading branch information
sumeerbhola committed Jun 8, 2021
1 parent 4fb18c8 commit 1357016
Show file tree
Hide file tree
Showing 12 changed files with 581 additions and 54 deletions.
80 changes: 79 additions & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3904,7 +3904,7 @@ func TestOptimisticEvalNoContention(t *testing.T) {
require.NoError(t, txn1.Commit(ctx))
}

func BenchmarkOptimisticEval(b *testing.B) {
func BenchmarkOptimisticEvalForLocks(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}
Expand Down Expand Up @@ -3980,3 +3980,81 @@ func BenchmarkOptimisticEval(b *testing.B) {
})
}
}

func BenchmarkOptimisticEvalForLatches(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(b, args)
defer s.Stopper().Stop(ctx)

require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
b.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
tup, err := db.Get(ctx, "a")
require.NoError(b, err)
require.NotNil(b, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(b, err)
require.NotNil(b, tup.Value)

for _, realContention := range []bool{false, true} {
for _, numWriters := range []int{1, 4, 8} {
b.Run(fmt.Sprintf("real-contention=%t/num-writers=%d", realContention, numWriters),
func(b *testing.B) {
lockStart := "b"
if realContention {
lockStart = "a"
}
finishWrites := make(chan struct{})
var writers sync.WaitGroup
for i := 0; i < 1; i++ {
writers.Add(1)
go func() {
for {
txn := db.NewTxn(ctx, "locking txn")
_, err = txn.ScanForUpdate(ctx, lockStart, "c", 0)
require.NoError(b, err)
// Normally, it would do a write here, but we don't bother.
require.NoError(b, txn.Commit(ctx))
select {
case _, recv := <-finishWrites:
if !recv {
writers.Done()
return
}
default:
}
}
}()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
panic(err)
}
err = txn.Commit(ctx)
if err != nil {
panic(err)
}
return err
})
}
b.StopTimer()
close(finishWrites)
writers.Wait()
})
}
}
}
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ type Request struct {
type Guard struct {
Req Request
lg latchGuard
lm latchManager
ltg lockTableGuard
// The latest RequestEvalKind passed to SequenceReq.
EvalKind RequestEvalKind
Expand All @@ -411,13 +412,30 @@ type Error = roachpb.Error
// Internal Structure Interfaces //
///////////////////////////////////

// latchManager serializes access to keys and key ranges.
// latchManager serializes access to keys and key ranges. The
// {AcquireOptimistic,CheckOptimisticNoConflicts,WaitUntilAcquired} methods
// are only for use in optimistic latching.
//
// See additional documentation in pkg/storage/spanlatch.
type latchManager interface {
// Acquires latches, providing mutual exclusion for conflicting requests.
Acquire(context.Context, Request) (latchGuard, *Error)

// AcquireOptimistic is like Acquire in that it inserts latches, but it does
// not wait for conflicting latches on overlapping spans to be released
// before returning. This should be followed by CheckOptimisticNoConflicts
// to validate that not waiting did not violate correctness.
AcquireOptimistic(req Request) latchGuard

// CheckOptimisticNoConflicts returns true iff the spans in the provided
// spanset do not conflict with existing latches.
CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool

// WaitUntilAcquired is meant to be called when CheckOptimisticNoConflicts
// returned false, or some other occurrence (like conflicting locks) is
// causing this request to switch to pessimistic latching.
WaitUntilAcquired(ctx context.Context, lg latchGuard) (latchGuard, *Error)

// Releases latches, relinquish its protection from conflicting requests.
Release(latchGuard)

Expand Down
65 changes: 48 additions & 17 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,25 +189,43 @@ func (m *managerImpl) sequenceReqWithGuard(

// Only the first iteration can sometimes already be holding latches -- we
// use this to assert below.
first := true
firstIteration := true
for {
if !first {
g.AssertNoLatches()
}
first = false
if !g.HoldingLatches() {
// TODO(sumeer): optimistic requests could register their need for
// latches, but not actually wait until acquisition.
// https://github.com/cockroachdb/cockroach/issues/9521

// Acquire latches for the request. This synchronizes the request
// with all conflicting in-flight requests.
log.Event(ctx, "acquiring latches")
g.lg, err = m.lm.Acquire(ctx, req)
if g.EvalKind == OptimisticEval {
if !firstIteration {
// The only way we loop more than once is when conflicting locks are
// found -- see below where that happens and the comment there on
// why it will never happen with OptimisticEval.
panic("optimistic eval should not loop in sequenceReqWithGuard")
}
log.Event(ctx, "optimistically acquiring latches")
g.lg = m.lm.AcquireOptimistic(req)
g.lm = m.lm
} else {
// Acquire latches for the request. This synchronizes the request
// with all conflicting in-flight requests.
log.Event(ctx, "acquiring latches")
g.lg, err = m.lm.Acquire(ctx, req)
if err != nil {
return nil, err
}
g.lm = m.lm
}
} else {
if !firstIteration {
panic(errors.AssertionFailedf("second or later iteration cannot be holding latches"))
}
if g.EvalKind != PessimisticAfterFailedOptimisticEval {
panic("must not be holding latches")
}
log.Event(ctx, "optimistic failed, so waiting for latches")
g.lg, err = m.lm.WaitUntilAcquired(ctx, g.lg)
if err != nil {
return nil, err
}
}
firstIteration = false

// Some requests don't want the wait on locks.
if req.LockSpans.Empty() {
Expand All @@ -226,7 +244,9 @@ func (m *managerImpl) sequenceReqWithGuard(
g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)
}

// Wait on conflicting locks, if necessary.
// Wait on conflicting locks, if necessary. Note that this will never be
// true if ScanOptimistic was called above. Therefore it will also never
// be true if latchManager.AcquireOptimistic was called.
if g.ltg.ShouldWait() {
m.lm.Release(g.moveLatchGuard())

Expand Down Expand Up @@ -549,19 +569,30 @@ func (g *Guard) AssertNoLatches() {

// CheckOptimisticNoConflicts checks that the lockSpansRead do not have a
// conflicting lock.
func (g *Guard) CheckOptimisticNoConflicts(lockSpansRead *spanset.SpanSet) (ok bool) {
func (g *Guard) CheckOptimisticNoConflicts(
latchSpansRead *spanset.SpanSet, lockSpansRead *spanset.SpanSet,
) (ok bool) {
if g.EvalKind != OptimisticEval {
panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind))
}
if g.ltg == nil {
if g.lg == nil && g.ltg == nil {
return true
}
return g.ltg.CheckOptimisticNoConflicts(lockSpansRead)
if g.lg == nil {
panic("expected non-nil latchGuard")
}
// First check the latches, since a conflict there could mean that racing
// requests in the lock table caused a conflicting lock to not be noticed.
if g.lm.CheckOptimisticNoConflicts(g.lg, latchSpansRead) {
return g.ltg.CheckOptimisticNoConflicts(lockSpansRead)
}
return false
}

func (g *Guard) moveLatchGuard() latchGuard {
lg := g.lg
g.lg = nil
g.lm = nil
return lg
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.Fatalf(t, "unknown request: %s", reqName)
}
reqs, _ := scanRequests(t, d, c)
_, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs)
return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(lockSpans))
latchSpans, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs)
return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(latchSpans, lockSpans))

case "on-lock-acquired":
var reqName string
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/concurrency/latch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

Expand All @@ -31,6 +32,25 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard
return lg, nil
}

func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard {
lg := m.m.AcquireOptimistic(req.LatchSpans)
return lg
}

func (m *latchManagerImpl) CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool {
return m.m.CheckOptimisticNoConflicts(lg.(*spanlatch.Guard), spans)
}

func (m *latchManagerImpl) WaitUntilAcquired(
ctx context.Context, lg latchGuard,
) (latchGuard, *Error) {
lg, err := m.m.WaitUntilAcquired(ctx, lg.(*spanlatch.Guard))
if err != nil {
return nil, roachpb.NewError(err)
}
return lg, nil
}

func (m *latchManagerImpl) Release(lg latchGuard) {
m.m.Release(lg.(*spanlatch.Guard))
}
Expand Down
Loading

0 comments on commit 1357016

Please sign in to comment.