Skip to content

Commit

Permalink
kv,sql: simplify the Txn API by removing 2 cleanup functions
Browse files Browse the repository at this point in the history
Txn.CleanupOnError() basically does a rollback, and in addition takes an error
only for the purpose of logging it.

Txn.CommitOrCleanup() tries to commit and if unsuccessful it tries a rollback.
The error from the rollback is logged but not returned, the error from the
commit is returned.

Removing these 2 functions means that the caller should call Commit and
Rollback directly when needed, and handle the returned errors. For example,
sql may need to log errors to a different channel from the one used but Txn,
and tests may want to fail when a Rollback fails unexpectedly. This PR
removes those functions.

Release note: None
  • Loading branch information
lidorcarmel committed Nov 1, 2022
1 parent 4815c5f commit 8b00b5d
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 69 deletions.
4 changes: 3 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,9 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn)
return retryable(ctx, txn)
})
if err != nil {
txn.CleanupOnError(ctx, err)
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", rollbackErr, err)
}
}
// Terminate TransactionRetryWithProtoRefreshError here, so it doesn't cause a higher-level
// txn to be retried. We don't do this in any of the other functions in DB; I
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvclient/kvcoord/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// This file contains contains integration tests that don't fit anywhere else.
Expand Down Expand Up @@ -177,9 +177,8 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
<-readerBlocked
<-readerBlocked

