Skip to content

Commit

Permalink
Merge #101766
Browse files Browse the repository at this point in the history
101766: kv: unit test IsEndTxnExceedingDeadline and IsEndTxnTriggeringRetryError r=arulajmani a=nvanbenschoten

Informs #100131.

This commit adds unit testing around IsEndTxnExceedingDeadline and IsEndTxnTriggeringRetryError. We'll be changing the latter function in future commits, so it's helpful to have tests in place.

While here, the commit cleans up the code slightly. It narrows the signature of the IsEndTxnTriggeringRetryError function to make its inputs more clear. It also makes the logic more readable.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 19, 2023
2 parents 50bc51e + 534b8e5 commit 36fed37
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -839,9 +839,9 @@ message EndTxnRequest {
// If set, deadline represents the maximum (exclusive) timestamp at which the
// transaction can commit (i.e. the maximum timestamp for the txn's reads and
// writes).
// If EndTxn(Commit=true) finds that the txn's timestamp has been pushed above
// this deadline, an error will be returned and the client is supposed to
// rollback the txn.
// If EndTxn(Commit=true) finds that the txn's timestamp has been pushed to or
// above this deadline, an error will be returned and the client is supposed
// to rollback the txn.
util.hlc.Timestamp deadline = 3 [(gogoproto.nullable) = false];
// commit triggers. Note that commit triggers are for
// internal use only and will cause an error if requested through the
Expand Down
35 changes: 15 additions & 20 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func EndTxn(
// NOTE: if the transaction is in the implicit commit state and this EndTxn
// request is marking the commit as explicit, this check must succeed. We
// assert this in txnCommitter.makeTxnCommitExplicitAsync.
if retry, reason, extraMsg := IsEndTxnTriggeringRetryError(reply.Txn, args); retry {
if retry, reason, extraMsg := IsEndTxnTriggeringRetryError(reply.Txn, args.Deadline); retry {
return result.Result{}, kvpb.NewTransactionRetryError(reason, extraMsg)
}

Expand Down Expand Up @@ -491,36 +491,31 @@ func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline hlc.Timestamp) b
return !deadline.IsEmpty() && deadline.LessEq(commitTS)
}

// IsEndTxnTriggeringRetryError returns true if the EndTxnRequest cannot be
// IsEndTxnTriggeringRetryError returns true if the Transaction cannot be
// committed and needs to return a TransactionRetryError. It also returns the
// reason and possibly an extra message to be used for the error.
func IsEndTxnTriggeringRetryError(
txn *roachpb.Transaction, args *kvpb.EndTxnRequest,
txn *roachpb.Transaction, deadline hlc.Timestamp,
) (retry bool, reason kvpb.TransactionRetryReason, extraMsg redact.RedactableString) {
// If we saw any WriteTooOldErrors, we must restart to avoid lost
// update anomalies.
if txn.WriteTooOld {
retry, reason = true, kvpb.RETRY_WRITE_TOO_OLD
} else {
readTimestamp := txn.ReadTimestamp
isTxnPushed := txn.WriteTimestamp != readTimestamp

// If we saw any WriteTooOldErrors, we must restart to avoid lost
// update anomalies.
return true, kvpb.RETRY_WRITE_TOO_OLD, ""
}
if txn.WriteTimestamp != txn.ReadTimestamp {
// Return a transaction retry error if the commit timestamp isn't equal to
// the txn timestamp.
if isTxnPushed {
retry, reason = true, kvpb.RETRY_SERIALIZABLE
}
return true, kvpb.RETRY_SERIALIZABLE, ""
}

// A transaction must obey its deadline, if set.
if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args.Deadline) {
exceededBy := txn.WriteTimestamp.GoTime().Sub(args.Deadline.GoTime())
if IsEndTxnExceedingDeadline(txn.WriteTimestamp, deadline) {
// A transaction must obey its deadline, if set.
exceededBy := txn.WriteTimestamp.GoTime().Sub(deadline.GoTime())
extraMsg = redact.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s)",
exceededBy, txn.WriteTimestamp, args.Deadline)
retry, reason = true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED
exceededBy, txn.WriteTimestamp, deadline)
return true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg
}
return retry, reason, extraMsg
return false, 0, ""
}

