Merge #113646
113646: kv: split LockConflictError, revive WriteIntentError over wire r=nvanbenschoten a=nvanbenschoten

Fixes #113271.

This commit resolves the backwards incompatibility introduced by 350dc60 when `WriteIntentError` was renamed to `LockConflictError`. This rename broke mixed-version compatibility, because error details in `kvpb.Error` are packaged into an `errorspb.EncodedError`, which internally uses a `protobuf/types.Any`. `protobuf/types.Any` encodes the error's name as a string, relying on the receiving node having a matching type in order to decode the error.

Without a matching type on the receiving side, v23.1 nodes logged the following error:
```
error while unmarshalling error: ‹any: message type "cockroach.kv.kvpb.LockConflictError" isn't linked
```
As a result, error handling for requests that used `WaitPolicy_Error` was broken.

This commit resolves the issue by re-introducing `WriteIntentError` over the wire, so that v23.1 and v23.2 nodes still use the same name to refer to the same error. It does so without reverting 350dc60 and losing the naming improvement in most of the code by splitting `LockConflictError` into its two distinct roles. `LockConflictError` remains in the kvserver to communicate locking conflicts between batch evaluation and concurrency handling. However, the smaller role of communicating locking conflicts to clients that use a `WaitPolicy_Error`, a lock timeout, or a maximum wait-queue length is split into a "new" error called `WriteIntentError`. Splitting these errors was a cleanup we wanted to do anyway, so this commit just does it now to fix the bug. The unfortunate naming of `WriteIntentError` is a battle that we can fight another day.
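
The split can be pictured roughly as follows. This is a simplified sketch, not the code in this commit: the struct fields, the `Reason` type, and `toClientError` are hypothetical, and only the two reasons whose string forms appear in the test output of this change are modeled.

```go
package main

import "fmt"

// Reason says why a conflict was surfaced to the client instead of waited on.
type Reason int

const (
	ReasonWaitPolicy Reason = iota
	ReasonLockTimeout
)

func (r Reason) String() string {
	switch r {
	case ReasonWaitPolicy:
		return "wait_policy"
	case ReasonLockTimeout:
		return "lock_timeout"
	default:
		return "unspecified"
	}
}

// LockConflictError is the server-internal role: it travels between batch
// evaluation and concurrency handling and never crosses the wire.
type LockConflictError struct{ Key string }

func (e *LockConflictError) Error() string {
	return fmt.Sprintf("conflicting locks on %q", e.Key)
}

// WriteIntentError is the client-facing role: it keeps the name that older
// nodes already know how to decode.
type WriteIntentError struct {
	Key    string
	Reason Reason
}

func (e *WriteIntentError) Error() string {
	return fmt.Sprintf("conflicting locks on %q [reason=%s]", e.Key, e.Reason)
}

// toClientError is a hypothetical boundary conversion from the internal
// error to the wire-compatible one.
func toClientError(lc *LockConflictError, r Reason) *WriteIntentError {
	return &WriteIntentError{Key: lc.Key, Reason: r}
}

func main() {
	internal := &LockConflictError{Key: "conflict-key"}
	fmt.Println(toClientError(internal, ReasonWaitPolicy))
}
```

Keeping the conversion at a single boundary means the internal type can be renamed freely without touching the name that other versions see.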

While this commit doesn't introduce any new tests, we have sufficient testing of the two uses of `WriteIntentError` for single-version clusters in the unit tests. For mixed-version clusters, we have the `backup-restore/mixed-version` roachtest, which caught the bug and exercises backup's use of `WriteIntentError`.

The remaining place where this broke mixed-version compatibility was `SELECT FOR UPDATE NOWAIT`. We should add mixed-version testing for all `SELECT FOR UPDATE` variants. In the meantime, I have manually verified that the following script works on a mixed-version cluster:
```
roachprod create nathan-113271 -n3
roachprod stage  nathan-113271 release v23.1.11
roachprod start  nathan-113271
roachprod stop   nathan-113271:2-3
roachprod put    nathan-113271:2-3 cockroach # with this commit
roachprod start  nathan-113271:2-3 --sequential=false

roachprod sql nathan-113271:1
roachprod sql nathan-113271:2

-- either shell
create table t(i int primary key);
insert into t values (1);
select lease_holder from [show ranges from table t with details];
alter table t scatter;

-- 23.2 shell
begin; select * from t for update;

-- 23.1 shell
begin; select * from t for update nowait;

-- if broken:
ERROR: conflicting locks on /Table/104/1/1/0 [reason=wait_policy]
-- if fixed:
ERROR: could not obtain lock on row (i)=(1) in t@t_pkey

-- same thing but in opposite direction, with 23.1 leaseholder and 23.2 gateway
```

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Nov 3, 2023
2 parents bea5506 + f294b36 commit 58d5679
Showing 19 changed files with 191 additions and 140 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -870,6 +870,7 @@
<tr><td>APPLICATION</td><td>distsender.rpc.err.transactionstatuserrtype</td><td>Number of TransactionStatusErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.txnalreadyencounterederrtype</td><td>Number of TxnAlreadyEncounteredErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.unsupportedrequesterrtype</td><td>Number of UnsupportedRequestErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>distsender.rpc.err.writeintenterrtype</td><td>Number of WriteIntentErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.err.writetooolderrtype</td><td>Number of WriteTooOldErrType errors received replica-bound RPCs<br/><br/>This counts how often error of the specified type was received back from replicas<br/>as part of executing possibly range-spanning requests. Failures to reach the target<br/>replica will be accounted for as &#39;roachpb.CommunicationErrType&#39; and unclassified<br/>errors as &#39;roachpb.InternalErrType&#39;.<br/></td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.export.sent</td><td>Number of Export requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.gc.sent</td><td>Number of GC requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
@@ -506,13 +506,13 @@ func runBackupProcessor(
return nil
})
if exportRequestErr != nil {
-if lockErr, ok := pErr.GetDetail().(*kvpb.LockConflictError); ok {
+if lockErr, ok := pErr.GetDetail().(*kvpb.WriteIntentError); ok {
span.lastTried = timeutil.Now()
span.attempts++
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
-log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered LockConflictError: %s", span.span, lockErr.Error())
+log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, lockErr.Error())
span = spanAndTime{}
continue
}
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -3661,7 +3661,7 @@ func TestMultipleErrorsMerged(t *testing.T) {
retryErr := kvpb.NewTransactionRetryError(kvpb.RETRY_SERIALIZABLE, "test err")
abortErr := kvpb.NewTransactionAbortedError(kvpb.ABORT_REASON_ABORTED_RECORD_FOUND)
conditionFailedErr := &kvpb.ConditionFailedError{}
-lockConflictErr := &kvpb.LockConflictError{}
+writeIntentErr := &kvpb.WriteIntentError{}
sendErr := &sendError{}
ambiguousErr := &kvpb.AmbiguousResultError{}
randomErr := &kvpb.IntegerOverflowError{}
@@ -3732,19 +3732,19 @@ func TestMultipleErrorsMerged(t *testing.T) {
err2: randomErr,
expErr: "results in overflow",
},
-// LockConflictError also has a low score since it's "not ambiguous".
+// WriteIntentError also has a low score since it's "not ambiguous".
{
-err1: lockConflictErr,
+err1: writeIntentErr,
err2: ambiguousErr,
expErr: "result is ambiguous",
},
{
-err1: lockConflictErr,
+err1: writeIntentErr,
err2: sendErr,
expErr: "failed to send RPC",
},
{
-err1: lockConflictErr,
+err1: writeIntentErr,
err2: randomErr,
expErr: "results in overflow",
},
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/priority_refresh_test.go
@@ -35,7 +35,7 @@ import (
// causing a livelock. This livelock is not possible after #108190 because
// refresh requests now declare isolated keys and go through the lock table
// where they can push a lower-priority request. In this particular example,
-// when the refresh encounters the intent, it returns a LockConflictError, which
+// when the refresh encounters the intent, it returns a WriteIntentError, which
// is handled and a lock is added to the lock table; then, when the refresh
// request retries, it pushes the lower-priority lock-holder's timestamp and
// succeeds.
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
@@ -448,7 +448,7 @@ savepoint x

get conflict-key locking nowait
----
-(*kvpb.LockConflictError) conflicting locks on "conflict-key" [reason=wait_policy]
+(*kvpb.WriteIntentError) conflicting locks on "conflict-key" [reason=wait_policy]

can-use x
----
@@ -460,7 +460,7 @@ rollback x

put conflict-key b nowait
----
-(*kvpb.LockConflictError) conflicting locks on "conflict-key" [reason=wait_policy]
+(*kvpb.WriteIntentError) conflicting locks on "conflict-key" [reason=wait_policy]

can-use x
----
@@ -497,7 +497,7 @@ savepoint x

get conflict-key-2 lock-timeout
----
-(*kvpb.LockConflictError) conflicting locks on "conflict-key-2" [reason=lock_timeout]
+(*kvpb.WriteIntentError) conflicting locks on "conflict-key-2" [reason=lock_timeout]

can-use x
----
@@ -509,7 +509,7 @@ rollback x

put conflict-key-2 b lock-timeout
----
-(*kvpb.LockConflictError) conflicting locks on "conflict-key-2" [reason=lock_timeout]
+(*kvpb.WriteIntentError) conflicting locks on "conflict-key-2" [reason=lock_timeout]

can-use x
----
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -1474,29 +1474,29 @@ func TestTxnPipelinerIgnoresLocksOnUnambiguousFailure(t *testing.T) {
expLocks = append(expLocks, roachpb.Span{Key: keyC, EndKey: keyC.Next()})
require.Equal(t, expLocks, tp.lockFootprint.asSlice())

-// Return a LockConflictError for a Scan. The lock spans correspond to the
+// Return a WriteIntentError for a Scan. The lock spans correspond to the
// Scan are not added to the lock footprint, but the lock spans for all
// other requests in the batch are.
ba.Requests = nil
ba.Add(&kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyD}})
ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyE, EndKey: keyE.Next()}})
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyF.Next()}, KeyLockingStrength: lock.Exclusive})

-lockConflictErr := kvpb.NewError(&kvpb.LockConflictError{})
-lockConflictErr.SetErrorIndex(2)
+writeIntentErr := kvpb.NewError(&kvpb.WriteIntentError{})
+writeIntentErr.SetErrorIndex(2)
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 3)
require.False(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.ConditionalPutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[2].GetInner())

-return nil, lockConflictErr
+return nil, writeIntentErr
})

br, pErr = tp.SendLocked(ctx, ba)
require.Nil(t, br)
-require.Equal(t, lockConflictErr, pErr)
+require.Equal(t, writeIntentErr, pErr)
require.Equal(t, 0, tp.ifWrites.len())

expLocks = append(expLocks, roachpb.Span{Key: keyD})
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -537,11 +537,11 @@ func newRetryErrorOnFailedPreemptiveRefresh(
conflictingTxn = refreshErr.ConflictingTxn
}
msg.Printf(" due to %s", refreshErr)
-} else if lcErr, ok := pErr.GetDetail().(*kvpb.LockConflictError); ok {
-if len(lcErr.Locks) > 0 {
-conflictingTxn = &lcErr.Locks[0].Txn
+} else if wiErr, ok := pErr.GetDetail().(*kvpb.WriteIntentError); ok {
+if len(wiErr.Locks) > 0 {
+conflictingTxn = &wiErr.Locks[0].Txn
}
-msg.Printf(" due to %s", lcErr)
+msg.Printf(" due to %s", wiErr)
} else {
msg.Printf(" - unknown error: %s", pErr)
}
@@ -588,8 +588,8 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
// WaitPolicy_Error allows a Refresh request to immediately push any
// conflicting transactions in the lock table wait queue without blocking. If
// the push fails, the request returns either a RefreshFailedError (if it
-// encountered a committed value) or a LockConflictError (if it encountered an
-// intent). These errors are handled in maybeRefreshPreemptively.
+// encountered a committed value) or a WriteIntentError (if it encountered
+// an intent). These errors are handled in maybeRefreshPreemptively.
refreshSpanBa.WaitPolicy = lock.WaitPolicy_Error
addRefreshes := func(refreshes *condensableSpanSet) {
// We're going to check writes between the previous refreshed timestamp, if
20 changes: 10 additions & 10 deletions pkg/kv/kvclient/kvcoord/txn_test.go
@@ -1044,9 +1044,9 @@ func TestTxnContinueAfterCputError(t *testing.T) {
}

// Test that a transaction can be used after a locking request returns a
-// LockConflictError. This is not generally allowed for other errors, but
-// a LockConflictError is special.
-func TestTxnContinueAfterLockConflictError(t *testing.T) {
+// WriteIntentError. This is not generally allowed for other errors, but
+// a WriteIntentError is special.
+func TestTxnContinueAfterWriteIntentError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
@@ -1062,7 +1062,7 @@ func TestTxnContinueAfterLockConflictError(t *testing.T) {
b.Header.WaitPolicy = lock.WaitPolicy_Error
b.Put("a", "c")
err := txn.Run(ctx, b)
-require.IsType(t, &kvpb.LockConflictError{}, err)
+require.IsType(t, &kvpb.WriteIntentError{}, err)

require.NoError(t, txn.Put(ctx, "a'", "c"))
require.NoError(t, txn.Commit(ctx))
@@ -1126,9 +1126,9 @@ func TestTxnWaitPolicies(t *testing.T) {
// Should return error immediately, without blocking.
err := <-errorC
require.NotNil(t, err)
-lcErr := new(kvpb.LockConflictError)
+lcErr := new(kvpb.WriteIntentError)
require.True(t, errors.As(err, &lcErr))
-require.Equal(t, kvpb.LockConflictError_REASON_WAIT_POLICY, lcErr.Reason)
+require.Equal(t, kvpb.WriteIntentError_REASON_WAIT_POLICY, lcErr.Reason)
}

// SkipLocked wait policy.
@@ -1203,9 +1203,9 @@ func TestTxnErrorWaitPolicyWithOldVersionPushTouch(t *testing.T) {
// Should return error immediately, without blocking, regardless of priority.
err := <-errorC
require.NotNil(t, err)
-lcErr := new(kvpb.LockConflictError)
+lcErr := new(kvpb.WriteIntentError)
require.True(t, errors.As(err, &lcErr))
-require.Equal(t, kvpb.LockConflictError_REASON_WAIT_POLICY, lcErr.Reason)
+require.Equal(t, kvpb.WriteIntentError_REASON_WAIT_POLICY, lcErr.Reason)

require.NoError(t, txn.Commit(ctx))
})
@@ -1227,9 +1227,9 @@ func TestTxnLockTimeout(t *testing.T) {
b.Get(key)
err := s.DB.Run(ctx, &b)
require.NotNil(t, err)
-lcErr := new(kvpb.LockConflictError)
+lcErr := new(kvpb.WriteIntentError)
require.True(t, errors.As(err, &lcErr))
-require.Equal(t, kvpb.LockConflictError_REASON_LOCK_TIMEOUT, lcErr.Reason)
+require.Equal(t, kvpb.WriteIntentError_REASON_LOCK_TIMEOUT, lcErr.Reason)
}

// TestTxnReturnsWriteTooOldErrorOnConflictingDeleteRange tests that if two
9 changes: 6 additions & 3 deletions pkg/kv/kvpb/errordetailtype_string.go

Generated file; its diff is not rendered.