if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") {
t.Fatalf("expected injected err, got: %v", err)
}
require.ErrorContains(t, txn.Commit(ctx), "test injected err", "expected injected error")
require.NoError(t, txn.Rollback(ctx))
// Wait for the txn wait queue to be pinged and check the status.
if status := <-txnUpdate; status != roachpb.ABORTED {
t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestSavepoints(t *testing.T) {
ptxn()

case "commit":
if err := txn.CommitOrCleanup(ctx); err != nil {
if err := txn.Commit(ctx); err != nil {
fmt.Fprintf(&buf, "(%T) %v\n", err, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {
if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil {
return err
}
return conflictTxn.CommitOrCleanup(ctx)
return conflictTxn.Commit(ctx)
}

// Make a db with a short heartbeat interval.
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {

// Check that further sends through the aborted txn are rejected. The
// TxnCoordSender is supposed to synthesize a TransactionAbortedError.
if err := txn.CommitOrCleanup(ctx); !testutils.IsError(
if err := txn.Commit(ctx); !testutils.IsError(
err, "TransactionRetryWithProtoRefreshError: TransactionAbortedError",
) {
t.Fatalf("expected aborted error, got: %s", err)
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
err := txn.UpdateDeadline(ctx, pushedTimestamp.Next())
require.NoError(t, err, "Deadline update to future failed")
}
err = txn.CommitOrCleanup(ctx)
if err = txn.Commit(ctx); err != nil {
require.NoError(t, txn.Rollback(ctx))
}

switch i {
case 0:
Expand Down Expand Up @@ -624,11 +626,9 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {

// Now end the transaction and verify we've cleanup up, even though
// end transaction failed.
err := txn1.CommitOrCleanup(ctx)
err := txn1.Commit(ctx)
assertTransactionAbortedError(t, err)
if err := txn2.CommitOrCleanup(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, txn2.Commit(ctx))
verifyCleanup(key, s.Eng, t, txn1.Sender().(*kvcoord.TxnCoordSender), txn2.Sender().(*kvcoord.TxnCoordSender))
}

Expand All @@ -655,9 +655,7 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now())

// Now immediately commit.
if err := txn.CommitOrCleanup(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, txn.Commit(ctx))
verifyCleanup(key, s.Eng, t, txn.Sender().(*kvcoord.TxnCoordSender))
}

Expand Down Expand Up @@ -1322,7 +1320,10 @@ func TestTxnRestartCount(t *testing.T) {
})

// Commit (should cause restart metric to increase).
err := txn.CommitOrCleanup(ctx)
err := txn.Commit(ctx)
if err != nil {
require.NoError(t, txn.Rollback(ctx))
}
assertTransactionRetryError(t, err)
checkTxnMetrics(t, metrics, "restart txn", 0, 0, 1 /* aborts */, 1 /* restarts */)
}
Expand Down Expand Up @@ -1620,9 +1621,8 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
if pErr := txn.Put(ctx, "a", "b"); pErr != nil {
t.Fatalf("put failed: %s", pErr)
}
if pErr := txn.CommitOrCleanup(ctx); pErr == nil {
t.Fatalf("unexpected commit success")
}
require.Error(t, txn.Commit(ctx), "unexpected commit success")
require.NoError(t, txn.Rollback(ctx))

if !commit.Load().(bool) {
t.Errorf("%T: failed to find initial commit request", test.err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,7 +2505,7 @@ func TestDistributedTxnCleanup(t *testing.T) {
return errors.New("forced abort")
}
if err := txnFn(ctx, txn); err != nil {
txn.CleanupOnError(ctx, err)
require.NoError(t, txn.Rollback(ctx))
if !force && commit {
t.Fatalf("expected success with commit == true; got %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,9 @@ func (r *Replica) AdminMerge(
err := runMergeTxn(txn)
if err != nil {
log.VEventf(ctx, 2, "merge txn failed: %s", err)
txn.CleanupOnError(ctx, err)
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.VEventf(ctx, 2, "merge txn rollback failed: %s", rollbackErr)
}
}
if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) {
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/reports/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,11 @@ func (r *meta2RangeIter) handleErr(ctx context.Context, err error) {
}
if !errIsRetriable(err) {
if r.txn != nil {
log.Eventf(ctx, "non-retriable error: %s", err)
// On any non-retriable error, rollback.
r.txn.CleanupOnError(ctx, err)
if rollbackErr := r.txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "rollback failed: %s", rollbackErr)
}
r.txn = nil
}
r.reset()
Expand Down
43 changes: 4 additions & 39 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ type Txn struct {
// Note: for KV usage that should be subject to admission control, prefer
// NewTxnRootKV() below.
//
// If the transaction is used to send any operations, CommitOrCleanup() or
// CleanupOnError() should eventually be called to commit/rollback the
// transaction (including stopping the heartbeat loop).
// If the transaction is used to send any operations, Commit() or Rollback()
// should eventually be called to commit/rollback the transaction (including
// stopping the heartbeat loop).
//
// gatewayNodeID: If != 0, this is the ID of the node on whose behalf this
//
Expand Down Expand Up @@ -689,27 +689,7 @@ func (txn *Txn) commit(ctx context.Context) error {
return pErr.GoError()
}

// CleanupOnError cleans up the transaction as a result of an error.
func (txn *Txn) CleanupOnError(ctx context.Context, err error) {
if txn.typ != RootTxn {
panic(errors.WithContextTags(errors.AssertionFailedf("CleanupOnError() called on leaf txn"), ctx))
}

if err == nil {
panic(errors.WithContextTags(errors.AssertionFailedf("CleanupOnError() called with nil error"), ctx))
}
if replyErr := txn.rollback(ctx); replyErr != nil {
if _, ok := replyErr.GetDetail().(*roachpb.TransactionStatusError); ok || txn.IsAborted() {
log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", replyErr, err)
} else {
log.Warningf(ctx, "failure aborting transaction: %s; abort caused by: %s", replyErr, err)
}
}
}

// Commit is the same as CommitOrCleanup but will not attempt to clean
// up on failure. This can be used when the caller is prepared to do proper
// cleanup.
// Commit sends an EndTxnRequest with Commit=true.
func (txn *Txn) Commit(ctx context.Context) error {
if txn.typ != RootTxn {
return errors.WithContextTags(errors.AssertionFailedf("Commit() called on leaf txn"), ctx)
Expand Down Expand Up @@ -741,21 +721,6 @@ func (txn *Txn) CommitInBatch(ctx context.Context, b *Batch) error {
return txn.Run(ctx, b)
}

// CommitOrCleanup sends an EndTxnRequest with Commit=true.
// If that fails, an attempt to rollback is made.
// txn should not be used to send any more commands after this call.
func (txn *Txn) CommitOrCleanup(ctx context.Context) error {
if txn.typ != RootTxn {
return errors.WithContextTags(errors.AssertionFailedf("CommitOrCleanup() called on leaf txn"), ctx)
}

err := txn.commit(ctx)
if err != nil {
txn.CleanupOnError(ctx, err)
}
return err
}

// UpdateDeadline sets the transactions deadline to the passed deadline.
// It may move the deadline to any timestamp above the current read timestamp.
// If the deadline is below the current provisional commit timestamp (write timestamp),
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,7 @@ func TestTransactionStatus(t *testing.T) {
}
}
if commit {
if pErr := txn.CommitOrCleanup(ctx); pErr != nil {
t.Fatal(pErr)
}
require.NoError(t, txn.Commit(ctx))
if a, e := txn.TestingCloneTxn().Status, roachpb.COMMITTED; a != e {
t.Errorf("write: %t, commit: %t transaction expected to have status %q but had %q", write, commit, e, a)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func cleanupAndFinishOnError(args fsm.Args) error {
func() {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
_ = ts.mu.txn.Rollback(ts.Ctx)
}()
finishedTxnID := ts.finishSQLTxn()
ts.setAdvanceInfo(
Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,11 +801,19 @@ func (p *planner) preparePlannerForCopy(
// committing its transactions and the execution didn't already commit it
// (through the planner.autoCommit optimization).
if autoCommit && !txn.IsCommitted() {
return txn.CommitOrCleanup(ctx)
err = txn.Commit(ctx)
if err != nil {
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "rollback failed: %s", rollbackErr)
}
}
return err
}
return nil
}
txn.CleanupOnError(ctx, prevErr)
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "rollback failed: %s", rollbackErr)
}
return prevErr
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil {
return err
}
return conflictTxn.CommitOrCleanup(ctx)
err = conflictTxn.Commit(ctx)
require.NoError(t, err)
t.Log(conflictTxn.Rollback(ctx))
return err
}

// Make a db with a short heartbeat interval, so that the aborted txn finds
Expand Down

0 comments on commit 8b00b5d

Please sign in to comment.