diff --git a/pkg/kv/db.go b/pkg/kv/db.go index d48e8155d758..1d50f56ad0d1 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -951,7 +951,9 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn) return retryable(ctx, txn) }) if err != nil { - txn.CleanupOnError(ctx, err) + if rollbackErr := txn.Rollback(ctx); rollbackErr != nil { + log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", rollbackErr, err) + } } // Terminate TransactionRetryWithProtoRefreshError here, so it doesn't cause a higher-level // txn to be retried. We don't do this in any of the other functions in DB; I diff --git a/pkg/kv/kvclient/kvcoord/integration_test.go b/pkg/kv/kvclient/kvcoord/integration_test.go index 38007282a06c..8fbb65265285 100644 --- a/pkg/kv/kvclient/kvcoord/integration_test.go +++ b/pkg/kv/kvclient/kvcoord/integration_test.go @@ -22,11 +22,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" ) // This file contains contains integration tests that don't fit anywhere else. @@ -177,9 +177,8 @@ func TestWaiterOnRejectedCommit(t *testing.T) { <-readerBlocked <-readerBlocked - if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") { - t.Fatalf("expected injected err, got: %v", err) - } + require.ErrorContains(t, txn.Commit(ctx), "test injected err", "expected injected error") + require.NoError(t, txn.Rollback(ctx)) // Wait for the txn wait queue to be pinged and check the status. if status := <-txnUpdate; status != roachpb.ABORTED { t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index 009c1ba4cd5b..c1e8854de246 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -94,7 +94,7 @@ func TestSavepoints(t *testing.T) { ptxn() case "commit": - if err := txn.CommitOrCleanup(ctx); err != nil { + if err := txn.Commit(ctx); err != nil { fmt.Fprintf(&buf, "(%T) %v\n", err, err) } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go index 4b1ee530ff0e..4899ad9ad56c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go @@ -81,7 +81,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) { if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil { return err } - return conflictTxn.CommitOrCleanup(ctx) + return conflictTxn.Commit(ctx) } // Make a db with a short heartbeat interval. @@ -128,7 +128,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) { // Check that further sends through the aborted txn are rejected. The // TxnCoordSender is supposed to synthesize a TransactionAbortedError. - if err := txn.CommitOrCleanup(ctx); !testutils.IsError( + if err := txn.Commit(ctx); !testutils.IsError( err, "TransactionRetryWithProtoRefreshError: TransactionAbortedError", ) { t.Fatalf("expected aborted error, got: %s", err) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 69ebb445d53c..b7aa70c26b1a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -432,7 +432,9 @@ func TestTxnCoordSenderEndTxn(t *testing.T) { err := txn.UpdateDeadline(ctx, pushedTimestamp.Next()) require.NoError(t, err, "Deadline update to future failed") } - err = txn.CommitOrCleanup(ctx) + if err = txn.Commit(ctx); err != nil { + require.NoError(t, txn.Rollback(ctx)) + } switch i { case 0: @@ -624,11 +626,9 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) { // Now end the transaction and verify we've cleanup up, even though // end transaction failed. - err := txn1.CommitOrCleanup(ctx) + err := txn1.Commit(ctx) assertTransactionAbortedError(t, err) - if err := txn2.CommitOrCleanup(ctx); err != nil { - t.Fatal(err) - } + require.NoError(t, txn2.Commit(ctx)) verifyCleanup(key, s.Eng, t, txn1.Sender().(*kvcoord.TxnCoordSender), txn2.Sender().(*kvcoord.TxnCoordSender)) } @@ -655,9 +655,7 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) { txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now()) // Now immediately commit. - if err := txn.CommitOrCleanup(ctx); err != nil { - t.Fatal(err) - } + require.NoError(t, txn.Commit(ctx)) verifyCleanup(key, s.Eng, t, txn.Sender().(*kvcoord.TxnCoordSender)) } @@ -1322,7 +1320,10 @@ func TestTxnRestartCount(t *testing.T) { }) // Commit (should cause restart metric to increase). - err := txn.CommitOrCleanup(ctx) + err := txn.Commit(ctx) + if err != nil { + require.NoError(t, txn.Rollback(ctx)) + } assertTransactionRetryError(t, err) checkTxnMetrics(t, metrics, "restart txn", 0, 0, 1 /* aborts */, 1 /* restarts */) } @@ -1620,9 +1621,8 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) { if pErr := txn.Put(ctx, "a", "b"); pErr != nil { t.Fatalf("put failed: %s", pErr) } - if pErr := txn.CommitOrCleanup(ctx); pErr == nil { - t.Fatalf("unexpected commit success") - } + require.Error(t, txn.Commit(ctx), "unexpected commit success") + require.NoError(t, txn.Rollback(ctx)) if !commit.Load().(bool) { t.Errorf("%T: failed to find initial commit request", test.err) diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 07afc6ee8db1..b46833e9770f 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2505,7 +2505,7 @@ func TestDistributedTxnCleanup(t *testing.T) { return errors.New("forced abort") } if err := txnFn(ctx, txn); err != nil { - txn.CleanupOnError(ctx, err) + require.NoError(t, txn.Rollback(ctx)) if !force && commit { t.Fatalf("expected success with commit == true; got %v", err) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index dd30a423ff59..e58abf18fe25 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -802,7 +802,9 @@ func (r *Replica) AdminMerge( err := runMergeTxn(txn) if err != nil { log.VEventf(ctx, 2, "merge txn failed: %s", err) - txn.CleanupOnError(ctx, err) + if rollbackErr := txn.Rollback(ctx); rollbackErr != nil { + log.VEventf(ctx, 2, "merge txn rollback failed: %s", rollbackErr) + } } if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) { if err != nil { diff --git a/pkg/kv/kvserver/reports/reporter.go b/pkg/kv/kvserver/reports/reporter.go index 9d606a49ff13..535b224fac79 100644 --- a/pkg/kv/kvserver/reports/reporter.go +++ b/pkg/kv/kvserver/reports/reporter.go @@ -771,8 +771,11 @@ func (r *meta2RangeIter) handleErr(ctx context.Context, err error) { } if !errIsRetriable(err) { if r.txn != nil { + log.Eventf(ctx, "non-retriable error: %s", err) // On any non-retriable error, rollback. - r.txn.CleanupOnError(ctx, err) + if rollbackErr := r.txn.Rollback(ctx); rollbackErr != nil { + log.Eventf(ctx, "rollback failed: %s", rollbackErr) + } r.txn = nil } r.reset() diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index b675e60ddac9..b9a18e1196c5 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -100,9 +100,9 @@ type Txn struct { // Note: for KV usage that should be subject to admission control, prefer // NewTxnRootKV() below. // -// If the transaction is used to send any operations, CommitOrCleanup() or -// CleanupOnError() should eventually be called to commit/rollback the -// transaction (including stopping the heartbeat loop). +// If the transaction is used to send any operations, Commit() or Rollback() +// should eventually be called to commit/rollback the transaction (including +// stopping the heartbeat loop). // // gatewayNodeID: If != 0, this is the ID of the node on whose behalf this // @@ -689,27 +689,7 @@ func (txn *Txn) commit(ctx context.Context) error { return pErr.GoError() } -// CleanupOnError cleans up the transaction as a result of an error. -func (txn *Txn) CleanupOnError(ctx context.Context, err error) { - if txn.typ != RootTxn { - panic(errors.WithContextTags(errors.AssertionFailedf("CleanupOnError() called on leaf txn"), ctx)) - } - - if err == nil { - panic(errors.WithContextTags(errors.AssertionFailedf("CleanupOnError() called with nil error"), ctx)) - } - if replyErr := txn.rollback(ctx); replyErr != nil { - if _, ok := replyErr.GetDetail().(*roachpb.TransactionStatusError); ok || txn.IsAborted() { - log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", replyErr, err) - } else { - log.Warningf(ctx, "failure aborting transaction: %s; abort caused by: %s", replyErr, err) - } - } -} - -// Commit is the same as CommitOrCleanup but will not attempt to clean -// up on failure. This can be used when the caller is prepared to do proper -// cleanup. +// Commit sends an EndTxnRequest with Commit=true. func (txn *Txn) Commit(ctx context.Context) error { if txn.typ != RootTxn { return errors.WithContextTags(errors.AssertionFailedf("Commit() called on leaf txn"), ctx) @@ -741,21 +721,6 @@ func (txn *Txn) CommitInBatch(ctx context.Context, b *Batch) error { return txn.Run(ctx, b) } -// CommitOrCleanup sends an EndTxnRequest with Commit=true. -// If that fails, an attempt to rollback is made. -// txn should not be used to send any more commands after this call. -func (txn *Txn) CommitOrCleanup(ctx context.Context) error { - if txn.typ != RootTxn { - return errors.WithContextTags(errors.AssertionFailedf("CommitOrCleanup() called on leaf txn"), ctx) - } - - err := txn.commit(ctx) - if err != nil { - txn.CleanupOnError(ctx, err) - } - return err -} - // UpdateDeadline sets the transactions deadline to the passed deadline. // It may move the deadline to any timestamp above the current read timestamp. // If the deadline is below the current provisional commit timestamp (write timestamp), diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index ffcb560855cf..07a5146025f0 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -364,9 +364,7 @@ func TestTransactionStatus(t *testing.T) { } } if commit { - if pErr := txn.CommitOrCleanup(ctx); pErr != nil { - t.Fatal(pErr) - } + require.NoError(t, txn.Commit(ctx)) if a, e := txn.TestingCloneTxn().Status, roachpb.COMMITTED; a != e { t.Errorf("write: %t, commit: %t transaction expected to have status %q but had %q", write, commit, e, a) } diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index b26bc3ad6c56..077dede964d8 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -534,7 +534,7 @@ func cleanupAndFinishOnError(args fsm.Args) error { func() { ts.mu.Lock() defer ts.mu.Unlock() - ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause()) + _ = ts.mu.txn.Rollback(ts.Ctx) }() finishedTxnID := ts.finishSQLTxn() ts.setAdvanceInfo( diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index 3482c32e34f0..ab8fca256b4c 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -801,11 +801,19 @@ func (p *planner) preparePlannerForCopy( // committing its transactions and the execution didn't already commit it // (through the planner.autoCommit optimization). if autoCommit && !txn.IsCommitted() { - return txn.CommitOrCleanup(ctx) + err = txn.Commit(ctx) + if err != nil { + if rollbackErr := txn.Rollback(ctx); rollbackErr != nil { + log.Eventf(ctx, "rollback failed: %s", rollbackErr) + } + } + return err } return nil } - txn.CleanupOnError(ctx, prevErr) + if rollbackErr := txn.Rollback(ctx); rollbackErr != nil { + log.Eventf(ctx, "rollback failed: %s", rollbackErr) + } return prevErr } } diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index db3515687a47..73b772afee73 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -101,7 +101,10 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) { if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil { return err } - return conflictTxn.CommitOrCleanup(ctx) + err = conflictTxn.Commit(ctx) + require.NoError(t, err) + t.Log(conflictTxn.Rollback(ctx)) + return err } // Make a db with a short heartbeat interval, so that the aborted txn finds