Skip to content

Commit

Permalink
sql: introduce limit on number of retries in IntExec.Exec{Ex} methods
Browse files Browse the repository at this point in the history
This commit introduces a limit on the number of times the
InternalExecutor machinery can retry errors in `Exec{Ex}` methods. That
logic was introduced in c09860b in
order to reduce the impact of fixes in other commits in #101477.
However, there is no limit in the number of retries, and we hit the
stack overflow twice in our own testing recently seemingly in this code
path. Thus, this commit adds a limit, 5 by default.

It is stored in the session data, but it actually has no meaning in the
user's session, so this commit also adds a corresponding cluster setting
to be able to change the default used in the internal executor (just in
case we need it).

Note that I'm not sure exactly what bug, if any, can lead to the stack
overflow. One seemingly-unlikely theory is that there is no bug, meaning
that we were simply retrying forever because the stmts were pushed by
higher priority txns every time. Still, this change seems beneficial on
its own and should prevent stack overflows even if we don't fully
understand the root cause.

An additional improvement is that we now track the depth of recursion,
and once it exceeds 1000, we'll return an error. This should prevent the
stack overflows due to other reasons.

There is no release note given we've only seen this twice in our own
testing and it involved cluster-to-cluster streaming.

Release note: None
  • Loading branch information
yuzefovich committed Nov 15, 2023
1 parent fea4eb0 commit dfef014
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 13 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3483,6 +3483,10 @@ func (m *sessionDataMutator) SetInjectRetryErrorsEnabled(val bool) {
m.data.InjectRetryErrorsEnabled = val
}

func (m *sessionDataMutator) SetInternalExecutorRowsAffectedRetryLimit(val int64) {
m.data.InternalExecutorRowsAffectedRetryLimit = val
}

func (m *sessionDataMutator) SetMaxRetriesForReadCommitted(val int32) {
m.data.MaxRetriesForReadCommitted = val
}
Expand Down
71 changes: 60 additions & 11 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
Expand Down Expand Up @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
}
}

var ieRowsAffectedRetryLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.internal_executor.rows_affected_retry_limit",
"limit on the number of retries that can be transparently performed "+
"by the InternalExecutor's Exec{Ex} methods",
5,
settings.NonNegativeInt,
)

func (ie *InternalExecutor) runWithEx(
ctx context.Context,
txn *kv.Txn,
Expand Down Expand Up @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx(
w: w,
mode: mode,
sync: syncCallback,
resetRowsAffected: func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
},
}
clientComm.rowsAffectedState.rewind = func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
}
clientComm.rowsAffectedState.numRewindsLimit = sd.InternalExecutorRowsAffectedRetryLimit

applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */)
sds := sessiondata.NewStack(sd)
Expand Down Expand Up @@ -434,6 +445,10 @@ type ieIteratorResult struct {
type rowsIterator struct {
r ieResultReader

// depth tracks the current depth of recursion in Next(). Once it exceeds
// iteratorDepthLimit, an error is returned to prevent stack overflow.
depth int64

rowsAffected int
resultCols colinfo.ResultColumns

Expand Down Expand Up @@ -469,6 +484,13 @@ type rowsIterator struct {
var _ isql.Rows = &rowsIterator{}
var _ eval.InternalRows = &rowsIterator{}

// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set
// to be sufficiently large to not matter under normal circumstances while
// preventing the possibility of the stack overflow (as we've seen in #109197).
const iteratorDepthLimit = 1000

var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit")

func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
// Due to recursive calls to Next() below, this deferred function might get
// executed multiple times, yet it is not a problem because Close() is
Expand All @@ -486,8 +508,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
r.errCallback = nil
}
retErr = r.lastErr
r.depth--
}()

r.depth++
if r.depth > iteratorDepthLimit {
r.lastErr = iteratorDepthLimitExceededErr
r.done = true
return false, r.lastErr
}

if r.done {
return false, r.lastErr
}
Expand Down Expand Up @@ -832,6 +862,9 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess
}
// We always override the injection knob based on the override struct.
sd.InjectRetryErrorsEnabled = o.InjectRetryErrorsEnabled
if o.RowsAffectedRetryLimit > 0 {
sd.InternalExecutorRowsAffectedRetryLimit = o.RowsAffectedRetryLimit
}
}

