Skip to content

Commit

Permalink
concurrency,kvserver: limited scans optimistically check for locks
Browse files Browse the repository at this point in the history
This optimistic checking happens after the evaluation. The evaluation
will already discover any conflicting intents, so the subsequent
checking of the lock table is primarily to find conflicting
unreplicated locks.

This structure should be easy to extend to optimistic latching.

Benchmark numbers from the new kv roachtest that does SFU to
introduce false contention:
name                                           old ops/sec  new ops/sec  delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq   5.68k ± 0%   9.96k ± 1%  +75.46%  (p=0.000 n=8+9)

name                                           old p50      new p50      delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq    13.1 ± 0%     5.5 ± 0%  -58.02%  (p=0.000 n=8+8)

name                                           old p95      new p95      delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq    18.9 ± 0%    16.8 ± 0%  -11.11%  (p=0.001 n=8+9)

name                                           old p99      new p99      delta
kv95limited-spans/enc=false/nodes=1/splt=0/seq    22.0 ± 0%    25.6 ± 2%  +16.57%  (p=0.000 n=8+9)

Microbenchmark numbers for the OptimisticEval microbenchmark,
where the real-contention=true case typically causes optimistic
evaluation to retry.

name                                     old time/op    new time/op    delta
OptimisticEval/real-contention=false-16    5.76ms ± 5%    0.03ms ± 2%  -99.49%  (p=0.000 n=8+10)
OptimisticEval/real-contention=true-16     5.75ms ± 4%    5.63ms ± 5%     ~     (p=0.393 n=10+10)

name                                     old alloc/op   new alloc/op   delta
OptimisticEval/real-contention=false-16    34.3kB ±24%     9.9kB ± 2%  -71.29%  (p=0.000 n=9+10)
OptimisticEval/real-contention=true-16     33.0kB ±20%    35.7kB ± 9%     ~     (p=0.065 n=8+8)

name                                     old allocs/op  new allocs/op  delta
OptimisticEval/real-contention=false-16       273 ±19%        83 ± 0%  -69.56%  (p=0.000 n=9+10)
OptimisticEval/real-contention=true-16        268 ± 2%       308 ± 2%  +14.90%  (p=0.000 n=8+8)

Fixes #49973
Informs #9521

Release note (performance improvement): A limited scan now checks
for conflicting locks in an optimistic manner, which means it will
not conflict with locks (typically unreplicated locks) that were
held in the scan's full spans, but were not in the spans that were
scanned until the limit was reached. This behavior can be turned
off by changing the value of the cluster setting
kv.concurrency.optimistic_eval_limited_scans.enabled to false.
  • Loading branch information
sumeerbhola committed May 20, 2021
1 parent 1bb90a3 commit 31847ac
Show file tree
Hide file tree
Showing 23 changed files with 1,495 additions and 282 deletions.
50 changes: 41 additions & 9 deletions pkg/cmd/roachtest/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ func registerKV(r *testRegistry) {
nodes int
cpus int
readPercent int
batchSize int
blockSize int
splits int // 0 implies default, negative implies 0
encryption bool
sequential bool
duration time.Duration
tags []string
// If true, the reads are limited reads over the full span of the table.
// Currently this also enables SFU writes on the workload since this is
// geared towards testing optimistic locking and latching.
spanReads bool
batchSize int
blockSize int
splits int // 0 implies default, negative implies 0
encryption bool
sequential bool
duration time.Duration
tags []string
}
computeNumSplits := func(opts kvOptions) int {
// TODO(ajwerner): set this default to a more sane value or remove it and
Expand All @@ -62,6 +66,16 @@ func registerKV(r *testRegistry) {
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, t, c.Range(1, nodes), startArgs(fmt.Sprintf("--encrypt=%t", opts.encryption)))

if opts.splits < 0 {
// In addition to telling the workload to not split, disable load-based
// splitting.
db := c.Conn(ctx, 1)
defer db.Close()
if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'"); err != nil {
t.Fatalf("failed to disable load based splitting: %v", err)
}
}

