diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 2a40ebf198db..92c716ae249d 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3483,6 +3483,10 @@ func (m *sessionDataMutator) SetInjectRetryErrorsEnabled(val bool) { m.data.InjectRetryErrorsEnabled = val } +func (m *sessionDataMutator) SetInternalExecutorRowsAffectedRetryLimit(val int64) { + m.data.InternalExecutorRowsAffectedRetryLimit = val +} + func (m *sessionDataMutator) SetMaxRetriesForReadCommitted(val int32) { m.data.MaxRetriesForReadCommitted = val } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 8306bc6c5e9e..f64f1b1a9417 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata" @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) } } +var ieRowsAffectedRetryLimit = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.internal_executor.rows_affected_retry_limit", + "limit on the number of retries that can be transparently performed "+ + "by the InternalExecutor's Exec{Ex} methods", + 5, + settings.NonNegativeInt, +) + func (ie *InternalExecutor) runWithEx( ctx context.Context, txn *kv.Txn, @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx( w: w, mode: mode, sync: syncCallback, - resetRowsAffected: func() { - var zero int - _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) - }, } + clientComm.rowsAffectedState.rewind = func() { + var zero int + _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) + } + clientComm.rowsAffectedState.numRewindsLimit = 
sd.InternalExecutorRowsAffectedRetryLimit applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */) sds := sessiondata.NewStack(sd) @@ -434,6 +445,10 @@ type ieIteratorResult struct { type rowsIterator struct { r ieResultReader + // depth tracks the current depth of recursion in Next(). Once it exceeds + // iteratorDepthLimit, an error is returned to prevent stack overflow. + depth int64 + rowsAffected int resultCols colinfo.ResultColumns @@ -469,6 +484,13 @@ type rowsIterator struct { var _ isql.Rows = &rowsIterator{} var _ eval.InternalRows = &rowsIterator{} +// iteratorDepthLimit is the maximum allowed depth of recursion in Next(). It is set +// to be sufficiently large to not matter under normal circumstances while +// preventing the possibility of a stack overflow (as we've seen in #109197). +const iteratorDepthLimit = 1000 + +var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit") + func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { // Due to recursive calls to Next() below, this deferred function might get // executed multiple times, yet it is not a problem because Close() is @@ -486,8 +508,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { r.errCallback = nil } retErr = r.lastErr + r.depth-- }() + r.depth++ + if r.depth > iteratorDepthLimit { + r.lastErr = iteratorDepthLimitExceededErr + r.done = true + return false, r.lastErr + } + if r.done { return false, r.lastErr } @@ -832,6 +862,9 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess } // We always override the injection knob based on the override struct. 
sd.InjectRetryErrorsEnabled = o.InjectRetryErrorsEnabled + if o.RowsAffectedRetryLimit > 0 { + sd.InternalExecutorRowsAffectedRetryLimit = o.RowsAffectedRetryLimit + } } func (ie *InternalExecutor) maybeRootSessionDataOverride( @@ -1313,11 +1346,20 @@ type internalClientComm struct { // mode determines how the results of the query execution are consumed. mode ieExecutionMode - // resetRowsAffected is a callback that sends a single ieIteratorResult - // object to w in order to set the number of rows affected to zero. Only - // used in rowsAffectedIEExecutionMode when discarding a result (indicating - // that a command will be retried). - resetRowsAffected func() + // rowsAffectedState is only used in rowsAffectedIEExecutionMode. + rowsAffectedState struct { + // rewind is a callback that sends a single ieIteratorResult object to w + // in order to set the number of rows affected to zero. Used when + // discarding a result (indicating that a command will be retried). + rewind func() + // numRewinds tracks the number of times rewind() has been called. + numRewinds int64 + // numRewindsLimit is the limit on the number of times we will perform + // the transparent retry. Once numRewinds reaches numRewindsLimit, the + // internal executor machinery will no longer retry and, instead, will + // return the error to the client. + numRewindsLimit int64 + } // sync, if set, is called whenever a Sync is executed with all accumulated // results since the last Sync. @@ -1357,7 +1399,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult { // "finalized"). icc.results = icc.results[:len(icc.results)-1] if icc.mode == rowsAffectedIEExecutionMode { - icc.resetRowsAffected() + icc.rowsAffectedState.numRewinds++ + icc.rowsAffectedState.rewind() } }, } @@ -1454,12 +1497,18 @@ func (icc *internalClientComm) Close() {} // ClientPos is part of the ClientLock interface. 
func (icc *internalClientComm) ClientPos() CmdPos { - if icc.mode == rowsAffectedIEExecutionMode { + if icc.mode == rowsAffectedIEExecutionMode && + icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit { // With the "rows affected" mode, any command can be rewound since we // assume that only a single command results in actual "rows affected", // and in Discard we will reset the number to zero (if we were in // process of evaluation that command when we encountered the retry // error). + // + // However, to prevent stack overflow due to a large (possibly unbounded) number of + // retries we also need to check that we haven't reached the limit yet. + // If we have, then we fall back to the general logic below of + // determining whether we can retry. return -1 } // Find the latest result that cannot be rewound. diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index 3b4415f4c23c..4feefb2ee3c5 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -728,8 +728,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { ctx := context.Background() params, _ := createTestServerParams() - s, db, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) + srv, db, kvDB := serverutils.StartServer(t, params) + defer srv.Stopper().Stop(ctx) + s := srv.ApplicationLayer() if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil { t.Fatal(err) @@ -795,6 +796,19 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { } }) + // This test case verifies that ExecEx stops retrying once the limit on the + // number of retries is reached. + t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) { + override := ieo + // This number must be less than the number of errors injected (which is + // determined by sql.numTxnRetryErrors = 3). 
+ override.RowsAffectedRetryLimit = 1 + _, err := ie.ExecEx(ctx, "read rows", nil /* txn */, override, rowsStmt) + if err == nil { + t.Fatal("expected to get an injected retriable error") + } + }) + // TODO(yuzefovich): add a test for when a schema change is done in-between // the retries. } diff --git a/pkg/sql/sessiondata/internal.go b/pkg/sql/sessiondata/internal.go index 0c2590fe2894..3c98ff0e03cf 100644 --- a/pkg/sql/sessiondata/internal.go +++ b/pkg/sql/sessiondata/internal.go @@ -47,6 +47,12 @@ type InternalExecutorOverride struct { // does **not** propagate further to "nested" executors that are spawned up // by the "top" executor. InjectRetryErrorsEnabled bool + // RowsAffectedRetryLimit determines the maximum number of retries that the + // InternalExecutor can perform transparently when executing Exec{Ex} + // function calls (which it translates into "rows affected" mode). If + // positive, it overrides the InternalExecutorRowsAffectedRetryLimit from + // the session data. + RowsAffectedRetryLimit int64 } // NoSessionDataOverride is the empty InternalExecutorOverride which does not diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index b875a2f7d27a..45e1a3425310 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -467,6 +467,10 @@ message LocalOnlySessionData { // being emitted for changes to data made in a session. // TODO(yang): Plumb this session variable down to KV. bool disable_changefeed_replication = 116; + // InternalExecutorRowsAffectedRetryLimit determines the maximum number of + // retries that the InternalExecutor can perform transparently when executing + // Exec{Ex} function calls (which it translates into "rows affected" mode). 
+ int64 internal_executor_rows_affected_retry_limit = 117; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 5babfba71432..38695444e194 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -2047,6 +2047,30 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `internal_executor_rows_affected_retry_limit`: { + Hidden: true, + GetStringVal: makeIntGetStringValFn(`internal_executor_rows_affected_retry_limit`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if b < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set internal_executor_rows_affected_retry_limit to a negative value: %d", b) + } + m.SetInternalExecutorRowsAffectedRetryLimit(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatInt(evalCtx.SessionData().InternalExecutorRowsAffectedRetryLimit, 10), nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatInt(ieRowsAffectedRetryLimit.Get(sv), 10) + }, + }, + // CockroachDB extension. Allows for testing of transaction retry logic // using the cockroach_restart savepoint. `inject_retry_errors_on_commit_enabled`: {