Skip to content

Commit

Permalink
kv: tolerate write skew under weak isolation levels
Browse files Browse the repository at this point in the history
_or "support kv-level snapshot isolation"._

Fixes #100131.

This commit adds support for weak transaction isolation levels (Snapshot and
Read Committed) to commit even when their read and write timestamps are skewed.
Thanks to prior cleanup and refactoring, this change is limited to an update to
the transaction commit condition and a clarification of the one-phase commit
requirements, along with a collection of updates to tests.

Release note: None
  • Loading branch information
nvanbenschoten committed May 18, 2023
1 parent 56e15c3 commit 1e43ec8
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 56 deletions.
71 changes: 46 additions & 25 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2184,10 +2184,14 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */)
return err
},
// No retry, preemptive refresh before commit.
allIsoLevels: &expect{
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: false,
perIsoLevel: map[isolation.Level]*expect{
// No retry, preemptive refresh before commit.
isolation.Serializable: {
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: false,
},
// No refresh, no retry.
isolation.Snapshot: {},
},
},
{
Expand Down Expand Up @@ -2321,14 +2325,14 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("a", "put") // advance timestamp
return txn.CommitInBatch(ctx, b)
},
// Read-only request (Get) prevents server-side refresh.
// TODO(nvanbenschoten): This is written like this to exercise the
// perIsoLevel mechanism.
perIsoLevel: map[isolation.Level]*expect{
// Read-only request (Get) prevents server-side refresh.
isolation.Serializable: {
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
},
// No refresh, no retry.
isolation.Snapshot: {},
},
},
{
Expand All @@ -2343,10 +2347,14 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("a", "put") // advance timestamp
return txn.CommitInBatch(ctx, b)
},
// Read-only request (Scan) prevents server-side refresh.
allIsoLevels: &expect{
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
perIsoLevel: map[isolation.Level]*expect{
// Read-only request (Scan) prevents server-side refresh.
isolation.Serializable: {
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
},
// No refresh, no retry.
isolation.Snapshot: {},
},
},
{
Expand Down Expand Up @@ -2454,10 +2462,14 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.CommitInBatch(ctx, b)
},
// No retry, preemptive refresh before commit.
allIsoLevels: &expect{
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: false,
perIsoLevel: map[isolation.Level]*expect{
// No retry, preemptive refresh before commit.
isolation.Serializable: {
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: false,
},
// No refresh, no retry.
isolation.Snapshot: {},
},
},
{
Expand Down Expand Up @@ -3111,11 +3123,16 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b := txn.NewBatch()
b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value"))
b.Put("c", "put")
return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry from get
return txn.CommitInBatch(ctx, b)
},
allIsoLevels: &expect{
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
perIsoLevel: map[isolation.Level]*expect{
// Both writes will succeed, EndTxn will retry.
isolation.Serializable: {
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
},
// No refresh, no retry.
isolation.Snapshot: {},
},
},
{
Expand All @@ -3131,11 +3148,16 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b := txn.NewBatch()
b.DelRange("a", "b", false /* returnKeys */)
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry
return txn.CommitInBatch(ctx, b)
},
allIsoLevels: &expect{
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
perIsoLevel: map[isolation.Level]*expect{
// Both writes will succeed, EndTxn will retry.
isolation.Serializable: {
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
},
// No refresh, no retry.
isolation.Snapshot: {},
},
},
{
Expand Down Expand Up @@ -3634,8 +3656,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// TODO(nvanbenschoten): test Snapshot isolation.
testLevels := []isolation.Level{isolation.Serializable}
testLevels := []isolation.Level{isolation.Serializable, isolation.Snapshot}
for _, iso := range testLevels {
t.Run(iso.String(), func(t *testing.T) {
run(t, tc, iso)
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,24 @@ func (tc *TxnCoordSender) CommitTimestamp() hlc.Timestamp {
tc.mu.Lock()
defer tc.mu.Unlock()
txn := &tc.mu.txn
if txn.Status == roachpb.COMMITTED {
return txn.ReadTimestamp
}
// If the transaction is not yet committed, configure the CommitTimestampFixed
// flag to ensure that the transaction's commit timestamp is not pushed before
// it commits.
//
// This operates by disabling the transaction refresh mechanism. For isolation
// levels that can tolerate write skew, this is not enough to prevent the
// transaction from committing with a later timestamp. In fact, it's not even
// clear what timestamp to consider the "commit timestamp" for these
// transactions. For this reason, we currently disable the CommitTimestamp
// method for these isolation levels.
// TODO(nvanbenschoten): figure out something better to do here. At least
// return an error. Tracked in #103245.
if txn.IsoLevel.ToleratesWriteSkew() {
panic("unsupported")
}
tc.mu.txn.CommitTimestampFixed = true
return txn.ReadTimestamp
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
}
}

// Test that the theartbeat loop detects aborted transactions and stops.
// Test that the heartbeat loop detects aborted transactions and stops.
func TestTxnCoordSenderHeartbeat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,11 +1284,9 @@ func runWriteSkewTest(t *testing.T, iso isolation.Level) {
return nil
}
checks[isolation.Snapshot] = func(env map[string]int64) error {
// TODO(nvanbenschoten): re-enable this exception when we permit write skew
// under Snapshot isolation.
//if env["A"] == 1 && env["B"] == 1 {
// return nil
//}
if env["A"] == 1 && env["B"] == 1 {
return nil
}
return checks[isolation.Serializable](env)
}

Expand Down
68 changes: 64 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) {
}
}

// TestLostIncrement verifies that Increment with any isolation level is not
// TestTxnLostIncrement verifies that Increment with any isolation level is not
// susceptible to the lost update anomaly between the value that the increment
// reads and the value that it writes. In other words, the increment is atomic,
// regardless of isolation level.
Expand All @@ -143,7 +143,7 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) {
// increment. Demonstrate that doing so allows for increment to applied to a
// newer value than that returned by the get, but that the increment is still
// atomic.
func TestLostIncrement(t *testing.T) {
func TestTxnLostIncrement(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -214,7 +214,7 @@ func TestLostIncrement(t *testing.T) {
}
}

// TestLostUpdate verifies that transactions are not susceptible to the
// TestTxnLostUpdate verifies that transactions are not susceptible to the
// lost update anomaly, regardless of isolation level.
//
// The transaction history looks as follows:
Expand All @@ -224,7 +224,7 @@ func TestLostIncrement(t *testing.T) {
// TODO(nvanbenschoten): once we address #100133, update this test to advance
// the read snapshot for ReadCommitted transactions between the read and the
// write. Demonstrate that doing so allows for a lost update.
func TestLostUpdate(t *testing.T) {
func TestTxnLostUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -303,6 +303,66 @@ func TestLostUpdate(t *testing.T) {
}
}

// TestTxnWeakIsolationLevelsTolerateWriteSkew verifies that transactions run
// under weak isolation levels (snapshot and read committed) can tolerate their
// write timestamp being skewed from their read timestamp, while transaction run
// under strong isolation levels (serializable) cannot.
func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

run := func(isoLevel isolation.Level) {
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()

// Begin the test's transaction.
txn1 := s.DB.NewTxn(ctx, "txn1")
require.NoError(t, txn1.SetIsoLevel(isoLevel))

// Read from key "a" in txn1 and then write to key "a" in txn2. This
// establishes an anti-dependency from txn1 to txn2, meaning that txn1's
// read snapshot must be ordered before txn2's commit. In practice, this
// prevents txn1 from refreshing.
{
res, err := txn1.Get(ctx, "a")
require.NoError(t, err)
require.False(t, res.Exists())

txn2 := s.DB.NewTxn(ctx, "txn2")
require.NoError(t, txn2.Put(ctx, "a", "value"))
require.NoError(t, txn2.Commit(ctx))
}

// Now read from key "b" in a txn3 before writing to key "b" in txn1. This
// establishes an anti-dependency from txn3 to txn1, meaning that txn3's
// read snapshot must be ordered before txn1's commit. In practice, this
// pushes txn1's write timestamp forward through the timestamp cache.
{
txn3 := s.DB.NewTxn(ctx, "txn3")
res, err := txn3.Get(ctx, "b")
require.NoError(t, err)
require.False(t, res.Exists())
require.NoError(t, txn3.Commit(ctx))

require.NoError(t, txn1.Put(ctx, "b", "value"))
}

// Finally, try to commit. This should succeed for isolation levels that
// allow for write skew. It should fail for isolation levels that do not.
err := txn1.Commit(ctx)
if isoLevel.ToleratesWriteSkew() {
require.NoError(t, err)
} else {
require.Error(t, err)
require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err)
}
}
for _, isoLevel := range isolation.Levels() {
t.Run(isoLevel.String(), func(t *testing.T) { run(isoLevel) })
}
}

// TestPriorityRatchetOnAbortOrPush verifies that the priority of
// a transaction is ratcheted by successive aborts or pushes. In
// particular, we want to ensure ratcheted priorities when the txn
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
kvpb "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -710,7 +711,15 @@ func (v *validator) processOp(op Operation) {
for _, op := range ops {
v.processOp(op)
}
prevFailures := v.failures
// TODO(nvanbenschoten): add isolation level to the atomicType string:
// atomicTxnType := fmt.Sprintf(`%s txn`, strings.ToLower(t.IsoLevel.String()))
v.checkAtomic(`txn`, t.Result)
if t.IsoLevel == isolation.Snapshot {
// TODO(nvanbenschoten): for now, we run snapshot transactions in the mix
// but don't validate their results. Doing so is non-trivial. See #100169.
v.failures = prevFailures
}
case *SplitOperation:
execTimestampStrictlyOptional = true
v.failIfError(op, t.Result) // splits should never return *any* error
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func IsEndTxnTriggeringRetryError(
// update anomalies.
return true, kvpb.RETRY_WRITE_TOO_OLD, ""
}
if txn.WriteTimestamp != txn.ReadTimestamp {
if !txn.IsoLevel.ToleratesWriteSkew() && txn.WriteTimestamp != txn.ReadTimestamp {
// Return a transaction retry error if the commit timestamp isn't equal to
// the txn timestamp.
return true, kvpb.RETRY_SERIALIZABLE, ""
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ func TestIsEndTxnTriggeringRetryError(t *testing.T) {
{isolation.Serializable, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, false, false, false, false, 0},
{isolation.Snapshot, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.Snapshot, false, true, false, true, kvpb.RETRY_SERIALIZABLE},
{isolation.Snapshot, false, true, true, true, kvpb.RETRY_SERIALIZABLE},
{isolation.Snapshot, false, true, false, false, 0},
{isolation.Snapshot, false, true, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.Snapshot, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, false, false, false, false, 0},
{isolation.ReadCommitted, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.ReadCommitted, false, true, false, true, kvpb.RETRY_SERIALIZABLE},
{isolation.ReadCommitted, false, true, true, true, kvpb.RETRY_SERIALIZABLE},
{isolation.ReadCommitted, false, true, false, false, 0},
{isolation.ReadCommitted, false, true, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.ReadCommitted, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD},
Expand Down
41 changes: 30 additions & 11 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,13 @@ func (r *Replica) canAttempt1PCEvaluation(
return false
}

if ba.Timestamp != ba.Txn.WriteTimestamp {
log.Fatalf(ctx, "unexpected 1PC execution with diverged timestamp. %s != %s",
ba.Timestamp, ba.Txn.WriteTimestamp)
// isOnePhaseCommit ensured that the transaction has a non-skewed read/write
// timestamp, even for isolation levels that can commit with such skew. Sanity
// check that this timestamp is equal to the batch timestamp.
if ba.Timestamp != ba.Txn.ReadTimestamp || ba.Timestamp != ba.Txn.WriteTimestamp {
log.Fatalf(ctx, "unexpected 1PC execution with diverged read or write timestamps; "+
"ba.Timestamp: %s, ba.Txn.ReadTimestamp: %s, ba.Txn.WriteTimestamp: %s",
ba.Timestamp, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}

// The EndTxn checks whether the txn record can be created and, if so, at what
Expand Down Expand Up @@ -399,9 +403,10 @@ func (r *Replica) evaluateWriteBatch(
) (storage.Batch, enginepb.MVCCStats, *kvpb.BatchResponse, result.Result, *kvpb.Error) {
log.Event(ctx, "executing read-write batch")

// If the transaction has been pushed but it can commit at the higher
// If the transaction has been pushed but it can be forwarded to the higher
// timestamp, let's evaluate the batch at the bumped timestamp. This will
// allow it commit, and also it'll allow us to attempt the 1PC code path.
// allow serializable transactions to commit. It will also allow transactions
// with any isolation level to attempt the 1PC code path.
maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)

// Attempt 1PC execution, if applicable. If not transactional or there are
Expand Down Expand Up @@ -773,16 +778,30 @@ func (r *Replica) newBatchedEngine(
// isOnePhaseCommit returns true iff the BatchRequest contains all writes in the
// transaction and ends with an EndTxn. One phase commits are disallowed if any
// of the following conditions are true:
// (1) the transaction has already been flagged with a write too old error
// (2) the transaction's commit timestamp has been forwarded
// (3) the transaction exceeded its deadline
// (4) the transaction is not in its first epoch and the EndTxn request does
//
// not require one phase commit.
// 1. the transaction's commit timestamp has been forwarded. Note that this
// prevents one phase commit even for isolation levels that can otherwise
// tolerate write skew.
// 2. the transaction is failing a commit condition and must retry. This
// condition is isolation level dependent.
// 3. the transaction is not in its first epoch and the EndTxn request does
// not require one phase commit.
func isOnePhaseCommit(ba *kvpb.BatchRequest) bool {
if ba.Txn == nil {
return false
}
if ba.Txn.ReadTimestamp != ba.Txn.WriteTimestamp {
// If the transaction's read and write timestamp are skewed, one phase
// commit is not allowed. This is true even for isolation levels that can
// otherwise tolerate write skew. This is because the one phase commit
// evaluation logic operates using a non-transactional batch which does not
// know how to evaluate with reads and writes at different timestamps. Even
// for write-only batches, the non-transactional path would be unable to
// detect write-write version conflicts between the transaction's read and
// write timestamps.
//
// NOTE: ba.Timestamp == ba.Txn.ReadTimestamp
return false
}
if !ba.IsCompleteTransaction() {
return false
}
Expand Down
Loading

0 comments on commit 1e43ec8

Please sign in to comment.