Skip to content

Commit

Permalink
Merge #114398
Browse files Browse the repository at this point in the history
114398: sql: introduce limit on number of retries in IntExec.Exec{Ex} methods r=yuzefovich a=yuzefovich

This commit introduces a limit on the number of times the InternalExecutor machinery can retry errors in `Exec{Ex}` methods. That logic was introduced in c09860b in order to reduce the impact of fixes in other commits in #101477. However, there is no limit in the number of retries, and we hit the stack overflow twice in our own testing recently seemingly in this code path. Thus, this commit adds a limit, 5 by default.

Note that I'm not sure exactly what bug, if any, can lead to the stack overflow. One seemingly-unlikely theory is that there is no bug, meaning that we were simply retrying forever because the stmts were pushed by higher priority txns every time. Still, this change seems beneficial on its own and should prevent stack overflows even if we don't fully understand the root cause.

An additional improvement is that we now track the depth of recursion, and once it exceeds 1000, we'll return an error. This should prevent the stack overflows due to other reasons.

There is no release note given we've only seen this twice in our own testing and it involved cluster-to-cluster streaming.

Fixes: #109197.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Nov 16, 2023
2 parents 12467e9 + f36bb29 commit f94233d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 13 deletions.
68 changes: 57 additions & 11 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
Expand Down Expand Up @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
}
}

var ieRowsAffectedRetryLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.internal_executor.rows_affected_retry_limit",
"limit on the number of retries that can be transparently performed "+
"by the InternalExecutor's Exec{Ex} methods",
5,
settings.NonNegativeInt,
)

func (ie *InternalExecutor) runWithEx(
ctx context.Context,
txn *kv.Txn,
Expand Down Expand Up @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx(
w: w,
mode: mode,
sync: syncCallback,
resetRowsAffected: func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
},
}
clientComm.rowsAffectedState.rewind = func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
}
clientComm.rowsAffectedState.numRewindsLimit = ieRowsAffectedRetryLimit.Get(&ie.s.cfg.Settings.SV)

applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */)
sds := sessiondata.NewStack(sd)
Expand Down Expand Up @@ -434,6 +445,10 @@ type ieIteratorResult struct {
type rowsIterator struct {
r ieResultReader

// depth tracks the current depth of recursion in Next(). Once it exceeds
// iteratorDepthLimit, an error is returned to prevent stack overflow.
depth int64

rowsAffected int
resultCols colinfo.ResultColumns

Expand Down Expand Up @@ -469,6 +484,13 @@ type rowsIterator struct {
var _ isql.Rows = &rowsIterator{}
var _ eval.InternalRows = &rowsIterator{}

// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set
// to be sufficiently large to not matter under normal circumstances while
// preventing the possibility of the stack overflow (as we've seen in #109197).
const iteratorDepthLimit = 1000

var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit")

func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
// Due to recursive calls to Next() below, this deferred function might get
// executed multiple times, yet it is not a problem because Close() is
Expand All @@ -486,8 +508,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
r.errCallback = nil
}
retErr = r.lastErr
r.depth--
}()

r.depth++
if r.depth > iteratorDepthLimit {
r.lastErr = iteratorDepthLimitExceededErr
r.done = true
return false, r.lastErr
}

if r.done {
return false, r.lastErr
}
Expand Down Expand Up @@ -1313,11 +1343,20 @@ type internalClientComm struct {
// mode determines how the results of the query execution are consumed.
mode ieExecutionMode

// resetRowsAffected is a callback that sends a single ieIteratorResult
// object to w in order to set the number of rows affected to zero. Only
// used in rowsAffectedIEExecutionMode when discarding a result (indicating
// that a command will be retried).
resetRowsAffected func()
// rowsAffectedState is only used in rowsAffectedIEExecutionMode.
rowsAffectedState struct {
// rewind is a callback that sends a single ieIteratorResult object to w
// in order to set the number of rows affected to zero. Used when
// discarding a result (indicating that a command will be retried).
rewind func()
// numRewinds tracks the number of times rewind() has been called.
numRewinds int64
// numRewindsLimit is the limit on the number of times we will perform
// the transparent retry. Once numRewinds reaches numRewindsLimit, the
// internal executor machinery will no longer retry and, instead, will
// return the error to the client.
numRewindsLimit int64
}

// sync, if set, is called whenever a Sync is executed with all accumulated
// results since the last Sync.
Expand Down Expand Up @@ -1357,7 +1396,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult {
// "finalized").
icc.results = icc.results[:len(icc.results)-1]
if icc.mode == rowsAffectedIEExecutionMode {
icc.resetRowsAffected()
icc.rowsAffectedState.numRewinds++
icc.rowsAffectedState.rewind()
}
},
}
Expand Down Expand Up @@ -1454,12 +1494,18 @@ func (icc *internalClientComm) Close() {}

// ClientPos is part of the ClientLock interface.
func (icc *internalClientComm) ClientPos() CmdPos {
if icc.mode == rowsAffectedIEExecutionMode {
if icc.mode == rowsAffectedIEExecutionMode &&
icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit {
// With the "rows affected" mode, any command can be rewound since we
// assume that only a single command results in actual "rows affected",
// and in Discard we will reset the number to zero (if we were in
// process of evaluation that command when we encountered the retry
// error).
//
// However, to prevent stack overflow due to large (infinite?) number of
// retries we also need to check that we haven't reached the limit yet.
// If we have, then we fall back to the general logic below of
// determining whether we can retry.
return -1
}
// Find the latest result that cannot be rewound.
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {

ctx := context.Background()
params, _ := createTestServerParams()
s, db, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
srv, db, kvDB := serverutils.StartServer(t, params)
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer()

if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -795,6 +796,25 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {
}
})

// This test case verifies that ExecEx stops retrying once the limit on the
// number of retries is reached.
t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) {
// This number must be less than the number of errors injected (which is
// determined by sql.numTxnRetryErrors = 3).
if _, err := db.Exec("SET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit = 1;"); err != nil {
t.Fatal(err)
}
defer func() {
if _, err := db.Exec("RESET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit;"); err != nil {
t.Fatal(err)
}
}()
_, err := ie.ExecEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt)
if err == nil {
t.Fatal("expected to get an injected retriable error")
}
})

// TODO(yuzefovich): add a test for when a schema change is done in-between
// the retries.
}
Expand Down

0 comments on commit f94233d

Please sign in to comment.