diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 51c2588d8794..cc43e4ce0bbb 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -839,9 +839,9 @@ message EndTxnRequest { // If set, deadline represents the maximum (exclusive) timestamp at which the // transaction can commit (i.e. the maximum timestamp for the txn's reads and // writes). - // If EndTxn(Commit=true) finds that the txn's timestamp has been pushed above - // this deadline, an error will be returned and the client is supposed to - // rollback the txn. + // If EndTxn(Commit=true) finds that the txn's timestamp has been pushed to or + // above this deadline, an error will be returned and the client is supposed + // to rollback the txn. util.hlc.Timestamp deadline = 3 [(gogoproto.nullable) = false]; // commit triggers. Note that commit triggers are for // internal use only and will cause an error if requested through the diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 4cfb66529222..d97a526127e4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -361,7 +361,7 @@ func EndTxn( // NOTE: if the transaction is in the implicit commit state and this EndTxn // request is marking the commit as explicit, this check must succeed. We // assert this in txnCommitter.makeTxnCommitExplicitAsync. - if retry, reason, extraMsg := IsEndTxnTriggeringRetryError(reply.Txn, args); retry { + if retry, reason, extraMsg := IsEndTxnTriggeringRetryError(reply.Txn, args.Deadline); retry { return result.Result{}, kvpb.NewTransactionRetryError(reason, extraMsg) } @@ -491,36 +491,31 @@ func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline hlc.Timestamp) b return !deadline.IsEmpty() && deadline.LessEq(commitTS) } -// IsEndTxnTriggeringRetryError returns true if the EndTxnRequest cannot be +// IsEndTxnTriggeringRetryError returns true if the Transaction cannot be // committed and needs to return a TransactionRetryError. It also returns the // reason and possibly an extra message to be used for the error. func IsEndTxnTriggeringRetryError( - txn *roachpb.Transaction, args *kvpb.EndTxnRequest, + txn *roachpb.Transaction, deadline hlc.Timestamp, ) (retry bool, reason kvpb.TransactionRetryReason, extraMsg redact.RedactableString) { - // If we saw any WriteTooOldErrors, we must restart to avoid lost - // update anomalies. if txn.WriteTooOld { - retry, reason = true, kvpb.RETRY_WRITE_TOO_OLD - } else { - readTimestamp := txn.ReadTimestamp - isTxnPushed := txn.WriteTimestamp != readTimestamp - + // If we saw any WriteTooOldErrors, we must restart to avoid lost + // update anomalies. + return true, kvpb.RETRY_WRITE_TOO_OLD, "" + } + if txn.WriteTimestamp != txn.ReadTimestamp { // Return a transaction retry error if the commit timestamp isn't equal to // the txn timestamp. - if isTxnPushed { - retry, reason = true, kvpb.RETRY_SERIALIZABLE - } + return true, kvpb.RETRY_SERIALIZABLE, "" } - - // A transaction must obey its deadline, if set. - if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args.Deadline) { - exceededBy := txn.WriteTimestamp.GoTime().Sub(args.Deadline.GoTime()) + if IsEndTxnExceedingDeadline(txn.WriteTimestamp, deadline) { + // A transaction must obey its deadline, if set. + exceededBy := txn.WriteTimestamp.GoTime().Sub(deadline.GoTime()) extraMsg = redact.Sprintf( "txn timestamp pushed too much; deadline exceeded by %s (%s > %s)", - exceededBy, txn.WriteTimestamp, args.Deadline) - retry, reason = true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED + exceededBy, txn.WriteTimestamp, deadline) + return true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg } - return retry, reason, extraMsg + return false, 0, "" } const lockResolutionBatchSize = 500 diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go index c39ada0ef153..94ef3dc77262 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -30,6 +31,91 @@ import ( "github.com/stretchr/testify/require" ) +func TestIsEndTxnExceedingDeadline(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tests := []struct { + name string + commitTS int64 + deadline int64 + exp bool + }{ + {"no deadline", 10, 0, false}, + {"later deadline", 10, 11, false}, + {"equal deadline", 10, 10, true}, + {"earlier deadline", 10, 9, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + commitTS := hlc.Timestamp{WallTime: tt.commitTS} + deadline := hlc.Timestamp{WallTime: tt.deadline} + require.Equal(t, tt.exp, IsEndTxnExceedingDeadline(commitTS, deadline)) + }) + } +} + +func TestIsEndTxnTriggeringRetryError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tests := []struct { + txnIsoLevel isolation.Level + txnWriteTooOld bool + txnWriteTimestampPushed bool + txnExceedingDeadline bool + + expRetry bool + expReason kvpb.TransactionRetryReason + }{ + {isolation.Serializable, false, false, false, false, 0}, + {isolation.Serializable, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, + {isolation.Serializable, false, true, false, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.Serializable, false, true, true, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.Serializable, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Serializable, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Serializable, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Serializable, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Snapshot, false, false, false, false, 0}, + {isolation.Snapshot, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, + {isolation.Snapshot, false, true, false, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.Snapshot, false, true, true, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.Snapshot, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Snapshot, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Snapshot, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.Snapshot, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.ReadCommitted, false, false, false, false, 0}, + {isolation.ReadCommitted, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, + {isolation.ReadCommitted, false, true, false, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.ReadCommitted, false, true, true, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.ReadCommitted, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.ReadCommitted, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.ReadCommitted, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD}, + {isolation.ReadCommitted, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD}, + } + for _, tt := range tests { + name := fmt.Sprintf("iso=%s/wto=%t/pushed=%t/deadline=%t", + tt.txnIsoLevel, tt.txnWriteTooOld, tt.txnWriteTimestampPushed, tt.txnExceedingDeadline) + t.Run(name, func(t *testing.T) { + txn := roachpb.MakeTransaction("test", nil, tt.txnIsoLevel, 0, hlc.Timestamp{WallTime: 10}, 0, 1) + if tt.txnWriteTooOld { + txn.WriteTooOld = true + } + if tt.txnWriteTimestampPushed { + txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0) + } + deadline := txn.WriteTimestamp.Next() + if tt.txnExceedingDeadline { + deadline = txn.WriteTimestamp.Prev() + } + + gotRetry, gotReason, _ := IsEndTxnTriggeringRetryError(&txn, deadline) + require.Equal(t, tt.expRetry, gotRetry) + require.Equal(t, tt.expReason, gotReason) + }) + } +} + // TestEndTxnUpdatesTransactionRecord tests EndTxn request across its various // possible transaction record state transitions and error cases. func TestEndTxnUpdatesTransactionRecord(t *testing.T) { diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 94be47931caf..9c8f75973d70 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -796,7 +796,7 @@ func isOnePhaseCommit(ba *kvpb.BatchRequest) bool { } arg, _ := ba.GetArg(kvpb.EndTxn) etArg := arg.(*kvpb.EndTxnRequest) - if retry, _, _ := batcheval.IsEndTxnTriggeringRetryError(ba.Txn, etArg); retry { + if retry, _, _ := batcheval.IsEndTxnTriggeringRetryError(ba.Txn, etArg.Deadline); retry { return false } // If the transaction has already restarted at least once then it may have