Skip to content

Commit

Permalink
kv: revive TestLostIncrement and TestLostUpdate
Browse files Browse the repository at this point in the history
Informs cockroachdb#100131.

This commit revives and modernizes two tests that were removed/stripped
down when snapshot isolation was previously deleted. It then expands the
tests to exercise all isolation levels.

Release note: None
  • Loading branch information
nvanbenschoten authored and Miral Gadani committed Apr 24, 2023
1 parent 6dcb2c4 commit 5964238
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2855,7 +2855,7 @@ func TestTxnSetIsoLevel(t *testing.T) {
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)

defaultLevel := isolation.Serializable
levels := []isolation.Level{isolation.ReadCommitted, isolation.Snapshot, isolation.Serializable}
levels := isolation.Levels()

// The default isolation level is Serializable.
require.Equal(t, defaultLevel, txn.IsoLevel())
Expand Down
213 changes: 159 additions & 54 deletions pkg/kv/kvclient/kvcoord/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -128,73 +129,177 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) {
}
}

// TestLostUpdate verifies that transactions are not susceptible to the
// lost update anomaly.
// TestLostIncrement verifies that Increment with any isolation level is not
// susceptible to the lost update anomaly between the value that the increment
// reads and the value that it writes. In other words, the increment is atomic,
// regardless of isolation level.
//
// The transaction history looks as follows ("2" refers to the
// independent goroutine's actions)
// The transaction history looks as follows:
//
// R1(A) W2(A,"hi") W1(A,"oops!") C1 [serializable restart] R1(A) W1(A,"correct") C1
func TestLostUpdate(t *testing.T) {
// R1(A) W2(A,+1) W1(A,+1) [write-write restart] R1(A) W1(A,+1) C1
//
// TODO(nvanbenschoten): once we address #100133, update this test to advance
// the read snapshot for ReadCommitted transactions between the read and the
// increment. Demonstrate that doing so allows for increment to applied to a
// newer value than that returned by the get, but that the increment is still
// atomic.
func TestLostIncrement(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s := createTestDB(t)
defer s.Stop()
var key = roachpb.Key("a")

done := make(chan error)
start := make(chan struct{})
go func() {
<-start
done <- s.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
return txn.Put(ctx, key, "hi")
})
}()
run := func(isoLevel isolation.Level, commitInBatch bool) {
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()
key := roachpb.Key("a")

firstAttempt := true
if err := s.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
// Issue a read to get initial value.
gr, err := txn.Get(ctx, key)
if err != nil {
t.Fatal(err)
incrementKey := func() {
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Inc(ctx, key, 1)
require.NoError(t, err)
return nil
})
require.NoError(t, err)
}
if txn.Epoch() == 0 {
close(start) // let someone write into our future
// When they're done, write based on what we read.
if err := <-done; err != nil {
t.Fatal(err)

err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
epoch := txn.Epoch()
require.LessOrEqual(t, epoch, enginepb.TxnEpoch(1), "should experience just one restart")
require.NoError(t, txn.SetIsoLevel(isoLevel))

// Issue a read to get initial value.
gr, err := txn.Get(ctx, key)
require.NoError(t, err)
// NOTE: expect 0 during first attempt, 1 during second attempt.
require.Equal(t, int64(epoch), gr.ValueInt())

// During the first attempt, perform a conflicting increment in a
// different transaction.
if epoch == 0 {
incrementKey()
}
} else if txn.Epoch() > 1 {
t.Fatal("should experience just one restart")
}

var newVal string
if gr.Exists() && bytes.Equal(gr.ValueBytes(), []byte("hi")) {
newVal = "correct"
} else {
newVal = "oops!"
}
b := txn.NewBatch()
b.Put(key, newVal)
err = txn.Run(ctx, b)
if firstAttempt {
require.Regexp(t, "RETRY_WRITE_TOO_OLD", err)
firstAttempt = false
return err
}
// Increment the key.
b := txn.NewBatch()
b.Inc(key, 1)
if commitInBatch {
err = txn.CommitInBatch(ctx, b)
} else {
err = txn.Run(ctx, b)
}
ir := b.Results[0].Rows[0]

// During the first attempt, this should encounter a write-write conflict
// and force a transaction retry.
if epoch == 0 {
require.Error(t, err)
require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err)
return err
}

