From dfef0141aed9800c8a021b447ee4b7a9bee674eb Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 13 Nov 2023 21:03:59 -0800 Subject: [PATCH] sql: introduce limit on number of retries in IntExec.Exec{Ex} methods This commit introduces a limit on the number of times the InternalExecutor machinery can retry errors in `Exec{Ex}` methods. That logic was introduced in c09860b656e31bbdf996350382a8000f3e37d370 in order to reduce the impact of fixes in other commits in #101477. However, there was no limit on the number of retries, and we recently hit a stack overflow twice in our own testing, seemingly in this code path. Thus, this commit adds a limit, 5 by default. It is stored in the session data, but it actually has no meaning in the user's session, so this commit also adds a corresponding cluster setting to be able to change the default used in the internal executor (just in case we need it). Note that I'm not sure exactly what bug, if any, can lead to the stack overflow. One seemingly-unlikely theory is that there is no bug, meaning that we were simply retrying forever because the stmts were pushed by higher priority txns every time. Still, this change seems beneficial on its own and should prevent stack overflows even if we don't fully understand the root cause. An additional improvement is that we now track the depth of recursion, and once it exceeds 1000, we'll return an error. This should prevent stack overflows due to other reasons. There is no release note given that we've only seen this twice in our own testing and it involved cluster-to-cluster streaming. 
Release note: None --- pkg/sql/exec_util.go | 4 ++ pkg/sql/internal.go | 71 ++++++++++++++++--- pkg/sql/internal_test.go | 18 ++++- pkg/sql/sessiondata/internal.go | 6 ++ .../local_only_session_data.proto | 4 ++ pkg/sql/vars.go | 24 +++++++ 6 files changed, 114 insertions(+), 13 deletions(-) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 2a40ebf198db..92c716ae249d 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3483,6 +3483,10 @@ func (m *sessionDataMutator) SetInjectRetryErrorsEnabled(val bool) { m.data.InjectRetryErrorsEnabled = val } +func (m *sessionDataMutator) SetInternalExecutorRowsAffectedRetryLimit(val int64) { + m.data.InternalExecutorRowsAffectedRetryLimit = val +} + func (m *sessionDataMutator) SetMaxRetriesForReadCommitted(val int32) { m.data.MaxRetriesForReadCommitted = val } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 8306bc6c5e9e..f64f1b1a9417 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata" @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) } } +var ieRowsAffectedRetryLimit = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.internal_executor.rows_affected_retry_limit", + "limit on the number of retries that can be transparently performed "+ + "by the InternalExecutor's Exec{Ex} methods", + 5, + settings.NonNegativeInt, +) + func (ie *InternalExecutor) runWithEx( ctx context.Context, txn *kv.Txn, @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx( w: w, mode: mode, sync: syncCallback, - 
resetRowsAffected: func() { - var zero int - _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) - }, } + clientComm.rowsAffectedState.rewind = func() { + var zero int + _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) + } + clientComm.rowsAffectedState.numRewindsLimit = sd.InternalExecutorRowsAffectedRetryLimit applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */) sds := sessiondata.NewStack(sd) @@ -434,6 +445,10 @@ type ieIteratorResult struct { type rowsIterator struct { r ieResultReader + // depth tracks the current depth of recursion in Next(). Once it exceeds + // iteratorDepthLimit, an error is returned to prevent stack overflow. + depth int64 + rowsAffected int resultCols colinfo.ResultColumns @@ -469,6 +484,13 @@ type rowsIterator struct { var _ isql.Rows = &rowsIterator{} var _ eval.InternalRows = &rowsIterator{} +// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set +// to be sufficiently large to not matter under normal circumstances while +// preventing the possibility of the stack overflow (as we've seen in #109197). +const iteratorDepthLimit = 1000 + +var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit") + func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { // Due to recursive calls to Next() below, this deferred function might get // executed multiple times, yet it is not a problem because Close() is @@ -486,8 +508,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { r.errCallback = nil } retErr = r.lastErr + r.depth-- }() + r.depth++ + if r.depth > iteratorDepthLimit { + r.lastErr = iteratorDepthLimitExceededErr + r.done = true + return false, r.lastErr + } + if r.done { return false, r.lastErr } @@ -832,6 +862,9 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess } // We always override the injection knob based on the override struct. 
sd.InjectRetryErrorsEnabled = o.InjectRetryErrorsEnabled + if o.RowsAffectedRetryLimit > 0 { + sd.InternalExecutorRowsAffectedRetryLimit = o.RowsAffectedRetryLimit + } } func (ie *InternalExecutor) maybeRootSessionDataOverride( @@ -1313,11 +1346,20 @@ type internalClientComm struct { // mode determines how the results of the query execution are consumed. mode ieExecutionMode - // resetRowsAffected is a callback that sends a single ieIteratorResult - // object to w in order to set the number of rows affected to zero. Only - // used in rowsAffectedIEExecutionMode when discarding a result (indicating - // that a command will be retried). - resetRowsAffected func() + // rowsAffectedState is only used in rowsAffectedIEExecutionMode. + rowsAffectedState struct { + // rewind is a callback that sends a single ieIteratorResult object to w + // in order to set the number of rows affected to zero. Used when + // discarding a result (indicating that a command will be retried). + rewind func() + // numRewinds tracks the number of times rewind() has been called. + numRewinds int64 + // numRewindsLimit is the limit on the number of times we will perform + // the transparent retry. Once numRewinds reaches numRewindsLimit, the + // internal executor machinery will no longer retry and, instead, will + // return the error to the client. + numRewindsLimit int64 + } // sync, if set, is called whenever a Sync is executed with all accumulated // results since the last Sync. @@ -1357,7 +1399,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult { // "finalized"). icc.results = icc.results[:len(icc.results)-1] if icc.mode == rowsAffectedIEExecutionMode { - icc.resetRowsAffected() + icc.rowsAffectedState.numRewinds++ + icc.rowsAffectedState.rewind() } }, } @@ -1454,12 +1497,18 @@ func (icc *internalClientComm) Close() {} // ClientPos is part of the ClientLock interface. 
func (icc *internalClientComm) ClientPos() CmdPos { - if icc.mode == rowsAffectedIEExecutionMode { + if icc.mode == rowsAffectedIEExecutionMode && + icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit { // With the "rows affected" mode, any command can be rewound since we // assume that only a single command results in actual "rows affected", // and in Discard we will reset the number to zero (if we were in // process of evaluation that command when we encountered the retry // error). + // + // However, to prevent stack overflow due to large (infinite?) number of + // retries we also need to check that we haven't reached the limit yet. + // If we have, then we fall back to the general logic below of + // determining whether we can retry. return -1 } // Find the latest result that cannot be rewound. diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index 3b4415f4c23c..4feefb2ee3c5 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -728,8 +728,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { ctx := context.Background() params, _ := createTestServerParams() - s, db, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) + srv, db, kvDB := serverutils.StartServer(t, params) + defer srv.Stopper().Stop(ctx) + s := srv.ApplicationLayer() if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil { t.Fatal(err) @@ -795,6 +796,19 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { } }) + // This test case verifies that ExecEx stops retrying once the limit on the + // number of retries is reached. + t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) { + override := ieo + // This number must be less than the number of errors injected (which is + // determined by sql.numTxnRetryErrors = 3). 
+ override.RowsAffectedRetryLimit = 1 + _, err := ie.ExecEx(ctx, "read rows", nil /* txn */, override, rowsStmt) + if err == nil { + t.Fatal("expected to get an injected retriable error") + } + }) + // TODO(yuzefovich): add a test for when a schema change is done in-between // the retries. } diff --git a/pkg/sql/sessiondata/internal.go b/pkg/sql/sessiondata/internal.go index 0c2590fe2894..3c98ff0e03cf 100644 --- a/pkg/sql/sessiondata/internal.go +++ b/pkg/sql/sessiondata/internal.go @@ -47,6 +47,12 @@ type InternalExecutorOverride struct { // does **not** propagate further to "nested" executors that are spawned up // by the "top" executor. InjectRetryErrorsEnabled bool + // RowsAffectedRetryLimit determines the maximum number of retries that the + // InternalExecutor can perform transparently when executing Exec{Ex} + // function calls (which it translates into "rows affected" mode). If + // non-zero, it overrides the InternalExecutorRowsAffectedRetryLimit from + // the session data. + RowsAffectedRetryLimit int64 } // NoSessionDataOverride is the empty InternalExecutorOverride which does not diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index b875a2f7d27a..45e1a3425310 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -467,6 +467,10 @@ message LocalOnlySessionData { // being emitted for changes to data made in a session. // TODO(yang): Plumb this session variable down to KV. bool disable_changefeed_replication = 116; + // InternalExecutorRowsAffectedRetryLimit determines the maximum number of + // retries that the InternalExecutor can perform transparently when executing + // Exec{Ex} function calls (which it translates into "rows affected" mode). 
+ int64 internal_executor_rows_affected_retry_limit = 117; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 5babfba71432..38695444e194 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -2047,6 +2047,30 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `internal_executor_rows_affected_retry_limit`: { + Hidden: true, + GetStringVal: makeIntGetStringValFn(`internal_executor_rows_affected_retry_limit`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if b < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set internal_executor_rows_affected_retry_limit to a negative value: %d", b) + } + m.SetInternalExecutorRowsAffectedRetryLimit(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatInt(evalCtx.SessionData().InternalExecutorRowsAffectedRetryLimit, 10), nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatInt(ieRowsAffectedRetryLimit.Get(sv), 10) + }, + }, + // CockroachDB extension. Allows for testing of transaction retry logic // using the cockroach_restart savepoint. `inject_retry_errors_on_commit_enabled`: {