From 22dd3036457a7595b69329f3ab999f853c29930f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 13 Apr 2023 15:28:28 +0000 Subject: [PATCH 1/5] sql: minor cleanup around streamingCommandResult This commit cleans up `streamingCommandResult` a bit by storing `CmdPos` directly in it (which allows us to remove redundant `resWithPos` wrapper) and removing a redundant argument in `closeCallback`. This is effectively a purely mechanical change. Also fix up some comments. Release note: None --- pkg/sql/conn_executor_internal_test.go | 17 +++++++---- pkg/sql/conn_io.go | 25 +++++++++------- pkg/sql/internal.go | 40 +++++++++++--------------- pkg/sql/pgwire/command_result.go | 2 +- 4 files changed, 43 insertions(+), 41 deletions(-) diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 774da85939ce..57e4d9fbdaa7 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -261,7 +261,14 @@ func mustParseOne(s string) parser.Statement { // need to read from it. func startConnExecutor( ctx context.Context, -) (*StmtBuf, <-chan []resWithPos, <-chan error, *stop.Stopper, ieResultReader, error) { +) ( + *StmtBuf, + <-chan []*streamingCommandResult, + <-chan error, + *stop.Stopper, + ieResultReader, + error, +) { // A lot of boilerplate for creating a connExecutor. stopper := stop.NewStopper() clock := hlc.NewClockForTesting(nil) @@ -338,10 +345,10 @@ func startConnExecutor( s := NewServer(cfg, pool) buf := NewStmtBuf() - syncResults := make(chan []resWithPos, 1) + syncResults := make(chan []*streamingCommandResult, 1) resultChannel := newAsyncIEResultChannel() var cc ClientComm = &internalClientComm{ - sync: func(res []resWithPos) { + sync: func(res []*streamingCommandResult) { syncResults <- res }, w: resultChannel, @@ -378,9 +385,9 @@ func TestSessionCloseWithPendingTempTableInTxn(t *testing.T) { srv := s.SQLServer().(*Server) stmtBuf := NewStmtBuf() - flushed := make(chan []resWithPos) + flushed := make(chan []*streamingCommandResult) clientComm := &internalClientComm{ - sync: func(res []resWithPos) { + sync: func(res []*streamingCommandResult) { flushed <- res }, } diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index ab76542887e5..6ae89a3e5c77 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -634,7 +634,7 @@ const ( // // ClientComm is implemented by the pgwire connection. type ClientComm interface { - // createStatementResult creates a StatementResult for stmt. + // CreateStatementResult creates a StatementResult for stmt. // // descOpt specifies if result needs to inform the client about row schema. If // it doesn't, a SetColumns call becomes a no-op. @@ -683,7 +683,7 @@ type ClientComm interface { // CreateDrainResult creates a result for a Drain command. CreateDrainResult(pos CmdPos) DrainResult - // lockCommunication ensures that no further results are delivered to the + // LockCommunication ensures that no further results are delivered to the // client. The returned ClientLock can be queried to see what results have // been already delivered to the client and to discard results that haven't // been delivered. @@ -927,10 +927,11 @@ type ClientLock interface { // connection. ClientPos() CmdPos - // RTrim iterates backwards through the results and drops all results with - // position >= pos. - // It is illegal to call rtrim with a position <= clientPos(). In other words, - // results can + // RTrim drops all results with position >= pos. + // + // It is illegal to call RTrim with a position <= ClientPos(). In other + // words, results can only be trimmed if they haven't been sent to the + // client. RTrim(ctx context.Context, pos CmdPos) } @@ -971,6 +972,8 @@ const discarded resCloseType = false // streamingCommandResult is a CommandResult that streams rows on the channel // and can call a provided callback when closed. type streamingCommandResult struct { + pos CmdPos + // All the data (the rows and the metadata) are written into w. The // goroutine writing into this streamingCommandResult might block depending // on the synchronization strategy. @@ -980,7 +983,7 @@ type streamingCommandResult struct { rowsAffected int // closeCallback, if set, is called when Close()/Discard() is called. - closeCallback func(*streamingCommandResult, resCloseType) + closeCallback func(resCloseType) } var _ RestrictedCommandResult = &streamingCommandResult{} @@ -993,7 +996,7 @@ func (r *streamingCommandResult) ErrAllowReleased() error { // RevokePortalPausability is part of the sql.RestrictedCommandResult interface. func (r *streamingCommandResult) RevokePortalPausability() error { - return errors.AssertionFailedf("forPausablePortal is for limitedCommandResult only") + return errors.AssertionFailedf("RevokePortalPausability is for limitedCommandResult only") } // SetColumns is part of the RestrictedCommandResult interface. @@ -1056,7 +1059,7 @@ func (r *streamingCommandResult) SetError(err error) { // in execStmtInOpenState(). } -// GetEntryFromExtraInfo is part of the sql.RestrictedCommandResult interface. +// GetBulkJobId is part of the sql.RestrictedCommandResult interface. func (r *streamingCommandResult) GetBulkJobId() uint64 { return 0 } @@ -1084,14 +1087,14 @@ func (r *streamingCommandResult) RowsAffected() int { // Close is part of the CommandResultClose interface. func (r *streamingCommandResult) Close(context.Context, TransactionStatusIndicator) { if r.closeCallback != nil { - r.closeCallback(r, closed) + r.closeCallback(closed) } } // Discard is part of the CommandResult interface. func (r *streamingCommandResult) Discard() { if r.closeCallback != nil { - r.closeCallback(r, discarded) + r.closeCallback(discarded) } } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index a9b187eedeac..b74b539fc177 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -162,7 +162,7 @@ func (ie *InternalExecutor) runWithEx( sd *sessiondata.SessionData, stmtBuf *StmtBuf, wg *sync.WaitGroup, - syncCallback func([]resWithPos), + syncCallback func([]*streamingCommandResult), errCallback func(error), ) error { ex, err := ie.initConnEx(ctx, txn, w, sd, stmtBuf, syncCallback) @@ -203,7 +203,7 @@ func (ie *InternalExecutor) initConnEx( w ieResultWriter, sd *sessiondata.SessionData, stmtBuf *StmtBuf, - syncCallback func([]resWithPos), + syncCallback func([]*streamingCommandResult), ) (*connExecutor, error) { clientComm := &internalClientComm{ w: w, @@ -890,7 +890,7 @@ func (ie *InternalExecutor) execInternal( // statement we care about before that command is sent for execution. var resPos CmdPos - syncCallback := func(results []resWithPos) { + syncCallback := func(results []*streamingCommandResult) { // Close the stmtBuf so that the connExecutor exits its run() loop. stmtBuf.Close() for _, res := range results { @@ -928,7 +928,6 @@ func (ie *InternalExecutor) execInternal( } typeHints := make(tree.PlaceholderTypes, numParams) for i, d := range datums { - // Arg numbers start from 1. typeHints[tree.PlaceholderIdx(i)] = d.ResolvedType() } if len(qargs) == 0 { @@ -1096,7 +1095,7 @@ func (ie *InternalExecutor) checkIfTxnIsConsistent(txn *kv.Txn) error { type internalClientComm struct { // results will contain the results of the commands executed by an // InternalExecutor. - results []resWithPos + results []*streamingCommandResult // The results of the query execution will be written into w. w ieResultWriter @@ -1104,16 +1103,11 @@ type internalClientComm struct { lastDelivered CmdPos // sync, if set, is called whenever a Sync is executed. - sync func([]resWithPos) + sync func([]*streamingCommandResult) } var _ ClientComm = &internalClientComm{} -type resWithPos struct { - *streamingCommandResult - pos CmdPos -} - // CreateStatementResult is part of the ClientComm interface. func (icc *internalClientComm) CreateStatementResult( _ tree.Statement, @@ -1133,17 +1127,15 @@ func (icc *internalClientComm) CreateStatementResult( // createRes creates a result. onClose, if not nil, is called when the result is // closed. func (icc *internalClientComm) createRes(pos CmdPos, onClose func()) *streamingCommandResult { - res := &streamingCommandResult{ - w: icc.w, - closeCallback: func(res *streamingCommandResult, typ resCloseType) { - if typ == discarded { - return - } - icc.results = append(icc.results, resWithPos{streamingCommandResult: res, pos: pos}) - if onClose != nil { - onClose() - } - }, + res := &streamingCommandResult{pos: pos, w: icc.w} + res.closeCallback = func(typ resCloseType) { + if typ == discarded { + return + } + icc.results = append(icc.results, res) + if onClose != nil { + onClose() + } } return res } @@ -1163,7 +1155,7 @@ func (icc *internalClientComm) CreateBindResult(pos CmdPos) BindResult { // The returned SyncResult will call the sync callback when its closed. func (icc *internalClientComm) CreateSyncResult(pos CmdPos) SyncResult { return icc.createRes(pos, func() { - results := make([]resWithPos, len(icc.results)) + results := make([]*streamingCommandResult, len(icc.results)) copy(results, icc.results) icc.results = icc.results[:0] icc.sync(results) @@ -1236,7 +1228,7 @@ func (ncl *noopClientLock) ClientPos() CmdPos { // RTrim is part of the ClientLock interface. func (ncl *noopClientLock) RTrim(_ context.Context, pos CmdPos) { var i int - var r resWithPos + var r *streamingCommandResult for i, r = range ncl.results { if r.pos >= pos { break diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index 254e589ea2d0..eed5070a4de9 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -386,7 +386,7 @@ func (r *commandResult) ResetStmtType(stmt tree.Statement) { r.cmdCompleteTag = stmt.StatementTag() } -// GetEntryFromExtraInfo is part of the sql.RestrictedCommandResult interface. +// GetBulkJobId is part of the sql.RestrictedCommandResult interface. func (r *commandResult) GetBulkJobId() uint64 { return r.bulkJobId } From 8567e60e9ca257880c049ea0d5381fcedd3fd7c7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 18 Apr 2023 17:32:23 +0000 Subject: [PATCH 2/5] sql: document how the machinery of the internal executor fits together Release note: None --- pkg/sql/internal.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index b74b539fc177..831b68ac85bd 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -777,6 +777,78 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ }, } +// execInternal is the main entry point for executing a statement via the +// InternalExecutor. From the high level it does the following: +// - parses the statement as well as its arguments +// - creates an "internal" connExecutor that runs in a separate goroutine +// - pushes a few commands onto the StmtBuf of the connExecutor to be evaluated +// - blocks until the first row of data is sent by the connExecutor +// - returns the rowsIterator that can consume the result of the statement. +// +// Only a single statement is supported. If there are no query arguments, then +// {ExecStmt, Sync} commands are pushed onto the StmtBuf, if there are some +// query arguments, then {PrepareStmt, BindStmt, ExecPortal, Sync} are pushed. +// +// The coordination between the rowsIterator and the connExecutor is managed by +// the internalClientComm as well as the ieResultChannel. The rowsIterator is +// the reader of the ieResultChannel while the connExecutor is the writer. The +// connExecutor goroutine exits (achieved by closing the StmtBuf) once the +// result for the Sync command evaluation is closed. +// +// execInternal defines two callbacks that are passed into the connExecutor +// machinery: +// - syncCallback is called when the result for the Sync command evaluation is +// closed. It is responsible for closing the StmtBuf (to allow the connExecutor +// to exit its 'run' loop) as well iterating over the results to see whether an +// error was encountered. Note that, unlike rows that are sent directly from the +// streamingCommandResult (the writer) to the rowsIterator (the reader), errors +// are buffered in the results - this is needed since the errors might be +// updated by the connExecutor after they have been generated (e.g. replacing +// context cancellation error with a nice "statement timed out" error). +// - errCallback is called when the connExecutor's 'run' returns an error in +// order to propagate the error to the rowsIterator. +// +// It's worth noting that rows as well some metadata (column schema as well as +// "rows affected" number) are sent directly from the streamingCommandResult to +// the rowsIterator, meaning that this communication doesn't go through the +// internalClientComm. +// +// The returned rowsIterator can be synchronized with the connExecutor goroutine +// if "synchronous" ieResultChannel is provided. In this case, only one +// goroutine (among the rowsIterator and the connExecutor) is active at any +// point in time since each read / write is blocked until the "send" / "receive" +// happens on the ieResultChannel. +// +// It's also worth noting that execInternal doesn't return until the +// connExecutor reaches the execution engine (i.e. until after the query +// planning has been performed). This is needed in order to avoid concurrent +// access to the txn by the rowsIterator and the connExecutor goroutines. In +// particular, this blocking allows us to avoid invalid concurrent txn access +// when during the stmt evaluation the internal executor needs to run "nested" +// internally-executed stmt (see #62415 for an example). +// TODO(yuzefovich): currently, this statement is not entirely true if the retry +// occurs. +// +// An additional responsibility of the internalClientComm is handling the retry +// errors. At the moment of writing, this is done incorrectly - namely, the +// internalClientComm implements the ClientLock interface in such a fashion as +// if any command can be transparently retried. +// TODO(yuzefovich): fix this. +// +// Note that only implicit txns can be retried internally. If an explicit txn is +// passed to execInternal, then the retry error is propagated to the +// rowsIterator in the following manner (say we use {ExecStmt, Sync} commands): +// - ExecStmt evaluation encounters a retry error +// - the error is stored in internalClientComm.results[0] (since it's not +// propagated right away to the rowsIterator) +// - the connExecutor's state machine rolls back the stmt +// - the connExecutor then processes the Sync command, and when the +// corresponding result is closed, syncCallback is called +// - in the syncCallback we iterate over two results and find the error in the +// zeroth result - the error is sent on the ieResultChannel +// - the rowsIterator receives the error and returns it to the caller of +// execInternal. + // execInternal executes a statement. // // sessionDataOverride can be used to control select fields in the executor's From bafd8a9768ccaaa6393222edf872d1676cb73779 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 13 Apr 2023 17:32:22 +0000 Subject: [PATCH 3/5] sql: use SetRowsAffected instead of IncrementRowsAffected This commit changes the `RestrictedCommandResult` interface a bit to make `SetRowsAffected` just set the number of rows affected, rather than increment it. This method is supposed to be called only once, so this change seems reasonable on its own, but it also has an additional benefit for the follow-up commit. In particular, in the follow-up commit we will preserve the ability to automatically retry statements of non-ROWS stmt type by the internal executor. Note that this commit on its own fixes the behavior for the "rows affected" statements (the only known occurrence of the bug that is generally fixed in the following commit). Release note: None --- pkg/ccl/changefeedccl/changefeed_dist.go | 4 +- pkg/sql/conn_io.go | 24 +++++------ pkg/sql/distsql_running.go | 22 +++++----- pkg/sql/internal.go | 28 ++++++++---- pkg/sql/internal_test.go | 43 +++++++++++++++++++ pkg/sql/pgwire/command_result.go | 12 ++---- pkg/sql/recursive_cte.go | 4 +- pkg/sql/routine.go | 4 +- pkg/sql/sessiondata/internal.go | 13 ++++++ .../local_only_session_data.proto | 3 ++ 10 files changed, 108 insertions(+), 49 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index aa6285955b2b..853a5b2816f6 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -492,8 +492,8 @@ func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) er return nil } } -func (w *changefeedResultWriter) IncrementRowsAffected(ctx context.Context, n int) { - w.rowsAffected += n +func (w *changefeedResultWriter) SetRowsAffected(ctx context.Context, n int) { + w.rowsAffected = n } func (w *changefeedResultWriter) SetError(err error) { w.err = err diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index 6ae89a3e5c77..9bb314e99c9c 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -791,12 +791,13 @@ type RestrictedCommandResult interface { // AddBatch is undefined. SupportsAddBatch() bool - // IncrementRowsAffected increments a counter by n. This is used for all + // SetRowsAffected sets RowsAffected counter to n. This is used for all // result types other than tree.Rows. - IncrementRowsAffected(ctx context.Context, n int) + SetRowsAffected(ctx context.Context, n int) - // RowsAffected returns either the number of times AddRow was called, or the - // sum of all n passed into IncrementRowsAffected. + // RowsAffected returns either the number of times AddRow was called, total + // number of rows pushed via AddBatch, or the last value of n passed into + // SetRowsAffected. RowsAffected() int // DisableBuffering can be called during execution to ensure that @@ -1026,7 +1027,7 @@ func (r *streamingCommandResult) ResetStmtType(stmt tree.Statement) { // AddRow is part of the RestrictedCommandResult interface. func (r *streamingCommandResult) AddRow(ctx context.Context, row tree.Datums) error { - // AddRow() and IncrementRowsAffected() are never called on the same command + // AddRow() and SetRowsAffected() are never called on the same command // result, so we will not double count the affected rows by an increment // here. r.rowsAffected++ @@ -1069,13 +1070,13 @@ func (r *streamingCommandResult) Err() error { return r.err } -// IncrementRowsAffected is part of the RestrictedCommandResult interface. -func (r *streamingCommandResult) IncrementRowsAffected(ctx context.Context, n int) { - r.rowsAffected += n +// SetRowsAffected is part of the RestrictedCommandResult interface. +func (r *streamingCommandResult) SetRowsAffected(ctx context.Context, n int) { + r.rowsAffected = n // streamingCommandResult might be used outside of the internal executor // (i.e. not by rowsIterator) in which case the channel is not set. if r.w != nil { - _ = r.w.addResult(ctx, ieIteratorResult{rowsAffectedIncrement: &n}) + _ = r.w.addResult(ctx, ieIteratorResult{rowsAffected: &n}) } } @@ -1113,11 +1114,6 @@ func (r *streamingCommandResult) SetPortalOutput( ) { } -// SetRowsAffected is part of the sql.CopyInResult interface. -func (r *streamingCommandResult) SetRowsAffected(ctx context.Context, rows int) { - r.rowsAffected = rows -} - // SendCopyOut is part of the sql.CopyOutResult interface. func (r *streamingCommandResult) SendCopyOut( ctx context.Context, cols colinfo.ResultColumns, format pgwirebase.FormatCode, diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 988608eb3536..87ef4cb5613d 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1001,7 +1001,7 @@ type rowResultWriter interface { // AddRow writes a result row. // Note that the caller owns the row slice and might reuse it. AddRow(ctx context.Context, row tree.Datums) error - IncrementRowsAffected(ctx context.Context, n int) + SetRowsAffected(ctx context.Context, n int) SetError(error) Err() error } @@ -1092,8 +1092,8 @@ func (w *errOnlyResultWriter) AddBatch(ctx context.Context, batch coldata.Batch) panic("AddBatch not supported by errOnlyResultWriter") } -func (w *errOnlyResultWriter) IncrementRowsAffected(ctx context.Context, n int) { - panic("IncrementRowsAffected not supported by errOnlyResultWriter") +func (w *errOnlyResultWriter) SetRowsAffected(ctx context.Context, n int) { + panic("SetRowsAffected not supported by errOnlyResultWriter") } // RowResultWriter is a thin wrapper around a RowContainer. @@ -1110,9 +1110,9 @@ func NewRowResultWriter(rowContainer *rowContainerHelper) *RowResultWriter { return &RowResultWriter{rowContainer: rowContainer} } -// IncrementRowsAffected implements the rowResultWriter interface. -func (b *RowResultWriter) IncrementRowsAffected(ctx context.Context, n int) { - b.rowsAffected += n +// SetRowsAffected implements the rowResultWriter interface. +func (b *RowResultWriter) SetRowsAffected(ctx context.Context, n int) { + b.rowsAffected = n } // AddRow implements the rowResultWriter interface. @@ -1150,9 +1150,9 @@ func NewCallbackResultWriter( return &CallbackResultWriter{fn: fn} } -// IncrementRowsAffected is part of the rowResultWriter interface. -func (c *CallbackResultWriter) IncrementRowsAffected(ctx context.Context, n int) { - c.rowsAffected += n +// SetRowsAffected is part of the rowResultWriter interface. +func (c *CallbackResultWriter) SetRowsAffected(ctx context.Context, n int) { + c.rowsAffected = n } // AddRow is part of the rowResultWriter interface. @@ -1436,7 +1436,7 @@ func (r *DistSQLReceiver) Push( // We only need the row count. planNodeToRowSource is set up to handle // ensuring that the last stage in the pipeline will return a single-column // row with the row count in it, so just grab that and exit. - r.resultWriterMu.row.IncrementRowsAffected(r.ctx, n) + r.resultWriterMu.row.SetRowsAffected(r.ctx, n) return r.status } @@ -1520,7 +1520,7 @@ func (r *DistSQLReceiver) PushBatch( // We only need the row count. planNodeToRowSource is set up to handle // ensuring that the last stage in the pipeline will return a single-column // row with the row count in it, so just grab that and exit. - r.resultWriterMu.row.IncrementRowsAffected(r.ctx, int(batch.ColVec(0).Int64()[0])) + r.resultWriterMu.row.SetRowsAffected(r.ctx, int(batch.ColVec(0).Int64()[0])) return r.status } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 831b68ac85bd..0a68712d92f5 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -353,10 +353,10 @@ func (ie *InternalExecutor) newConnExecutorWithTxn( type ieIteratorResult struct { // Exactly one of these 4 fields will be set. - row tree.Datums - rowsAffectedIncrement *int - cols colinfo.ResultColumns - err error + row tree.Datums + rowsAffected *int + cols colinfo.ResultColumns + err error } type rowsIterator struct { @@ -429,8 +429,8 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { r.lastRow = data.row return true, nil } - if data.rowsAffectedIncrement != nil { - r.rowsAffected += *data.rowsAffectedIncrement + if data.rowsAffected != nil { + r.rowsAffected = *data.rowsAffected return r.Next(ctx) } if data.cols != nil { @@ -748,6 +748,8 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess if o.QualityOfService != nil { sd.DefaultTxnQualityOfService = o.QualityOfService.ValidateInternal() } + // We always override the injection knob based on the override struct. + sd.InjectRetryErrorsEnabled = o.InjectRetryErrorsEnabled } func (ie *InternalExecutor) maybeRootSessionDataOverride( @@ -830,9 +832,10 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ // occurs. // // An additional responsibility of the internalClientComm is handling the retry -// errors. At the moment of writing, this is done incorrectly - namely, the -// internalClientComm implements the ClientLock interface in such a fashion as -// if any command can be transparently retried. +// errors. At the moment of writing, this is done incorrectly (except for stmts +// of "RowsAffected" type) - namely, the internalClientComm implements the +// ClientLock interface in such a fashion as if any command can be transparently +// retried. // TODO(yuzefovich): fix this. // // Note that only implicit txns can be retried internally. If an explicit txn is @@ -848,6 +851,13 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ // zeroth result - the error is sent on the ieResultChannel // - the rowsIterator receives the error and returns it to the caller of // execInternal. +// +// Retries for implicit txns and statements of "RowsAffected" type are achieved +// by overriding the "rows affected" number, stored in the rowsIterator, with +// the latest information. With such setup, even if the stmt execution before +// the retry communicated its incorrect "rows affected" information, that info +// is overridden accordingly after the connExecutor re-executes the +// corresponding command. // execInternal executes a statement. // diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index aa52be7b87d7..bc4541b3cb78 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -684,6 +684,49 @@ func TestInternalDBWithOverrides(t *testing.T) { assert.Equal(t, "'off'", drow[0].String()) } +// TestInternalExecutorEncountersRetry verifies that if the internal executor +// encounters a retry error after some data (rows or metadata) have been +// communicated to the client, the query either results in a retry error (when +// rows have been sent) or correctly transparently retries (#98558). +func TestInternalExecutorEncountersRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + s, _, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + ie := s.InternalExecutor().(*sql.InternalExecutor) + + // This test case verifies that if we execute the stmt of the RowsAffected + // type, it is transparently retried and the correct number of "rows + // affected" is reported. + t.Run("RowsAffected stmt", func(t *testing.T) { + // We will use PAUSE SCHEDULES statement which is of RowsAffected type. + // + // Notably, internally this statement will run some other queries via + // the "nested" internal executor, but those "nested" queries don't hit + // the injected retry error since this knob only applies to the "top" + // IE. + const stmt = `PAUSE SCHEDULES SELECT id FROM [SHOW SCHEDULES FOR SQL STATISTICS];` + paused, err := ie.ExecEx( + ctx, "pause schedule", nil, /* txn */ + sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + InjectRetryErrorsEnabled: true, + }, + stmt, + ) + if err != nil { + t.Fatal(err) + } + if paused != 1 { + t.Fatalf("expected 1 schedule to be paused, got %d", paused) + } + }) +} + // TODO(andrei): Test that descriptor leases are released by the // Executor, with and without a higher-level txn. When there is no // higher-level txn, the leases are released normally by the txn finishing. When diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index eed5070a4de9..462a728d88a2 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -331,12 +331,6 @@ func (r *commandResult) SetPortalOutput( _ /* err */ = r.conn.writeRowDescription(ctx, cols, formatCodes, &r.conn.writerState.buf) } -// SetRowsAffected is part of the sql.CopyIn interface. -func (r *commandResult) SetRowsAffected(ctx context.Context, n int) { - r.assertNotReleased() - r.rowsAffected = n -} - // SendCopyOut is part of the sql.CopyOutResult interface. func (r *commandResult) SendCopyOut( ctx context.Context, cols colinfo.ResultColumns, format pgwirebase.FormatCode, @@ -367,10 +361,10 @@ func (r *commandResult) SendCopyDone(ctx context.Context) error { return r.conn.bufferCopyDone() } -// IncrementRowsAffected is part of the sql.RestrictedCommandResult interface. -func (r *commandResult) IncrementRowsAffected(ctx context.Context, n int) { +// SetRowsAffected is part of the sql.RestrictedCommandResult interface. +func (r *commandResult) SetRowsAffected(ctx context.Context, n int) { r.assertNotReleased() - r.rowsAffected += n + r.rowsAffected = n } // RowsAffected is part of the sql.RestrictedCommandResult interface. diff --git a/pkg/sql/recursive_cte.go b/pkg/sql/recursive_cte.go index 6a092d6366b7..450b366922d0 100644 --- a/pkg/sql/recursive_cte.go +++ b/pkg/sql/recursive_cte.go @@ -198,8 +198,8 @@ func (n *recursiveCTENode) AddRow(ctx context.Context, row tree.Datums) error { return n.workingRows.AddRow(ctx, row) } -// IncrementRowsAffected is part of the rowResultWriter interface. -func (n *recursiveCTENode) IncrementRowsAffected(context.Context, int) { +// SetRowsAffected is part of the rowResultWriter interface. +func (n *recursiveCTENode) SetRowsAffected(context.Context, int) { } // SetError is part of the rowResultWriter interface. diff --git a/pkg/sql/routine.go b/pkg/sql/routine.go index 0e3e9ee671fb..7f66db6de90d 100644 --- a/pkg/sql/routine.go +++ b/pkg/sql/routine.go @@ -208,8 +208,8 @@ func (d *droppingResultWriter) AddRow(ctx context.Context, row tree.Datums) erro return nil } -// IncrementRowsAffected is part of the rowResultWriter interface. -func (d *droppingResultWriter) IncrementRowsAffected(ctx context.Context, n int) {} +// SetRowsAffected is part of the rowResultWriter interface. +func (d *droppingResultWriter) SetRowsAffected(ctx context.Context, n int) {} // SetError is part of the rowResultWriter interface. func (d *droppingResultWriter) SetError(err error) { diff --git a/pkg/sql/sessiondata/internal.go b/pkg/sql/sessiondata/internal.go index 2a83beb23aa3..0c2590fe2894 100644 --- a/pkg/sql/sessiondata/internal.go +++ b/pkg/sql/sessiondata/internal.go @@ -34,6 +34,19 @@ type InternalExecutorOverride struct { // used as long as that value has a QoSLevel defined // (see QoSLevel.ValidateInternal). QualityOfService *sessiondatapb.QoSLevel + // InjectRetryErrorsEnabled, if true, injects a transaction retry error + // _after_ the statement has been processed by the execution engine and + // _before_ the control flow is returned to the connExecutor state machine. + // + // The error will be injected (roughly speaking) three times (determined by + // the numTxnRetryErrors constant in conn_executor_exec.go). + // + // For testing only. + // + // NB: this override applies only to the "top" internal executor, i.e. it + // does **not** propagate further to "nested" executors that are spawned up + // by the "top" executor. + InjectRetryErrorsEnabled bool } // NoSessionDataOverride is the empty InternalExecutorOverride which does not diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 19914953017a..348a314c294e 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -203,6 +203,9 @@ message LocalOnlySessionData { // InjectRetryErrorsEnabled causes statements inside an explicit // transaction to return a transaction retry error. It is intended for // developers to test their app's retry logic. + // + // Note that this session variable is **not** propagated to the internal + // executors - use InternalExecutorOverride for that. bool inject_retry_errors_enabled = 54; // NullOrderedLast controls whether NULL is ordered last. We default to // NULLS FIRST for ascending order by default, whereas postgres defaults From 6a20389058206c54b34a839cd77c4c6ca8c50c7d Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 14 Apr 2023 21:25:34 +0000 Subject: [PATCH 4/5] sql: fix internal executor when it encounters a retry error This commit fixes a bug with the internal executor when it encounters an internal retry. Previously, the implementation of the "rewind capability" was such that we always assumed that we can rewind the StmtBuf for the IE at any point; however, that is not generally true. In particular, if we communicated anything to the client of the IE's connExecutor (`rowsIterator` in this context), then we cannot rewind the current command we're evaluating. In theory, we could have pushed some rows through the iterator only to encounter the internal retry later which would then lead to re-executing the command from the start (in other words, the rows that we've pushed before the retry would be double-pushed); in practice, however, we haven't actually seen this (at least yet). What we have seen is the number of rows affected being double counted. This particular reproduction was already fixed in the previous commit, but this commit fixes the problem more generally. This commit makes it so that for every `streamingCommandResult` we are tracking whether it has already communicated something to the client so that the result can no longer be rewound, and then we use that tracking mechanism to correctly implement the rewind capability. We have three possible types of data that we communicate: - rows - number of rows affected - column schema. If the retry happens after some rows have been communicated, we're out of luck - there is no way we can retry the stmt internally, so from now on we will return a retry error to the client. If the retry happens after "rows affected", then given the adjustment in the previous commit we can proceed transparently. In order to avoid propagating the retry error up when it occurs after having received the column schema but before pushing out any rows, this commit adjusts the behavior to always keep the latest column schema, thus, we can still proceed transparently in this case. This bug has been present since at least 21.1 when `streamingCommandResult` was introduced. However, since we now might return a retry error in some cases, this could lead to test failures or flakes, or even to errors in some internal CRDB operations that execute statements of ROWS type (if there is no appropriate retry logic), so I intend to only backport this to 23.1. There is also no release note since the only failure we've seen is about double counted "rows affected" number, the likelihood of which has significantly increased due to the jobs system refactor (i.e. mostly 23.1 is affected AFAIK). Additionally, this commit makes it so that we correctly block the `execInternal` call until the first actual, non-metadata result is seen (this behavior is needed to correctly synchronize access to the txn before the stmt is given to the execution engine). Release note: None --- pkg/sql/conn_io.go | 28 ++++-- pkg/sql/internal.go | 200 +++++++++++++++++++++------------------ pkg/sql/internal_test.go | 33 +++++-- 3 files changed, 150 insertions(+), 111 deletions(-) diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index 9bb314e99c9c..a28051ee4515 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -965,11 +965,6 @@ func (rc *rewindCapability) close() { rc.cl.Close() } -type resCloseType bool - -const closed resCloseType = true -const discarded resCloseType = false - // streamingCommandResult is a CommandResult that streams rows on the channel // and can call a provided callback when closed. type streamingCommandResult struct { @@ -980,11 +975,17 @@ type streamingCommandResult struct { // on the synchronization strategy. w ieResultWriter + // cannotRewind indicates whether this result has communicated some data + // (rows or metadata) such that the corresponding command cannot be rewound. + cannotRewind bool + err error rowsAffected int - // closeCallback, if set, is called when Close()/Discard() is called. - closeCallback func(resCloseType) + // closeCallback, if set, is called when Close() is called. + closeCallback func() + // discardCallback, if set, is called when Discard() is called. + discardCallback func() } var _ RestrictedCommandResult = &streamingCommandResult{} @@ -1007,6 +1008,8 @@ func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.Re if cols == nil { cols = colinfo.ResultColumns{} } + // NB: we do not set r.cannotRewind here because the correct columns will be + // set in rowsIterator.Next. _ = r.w.addResult(ctx, ieIteratorResult{cols: cols}) } @@ -1033,6 +1036,9 @@ func (r *streamingCommandResult) AddRow(ctx context.Context, row tree.Datums) er r.rowsAffected++ rowCopy := make(tree.Datums, len(row)) copy(rowCopy, row) + // Once we add this row to the writer, it can be immediately consumed by the + // reader, so this result can no longer be rewound. + r.cannotRewind = true return r.w.addResult(ctx, ieIteratorResult{row: rowCopy}) } @@ -1076,6 +1082,8 @@ func (r *streamingCommandResult) SetRowsAffected(ctx context.Context, n int) { // streamingCommandResult might be used outside of the internal executor // (i.e. not by rowsIterator) in which case the channel is not set. if r.w != nil { + // NB: we do not set r.cannotRewind here because rowsAffected value will + // be overwritten in rowsIterator.Next correctly if necessary. _ = r.w.addResult(ctx, ieIteratorResult{rowsAffected: &n}) } } @@ -1088,14 +1096,14 @@ func (r *streamingCommandResult) RowsAffected() int { // Close is part of the CommandResultClose interface. func (r *streamingCommandResult) Close(context.Context, TransactionStatusIndicator) { if r.closeCallback != nil { - r.closeCallback(closed) + r.closeCallback() } } // Discard is part of the CommandResult interface. func (r *streamingCommandResult) Discard() { - if r.closeCallback != nil { - r.closeCallback(discarded) + if r.discardCallback != nil { + r.discardCallback() } } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 0a68712d92f5..76703b60adaf 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -206,10 +206,8 @@ func (ie *InternalExecutor) initConnEx( syncCallback func([]*streamingCommandResult), ) (*connExecutor, error) { clientComm := &internalClientComm{ - w: w, - // init lastDelivered below the position of the first result (0). - lastDelivered: -1, - sync: syncCallback, + w: w, + sync: syncCallback, } applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */) @@ -434,16 +432,10 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { return r.Next(ctx) } if data.cols != nil { - // Ignore the result columns if they are already set on the - // iterator: it is possible for ROWS statement type to be executed - // in a 'rows affected' mode, in such case the correct columns are - // set manually when instantiating the iterator, but the result - // columns of the statement are also sent by SetColumns() (we need - // to keep the former). - if r.resultCols == nil { - r.resultCols = data.cols - } - return r.Next(ctx) + // At this point we don't expect to see the columns - we should only + // return the rowsIterator to the caller of execInternal after the + // columns have been determined. + data.err = errors.AssertionFailedf("unexpectedly received non-nil cols in Next: %v", data) } if data.err == nil { data.err = errors.AssertionFailedf("unexpectedly empty ieIteratorResult object") @@ -823,20 +815,35 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ // // It's also worth noting that execInternal doesn't return until the // connExecutor reaches the execution engine (i.e. until after the query -// planning has been performed). This is needed in order to avoid concurrent -// access to the txn by the rowsIterator and the connExecutor goroutines. In -// particular, this blocking allows us to avoid invalid concurrent txn access -// when during the stmt evaluation the internal executor needs to run "nested" -// internally-executed stmt (see #62415 for an example). +// planning has been performed). This blocking behavior is still respected in +// case a retry error occurs after the column schema is communicated, but before +// the stmt reaches the execution engine. This is needed in order to avoid +// concurrent access to the txn by the rowsIterator and the connExecutor +// goroutines. In particular, this blocking allows us to avoid invalid +// concurrent txn access when during the stmt evaluation the internal executor +// needs to run "nested" internally-executed stmt (see #62415 for an example). // TODO(yuzefovich): currently, this statement is not entirely true if the retry // occurs. // // An additional responsibility of the internalClientComm is handling the retry -// errors. At the moment of writing, this is done incorrectly (except for stmts -// of "RowsAffected" type) - namely, the internalClientComm implements the -// ClientLock interface in such a fashion as if any command can be transparently -// retried. -// TODO(yuzefovich): fix this. +// errors. If a retry error is encountered with an implicit txn (i.e. nil txn +// is passed to execInternal), then we do our best to retry the execution +// transparently; however, we can **not** do so in all cases, so sometimes the +// retry error will be propagated to the user of the rowsIterator. In +// particular, here is the summary of how retries are handled: +// - If the retry error occurs after some rows have been sent from the +// streamingCommandResult to the rowsIterator, we have no choice but to return +// the retry error to the caller. +// - If the retry error occurs after the "rows affected" metadata was sent for +// stmts of "RowsAffected" type, then we will always retry transparently. This +// is achieved by overriding the "rows affected" number, stored in the +// rowsIterator, with the latest information. With such setup, even if the +// stmt execution before the retry communicated its incorrect "rows affected" +// information, that info is overridden accordingly after the connExecutor +// re-executes the corresponding command. +// - If the retry error occurs after the column schema is sent, then - similar +// to how we handle the "rows affected" metadata - we always transparently +// retry by keeping the latest information. // // Note that only implicit txns can be retried internally. If an explicit txn is // passed to execInternal, then the retry error is propagated to the @@ -851,13 +858,6 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ // zeroth result - the error is sent on the ieResultChannel // - the rowsIterator receives the error and returns it to the caller of // execInternal. -// -// Retries for implicit txns and statements of "RowsAffected" type are achieved -// by overriding the "rows affected" number, stored in the rowsIterator, with -// the latest information. With such setup, even if the stmt execution before -// the retry communicated its incorrect "rows affected" information, that info -// is overridden accordingly after the connExecutor re-executes the -// corresponding command. // execInternal executes a statement. // @@ -968,10 +968,6 @@ func (ie *InternalExecutor) execInternal( return nil, err } - // resPos will be set to the position of the command that represents the - // statement we care about before that command is sent for execution. - var resPos CmdPos - syncCallback := func(results []*streamingCommandResult) { // Close the stmtBuf so that the connExecutor exits its run() loop. stmtBuf.Close() @@ -983,15 +979,7 @@ func (ie *InternalExecutor) execInternal( _ = rw.addResult(ctx, ieIteratorResult{err: res.Err()}) return } - if res.pos == resPos { - return - } } - _ = rw.addResult(ctx, ieIteratorResult{ - err: errors.AssertionFailedf( - "missing result for pos: %d and no previous error", resPos, - ), - }) } // errCallback is called if an error is returned from the connExecutor's // run() loop. @@ -1013,7 +1001,6 @@ func (ie *InternalExecutor) execInternal( typeHints[tree.PlaceholderIdx(i)] = d.ResolvedType() } if len(qargs) == 0 { - resPos = 0 if err := stmtBuf.Push( ctx, ExecStmt{ @@ -1028,7 +1015,6 @@ func (ie *InternalExecutor) execInternal( return nil, err } } else { - resPos = 2 if err := stmtBuf.Push( ctx, PrepareStmt{ @@ -1078,15 +1064,22 @@ func (ie *InternalExecutor) execInternal( r.first = &first } } - if !r.done && r.first.cols != nil { + for !r.done && r.first.cols != nil { // If the query is of ROWS statement type, the very first thing sent on // the channel will be the column schema. This will occur before the // query is given to the execution engine, so we actually need to get // the next piece from the data channel. // + // We also need to keep on looping until we get the first actual result + // with rows. In theory, it is possible for a stmt of ROWS type to + // encounter a retry error after sending the column schema but before + // going into the execution engine. In such a scenario we want to keep + // the latest column schema (in case there was a schema change + // in-between retries). + // // Note that only statements of ROWS type should send the cols, but we // choose to be defensive and don't assert that. - if r.resultCols == nil { + if parsed.AST.StatementReturnType() == tree.Rows { r.resultCols = r.first.cols } var first ieIteratorResult @@ -1173,22 +1166,30 @@ func (ie *InternalExecutor) checkIfTxnIsConsistent(txn *kv.Txn) error { } // internalClientComm is an implementation of ClientComm used by the -// InternalExecutor. Result rows are buffered in memory. +// InternalExecutor. Result rows are streamed on the channel to the +// ieResultWriter. type internalClientComm struct { - // results will contain the results of the commands executed by an + // results contains the results of the commands executed by the // InternalExecutor. + // + // In production setting we expect either two (ExecStmt, Sync) or four + // (PrepareStmt, BindStmt, ExecPortal, Sync) commands pushed to the StmtBuf, + // after which point the internalClientComm is no longer used. We also take + // advantage of the invariant that only a single command is being evaluated + // at any point in time (i.e. any command is created, evaluated, and then + // closed / discarded, and only after that a new command can be processed). results []*streamingCommandResult // The results of the query execution will be written into w. w ieResultWriter - lastDelivered CmdPos - - // sync, if set, is called whenever a Sync is executed. + // sync, if set, is called whenever a Sync is executed with all accumulated + // results since the last Sync. sync func([]*streamingCommandResult) } var _ ClientComm = &internalClientComm{} +var _ ClientLock = &internalClientComm{} // CreateStatementResult is part of the ClientComm interface. func (icc *internalClientComm) CreateStatementResult( @@ -1203,51 +1204,65 @@ func (icc *internalClientComm) CreateStatementResult( _ bool, _ PortalPausablity, ) CommandResult { - return icc.createRes(pos, nil /* onClose */) -} - -// createRes creates a result. onClose, if not nil, is called when the result is -// closed. -func (icc *internalClientComm) createRes(pos CmdPos, onClose func()) *streamingCommandResult { - res := &streamingCommandResult{pos: pos, w: icc.w} - res.closeCallback = func(typ resCloseType) { - if typ == discarded { - return - } - icc.results = append(icc.results, res) - if onClose != nil { - onClose() - } + return icc.createRes(pos) +} + +// createRes creates a result. +func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult { + res := &streamingCommandResult{ + pos: pos, + w: icc.w, + discardCallback: func() { + // If this result is being discarded, then we can simply remove the + // last item from the slice. Such behavior is valid since we don't + // create a new result until the previous one is either closed or + // discarded (i.e. we are always processing the last entry in the + // results slice at the moment and all previous results have been + // "finalized"). + icc.results = icc.results[:len(icc.results)-1] + }, } + icc.results = append(icc.results, res) return res } // CreatePrepareResult is part of the ClientComm interface. func (icc *internalClientComm) CreatePrepareResult(pos CmdPos) ParseResult { - return icc.createRes(pos, nil /* onClose */) + return icc.createRes(pos) } // CreateBindResult is part of the ClientComm interface. func (icc *internalClientComm) CreateBindResult(pos CmdPos) BindResult { - return icc.createRes(pos, nil /* onClose */) + return icc.createRes(pos) } // CreateSyncResult is part of the ClientComm interface. // -// The returned SyncResult will call the sync callback when its closed. +// The returned SyncResult will call the sync callback when it's closed. func (icc *internalClientComm) CreateSyncResult(pos CmdPos) SyncResult { - return icc.createRes(pos, func() { - results := make([]*streamingCommandResult, len(icc.results)) - copy(results, icc.results) - icc.results = icc.results[:0] - icc.sync(results) - icc.lastDelivered = pos - } /* onClose */) + res := icc.createRes(pos) + if icc.sync != nil { + res.closeCallback = func() { + // sync might communicate with the reader, so we defensively mark + // this result as no longer being able to rewind. This shouldn't be + // that important though - we shouldn't be trying to rewind the Sync + // command anyway, so we're being conservative here. + icc.results[len(icc.results)-1].cannotRewind = true + icc.sync(icc.results) + icc.results = icc.results[:0] + } + } + return res } // LockCommunication is part of the ClientComm interface. +// +// The current implementation writes results from the same goroutine as the one +// calling LockCommunication (main connExecutor's goroutine). Therefore, there's +// nothing to "lock" - communication is naturally blocked as the command +// processor won't write any more results. func (icc *internalClientComm) LockCommunication() ClientLock { - return (*noopClientLock)(icc) + return icc } // Flush is part of the ClientComm interface. @@ -1257,7 +1272,7 @@ func (icc *internalClientComm) Flush(pos CmdPos) error { // CreateDescribeResult is part of the ClientComm interface. func (icc *internalClientComm) CreateDescribeResult(pos CmdPos) DescribeResult { - return icc.createRes(pos, nil /* onClose */) + return icc.createRes(pos) } // CreateDeleteResult is part of the ClientComm interface. @@ -1295,28 +1310,29 @@ func (icc *internalClientComm) CreateDrainResult(pos CmdPos) DrainResult { panic("unimplemented") } -// noopClientLock is an implementation of ClientLock that says that no results -// have been communicated to the client. -type noopClientLock internalClientComm - // Close is part of the ClientLock interface. -func (ncl *noopClientLock) Close() {} +func (icc *internalClientComm) Close() {} // ClientPos is part of the ClientLock interface. -func (ncl *noopClientLock) ClientPos() CmdPos { - return ncl.lastDelivered +func (icc *internalClientComm) ClientPos() CmdPos { + // Find the latest result that cannot be rewound. + lastDelivered := CmdPos(-1) + for _, r := range icc.results { + if r.cannotRewind { + lastDelivered = r.pos + } + } + return lastDelivered } // RTrim is part of the ClientLock interface. -func (ncl *noopClientLock) RTrim(_ context.Context, pos CmdPos) { - var i int - var r *streamingCommandResult - for i, r = range ncl.results { +func (icc *internalClientComm) RTrim(_ context.Context, pos CmdPos) { + for i, r := range icc.results { if r.pos >= pos { - break + icc.results = icc.results[:i] + return } } - ncl.results = ncl.results[:i] } // extraTxnState is to store extra transaction state info that diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index bc4541b3cb78..dcae521a4dab 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -694,10 +694,18 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { ctx := context.Background() params, _ := tests.CreateTestServerParams() - s, _, _ := serverutils.StartServer(t, params) + s, db, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) + if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil { + t.Fatal(err) + } + ie := s.InternalExecutor().(*sql.InternalExecutor) + ieo := sessiondata.InternalExecutorOverride{ + User: username.RootUserName(), + InjectRetryErrorsEnabled: true, + } // This test case verifies that if we execute the stmt of the RowsAffected // type, it is transparently retried and the correct number of "rows @@ -710,14 +718,7 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { // the injected retry error since this knob only applies to the "top" // IE. const stmt = `PAUSE SCHEDULES SELECT id FROM [SHOW SCHEDULES FOR SQL STATISTICS];` - paused, err := ie.ExecEx( - ctx, "pause schedule", nil, /* txn */ - sessiondata.InternalExecutorOverride{ - User: username.RootUserName(), - InjectRetryErrorsEnabled: true, - }, - stmt, - ) + paused, err := ie.ExecEx(ctx, "pause schedule", nil /* txn */, ieo, stmt) if err != nil { t.Fatal(err) } @@ -725,6 +726,20 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { t.Fatalf("expected 1 schedule to be paused, got %d", paused) } }) + + // This test case verifies that if the retry error occurs after some rows + // have been communicated to the client, then the stmt results in the retry + // error too - the IE cannot transparently retry it. + t.Run("Rows stmt", func(t *testing.T) { + const stmt = `SELECT * FROM test.t` + _, err := ie.QueryBufferedEx(ctx, "read rows", nil /* txn */, ieo, stmt) + if !testutils.IsError(err, "inject_retry_errors_enabled") { + t.Fatalf("expected to see injected retry error, got %v", err) + } + }) + + // TODO(yuzefovich): add a test for when a schema change is done in-between + // the retries. } // TODO(andrei): Test that descriptor leases are released by the From 63861f49c58e0608b79b1458ae0ddb1a3ccf344d Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 14 Apr 2023 01:19:25 +0000 Subject: [PATCH 5/5] sql: transparently retry InternalExecutor.Exec{Ex} for ROWS stmts This commit teaches the internal executor to avoid propagating the retry errors to the client when executing `Exec{Ex}` methods. In those methods, we only return the number of rows affected, thus, in order to preserve the ability to transparently retry all statements (even if they are of ROWS statement type), we simply need to be able to reset the rows affected number on the `rowsIterator` if a result is being discarded (which indicates that a retry error has been encountered and the rewind mechanism is at play). This structure relies on the assumption that we push at most one command into the StmtBuf that results in "rows affected" which is true at the moment of writing, and I don't foresee that changing, at least for now. (This assumption would be incorrect only if we were to allow executing multiple statements as part of a single method invocation on the internal executor, but I don't think that's going to happen - we only might do so in a few tests.) Release note: None --- pkg/sql/internal.go | 79 +++++++++++++++++++++++++++++++++++++--- pkg/sql/internal_test.go | 29 +++++++++++++-- 2 files changed, 99 insertions(+), 9 deletions(-) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 76703b60adaf..03959108d732 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -159,13 +159,14 @@ func (ie *InternalExecutor) runWithEx( ctx context.Context, txn *kv.Txn, w ieResultWriter, + mode ieExecutionMode, sd *sessiondata.SessionData, stmtBuf *StmtBuf, wg *sync.WaitGroup, syncCallback func([]*streamingCommandResult), errCallback func(error), ) error { - ex, err := ie.initConnEx(ctx, txn, w, sd, stmtBuf, syncCallback) + ex, err := ie.initConnEx(ctx, txn, w, mode, sd, stmtBuf, syncCallback) if err != nil { return err } @@ -201,13 +202,19 @@ func (ie *InternalExecutor) initConnEx( ctx context.Context, txn *kv.Txn, w ieResultWriter, + mode ieExecutionMode, sd *sessiondata.SessionData, stmtBuf *StmtBuf, syncCallback func([]*streamingCommandResult), ) (*connExecutor, error) { clientComm := &internalClientComm{ w: w, + mode: mode, sync: syncCallback, + resetRowsAffected: func() { + var zero int + _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) + }, } applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */) @@ -363,6 +370,8 @@ type rowsIterator struct { rowsAffected int resultCols colinfo.ResultColumns + mode ieExecutionMode + // first, if non-nil, is the first object read from r. We block the return // of the created rowsIterator in execInternal() until the producer writes // something into the corresponding ieResultWriter because this indicates @@ -432,6 +441,12 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { return r.Next(ctx) } if data.cols != nil { + if r.mode == rowsAffectedIEExecutionMode { + // In "rows affected" execution mode we simply ignore the column + // schema since we always return the number of rows affected + // (i.e. a single integer column). + return r.Next(ctx) + } // At this point we don't expect to see the columns - we should only // return the rowsIterator to the caller of execInternal after the // columns have been determined. @@ -559,7 +574,7 @@ func (ie *InternalExecutor) queryInternalBuffered( // We will run the query to completion, so we can use an async result // channel. rw := newAsyncIEResultChannel() - it, err := ie.execInternal(ctx, opName, rw, txn, sessionDataOverride, stmt, qargs...) + it, err := ie.execInternal(ctx, opName, rw, defaultIEExecutionMode, txn, sessionDataOverride, stmt, qargs...) if err != nil { return nil, nil, err } @@ -661,7 +676,11 @@ func (ie *InternalExecutor) ExecEx( // We will run the query to completion, so we can use an async result // channel. rw := newAsyncIEResultChannel() - it, err := ie.execInternal(ctx, opName, rw, txn, session, stmt, qargs...) + // Since we only return the number of rows affected as given by the + // rowsIterator, we execute this stmt in "rows affected" mode allowing the + // internal executor to transparently retry. + const mode = rowsAffectedIEExecutionMode + it, err := ie.execInternal(ctx, opName, rw, mode, txn, session, stmt, qargs...) if err != nil { return 0, err } @@ -700,7 +719,7 @@ func (ie *InternalExecutor) QueryIteratorEx( qargs ...interface{}, ) (isql.Rows, error) { return ie.execInternal( - ctx, opName, newSyncIEResultChannel(), txn, session, stmt, qargs..., + ctx, opName, newSyncIEResultChannel(), defaultIEExecutionMode, txn, session, stmt, qargs..., ) } @@ -834,6 +853,12 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ // - If the retry error occurs after some rows have been sent from the // streamingCommandResult to the rowsIterator, we have no choice but to return // the retry error to the caller. +// - The only exception to this is when the stmt of "Rows" type was issued via +// ExecEx call. In such a scenario, we only need to report the number of +// "rows affected" that we obtain by counting all rows seen by the +// rowsIterator. With such a setup, we can transparently retry the execution +// of the corresponding command by simply resetting the counter when +// discarding the result of Sync command after the retry error occurs. // - If the retry error occurs after the "rows affected" metadata was sent for // stmts of "RowsAffected" type, then we will always retry transparently. This // is achieved by overriding the "rows affected" number, stored in the @@ -868,6 +893,7 @@ func (ie *InternalExecutor) execInternal( ctx context.Context, opName string, rw *ieResultChannel, + mode ieExecutionMode, txn *kv.Txn, sessionDataOverride sessiondata.InternalExecutorOverride, stmt string, @@ -986,7 +1012,7 @@ func (ie *InternalExecutor) execInternal( errCallback := func(err error) { _ = rw.addResult(ctx, ieIteratorResult{err: err}) } - err = ie.runWithEx(ctx, txn, rw, sd, stmtBuf, &wg, syncCallback, errCallback) + err = ie.runWithEx(ctx, txn, rw, mode, sd, stmtBuf, &wg, syncCallback, errCallback) if err != nil { return nil, err } @@ -1047,6 +1073,7 @@ func (ie *InternalExecutor) execInternal( } r = &rowsIterator{ r: rw, + mode: mode, stmtBuf: stmtBuf, wg: &wg, } @@ -1112,7 +1139,7 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error { rw := newAsyncIEResultChannel() stmtBuf := NewStmtBuf() - ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, sd, stmtBuf, nil /* syncCallback */) + ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, defaultIEExecutionMode, sd, stmtBuf, nil /* syncCallback */) if err != nil { return errors.Wrap(err, "cannot create conn executor to commit txn") } @@ -1165,6 +1192,26 @@ func (ie *InternalExecutor) checkIfTxnIsConsistent(txn *kv.Txn) error { return nil } +// ieExecutionMode determines how the internal executor consumes the results of +// the statement evaluation. +type ieExecutionMode int + +const ( + // defaultIEExecutionMode is the execution mode in which the results of the + // statement evaluation are consumed according to the statement's type. + defaultIEExecutionMode ieExecutionMode = iota + // rowsAffectedIEExecutionMode is the execution mode in which the internal + // executor is only interested in the number of rows affected, regardless of + // the statement's type. + // + // With this mode, if a stmt encounters a retry error, the internal executor + // will proceed to transparently reset the number of rows affected (if any + // have been seen by the rowsIterator) and retry the corresponding command. + // Such behavior makes sense given that in production code at most one + // command in the StmtBuf results in "rows affected". + rowsAffectedIEExecutionMode +) + // internalClientComm is an implementation of ClientComm used by the // InternalExecutor. Result rows are streamed on the channel to the // ieResultWriter. @@ -1183,6 +1230,15 @@ type internalClientComm struct { // The results of the query execution will be written into w. w ieResultWriter + // mode determines how the results of the query execution are consumed. + mode ieExecutionMode + + // resetRowsAffected is a callback that sends a single ieIteratorResult + // object to w in order to set the number of rows affected to zero. Only + // used in rowsAffectedIEExecutionMode when discarding a result (indicating + // that a command will be retried). + resetRowsAffected func() + // sync, if set, is called whenever a Sync is executed with all accumulated // results since the last Sync. sync func([]*streamingCommandResult) @@ -1220,6 +1276,9 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult { // results slice at the moment and all previous results have been // "finalized"). icc.results = icc.results[:len(icc.results)-1] + if icc.mode == rowsAffectedIEExecutionMode { + icc.resetRowsAffected() + } }, } icc.results = append(icc.results, res) @@ -1315,6 +1374,14 @@ func (icc *internalClientComm) Close() {} // ClientPos is part of the ClientLock interface. func (icc *internalClientComm) ClientPos() CmdPos { + if icc.mode == rowsAffectedIEExecutionMode { + // With the "rows affected" mode, any command can be rewound since we + // assume that only a single command results in actual "rows affected", + // and in Discard we will reset the number to zero (if we were in + // process of evaluation that command when we encountered the retry + // error). + return -1 + } // Find the latest result that cannot be rewound. lastDelivered := CmdPos(-1) for _, r := range icc.results { diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index dcae521a4dab..068d310e3b60 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -694,7 +694,7 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { ctx := context.Background() params, _ := tests.CreateTestServerParams() - s, db, _ := serverutils.StartServer(t, params) + s, db, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil { @@ -727,12 +727,35 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { } }) + const rowsStmt = `SELECT * FROM test.t` + // This test case verifies that if the retry error occurs after some rows // have been communicated to the client, then the stmt results in the retry // error too - the IE cannot transparently retry it. t.Run("Rows stmt", func(t *testing.T) { - const stmt = `SELECT * FROM test.t` - _, err := ie.QueryBufferedEx(ctx, "read rows", nil /* txn */, ieo, stmt) + _, err := ie.QueryBufferedEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt) + if !testutils.IsError(err, "inject_retry_errors_enabled") { + t.Fatalf("expected to see injected retry error, got %v", err) + } + }) + + // This test case verifies that ExecEx of a stmt of Rows type correctly and + // transparently to us retries the stmt. + t.Run("ExecEx retries in implicit txn", func(t *testing.T) { + numRows, err := ie.ExecEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt) + if err != nil { + t.Fatal(err) + } + if numRows != 1 { + t.Fatalf("expected 1 rowsAffected, got %d", numRows) + } + }) + + // This test case verifies that ExecEx doesn't retry when it's provided with + // an explicit txn. + t.Run("ExecEx doesn't retry in explicit txn", func(t *testing.T) { + txn := kvDB.NewTxn(ctx, "explicit") + _, err := ie.ExecEx(ctx, "read rows", txn, ieo, rowsStmt) if !testutils.IsError(err, "inject_retry_errors_enabled") { t.Fatalf("expected to see injected retry error, got %v", err) }