t.Status("running workload")
m := newMonitor(ctx, c, c.Range(1, nodes))
m.Go(func(ctx context.Context) error {
Expand All @@ -72,7 +86,17 @@ func registerKV(r *testRegistry) {
opts.duration = 10 * time.Minute
}
duration := " --duration=" + ifLocal("10s", opts.duration.String())
readPercent := fmt.Sprintf(" --read-percent=%d", opts.readPercent)
var readPercent string
if opts.spanReads {
// SFU makes sense only if we repeat writes to the same key. Here
// we've arbitrarily picked a cycle-length of 1000, so 1 in 1000
// writes will contend with the limited scan wrt locking.
readPercent =
fmt.Sprintf(" --span-percent=%d --span-limit=1 --sfu-writes=true --cycle-length=1000",
opts.readPercent)
} else {
readPercent = fmt.Sprintf(" --read-percent=%d", opts.readPercent)
}
histograms := " --histograms=" + perfArtifactsDir + "/stats.json"
var batchSize string
if opts.batchSize > 0 {
Expand Down Expand Up @@ -143,14 +167,22 @@ func registerKV(r *testRegistry) {
{nodes: 3, cpus: 32, readPercent: 0, sequential: true},
{nodes: 3, cpus: 32, readPercent: 95, sequential: true},

// Configs with reads, that are of limited spans, along with SFU writes.
{nodes: 1, cpus: 8, readPercent: 95, spanReads: true, splits: -1 /* no splits */, sequential: true},
{nodes: 1, cpus: 32, readPercent: 95, spanReads: true, splits: -1 /* no splits */, sequential: true},

// Weekly larger scale configurations.
{nodes: 32, cpus: 8, readPercent: 0, tags: []string{"weekly"}, duration: time.Hour},
{nodes: 32, cpus: 8, readPercent: 95, tags: []string{"weekly"}, duration: time.Hour},
} {
opts := opts

var nameParts []string
nameParts = append(nameParts, fmt.Sprintf("kv%d", opts.readPercent))
var limitedSpanStr string
if opts.spanReads {
limitedSpanStr = "limited-spans"
}
nameParts = append(nameParts, fmt.Sprintf("kv%d%s", opts.readPercent, limitedSpanStr))
if len(opts.tags) > 0 {
nameParts = append(nameParts, strings.Join(opts.tags, "/"))
}
Expand Down
194 changes: 194 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
Expand Down Expand Up @@ -3786,3 +3787,196 @@ func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) {
priorityID := store.RaftSchedulerPriorityID()
require.Equal(t, livenessRangeID, priorityID)
}

func setupDBAndWriteAAndB(t *testing.T) (serverutils.TestServerInterface, *kv.DB) {
ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(t, args)

require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
tup, err := db.Get(ctx, "a")
require.NoError(t, err)
require.NotNil(t, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(t, err)
require.NotNil(t, tup.Value)
return s, db
}

// TestOptimisticEvalRetry tests the case where an optimistically evaluated
// scan encounters contention from a concurrent txn holding unreplicated
// exclusive locks, and therefore re-evaluates pessimistically, and eventually
// succeeds once the contending txn commits.
func TestOptimisticEvalRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db := setupDBAndWriteAAndB(t)
defer s.Stopper().Stop(ctx)

txn1 := db.NewTxn(ctx, "locking txn")
_, err := txn1.ScanForUpdate(ctx, "a", "c", 0)
require.NoError(t, err)

readDone := make(chan error)
go func() {
readDone <- db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
// We can't actually prove that the optimistic evaluation path was
// taken, but it should happen based on the fact that this is a limited
// scan with a limit of 1 row, and the replica has 2 rows.
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
return txn.Commit(ctx)
})
}()
removedLocks := false
timer := timeutil.NewTimer()
timer.Reset(time.Second * 2)
defer timer.Stop()
done := false
for !done {
select {
case err := <-readDone:
if !removedLocks {
t.Fatal("read completed before exclusive locks were released")
}
require.NoError(t, err)
require.True(t, removedLocks)
done = true
case <-timer.C:
require.NoError(t, txn1.Commit(ctx))
removedLocks = true
}
}
}

// TestOptimisticEvalNoContention tests the case where an optimistically
// evaluated scan has a span that overlaps with a concurrent txn holding
// unreplicated exclusive locks, but the actual span that is read does not
// overlap, and therefore the scan succeeds before the lock holding txn
// commits.
func TestOptimisticEvalNoContention(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db := setupDBAndWriteAAndB(t)
defer s.Stopper().Stop(ctx)

txn1 := db.NewTxn(ctx, "locking txn")
_, err := txn1.ScanForUpdate(ctx, "b", "c", 0)
require.NoError(t, err)

readDone := make(chan error)
go func() {
readDone <- db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
// There is no contention when doing optimistic evaluation, since it can read a
// which is not locked.
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
return txn.Commit(ctx)
})
}()
err = <-readDone
require.NoError(t, err)
require.NoError(t, txn1.Commit(ctx))
}

func BenchmarkOptimisticEval(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(b, args)
defer s.Stopper().Stop(ctx)

require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
b.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
tup, err := db.Get(ctx, "a")
require.NoError(b, err)
require.NotNil(b, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(b, err)
require.NotNil(b, tup.Value)

for _, realContention := range []bool{false, true} {
b.Run(fmt.Sprintf("real-contention=%t", realContention),
func(b *testing.B) {
lockStart := "b"
if realContention {
lockStart = "a"
}
finishWrites := make(chan struct{})
var writers sync.WaitGroup
for i := 0; i < 1; i++ {
writers.Add(1)
go func() {
for {
txn := db.NewTxn(ctx, "locking txn")
_, err = txn.ScanForUpdate(ctx, lockStart, "c", 0)
require.NoError(b, err)
time.Sleep(5 * time.Millisecond)
// Normally, it would do a write here, but we don't bother.
require.NoError(b, txn.Commit(ctx))
select {
case _, recv := <-finishWrites:
if !recv {
writers.Done()
return
}
default:
}
}
}()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
panic(err)
}
err = txn.Commit(ctx)
if err != nil {
panic(err)
}
return err
})
}
b.StopTimer()
close(finishWrites)
writers.Wait()
})
}
}
Loading

0 comments on commit 31847ac

Please sign in to comment.