func (ie *InternalExecutor) maybeRootSessionDataOverride(
Expand Down Expand Up @@ -1313,11 +1346,20 @@ type internalClientComm struct {
// mode determines how the results of the query execution are consumed.
mode ieExecutionMode

// resetRowsAffected is a callback that sends a single ieIteratorResult
// object to w in order to set the number of rows affected to zero. Only
// used in rowsAffectedIEExecutionMode when discarding a result (indicating
// that a command will be retried).
resetRowsAffected func()
// rowsAffectedState is only used in rowsAffectedIEExecutionMode.
rowsAffectedState struct {
// rewind is a callback that sends a single ieIteratorResult object to w
// in order to set the number of rows affected to zero. Used when
// discarding a result (indicating that a command will be retried).
rewind func()
// numRewinds tracks the number of times rewind() has been called.
numRewinds int64
// numRewindsLimit is the limit on the number of times we will perform
// the transparent retry. Once numRewinds reaches numRewindsLimit, the
// internal executor machinery will no longer retry and, instead, will
// return the error to the client.
numRewindsLimit int64
}

// sync, if set, is called whenever a Sync is executed with all accumulated
// results since the last Sync.
Expand Down Expand Up @@ -1357,7 +1399,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult {
// "finalized").
icc.results = icc.results[:len(icc.results)-1]
if icc.mode == rowsAffectedIEExecutionMode {
icc.resetRowsAffected()
icc.rowsAffectedState.numRewinds++
icc.rowsAffectedState.rewind()
}
},
}
Expand Down Expand Up @@ -1454,12 +1497,18 @@ func (icc *internalClientComm) Close() {}

// ClientPos is part of the ClientLock interface.
func (icc *internalClientComm) ClientPos() CmdPos {
if icc.mode == rowsAffectedIEExecutionMode {
if icc.mode == rowsAffectedIEExecutionMode &&
icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit {
// With the "rows affected" mode, any command can be rewound since we
// assume that only a single command results in actual "rows affected",
// and in Discard we will reset the number to zero (if we were in
// process of evaluation that command when we encountered the retry
// error).
//
// However, to prevent stack overflow due to large (infinite?) number of
// retries we also need to check that we haven't reached the limit yet.
// If we have, then we fall back to the general logic below of
// determining whether we can retry.
return -1
}
// Find the latest result that cannot be rewound.
Expand Down
18 changes: 16 additions & 2 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {

ctx := context.Background()
params, _ := createTestServerParams()
s, db, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
srv, db, kvDB := serverutils.StartServer(t, params)
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer()

if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -795,6 +796,19 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {
}
})

// This test case verifies that ExecEx stops retrying once the limit on the
// number of retries is reached.
t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) {
override := ieo
// This number must be less than the number of errors injected (which is
// determined by sql.numTxnRetryErrors = 3).
override.RowsAffectedRetryLimit = 1
_, err := ie.ExecEx(ctx, "read rows", nil /* txn */, override, rowsStmt)
if err == nil {
t.Fatal("expected to get an injected retriable error")
}
})

// TODO(yuzefovich): add a test for when a schema change is done in-between
// the retries.
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/sessiondata/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type InternalExecutorOverride struct {
// does **not** propagate further to "nested" executors that are spawned up
// by the "top" executor.
InjectRetryErrorsEnabled bool
// RowsAffectedRetryLimit determines the maximum number of retries that the
// InternalExecutor can perform transparently when executing Exec{Ex}
// function calls (which it translates into "rows affected" mode). If
// non-zero, it overrides the InternalExecutorRowsAffectedRetryLimit from
// the session data.
RowsAffectedRetryLimit int64
}

// NoSessionDataOverride is the empty InternalExecutorOverride which does not
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,10 @@ message LocalOnlySessionData {
// being emitted for changes to data made in a session.
// TODO(yang): Plumb this session variable down to KV.
bool disable_changefeed_replication = 116;
// InternalExecutorRowsAffectedRetryLimit determines the maximum number of
// retries that the InternalExecutor can perform transparently when executing
// Exec{Ex} function calls (which it translates into "rows affected" mode).
int64 internal_executor_rows_affected_retry_limit = 117;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,30 @@ var varGen = map[string]sessionVar{
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`internal_executor_rows_affected_retry_limit`: {
Hidden: true,
GetStringVal: makeIntGetStringValFn(`internal_executor_rows_affected_retry_limit`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
if b < 0 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"cannot set internal_executor_rows_affected_retry_limit to a negative value: %d", b)
}
m.SetInternalExecutorRowsAffectedRetryLimit(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return strconv.FormatInt(evalCtx.SessionData().InternalExecutorRowsAffectedRetryLimit, 10), nil
},
GlobalDefault: func(sv *settings.Values) string {
return strconv.FormatInt(ieRowsAffectedRetryLimit.Get(sv), 10)
},
},

// CockroachDB extension. Allows for testing of transaction retry logic
// using the cockroach_restart savepoint.
`inject_retry_errors_on_commit_enabled`: {
Expand Down

0 comments on commit dfef014

Please sign in to comment.