Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrency,kvserver: limited scans do optimistic latching #66059

Merged
merged 1 commit into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 164 additions & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3904,7 +3904,71 @@ func TestOptimisticEvalNoContention(t *testing.T) {
require.NoError(t, txn1.Commit(ctx))
}

func BenchmarkOptimisticEval(b *testing.B) {
// TestOptimisticEvalWithConcurrentWriters tests concurrently running writes
// and optimistic reads where the latter always conflict. This is just a
// sanity check to confirm that nothing fails.
func TestOptimisticEvalWithConcurrentWriters(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db := setupDBAndWriteAAndB(t)
defer s.Stopper().Stop(ctx)

finish := make(chan struct{})
var workers sync.WaitGroup
for i := 0; i < 4; i++ {
workers.Add(1)
go func() {
for {
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
return txn.Commit(ctx)
}))
select {
case _, recv := <-finish:
if !recv {
workers.Done()
return
}
default:
}
}
}()
}
for i := 0; i < 4; i++ {
workers.Add(1)
go func() {
for {
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
err = txn.Commit(ctx)
return err
}))
select {
case _, recv := <-finish:
if !recv {
workers.Done()
return
}
default:
}
}
}()
}
time.Sleep(10 * time.Second)
close(finish)
workers.Wait()
}

// BenchmarkOptimisticEvalForLocks benchmarks optimistic evaluation when the
// potentially conflicting lock is explicitly held for a duration of time.
func BenchmarkOptimisticEvalForLocks(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}
Expand Down Expand Up @@ -3980,3 +4044,102 @@ func BenchmarkOptimisticEval(b *testing.B) {
})
}
}

// BenchmarkOptimisticEval benchmarks optimistic evaluation with
// - potentially conflicting latches held by 1PC transactions doing writes.
// - potentially conflicting latches or locks held by transactions doing
// writes.
func BenchmarkOptimisticEval(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}

