concurrency,kvserver: limited scans optimistically check for locks
This optimistic checking happens after the evaluation. The evaluation
will already discover any conflicting intents, so the subsequent
checking of the lock table is primarily to find conflicting
unreplicated locks.

This structure should be easy to extend to optimistic latching.
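A minimal sketch of that flow, under simplified assumptions: the evaluation-kind
names mirror the RequestEvalKind constants added below, but the guard and
sequencing types here are illustrative stand-ins, not the real
concurrency.Manager API.

package main

import "fmt"

// Illustrative copies of the RequestEvalKind values added in
// pkg/kv/kvserver/concurrency by this commit; the real type and constants
// live there.
type requestEvalKind int

const (
	pessimisticEval requestEvalKind = iota
	optimisticEval
	pessimisticAfterFailedOptimisticEval
)

// guard stands in for concurrency.Guard: for an optimistic attempt it holds a
// lock-table snapshot that can be checked after evaluation.
type guard struct {
	conflictFree bool
}

func (g *guard) checkOptimisticNoConflicts() bool { return g.conflictFree }

// runLimitedScan sketches the retry structure: evaluate optimistically first,
// then verify against the lock-table snapshot; on conflict, re-sequence
// pessimistically (waiting on locks and latches) and evaluate again.
func runLimitedScan(sequence func(requestEvalKind) *guard, evaluate func() string) string {
	g := sequence(optimisticEval)
	result := evaluate() // evaluation itself still discovers conflicting intents
	if g.checkOptimisticNoConflicts() {
		return result
	}
	sequence(pessimisticAfterFailedOptimisticEval)
	return evaluate()
}

func main() {
	// Toy sequencer: pretend the optimistic attempt would miss an
	// unreplicated lock, forcing the pessimistic retry.
	sequence := func(k requestEvalKind) *guard {
		return &guard{conflictFree: k != optimisticEval}
	}
	fmt.Println(runLimitedScan(sequence, func() string { return "scan result" }))
}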

Benchmark numbers from the new kv roachtest that uses SELECT FOR UPDATE
(SFU) writes to introduce false contention:
name                                           old ops/sec  new ops/sec  delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq   5.68k ± 0%   9.96k ± 1%  +75.46%  (p=0.000 n=8+9)

name                                           old p50      new p50      delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq    13.1 ± 0%     5.5 ± 0%  -58.02%  (p=0.000 n=8+8)

name                                           old p95      new p95      delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq    18.9 ± 0%    16.8 ± 0%  -11.11%  (p=0.001 n=8+9)

name                                           old p99      new p99      delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq    22.0 ± 0%    25.6 ± 2%  +16.57%  (p=0.000 n=8+9)

Fixes #49973
Informs #9521

Release note: None
sumeerbhola committed Jan 15, 2021
1 parent e37319a commit 2a0ea09
Showing 13 changed files with 1,221 additions and 303 deletions.
50 changes: 41 additions & 9 deletions pkg/cmd/roachtest/kv.go
@@ -35,13 +35,17 @@ func registerKV(r *testRegistry) {
nodes int
cpus int
readPercent int
batchSize int
blockSize int
splits int // 0 implies default, negative implies 0
encryption bool
sequential bool
duration time.Duration
tags []string
// If true, the reads are limited reads over the full span of the table.
// Currently this also enables SFU writes on the workload since this is
// geared towards testing optimistic locking and latching.
spanReads bool
batchSize int
blockSize int
splits int // 0 implies default, negative implies 0
encryption bool
sequential bool
duration time.Duration
tags []string
}
computeNumSplits := func(opts kvOptions) int {
// TODO(ajwerner): set this default to a more sane value or remove it and
@@ -62,6 +66,16 @@ func registerKV(r *testRegistry) {
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, t, c.Range(1, nodes), startArgs(fmt.Sprintf("--encrypt=%t", opts.encryption)))

if opts.splits < 0 {
// In addition to telling the workload to not split, disable load-based
// splitting.
db := c.Conn(ctx, 1)
defer db.Close()
if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'"); err != nil {
t.Fatalf("failed to disable load based splitting: %v", err)
}
}

t.Status("running workload")
m := newMonitor(ctx, c, c.Range(1, nodes))
m.Go(func(ctx context.Context) error {
@@ -72,7 +86,17 @@ func registerKV(r *testRegistry) {
opts.duration = 10 * time.Minute
}
duration := " --duration=" + ifLocal("10s", opts.duration.String())
readPercent := fmt.Sprintf(" --read-percent=%d", opts.readPercent)
var readPercent string
if opts.spanReads {
// SFU makes sense only if we repeat writes to the same key. Here
// we've arbitrarily picked a cycle-length of 1000, so 1 in 1000
// writes will contend with the limited scan wrt locking.
readPercent =
fmt.Sprintf(" --span-percent=%d --span-limit=1 --sfu-writes=true --cycle-length=1000",
opts.readPercent)
} else {
readPercent = fmt.Sprintf(" --read-percent=%d", opts.readPercent)
}
histograms := " --histograms=" + perfArtifactsDir + "/stats.json"
var batchSize string
if opts.batchSize > 0 {
@@ -143,14 +167,22 @@ func registerKV(r *testRegistry) {
{nodes: 3, cpus: 32, readPercent: 0, sequential: true},
{nodes: 3, cpus: 32, readPercent: 95, sequential: true},

// Configs with reads, that are of limited spans, along with SFU writes.
{nodes: 1, cpus: 8, readPercent: 95, spanReads: true, splits: -1 /* no splits */, sequential: true},
{nodes: 1, cpus: 32, readPercent: 95, spanReads: true, splits: -1 /* no splits */, sequential: true},

// Weekly larger scale configurations.
{nodes: 32, cpus: 8, readPercent: 0, tags: []string{"weekly"}, duration: time.Hour},
{nodes: 32, cpus: 8, readPercent: 95, tags: []string{"weekly"}, duration: time.Hour},
} {
opts := opts

var nameParts []string
nameParts = append(nameParts, fmt.Sprintf("kv%d", opts.readPercent))
var limitedSpanStr string
if opts.spanReads {
limitedSpanStr = "limited-spans"
}
nameParts = append(nameParts, fmt.Sprintf("kv%d%s", opts.readPercent, limitedSpanStr))
if len(opts.tags) > 0 {
nameParts = append(nameParts, strings.Join(opts.tags, "/"))
}
199 changes: 199 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
@@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
@@ -3779,3 +3780,201 @@ func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) {
priorityID := store.RaftSchedulerPriorityID()
require.Equal(t, livenessRangeID, priorityID)
}

func TestOptimisticEvalRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
time.Sleep(10 * time.Millisecond)
tup, err := db.Get(ctx, "a")
require.NoError(t, err)
require.NotNil(t, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(t, err)
require.NotNil(t, tup.Value)

txn1 := db.NewTxn(ctx, "locking txn")
_, err = txn1.ScanForUpdate(ctx, "a", "c", 0)
require.NoError(t, err)

time.Sleep(10 * time.Millisecond)
readDone := make(chan error)
go func() {
readDone <- db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
return txn.Commit(ctx)
})
}()
removedLocks := false
timer := timeutil.NewTimer()
timer.Reset(time.Second * 2)
defer timer.Stop()
done := false
for !done {
select {
case err := <-readDone:
require.NoError(t, err)
require.True(t, removedLocks)
done = true
case <-timer.C:
require.NoError(t, txn1.Commit(ctx))
removedLocks = true
}
}
}

func TestOptimisticEvalNoContention(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
time.Sleep(10 * time.Millisecond)
tup, err := db.Get(ctx, "a")
require.NoError(t, err)
require.NotNil(t, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(t, err)
require.NotNil(t, tup.Value)

txn1 := db.NewTxn(ctx, "locking txn")
_, err = txn1.ScanForUpdate(ctx, "b", "c", 0)
require.NoError(t, err)

time.Sleep(10 * time.Millisecond)
readDone := make(chan error)
go func() {
readDone <- db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
// There is no contention when doing optimistic evaluation, since it can read a
// which is not locked.
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
return txn.Commit(ctx)
})
}()
err = <-readDone
require.NoError(t, err)
require.NoError(t, txn1.Commit(ctx))
}

func BenchmarkOptimisticEval(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(b, args)
defer s.Stopper().Stop(ctx)

require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
b.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
time.Sleep(10 * time.Millisecond)
tup, err := db.Get(ctx, "a")
require.NoError(b, err)
require.NotNil(b, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(b, err)
require.NotNil(b, tup.Value)

for _, realContention := range []bool{false, true} {
b.Run(fmt.Sprintf("real-contention=%t", realContention),
func(b *testing.B) {
lockStart := "b"
if realContention {
lockStart = "a"
}
finishWrites := make(chan struct{})
var writers sync.WaitGroup
for i := 0; i < 1; i++ {
writers.Add(1)
go func() {
for {
txn := db.NewTxn(ctx, "locking txn")
_, err = txn.ScanForUpdate(ctx, lockStart, "c", 0)
require.NoError(b, err)
time.Sleep(5 * time.Millisecond)
// Normally, it would do a write here, but we don't bother.
require.NoError(b, txn.Commit(ctx))
select {
case _, recv := <-finishWrites:
if !recv {
writers.Done()
return
}
default:
}
}
}()
}
time.Sleep(10 * time.Millisecond)
b.ResetTimer()
for i := 0; i < b.N; i++ {
db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
panic(err)
}
err = txn.Commit(ctx)
if err != nil {
panic(err)
}
return err
})
}
b.StopTimer()
close(finishWrites)
writers.Wait()
})
}
}
57 changes: 54 additions & 3 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -162,13 +162,20 @@ type RequestSequencer interface {
// request is guaranteed sufficient isolation for the duration of its
// evaluation, until the returned request guard is released.
// NOTE: this last part will not be true until replicated locks are pulled
// into the concurrency manager.
// into the concurrency manager. This is the normal behavior for a request
// marked as PessimisticEval. For OptimisticEval, it can optimize by not
// acquiring locks, and the request must call
// Guard.CheckOptimisticNoConflicts after evaluation. For
// PessimisticAfterFailedOptimisticEval, latches are already held.
// TODO(sumeer): change OptimisticEval to only queue the latches and not
// wait for them, so PessimisticAfterFailedOptimisticEval will wait for them.
//
// An optional existing request guard can be provided to SequenceReq. This
// allows the request's position in lock wait-queues to be retained across
// sequencing attempts. If provided, the guard should not be holding latches
// already. The expected usage of this parameter is that it will only be
// provided after acquiring a Guard from a ContentionHandler method.
// already for PessimisticEval. The expected usage of this parameter is that
// it will only be provided after acquiring a Guard from a ContentionHandler
// method (for non-OptimisticEval).
//
// If the method returns a non-nil request guard then the caller must ensure
// that the guard is eventually released by passing it to FinishReq.
@@ -308,6 +315,27 @@ type MetricExporter interface {
// External API Type Definitions //
///////////////////////////////////

// RequestEvalKind informs the manager of the evaluation kind for the current
// evaluation attempt. Optimistic evaluation is used for requests involving
// limited scans, where the checking of locks and latches may be (partially)
// postponed until after evaluation, using Guard.CheckOptimisticNoConflicts.
// Note that intents (replicated single-key locks) will still be observed
// during evaluation.
//
// The setting can change across different calls to SequenceReq. The current
// sequence of possibilities is:
// - OptimisticEval: when optimistic evaluation succeeds.
// - OptimisticEval, PessimisticAfterFailedOptimisticEval, PessimisticEval*:
// when optimistic evaluation failed.
// - PessimisticEval+: when only pessimistic evaluation was attempted.
type RequestEvalKind int

const (
PessimisticEval RequestEvalKind = iota
OptimisticEval
PessimisticAfterFailedOptimisticEval
)

// Request is the input to Manager.SequenceReq. The struct contains all of the
// information necessary to sequence a KV request and determine which locks and
// other in-flight requests it conflicts with.
@@ -351,6 +379,13 @@ type Request struct {
// (Txn == nil), all reads and writes are considered to take place at
// Timestamp.
LockSpans *spanset.SpanSet

// EvalKind represents the evaluation kind for the current evaluation
// attempt of the request.
// TODO(sumeer): Move this into Guard. Confirm that the Request object
// passed to SequenceReq should not change across calls since we stash
// the first one into Guard.
EvalKind RequestEvalKind
}

// Guard is returned from Manager.SequenceReq. The guard is passed back in to
@@ -460,6 +495,13 @@ type lockTable interface {
// function.
ScanAndEnqueue(Request, lockTableGuard) lockTableGuard

// ScanOptimistic takes a snapshot of the lock table for later checking for
// conflicts, and returns a guard. It is for optimistic evaluation of
// requests that will typically scan a small subset of the spans mentioned
// in the Request. After Request evaluation, CheckOptimisticNoConflicts
// must be called on the guard.
ScanOptimistic(Request) lockTableGuard

// Dequeue removes the request from its lock wait-queues. It should be
// called when the request is finished, whether it evaluated or not. The
// guard should not be used after being dequeued.
@@ -599,6 +641,15 @@ type lockTableGuard interface {
// This must be called after the waiting state has transitioned to
// doneWaiting.
ResolveBeforeScanning() []roachpb.LockUpdate

// CheckOptimisticNoConflicts uses the SpanSet representing the spans that
// were actually read, to check for conflicting locks, after an optimistic
// evaluation. It returns true if there were no conflicts. See
// lockTable.ScanOptimistic for context. Note that the evaluation has
// already seen any intents (replicated single-key locks) that conflicted,
// so this checking is practically only going to find unreplicated locks
// that conflict.
CheckOptimisticNoConflicts(*spanset.SpanSet) (ok bool)
}

// lockTableWaiter is concerned with waiting in lock wait-queues for locks held
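To make the CheckOptimisticNoConflicts contract above concrete, a minimal
self-contained sketch of such a post-evaluation check, under simplified
assumptions: the spans that were actually read are compared against a snapshot
of held locks taken when ScanOptimistic ran. The span and lock types below are
illustrative stand-ins, not the real spanset.SpanSet or lock-table state; and
since replicated intents were already seen during evaluation, in practice this
check only surfaces unreplicated locks.

package main

import "fmt"

// span is a simplified stand-in for roachpb.Span.
type span struct{ start, end string }

// lock is a simplified stand-in for the lock table's unreplicated lock state.
type lock struct {
	key    string
	ourOwn bool // a lock held by the reading txn itself does not conflict
}

// overlaps reports whether key falls inside [start, end). A span with an
// empty end is treated as a point key.
func (s span) overlaps(key string) bool {
	if s.end == "" {
		return key == s.start
	}
	return key >= s.start && key < s.end
}

// checkOptimisticNoConflicts returns true if none of the spans that were
// actually read overlap a conflicting lock in the snapshot. The real
// implementation walks a lock-table snapshot against a spanset.SpanSet; this
// sketch uses a simple nested loop.
func checkOptimisticNoConflicts(readSpans []span, snapshot []lock) bool {
	for _, sp := range readSpans {
		for _, l := range snapshot {
			if !l.ourOwn && sp.overlaps(l.key) {
				return false
			}
		}
	}
	return true
}

func main() {
	snapshot := []lock{{key: "b"}} // e.g. an unreplicated lock from SELECT FOR UPDATE
	// A limited scan that only ended up reading ["a","b") does not conflict.
	fmt.Println(checkOptimisticNoConflicts([]span{{start: "a", end: "b"}}, snapshot)) // true
	// One that read ["a","c") does conflict and must fall back to pessimistic eval.
	fmt.Println(checkOptimisticNoConflicts([]span{{start: "a", end: "c"}}, snapshot)) // false
}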
