Skip to content

Commit

Permalink
storage: favor WriteTooOld errors over ConditionFailed errors
Browse files Browse the repository at this point in the history
This commit adjusts mvccPutInternal to return WriteTooOld errors instead of
ConditionFailedErrors when a write-write version conflict is encountered during
conditional write evaluation and the write's condition is also not met.
Previously, the logic favored ConditionFailed errors, which was necessary for
correctness because of write-too-old deferral. This resulted in subtle logic to
determine which timestamp to evaluate the condition at. It was also overly
pessimistic in cases where the condition failed at the version below the
write-write conflict but succeeded at the version after. This is demonstrated by
some of the changes to the test cases in TestTxnCoordSenderRetries and
TestReplicaServersideRefreshes.

Now that write-too-old deferral is gone, we can simplify the logic.

Release note: None
  • Loading branch information
nvanbenschoten committed May 13, 2023
1 parent e7cf003 commit 59f90a5
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 96 deletions.
14 changes: 9 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2589,9 +2589,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put"))
},
// The transaction performs a server-side refresh due to the write-write
// conflict and then succeeds during its CPut.
allIsoLevels: &expect{
expClientAutoRetryAfterRefresh: false, // fails on first attempt at cput
expFailure: "unexpected value", // the failure we get is a condition failed error
expServerRefresh: true,
},
},
{
Expand Down Expand Up @@ -2779,9 +2780,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put2", false)
},
// No txn coord retry as we get condition failed error.
// The transaction performs a server-side refresh due to the write-write
// conflict and then succeeds during its InitPut.
allIsoLevels: &expect{
expFailure: "unexpected value", // the failure we get is a condition failed error
expServerRefresh: true,
},
},
{
Expand All @@ -2796,8 +2798,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put2", true)
},
// The transaction performs a server-side refresh due to the write-write
// conflict and then succeeds during its InitPut.
allIsoLevels: &expect{
expFailure: "unexpected value", // condition failed error when failing on tombstones
expServerRefresh: true,
},
},
{
Expand Down
48 changes: 24 additions & 24 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7092,28 +7092,31 @@ func TestBatchErrorWithIndex(t *testing.T) {
tc.Start(ctx, t, stopper)

ba := &kvpb.BatchRequest{}
ba.Txn = newTransaction("test", roachpb.Key("k"), 1, tc.Clock())
// This one succeeds.
ba.Add(&kvpb.PutRequest{
put := &kvpb.PutRequest{
RequestHeader: kvpb.RequestHeader{Key: roachpb.Key("k")},
Value: roachpb.MakeValueFromString("not nil"),
})
}
// This one fails with a ConditionalPutError, which will populate the
// returned error's index.
ba.Add(&kvpb.ConditionalPutRequest{
cput := &kvpb.ConditionalPutRequest{
RequestHeader: kvpb.RequestHeader{Key: roachpb.Key("k")},
Value: roachpb.MakeValueFromString("irrelevant"),
ExpBytes: nil, // not true after above Put
})
}
// This one is never executed.
ba.Add(&kvpb.GetRequest{
get := &kvpb.GetRequest{
RequestHeader: kvpb.RequestHeader{Key: roachpb.Key("k")},
})

if _, pErr := tc.Sender().Send(ctx, ba); pErr == nil {
t.Fatal("expected an error")
} else if pErr.Index == nil || pErr.Index.Index != 1 || !testutils.IsPError(pErr, "unexpected value") {
t.Fatalf("invalid index or error type: %s", pErr)
}
assignSeqNumsForReqs(ba.Txn, put, cput, get)
ba.Add(put, cput, get)

_, pErr := tc.Sender().Send(ctx, ba)
require.NotNil(t, pErr)
require.NotNil(t, pErr.Index)
require.Equal(t, int32(1), pErr.Index.Index)
require.Regexp(t, "unexpected value", pErr)
}

func TestReplicaDestroy(t *testing.T) {
Expand Down Expand Up @@ -10518,7 +10521,7 @@ func TestConsistenctQueueErrorFromCheckConsistency(t *testing.T) {
}

// TestReplicaServersideRefreshes verifies local retry logic for transactional
// and non transactional batches. Verifies the timestamp cache is updated to
// and non-transactional batches. Verifies the timestamp cache is updated to
// reflect the timestamp at which retried batches are executed.
func TestReplicaServersideRefreshes(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down Expand Up @@ -10738,11 +10741,9 @@ func TestReplicaServersideRefreshes(t *testing.T) {
return
},
},
// This test tests a scenario where an InitPut is failing at its timestamp,
// but it would succeed if it'd evaluate at a bumped timestamp. The request
// is not retried at the bumped timestamp. We don't necessarily like this
// current behavior; for example since there's nothing to refresh, the
// request could be retried.
// This test tests a scenario where an InitPut would fail at its original
// timestamp, but it succeeds when evaluated at a bumped timestamp after a
// server-side refresh.
{
name: "serverside-refresh of write too old on non-1PC txn initput without prior reads",
setupFn: func() (hlc.Timestamp, error) {
Expand All @@ -10753,6 +10754,7 @@ func TestReplicaServersideRefreshes(t *testing.T) {
return put("c-iput", "put2")
},
batchFn: func(ts hlc.Timestamp) (ba *kvpb.BatchRequest, expTS hlc.Timestamp) {
expTS = ts.Next()
ba = &kvpb.BatchRequest{}
ba.Txn = newTxn("c-iput", ts.Prev())
ba.CanForwardReadTimestamp = true
Expand All @@ -10761,7 +10763,6 @@ func TestReplicaServersideRefreshes(t *testing.T) {
assignSeqNumsForReqs(ba.Txn, &iput)
return
},
expErr: "unexpected value: .*",
},
// Non-1PC serializable txn locking scan with CanForwardReadTimestamp
// set to true will succeed with write too old error.
Expand Down Expand Up @@ -10838,18 +10839,18 @@ func TestReplicaServersideRefreshes(t *testing.T) {
return
},
},
// This test tests a scenario where a CPut is failing at its timestamp, but it would
// succeed if it'd evaluate at a bumped timestamp. The request is not retried at the
// bumped timestamp. We don't necessarily like this current behavior; for example if
// there's nothing to refresh, the request could be retried.
// This test tests a scenario where a CPut would fail at its original
// timestamp, but it succeeds when evaluated at a bumped timestamp after a
// server-side refresh.
// The previous test shows different behavior for a non-transactional
// request or a 1PC one.
{
name: "no serverside-refresh with failed cput despite write too old errors on txn",
name: "serverside-refresh with failed cput despite write too old errors on txn",
setupFn: func() (hlc.Timestamp, error) {
return put("e1", "put")
},
batchFn: func(ts hlc.Timestamp) (ba *kvpb.BatchRequest, expTS hlc.Timestamp) {
expTS = ts.Next()
txn := newTxn("e1", ts.Prev())

// Send write to another key first to avoid 1PC.
Expand All @@ -10875,7 +10876,6 @@ func TestReplicaServersideRefreshes(t *testing.T) {
assignSeqNumsForReqs(ba.Txn, &et)
return
},
expErr: "unexpected value: <nil>",
},
// Handle multiple write too old errors on a non-transactional request.
//
Expand Down
20 changes: 0 additions & 20 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2074,26 +2074,6 @@ func mvccPutInternal(
// before committing.
writeTimestamp.Forward(metaTimestamp.Next())
writeTooOldErr := kvpb.NewWriteTooOldError(readTimestamp, writeTimestamp, key)
{
// NOTE TO REVIEWERS: the code currently favors ConditionFailedErrors
// over WriteTooOldErrors, and jumps through some hoops to make sure
// that this works correctly. The following commit will remove this
// behavior and update tests accordingly. For this commit, we keep the
// existing behavior to avoid multiple moving parts.

// If we're in a transaction, always get the value at the orig
// timestamp. Outside of a transaction, the read timestamp advances
// to the latest value's timestamp + 1 as well. The new
// timestamp is returned to the caller in maybeTooOldErr. Because
// we're outside of a transaction, we'll never actually commit this
// value, but that's a concern of evaluateBatch and not here.
if txn == nil {
readTimestamp = writeTimestamp
}
if _, err = maybeGetValue(ctx, iter, key, value, ok, readTimestamp, valueFn); err != nil {
return false, err
}
}
return false, writeTooOldErr
} else {
if value, err = maybeGetValue(ctx, iter, key, value, ok, readTimestamp, valueFn); err != nil {
Expand Down
63 changes: 25 additions & 38 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2271,46 +2271,32 @@ func TestMVCCInitPutWithTxn(t *testing.T) {
txn := *txn1
txn.Sequence++
err := MVCCInitPut(ctx, engine, nil, testKey1, txn.ReadTimestamp, hlc.ClockTimestamp{}, value1, false, &txn)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

// A repeat of the command will still succeed.
txn.Sequence++
err = MVCCInitPut(ctx, engine, nil, testKey1, txn.ReadTimestamp, hlc.ClockTimestamp{}, value1, false, &txn)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

// A repeat of the command with a different value at a different epoch
// will still succeed.
txn.Sequence++
txn.Epoch = 2
err = MVCCInitPut(ctx, engine, nil, testKey1, txn.ReadTimestamp, hlc.ClockTimestamp{}, value2, false, &txn)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

// Commit value3.
txnCommit := txn
txnCommit.Status = roachpb.COMMITTED
txnCommit.WriteTimestamp = clock.Now().Add(1, 0)
if _, _, _, err := MVCCResolveWriteIntent(ctx, engine, nil,
_, _, _, err = MVCCResolveWriteIntent(ctx, engine, nil,
roachpb.MakeLockUpdate(&txnCommit, roachpb.Span{Key: testKey1}),
MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
MVCCResolveWriteIntentOptions{})
require.NoError(t, err)

// Write value4 with an old timestamp without txn...should get an error.
err = MVCCInitPut(ctx, engine, nil, testKey1, clock.Now(), hlc.ClockTimestamp{}, value4, false, nil)
if e := (*kvpb.ConditionFailedError)(nil); errors.As(err, &e) {
if !bytes.Equal(e.ActualValue.RawBytes, value2.RawBytes) {
t.Fatalf("the value %s in get result does not match the value %s in request",
e.ActualValue.RawBytes, value2.RawBytes)
}
} else {
t.Fatalf("unexpected error %T", e)
}
require.ErrorAs(t, err, new(*kvpb.WriteTooOldError))
}

// TestMVCCReverseScan verifies that MVCCReverseScan scans [start,
Expand Down Expand Up @@ -2817,10 +2803,8 @@ func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) {

// TestMVCCConditionalPutOldTimestamp tests a case where a conditional
// put with an older timestamp happens after a put with a newer timestamp.
//
// The conditional put uses the actual value at the timestamp as the
// basis for comparison first, and then may fail later with a
// WriteTooOldError if that timestamp isn't recent.
// The conditional put fails with WriteTooOld errors, regardless of whether
// the condition succeeds or not.
func TestMVCCConditionalPutOldTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -2833,20 +2817,23 @@ func TestMVCCConditionalPutOldTimestamp(t *testing.T) {
err = MVCCPut(ctx, engine, nil, testKey1, hlc.Timestamp{WallTime: 3}, hlc.ClockTimestamp{}, value2, nil)
require.NoError(t, err)

// Check that a condition failed error is thrown if the value doesn't match.
err = MVCCConditionalPut(ctx, engine, nil, testKey1, hlc.Timestamp{WallTime: 2}, hlc.ClockTimestamp{}, value3, value1.TagAndDataBytes(), CPutFailIfMissing, nil)
require.ErrorAs(t, err, new(*kvpb.ConditionFailedError))

// But if value does match the most recently written version, we'll get
// a write too old error.
err = MVCCConditionalPut(ctx, engine, nil, testKey1, hlc.Timestamp{WallTime: 2}, hlc.ClockTimestamp{}, value3, value2.TagAndDataBytes(), CPutFailIfMissing, nil)
require.ErrorAs(t, err, new(*kvpb.WriteTooOldError))
// Check that a write too old error is thrown, regardless of whether the value
// matches.
for _, expVal := range []roachpb.Value{
// Condition does not match.
value1,
// Condition matches.
value2,
} {
err = MVCCConditionalPut(ctx, engine, nil, testKey1, hlc.Timestamp{WallTime: 2}, hlc.ClockTimestamp{}, value3, expVal.TagAndDataBytes(), CPutFailIfMissing, nil)
require.ErrorAs(t, err, new(*kvpb.WriteTooOldError))

// Either way, no new value is written.
ts := hlc.Timestamp{WallTime: 3}
valueRes, err := MVCCGet(ctx, engine, testKey1, ts, MVCCGetOptions{})
require.NoError(t, err)
require.Equal(t, value2.RawBytes, valueRes.Value.RawBytes)
// Either way, no new value is written.
ts := hlc.Timestamp{WallTime: 3}
valueRes, err := MVCCGet(ctx, engine, testKey1, ts, MVCCGetOptions{})
require.NoError(t, err)
require.Equal(t, value2.RawBytes, valueRes.Value.RawBytes)
}
}

// TestMVCCMultiplePutOldTimestamp tests a case where multiple transactional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
# This test verifies the differing behavior
# of conditional puts when writing with an older timestamp than the
# existing write. If there's no transaction, the conditional put
# should use the latest value. When there's a transaction, then it
# should use the value at the specified timestamp.
# This test verifies the differing behavior of conditional puts when writing
# with an older timestamp than the existing write. In these cases, a WriteTooOld
# error is returned and the condition is not evaluated until after the
# write-write conflict is resolved.

run ok
put ts=10 k=k v=v1
----
>> at end:
data: "k"/10.000000000,0 -> /BYTES/v1

# Try a non-transactional put @t=1 with expectation of nil; should fail.
# Try a non-transactional put @t=1 with expectation of nil; should fail with a WriteTooOld error.
run error
cput ts=1 k=k v=v2
----
>> at end:
data: "k"/10.000000000,0 -> /BYTES/v1
error: (*kvpb.ConditionFailedError:) unexpected value: raw_bytes:"\000\000\000\000\003v1" timestamp:<wall_time:10000000000 >
error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "k" at timestamp 1.000000000,0 too old; must write at or above 10.000000000,1

# Now do a non-transactional put @t=1 with expectation of value1; will return WriteTooOld error @t=10,1.
run error
Expand All @@ -26,7 +25,7 @@ cput ts=1 k=k v=v2 cond=v1
data: "k"/10.000000000,0 -> /BYTES/v1
error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "k" at timestamp 1.000000000,0 too old; must write at or above 10.000000000,1

# Try a transactional put @t=1 with expectation of value2; should fail.
# Try a transactional put @t=1 with expectation of value2; should fail with a WriteTooOld error.
run error
with t=a
txn_begin ts=1
Expand All @@ -35,7 +34,7 @@ with t=a
>> at end:
txn: "a" meta={id=00000000 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=1.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=1.000000000,0 wto=false gul=0,0
data: "k"/10.000000000,0 -> /BYTES/v1
error: (*kvpb.ConditionFailedError:) unexpected value: <nil>
error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "k" at timestamp 1.000000000,0 too old; must write at or above 10.000000000,1

# Now do a transactional put @t=1 with expectation of nil; will return WriteTooOld error @t=10,2.
run error
Expand Down

0 comments on commit 59f90a5

Please sign in to comment.