for _, latches := range []bool{false, true} {
conflictWith := "latches-and-locks"
if latches {
conflictWith = "latches"
}
b.Run(conflictWith, func(b *testing.B) {
for _, realContention := range []bool{false, true} {
b.Run(fmt.Sprintf("real-contention=%t", realContention), func(b *testing.B) {
for _, numWriters := range []int{1, 4} {
b.Run(fmt.Sprintf("num-writers=%d", numWriters), func(b *testing.B) {
// Since we are doing writes in the benchmark, start with a
// fresh server each time so that we start with a fresh engine
// without many versions for a key.
s, _, db := serverutils.StartServer(b, args)
defer s.Stopper().Stop(ctx)

require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
tup, err := db.Get(ctx, "a")
require.NoError(b, err)
require.NotNil(b, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(b, err)
require.NotNil(b, tup.Value)

writeKey := "b"
if realContention {
writeKey = "a"
}
finishWrites := make(chan struct{})
var writers sync.WaitGroup
for i := 0; i < numWriters; i++ {
writers.Add(1)
go func() {
for {
if latches {
require.NoError(b, db.Put(ctx, writeKey, "foo"))

} else {
require.NoError(b, db.Txn(ctx,
func(ctx context.Context, txn *kv.Txn) (err error) {
if err := txn.Put(ctx, writeKey, "foo"); err != nil {
return err
}
return txn.Commit(ctx)
}))
}
select {
case _, recv := <-finishWrites:
if !recv {
writers.Done()
return
}
default:
}
}
}()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
panic(err)
}
err = txn.Commit(ctx)
if err != nil {
panic(err)
}
return err
})
}
b.StopTimer()
close(finishWrites)
writers.Wait()
})
}
})
}
})
}
}
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ type Request struct {
type Guard struct {
Req Request
lg latchGuard
lm latchManager
ltg lockTableGuard
// The latest RequestEvalKind passed to SequenceReq.
EvalKind RequestEvalKind
Expand All @@ -411,13 +412,30 @@ type Error = roachpb.Error
// Internal Structure Interfaces //
///////////////////////////////////

// latchManager serializes access to keys and key ranges.
// latchManager serializes access to keys and key ranges. The
// {AcquireOptimistic,CheckOptimisticNoConflicts,WaitUntilAcquired} methods
// are only for use in optimistic latching.
//
// See additional documentation in pkg/storage/spanlatch.
type latchManager interface {
// Acquires latches, providing mutual exclusion for conflicting requests.
Acquire(context.Context, Request) (latchGuard, *Error)

// AcquireOptimistic is like Acquire in that it inserts latches, but it does
// not wait for conflicting latches on overlapping spans to be released
// before returning. This should be followed by CheckOptimisticNoConflicts
// to validate that not waiting did not violate correctness.
AcquireOptimistic(req Request) latchGuard

// CheckOptimisticNoConflicts returns true iff the spans in the provided
// spanset do not conflict with existing latches.
CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool

// WaitUntilAcquired is meant to be called when CheckOptimisticNoConflicts
// returned false, or some other occurrence (like conflicting locks) is
// causing this request to switch to pessimistic latching.
WaitUntilAcquired(ctx context.Context, lg latchGuard) (latchGuard, *Error)

// Releases latches, relinquish its protection from conflicting requests.
Release(latchGuard)

Expand Down
81 changes: 62 additions & 19 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,43 @@ func (m *managerImpl) sequenceReqWithGuard(

// Only the first iteration can sometimes already be holding latches -- we
// use this to assert below.
first := true
firstIteration := true
for {
if !first {
g.AssertNoLatches()
}
first = false
if !g.HoldingLatches() {
// TODO(sumeer): optimistic requests could register their need for
// latches, but not actually wait until acquisition.
// https://github.com/cockroachdb/cockroach/issues/9521

// Acquire latches for the request. This synchronizes the request
// with all conflicting in-flight requests.
log.Event(ctx, "acquiring latches")
g.lg, err = m.lm.Acquire(ctx, req)
if g.EvalKind == OptimisticEval {
if !firstIteration {
// The only way we loop more than once is when conflicting locks are
// found -- see below where that happens and the comment there on
// why it will never happen with OptimisticEval.
panic("optimistic eval should not loop in sequenceReqWithGuard")
}
log.Event(ctx, "optimistically acquiring latches")
g.lg = m.lm.AcquireOptimistic(req)
g.lm = m.lm
} else {
// Acquire latches for the request. This synchronizes the request
// with all conflicting in-flight requests.
log.Event(ctx, "acquiring latches")
g.lg, err = m.lm.Acquire(ctx, req)
if err != nil {
return nil, err
}
g.lm = m.lm
}
} else {
if !firstIteration {
panic(errors.AssertionFailedf("second or later iteration cannot be holding latches"))
}
if g.EvalKind != PessimisticAfterFailedOptimisticEval {
panic("must not be holding latches")
}
log.Event(ctx, "optimistic failed, so waiting for latches")
g.lg, err = m.lm.WaitUntilAcquired(ctx, g.lg)
if err != nil {
return nil, err
}
}
firstIteration = false

// Some requests don't want the wait on locks.
if req.LockSpans.Empty() {
Expand All @@ -228,7 +246,9 @@ func (m *managerImpl) sequenceReqWithGuard(
g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)
}

// Wait on conflicting locks, if necessary.
// Wait on conflicting locks, if necessary. Note that this will never be
// true if ScanOptimistic was called above. Therefore it will also never
// be true if latchManager.AcquireOptimistic was called.
if g.ltg.ShouldWait() {
m.lm.Release(g.moveLatchGuard())

Expand Down Expand Up @@ -549,21 +569,44 @@ func (g *Guard) AssertNoLatches() {
}
}

// CheckOptimisticNoConflicts checks that the lockSpansRead do not have a
// conflicting lock.
func (g *Guard) CheckOptimisticNoConflicts(lockSpansRead *spanset.SpanSet) (ok bool) {
// CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not
// have a conflicting latch, lock.
func (g *Guard) CheckOptimisticNoConflicts(
latchSpansRead *spanset.SpanSet, lockSpansRead *spanset.SpanSet,
) (ok bool) {
if g.EvalKind != OptimisticEval {
panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind))
}
if g.ltg == nil {
if g.lg == nil && g.ltg == nil {
return true
}
if g.lg == nil {
panic("expected non-nil latchGuard")
}
// First check the latches, since a conflict there could mean that racing
// requests in the lock table caused a conflicting lock to not be noticed.
if g.lm.CheckOptimisticNoConflicts(g.lg, latchSpansRead) {
return g.ltg.CheckOptimisticNoConflicts(lockSpansRead)
}
return false
}

// CheckOptimisticNoLatchConflicts checks that the declared latch spans for
// the request do not have a conflicting latch.
func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) {
if g.EvalKind != OptimisticEval {
panic(errors.AssertionFailedf("unexpected EvalKind: %d", g.EvalKind))
}
if g.lg == nil {
return true
}
return g.ltg.CheckOptimisticNoConflicts(lockSpansRead)
return g.lm.CheckOptimisticNoConflicts(g.lg, g.Req.LatchSpans)
}

func (g *Guard) moveLatchGuard() latchGuard {
lg := g.lg
g.lg = nil
g.lm = nil
return lg
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.Fatalf(t, "unknown request: %s", reqName)
}
reqs, _ := scanRequests(t, d, c)
_, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs)
return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(lockSpans))
latchSpans, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs)
return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(latchSpans, lockSpans))

case "on-lock-acquired":
var reqName string
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/concurrency/latch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

Expand All @@ -31,6 +32,25 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard
return lg, nil
}

func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard {
lg := m.m.AcquireOptimistic(req.LatchSpans)
return lg
}

func (m *latchManagerImpl) CheckOptimisticNoConflicts(lg latchGuard, spans *spanset.SpanSet) bool {
return m.m.CheckOptimisticNoConflicts(lg.(*spanlatch.Guard), spans)
}

func (m *latchManagerImpl) WaitUntilAcquired(
ctx context.Context, lg latchGuard,
) (latchGuard, *Error) {
lg, err := m.m.WaitUntilAcquired(ctx, lg.(*spanlatch.Guard))
if err != nil {
return nil, roachpb.NewError(err)
}
return lg, nil
}

func (m *latchManagerImpl) Release(lg latchGuard) {
m.m.Release(lg.(*spanlatch.Guard))
}
Expand Down
Loading