Skip to content

Commit

Permalink
sql: introduce limit on number of retries in IntExec.Exec{Ex} methods
Browse files Browse the repository at this point in the history
This commit introduces a limit on the number of times the
InternalExecutor machinery can retry errors in `Exec{Ex}` methods. That
logic was introduced in c09860b in
order to reduce the impact of fixes in other commits in cockroachdb#101477.
However, there is no limit in the number of retries, and we hit the
stack overflow twice in our own testing recently seemingly in this code
path. Thus, this commit adds a limit, 5 by default.

Note that I'm not sure exactly what bug, if any, can lead to the stack
overflow. One seemingly-unlikely theory is that there is no bug, meaning
that we were simply retrying forever because the stmts were pushed by
higher priority txns every time. Still, this change seems beneficial on
its own and should prevent stack overflows even if we don't fully
understand the root cause.

An additional improvement is that we now track the depth of recursion,
and once it exceeds 1000, we'll return an error. This should prevent the
stack overflows due to other reasons.

There is no release note given we've only seen this twice in our own
testing and it involved cluster-to-cluster streaming.

Release note: None
  • Loading branch information
yuzefovich committed Nov 15, 2023
1 parent 32cc819 commit f36bb29
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 13 deletions.
68 changes: 57 additions & 11 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
Expand Down Expand Up @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
}
}

var ieRowsAffectedRetryLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.internal_executor.rows_affected_retry_limit",
"limit on the number of retries that can be transparently performed "+
"by the InternalExecutor's Exec{Ex} methods",
5,
settings.NonNegativeInt,
)

func (ie *InternalExecutor) runWithEx(
ctx context.Context,
txn *kv.Txn,
Expand Down Expand Up @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx(
w: w,
mode: mode,
sync: syncCallback,
resetRowsAffected: func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
},
}
clientComm.rowsAffectedState.rewind = func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
}
clientComm.rowsAffectedState.numRewindsLimit = ieRowsAffectedRetryLimit.Get(&ie.s.cfg.Settings.SV)

applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */)
sds := sessiondata.NewStack(sd)
Expand Down Expand Up @@ -434,6 +445,10 @@ type ieIteratorResult struct {
type rowsIterator struct {
r ieResultReader

// depth tracks the current depth of recursion in Next(). Once it exceeds
// iteratorDepthLimit, an error is returned to prevent stack overflow.
depth int64

rowsAffected int
resultCols colinfo.ResultColumns

Expand Down Expand Up @@ -469,6 +484,13 @@ type rowsIterator struct {
var _ isql.Rows = &rowsIterator{}
var _ eval.InternalRows = &rowsIterator{}

// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set
// to be sufficiently large to not matter under normal circumstances while
// preventing the possibility of the stack overflow (as we've seen in #109197).
const iteratorDepthLimit = 1000

var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit")

func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
// Due to recursive calls to Next() below, this deferred function might get
// executed multiple times, yet it is not a problem because Close() is
Expand All @@ -486,8 +508,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
r.errCallback = nil
}
retErr = r.lastErr
r.depth--
}()

r.depth++
if r.depth > iteratorDepthLimit {
r.lastErr = iteratorDepthLimitExceededErr
r.done = true
return false, r.lastErr
}

if r.done {
return false, r.lastErr
}
Expand Down Expand Up @@ -1313,11 +1343,20 @@ type internalClientComm struct {
// mode determines how the results of the query execution are consumed.
mode ieExecutionMode

// resetRowsAffected is a callback that sends a single ieIteratorResult
// object to w in order to set the number of rows affected to zero. Only
// used in rowsAffectedIEExecutionMode when discarding a result (indicating
// that a command will be retried).
resetRowsAffected func()
// rowsAffectedState is only used in rowsAffectedIEExecutionMode.
rowsAffectedState struct {
// rewind is a callback that sends a single ieIteratorResult object to w
// in order to set the number of rows affected to zero. Used when
// discarding a result (indicating that a command will be retried).
rewind func()
// numRewinds tracks the number of times rewind() has been called.
numRewinds int64
// numRewindsLimit is the limit on the number of times we will perform
// the transparent retry. Once numRewinds reaches numRewindsLimit, the
// internal executor machinery will no longer retry and, instead, will
// return the error to the client.
numRewindsLimit int64
}

// sync, if set, is called whenever a Sync is executed with all accumulated
// results since the last Sync.
Expand Down Expand Up @@ -1357,7 +1396,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult {
// "finalized").
icc.results = icc.results[:len(icc.results)-1]
if icc.mode == rowsAffectedIEExecutionMode {
icc.resetRowsAffected()
icc.rowsAffectedState.numRewinds++
icc.rowsAffectedState.rewind()
}
},
}
Expand Down Expand Up @@ -1454,12 +1494,18 @@ func (icc *internalClientComm) Close() {}

// ClientPos is part of the ClientLock interface.
func (icc *internalClientComm) ClientPos() CmdPos {
if icc.mode == rowsAffectedIEExecutionMode {
if icc.mode == rowsAffectedIEExecutionMode &&
icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit {
// With the "rows affected" mode, any command can be rewound since we
// assume that only a single command results in actual "rows affected",
// and in Discard we will reset the number to zero (if we were in
// process of evaluation that command when we encountered the retry
// error).
//
// However, to prevent stack overflow due to large (infinite?) number of
// retries we also need to check that we haven't reached the limit yet.
// If we have, then we fall back to the general logic below of
// determining whether we can retry.
return -1
}
// Find the latest result that cannot be rewound.
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {

ctx := context.Background()
params, _ := createTestServerParams()
s, db, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
srv, db, kvDB := serverutils.StartServer(t, params)
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer()

if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -795,6 +796,25 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {
}
})

// This test case verifies that ExecEx stops retrying once the limit on the
// number of retries is reached.
t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) {
// This number must be less than the number of errors injected (which is
// determined by sql.numTxnRetryErrors = 3).
if _, err := db.Exec("SET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit = 1;"); err != nil {
t.Fatal(err)
}
defer func() {
if _, err := db.Exec("RESET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit;"); err != nil {
t.Fatal(err)
}
}()
_, err := ie.ExecEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt)
if err == nil {
t.Fatal("expected to get an injected retriable error")
}
})

// TODO(yuzefovich): add a test for when a schema change is done in-between
// the retries.
}
Expand Down

0 comments on commit f36bb29

Please sign in to comment.