// During the second attempt, this should succeed.
require.NoError(t, err)
require.Equal(t, int64(2), ir.ValueInt())
return nil
})
require.NoError(t, err)
return nil
}); err != nil {
t.Fatal(err)
}

// Verify final value.
gr, err := s.DB.Get(context.Background(), key)
if err != nil {
t.Fatal(err)
for _, isoLevel := range isolation.Levels() {
t.Run(isoLevel.String(), func(t *testing.T) {
testutils.RunTrueAndFalse(t, "commitInBatch", func(t *testing.T, commitInBatch bool) {
run(isoLevel, commitInBatch)
})
})
}
if !gr.Exists() || !bytes.Equal(gr.ValueBytes(), []byte("correct")) {
t.Fatalf("expected \"correct\", got %q", gr.ValueBytes())
}

// TestLostUpdate verifies that transactions are not susceptible to the
// lost update anomaly, regardless of isolation level.
//
// The transaction history looks as follows:
//
// R1(A) W2(A,"hi") W1(A,"oops!") C1 [write-write restart] R1(A) W1(A,"correct") C1
//
// TODO(nvanbenschoten): once we address #100133, update this test to advance
// the read snapshot for ReadCommitted transactions between the read and the
// write. Demonstrate that doing so allows for a lost update.
func TestLostUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

run := func(isoLevel isolation.Level, commitInBatch bool) {
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()
key := roachpb.Key("a")

putKey := func() {
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.Put(ctx, key, "hi")
})
require.NoError(t, err)
}

err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
epoch := txn.Epoch()
require.LessOrEqual(t, epoch, enginepb.TxnEpoch(1))
require.NoError(t, txn.SetIsoLevel(isoLevel))

// Issue a read to get initial value.
gr, err := txn.Get(ctx, key)
require.NoError(t, err)
var newVal string
if epoch == 0 {
require.False(t, gr.Exists())
newVal = "oops!"
} else {
require.True(t, gr.Exists())
require.Equal(t, []byte("hi"), gr.ValueBytes())
newVal = "correct"
}

// During the first attempt, perform a conflicting write.
if epoch == 0 {
putKey()
}

// Write to the key.
b := txn.NewBatch()
b.Put(key, newVal)
if commitInBatch {
err = txn.CommitInBatch(ctx, b)
} else {
err = txn.Run(ctx, b)
}

// During the first attempt, this should encounter a write-write conflict
// and force a transaction retry.
if epoch == 0 {
require.Error(t, err)
require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err)
return err
}

// During the second attempt, this should succeed.
require.NoError(t, err)
return nil
})
require.NoError(t, err)

// Verify final value.
gr, err := s.DB.Get(ctx, key)
require.NoError(t, err)
require.True(t, gr.Exists())
require.Equal(t, []byte("correct"), gr.ValueBytes())
}

for _, isoLevel := range isolation.Levels() {
t.Run(isoLevel.String(), func(t *testing.T) {
testutils.RunTrueAndFalse(t, "commitInBatch", func(t *testing.T, commitInBatch bool) {
run(isoLevel, commitInBatch)
})
})
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/concurrency/isolation/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (l Level) PerStatementReadSnapshot() bool {

// SafeValue implements the redact.SafeValue interface.
func (Level) SafeValue() {}

// Levels returns a list of all isolation levels, ordered from strongest to
// weakest.
func Levels() []Level { return []Level{Serializable, Snapshot, ReadCommitted} }

0 comments on commit 5964238

Please sign in to comment.