From 79219f705fc76eae5753ea5e28b7077dfacfd213 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 21 Feb 2024 16:19:33 -0500 Subject: [PATCH] kv: limit retry count in db.Txn retry loop Fixes #113941. This commit adds a new `kv.transaction.internal.max_auto_retries` cluster setting, which controls the maximum number of auto-retries a call to `kv.DB.Txn` will perform before aborting the transaction and returning an error. This is used to prevent infinite retry loops where a transaction has little chance of succeeding in the future but continues to hold locks from earlier epochs. The setting defaults to a value of 100 retries. There are two primary benefits to this change: 1. internal transactions (those run by jobs, migrations, etc.) which use the `kv.DB.Txn` API will no longer risk holding locks indefinitely across an unbounded number of epochs if they get stuck in a retry loop scenario. Eventually, they will get aborted and the locks will be released. This will unwedge other transactions in the system that are waiting for locks, limiting the blast radius of a stuck transaction and preventing an outage caused by it from persisting indefinitely. 2. internal transactions will no longer get stuck in unobservable infinite retry loops where logging exists above the call to `kv.DB.Txn`, but no logging exists below it. In these cases, we previously struggled to identify the retry loop (who was stuck retrying) or diagnose its cause (why were they stuck retrying). By hitting a retry limit and returning an error up the stack, we will help debuggers answer both of these questions. Release note: None --- pkg/kv/db_test.go | 45 ++++++++++++++++++++++++++++++ pkg/kv/kvserver/single_key_test.go | 8 ++++++ pkg/kv/txn.go | 38 +++++++++++++++++++++++++ 3 files changed, 91 insertions(+) diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index 99859532c961..013181d201ee 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -971,6 +971,51 @@ func testDBTxnRetry(t *testing.T, isoLevel isolation.Level, returnNil bool) { require.NoError(t, err1) } +// TestDB_TxnRetryLimit tests kv.transaction.internal.max_auto_retries. +func TestDB_TxnRetryLimit(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + isolation.RunEachLevel(t, func(t *testing.T, isoLevel isolation.Level) { + testDBTxnRetryLimit(t, isoLevel) + }) +} + +func testDBTxnRetryLimit(t *testing.T, isoLevel isolation.Level) { + ctx := context.Background() + s, db := setup(t) + defer s.Stopper().Stop(ctx) + + // Configure a low retry limit. + const maxRetries = 7 + const maxAttempts = maxRetries + 1 + kv.MaxInternalTxnAutoRetries.Override(ctx, &s.ClusterSettings().SV, maxRetries) + + // Run the txn, aborting it on each attempt. + attempts := 0 + err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + attempts++ + require.NoError(t, txn.SetIsoLevel(isoLevel)) + require.NoError(t, txn.Put(ctx, "a", "1")) + + { + // High priority txn - will abort the other txn each attempt. + hpTxn := kv.NewTxn(ctx, db, 0) + require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority)) + require.NoError(t, hpTxn.Put(ctx, "a", "hp txn")) + require.NoError(t, hpTxn.Commit(ctx)) + } + + // Read, so that we'll get a retryable error. + _, err := txn.Get(ctx, "a") + require.Error(t, err) + require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err) + return err + }) + require.Error(t, err) + require.Regexp(t, "Terminating retry loop and returning error", err) + require.Equal(t, maxAttempts, attempts) +} + func TestPreservingSteppingOnSenderReplacement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/single_key_test.go b/pkg/kv/kvserver/single_key_test.go index 4e6636aa3b02..03edb0c59cb0 100644 --- a/pkg/kv/kvserver/single_key_test.go +++ b/pkg/kv/kvserver/single_key_test.go @@ -12,6 +12,7 @@ package kvserver_test import ( "context" + "math" "sync/atomic" "testing" "time" @@ -43,6 +44,13 @@ func TestSingleKey(t *testing.T) { defer tc.Stopper().Stop(context.Background()) ctx := context.Background() + // Increase the kv.transaction.internal.max_auto_retries setting to + // avoid transaction retry limit exceeded errors under heavy stress. + for i := 0; i < num; i++ { + sv := tc.Servers[i].DB().SettingsValues() + kv.MaxInternalTxnAutoRetries.Override(ctx, sv, math.MaxInt64) + } + // Initialize the value for our test key to zero. const key = "test-key" initDB := tc.Servers[0].DB() diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index fb89fdf3da95..009d35d9e50f 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -13,12 +13,14 @@ package kv import ( "context" "fmt" + "math" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -46,6 +48,21 @@ import ( // under load), then it makes sense to abandon the cleanup before too long. const asyncRollbackTimeout = time.Minute +// MaxInternalTxnAutoRetries controls the maximum number of auto-retries a call +// to kv.DB.Txn will perform before aborting the transaction and returning an +// error. This is used to prevent infinite retry loops where a transaction has +// little chance of succeeding in the future but continues to hold locks from +// earlier epochs. +var MaxInternalTxnAutoRetries = settings.RegisterIntSetting( + settings.ApplicationLevel, + "kv.transaction.internal.max_auto_retries", + "controls the maximum number of auto-retries an internal KV transaction will "+ + "perform before aborting and returning an error. Can be set to 0 to disable any "+ + "retry attempt.", + 100, + settings.NonNegativeInt, +) + // Txn is an in-progress distributed database transaction. A Txn is safe for // concurrent use by multiple goroutines. type Txn struct { @@ -1086,6 +1103,27 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) break } + // Determine whether the transaction has exceeded the maximum number of + // automatic retries. We check this after each failed attempt to allow the + // cluster setting to be changed while a transaction is stuck in a retry + // loop. + maxRetries := math.MaxInt64 + if txn.db.ctx.Settings != nil { + // txn.db.ctx.Settings == nil is only expected in tests. + maxRetries = int(MaxInternalTxnAutoRetries.Get(&txn.db.ctx.Settings.SV)) + } + if attempt > maxRetries { + // If the retries limit has been exceeded, rollback and return an error. + rollbackErr := txn.Rollback(ctx) + // NOTE: we don't errors.Wrap the most recent retry error because we want + // to terminate it here. Instead, we just include it in the error text. + err = errors.Errorf("have retried transaction: %s %d times, most recently because of the "+ + "retryable error: %s. Terminating retry loop and returning error due to cluster setting %s (%d). "+ + "Rollback error: %v.", txn.DebugName(), attempt, err, MaxInternalTxnAutoRetries.Name(), maxRetries, rollbackErr) + log.Warningf(ctx, "%v", err) + break + } + const warnEvery = 10 if attempt%warnEvery == 0 { log.Warningf(ctx, "have retried transaction: %s %d times, most recently because of the "+