From a4005e31c3bb180404451a7b61bf0b97c9ef2020 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 9 Aug 2023 23:27:16 -0400 Subject: [PATCH] kv: unit test PrepareTransactionForRetry and TransactionRefreshTimestamp Informs #104233. This commit adds a pair of new unit tests to verify the behavior of `PrepareTransactionForRetry` and `TransactionRefreshTimestamp`. These functions will be getting more complex for #104233, so it will be helpful to have these tests in place. The tests also serve as good documentation. Release note: None --- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 9 +- pkg/kv/kvpb/BUILD.bazel | 2 + pkg/kv/kvpb/data.go | 31 +-- pkg/kv/kvpb/data_test.go | 233 ++++++++++++++++++++ 4 files changed, 257 insertions(+), 18 deletions(-) create mode 100644 pkg/kv/kvpb/data_test.go diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 0b928bde1d8e..57c24ce3e364 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -804,9 +804,7 @@ func (tc *TxnCoordSender) UpdateStateOnRemoteRetryableErr( // not be usable afterwards (in case of TransactionAbortedError). The caller is // expected to check the ID of the resulting transaction. If the TxnCoordSender // can still be used, it will have been prepared for a new epoch. -func (tc *TxnCoordSender) handleRetryableErrLocked( - ctx context.Context, pErr *kvpb.Error, -) *kvpb.TransactionRetryWithProtoRefreshError { +func (tc *TxnCoordSender) handleRetryableErrLocked(ctx context.Context, pErr *kvpb.Error) error { // If the error is a transaction retry error, update metrics to // reflect the reason for the restart. More details about the // different error types are documented above on the metaRestart @@ -842,7 +840,10 @@ func (tc *TxnCoordSender) handleRetryableErrLocked( tc.metrics.RestartsUnknown.Inc() } errTxnID := pErr.GetTxn().ID - newTxn := kvpb.PrepareTransactionForRetry(ctx, pErr, tc.mu.userPriority, tc.clock) + newTxn, assertErr := kvpb.PrepareTransactionForRetry(pErr, tc.mu.userPriority, tc.clock) + if assertErr != nil { + return assertErr + } // We'll pass a TransactionRetryWithProtoRefreshError up to the next layer. retErr := kvpb.NewTransactionRetryWithProtoRefreshError( diff --git a/pkg/kv/kvpb/BUILD.bazel b/pkg/kv/kvpb/BUILD.bazel index c89c653cf675..59f427d58c33 100644 --- a/pkg/kv/kvpb/BUILD.bazel +++ b/pkg/kv/kvpb/BUILD.bazel @@ -55,6 +55,7 @@ go_test( srcs = [ "api_test.go", "batch_test.go", + "data_test.go", "errors_test.go", "node_decommissioned_error_test.go", "replica_unavailable_error_test.go", @@ -74,6 +75,7 @@ go_test( "//pkg/util/buildutil", "//pkg/util/hlc", "//pkg/util/protoutil", + "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/kv/kvpb/data.go b/pkg/kv/kvpb/data.go index eb3471f7e7a6..9df6732ae722 100644 --- a/pkg/kv/kvpb/data.go +++ b/pkg/kv/kvpb/data.go @@ -12,11 +12,9 @@ package kvpb import ( - "context" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) // PrepareTransactionForRetry returns a new Transaction to be used for retrying @@ -34,14 +32,18 @@ import ( // In case retryErr tells us that a new Transaction needs to be created, // isolation and name help initialize this new transaction. func PrepareTransactionForRetry( - ctx context.Context, pErr *Error, pri roachpb.UserPriority, clock *hlc.Clock, -) roachpb.Transaction { + pErr *Error, pri roachpb.UserPriority, clock *hlc.Clock, +) (roachpb.Transaction, error) { + if pErr == nil { + return roachpb.Transaction{}, errors.AssertionFailedf("nil error") + } if pErr.TransactionRestart() == TransactionRestart_NONE { - log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) + return roachpb.Transaction{}, errors.AssertionFailedf( + "invalid retryable error (%T): %s", pErr.GetDetail(), pErr) } - if pErr.GetTxn() == nil { - log.Fatalf(ctx, "missing txn for retryable error: %s", pErr) + return roachpb.Transaction{}, errors.AssertionFailedf( + "missing txn for retryable error: %s", pErr) } txn := *pErr.GetTxn() @@ -108,19 +110,20 @@ func PrepareTransactionForRetry( // IntentMissingErrors are not expected to be handled at this level; // We instead expect the txnPipeliner to transform them into a // TransactionRetryErrors(RETRY_ASYNC_WRITE_FAILURE) error. - log.Fatalf( - ctx, "unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail(), - ) + return roachpb.Transaction{}, errors.AssertionFailedf( + "unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail()) default: - log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) + return roachpb.Transaction{}, errors.AssertionFailedf( + "invalid retryable err (%T): %s", pErr.GetDetail(), pErr) } if !aborted { if txn.Status.IsFinalized() { - log.Fatalf(ctx, "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr) + return roachpb.Transaction{}, errors.AssertionFailedf( + "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr) } txn.Restart(pri, txn.Priority, txn.WriteTimestamp) } - return txn + return txn, nil } // TransactionRefreshTimestamp returns whether the supplied error is a retry diff --git a/pkg/kv/kvpb/data_test.go b/pkg/kv/kvpb/data_test.go new file mode 100644 index 000000000000..96eba928871f --- /dev/null +++ b/pkg/kv/kvpb/data_test.go @@ -0,0 +1,233 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvpb + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestPrepareTransactionForRetry(t *testing.T) { + ts1 := hlc.Timestamp{WallTime: 1} + ts2 := hlc.Timestamp{WallTime: 2} + tsClock := hlc.Timestamp{WallTime: 3} + txn := roachpb.MakeTransaction("test", nil, isolation.Serializable, -1, ts1, 0, 99) + txn2ID := uuid.MakeV4() // used if txn is aborted + tests := []struct { + name string + err *Error + expTxn roachpb.Transaction + expErr bool + }{ + { + name: "no error", + err: nil, + expErr: true, + }, + { + name: "no txn", + err: NewError(errors.New("random")), + expErr: true, + }, + { + name: "random error", + err: NewErrorWithTxn(errors.New("random"), &txn), + expErr: true, + }, + { + name: "txn aborted error", + err: NewErrorWithTxn(&TransactionAbortedError{}, &txn), + expTxn: func() roachpb.Transaction { + nextTxn := txn + nextTxn.ID = txn2ID + nextTxn.ReadTimestamp = tsClock + nextTxn.WriteTimestamp = tsClock + nextTxn.MinTimestamp = tsClock + nextTxn.LastHeartbeat = tsClock + nextTxn.GlobalUncertaintyLimit = tsClock + return nextTxn + }(), + }, + { + name: "read within uncertainty error", + err: NewErrorWithTxn(&ReadWithinUncertaintyIntervalError{ValueTimestamp: ts2}, &txn), + expTxn: func() roachpb.Transaction { + nextTxn := txn + nextTxn.Epoch++ + nextTxn.ReadTimestamp = ts2.Next() + nextTxn.WriteTimestamp = ts2.Next() + return nextTxn + }(), + }, + { + name: "txn push error", + err: NewErrorWithTxn(&TransactionPushError{ + PusheeTxn: roachpb.Transaction{TxnMeta: enginepb.TxnMeta{WriteTimestamp: ts2, Priority: 3}}, + }, &txn), + expTxn: func() roachpb.Transaction { + nextTxn := txn + nextTxn.Epoch++ + nextTxn.ReadTimestamp = ts2 + nextTxn.WriteTimestamp = ts2 + nextTxn.Priority = 2 + return nextTxn + }(), + }, + { + name: "txn retry error (reason: write too old)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_WRITE_TOO_OLD}, &txn), + expTxn: func() roachpb.Transaction { + nextTxn := txn + nextTxn.Epoch++ + return nextTxn + }(), + }, + { + name: "txn retry error (reason: serializable)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_SERIALIZABLE}, &txn), + expTxn: func() roachpb.Transaction { + nextTxn := txn + nextTxn.Epoch++ + nextTxn.ReadTimestamp = tsClock + nextTxn.WriteTimestamp = tsClock + return nextTxn + }(), + }, + { + name: "write too old error", + err: NewErrorWithTxn(&WriteTooOldError{ActualTimestamp: ts2}, &txn), + expTxn: func() roachpb.Transaction { + nextTxn := txn + nextTxn.Epoch++ + nextTxn.ReadTimestamp = ts2 + nextTxn.WriteTimestamp = ts2 + return nextTxn + }(), + }, + { + name: "intent missing error", + err: NewErrorWithTxn(&IntentMissingError{}, &txn), + expErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, tsClock.WallTime))) + nextTxn, err := PrepareTransactionForRetry(tt.err, -1 /* pri */, clock) + if tt.expErr { + require.Error(t, err) + require.True(t, errors.IsAssertionFailure(err)) + require.Zero(t, nextTxn) + } else { + require.NoError(t, err) + if nextTxn.ID != txn.ID { + // Eliminate randomness from ID generation. + nextTxn.ID = txn2ID + } + require.Equal(t, tt.expTxn, nextTxn) + } + }) + } +} + +func TestTransactionRefreshTimestamp(t *testing.T) { + ts1 := hlc.Timestamp{WallTime: 1} + ts2 := hlc.Timestamp{WallTime: 2} + txn := roachpb.MakeTransaction("test", nil, isolation.Serializable, 1, ts1, 0, 99) + tests := []struct { + name string + err *Error + expOk bool + expTs hlc.Timestamp + }{ + { + name: "no error", + err: nil, + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "no txn", + err: NewError(errors.New("random")), + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "random error", + err: NewErrorWithTxn(errors.New("random"), &txn), + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "txn aborted error", + err: NewErrorWithTxn(&TransactionAbortedError{}, &txn), + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "txn retry error (reason: unknown)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_REASON_UNKNOWN}, &txn), + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "txn retry error (reason: write too old)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_WRITE_TOO_OLD}, &txn), + expOk: true, + expTs: ts1, + }, + { + name: "txn retry error (reason: serializable)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_SERIALIZABLE}, &txn), + expOk: true, + expTs: ts1, + }, + { + name: "txn retry error (reason: async write failure)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_ASYNC_WRITE_FAILURE}, &txn), + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "txn retry error (reason: commit deadline exceeded)", + err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_COMMIT_DEADLINE_EXCEEDED}, &txn), + expOk: false, + expTs: hlc.Timestamp{}, + }, + { + name: "write too old error", + err: NewErrorWithTxn(&WriteTooOldError{ActualTimestamp: ts2}, &txn), + expOk: true, + expTs: ts2, + }, + { + name: "read within uncertainty error", + err: NewErrorWithTxn(&ReadWithinUncertaintyIntervalError{ValueTimestamp: ts2}, &txn), + expOk: true, + expTs: ts2.Next(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ok, ts := TransactionRefreshTimestamp(tt.err) + require.Equal(t, tt.expOk, ok) + require.Equal(t, tt.expTs, ts) + }) + } +}