const lockResolutionBatchSize = 500
Expand Down
86 changes: 86 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand All @@ -30,6 +31,91 @@ import (
"github.com/stretchr/testify/require"
)

func TestIsEndTxnExceedingDeadline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := []struct {
name string
commitTS int64
deadline int64
exp bool
}{
{"no deadline", 10, 0, false},
{"later deadline", 10, 11, false},
{"equal deadline", 10, 10, true},
{"earlier deadline", 10, 9, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
commitTS := hlc.Timestamp{WallTime: tt.commitTS}
deadline := hlc.Timestamp{WallTime: tt.deadline}
require.Equal(t, tt.exp, IsEndTxnExceedingDeadline(commitTS, deadline))
})
}
}

func TestIsEndTxnTriggeringRetryError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := []struct {
txnIsoLevel isolation.Level
txnWriteTooOld bool
txnWriteTimestampPushed bool
txnExceedingDeadline bool

expRetry bool
expReason kvpb.TransactionRetryReason
}{
{isolation.Serializable, false, false, false, false, 0},
{isolation.Serializable, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.Serializable, false, true, false, true, kvpb.RETRY_SERIALIZABLE},
{isolation.Serializable, false, true, true, true, kvpb.RETRY_SERIALIZABLE},
{isolation.Serializable, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Serializable, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Serializable, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Serializable, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, false, false, false, false, 0},
{isolation.Snapshot, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.Snapshot, false, true, false, true, kvpb.RETRY_SERIALIZABLE},
{isolation.Snapshot, false, true, true, true, kvpb.RETRY_SERIALIZABLE},
{isolation.Snapshot, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.Snapshot, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, false, false, false, false, 0},
{isolation.ReadCommitted, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED},
{isolation.ReadCommitted, false, true, false, true, kvpb.RETRY_SERIALIZABLE},
{isolation.ReadCommitted, false, true, true, true, kvpb.RETRY_SERIALIZABLE},
{isolation.ReadCommitted, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD},
{isolation.ReadCommitted, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD},
}
for _, tt := range tests {
name := fmt.Sprintf("iso=%s/wto=%t/pushed=%t/deadline=%t",
tt.txnIsoLevel, tt.txnWriteTooOld, tt.txnWriteTimestampPushed, tt.txnExceedingDeadline)
t.Run(name, func(t *testing.T) {
txn := roachpb.MakeTransaction("test", nil, tt.txnIsoLevel, 0, hlc.Timestamp{WallTime: 10}, 0, 1)
if tt.txnWriteTooOld {
txn.WriteTooOld = true
}
if tt.txnWriteTimestampPushed {
txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0)
}
deadline := txn.WriteTimestamp.Next()
if tt.txnExceedingDeadline {
deadline = txn.WriteTimestamp.Prev()
}

gotRetry, gotReason, _ := IsEndTxnTriggeringRetryError(&txn, deadline)
require.Equal(t, tt.expRetry, gotRetry)
require.Equal(t, tt.expReason, gotReason)
})
}
}

// TestEndTxnUpdatesTransactionRecord tests EndTxn request across its various
// possible transaction record state transitions and error cases.
func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func isOnePhaseCommit(ba *kvpb.BatchRequest) bool {
}
arg, _ := ba.GetArg(kvpb.EndTxn)
etArg := arg.(*kvpb.EndTxnRequest)
if retry, _, _ := batcheval.IsEndTxnTriggeringRetryError(ba.Txn, etArg); retry {
if retry, _, _ := batcheval.IsEndTxnTriggeringRetryError(ba.Txn, etArg.Deadline); retry {
return false
}
// If the transaction has already restarted at least once then it may have
Expand Down

0 comments on commit 36fed37

Please sign in to comment.