concurrency,kvserver: limited scans optimistically check for locks #58670

Merged 1 commit on May 20, 2021
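Rough intuition for the change, as exercised by the tests added below: a scan with a row limit often reads only a small prefix of its requested span, so it can first evaluate without waiting on conflicting locks over the whole span, and fall back to pessimistic (waiting) evaluation only if the rows it actually read overlap a held lock. The following is a toy sketch of that check; the types and functions are hypothetical and are not the kvserver concurrency-manager API.

```go
package main

import "fmt"

// Toy model of optimistic evaluation for a limited scan (hypothetical code,
// not the kvserver implementation): evaluate first, then check whether the
// span actually read conflicts with any held lock.

type span struct{ start, end string } // half-open [start, end)

func overlaps(a, b span) bool { return a.start < b.end && b.start < a.end }

type lockTable struct{ held []span }

func (lt *lockTable) conflicts(s span) bool {
	for _, l := range lt.held {
		if overlaps(l, s) {
			return true
		}
	}
	return false
}

func main() {
	// A concurrent txn holds an exclusive lock on key "b", much like the tests
	// below where ScanForUpdate("b", "c") takes unreplicated exclusive locks.
	lt := &lockTable{held: []span{{"b", "b\x00"}}}

	requested := span{"a", "c"}        // limited scan over [a, c) with limit 1
	actuallyRead := span{"a", "a\x00"} // the limit was hit after reading "a"
	_ = requested                      // a pessimistic retry would wait on locks over the full requested span

	if lt.conflicts(actuallyRead) {
		fmt.Println("conflict on the rows read: fall back to pessimistic evaluation and wait")
	} else {
		fmt.Println("optimistic evaluation succeeds without waiting on the lock at b")
	}
}
```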
50 changes: 41 additions & 9 deletions pkg/cmd/roachtest/kv.go
@@ -35,13 +35,17 @@ func registerKV(r *testRegistry) {
nodes int
cpus int
readPercent int
- batchSize int
- blockSize int
- splits int // 0 implies default, negative implies 0
- encryption bool
- sequential bool
- duration time.Duration
- tags []string
+ // If true, the reads are limited reads over the full span of the table.
+ // Currently this also enables SFU writes on the workload since this is
+ // geared towards testing optimistic locking and latching.
+ spanReads bool
+ batchSize int
+ blockSize int
+ splits int // 0 implies default, negative implies 0
+ encryption bool
+ sequential bool
+ duration time.Duration
+ tags []string
}
computeNumSplits := func(opts kvOptions) int {
// TODO(ajwerner): set this default to a more sane value or remove it and
@@ -62,6 +66,16 @@ func registerKV(r *testRegistry) {
c.Put(ctx, workload, "./workload", c.Node(nodes+1))
c.Start(ctx, t, c.Range(1, nodes), startArgs(fmt.Sprintf("--encrypt=%t", opts.encryption)))

+ if opts.splits < 0 {
+ // In addition to telling the workload to not split, disable load-based
+ // splitting.
+ db := c.Conn(ctx, 1)
+ defer db.Close()
+ if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'"); err != nil {
+ t.Fatalf("failed to disable load based splitting: %v", err)
+ }
+ }

t.Status("running workload")
m := newMonitor(ctx, c, c.Range(1, nodes))
m.Go(func(ctx context.Context) error {
@@ -72,7 +86,17 @@ func registerKV(r *testRegistry) {
opts.duration = 10 * time.Minute
}
duration := " --duration=" + ifLocal("10s", opts.duration.String())
- readPercent := fmt.Sprintf(" --read-percent=%d", opts.readPercent)
+ var readPercent string
+ if opts.spanReads {
+ // SFU makes sense only if we repeat writes to the same key. Here
+ // we've arbitrarily picked a cycle-length of 1000, so 1 in 1000
+ // writes will contend with the limited scan wrt locking.
+ readPercent =
+ fmt.Sprintf(" --span-percent=%d --span-limit=1 --sfu-writes=true --cycle-length=1000",
+ opts.readPercent)
+ } else {
+ readPercent = fmt.Sprintf(" --read-percent=%d", opts.readPercent)
+ }
histograms := " --histograms=" + perfArtifactsDir + "/stats.json"
var batchSize string
if opts.batchSize > 0 {
@@ -143,14 +167,22 @@ func registerKV(r *testRegistry) {
{nodes: 3, cpus: 32, readPercent: 0, sequential: true},
{nodes: 3, cpus: 32, readPercent: 95, sequential: true},

+ // Configs with reads, that are of limited spans, along with SFU writes.
+ {nodes: 1, cpus: 8, readPercent: 95, spanReads: true, splits: -1 /* no splits */, sequential: true},
+ {nodes: 1, cpus: 32, readPercent: 95, spanReads: true, splits: -1 /* no splits */, sequential: true},
+
// Weekly larger scale configurations.
{nodes: 32, cpus: 8, readPercent: 0, tags: []string{"weekly"}, duration: time.Hour},
{nodes: 32, cpus: 8, readPercent: 95, tags: []string{"weekly"}, duration: time.Hour},
} {
opts := opts

var nameParts []string
nameParts = append(nameParts, fmt.Sprintf("kv%d", opts.readPercent))
var limitedSpanStr string
if opts.spanReads {
limitedSpanStr = "limited-spans"
}
nameParts = append(nameParts, fmt.Sprintf("kv%d%s", opts.readPercent, limitedSpanStr))
if len(opts.tags) > 0 {
nameParts = append(nameParts, strings.Join(opts.tags, "/"))
}
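To make the new limited-span configs concrete, here is a small, self-contained illustration of the flag fragment and test-name fragment that the code above builds for readPercent: 95. Only the fragments visible in this diff are shown; the rest of the workload command and test name comes from parts of registerKV not included here.

```go
package main

import "fmt"

func main() {
	// Flag fragment produced by the spanReads branch above for readPercent=95.
	fmt.Printf(" --span-percent=%d --span-limit=1 --sfu-writes=true --cycle-length=1000\n", 95)
	// Test-name fragment built by the Sprintf above for the same config:
	// prints "kv95limited-spans".
	fmt.Printf("kv%d%s\n", 95, "limited-spans")
}
```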
194 changes: 194 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
@@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
@@ -3786,3 +3787,196 @@ func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) {
priorityID := store.RaftSchedulerPriorityID()
require.Equal(t, livenessRangeID, priorityID)
}

func setupDBAndWriteAAndB(t *testing.T) (serverutils.TestServerInterface, *kv.DB) {
ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(t, args)

require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
tup, err := db.Get(ctx, "a")
require.NoError(t, err)
require.NotNil(t, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(t, err)
require.NotNil(t, tup.Value)
return s, db
}

// TestOptimisticEvalRetry tests the case where an optimistically evaluated
// scan encounters contention from a concurrent txn holding unreplicated
// exclusive locks, and therefore re-evaluates pessimistically, and eventually
// succeeds once the contending txn commits.
func TestOptimisticEvalRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db := setupDBAndWriteAAndB(t)
defer s.Stopper().Stop(ctx)

txn1 := db.NewTxn(ctx, "locking txn")
_, err := txn1.ScanForUpdate(ctx, "a", "c", 0)
require.NoError(t, err)

readDone := make(chan error)
go func() {
readDone <- db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
// We can't actually prove that the optimistic evaluation path was
// taken, but it should happen based on the fact that this is a limited
// scan with a limit of 1 row, and the replica has 2 rows.
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
return txn.Commit(ctx)
})
}()
removedLocks := false
timer := timeutil.NewTimer()
timer.Reset(time.Second * 2)
defer timer.Stop()
done := false
for !done {
select {
case err := <-readDone:
if !removedLocks {
t.Fatal("read completed before exclusive locks were released")
}
require.NoError(t, err)
require.True(t, removedLocks)
done = true
case <-timer.C:
require.NoError(t, txn1.Commit(ctx))
removedLocks = true
}
}
}

// TestOptimisticEvalNoContention tests the case where an optimistically
// evaluated scan has a span that overlaps with a concurrent txn holding
// unreplicated exclusive locks, but the actual span that is read does not
// overlap, and therefore the scan succeeds before the lock holding txn
// commits.
func TestOptimisticEvalNoContention(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db := setupDBAndWriteAAndB(t)
defer s.Stopper().Stop(ctx)

txn1 := db.NewTxn(ctx, "locking txn")
_, err := txn1.ScanForUpdate(ctx, "b", "c", 0)
require.NoError(t, err)

readDone := make(chan error)
go func() {
readDone <- db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
t.Log(err)
}()
// There is no contention when doing optimistic evaluation, since it can read
// key "a", which is not locked.
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
return err
}
return txn.Commit(ctx)
})
}()
err = <-readDone
require.NoError(t, err)
require.NoError(t, txn1.Commit(ctx))
}

func BenchmarkOptimisticEval(b *testing.B) {
defer log.Scope(b).Close(b)
ctx := context.Background()
args := base.TestServerArgs{}
s, _, db := serverutils.StartServer(b, args)
defer s.Stopper().Stop(ctx)

require.NoError(b, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
defer func() {
b.Log(err)
}()
if err := txn.Put(ctx, "a", "a"); err != nil {
return err
}
if err := txn.Put(ctx, "b", "b"); err != nil {
return err
}
return txn.Commit(ctx)
}))
tup, err := db.Get(ctx, "a")
require.NoError(b, err)
require.NotNil(b, tup.Value)
tup, err = db.Get(ctx, "b")
require.NoError(b, err)
require.NotNil(b, tup.Value)

for _, realContention := range []bool{false, true} {
b.Run(fmt.Sprintf("real-contention=%t", realContention),
func(b *testing.B) {
lockStart := "b"
if realContention {
lockStart = "a"
}
finishWrites := make(chan struct{})
var writers sync.WaitGroup
for i := 0; i < 1; i++ {
writers.Add(1)
go func() {
for {
txn := db.NewTxn(ctx, "locking txn")
_, err = txn.ScanForUpdate(ctx, lockStart, "c", 0)
require.NoError(b, err)
time.Sleep(5 * time.Millisecond)
// Normally, it would do a write here, but we don't bother.
require.NoError(b, txn.Commit(ctx))
select {
case _, recv := <-finishWrites:
if !recv {
writers.Done()
return
}
default:
}
}
}()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
_, err = txn.Scan(ctx, "a", "c", 1)
if err != nil {
panic(err)
}
err = txn.Commit(ctx)
if err != nil {
panic(err)
}
return err
})
}
b.StopTimer()
close(finishWrites)
writers.Wait()
})
}
}
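If you want to poke at these locally, something along the lines of `go test ./pkg/kv/kvserver -run TestOptimisticEval -v` should run the two new tests, and `go test ./pkg/kv/kvserver -run '^$' -bench BenchmarkOptimisticEval` the benchmark; the exact invocation depends on the repo's usual dev tooling (make or bazel wrappers), so treat these commands as a sketch.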